(ns auto-ap.ledger (:require [auto-ap.datomic :refer [conn pull-id pull-ref]] [auto-ap.logging :as alog] [auto-ap.utils :refer [by dollars= heartbeat]] [clj-time.coerce :as c] [clj-time.core :as t] [com.brunobonacci.mulog :as mu] [com.unbounce.dogstatsd.core :as statsd] [datomic.api :as dc] [iol-ion.query :refer [account-sets]] [mount.core :as mount] [yang.scheduler :as scheduler]) (:import [iol_ion.query Line])) (defn reconcile-ledger ([] (reconcile-ledger (-> (t/now) (t/plus (t/months -6)) (c/to-date)))) ([start-date] (let [txes-missing-ledger-entries (->> (dc/q {:find ['?t ] :in ['$ '?sd] :where [ '[?t :transaction/date ?d] '[(>= ?d ?sd)] '(not [_ :journal-entry/original-entity ?t]) '(not [?t :transaction/amount 0.0]) '(not [?t :transaction/approval-status :transaction-approval-status/excluded]) '(not [?t :transaction/approval-status :transaction-approval-status/suppressed]) ]} (dc/db conn) start-date) (map first) (mapv (fn [t] [:upsert-transaction {:db/id t}]))) invoices-missing-ledger-entries (->> (dc/q {:find ['?t ] :in ['$ '?sd] :where ['[?t :invoice/date ?d] '[(>= ?d ?sd)] '(not [_ :journal-entry/original-entity ?t]) '[?t :invoice/total ?amt] '[(not= 0.0 ?amt)] '(not [?t :invoice/status :invoice-status/voided]) '(not [?t :invoice/import-status :import-status/pending]) '(not [?t :invoice/exclude-from-ledger true]) ]} (dc/db conn) start-date) (map first) (mapv (fn [i] [:upsert-invoice {:db/id i}]))) repairs (vec (concat txes-missing-ledger-entries invoices-missing-ledger-entries))] (when (seq repairs) (mu/log ::ledger-repairs-needed :sample (take 3 repairs) :transaction-count (count txes-missing-ledger-entries) :invoice-count (count invoices-missing-ledger-entries)) @(dc/transact conn repairs))))) (defn touch-transaction [e] @(dc/transact conn [{:db/id "datomic.tx" :db/doc "touching transaction to update ledger"} [:upsert-transaction {:db/id e}]])) (defn touch-invoice [e] @(dc/transact conn [{:db/id "datomic.tx" :db/doc "touching invoice to update ledger"} [:upsert-invoice {:db/id e}]])) (defn recently-changed-entities [start end] (into #{} (map first) (dc/q '[:find ?e :in $ :where (or [?e :transaction/date] [?e :invoice/date])] (dc/since (dc/db conn) start)))) (defn mismatched-transactions ([] (mismatched-transactions (c/to-date (t/minus (t/now) (t/days 7))) (c/to-date (t/minus (t/now) (t/hours 1)))) ) ([changed-between-start changed-between-end] (mu/trace ::calculating-mismatched-transactions [:range {:start changed-between-start :end changed-between-end}] (let [entities-to-consider (recently-changed-entities changed-between-start changed-between-end) _ (mu/log ::checking-mismatched-transactions :count (count entities-to-consider)) jel-accounts (reduce (fn [acc [e lia]] (update acc e (fnil conj #{} ) lia)) {} (dc/q '[:find ?e ?lia :in $ [?e ...] :where [?je :journal-entry/original-entity ?e] [?e :transaction/date] [?je :journal-entry/line-items ?li] [?li :journal-entry-line/account ?lia] [?lia :account/name]] (dc/db conn) entities-to-consider)) transaction-accounts (reduce (fn [acc [e lia]] (update acc e (fnil conj #{} ) lia)) {} (dc/q '[:find ?e ?lia :in $ [?e ...] :where [?e :transaction/date ?d] [?e :transaction/accounts ?li] (not [?e :transaction/approval-status :transaction-approval-status/excluded]) (not [?e :transaction/approval-status :transaction-approval-status/suppressed]) [?li :transaction-account/account ?lia] [?lia :account/name] [?e :transaction/amount ?amt] [(not= ?amt 0.0)]] (dc/db conn) entities-to-consider))] (->> transaction-accounts (filter (fn [[e accounts]] (not= accounts (get jel-accounts e)))) (doall)))))) (defn unbalanced-transactions ([] (unbalanced-transactions (c/to-date (t/minus (t/now) (t/days 7))) (c/to-date (t/minus (t/now) (t/hours 1))))) ([changed-between-start changed-between-end] (let [entities-to-consider (recently-changed-entities changed-between-start changed-between-end)] (->> (dc/q '[:find ?je ?a (sum ?debit) (sum ?credit) :with ?jel :in $ [?je ...] :where [?je :journal-entry/amount ?a] [?je :journal-entry/line-items ?jel] [(get-else $ ?jel :journal-entry-line/debit 0.0) ?debit] [(get-else $ ?jel :journal-entry-line/credit 0.0) ?credit] ] (dc/db conn) entities-to-consider) (filter (fn [[_ a d c]] (or (not (dollars= a d)) (not (dollars= a c))))) (map first) (map (fn [je] (pull-ref (dc/db conn) :journal-entry/original-entity je))))))) (defn unbalanced-invoices ([] (unbalanced-invoices (c/to-date (t/minus (t/now) (t/days 7))) (c/to-date (t/minus (t/now) (t/hours 1))))) ([changed-between-start changed-between-end] (let [entities-to-consider (recently-changed-entities changed-between-start changed-between-end)] (->> (dc/q '[:find ?je ?a (sum ?debit) (sum ?credit) :with ?jel :in $ [?je ...] :where [?je :journal-entry/amount ?a] [?je :journal-entry/original-entity ?i] [?i :invoice/date] [?je :journal-entry/line-items ?jel] [(get-else $ ?jel :journal-entry-line/debit 0.0) ?debit] [(get-else $ ?jel :journal-entry-line/credit 0.0) ?credit]] (dc/db conn) entities-to-consider) (filter (fn [[_ a d c]] (or (not (dollars= a d)) (not (dollars= a c))))) (map first) (map (fn [je] (pull-ref (dc/db conn) :journal-entry/original-entity je))))))) (defn mismatched-invoices ([] (mismatched-invoices (c/to-date (t/minus (t/now) (t/days 7))) (c/to-date (t/minus (t/now) (t/hours 1)))) ) ([changed-between-start changed-between-end] (let [entities-to-consider (recently-changed-entities changed-between-start changed-between-end) jel-accounts (reduce (fn [acc [e lia]] (update acc e (fnil conj #{} ) lia)) {} (dc/q '[:find ?e ?lia :in $ [?e ...] :where [?je :journal-entry/original-entity ?e] [?e :invoice/date] [?je :journal-entry/line-items ?li] [?li :journal-entry-line/account ?lia] (not [?lia :account/numeric-code 21000]) [?lia :account/name]] (dc/db conn) entities-to-consider)) invoice-accounts (reduce (fn [acc [e lia]] (update acc e (fnil conj #{} ) lia)) {} (dc/q '[:find ?e ?lia :in $ [?e ...] :where [?e :invoice/expense-accounts ?li] (not [?e :invoice/total 0.0]) [?li :invoice-expense-account/account ?lia] [?lia :account/name] (not [?lia :account/numeric-code 21000]) (not [?e :invoice/status :invoice-status/voided]) (not [?e :invoice/exclude-from-ledger true]) [?e :invoice/import-status :import-status/imported]] (dc/db conn) entities-to-consider)) ] (filter (fn [[e accounts]] (not= accounts (get jel-accounts e))) invoice-accounts)))) (defn touch-broken-ledger [] (statsd/event {:title "Reconciling Ledger" :text "This process looks for unbalance ledger entries, or missing ledger entries" :priority :low} nil) (mu/trace ::fixing-mismatched-transactions [] (mu/log ::started-fixing-mismatched-transactions) (let [mismatched-ts (mismatched-transactions)] (if (seq mismatched-ts) (do (mu/log ::found-mismatched-transactions :status "WARN" :count (count mismatched-ts) :sample (take 10 mismatched-ts)) (doseq [[m] mismatched-ts] (touch-transaction m)) (statsd/gauge "data.mismatched_transactions" (count (mismatched-transactions)))) (statsd/gauge "data.mismatched_transactions" 0.0)))) (mu/trace ::fixing-unbalanced-transactions [] (mu/log ::started-fixing-unbalanced-transactions) (let [unbalanced-ts (unbalanced-transactions)] (if (seq unbalanced-ts) (do (mu/log ::found-unbalanced-transactions :status "WARN" :count (count unbalanced-ts) :sample (take 10 unbalanced-ts)) (doseq [m unbalanced-ts] (touch-transaction m)) (statsd/gauge "data.unbalanced_transactions" (count (unbalanced-transactions)))) (statsd/gauge "data.unbalanced_transactions" 0.0)))) (mu/trace ::fixing-mismatched-invoices [] (mu/log ::started-fixing-mismatched-invoices) (let [mismatched-is (mismatched-invoices)] (if (seq mismatched-is) (do (mu/log ::found-mismatched-invoices :status "WARN" :count (count mismatched-is) :sample (take 10 mismatched-is)) (doseq [[m] mismatched-is] (touch-invoice m)) (statsd/gauge "data.mismatched_invoices" (count (mismatched-invoices)))) (statsd/gauge "data.mismatched_invoices" 0.0)))) (mu/trace ::fixing-unbalanced-invoices [] (mu/log ::started-fixing-unbalance-invoices) (let [unbalanced-is (unbalanced-invoices)] (if (seq unbalanced-is) (do (mu/log ::found-mismatched-invoices :status "WARN" :count (count unbalanced-is) :sample (take 10 unbalanced-is)) (doseq [m unbalanced-is] (touch-invoice m)) (statsd/gauge "data.unbalanced_invoices" (count (unbalanced-invoices)))) (statsd/gauge "data.unbalanced_invoices" 0.0)))) (statsd/event {:title "Finished Reconciling Ledger" :text "This process looks for unbalance ledger entries, or missing ledger entries" :priority :low} nil)) (defn build-account-lookup [client-id] (let [accounts (by :db/id (map first (dc/q {:find ['(pull ?e [:db/id :account/name :account/numeric-code {:account/type [:db/ident] :account/client-overrides [:account-client-override/client :account-client-override/name]} ])] :in ['$] :where ['[?e :account/name]]} (dc/db conn )))) bank-accounts (by :db/id (map first (dc/q {:find ['(pull ?e [:db/id :bank-account/name :bank-account/numeric-code {:bank-account/type [:db/ident]}])] :in ['$] :where ['[?e :bank-account/name]]} (dc/db conn)))) overrides-by-client (->> accounts vals (mapcat (fn [a] (map (fn [o] [[(:db/id a) (:db/id (:account-client-override/client o))] (:account-client-override/name o)]) (:account/client-overrides a)) ) ) (into {} ))] (fn [a] {:name (or (:bank-account/name (bank-accounts a)) (overrides-by-client [a client-id]) (:account/name (accounts a))) :account_type (or (:db/ident (:account/type (accounts a))) ({:bank-account-type/check :account-type/asset :bank-account-type/cash :account-type/asset :bank-account-type/credit :account-type/liability} (:db/ident (:bank-account/type (bank-accounts a)))) :account-type/asset ;; DEFAULT TO ASSET, for things like unknown ) :numeric_code (or (:account/numeric-code (accounts a)) (:bank-account/numeric-code (bank-accounts a))) :client_id client-id}))) (defn find-mismatch-index [] (reduce + 0 (for [c (map first (dc/q '[:find ?c :where [?c :client/code]] (dc/db conn))) :let [_ (println "searching for" c) a (->> (dc/index-pull (dc/db conn) {:index :avet :selector [:db/id :journal-entry-line/location :journal-entry-line/account :journal-entry-line/client+account+location+date {:journal-entry/_line-items [:journal-entry/date :journal-entry/client]}] :start [:journal-entry-line/client+account+location+date [c]]}) (take-while (fn [result] (= c (first (:journal-entry-line/client+account+location+date result))) )) (filter (fn [{index :journal-entry-line/client+account+location+date :as result}] (not= index [(-> result :journal-entry/_line-items :journal-entry/client :db/id) (-> result :journal-entry-line/account :db/id) (-> result :journal-entry-line/location) (-> result :journal-entry/_line-items :journal-entry/date)]))))]] (do (println (count a)) (count a))))) (defn clients-needing-refresh [db available] (let [clients (->> (dc/q '[:find (pull ?c [:client/code :db/id :client/ledger-last-change :client/last-running-balance]) :in $ :where [?c :client/code] [(get-else $ ?c :client/ledger-last-change #inst "2040-01-01") ?last-change] [(get-else $ ?c :client/last-running-balance #inst "2000-01-01") ?last-running-balance] [(> ?last-change ?last-running-balance)]] db) (map (fn [[client]] client)))] (if (and (set? available) (seq available)) (filter (comp available :db/id) clients) clients))) #_(clients-needing-refresh (dc/db conn) #{ 17592273679867}) #_(comment [17592334354011 #inst "0024-08-03T07:52:58.000-00:00"] [17592302554688 #inst "0023-07-20T07:52:58.000-00:00"] [17592302554682 #inst "0023-07-16T07:52:58.000-00:00"] [17592302554691 #inst "0023-07-22T07:52:58.000-00:00"] [17592334353995 #inst "0024-08-10T07:52:58.000-00:00"] [17592302554694 #inst "0023-07-27T07:52:58.000-00:00"] [17592241669405 #inst "0218-08-04T07:52:58.000-00:00"] [17592334353207 #inst "0024-07-27T07:52:58.000-00:00"] [17592302554685 #inst "0023-07-16T07:52:58.000-00:00"] [17592334353244 #inst "0024-07-14T07:52:58.000-00:00"]) (defn mark-all-clients-dirty [] (auto-ap.datomic/audit-transact-batch (for [[c] (dc/q '[:find ?c :in $ :where [?c :client/code]] (dc/db conn))] {:db/id c :client/ledger-last-change (c/to-date (t/now)) :client/last-running-balance #inst "2000-01-01"}) {:user/name "backfill-client and dates"})) (defn mark-client-dirty [client-code] (auto-ap.datomic/audit-transact-batch (for [[c] (dc/q '[:find ?c :in $ ?cd :where [?c :client/code ?cd]] (dc/db conn) client-code)] {:db/id c :client/ledger-last-change (c/to-date (t/now)) :client/last-running-balance #inst "2000-01-01"}) {:user/name "backfill-client and dates"})) (defn refresh-bank-account-balances [client-ids] (mu/with-context {:source "current-balance-refresh"} (let [db (dc/db conn) clients (map first (dc/q '[:find (pull ?c [:db/id :client/code {:client/bank-accounts [:db/id :bank-account/code]}]) :in $ [?c ...] :where [?c :client/code]] db client-ids))] (dorun (for [{client :db/id code :client/code bank-accounts :client/bank-accounts} clients {bank-account :db/id bac :bank-account/code} bank-accounts] (let [{[_ _ _ _ _ _ running-balance] :v} (->> (dc/index-range db :journal-entry-line/running-balance-tuple [client bank-account "A"] [client bank-account "A" #inst "2050-01-01"]) seq (sort-by (fn [{id :e [_ _ _ current-date] :v}] [current-date id])) (last)) running-balance (or running-balance 0.0)] (alog/info ::updating-bank-account-balance :bank-account bac :balance running-balance) @(dc/transact conn [{:db/id bank-account :bank-account/current-balance-synced (c/to-date (t/now)) :bank-account/current-balance running-balance}]))))))) ;; TODO using iol-ion query as the base, building running balance sets (defn upsert-running-balance ([] (upsert-running-balance nil)) ([clients] (mu/with-context {:service "upsert-running-balance" :source "upsert-running-balance"} (mu/trace ::updating-balance [:service "upsert-running-balance" :source "upsert-running-balance"] (let [db (dc/db conn) starting-at (c/to-date (t/now)) clients (clients-needing-refresh db clients) _ (alog/info ::clients-needing-update :clients clients :count (count clients)) client-change-stats (atom {}) changes (for [c clients :let [client-id (:db/id c) account-lookup (build-account-lookup client-id)] running-balance-set (account-sets db client-id) running-balance-change (->> running-balance-set (reduce (fn [{:keys [changes last-running-balance]} ^Line line-item] #_(if (= 0 (rand-int 1000)) (println (.-account-id line-item) (.-debit line-item) (.-credit line-item))) (let [delta (if (#{:account-type/asset :account-type/dividend :account-type/expense} (:account_type (account-lookup (.-account-id line-item)))) (- (or (.-debit line-item) 0.0) (or (.-credit line-item) 0.0)) (- (or (.-credit line-item) 0.0) (or (.-debit line-item) 0.0))) correct-running-balance (+ last-running-balance delta) running-balance-changed? (not (dollars= correct-running-balance (or (.-running-balance line-item) 0.0)))] (when running-balance-changed? (swap! client-change-stats update (:client/code c) (fnil inc 0))) (cond-> {:last-account-lookup account-lookup :last-running-balance correct-running-balance :changes changes} running-balance-changed? (update :changes conj {:db/id (.-id line-item) :journal-entry-line/running-balance correct-running-balance})))) {:last-running-balance 0.0}) :changes)] running-balance-change)] (mu/trace ::update-running-balance [] (auto-ap.datomic/audit-transact-batch changes {:user/name "running balance updater"})) (auto-ap.datomic/audit-transact (mapv (fn [c] {:db/id (:db/id c) :client/last-running-balance starting-at}) clients) {:user/name "running balance updater"}) (alog/info ::change-stats :stats @client-change-stats) (refresh-bank-account-balances (map :db/id clients)) (count changes)))))) (comment (pull-id (dc/db conn) [:client/code "SCCB"]) #_(do (mu/with-context {:service "upsert-running-balance" :source "upsert-running-balance" } (mu/trace ::updating-balance [:service "upsert-running-balance" :source "upsert-running-balance" ] (let [db (dc/db conn) starting-at (c/to-date (t/now)) _ (mark-client-dirty "NGA1") clients (clients-needing-refresh db) _ (alog/info ::clients-needing-update :clients clients :count (count clients)) client-change-stats (atom {}) changes (for [c clients :let [client-id (:db/id c) account-lookup (build-account-lookup client-id)] running-balance-set (account-sets db client-id) running-balance-change (->> running-balance-set (reduce (fn [{:keys [changes last-running-balance]} line-item] (if (= 0 (rand-int 1000)) (println (.-account-id line-item) (.-debit line-item) (.-credit line-item))) (let [delta (if (#{:account-type/asset :account-type/dividend :account-type/expense} (:account_type (account-lookup (.-account-id line-item)))) (- (or (.-debit line-item) 0.0) (or (.-credit line-item) 0.0)) (- (or (.-credit line-item) 0.0) (or (.-debit line-item) 0.0))) correct-running-balance (+ last-running-balance delta) running-balance-changed? (not (dollars= correct-running-balance (or (.-running-balance line-item) 0.0)))] (when running-balance-changed? (swap! client-change-stats update (:client/code c) (fnil inc 0))) (cond-> {:last-account-lookup account-lookup :last-running-balance correct-running-balance :changes changes} running-balance-changed? (update :changes conj {:db/id (.-id line-item) :journal-entry-line/running-balance correct-running-balance})))) {:last-running-balance 0.0}) :changes)] running-balance-change)] (mu/trace ::update-running-balance [] (auto-ap.datomic/audit-transact-batch changes {:user/name "running balance updater"})) (auto-ap.datomic/audit-transact (mapv (fn [c] {:db/id (:db/id c) :client/last-running-balance starting-at}) clients) {:user/name "running balance updater"}) (alog/info ::change-stats :stats @client-change-stats) (refresh-bank-account-balances (map :db/id clients)) (count changes))))) (mark-client-dirty "NGA1") (mark-all-clients-dirty) (count (clients-needing-refresh (dc/db conn))) (upsert-running-balance) ;; SETUP running-balance-tuple (doseq [[c] (dc/q '[:find ?c :in $ :where [?c :client/code]] (dc/db conn))] (println "CLIENT " c) (auto-ap.datomic/audit-transact-batch (for [[date client line] (->> (dc/q '[:find ?jed ?jec (pull ?jel [:journal-entry-line/debit :journal-entry-line/credit :journal-entry-line/running-balance :db/id]) :in $ ?jec :where [?je :journal-entry/client ?jec] [?je :journal-entry/date ?jed] [?je :journal-entry/line-items ?jel]] (dc/db conn) c))] {:db/id (:db/id line) :journal-entry-line/date date :journal-entry-line/client client}) {:user/name "backfill-client and dates"}) (println "done.")) #_(dc/q '[:find (pull ?je [*]) (pull ?jel [*]) :where [?je :journal-entry/line-items ?jel] (not [?jel :journal-entry-line/running-balance-tuple])] (dc/db conn))) ;; TODO only enable once IOL is set up in clod #_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]} (mount/defstate running-balance-cache-worker :start (scheduler/every (* 5 60 (+ 500 (rand-int 500))) (heartbeat upsert-running-balance "running-balance-cache")) :stop (scheduler/stop running-balance-cache-worker))