(ns auto-ap.ledger
  (:require
   [auto-ap.datomic :refer [conn pull-id pull-ref transact-with-backoff]]
   [auto-ap.utils :refer [by dollars= heartbeat]]
   [clj-time.coerce :as c]
   [clj-time.core :as t]
   [com.brunobonacci.mulog :as mu]
   [com.unbounce.dogstatsd.core :as statsd]
   [datomic.api :as dc]
   [manifold.deferred :as de]
   [manifold.stream :as s]
   [mount.core :as mount]
   [yang.scheduler :as scheduler]))

(defn reconcile-ledger
  "Finds transactions and invoices dated on or after `start-date` that have no
  journal entry pointing at them via :journal-entry/original-entity, and
  transacts one repair per entity.

  Transactions are skipped when their amount is 0.0 or their approval status is
  excluded/suppressed. Invoices are skipped when their total is 0.0, they are
  voided, their import is pending, or they are flagged exclude-from-ledger.

  The zero-arg arity uses a start date six months before now. Returns the
  result of the Datomic transaction, or nil when nothing needs repair."
  ([]
   (reconcile-ledger (-> (t/now)
                         (t/plus (t/months -6))
                         (c/to-date))))
  ([start-date]
   (let [txes-missing-ledger-entries
         (->> (dc/q {:find ['?t]
                     :in ['$ '?sd]
                     :where ['[?t :transaction/date ?d]
                             '[(>= ?d ?sd)]
                             '(not [_ :journal-entry/original-entity ?t])
                             '(not [?t :transaction/amount 0.0])
                             '(not [?t :transaction/approval-status :transaction-approval-status/excluded])
                             '(not [?t :transaction/approval-status :transaction-approval-status/suppressed])]}
                    (dc/db conn)
                    start-date)
              (map first)
              (mapv (fn [t] [:upsert-transaction {:db/id t}])))

         invoices-missing-ledger-entries
         (->> (dc/q {:find ['?t]
                     :in ['$ '?sd]
                     :where ['[?t :invoice/date ?d]
                             '[(>= ?d ?sd)]
                             '(not [_ :journal-entry/original-entity ?t])
                             '[?t :invoice/total ?amt]
                             '[(not= 0.0 ?amt)]
                             '(not [?t :invoice/status :invoice-status/voided])
                             '(not [?t :invoice/import-status :import-status/pending])
                             '(not [?t :invoice/exclude-from-ledger true])]}
                    (dc/db conn)
                    start-date)
              (map first)
              (mapv (fn [i] [:upsert-invoice {:db/id i}])))

         repairs (vec (concat txes-missing-ledger-entries
                              invoices-missing-ledger-entries))]
     (when (seq repairs)
       (mu/log ::ledger-repairs-needed
               :sample (take 3 repairs)
               :transaction-count (count txes-missing-ledger-entries)
               :invoice-count (count invoices-missing-ledger-entries))
       ;; NOTE(review): each repair is already an [:upsert-* {...}] form and is
       ;; wrapped again in :upsert-ledger here -- confirm that transaction
       ;; function expects this shape.
       @(dc/transact conn (map (fn [repair] [:upsert-ledger repair]) repairs))))))

(defn touch-transaction
  "Re-runs :upsert-transaction for entity `e` in its own Datomic transaction,
  annotated with a :db/doc explaining why. Blocks until the transaction
  completes and returns its result."
  [e]
  @(dc/transact conn [{:db/id "datomic.tx"
                       :db/doc "touching transaction to update ledger"}
                      [:upsert-transaction {:db/id e}]]))

(defn touch-invoice
  "Re-runs :upsert-invoice for entity `e` in its own Datomic transaction,
  annotated with a :db/doc explaining why. Blocks until the transaction
  completes and returns its result."
  [e]
  ;; The doc string is kept on one line so no line break ends up inside the
  ;; stored :db/doc value (matches touch-transaction).
  @(dc/transact conn [{:db/id "datomic.tx"
                       :db/doc "touching invoice to update ledger"}
                      [:upsert-invoice {:db/id e}]]))

(defn recently-changed-entities
  "Returns the set of entity ids that have a :transaction/date or :invoice/date
  datom in the `dc/since` view of the current database starting at `start`.

  NOTE(review): `end` is accepted for the callers' benefit but is not used by
  the query -- there is currently no upper bound. Confirm whether an `as-of`
  filter was intended."
  [start end]
  (into #{}
        (map first)
        (dc/q '[:find ?e
                :in $
                :where (or [?e :transaction/date]
                           [?e :invoice/date])]
              (dc/since (dc/db conn) start))))

(defn- accounts-by-entity
  "Folds a collection of [entity account] rows into a map of
  entity -> set of accounts."
  [rows]
  (reduce (fn [acc [e lia]]
            (update acc e (fnil conj #{}) lia))
          {}
          rows))

(defn mismatched-transactions
  "Returns a realized seq of [transaction-id account-set] entries for recently
  changed transactions whose coded accounts differ from the accounts on the
  journal entry lines generated for them.

  Excluded/suppressed and zero-amount transactions are ignored. The zero-arg
  arity passes a window of seven days ago to one hour ago."
  ([]
   (mismatched-transactions (c/to-date (t/minus (t/now) (t/days 7)))
                            (c/to-date (t/minus (t/now) (t/hours 1)))))
  ([changed-between-start changed-between-end]
   (mu/trace ::calculating-mismatched-transactions
     [:range {:start changed-between-start
              :end changed-between-end}]
     (let [entities-to-consider (recently-changed-entities changed-between-start
                                                           changed-between-end)
           _ (mu/log ::checking-mismatched-transactions
                     :count (count entities-to-consider))
           ;; accounts actually present on the ledger, per transaction
           jel-accounts (accounts-by-entity
                         (dc/q '[:find ?e ?lia
                                 :in $ [?e ...]
                                 :where
                                 [?je :journal-entry/original-entity ?e]
                                 [?e :transaction/date]
                                 [?je :journal-entry/line-items ?li]
                                 [?li :journal-entry-line/account ?lia]
                                 [?lia :account/name]]
                               (dc/db conn)
                               entities-to-consider))
           ;; accounts the transaction itself is coded to
           transaction-accounts (accounts-by-entity
                                 (dc/q '[:find ?e ?lia
                                         :in $ [?e ...]
                                         :where
                                         [?e :transaction/date ?d]
                                         [?e :transaction/accounts ?li]
                                         (not [?e :transaction/approval-status :transaction-approval-status/excluded])
                                         (not [?e :transaction/approval-status :transaction-approval-status/suppressed])
                                         [?li :transaction-account/account ?lia]
                                         [?lia :account/name]
                                         [?e :transaction/amount ?amt]
                                         [(not= ?amt 0.0)]]
                                       (dc/db conn)
                                       entities-to-consider))]
       (->> transaction-accounts
            (filter (fn [[e accounts]]
                      (not= accounts (get jel-accounts e))))
            (doall))))))

(defn unbalanced-transactions
  "Returns a lazy seq of the :journal-entry/original-entity refs of journal
  entries whose summed line-item debits or credits do not equal (per
  `dollars=`) the entry's :journal-entry/amount.

  The zero-arg arity passes a window of seven days ago to one hour ago.

  NOTE(review): the ids from `recently-changed-entities` (entities carrying
  :transaction/date or :invoice/date) are bound directly to ?je, which must
  also carry :journal-entry/amount -- confirm this is intended rather than
  joining through :journal-entry/original-entity."
  ([]
   (unbalanced-transactions (c/to-date (t/minus (t/now) (t/days 7)))
                            (c/to-date (t/minus (t/now) (t/hours 1)))))
  ([changed-between-start changed-between-end]
   (let [entities-to-consider (recently-changed-entities changed-between-start
                                                         changed-between-end)]
     (->> (dc/q '[:find ?je ?a (sum ?debit) (sum ?credit)
                  :with ?jel
                  :in $ [?je ...]
                  :where
                  [?je :journal-entry/amount ?a]
                  [?je :journal-entry/line-items ?jel]
                  [(get-else $ ?jel :journal-entry-line/debit 0.0) ?debit]
                  [(get-else $ ?jel :journal-entry-line/credit 0.0) ?credit]]
                (dc/db conn)
                entities-to-consider)
          (filter (fn [[_ amount debits credits]]
                    (or (not (dollars= amount debits))
                        (not (dollars= amount credits)))))
          (map first)
          (map (fn [je]
                 (pull-ref (dc/db conn) :journal-entry/original-entity je)))))))

(defn unbalanced-invoices
  "Returns a lazy seq of the :journal-entry/original-entity refs of journal
  entries that originate from an invoice and whose summed line-item debits or
  credits do not equal (per `dollars=`) the entry's :journal-entry/amount.

  The zero-arg arity passes a window of seven days ago to one hour ago.

  NOTE(review): as in `unbalanced-transactions`, the recently changed entity
  ids are bound directly to ?je -- confirm this is intended."
  ([]
   (unbalanced-invoices (c/to-date (t/minus (t/now) (t/days 7)))
                        (c/to-date (t/minus (t/now) (t/hours 1)))))
  ([changed-between-start changed-between-end]
   (let [entities-to-consider (recently-changed-entities changed-between-start
                                                         changed-between-end)]
     (->> (dc/q '[:find ?je ?a (sum ?debit) (sum ?credit)
                  :with ?jel
                  :in $ [?je ...]
                  :where
                  [?je :journal-entry/amount ?a]
                  [?je :journal-entry/original-entity ?i]
                  [?i :invoice/date]
                  [?je :journal-entry/line-items ?jel]
                  [(get-else $ ?jel :journal-entry-line/debit 0.0) ?debit]
                  [(get-else $ ?jel :journal-entry-line/credit 0.0) ?credit]]
                (dc/db conn)
                entities-to-consider)
          (filter (fn [[_ amount debits credits]]
                    (or (not (dollars= amount debits))
                        (not (dollars= amount credits)))))
          (map first)
          (map (fn [je]
                 (pull-ref (dc/db conn) :journal-entry/original-entity je)))))))

(defn mismatched-invoices
  "Returns a lazy seq of [invoice-id account-set] entries for recently changed
  invoices whose expense accounts differ from the accounts on the journal entry
  lines generated for them.

  Accounts with numeric code 21000 are left out of both sides of the
  comparison. Only imported, non-voided, non-zero invoices that are not flagged
  exclude-from-ledger are considered. The zero-arg arity passes a window of
  seven days ago to one hour ago."
  ([]
   (mismatched-invoices (c/to-date (t/minus (t/now) (t/days 7)))
                        (c/to-date (t/minus (t/now) (t/hours 1)))))
  ([changed-between-start changed-between-end]
   (let [entities-to-consider (recently-changed-entities changed-between-start
                                                         changed-between-end)
         ;; accounts actually present on the ledger, per invoice
         jel-accounts (accounts-by-entity
                       (dc/q '[:find ?e ?lia
                               :in $ [?e ...]
                               :where
                               [?je :journal-entry/original-entity ?e]
                               [?e :invoice/date]
                               [?je :journal-entry/line-items ?li]
                               [?li :journal-entry-line/account ?lia]
                               (not [?lia :account/numeric-code 21000])
                               [?lia :account/name]]
                             (dc/db conn)
                             entities-to-consider))
         ;; accounts the invoice itself is coded to
         invoice-accounts (accounts-by-entity
                           (dc/q '[:find ?e ?lia
                                   :in $ [?e ...]
                                   :where
                                   [?e :invoice/expense-accounts ?li]
                                   (not [?e :invoice/total 0.0])
                                   [?li :invoice-expense-account/account ?lia]
                                   [?lia :account/name]
                                   (not [?lia :account/numeric-code 21000])
                                   (not [?e :invoice/status :invoice-status/voided])
                                   (not [?e :invoice/exclude-from-ledger true])
                                   [?e :invoice/import-status :import-status/imported]]
                                 (dc/db conn)
                                 entities-to-consider))]
     (filter (fn [[e accounts]]
               (not= accounts (get jel-accounts e)))
             invoice-accounts))))

(defn touch-broken-ledger
  "Runs the four ledger health checks (mismatched/unbalanced transactions and
  invoices). For each check that finds offenders it logs a WARN sample, touches
  every offending entity so its ledger entry is regenerated, then re-runs the
  check and reports the remaining count as a statsd gauge. A check that finds
  nothing reports 0.0. Emits a statsd event before and after the whole run."
  []
  (statsd/event {:title "Reconciling Ledger"
                 :text "This process looks for unbalance ledger entries, or missing ledger entries"
                 :priority :low}
                nil)

  (mu/trace ::fixing-mismatched-transactions
    []
    (mu/log ::started-fixing-mismatched-transactions)
    (let [mismatched-ts (mismatched-transactions)]
      (if (seq mismatched-ts)
        (do
          (mu/log ::found-mismatched-transactions
                  :status "WARN"
                  :count (count mismatched-ts)
                  :sample (take 10 mismatched-ts))
          ;; entries are [entity-id accounts]; only the id is needed
          (doseq [[m] mismatched-ts]
            (touch-transaction m))
          (statsd/gauge "data.mismatched_transactions"
                        (count (mismatched-transactions))))
        (statsd/gauge "data.mismatched_transactions" 0.0))))

  (mu/trace ::fixing-unbalanced-transactions
    []
    (mu/log ::started-fixing-unbalanced-transactions)
    (let [unbalanced-ts (unbalanced-transactions)]
      (if (seq unbalanced-ts)
        (do
          (mu/log ::found-unbalanced-transactions
                  :status "WARN"
                  :count (count unbalanced-ts)
                  :sample (take 10 unbalanced-ts))
          (doseq [m unbalanced-ts]
            (touch-transaction m))
          (statsd/gauge "data.unbalanced_transactions"
                        (count (unbalanced-transactions))))
        (statsd/gauge "data.unbalanced_transactions" 0.0))))

  (mu/trace ::fixing-mismatched-invoices
    []
    (mu/log ::started-fixing-mismatched-invoices)
    (let [mismatched-is (mismatched-invoices)]
      (if (seq mismatched-is)
        (do
          (mu/log ::found-mismatched-invoices
                  :status "WARN"
                  :count (count mismatched-is)
                  :sample (take 10 mismatched-is))
          ;; entries are [entity-id accounts]; only the id is needed
          (doseq [[m] mismatched-is]
            (touch-invoice m))
          (statsd/gauge "data.mismatched_invoices"
                        (count (mismatched-invoices))))
        (statsd/gauge "data.mismatched_invoices" 0.0))))

  (mu/trace ::fixing-unbalanced-invoices
    []
    (mu/log ::started-fixing-unbalance-invoices)
    (let [unbalanced-is (unbalanced-invoices)]
      (if (seq unbalanced-is)
        (do
          ;; Logged under its own event name; this branch previously reused
          ;; ::found-mismatched-invoices, mislabelling unbalanced invoices.
          (mu/log ::found-unbalanced-invoices
                  :status "WARN"
                  :count (count unbalanced-is)
                  :sample (take 10 unbalanced-is))
          (doseq [m unbalanced-is]
            (touch-invoice m))
          (statsd/gauge "data.unbalanced_invoices"
                        (count (unbalanced-invoices))))
        (statsd/gauge "data.unbalanced_invoices" 0.0))))

  (statsd/event {:title "Finished Reconciling Ledger"
                 :text "This process looks for unbalance ledger entries, or missing ledger entries"
                 :priority :low}
                nil))

(defn build-account-lookup
  "Queries every account and bank account in the current database and returns a
  function of an account entity id that yields a map with :name,
  :account_type, :numeric_code and :client_id.

  :name prefers the bank account name, then the override name registered for
  `client-id`, then the plain account name. :account_type is the account's type
  ident, or for bank accounts a mapping of check/cash -> asset and
  credit -> liability."
  [client-id]
  (let [accounts (by :db/id
                     (map first
                          (dc/q {:find ['(pull ?e [:db/id
                                                   :account/name
                                                   :account/numeric-code
                                                   {:account/type [:db/ident]
                                                    :account/client-overrides [:account-client-override/client
                                                                               :account-client-override/name]}])]
                                 :in ['$]
                                 :where ['[?e :account/name]]}
                                (dc/db conn))))

        bank-accounts (by :db/id
                          (map first
                               (dc/q {:find ['(pull ?e [:db/id
                                                        :bank-account/name
                                                        :bank-account/numeric-code
                                                        {:bank-account/type [:db/ident]}])]
                                      :in ['$]
                                      :where ['[?e :bank-account/name]]}
                                     (dc/db conn))))

        ;; [account-id client-id] -> override name
        overrides-by-client (->> accounts
                                 vals
                                 (mapcat (fn [a]
                                           (map (fn [o]
                                                  [[(:db/id a) (:db/id (:account-client-override/client o))]
                                                   (:account-client-override/name o)])
                                                (:account/client-overrides a))))
                                 (into {}))]
    (fn [a]
      {:name (or (:bank-account/name (bank-accounts a))
                 (overrides-by-client [a client-id])
                 (:account/name (accounts a)))
       :account_type (or (:db/ident (:account/type (accounts a)))
                         ({:bank-account-type/check :account-type/asset
                           :bank-account-type/cash :account-type/asset
                           :bank-account-type/credit :account-type/liability}
                          (:db/ident (:bank-account/type (bank-accounts a)))))
       :numeric_code (or (:account/numeric-code (accounts a))
                         (:bank-account/numeric-code (bank-accounts a)))
       :client_id client-id})))

(defn reset-client+account+location+date
  "Rewrites :journal-entry-line/client+account+location+date on every journal
  entry line of each client in `clients` (entity ids), from the owning entry's
  client and date and the line's account and location. Lines are transacted in
  batches of 500 via `transact-with-backoff`, with progress logged per batch.

  The zero-arg arity processes every entity that has a :client/code."
  ([]
   (reset-client+account+location+date
    (map first (dc/q '[:find ?c
                       :where [?c :client/code]]
                     (dc/db conn)))))
  ([clients]
   (doseq [[client i] (map vector clients (range))]
     (mu/trace ::reset-index
       [:client client]
       (mu/with-context {:client client
                         :client-index i
                         :client-count (count clients)}
         (mu/log ::reseting-index)
         (let [so-far (atom 0)]
           ;; `client` must travel in :args to bind the query's ?c input, the
           ;; same way the other qseq calls in this namespace pass it.
           @(->> (dc/qseq {:query '[:find (pull ?je [:journal-entry/date
                                                     :journal-entry/client
                                                     {:journal-entry/line-items [:journal-entry-line/account
                                                                                 :journal-entry-line/location
                                                                                 :db/id]}])
                                    :in $ ?c
                                    :where [?je :journal-entry/client ?c]]
                           :args [(dc/db conn) client]})
                 (map first)
                 (mapcat (fn [je]
                           (map (fn [jel]
                                  {:db/id (:db/id jel)
                                   :journal-entry-line/client+account+location+date
                                   [(-> je :journal-entry/client :db/id)
                                    (-> jel :journal-entry-line/account :db/id)
                                    (-> jel :journal-entry-line/location)
                                    (-> je :journal-entry/date)]})
                                (:journal-entry/line-items je))))
                 (partition-all 500)
                 (s/->source)
                 (s/map (fn [batch]
                          (de/future
                            (transact-with-backoff batch)
                            (count batch))))
                 (s/buffer 50)
                 (s/realize-each)
                 (s/consume (fn [batch-count]
                              (swap! so-far #(+ % batch-count))
                              (mu/log ::reset
                                      :count batch-count
                                      :so-far @so-far
                                      :client client
                                      :client-index i
                                      :client-count (count clients)))))))
         (mu/log ::client-completed))))))

