makes solr ec2 based and adds resiliency to running balances.
This commit is contained in:
@@ -622,6 +622,31 @@
|
||||
(range length)))]
|
||||
(sort comparator results)))
|
||||
|
||||
;; TODO replace COULD JUST BE SORT-3
|
||||
(defn apply-sort-4 [args results]
|
||||
|
||||
(let [sort-bys (-> []
|
||||
(into (:sort args))
|
||||
(conj {:sort-key "default" :asc (if (contains? args :default-asc?)
|
||||
(:default-asc? args)
|
||||
true)})
|
||||
(conj {:sort-key "e" :asc true}))
|
||||
length (count sort-bys)
|
||||
comparator (fn [xs ys]
|
||||
(reduce
|
||||
(fn [_ i]
|
||||
|
||||
(let [comparison (if (:asc (nth sort-bys i))
|
||||
(compare (nth xs i) (nth ys i))
|
||||
(compare (nth ys i) (nth xs i)))]
|
||||
|
||||
(if (not= 0 comparison)
|
||||
(reduced comparison)
|
||||
0)))
|
||||
0
|
||||
(range length)))]
|
||||
(sort comparator results)))
|
||||
|
||||
(defn apply-pagination-raw [args results]
|
||||
{:entries (->> results
|
||||
(drop (or (:start args) 0))
|
||||
|
||||
@@ -95,8 +95,7 @@
|
||||
(for [a (:accounts accounts)]
|
||||
(remove-nils
|
||||
{:plaid-account/external-id (:account_id a)
|
||||
:plaid-account/last-synced (coerce/to-date (coerce/to-date-time (-> item :status :transactions :last_successful_update)))
|
||||
:plaid-account/balance (or (some-> a
|
||||
:plaid-account/last-synced (coerce/to-date (coerce/to-date-time (-> item :status :transactions :last_successful_update))) :plaid-account/balance (or (some-> a
|
||||
:balances
|
||||
:current
|
||||
double)
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
(:require
|
||||
[auto-ap.datomic :refer [conn pull-id pull-ref transact-with-backoff]]
|
||||
[auto-ap.utils :refer [by dollars= heartbeat]]
|
||||
[auto-ap.logging :as alog]
|
||||
[clj-time.coerce :as c]
|
||||
[clj-time.core :as t]
|
||||
[com.brunobonacci.mulog :as mu]
|
||||
@@ -324,7 +325,9 @@
|
||||
({:bank-account-type/check :account-type/asset
|
||||
:bank-account-type/cash :account-type/asset
|
||||
:bank-account-type/credit :account-type/liability}
|
||||
(:db/ident (:bank-account/type (bank-accounts a)))))
|
||||
(:db/ident (:bank-account/type (bank-accounts a))))
|
||||
:account-type/asset ;; DEFAULT TO ASSET, for things like unknown
|
||||
)
|
||||
:numeric_code (or (:account/numeric-code (accounts a))
|
||||
(:bank-account/numeric-code (bank-accounts a)))
|
||||
:client_id client-id})))
|
||||
@@ -394,120 +397,134 @@
|
||||
(do (println (count a))
|
||||
(count a)))))
|
||||
|
||||
|
||||
|
||||
(defn accounts-needing-rebuild [ db client]
|
||||
(let [client (pull-id db client)]
|
||||
(->> (dc/qseq {:query '[:find ?c ?a ?l (min ?d)
|
||||
(->> (dc/qseq {:query '[:find ?c ?a ?l
|
||||
:in $ $recent ?c
|
||||
:where
|
||||
[$recent ?jel :journal-entry-line/dirty true]
|
||||
[?jel :journal-entry-line/account ?a]
|
||||
[?jel :journal-entry-line/location ?l]
|
||||
[?je :journal-entry/line-items ?jel]
|
||||
[?je :journal-entry/client ?c]
|
||||
[?je :journal-entry/date ?d]]
|
||||
[(get-else $ ?jel :journal-entry-line/account :unknown-account) ?a]
|
||||
[(get-else $ ?jel :journal-entry-line/location "Unknown") ?l]]
|
||||
:args [db (dc/since db (c/to-date (t/plus (t/now) (t/hours -8)))) client]})
|
||||
(map (fn [[client account location starting-at ]]
|
||||
(map (fn [[client account location]]
|
||||
{:client client
|
||||
:account account
|
||||
:starting-at starting-at
|
||||
:location location})))))
|
||||
|
||||
(defn all-accounts-needing-rebuild [ db client]
|
||||
(let [client (pull-id db client)]
|
||||
(->> (dc/qseq {:query '[:find ?c ?a ?l (min ?d)
|
||||
(->> (dc/qseq {:query '[:find ?c ?a ?l
|
||||
:in $ ?c
|
||||
:where
|
||||
[?je :journal-entry/client ?c]
|
||||
[?je :journal-entry/line-items ?jel]
|
||||
[?jel :journal-entry-line/account ?a]
|
||||
[?jel :journal-entry-line/location ?l]
|
||||
[?je :journal-entry/date ?d]]
|
||||
[(get-else $ ?jel :journal-entry-line/account :unknown-account) ?a]
|
||||
[(get-else $ ?jel :journal-entry-line/location "Unknown") ?l] ]
|
||||
:args [db client]})
|
||||
(map (fn [[client account location starting-at ]]
|
||||
(map (fn [[client account location ]]
|
||||
{:client client
|
||||
:account account
|
||||
:starting-at starting-at
|
||||
:location location})))))
|
||||
|
||||
(defn find-running-balance-start [{:keys [client account location starting-at]} db ]
|
||||
(defn get-entries-to-refresh [{:keys [client account location]} db ]
|
||||
(let [client (pull-id db client)
|
||||
account (pull-id db account)]
|
||||
(or
|
||||
(->> (dc/index-pull db
|
||||
{:index :avet
|
||||
:selector [:db/id :journal-entry-line/running-balance :journal-entry-line/client+account+location+date]
|
||||
:start [:journal-entry-line/client+account+location+date [client account location starting-at]]
|
||||
:reverse true})
|
||||
(take-while (fn [result]
|
||||
(= [client
|
||||
account
|
||||
location]
|
||||
(take 3 (:journal-entry-line/client+account+location+date result)))))
|
||||
(take 5)
|
||||
(drop-while (fn [{[_ _ _ date] :journal-entry-line/client+account+location+date}]
|
||||
(>= (compare date starting-at) 0)))
|
||||
first
|
||||
:journal-entry-line/running-balance)
|
||||
0.0)))
|
||||
account-id (if (= :unknown-account account)
|
||||
nil
|
||||
(pull-id db account))
|
||||
matching-location (if (= "Unknown" location)
|
||||
nil
|
||||
location)]
|
||||
|
||||
(defn get-dirty-entries [{:keys [client account location starting-at]} db ]
|
||||
(let [client (pull-id db client)
|
||||
account (pull-id db account)]
|
||||
(into []
|
||||
(comp
|
||||
(take-while (fn [{[result-client result-account result-location] :journal-entry-line/client+account+location+date}]
|
||||
(and
|
||||
(= client result-client)
|
||||
(= account result-account)
|
||||
(= location result-location))))
|
||||
(= account-id result-account)
|
||||
(= matching-location result-location))))
|
||||
(map (fn [result]
|
||||
[(:db/id result) (:journal-entry-line/debit result 0.0) (:journal-entry-line/credit result 0.0) ])))
|
||||
[(:db/id result) (:journal-entry-line/debit result 0.0) (:journal-entry-line/credit result 0.0) (:journal-entry-line/running-balance result 0.0) (:journal-entry-line/dirty result)])))
|
||||
(dc/index-pull db
|
||||
{:index :avet
|
||||
:selector [:db/id :journal-entry-line/debit :journal-entry-line/credit :journal-entry-line/client+account+location+date]
|
||||
:selector [:db/id :journal-entry-line/debit :journal-entry-line/credit :journal-entry-line/client+account+location+date :journal-entry-line/running-balance :journal-entry-line/dirty]
|
||||
:start [:journal-entry-line/client+account+location+date
|
||||
[client account location starting-at]]
|
||||
|
||||
}))))
|
||||
[client
|
||||
account-id
|
||||
matching-location
|
||||
#inst "0001-01-01" ]] }))))
|
||||
|
||||
(defn compute-running-balance [account-needing-refresh]
|
||||
(mu/trace ::compute
|
||||
[:dirty-count (count (:dirty-entries account-needing-refresh))]
|
||||
(when (= 0 (count (:dirty-entries account-needing-refresh)))
|
||||
(alog/warn ::no-entries-to-compute
|
||||
:message "This typically means that an account is determined to be dirty, but no entries are found, meaning bad query"))
|
||||
(second
|
||||
(reduce
|
||||
(fn [[running-balance rows] [id debit credit] ]
|
||||
(fn [[running-balance changed-rows] [id debit credit extant-running-balance dirty] ]
|
||||
(let [new-running-balance (+ running-balance
|
||||
(if (#{:account-type/asset
|
||||
:account-type/dividend
|
||||
:account-type/expense} (:account-type account-needing-refresh))
|
||||
(- debit credit)
|
||||
(- credit debit)))]
|
||||
(- credit debit)))
|
||||
row-changed? (or (not (dollars= new-running-balance extant-running-balance))
|
||||
dirty)]
|
||||
[new-running-balance
|
||||
(conj rows
|
||||
{:db/id id
|
||||
:journal-entry-line/running-balance new-running-balance
|
||||
:journal-entry-line/dirty false})]))
|
||||
|
||||
[(:build-from account-needing-refresh) []]
|
||||
(conj changed-rows
|
||||
{:db/id id
|
||||
:journal-entry-line/running-balance new-running-balance
|
||||
:journal-entry-line/debit debit
|
||||
:journal-entry-line/credit credit
|
||||
:journal-entry-line/dirty false})
|
||||
#_(if row-changed?
|
||||
(conj changed-rows
|
||||
{:db/id id
|
||||
:journal-entry-line/running-balance new-running-balance
|
||||
:journal-entry-line/debit debit
|
||||
:journal-entry-line/credit credit
|
||||
:journal-entry-line/dirty false})
|
||||
changed-rows)]))
|
||||
[0.0 []]
|
||||
(:dirty-entries account-needing-refresh)))))
|
||||
|
||||
(let [db (dc/db conn)
|
||||
account-needing-rebuild
|
||||
{:account (pull-id (dc/db conn) [:bank-account/code "SCCB-2888CB"])
|
||||
:client (pull-id (dc/db conn) [:client/code "SCCB"])
|
||||
:location "A"}
|
||||
|
||||
result (-> account-needing-rebuild
|
||||
(assoc :dirty-entries (get-entries-to-refresh account-needing-rebuild db))
|
||||
(assoc :account-type (:account_type ((build-account-lookup (:client account-needing-rebuild)) (:account account-needing-rebuild))))
|
||||
(compute-running-balance))]
|
||||
(alog/info ::recomputed-entries-with-new-sum :count (count result))
|
||||
result)
|
||||
|
||||
(defn refresh-running-balance-accounts [accounts-needing-rebuild clients c i db]
|
||||
|
||||
(mu/log ::found-accounts-needing-rebuild
|
||||
:accounts (count accounts-needing-rebuild))
|
||||
(let [so-far (atom 0)]
|
||||
@(->> accounts-needing-rebuild
|
||||
(s/->source)
|
||||
(s/map (fn [account-needing-rebuild]
|
||||
(de/future
|
||||
(mu/with-context {:account account-needing-rebuild}
|
||||
(-> account-needing-rebuild
|
||||
(assoc :build-from (find-running-balance-start account-needing-rebuild db))
|
||||
(assoc :dirty-entries (get-dirty-entries account-needing-rebuild db))
|
||||
(assoc :account-type (:account_type ((build-account-lookup (:client account-needing-rebuild)) (:account account-needing-rebuild))))
|
||||
(compute-running-balance))))))
|
||||
(->
|
||||
(de/future
|
||||
(mu/with-context {:account account-needing-rebuild}
|
||||
(let [result (-> account-needing-rebuild
|
||||
(assoc :dirty-entries (get-entries-to-refresh account-needing-rebuild db))
|
||||
(assoc :account-type (:account_type ((build-account-lookup (:client account-needing-rebuild)) (:account account-needing-rebuild))))
|
||||
(compute-running-balance))]
|
||||
(alog/info ::recomputed-entries-with-new-sum :count (count result) )
|
||||
result)
|
||||
))
|
||||
(de/catch (fn [error]
|
||||
(alog/error ::cant-rebuild
|
||||
:error error)
|
||||
nil)))))
|
||||
(s/realize-each)
|
||||
(s/mapcat (fn [x]
|
||||
x))
|
||||
@@ -528,17 +545,21 @@
|
||||
:client-index i
|
||||
:client-count (count clients)))))))
|
||||
|
||||
|
||||
(defn clients-needing-refresh []
|
||||
(map first
|
||||
(dc/q '[:find (pull ?c [:client/code :db/id])
|
||||
:in $ $recent
|
||||
:where [$recent ?jel :journal-entry-line/dirty true]
|
||||
[$ ?je :journal-entry/line-items ?jel]
|
||||
[$ ?je :journal-entry/client ?c]]
|
||||
(dc/db conn)
|
||||
(dc/since
|
||||
(dc/db conn)
|
||||
(c/to-date (t/plus (t/now) (t/hours -4)))))))
|
||||
(->>
|
||||
(dc/q '[:find (pull ?c [:client/code :db/id]) (count ?jel)
|
||||
:in $ $recent
|
||||
:where [$recent ?jel :journal-entry-line/dirty true]
|
||||
[$ ?je :journal-entry/line-items ?jel]
|
||||
[$ ?je :journal-entry/client ?c]
|
||||
[$ ?jel :journal-entry-line/dirty true]]
|
||||
(dc/db conn)
|
||||
(dc/since
|
||||
(dc/db conn)
|
||||
(c/to-date (t/plus (t/now) (t/hours -8)))))
|
||||
(map (fn [[client outdated]]
|
||||
(assoc client :dirty-count outdated)))))
|
||||
|
||||
(defn refresh-running-balance-cache
|
||||
([] (refresh-running-balance-cache (shuffle (clients-needing-refresh))))
|
||||
@@ -575,6 +596,30 @@
|
||||
(refresh-running-balance-accounts accounts-needing-rebuild clients c i db)
|
||||
(mu/log ::client-completed))))))))
|
||||
|
||||
#_(seq (dc/q '[:find ?le ?d
|
||||
:in $
|
||||
:where [?le :journal-entry/date ?d]
|
||||
[(<= ?d #inst "2000-01-01")]]
|
||||
(dc/db conn)))
|
||||
|
||||
|
||||
#_(comment [17592334354011 #inst "0024-08-03T07:52:58.000-00:00"]
|
||||
[17592302554688 #inst "0023-07-20T07:52:58.000-00:00"]
|
||||
[17592302554682 #inst "0023-07-16T07:52:58.000-00:00"]
|
||||
[17592302554691 #inst "0023-07-22T07:52:58.000-00:00"]
|
||||
[17592334353995 #inst "0024-08-10T07:52:58.000-00:00"]
|
||||
[17592302554694 #inst "0023-07-27T07:52:58.000-00:00"]
|
||||
[17592241669405 #inst "0218-08-04T07:52:58.000-00:00"]
|
||||
[17592334353207 #inst "0024-07-27T07:52:58.000-00:00"]
|
||||
[17592302554685 #inst "0023-07-16T07:52:58.000-00:00"]
|
||||
[17592334353244 #inst "0024-07-14T07:52:58.000-00:00"])
|
||||
|
||||
;; TODO
|
||||
;; 1. Having an uncategorized running balance
|
||||
;; 1a. dirty should always be 0
|
||||
;; 2. rebuild from beginning of history always, only update entry if it currently doesn't match
|
||||
;; 3. deterministic order (date + entityid)
|
||||
|
||||
;; TODO only enable once IOL is set up in clod
|
||||
#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
|
||||
(mount/defstate running-balance-cache-worker
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
(ns auto-ap.ssr.ledger
|
||||
(:require [auto-ap.client-routes :as client-routes]
|
||||
[auto-ap.datomic
|
||||
:refer [add-sorter-fields apply-pagination apply-sort-3
|
||||
:refer [add-sorter-fields apply-pagination apply-sort-4
|
||||
audit-transact audit-transact-batch conn merge-query
|
||||
observable-query pull-many remove-nils]]
|
||||
[auto-ap.datomic.accounts :as d-accounts]
|
||||
@@ -289,7 +289,7 @@
|
||||
(merge-query {:query {:find ['?sort-default '?e]}})))]
|
||||
|
||||
(->> (observable-query query)
|
||||
(apply-sort-3 (assoc query-params :default-asc? true))
|
||||
(apply-sort-4 (assoc query-params :default-asc? true))
|
||||
(apply-pagination query-params))))
|
||||
|
||||
(def default-read
|
||||
|
||||
Reference in New Issue
Block a user