Greatly simplifies keeping bank accounts in check

This commit is contained in:
2024-09-27 15:59:21 -07:00
parent b7d976cf19
commit 4c14e43734
7 changed files with 157 additions and 288 deletions

View File

@@ -1,5 +1,5 @@
(ns iol-ion.tx.upsert-ledger
(:import [java.util UUID])
(:import [java.util UUID Date])
(:require [datomic.api :as dc]))
(defn -random-tempid []
@@ -10,7 +10,7 @@
next-jel (->> (dc/index-pull db {:index :avet
:selector [:db/id :journal-entry-line/client+account+location+date]
:start [:journal-entry-line/client+account+location+date
(:journal-entry-line/client+account+location+date jel)
(:journal-entry-line/client+account+location+date jel)
(:db/id jel)]})
(take-while (fn line-must-match-client-account-location [result]
(and
@@ -25,6 +25,9 @@
(def extant-read '[:db/id :journal-entry/date :journal-entry/client {:journal-entry/line-items [:journal-entry-line/account :journal-entry-line/location :db/id :journal-entry-line/client+account+location+date]}])
(defn now []
(Date.))
(defn calc-client+account+location+date [je jel]
[(or
(:db/id (:journal-entry/client je))
@@ -43,8 +46,7 @@
(let [extant-entry (or (when-let [original-entity (:journal-entry/original-entity ledger-entry)]
(dc/pull db extant-read [:journal-entry/original-entity original-entity]))
(when-let [external-id (:journal-entry/external-id ledger-entry)]
(dc/pull db extant-read [:journal-entry/external-id external-id])))
extant-entry-exists? (:db/id extant-entry)]
(dc/pull db extant-read [:journal-entry/external-id external-id]))) ]
(cond->
[[:upsert-entity (into (-> ledger-entry
@@ -55,12 +57,9 @@
(update :journal-entry/line-items
(fn [lis]
(mapv #(-> %
(assoc :journal-entry-line/dirty true)
(assoc :journal-entry-line/client+account+location+date
(calc-client+account+location+date ledger-entry %)))
lis)))))]]
extant-entry-exists? (into (map (fn [li]
{:journal-entry-line/dirty true
:db/id li})
(get-line-items-after db extant-entry))))))
(assoc :journal-entry-line/date (:journal-entry/date ledger-entry))
(assoc :journal-entry-line/client (:journal-entry/client ledger-entry)))
lis)))))]
{:db/id (:journal-entry/client ledger-entry)
:client/ledger-last-change (now)}])))

File diff suppressed because one or more lines are too long

View File

@@ -325,6 +325,16 @@
:db/cardinality #:db{:ident :db.cardinality/one},
:db/doc "A client's human-friendly name",
:db/ident :client/name}
{:db/valueType :db.type/instant
:db/cardinality :db.cardinality/one,
:db/doc "When was the ledger last updated?",
:db/ident :client/ledger-last-change
:db/noHistory true}
{:db/valueType :db.type/instant
:db/cardinality :db.cardinality/one,
:db/doc "When were the running balances last updated?",
:db/ident :client/last-running-balance
:db/noHistory true}
{:db/valueType #:db{:ident :db.type/string},
:db/cardinality #:db{:ident :db.cardinality/one},
:db/doc "hello@example.com",
@@ -878,10 +888,32 @@
:db/cardinality #:db{:ident :db.cardinality/one},
:db/doc "The amount to credit",
:db/ident :journal-entry-line/credit}
{:db/valueType #:db{:ident :db.type/instant},
:db/cardinality #:db{:ident :db.cardinality/one},
:db/doc "The time for this entry",
:db/ident :journal-entry-line/date}
{:db/valueType #:db{:ident :db.type/string},
:db/cardinality #:db{:ident :db.cardinality/one},
:db/doc "Location of the entry",
:db/ident :journal-entry-line/location}
{:db/valueType :db.type/ref,
:db/cardinality #:db{:ident :db.cardinality/one},
:db/doc "The client for the journal entry line",
:db/ident :journal-entry-line/client}
{:db/valueType :db.type/tuple
:db/tupleAttrs [:journal-entry-line/client
:journal-entry-line/account
:journal-entry-line/location
:journal-entry-line/date
:journal-entry-line/debit
:journal-entry-line/credit
:journal-entry-line/running-balance]
:db/index true
:db/cardinality :db.cardinality/one,
:db/ident :journal-entry-line/running-balance-tuple
:db/doc "[:journal-entry-line/client :journal-entry-line/account :journal-entry-line/location :journal-entry-line/date :db/id :journal-entry-line/debit :journal-entry-line/credit :journal-entry-line/running-balance]",
:db/noHistory true
}
{:db/ident :legal-entity-1099-type/none}
{:db/ident :legal-entity-1099-type/landlord}