(defn find-mismatch-index
  "Walks the client+account+location+date index for every client and counts
  journal entry lines whose stored tuple differs from the tuple derived from
  the line and its owning entry. Prints progress per client and returns the
  total number of mismatches."
  []
  (reduce +
          0
          (for [client (map first (dc/q '[:find ?c
                                          :where [?c :client/code]]
                                        (dc/db conn)))
                :let [_ (println "searching for" client)
                      mismatches (->> (dc/index-pull (dc/db conn)
                                                     {:index :avet
                                                      :selector [:db/id
                                                                 :journal-entry-line/location
                                                                 :journal-entry-line/account
                                                                 :journal-entry-line/client+account+location+date
                                                                 {:journal-entry/_line-items [:journal-entry/date
                                                                                              :journal-entry/client]}]
                                                      :start [:journal-entry-line/client+account+location+date [client]]})
                                      ;; stop once the index moves on to another client
                                      (take-while (fn [result]
                                                    (= client
                                                       (first (:journal-entry-line/client+account+location+date result)))))
                                      (filter (fn [{index :journal-entry-line/client+account+location+date :as result}]
                                                (not= index
                                                      [(-> result :journal-entry/_line-items :journal-entry/client :db/id)
                                                       (-> result :journal-entry-line/account :db/id)
                                                       (-> result :journal-entry-line/location)
                                                       (-> result :journal-entry/_line-items :journal-entry/date)]))))]]
            (do
              (println (count mismatches))
              (count mismatches)))))

