(cloud) Added running balance cache for ledger

This commit is contained in:
2023-03-21 17:03:55 -07:00
parent f34c5c65fe
commit 0af50758bd
8 changed files with 146 additions and 120 deletions

View File

@@ -13,13 +13,10 @@
(def uri (:datomic-url env))
(mount/defstate client
:start (dc/client {:server-type :cloud
:region "us-east-1"
:system "iol-cloud"
:endpoint "https://53syis8n1m.execute-api.us-east-1.amazonaws.com"})
:start (dc/client (:client-config env))
:stop nil)
(mount/defstate conn
:start (dc/connect client {:db-name "prod-mirror"})
:start (dc/connect client {:db-name (:db-name env)})
:stop nil)
#_(def uri "datomic:mem://datomic-transactor:4334/invoice")
@@ -566,7 +563,6 @@
(update-in [:args] into (get-in query-part-2 [:args]))))
(defn add-sorter-fields [q sort-map args]
(log/info "sort-map" (pr-str sort-map))
(reduce
(fn [q {:keys [sort-key]}]
(merge-query q
@@ -578,7 +574,6 @@
(:sort args)))
(defn add-sorter-fields-2 [q sort-map args]
(log/info "sort-map" (pr-str sort-map))
(reduce
(fn [q {:keys [sort-key]}]
(merge-query q
@@ -627,7 +622,9 @@
(let [batch (conj (vec batch) {:db/id "datomic.tx"
:audit/user (str (:user/role id) "-" (:user/name id))
:audit/batch batch-id})
_ (log/info "transacting batch " batch-id " " (count batch))
_ (mu/log ::transacting-batch
:batch batch-id
:count (count batch))
tx-result (dc/transact conn {:tx-data batch})]
(cond-> full-tx
@@ -810,4 +807,8 @@
(defn transact-schema [conn]
(dc/transact conn
{:tx-data (edn/read-string (slurp (io/resource "schema.edn")))}))
{:tx-data (edn/read-string (slurp (io/resource "schema.edn")))})
;; this is temporary for any new stuff that needs to be asserted for cloud migration.
(dc/transact conn
{:tx-data (edn/read-string (slurp (io/resource "cloud-migration-schema.edn")))}))

View File

@@ -8,7 +8,7 @@
[auto-ap.time :as atime]
[auto-ap.ledger.reports :as l-reports]
[auto-ap.graphql.utils
:refer [->graphql <-graphql assert-admin assert-can-see-client result->page]]
:refer [->graphql <-graphql assert-admin assert-can-see-client result->page attach-tracing-resolvers]]
[auto-ap.parse.util :as parse]
[auto-ap.pdf.ledger :refer [print-balance-sheet print-pnl print-journal-detail-report]]
[auto-ap.utils :refer [by dollars= heartbeat]]
@@ -19,13 +19,9 @@
[datomic.client.api :as dc]
[mount.core :as mount]
[com.brunobonacci.mulog :as mu]
[yang.scheduler :as scheduler]
[auto-ap.graphql.utils :refer [attach-tracing-resolvers]])
[yang.scheduler :as scheduler])
(:import [org.apache.commons.codec.binary Base64]))
(mount/defstate running-balance-cache
:start (atom {}))
(defn get-ledger-page [context args _]
(let [args (assoc args :id (:id context))
[journal-entries journal-entries-count] (l/get-graphql (assoc (<-graphql (:filters args))
@@ -34,15 +30,7 @@
journal-entries (mapv
(fn [je]
(-> je
(update :journal-entry/original-entity :db/id)
(update :journal-entry/line-items
(fn [jels]
(mapv
(fn [jel]
(assoc jel :running-balance (get-in @running-balance-cache [(:db/id (:journal-entry/client je))
(:db/id jel)])))
jels)))))
(update :journal-entry/original-entity :db/id)))
journal-entries)]
(result->page journal-entries journal-entries-count :journal_entries (:filters args))))
@@ -485,95 +473,120 @@
:errors (map (fn [x] {:external_id (:external_id x)
:error (:error x)}) errors)}))
(defn build-running-balance
([lookup-account all-ledger-entries]
(->> all-ledger-entries
(reduce
(fn [[rollup cache] [_ _ jel account location debit credit]]
(let [rollup (-> rollup
(update-in [[location account] :debit] (fnil + 0.0) debit)
(update-in [[location account] :credit] (fnil + 0.0) credit)
(update-in [[location account] :count] (fnil + 0) 1))]
[rollup
(assoc cache jel (assoc (get rollup [location account]) :account-id account))]))
[{} {}])
(second)
(reduce-kv
(fn [acc jel {:keys [debit credit account-id]}]
(let [account (lookup-account account-id)
account-type (:account_type account)]
(assoc acc jel
(if account-type (if (#{:account-type/asset
(defn accounts-needing-rebuild [ db client]
(->> (dc/qseq '[:find ?c ?a ?l (min ?d)
:in $ ?c
:where [?je :journal-entry/client ?c]
[?je :journal-entry/line-items ?jel]
(or (not [?jel :journal-entry-line/running-balance])
[?jel :journal-entry-line/dirty true])
[?jel :journal-entry-line/account ?a]
[?jel :journal-entry-line/location ?l]
[?je :journal-entry/date ?d]]
db
client)
(map (fn [[client account location starting-at ]]
{:client client
:account account
:starting-at starting-at
:location location}))))
(defn find-running-balance-start [account-needing-rebuild db ]
(let [starting-from (or (->> (dc/q '[:find ?d ?je ?jel ?rbs
:in $ ?c ?starting-at ?a ?l
:where
[?je :journal-entry/client ?c]
[?je :journal-entry/date ?d]
[(< ?d ?starting-at)]
[?je :journal-entry/line-items ?jel]
[?jel :journal-entry-line/account ?a]
[?jel :journal-entry-line/location ?l]
[?jel :journal-entry-line/running-balance ?rbs]
]
db
(:client account-needing-rebuild)
(:starting-at account-needing-rebuild)
(:account account-needing-rebuild)
(:location account-needing-rebuild))
(sort)
(last)
(last))
0.0)]
(mu/log ::starting-rebuild-at
:at starting-from)
starting-from))
(defn get-dirty-entries [account-needing-rebuild db ]
(->> (dc/q
'[:find ?d ?jel ?debit ?credit
:in $ ?c ?starting-at ?a ?l
:where
[?e :journal-entry/client ?c]
[?e :journal-entry/date ?d]
[(>= ?d ?starting-at)]
[?e :journal-entry/line-items ?jel]
[?jel :journal-entry-line/account ?a]
[?jel :journal-entry-line/location ?l]
[(get-else $ ?jel :journal-entry-line/debit 0.0) ?debit ]
[(get-else $ ?jel :journal-entry-line/credit 0.0) ?credit]]
db
(:client account-needing-rebuild)
(:starting-at account-needing-rebuild)
(:account account-needing-rebuild)
(:location account-needing-rebuild))
sort
(map #(drop 1 %))))
(defn compute-running-balance [account-needing-refresh]
(mu/log ::compute
:dirty-count (count (:dirty-entries account-needing-refresh)))
(second
(reduce
(fn [[running-balance rows] [id debit credit] ]
(let [new-running-balance (+ running-balance
(if (#{:account-type/asset
:account-type/dividend
:account-type/expense} account-type)
:account-type/expense} (:account-type account-needing-refresh))
(- debit credit)
(- credit debit))
0.0))))
{}))))
(defn running-balance-for [client-id]
(let [lookup-account (build-account-lookup client-id)]
(->> (dc/q
{:query {:find ['?d '?e '?jel '?account '?location '?debit '?credit]
:in ['$ '?client-id]
:where '[[?e :journal-entry/client ?client-id]
[?e :journal-entry/date ?d]
[?e :journal-entry/line-items ?jel]
(or-join [?e]
(and [?e :journal-entry/original-entity ?i]
(or-join [?e ?i]
(and
[?i :transaction/bank-account ?b]
(or [?b :bank-account/include-in-reports true]
(not [?b :bank-account/include-in-reports])))
(not [?i :transaction/bank-account])))
(not [?e :journal-entry/original-entity ]))
[(get-else $ ?jel :journal-entry-line/account :account/unknown) ?account]
[(get-else $ ?jel :journal-entry-line/debit 0.0) ?debit ]
[(get-else $ ?jel :journal-entry-line/credit 0.0) ?credit]
[(get-else $ ?jel :journal-entry-line/location "") ?location]]
}
:args [(dc/db conn) client-id]})
(sort-by (juxt first second))
(build-running-balance lookup-account))))
(def last-run-running-balance (atom nil))
(defn build-running-balance-cache []
(let [clients-needing-refresh (if-let [last-run @last-run-running-balance]
(->> (dc/q
{:query {:find ['?v]
:in ['$ '[?tx ...]]
:where ['[$ _ :journal-entry/client ?v ?tx]]}
:args [(dc/history (dc/db conn))
(map :t (mapcat :data (dc/tx-range conn {:start last-run
:end (java.util.Date.)})))]})
(map first)
(into #{}))
(into #{} (map :db/id (d-clients/get-all))))
starting (java.util.Date.)]
(log/info (count clients-needing-refresh) "Clients need their balance cache refreshed.")
(swap! running-balance-cache
merge
(reduce
(fn [acc client]
(log/info "Computing running balance cache for " (pull-attr (dc/db conn) :client/code client ))
(assoc acc client (running-balance-for client)))
{}
clients-needing-refresh))
(log/info "Done refreshing " (count clients-needing-refresh) " client caches")
(reset! last-run-running-balance starting)))
(- credit debit)))]
[new-running-balance
(conj rows
{:db/id id
:journal-entry-line/running-balance new-running-balance
:journal-entry-line/dirty false})]))
[(:build-from account-needing-refresh) []]
(:dirty-entries account-needing-refresh))))
(defn refresh-running-balance-cache []
(build-running-balance-cache))
(doseq [c (shuffle (map first
(dc/q '[:find (pull ?c [:client/code :db/id])
:where [?c :client/code]]
(dc/db conn))))]
(mu/trace ::building-running-balance
[:client c]
(mu/with-context {:client c}
(let [db (dc/db conn)
accounts-needing-rebuild (accounts-needing-rebuild db (:db/id c))]
(when (seq accounts-needing-rebuild)
(mu/log ::found-accounts-needing-rebuild
:accounts accounts-needing-rebuild)
(audit-transact-batch
(->> accounts-needing-rebuild
(mapcat (fn [account-needing-rebuild]
(mu/with-context {:account account-needing-rebuild}
(-> account-needing-rebuild
(assoc :build-from (find-running-balance-start account-needing-rebuild db))
(assoc :dirty-entries (get-dirty-entries account-needing-rebuild db))
(assoc :account-type (:account_type ((build-account-lookup (:client account-needing-rebuild)) (:account account-needing-rebuild))))
(compute-running-balance))))))
{:user/name "running-balance-cache"})))))))
#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(mount/defstate running-balance-cache-worker
:start (scheduler/every (* 15 60 1000) (heartbeat refresh-running-balance-cache "running-balance-cache"))
:start (scheduler/every (* 15 60 (+ 500 (rand-int 500))) (heartbeat refresh-running-balance-cache "running-balance-cache"))
:stop (scheduler/stop running-balance-cache-worker))

View File

@@ -48,6 +48,7 @@
:journal-entry/amount (Math/abs (:invoice/total entity))
:journal-entry/line-items (into [(cond-> {:journal-entry-line/account :account/accounts-payable
:journal-entry-line/dirty true
:journal-entry-line/location "A"
}
credit-invoice? (assoc :journal-entry-line/debit (Math/abs (:invoice/total entity)))
@@ -55,6 +56,7 @@
(map (fn [ea]
(cond->
{:journal-entry-line/account (:db/id (:invoice-expense-account/account ea))
:journal-entry-line/dirty true
:journal-entry-line/location (or (:invoice-expense-account/location ea) "HQ")
}
credit-invoice? (assoc :journal-entry-line/credit (Math/abs (:invoice-expense-account/amount ea)))
@@ -91,6 +93,7 @@
:journal-entry/cleared-against (:transaction/cleared-against entity)
:journal-entry/line-items (into [(remove-nils {:journal-entry-line/account (:db/id (:transaction/bank-account entity))
:journal-entry-line/dirty true
:journal-entry-line/location "A"
:journal-entry-line/credit (when credit-from-bank?
(Math/abs (:transaction/amount entity)))
@@ -101,6 +104,7 @@
(fn [a]
(remove-nils{:journal-entry-line/account (:db/id (:transaction-account/account a))
:journal-entry-line/location (:transaction-account/location a)
:journal-entry-line/dirty true
:journal-entry-line/debit (when credit-from-bank?
(Math/abs (:transaction-account/amount a)))
:journal-entry-line/credit (when debit-from-bank?
@@ -429,6 +433,7 @@
ledger-txs (->> affected-entities
(map #(entity-change->ledger (:db-after tx) %))
(filter seq))
;; TODO mark deleted journal-entry-line accounts as dirty, needing refresh
retractions (map (fn [[_ e]] [:db/retractEntity [:journal-entry/original-entity e]]) affected-entities)]
(when (seq retractions)
(audit-transact retractions id))
@@ -443,8 +448,7 @@
(let [batch (conj (vec batch) {:db/id "datomic.tx"
:audit/batch batch-id})
_ (log/info "transacting batch " batch-id " " (count batch))
tx-result (transact-with-ledger batch id)
_ (Thread/sleep 1000)]
tx-result (transact-with-ledger batch id)]
(cond-> full-tx
(:tx-data full-tx) (update :tx-data #(into % (:tx-data tx-result)))

View File

@@ -50,7 +50,7 @@
([n index-name]
(search n index-name []))
([n index-name other-keys]
(let [directory (FSDirectory/open (Paths/get (java.net.URI. (str "file:///tmp/search" (:dd-env env) "/" index-name))))
(let [directory (FSDirectory/open (Paths/get (java.net.URI. (str "file:///tmp/search/" (:dd-env env) "/" index-name))))
index-reader (DirectoryReader/open directory)
index-searcher (IndexSearcher. index-reader)]
(for [x (seq (.scoreDocs (.search index-searcher (make-query n) 10)))]
@@ -64,7 +64,7 @@
)
(defn search-ids [n index-name]
(let [directory (FSDirectory/open (Paths/get (java.net.URI. (str "file:///tmp/search" (:dd-env env) "/" index-name))))
(let [directory (FSDirectory/open (Paths/get (java.net.URI. (str "file:///tmp/search/" (:dd-env env) "/" index-name))))
index-reader (DirectoryReader/open directory)
index-searcher (IndexSearcher. index-reader)]
(for [x (seq (.scoreDocs (.search index-searcher (make-query n) 100)))]