View File

@@ -390,7 +390,8 @@
(get-stats [_]
@stats)
(get-pending-balance [_ bank-account]
(get @pending-balance bank-account))
(or (get @pending-balance bank-account)
0.0))
(fail! [_ error]
(alog/error ::cant-complete-import

View File

@@ -130,10 +130,9 @@
(mu/log ::restore-complete)
(mu/log ::beginning-index-build)
(auto-ap.datomic/transact-schema auto-ap.datomic/conn)
(auto-ap.ledger/reset-client+account+location+date)
(mu/log ::index-build-complete)
(mu/log ::refresh-running-balance-cache)
(auto-ap.ledger/rebuild-running-balance-cache)
#_(auto-ap.ledger/rebuild-running-balance-cache)
(mu/log ::refresh-running-balance-cache-complete)
(mu/log ::done))

View File

@@ -332,51 +332,6 @@
(:bank-account/numeric-code (bank-accounts a)))
:client_id client-id})))
(defn reset-client+account+location+date
([] (reset-client+account+location+date (map first (dc/q '[:find ?c :where [?c :client/code]] (dc/db conn)))))
([clients]
(doseq [[client i] (map vector clients (range))]
(mu/trace ::reset-index
[:client client]
(mu/with-context {:client client
:client-index i
:client-count (count clients)}
(mu/log ::reseting-index)
(let [so-far (atom 0)]
@(->> (dc/qseq {:query '[:find (pull ?je [:journal-entry/date :journal-entry/client {:journal-entry/line-items [:journal-entry-line/account :journal-entry-line/location :db/id]}])
:in $ ?c
:where [?je :journal-entry/client ?c]]
:args [(dc/db conn) client]})
(map first)
(mapcat (fn [je]
(map (fn [jel]
{:db/id (:db/id jel)
:journal-entry-line/client+account+location+date
[(-> je :journal-entry/client :db/id)
(-> jel :journal-entry-line/account :db/id)
(-> jel :journal-entry-line/location)
(-> je :journal-entry/date)]})
(:journal-entry/line-items je))))
(partition-all 500)
(s/->source)
(s/map (fn [batch]
(de/future
(transact-with-backoff batch)
(count batch))))
(s/buffer 50)
(s/realize-each)
(s/consume (fn [batch-count]
(swap! so-far #(+ % batch-count))
(mu/log ::reset :count batch-count
:so-far @so-far
:client client
:client-index i
:client-count (count clients))))))
(mu/log ::client-completed))))))
(defn find-mismatch-index []
(reduce + 0
(for [c (map first (dc/q '[:find ?c :where [?c :client/code]] (dc/db conn)))
@@ -397,214 +352,18 @@
(do (println (count a))
(count a)))))
(defn accounts-needing-rebuild [ db client]
(let [client (pull-id db client)]
(->> (dc/qseq {:query '[:find ?c ?a ?l
:in $ $recent ?c
:where
[$recent ?jel :journal-entry-line/dirty true]
[?je :journal-entry/line-items ?jel]
[?je :journal-entry/client ?c]
[(get-else $ ?jel :journal-entry-line/account :unknown-account) ?a]
[(get-else $ ?jel :journal-entry-line/location "Unknown") ?l]]
:args [db (dc/since db (c/to-date (t/plus (t/now) (t/hours -8)))) client]})
(map (fn [[client account location]]
{:client client
:account account
:location location})))))
(defn all-accounts-needing-rebuild [ db client]
(let [client (pull-id db client)]
(->> (dc/qseq {:query '[:find ?c ?a ?l
:in $ ?c
:where
[?je :journal-entry/client ?c]
[?je :journal-entry/line-items ?jel]
[(get-else $ ?jel :journal-entry-line/account :unknown-account) ?a]
[(get-else $ ?jel :journal-entry-line/location "Unknown") ?l] ]
:args [db client]})
(map (fn [[client account location ]]
{:client client
:account account
:location location})))))
(defn get-entries-to-refresh [{:keys [client account location]} db ]
(let [client (pull-id db client)
account-id (if (= :unknown-account account)
nil
(pull-id db account))
matching-location (if (= "Unknown" location)
nil
location)]
(into []
(comp
(take-while (fn [{[result-client result-account result-location] :journal-entry-line/client+account+location+date}]
(and
(= client result-client)
(= account-id result-account)
(= matching-location result-location))))
(map (fn [result]
[(:db/id result) (:journal-entry-line/debit result 0.0) (:journal-entry-line/credit result 0.0) (:journal-entry-line/running-balance result 0.0) (:journal-entry-line/dirty result)])))
(dc/index-pull db
{:index :avet
:selector [:db/id :journal-entry-line/debit :journal-entry-line/credit :journal-entry-line/client+account+location+date :journal-entry-line/running-balance :journal-entry-line/dirty]
:start [:journal-entry-line/client+account+location+date
[client
account-id
matching-location
#inst "0001-01-01" ]] }))))
(defn compute-running-balance [account-needing-refresh]
(mu/trace ::compute
[:dirty-count (count (:dirty-entries account-needing-refresh))]
(when (= 0 (count (:dirty-entries account-needing-refresh)))
(alog/warn ::no-entries-to-compute
:message "This typically means that an account is determined to be dirty, but no entries are found, meaning bad query"))
(second
(reduce
(fn [[running-balance changed-rows] [id debit credit extant-running-balance dirty] ]
(let [new-running-balance (+ running-balance
(if (#{:account-type/asset
:account-type/dividend
:account-type/expense} (:account-type account-needing-refresh))
(- debit credit)
(- credit debit)))
row-changed? (or (not (dollars= new-running-balance extant-running-balance))
dirty)]
[new-running-balance
(if row-changed?
(conj changed-rows
{:db/id id
:journal-entry-line/running-balance new-running-balance
:journal-entry-line/debit debit
:journal-entry-line/credit credit
:journal-entry-line/dirty false})
changed-rows)]))
[0.0 []]
(:dirty-entries account-needing-refresh)))))
(defn refresh-running-balance-accounts [accounts-needing-rebuild clients c i db]
(mu/log ::found-accounts-needing-rebuild
:accounts (count accounts-needing-rebuild))
(let [so-far (atom 0)]
@(->> accounts-needing-rebuild
(s/->source)
(s/map (fn [account-needing-rebuild]
(->
(de/future
(mu/with-context {:account account-needing-rebuild}
(let [result (-> account-needing-rebuild
(assoc :dirty-entries (get-entries-to-refresh account-needing-rebuild db))
(assoc :account-type (:account_type ((build-account-lookup (:client account-needing-rebuild)) (:account account-needing-rebuild))))
(compute-running-balance))]
(alog/info ::recomputed-entries-with-new-sum :count (count result) )
result)
))
(de/catch (fn [error]
(alog/error ::cant-rebuild
:error error)
nil)))))
(s/realize-each)
(s/mapcat (fn [x]
x))
(s/buffer 50)
(s/transform (partition-all 500))
(s/map (fn [batch]
(de/future
(transact-with-backoff batch)
(count batch))))
(s/buffer 50)
(s/realize-each)
(s/consume (fn [batch-count]
(swap! so-far #(+ % batch-count))
(mu/log ::reset
:count batch-count
:so-far @so-far
:client c
:client-index i
:client-count (count clients)))))))
(defn clients-needing-refresh []
(defn clients-needing-refresh [db]
(->>
(dc/q '[:find (pull ?c [:client/code :db/id]) (count ?jel)
:in $ $recent
:where [$recent ?jel :journal-entry-line/dirty true]
[$ ?je :journal-entry/line-items ?jel]
[$ ?je :journal-entry/client ?c]
[$ ?jel :journal-entry-line/dirty true]]
(dc/db conn)
(dc/since
(dc/db conn)
(c/to-date (t/plus (t/now) (t/hours -8)))))
(map (fn [[client outdated]]
(assoc client :dirty-count outdated)))))
(defn refresh-running-balance-cache
([] (refresh-running-balance-cache (shuffle (clients-needing-refresh))))
([clients]
(doseq [[c i] (map vector clients (range))]
(mu/trace ::building-running-balance
[:client c]
(mu/with-context {:client c
:client-index i
:client-count (count clients)}
(mu/log ::searching-for-accounts)
(let [db (dc/db conn)
accounts-needing-rebuild (accounts-needing-rebuild db (:db/id c))]
(when (seq accounts-needing-rebuild)
(refresh-running-balance-accounts accounts-needing-rebuild clients c i db)
(mu/log ::client-completed))))))))
(defn rebuild-running-balance-cache
([] (rebuild-running-balance-cache (shuffle (map first
(dc/q '[:find (pull ?c [:client/code :db/id])
:where [?c :client/code]]
(dc/db conn))))))
([clients]
(doseq [[c i] (map vector clients (range))]
(mu/trace ::building-running-balance
[:client c]
(mu/with-context {:client c
:client-index i
:client-count (count clients)}
(mu/log ::searching-for-accounts)
(let [db (dc/db conn)
accounts-needing-rebuild (all-accounts-needing-rebuild db (:db/id c))]
(when (seq accounts-needing-rebuild)
(refresh-running-balance-accounts accounts-needing-rebuild clients c i db)
(mu/log ::client-completed))))))))
#_(seq (dc/q '[:find ?le ?d
:in $
:where [?le :journal-entry/date ?d]
[(<= ?d #inst "2000-01-01")]]
(dc/db conn)))
#_(
(refresh-running-balance-cache)
(accounts-needing-rebuild (dc/db conn) [ :client/code "SCCB"])
(user/init-repl)
(get-entries-to-refresh {:client (pull-id (dc/db conn) [:client/code "SCCB"])
:account (pull-id (dc/db conn) [:bank-account/code "SCCB-USB9598" ])
:location "A"}
(dc/db conn))
(rebuild-running-balance-cache [{:client/code "SCCB" :db/id (pull-id (dc/db conn) [:client/code "SCCB"])}])
(clients-needing-refresh)
(dc/q '[:find (count ?d)
:in $
:where [?d :journal-entry-line/dirty true]
]
(dc/db conn))
)
(dc/q '[:find (pull ?c [:client/code :db/id :client/ledger-last-change :client/last-running-balance])
:in $
:where
[?c :client/code]
[(get-else $ ?c :client/ledger-last-change #inst "2040-01-01") ?last-change]
[(get-else $ ?c :client/last-running-balance #inst "2000-01-01") ?last-running-balance]
[(> ?last-change ?last-running-balance)] ]
db)
(map (fn [[client]]
client))))
#_(comment [17592334354011 #inst "0024-08-03T07:52:58.000-00:00"]
[17592302554688 #inst "0023-07-20T07:52:58.000-00:00"]
@@ -617,15 +376,107 @@
[17592302554685 #inst "0023-07-16T07:52:58.000-00:00"]
[17592334353244 #inst "0024-07-14T07:52:58.000-00:00"])
;; TODO
;; 1. X Having an uncategorized running balance
;; 1a. X dirty should always be 0
;; 2. X rebuild from beginning of history always, only update entry if it currently doesn't match
;; 3. X deterministic order (date + entityid)
;; 4. Check for errors
(defn mark-all-clients-dirty []
(auto-ap.datomic/audit-transact-batch
(for [[c] (dc/q '[:find ?c
:in $
:where [?c :client/code]]
(dc/db conn))]
{:db/id c :client/ledger-last-change (c/to-date (t/now))})
{:user/name "backfill-client and dates"}))
(defn upsert-running-balance []
(let [db (dc/db conn)
starting-at (c/to-date (t/now))
clients (clients-needing-refresh db)
_ (alog/info ::clients-needing-update :clients clients :count (count clients))
client-change-stats (atom {})
changes (for [c clients
:let [client-id (:db/id c)
account-lookup (build-account-lookup client-id)]
running-balance-set (->> (dc/index-range db :journal-entry-line/running-balance-tuple [client-id] [(inc client-id)])
(seq)
(partition-by (fn [{[current-client current-account current-location current-date debit credit running-balance]
:v}]
[current-client current-account current-location])))
running-balance-change (->> running-balance-set
(sort-by (fn [{id :e [_ _ _ current-date] :v}]
[current-date id]))
(reduce
(fn [{:keys [changes last-running-balance]}
{id :e
[current-client current-account current-location current-date debit credit running-balance]
:v}]
(let [delta (if (#{:account-type/asset
:account-type/dividend
:account-type/expense} (:account_type (account-lookup current-account)))
(- (or debit 0.0) (or credit 0.0))
(- (or credit 0.0) (or debit 0.0)))
correct-running-balance (+ last-running-balance delta)
running-balance-changed? (not (dollars= correct-running-balance (or running-balance 0.0)))]
(when running-balance-changed?
(swap! client-change-stats update (:client/code c) (fnil inc 0)))
(cond-> {:last-account-lookup account-lookup
:last-running-balance correct-running-balance
:changes changes}
running-balance-changed?
(update :changes conj {:db/id id
:journal-entry-line/running-balance correct-running-balance}))))
{:last-running-balance 0.0})
:changes)]
running-balance-change)]
(alog/info ::change-stats :stats @client-change-stats)
(mu/trace ::update-running-balance []
(auto-ap.datomic/audit-transact-batch changes
{:user/name "running balance updater"}))
(auto-ap.datomic/audit-transact (mapv (fn [c]
{:db/id (:db/id c)
:client/last-running-balance starting-at})
clients)
{:user/name "running balance updater"})
(count changes)))
(comment
(mark-all-clients-dirty)
(upsert-running-balance)
;; SETUP running-balance-tuple
(doseq [[c] (dc/q '[:find ?c
:in $
:where [?c :client/code]]
(dc/db conn))]
(println "CLIENT " c)
(auto-ap.datomic/audit-transact-batch
(for [[date client line] (->> (dc/q '[:find ?jed ?jec (pull ?jel [:journal-entry-line/debit :journal-entry-line/credit :journal-entry-line/running-balance :db/id])
:in $ ?jec
:where [?je :journal-entry/client ?jec]
[?je :journal-entry/date ?jed]
[?je :journal-entry/line-items ?jel]]
(dc/db conn)
c))]
{:db/id (:db/id line)
:journal-entry-line/date date
:journal-entry-line/client client})
{:user/name "backfill-client and dates"})
(println "done."))
#_(dc/q '[:find (pull ?je [*]) (pull ?jel [*])
:where [?je :journal-entry/line-items ?jel]
(not [?jel :journal-entry-line/running-balance-tuple])]
(dc/db conn)))
;; TODO only enable once IOL is set up in clod
#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(mount/defstate running-balance-cache-worker
:start (scheduler/every (* 15 60 (+ 500 (rand-int 500))) (heartbeat refresh-running-balance-cache "running-balance-cache"))
:start (scheduler/every (* 5 60 (+ 500 (rand-int 500))) (heartbeat upsert-running-balance "running-balance-cache"))
:stop (scheduler/stop running-balance-cache-worker))

View File

@@ -293,19 +293,6 @@ module "reconcile_ledger_job" {
cpu = 2048
}
module "current_balance_cache" {
count = var.enable_schedules ? 1 : 0
source = "./background-job/"
ecs_cluster = var.ecs_cluster
task_role_arn = var.task_role_arn
stage = var.stage
schedule = "rate(30 minutes)"
job_name = "current-balance-cache"
execution_role_arn = var.execution_role_arn
memory = 2048
cpu = 512
}
module "yodlee2_job" {
count = var.enable_schedules ? 1 : 0
source = "./background-job/"