(defn accounts-needing-rebuild
  "Returns one map of :client, :account, :location and :starting-at per
  client/account/location combination of `client` that has at least one journal
  entry line flagged :journal-entry-line/dirty. :starting-at is the earliest
  journal entry date among those dirty lines."
  [db client]
  (let [client (pull-id db client)]
    (->> (dc/qseq {:query '[:find ?c ?a ?l (min ?d)
                            :in $ ?c
                            :where
                            [?jel :journal-entry-line/dirty true]
                            [?jel :journal-entry-line/account ?a]
                            [?jel :journal-entry-line/location ?l]
                            [?je :journal-entry/line-items ?jel]
                            [?je :journal-entry/client ?c]
                            [?je :journal-entry/date ?d]]
                   :args [db client]})
         (map (fn [[client account location starting-at]]
                {:client client
                 :account account
                 :starting-at starting-at
                 :location location})))))

(defn all-accounts-needing-rebuild
  "Like `accounts-needing-rebuild` but ignores the dirty flag: returns one map
  of :client, :account, :location and :starting-at for every
  client/account/location combination of `client`, where :starting-at is the
  earliest journal entry date for that combination."
  [db client]
  (let [client (pull-id db client)]
    (->> (dc/qseq {:query '[:find ?c ?a ?l (min ?d)
                            :in $ ?c
                            :where
                            [?je :journal-entry/client ?c]
                            [?je :journal-entry/line-items ?jel]
                            [?jel :journal-entry-line/account ?a]
                            [?jel :journal-entry-line/location ?l]
                            [?je :journal-entry/date ?d]]
                   :args [db client]})
         (map (fn [[client account location starting-at]]
                {:client client
                 :account account
                 :starting-at starting-at
                 :location location})))))

