;; Files
;; integreat/src/clj/auto_ap/ledger.clj
;; 2024-09-26 00:11:49 -07:00
;;
;; 632 lines
;; 31 KiB
;; Clojure

(ns auto-ap.ledger
(:require
[auto-ap.datomic :refer [conn pull-id pull-ref transact-with-backoff]]
[auto-ap.utils :refer [by dollars= heartbeat]]
[auto-ap.logging :as alog]
[clj-time.coerce :as c]
[clj-time.core :as t]
[com.brunobonacci.mulog :as mu]
[com.unbounce.dogstatsd.core :as statsd]
[datomic.api :as dc]
[manifold.deferred :as de]
[manifold.stream :as s]
[mount.core :as mount]
[yang.scheduler :as scheduler]))
(defn reconcile-ledger
  "Re-upserts transactions and invoices that have no journal entry.

  Looks at entities dated on or after `start-date` for which no entity has a
  :journal-entry/original-entity reference to them:
    * transactions, unless their amount is 0.0 or their approval status is
      excluded or suppressed;
    * invoices whose total is not 0.0, unless they are voided, have a pending
      import status, or are flagged :invoice/exclude-from-ledger.

  Each hit becomes an [:upsert-transaction ...] / [:upsert-invoice ...] op and
  all ops are submitted in one synchronous Datomic transaction. When nothing
  needs repair no transaction is issued and nil is returned.

  With no arguments `start-date` defaults to six months before now."
  ([]
   (reconcile-ledger (c/to-date (t/plus (t/now) (t/months -6)))))
  ([start-date]
   (let [tx-rows         (dc/q '[:find ?t
                                 :in $ ?sd
                                 :where
                                 [?t :transaction/date ?d]
                                 [(>= ?d ?sd)]
                                 (not [_ :journal-entry/original-entity ?t])
                                 (not [?t :transaction/amount 0.0])
                                 (not [?t :transaction/approval-status :transaction-approval-status/excluded])
                                 (not [?t :transaction/approval-status :transaction-approval-status/suppressed])]
                               (dc/db conn)
                               start-date)
         tx-repairs      (into []
                               (map (fn [row] [:upsert-transaction {:db/id (first row)}]))
                               tx-rows)
         invoice-rows    (dc/q '[:find ?t
                                 :in $ ?sd
                                 :where
                                 [?t :invoice/date ?d]
                                 [(>= ?d ?sd)]
                                 (not [_ :journal-entry/original-entity ?t])
                                 [?t :invoice/total ?amt]
                                 [(not= 0.0 ?amt)]
                                 (not [?t :invoice/status :invoice-status/voided])
                                 (not [?t :invoice/import-status :import-status/pending])
                                 (not [?t :invoice/exclude-from-ledger true])]
                               (dc/db conn)
                               start-date)
         invoice-repairs (into []
                               (map (fn [row] [:upsert-invoice {:db/id (first row)}]))
                               invoice-rows)
         ;; transactions first, then invoices, in one tx-data vector
         repairs         (into tx-repairs invoice-repairs)]
     (when (seq repairs)
       (mu/log ::ledger-repairs-needed
               :sample (take 3 repairs)
               :transaction-count (count tx-repairs)
               :invoice-count (count invoice-repairs))
       @(dc/transact conn repairs)))))
(defn touch-transaction
  "Synchronously submits an [:upsert-transaction {:db/id e}] op for entity `e`,
  annotating the Datomic transaction with a :db/doc explaining the touch.
  Returns the dereferenced result of `dc/transact`."
  [e]
  (let [tx-annotation {:db/id  "datomic.tx"
                       :db/doc "touching transaction to update ledger"}
        upsert-op     [:upsert-transaction {:db/id e}]]
    (deref (dc/transact conn [tx-annotation upsert-op]))))
(defn touch-invoice
  "Synchronously submits an [:upsert-invoice {:db/id e}] op for entity `e`,
  annotating the Datomic transaction with a :db/doc explaining the touch.
  Returns the dereferenced result of `dc/transact`."
  [e]
  (let [tx-annotation {:db/id  "datomic.tx"
                       :db/doc "touching invoice to update ledger"}
        upsert-op     [:upsert-invoice {:db/id e}]]
    (deref (dc/transact conn [tx-annotation upsert-op]))))
(defn recently-changed-entities
  "Returns the set of entity ids that have a :transaction/date or an
  :invoice/date datom in the `dc/since` view of the current database taken
  at `start`.

  NOTE(review): `end` is accepted but never used, so the window has no upper
  bound -- confirm whether one was intended before relying on it."
  [start end]
  (let [since-db (dc/since (dc/db conn) start)
        rows     (dc/q '[:find ?e
                         :in $
                         :where (or [?e :transaction/date]
                                    [?e :invoice/date])]
                       since-db)]
    (set (map first rows))))