(defn find-running-balance-start
  "Returns the :journal-entry-line/running-balance of the closest line dated
  strictly before `starting-at` for the given client/account/location, found by
  walking the tuple index in reverse. Returns 0.0 when no such line is found.

  NOTE(review): only the first 5 index results are inspected, so if five or
  more lines share the starting date (or later) the earlier balance is not
  reached and 0.0 is used -- confirm this window is intentional."
  [{:keys [client account location starting-at]} db]
  (let [client (pull-id db client)
        account (pull-id db account)]
    (or
     (->> (dc/index-pull db
                         {:index :avet
                          :selector [:db/id
                                     :journal-entry-line/running-balance
                                     :journal-entry-line/client+account+location+date]
                          :start [:journal-entry-line/client+account+location+date
                                  [client account location starting-at]]
                          :reverse true
                          :limit 500})
          ;; stay within this client/account/location
          (take-while (fn [result]
                        (= [client account location]
                           (take 3 (:journal-entry-line/client+account+location+date result)))))
          (take 5)
          ;; skip lines dated on or after the rebuild start
          (drop-while (fn [{[_ _ _ date] :journal-entry-line/client+account+location+date}]
                        (>= (compare date starting-at) 0)))
          first
          :journal-entry-line/running-balance)
     0.0)))

(defn get-dirty-entries
  "Returns a vector of [line-id debit credit] for every journal entry line of
  the given client/account/location from `starting-at` onward, in tuple-index
  order. Missing debit/credit values default to 0.0.

  The index is read in pages of 1000, for at most 100 pages."
  [{:keys [client account location starting-at]} db]
  (let [client (pull-id db client)
        account (pull-id db account)]
    (into []
          (comp
           (map (fn [i]
                  (dc/index-pull db
                                 {:index :avet
                                  :selector [:db/id
                                             :journal-entry-line/debit
                                             :journal-entry-line/credit
                                             :journal-entry-line/client+account+location+date]
                                  :start [:journal-entry-line/client+account+location+date
                                          [client account location starting-at]]
                                  :offset (* i 1000)
                                  :limit 1000})))
           ;; An empty page is still truthy, so test with `seq` to stop paging
           ;; once the index is exhausted instead of issuing empty reads.
           (take-while seq)
           (mapcat identity)
           ;; stop as soon as the index leaves this client/account/location
           (take-while (fn [{[result-client result-account result-location]
                             :journal-entry-line/client+account+location+date}]
                         (and (= client result-client)
                              (= account result-account)
                              (= location result-location))))
           (map (fn [result]
                  [(:db/id result)
                   (:journal-entry-line/debit result 0.0)
                   (:journal-entry-line/credit result 0.0)])))
          (range 100))))