(defn mismatched-transactions
  "Finds recently changed transactions whose coded accounts differ from the
  accounts on their journal entry lines.

  For the entities returned by `recently-changed-entities`, two maps of
  entity id -> set of account ids are built: one from the journal entry lines
  that reference the transaction, and one from the transaction's own
  :transaction/accounts (skipping excluded/suppressed transactions and those
  with amount 0.0). Returns a realized seq of [entity-id account-set] entries
  from the transaction side whose set is not equal to the ledger side
  (a transaction with no ledger accounts at all also counts).

  With no arguments the window is seven days ago to one hour ago."
  ([]
   (let [window-start (c/to-date (t/minus (t/now) (t/days 7)))
         window-end   (c/to-date (t/minus (t/now) (t/hours 1)))]
     (mismatched-transactions window-start window-end)))
  ([changed-between-start changed-between-end]
   (mu/trace ::calculating-mismatched-transactions
     [:range {:start changed-between-start
              :end changed-between-end}]
     (let [;; folds [entity account] rows into {entity #{account ...}}
           accounts-by-entity (fn [rows]
                                (reduce (fn [m [e account]]
                                          (assoc m e (conj (get m e #{}) account)))
                                        {}
                                        rows))
           candidates         (recently-changed-entities
                               changed-between-start
                               changed-between-end)
           _                  (mu/log ::checking-mismatched-transactions
                                      :count (count candidates))
           ledger-accounts    (accounts-by-entity
                               (dc/q '[:find ?e ?lia
                                       :in $ [?e ...]
                                       :where
                                       [?je :journal-entry/original-entity ?e]
                                       [?e :transaction/date]
                                       [?je :journal-entry/line-items ?li]
                                       [?li :journal-entry-line/account ?lia]
                                       [?lia :account/name]]
                                     (dc/db conn)
                                     candidates))
           coded-accounts     (accounts-by-entity
                               (dc/q '[:find ?e ?lia
                                       :in $ [?e ...]
                                       :where
                                       [?e :transaction/date ?d]
                                       [?e :transaction/accounts ?li]
                                       (not [?e :transaction/approval-status :transaction-approval-status/excluded])
                                       (not [?e :transaction/approval-status :transaction-approval-status/suppressed])
                                       [?li :transaction-account/account ?lia]
                                       [?lia :account/name]
                                       [?e :transaction/amount ?amt]
                                       [(not= ?amt 0.0)]]
                                     (dc/db conn)
                                     candidates))]
       ;; realized eagerly so the work happens inside the trace
       (doall
        (remove (fn [[e accounts]]
                  (= accounts (get ledger-accounts e)))
                coded-accounts))))))
(defn unbalanced-transactions
  "Finds recently changed transactions whose journal entry does not balance.

  For each entity from `recently-changed-entities` that has a
  :transaction/date, the journal entry pointing at it via
  :journal-entry/original-entity is loaded and its line debits and credits are
  summed (a missing debit/credit counts as 0.0). Entries where either sum is
  not `dollars=` to :journal-entry/amount are reported.

  Returns a lazy seq with, for each offending journal entry, the result of
  `pull-ref` on its :journal-entry/original-entity.

  With no arguments the window is seven days ago to one hour ago.

  The candidate ids are transactions, not journal entries, so they are bound
  to ?e and joined to ?je through :journal-entry/original-entity. Binding
  them straight to ?je (as this query used to) can never match, because those
  entities carry no :journal-entry/amount."
  ([] (unbalanced-transactions (c/to-date (t/minus (t/now) (t/days 7)))
                               (c/to-date (t/minus (t/now) (t/hours 1)))))
  ([changed-between-start changed-between-end]
   (let [entities-to-consider (recently-changed-entities changed-between-start changed-between-end)]
     (->> (dc/q '[:find ?je ?a (sum ?debit) (sum ?credit)
                  ;; :with keeps one row per line so equal amounts are not collapsed before summing
                  :with ?jel
                  :in $ [?e ...]
                  :where
                  ;; only transactions; invoices are handled by unbalanced-invoices
                  [?e :transaction/date]
                  [?je :journal-entry/original-entity ?e]
                  [?je :journal-entry/amount ?a]
                  [?je :journal-entry/line-items ?jel]
                  [(get-else $ ?jel :journal-entry-line/debit 0.0) ?debit]
                  [(get-else $ ?jel :journal-entry-line/credit 0.0) ?credit]]
                (dc/db conn)
                entities-to-consider)
          (filter (fn [[_ a d c]]
                    (or (not (dollars= a d))
                        (not (dollars= a c)))))
          (map first)
          (map (fn [je]
                 (pull-ref (dc/db conn) :journal-entry/original-entity je)))))))
(defn unbalanced-invoices
  "Finds recently changed invoices whose journal entry does not balance.

  For each entity from `recently-changed-entities` that has an :invoice/date,
  the journal entry pointing at it via :journal-entry/original-entity is
  loaded and its line debits and credits are summed (a missing debit/credit
  counts as 0.0). Entries where either sum is not `dollars=` to
  :journal-entry/amount are reported.

  Returns a lazy seq with, for each offending journal entry, the result of
  `pull-ref` on its :journal-entry/original-entity.

  With no arguments the window is seven days ago to one hour ago.

  The candidate ids are invoices, not journal entries, so they are bound to
  ?i and joined to ?je through :journal-entry/original-entity. Binding them
  to ?je (as this query used to) can never match."
  ([] (unbalanced-invoices (c/to-date (t/minus (t/now) (t/days 7)))
                           (c/to-date (t/minus (t/now) (t/hours 1)))))
  ([changed-between-start changed-between-end]
   (let [entities-to-consider (recently-changed-entities
                               changed-between-start
                               changed-between-end)]
     (->> (dc/q '[:find ?je ?a (sum ?debit) (sum ?credit)
                  ;; :with keeps one row per line so equal amounts are not collapsed before summing
                  :with ?jel
                  :in $ [?i ...]
                  :where
                  [?i :invoice/date]
                  [?je :journal-entry/original-entity ?i]
                  [?je :journal-entry/amount ?a]
                  [?je :journal-entry/line-items ?jel]
                  [(get-else $ ?jel :journal-entry-line/debit 0.0) ?debit]
                  [(get-else $ ?jel :journal-entry-line/credit 0.0) ?credit]]
                (dc/db conn)
                entities-to-consider)
          (filter (fn [[_ a d c]]
                    (or (not (dollars= a d))
                        (not (dollars= a c)))))
          (map first)
          (map (fn [je]
                 (pull-ref (dc/db conn) :journal-entry/original-entity je)))))))
(defn mismatched-invoices
  "Finds recently changed invoices whose expense accounts differ from the
  accounts on their journal entry lines.

  For the entities returned by `recently-changed-entities`, two maps of
  entity id -> set of account ids are built: one from the journal entry lines
  that reference the invoice, and one from the invoice's
  :invoice/expense-accounts (only imported invoices; skipping those with total
  0.0, voided, or flagged exclude-from-ledger). Accounts with numeric code
  21000 are left out on both sides. Returns a lazy seq of
  [entity-id account-set] entries from the invoice side whose set is not equal
  to the ledger side.

  With no arguments the window is seven days ago to one hour ago."
  ([]
   (let [window-start (c/to-date (t/minus (t/now) (t/days 7)))
         window-end   (c/to-date (t/minus (t/now) (t/hours 1)))]
     (mismatched-invoices window-start window-end)))
  ([changed-between-start changed-between-end]
   (let [;; folds [entity account] rows into {entity #{account ...}}
         accounts-by-entity (fn [rows]
                              (reduce (fn [m [e account]]
                                        (assoc m e (conj (get m e #{}) account)))
                                      {}
                                      rows))
         candidates         (recently-changed-entities changed-between-start changed-between-end)
         ledger-accounts    (accounts-by-entity
                             (dc/q '[:find ?e ?lia
                                     :in $ [?e ...]
                                     :where
                                     [?je :journal-entry/original-entity ?e]
                                     [?e :invoice/date]
                                     [?je :journal-entry/line-items ?li]
                                     [?li :journal-entry-line/account ?lia]
                                     (not [?lia :account/numeric-code 21000])
                                     [?lia :account/name]]
                                   (dc/db conn)
                                   candidates))
         expense-accounts   (accounts-by-entity
                             (dc/q '[:find ?e ?lia
                                     :in $ [?e ...]
                                     :where
                                     [?e :invoice/expense-accounts ?li]
                                     (not [?e :invoice/total 0.0])
                                     [?li :invoice-expense-account/account ?lia]
                                     [?lia :account/name]
                                     (not [?lia :account/numeric-code 21000])
                                     (not [?e :invoice/status :invoice-status/voided])
                                     (not [?e :invoice/exclude-from-ledger true])
                                     [?e :invoice/import-status :import-status/imported]]
                                   (dc/db conn)
                                   candidates))]
     (remove (fn [[e accounts]]
               (= accounts (get ledger-accounts e)))
             expense-accounts))))
(defn touch-broken-ledger
  "Runs the four ledger consistency checks and touches every offender.

  Emits a statsd event at the start and at the end. In between, for each of
  mismatched transactions, unbalanced transactions, mismatched invoices and
  unbalanced invoices (each inside its own mulog trace):
    * runs the check with its default window;
    * if anything is found, logs a WARN event with the count and a sample of
      ten, touches every offender with `touch-transaction` / `touch-invoice`,
      re-runs the check and reports the remaining count as a statsd gauge;
    * otherwise reports the gauge as 0.0."
  []
  (statsd/event {:title "Reconciling Ledger"
                 :text "This process looks for unbalance ledger entries, or missing ledger entries"
                 :priority :low}
                nil)
  (mu/trace ::fixing-mismatched-transactions
    []
    (mu/log ::started-fixing-mismatched-transactions)
    (let [mismatched-ts (mismatched-transactions)]
      (if (seq mismatched-ts)
        (do
          (mu/log ::found-mismatched-transactions
                  :status "WARN"
                  :count (count mismatched-ts)
                  :sample (take 10 mismatched-ts))
          ;; entries are [entity-id account-set]; only the id is needed
          (doseq [[m] mismatched-ts]
            (touch-transaction m))
          ;; gauge reports what is still broken after the repair attempt
          (statsd/gauge "data.mismatched_transactions" (count (mismatched-transactions))))
        (statsd/gauge "data.mismatched_transactions" 0.0))))
  (mu/trace ::fixing-unbalanced-transactions
    []
    (mu/log ::started-fixing-unbalanced-transactions)
    (let [unbalanced-ts (unbalanced-transactions)]
      (if (seq unbalanced-ts)
        (do
          (mu/log ::found-unbalanced-transactions
                  :status "WARN"
                  :count (count unbalanced-ts)
                  :sample (take 10 unbalanced-ts))
          (doseq [m unbalanced-ts]
            (touch-transaction m))
          (statsd/gauge "data.unbalanced_transactions" (count (unbalanced-transactions))))
        (statsd/gauge "data.unbalanced_transactions" 0.0))))
  (mu/trace ::fixing-mismatched-invoices
    []
    (mu/log ::started-fixing-mismatched-invoices)
    (let [mismatched-is (mismatched-invoices)]
      (if (seq mismatched-is)
        (do
          (mu/log ::found-mismatched-invoices
                  :status "WARN"
                  :count (count mismatched-is)
                  :sample (take 10 mismatched-is))
          ;; entries are [entity-id account-set]; only the id is needed
          (doseq [[m] mismatched-is]
            (touch-invoice m))
          (statsd/gauge "data.mismatched_invoices" (count (mismatched-invoices))))
        (statsd/gauge "data.mismatched_invoices" 0.0))))
  (mu/trace ::fixing-unbalanced-invoices
    []
    (mu/log ::started-fixing-unbalance-invoices)
    (let [unbalanced-is (unbalanced-invoices)]
      (if (seq unbalanced-is)
        (do
          ;; logged under its own event name; it was previously reported as
          ;; ::found-mismatched-invoices, conflating the two checks
          (mu/log ::found-unbalanced-invoices
                  :status "WARN"
                  :count (count unbalanced-is)
                  :sample (take 10 unbalanced-is))
          (doseq [m unbalanced-is]
            (touch-invoice m))
          (statsd/gauge "data.unbalanced_invoices" (count (unbalanced-invoices))))
        (statsd/gauge "data.unbalanced_invoices" 0.0))))
  (statsd/event {:title "Finished Reconciling Ledger"
                 :text "This process looks for unbalance ledger entries, or missing ledger entries"
                 :priority :low}
                nil))
(defn build-account-lookup
  "Builds a lookup function for `client-id`.

  Loads every entity with an :account/name and every entity with a
  :bank-account/name from the current database, indexes both by :db/id, and
  collects per-client account name overrides keyed by [account-id client-id].

  The returned function takes an account (or bank account) entity id and
  returns a map with:
    :name         bank account name, else this client's override name, else
                  the account name
    :account_type the account's type ident, else a type derived from the bank
                  account type (check/cash -> asset, credit -> liability),
                  else :account-type/asset
    :numeric_code the account's numeric code, else the bank account's
    :client_id    `client-id`"
  [client-id]
  (let [accounts            (by :db/id
                                (map first
                                     (dc/q '[:find (pull ?e [:db/id :account/name
                                                             :account/numeric-code
                                                             {:account/type [:db/ident]
                                                              :account/client-overrides [:account-client-override/client :account-client-override/name]}])
                                             :in $
                                             :where [?e :account/name]]
                                           (dc/db conn))))
        bank-accounts       (by :db/id
                                (map first
                                     (dc/q '[:find (pull ?e [:db/id :bank-account/name :bank-account/numeric-code {:bank-account/type [:db/ident]}])
                                             :in $
                                             :where [?e :bank-account/name]]
                                           (dc/db conn))))
        overrides-by-client (into {}
                                  (for [acct     (vals accounts)
                                        override (:account/client-overrides acct)]
                                    [[(:db/id acct) (-> override :account-client-override/client :db/id)]
                                     (:account-client-override/name override)]))
        bank-type->type     {:bank-account-type/check  :account-type/asset
                             :bank-account-type/cash   :account-type/asset
                             :bank-account-type/credit :account-type/liability}]
    (fn [a]
      (let [acct (accounts a)
            bank (bank-accounts a)]
        {:name         (or (:bank-account/name bank)
                           (overrides-by-client [a client-id])
                           (:account/name acct))
         :account_type (or (-> acct :account/type :db/ident)
                           (bank-type->type (-> bank :bank-account/type :db/ident))
                           ;; default to asset, for things like unknown
                           :account-type/asset)
         :numeric_code (or (:account/numeric-code acct)
                           (:bank-account/numeric-code bank))
         :client_id    client-id}))))