(defn compute-running-balance
  "Given a map with :build-from (opening balance), :dirty-entries (seq of
  [line-id debit credit]) and :account-type, returns a vector of transaction
  maps that set :journal-entry-line/running-balance on each line and clear its
  dirty flag.

  Asset, dividend and expense accounts accumulate debit minus credit; every
  other account type accumulates credit minus debit."
  [account-needing-refresh]
  (mu/trace ::compute
    [:dirty-count (count (:dirty-entries account-needing-refresh))]
    (second
     (reduce
      (fn [[running-balance rows] [id debit credit]]
        (let [new-running-balance (+ running-balance
                                     (if (#{:account-type/asset
                                            :account-type/dividend
                                            :account-type/expense}
                                          (:account-type account-needing-refresh))
                                       (- debit credit)
                                       (- credit debit)))]
          [new-running-balance
           (conj rows
                 {:db/id id
                  :journal-entry-line/running-balance new-running-balance
                  :journal-entry-line/dirty false})]))
      [(:build-from account-needing-refresh) []]
      (:dirty-entries account-needing-refresh)))))

(defn refresh-running-balance-accounts
  "Recomputes running balances for every entry in `accounts-needing-rebuild`
  (maps as produced by `accounts-needing-rebuild`) against `db`, and transacts
  the resulting line updates in batches of 500 via `transact-with-backoff`.

  `clients`, `c` and `i` are only used to annotate the progress log. Blocks
  until the whole pipeline has been consumed."
  [accounts-needing-rebuild clients c i db]
  (mu/log ::found-accounts-needing-rebuild
          :accounts (count accounts-needing-rebuild))
  (let [so-far (atom 0)
        ;; build-account-lookup runs two full account queries per call; build
        ;; it once per client for this run rather than once per account.
        lookup-for-client (memoize build-account-lookup)]
    @(->> accounts-needing-rebuild
          (s/->source)
          (s/map (fn [account-needing-rebuild]
                   (de/future
                     (mu/with-context {:account account-needing-rebuild}
                       (-> account-needing-rebuild
                           (assoc :build-from (find-running-balance-start account-needing-rebuild db))
                           (assoc :dirty-entries (get-dirty-entries account-needing-rebuild db))
                           (assoc :account-type (:account_type
                                                 ((lookup-for-client (:client account-needing-rebuild))
                                                  (:account account-needing-rebuild))))
                           (compute-running-balance))))))
          (s/realize-each)
          ;; flatten per-account row vectors into a single stream of rows
          (s/mapcat (fn [x] x))
          (s/buffer 50)
          (s/transform (partition-all 500))
          (s/map (fn [batch]
                   (de/future
                     (transact-with-backoff batch)
                     (count batch))))
          (s/buffer 50)
          (s/realize-each)
          (s/consume (fn [batch-count]
                       (swap! so-far #(+ % batch-count))
                       (mu/log ::reset
                               :count batch-count
                               :so-far @so-far
                               :client c
                               :client-index i
                               :client-count (count clients)))))))

(defn refresh-running-balance-cache
  "For each client (maps with :db/id and :client/code), recomputes running
  balances for the client/account/location combinations that have dirty journal
  entry lines. The zero-arg arity processes every client in shuffled order."
  ([]
   (refresh-running-balance-cache
    (shuffle (map first
                  (dc/q '[:find (pull ?c [:client/code :db/id])
                          :where [?c :client/code]]
                        (dc/db conn))))))
  ([clients]
   (doseq [[c i] (map vector clients (range))]
     (mu/trace ::building-running-balance
       [:client c]
       (mu/with-context {:client c
                         :client-index i
                         :client-count (count clients)}
         (mu/log ::searching-for-accounts)
         (let [db (dc/db conn)
               accounts-needing-rebuild (accounts-needing-rebuild db (:db/id c))]
           (when (seq accounts-needing-rebuild)
             (refresh-running-balance-accounts accounts-needing-rebuild clients c i db)
             (mu/log ::client-completed))))))))