(defn reset-client+account+location+date
  "Rewrites :journal-entry-line/client+account+location+date on every journal
  entry line of the given clients.

  For each client, streams its journal entries with `dc/qseq`, builds one
  update per line whose tuple is [entry client id, line account id, line
  location, entry date], and transacts the updates in batches of 500 through
  `transact-with-backoff` on a manifold stream, logging progress after each
  batch. Blocks until the client's stream is fully consumed before moving on.

  With no arguments, runs for every entity that has a :client/code."
  ([]
   (let [all-clients (map first (dc/q '[:find ?c :where [?c :client/code]] (dc/db conn)))]
     (reset-client+account+location+date all-clients)))
  ([clients]
   (doseq [[i client] (map-indexed vector clients)]
     (mu/trace ::reset-index
       [:client client]
       (mu/with-context {:client client
                         :client-index i
                         :client-count (count clients)}
         (mu/log ::reseting-index)
         (let [so-far        (atom 0)
               entries       (map first
                                  (dc/qseq {:query '[:find (pull ?je [:journal-entry/date :journal-entry/client {:journal-entry/line-items [:journal-entry-line/account :journal-entry-line/location :db/id]}])
                                                     :in $ ?c
                                                     :where [?je :journal-entry/client ?c]]
                                            :args [(dc/db conn) client]}))
               ;; one update map per line, still lazy
               index-updates (for [je  entries
                                   jel (:journal-entry/line-items je)]
                               {:db/id (:db/id jel)
                                :journal-entry-line/client+account+location+date
                                [(-> je :journal-entry/client :db/id)
                                 (-> jel :journal-entry-line/account :db/id)
                                 (-> jel :journal-entry-line/location)
                                 (-> je :journal-entry/date)]})]
           @(->> index-updates
                 (partition-all 500)
                 (s/->source)
                 (s/map (fn [batch]
                          (de/future
                            (transact-with-backoff batch)
                            (count batch))))
                 (s/buffer 50)
                 (s/realize-each)
                 (s/consume (fn [batch-count]
                              (swap! so-far + batch-count)
                              (mu/log ::reset :count batch-count
                                      :so-far @so-far
                                      :client client
                                      :client-index i
                                      :client-count (count clients))))))
         (mu/log ::client-completed))))))
(defn find-mismatch-index
  "Counts journal entry lines whose stored
  :journal-entry-line/client+account+location+date tuple differs from the
  tuple recomputed from the line and its parent journal entry.

  Walks the :avet index per client (every entity with a :client/code),
  printing the client being searched and its mismatch count to stdout.
  Returns the total across all clients."
  []
  (let [client-ids (map first (dc/q '[:find ?c :where [?c :client/code]] (dc/db conn)))
        ;; true when the stored tuple is not what the current data would produce
        stale?     (fn [{stored :journal-entry-line/client+account+location+date :as row}]
                     (not= stored
                           [(-> row :journal-entry/_line-items :journal-entry/client :db/id)
                            (-> row :journal-entry-line/account :db/id)
                            (-> row :journal-entry-line/location)
                            (-> row :journal-entry/_line-items :journal-entry/date)]))]
    (reduce
     (fn [total c]
       (println "searching for" c)
       (let [stale-count (->> (dc/index-pull (dc/db conn)
                                             {:index :avet
                                              :selector [:db/id :journal-entry-line/location :journal-entry-line/account :journal-entry-line/client+account+location+date {:journal-entry/_line-items [:journal-entry/date :journal-entry/client]}]
                                              :start [:journal-entry-line/client+account+location+date [c]]})
                              ;; stop once the index walks into the next client
                              (take-while (fn [row]
                                            (= c (first (:journal-entry-line/client+account+location+date row)))))
                              (filter stale?)
                              (count))]
         (println stale-count)
         (+ total stale-count)))
     0
     client-ids)))
(defn accounts-needing-rebuild
  "Returns a lazy seq of {:client :account :location} maps for `client`, one
  per distinct combination that has a journal entry line marked
  :journal-entry-line/dirty true in the `dc/since` view of `db` covering the
  last eight hours.

  Lines without an account report :unknown-account and lines without a
  location report \"Unknown\". `client` is resolved with `pull-id` first."
  [db client]
  (let [client-id (pull-id db client)
        cutoff    (c/to-date (t/plus (t/now) (t/hours -8)))
        recent-db (dc/since db cutoff)
        rows      (dc/qseq {:query '[:find ?c ?a ?l
                                     :in $ $recent ?c
                                     :where
                                     [$recent ?jel :journal-entry-line/dirty true]
                                     [?je :journal-entry/line-items ?jel]
                                     [?je :journal-entry/client ?c]
                                     [(get-else $ ?jel :journal-entry-line/account :unknown-account) ?a]
                                     [(get-else $ ?jel :journal-entry-line/location "Unknown") ?l]]
                            :args [db recent-db client-id]})]
    (map (fn [[c a l]]
           {:client c
            :account a
            :location l})
         rows)))
(defn all-accounts-needing-rebuild
  "Returns a lazy seq of {:client :account :location} maps for `client`, one
  per distinct account/location combination found on any of the client's
  journal entry lines, regardless of dirty state.

  Lines without an account report :unknown-account and lines without a
  location report \"Unknown\". `client` is resolved with `pull-id` first."
  [db client]
  (let [client-id (pull-id db client)
        rows      (dc/qseq {:query '[:find ?c ?a ?l
                                     :in $ ?c
                                     :where
                                     [?je :journal-entry/client ?c]
                                     [?je :journal-entry/line-items ?jel]
                                     [(get-else $ ?jel :journal-entry-line/account :unknown-account) ?a]
                                     [(get-else $ ?jel :journal-entry-line/location "Unknown") ?l]]
                            :args [db client-id]})]
    (map (fn [[c a l]]
           {:client c
            :account a
            :location l})
         rows)))
(defn get-entries-to-refresh
  "Loads, in :avet index order, every journal entry line belonging to one
  client/account/location bucket.

  The :unknown-account and \"Unknown\" placeholders are mapped back to nil
  before seeking into the :journal-entry-line/client+account+location+date
  index. Reading stops at the first line whose tuple leaves the bucket.

  Returns a vector of
  [line-id debit credit running-balance dirty], where debit, credit and
  running-balance default to 0.0 when absent."
  [{:keys [client account location]} db]
  (let [client-id    (pull-id db client)
        account-id   (when-not (= :unknown-account account)
                       (pull-id db account))
        location-key (when-not (= "Unknown" location)
                       location)
        same-bucket? (fn [{[row-client row-account row-location] :journal-entry-line/client+account+location+date}]
                       (and
                        (= client-id row-client)
                        (= account-id row-account)
                        (= location-key row-location)))
        ->row        (fn [line]
                       [(:db/id line)
                        (:journal-entry-line/debit line 0.0)
                        (:journal-entry-line/credit line 0.0)
                        (:journal-entry-line/running-balance line 0.0)
                        (:journal-entry-line/dirty line)])
        lines        (dc/index-pull db
                                    {:index :avet
                                     :selector [:db/id :journal-entry-line/debit :journal-entry-line/credit :journal-entry-line/client+account+location+date :journal-entry-line/running-balance :journal-entry-line/dirty]
                                     ;; seek to the earliest possible date within the bucket
                                     :start [:journal-entry-line/client+account+location+date
                                             [client-id
                                              account-id
                                              location-key
                                              #inst "0001-01-01"]]})]
    (into []
          (comp (take-while same-bucket?)
                (map ->row))
          lines)))
(defn compute-running-balance
  "Recomputes the running balance over `(:dirty-entries account-needing-refresh)`
  and returns the rows that need to be written back.

  Entries are [id debit credit extant-running-balance dirty] tuples and are
  folded in order starting from 0.0. For asset, dividend and expense account
  types each entry adds (debit - credit); for every other :account-type it
  adds (credit - debit). A row is emitted when its new balance is not
  `dollars=` to the stored one or when it is flagged dirty; emitted rows carry
  the new running balance, the debit and credit, and dirty set to false.

  Logs a warning when there are no entries at all. Returns a vector of
  update maps (possibly empty)."
  [account-needing-refresh]
  (mu/trace ::compute
    [:dirty-count (count (:dirty-entries account-needing-refresh))]
    (let [entries       (:dirty-entries account-needing-refresh)
          debit-normal? (contains? #{:account-type/asset
                                     :account-type/dividend
                                     :account-type/expense}
                                   (:account-type account-needing-refresh))
          step          (fn [[balance-so-far updates] [id debit credit extant-running-balance dirty]]
                          (let [delta        (if debit-normal?
                                               (- debit credit)
                                               (- credit debit))
                                next-balance (+ balance-so-far delta)
                                row-changed? (or (not (dollars= next-balance extant-running-balance))
                                                 dirty)]
                            [next-balance
                             (cond-> updates
                               row-changed? (conj {:db/id id
                                                   :journal-entry-line/running-balance next-balance
                                                   :journal-entry-line/debit debit
                                                   :journal-entry-line/credit credit
                                                   :journal-entry-line/dirty false}))]))]
      (when (zero? (count entries))
        (alog/warn ::no-entries-to-compute
                   :message "This typically means that an account is determined to be dirty, but no entries are found, meaning bad query"))
      ;; the accumulator is [final-balance updates]; callers only want the updates
      (second (reduce step [0.0 []] entries)))))
(defn refresh-running-balance-accounts
  "Recomputes and persists running balances for each account bucket in
  `accounts-needing-rebuild` (maps of :client, :account, :location).

  Each bucket is processed in a manifold future: its lines are loaded with
  `get-entries-to-refresh` against `db`, its account type is resolved through
  `build-account-lookup`, and `compute-running-balance` produces the rows to
  update. A bucket that throws is logged and contributes no rows. The rows of
  all buckets are flattened, transacted in batches of 500 with
  `transact-with-backoff`, and progress is logged per batch using `c`
  (client), `i` (client index) and `(count clients)`. Blocks until the stream
  is fully consumed.

  The account lookup is memoized per client id for the duration of this call:
  building it runs two whole-database queries, which previously happened once
  per bucket."
  [accounts-needing-rebuild clients c i db]
  (mu/log ::found-accounts-needing-rebuild
          :accounts (count accounts-needing-rebuild))
  (let [so-far            (atom 0)
        ;; local cache, discarded when this call returns
        lookup-for-client (memoize build-account-lookup)]
    @(->> accounts-needing-rebuild
          (s/->source)
          (s/map (fn [account-needing-rebuild]
                   (->
                    (de/future
                      (mu/with-context {:account account-needing-rebuild}
                        (let [result (-> account-needing-rebuild
                                         (assoc :dirty-entries (get-entries-to-refresh account-needing-rebuild db))
                                         (assoc :account-type (:account_type ((lookup-for-client (:client account-needing-rebuild))
                                                                              (:account account-needing-rebuild))))
                                         (compute-running-balance))]
                          (alog/info ::recomputed-entries-with-new-sum :count (count result))
                          result)))
                    ;; a failed bucket yields nil, which the mapcat below drops
                    (de/catch (fn [error]
                                (alog/error ::cant-rebuild
                                            :error error)
                                nil)))))
          (s/realize-each)
          (s/mapcat (fn [x]
                      x))
          (s/buffer 50)
          (s/transform (partition-all 500))
          (s/map (fn [batch]
                   (de/future
                     (transact-with-backoff batch)
                     (count batch))))
          (s/buffer 50)
          (s/realize-each)
          (s/consume (fn [batch-count]
                       (swap! so-far #(+ % batch-count))
                       (mu/log ::reset
                               :count batch-count
                               :so-far @so-far
                               :client c
                               :client-index i
                               :client-count (count clients)))))))
(defn clients-needing-refresh
  "Returns a lazy seq of client maps (:client/code, :db/id) that own journal
  entry lines which were marked dirty within the last eight hours and are
  still dirty in the current database. Each map also carries :dirty-count,
  the number of such lines."
  []
  (let [current-db (dc/db conn)
        recent-db  (dc/since
                    (dc/db conn)
                    (c/to-date (t/plus (t/now) (t/hours -8))))
        rows       (dc/q '[:find (pull ?c [:client/code :db/id]) (count ?jel)
                           :in $ $recent
                           :where [$recent ?jel :journal-entry-line/dirty true]
                           [$ ?je :journal-entry/line-items ?jel]
                           [$ ?je :journal-entry/client ?c]
                           [$ ?jel :journal-entry-line/dirty true]]
                         current-db
                         recent-db)]
    (for [[client dirty-count] rows]
      (assoc client :dirty-count dirty-count))))
(defn refresh-running-balance-cache
  "Refreshes running balances for the recently dirtied account buckets of each
  client in `clients` (maps with at least :db/id).

  Per client, takes a database snapshot, asks `accounts-needing-rebuild` for
  the buckets to refresh and, when there are any, hands them to
  `refresh-running-balance-accounts` and logs completion.

  With no arguments, runs over `clients-needing-refresh` in shuffled order."
  ([]
   (refresh-running-balance-cache (shuffle (clients-needing-refresh))))
  ([clients]
   (doseq [[i c] (map-indexed vector clients)]
     (mu/trace ::building-running-balance
       [:client c]
       (mu/with-context {:client c
                         :client-index i
                         :client-count (count clients)}
         (mu/log ::searching-for-accounts)
         (let [db             (dc/db conn)
               stale-accounts (accounts-needing-rebuild db (:db/id c))]
           (when (seq stale-accounts)
             (refresh-running-balance-accounts stale-accounts clients c i db)
             (mu/log ::client-completed))))))))
(defn rebuild-running-balance-cache
  "Rebuilds running balances for every account bucket of each client in
  `clients` (maps with at least :db/id), regardless of dirty state.

  Per client, takes a database snapshot, asks `all-accounts-needing-rebuild`
  for the buckets and, when there are any, hands them to
  `refresh-running-balance-accounts` and logs completion.

  With no arguments, runs over every entity with a :client/code, in shuffled
  order."
  ([]
   (let [all-clients (map first
                          (dc/q '[:find (pull ?c [:client/code :db/id])
                                  :where [?c :client/code]]
                                (dc/db conn)))]
     (rebuild-running-balance-cache (shuffle all-clients))))
  ([clients]
   (doseq [[i c] (map-indexed vector clients)]
     (mu/trace ::building-running-balance
       [:client c]
       (mu/with-context {:client c
                         :client-index i
                         :client-count (count clients)}
         (mu/log ::searching-for-accounts)
         (let [db          (dc/db conn)
               all-buckets (all-accounts-needing-rebuild db (:db/id c))]
           (when (seq all-buckets)
             (refresh-running-balance-accounts all-buckets clients c i db)
             (mu/log ::client-completed))))))))
;; REPL scratch. Every form below is prefixed with the reader discard macro
;; `#_`, so none of it is evaluated when the namespace loads.

;; Lists journal entries dated on or before 2000-01-01.
#_(seq (dc/q '[:find ?le ?d
:in $
:where [?le :journal-entry/date ?d]
[(<= ?d #inst "2000-01-01")]]
(dc/db conn)))
;; Manual invocations of the running-balance functions against client "SCCB",
;; plus a count of all lines currently flagged dirty.
#_(
(refresh-running-balance-cache)
(accounts-needing-rebuild (dc/db conn) [ :client/code "SCCB"])
(user/init-repl)
(get-entries-to-refresh {:client (pull-id (dc/db conn) [:client/code "SCCB"])
:account (pull-id (dc/db conn) [:bank-account/code "SCCB-USB9598" ])
:location "A"}
(dc/db conn))
(rebuild-running-balance-cache [{:client/code "SCCB" :db/id (pull-id (dc/db conn) [:client/code "SCCB"])}])
(clients-needing-refresh)
(dc/q '[:find (count ?d)
:in $
:where [?d :journal-entry-line/dirty true]
]
(dc/db conn))
)
;; NOTE(review): entity-id / date pairs with implausible years; presumably
;; sample output of the journal-entry date query above -- confirm.
#_(comment [17592334354011 #inst "0024-08-03T07:52:58.000-00:00"]
[17592302554688 #inst "0023-07-20T07:52:58.000-00:00"]
[17592302554682 #inst "0023-07-16T07:52:58.000-00:00"]
[17592302554691 #inst "0023-07-22T07:52:58.000-00:00"]
[17592334353995 #inst "0024-08-10T07:52:58.000-00:00"]
[17592302554694 #inst "0023-07-27T07:52:58.000-00:00"]
[17592241669405 #inst "0218-08-04T07:52:58.000-00:00"]
[17592334353207 #inst "0024-07-27T07:52:58.000-00:00"]
[17592302554685 #inst "0023-07-16T07:52:58.000-00:00"]
[17592334353244 #inst "0024-07-14T07:52:58.000-00:00"])
;; TODO
;; 1. X Having an uncategorized running balance
;; 1a. X dirty should always be 0
;; 2. X rebuild from beginning of history always, only update entry if it currently doesn't match
;; 3. X deterministic order (date + entityid)
;; 4. Check for errors
;; TODO only enable once IOL is set up in clod
;; Mount state that schedules `refresh-running-balance-cache`, wrapped in
;; `heartbeat` under the name "running-balance-cache", and stops the schedule
;; when the state is stopped.
;; The interval is (* 15 60 (+ 500 (rand-int 500))), i.e. a value between
;; 450000 and 899100 chosen once at start; presumably milliseconds (7.5 to
;; 15 minutes) with random jitter -- TODO confirm the unit against yang.scheduler.
#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(mount/defstate running-balance-cache-worker
:start (scheduler/every (* 15 60 (+ 500 (rand-int 500))) (heartbeat refresh-running-balance-cache "running-balance-cache"))
:stop (scheduler/stop running-balance-cache-worker))