(defn rebuild-running-balance-cache
  "For each client (maps with :db/id and :client/code), recomputes running
  balances for every client/account/location combination regardless of the
  dirty flag. The zero-arg arity processes every client in shuffled order."
  ([]
   (rebuild-running-balance-cache
    (shuffle (map first
                  (dc/q '[:find (pull ?c [:client/code :db/id])
                          :where [?c :client/code]]
                        (dc/db conn))))))
  ([clients]
   (doseq [[c i] (map vector clients (range))]
     (mu/trace ::building-running-balance
       [:client c]
       (mu/with-context {:client c
                         :client-index i
                         :client-count (count clients)}
         (mu/log ::searching-for-accounts)
         (let [db (dc/db conn)
               accounts-needing-rebuild (all-accounts-needing-rebuild db (:db/id c))]
           (when (seq accounts-needing-rebuild)
             (refresh-running-balance-accounts accounts-needing-rebuild clients c i db)
             (mu/log ::client-completed))))))))

;; TODO only enable once IOL is set up in clod
;; Schedules refresh-running-balance-cache (wrapped in `heartbeat`) with a
;; randomized period of (* 15 60 n) where n is in [500, 1000).
#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(mount/defstate running-balance-cache-worker
  :start (scheduler/every (* 15 60 (+ 500 (rand-int 500)))
                          (heartbeat refresh-running-balance-cache "running-balance-cache"))
  :stop (scheduler/stop running-balance-cache-worker))