(ns auto-ap.ledger
  (:require
   [auto-ap.datomic :refer [conn remove-nils]]
   [auto-ap.logging :refer [info-event]]
   [auto-ap.utils :refer [dollars-0? dollars= heartbeat]]
   [clj-time.coerce :as c]
   [clj-time.core :as t]
   [clojure.tools.logging :as log]
   [com.unbounce.dogstatsd.core :as statsd]
   [datomic.api :as d]
   [mount.core :as mount]
   [unilog.context :as lc]
   [yang.scheduler :as scheduler]))

(defn datums->impacted-entity
  "Given a db value and one `[entity-id datoms]` group, returns a vector holding
  a single `[type id]` pair naming the invoice or transaction whose ledger
  entry is affected, or nil when none of the changed attributes belong to a
  recognised namespace.

  `changes` is a collection of maps whose `:a` is an attribute keyword; only
  the namespaces of those keywords are inspected. For expense-account and
  transaction-account changes the parent is looked up through the reverse
  reference, so the returned id may be nil when no parent is found."
  [db [e changes]]
  (let [entity (d/pull db '[* {:invoice/_expense-accounts [:db/id]
                               :transaction/_accounts [:db/id]}] e)
        namespaces (->> changes
                        (map :a)
                        (map namespace)
                        set)]
    (cond
      (namespaces "invoice")
      [[:invoice e]]

      (namespaces "invoice-expense-account")
      [[:invoice (:db/id (:invoice/_expense-accounts entity))]]

      (namespaces "transaction-account")
      [[:transaction (:db/id (:transaction/_accounts entity))]]

      (namespaces "transaction")
      [[:transaction e]]

      #_#_(namespaces "expected-deposit" ) [[:expected-deposit e]]

      :else
      nil)))

(defn infer-entity
  "Classifies one `[entity-id datoms]` group by the namespaces of the changed
  attributes. Returns :invoice, :invoice-expense-account, :transaction-account
  or nil. The first argument is ignored."
  [_ [_ changes]]
  (let [namespaces (->> changes
                        (map :a)
                        (map namespace)
                        set)]
    (cond
      (namespaces "invoice")
      :invoice

      (namespaces "invoice-expense-account")
      :invoice-expense-account

      (namespaces "transaction-account")
      :transaction-account

      #_#_(namespaces "expected-deposit" ) :expected-deposit

      :else
      nil)))

;; Builds the journal-entry transaction map for one `[type id]` pair.
;; Dispatches on the type keyword; methods return nil when no ledger entry
;; should exist for the entity.
(defmulti entity-change->ledger (fn [_ [type]] type))

;; Invoice -> journal entry. Returns nil when the id is nil, the invoice has no
;; total, is excluded from the ledger, is pending import, is voided, or its
;; total satisfies dollars-0?.
(defmethod entity-change->ledger :invoice
  [db [_ id]]
  (when id
    ;; NOTE(review): the pull pattern names :invoice/payment while the cleared
    ;; check below reads :invoice/payments -- confirm which attribute the
    ;; schema actually defines.
    (let [entity (d/pull db ['* {:invoice/vendor '[*]
                                 :invoice/payment '[*]
                                 :invoice/status '[:db/ident]
                                 :invoice/import-status '[:db/ident]}]
                         id)
          total (:invoice/total entity)]
      (when-not (or (not total)
                    (= true (:invoice/exclude-from-ledger entity))
                    (= :import-status/pending (:db/ident (:invoice/import-status entity)))
                    (= :invoice-status/voided (:db/ident (:invoice/status entity)))
                    (dollars-0? total))
        ;; The sign test happens only after the nil guard above; comparing a
        ;; missing total against 0.0 would throw before the guard could apply.
        (let [credit-invoice? (< total 0.0)
              abs-total (Math/abs total)]
          (remove-nils
           {:journal-entry/source "invoice"
            :journal-entry/client (:db/id (:invoice/client entity))
            :journal-entry/date (:invoice/date entity)
            :journal-entry/original-entity (:db/id entity)
            :journal-entry/vendor (:db/id (:invoice/vendor entity))
            :journal-entry/amount abs-total
            :journal-entry/line-items
            (into [;; accounts-payable side: debit for a negative (credit)
                   ;; invoice, credit otherwise
                   (cond-> {:journal-entry-line/account :account/accounts-payable
                            :journal-entry-line/location "A"}
                     credit-invoice? (assoc :journal-entry-line/debit abs-total)
                     (not credit-invoice?) (assoc :journal-entry-line/credit abs-total))]
                  ;; one opposing line per expense account
                  (map (fn [ea]
                         (cond-> {:journal-entry-line/account (:db/id (:invoice-expense-account/account ea))
                                  :journal-entry-line/location (or (:invoice-expense-account/location ea) "HQ")}
                           credit-invoice?
                           (assoc :journal-entry-line/credit (Math/abs (:invoice-expense-account/amount ea)))

                           (not credit-invoice?)
                           (assoc :journal-entry-line/debit (Math/abs (:invoice-expense-account/amount ea)))))
                       (:invoice/expense-accounts entity)))
            :journal-entry/cleared
            (and (< (:invoice/outstanding-balance entity) 0.01)
                 (every? #(= :payment-status/cleared (:payment/status %))
                         (:invoice/payments entity)))}))))))

;; Transaction -> journal entry. Returns nil when the id is nil, the
;; transaction has no amount, its approval status is excluded or suppressed, or
;; its amount satisfies dollars-0?.
(defmethod entity-change->ledger :transaction
  [db [_ id]]
  (when id
    (let [entity (d/pull db ['* {:transaction/vendor '[*]
                                 :transaction/client '[*]
                                 :transaction/approval-status '[*]
                                 :transaction/bank-account '[* {:bank-account/type [:db/ident]}]
                                 :transaction/accounts '[* {:transaction-account/account [*]}]}]
                         id)
          amount (:transaction/amount entity)
          approval-status (:db/ident (:transaction/approval-status entity))]
      (when-not (or (nil? amount) ; a missing amount cannot be compared or posted
                    (= :transaction-approval-status/excluded approval-status)
                    (= :transaction-approval-status/suppressed approval-status)
                    (dollars-0? amount))
        (let [decreasing? (< amount 0.0)
              credit-from-bank? decreasing?
              debit-from-bank? (not decreasing?)
              abs-amount (Math/abs amount)]
          (remove-nils
           {:journal-entry/source "transaction"
            :journal-entry/client (:db/id (:transaction/client entity))
            :journal-entry/date (:transaction/date entity)
            :journal-entry/original-entity (:db/id entity)
            :journal-entry/alternate-description (:transaction/description-original entity)
            :journal-entry/vendor (:db/id (:transaction/vendor entity))
            :journal-entry/amount abs-amount
            :journal-entry/cleared-against (:transaction/cleared-against entity)
            :journal-entry/line-items
            (into [;; bank-account side
                   (remove-nils
                    {:journal-entry-line/account (:db/id (:transaction/bank-account entity))
                     :journal-entry-line/location "A"
                     :journal-entry-line/credit (when credit-from-bank? abs-amount)
                     :journal-entry-line/debit (when debit-from-bank? abs-amount)})]
                  ;; opposing side: one line per coded account, or a single
                  ;; account-less line for the full amount when none are coded
                  (map (fn [a]
                         (remove-nils
                          {:journal-entry-line/account (:db/id (:transaction-account/account a))
                           :journal-entry-line/location (:transaction-account/location a)
                           :journal-entry-line/debit (when credit-from-bank?
                                                       (Math/abs (:transaction-account/amount a)))
                           :journal-entry-line/credit (when debit-from-bank?
                                                        (Math/abs (:transaction-account/amount a)))}))
                       (if (seq (:transaction/accounts entity))
                         (:transaction/accounts entity)
                         [{:transaction-account/amount amount}])))
            :journal-entry/cleared true}))))))

#_(defmethod entity-change->ledger :expected-deposit [db [type id]] (let [{:expected-deposit/keys [total client date]} (d/pull db '[:expected-deposit/total :expected-deposit/client :expected-deposit/date] id)] #:journal-entry {:source "expected-deposit" :original-entity id :client client :date date :amount total :vendor :vendor/ccp-square :line-items [#:journal-entry-line {:credit total :location "A" :account :account/receipts-split} #:journal-entry-line {:debit total :location "A" :account :account/ccp}]}))

;; Expense-account changes never produce a ledger entry on their own.
(defmethod entity-change->ledger :invoice-expense-account
  [_ _]
  nil)

;; Unclassified changes produce nothing.
(defmethod entity-change->ledger nil
  [_ _]
  nil)

;; Datomic transaction report queue for `conn`; removed again on stop.
(mount/defstate tx-report-queue
  :start (d/tx-report-queue conn)
  :stop (d/remove-tx-report-queue conn))

(defn process-one
  "Takes one transaction report from `tx-report-queue` (blocking until one is
  available) and rewrites the ledger entries of every invoice/transaction it
  touched: existing journal entries for those entities are retracted, then a
  fresh entry is transacted for each entity that still warrants one.

  Any exception is logged and swallowed."
  []
  (lc/with-context {:source "process-txes"}
    (try
      (let [transaction (.take tx-report-queue)
            _ (log/info "Converting tranasction to ledger")
            db (:db-after transaction)
            affected-entities (->> (:tx-data transaction)
                                   (map (fn [^datomic.db.Datum x]
                                          {:e (:e x)
                                           :a (d/ident db (:a x))
                                           :v (:v x)
                                           :added (:added x)}))
                                   (group-by :e)
                                   (mapcat #(datums->impacted-entity db %))
                                   ;; A child row whose parent could not be
                                   ;; resolved yields a nil id; such a pair
                                   ;; would otherwise become a retraction with
                                   ;; a nil lookup-ref value.
                                   (filter (fn [[_ id]] (some? id)))
                                   (set))
            _ (info-event (str "Found " (count affected-entities) " affected entities")
                          {:affected-entities (count affected-entities)})
            ;; Realised eagerly so a failure while building entries happens
            ;; before anything is retracted, not halfway through re-creation.
            d-txs (->> affected-entities
                       (map #(entity-change->ledger db %))
                       (filter seq)
                       (vec))
            retractions (mapv (fn [[_ e]]
                                [:db/retractEntity [:journal-entry/original-entity e]])
                              affected-entities)]
        (when (seq retractions)
          @(d/transact conn retractions))
        (doseq [d-tx d-txs]
          @(d/transact conn [d-tx]))
        (log/info "Succesfully process transaction"))
      (catch Exception e
        (log/error e)))))

;; Hands `process-one` to scheduler/run-fun on start; on stop, resets the
;; worker's :running? atom to false.
(mount/defstate process-txes-worker
  :start (scheduler/run-fun process-one 1)
  :stop (-> process-txes-worker :running? (reset! false)))

(defn reconcile-ledger
  "Finds transactions and invoices dated on or after `start-date` that have no
  journal entry pointing at them and transacts the missing entries in a single
  transaction. With no argument, `start-date` is six months before now.

  Entities for which `entity-change->ledger` yields nothing are skipped."
  ([]
   (reconcile-ledger (-> (t/now)
                         (t/plus (t/months -6))
                         (c/to-date))))
  ([start-date]
   (let [;; one snapshot for both queries and for every entry that is built
         db (d/db conn)
         txes-missing-ledger-entries
         (->> (d/query {:query {:find ['?t]
                                :in ['$ '?sd]
                                :where ['[?t :transaction/date ?d]
                                        '[(>= ?d ?sd)]
                                        '(not [_ :journal-entry/original-entity ?t])
                                        '(not [?t :transaction/amount 0.0])
                                        '(not [?t :transaction/approval-status :transaction-approval-status/excluded])
                                        '(not [?t :transaction/approval-status :transaction-approval-status/suppressed])]}
                        :args [db start-date]})
              (map first)
              ;; keep (not map): the method may return nil, and nil is not
              ;; valid transaction data
              (keep #(entity-change->ledger db [:transaction %]))
              (vec))
         invoices-missing-ledger-entries
         (->> (d/query {:query {:find ['?t]
                                :in ['$ '?sd]
                                :where ['[?t :invoice/date ?d]
                                        '[(>= ?d ?sd)]
                                        '(not [_ :journal-entry/original-entity ?t])
                                        '[?t :invoice/total ?amt]
                                        '[(not= 0.0 ?amt)]
                                        '(not [?t :invoice/status :invoice-status/voided])
                                        '(not [?t :invoice/import-status :import-status/pending])
                                        '(not [?t :invoice/exclude-from-ledger true])]}
                        :args [db start-date]})
              (map first)
              (keep #(entity-change->ledger db [:invoice %]))
              (vec))
         repairs (vec (concat txes-missing-ledger-entries
                              invoices-missing-ledger-entries))]
     (when (seq repairs)
       (log/info (take 3 repairs))
       (log/warn "repairing " (count txes-missing-ledger-entries) " missing transactions, " (count invoices-missing-ledger-entries) " missing invoices that were missing ledger entries")
       @(d/transact conn repairs)))))

;; Interval handed to scheduler/every by the workers below: (* 1000 60 60).
(mount/defstate reconciliation-frequency
  :start (* 1000 60 60))

;; Schedules `reconcile-ledger` (wrapped by `heartbeat`) every
;; `reconciliation-frequency`.
(mount/defstate ledger-reconciliation-worker
  :start (scheduler/every reconciliation-frequency (heartbeat reconcile-ledger "reconcile-ledger"))
  :stop (scheduler/stop ledger-reconciliation-worker))

(defn touch-transaction
  "Retracts the journal entry whose original entity is transaction `e`, then
  transacts a freshly built entry for it. When the transaction no longer
  warrants an entry, only the retraction happens and nil is returned."
  [e]
  @(d/transact conn [[:db/retractEntity [:journal-entry/original-entity e]]])
  (when-let [entry (entity-change->ledger (d/db conn) [:transaction e])]
    @(d/transact conn [{:db/id "datomic.tx"
                        :db/doc "touching transaction to update ledger"}
                       entry])))

(defn touch-invoice
  "Retracts the journal entry whose original entity is invoice `e`, then
  transacts a freshly built entry for it. When the invoice no longer warrants
  an entry, only the retraction happens and nil is returned."
  [e]
  @(d/transact conn [[:db/retractEntity [:journal-entry/original-entity e]]])
  (when-let [entry (entity-change->ledger (d/db conn) [:invoice e])]
    @(d/transact conn [{:db/id "datomic.tx"
                        :db/doc "touching invoice to update ledger"}
                       entry])))

(defn- accounts-by-entity
  "Folds `[entity account]` pairs into a map of entity -> set of accounts."
  [pairs]
  (reduce (fn [acc [e lia]]
            (update acc e (fnil conj #{}) lia))
          {}
          pairs))

(defn- unbalanced-original-entities
  "For the given journal-entry ids, sums line-item debits and credits (a
  missing side counts as 0.0) and returns the :db/id of the original entity of
  every entry whose amount fails `dollars=` against either sum."
  [db journal-entries]
  (->> (d/q '[:find ?je ?a (sum ?debit) (sum ?credit)
              :with ?jel
              :in $ [?je ...]
              :where
              [?je :journal-entry/amount ?a]
              [?je :journal-entry/line-items ?jel]
              [(get-else $ ?jel :journal-entry-line/debit 0.0) ?debit]
              [(get-else $ ?jel :journal-entry-line/credit 0.0) ?credit]]
            db
            journal-entries)
       (filter (fn [[_ amount debits credits]]
                 (or (not (dollars= amount debits))
                     (not (dollars= amount credits)))))
       (map first)
       (map (fn [je] (:journal-entry/original-entity (d/entity db je))))
       (map :db/id)))

(defn mismatched-transactions
  "Returns `[transaction-id account-set]` entries for transactions touched in
  the log between the two instants whose coded accounts differ from the
  accounts on their journal entry's line items. Transactions that are
  excluded, suppressed, or have an amount of 0.0 are not considered.

  With no arguments the window is from seven days ago to one hour ago."
  ([]
   (mismatched-transactions (c/to-date (t/minus (t/now) (t/days 7)))
                            (c/to-date (t/minus (t/now) (t/hours 1)))))
  ([changed-between-start changed-between-end]
   (let [db (d/db conn)
         entities-to-consider (d/q '[:find [?t ...]
                                     :in $ ?log ?start ?end
                                     :where
                                     [(tx-ids ?log ?start ?end) [?tx ...]]
                                     [(tx-data ?log ?tx) [[?t]]]
                                     [?t :transaction/date]]
                                   db
                                   (d/log conn)
                                   changed-between-start
                                   changed-between-end)
         _ (log/info "checking" (count entities-to-consider) "transactions looking for mismatches between" changed-between-start changed-between-end)
         jel-accounts (accounts-by-entity
                       (d/q '[:find ?e ?lia
                              :in $ [?e ...]
                              :where
                              [?je :journal-entry/original-entity ?e]
                              [?je :journal-entry/line-items ?li]
                              [?li :journal-entry-line/account ?lia]
                              [?lia :account/name]]
                            db
                            entities-to-consider))
         transaction-accounts (accounts-by-entity
                               (d/q '[:find ?e ?lia
                                      :in $ [?e ...]
                                      :where
                                      [?e :transaction/date ?d]
                                      [?e :transaction/accounts ?li]
                                      (not [?e :transaction/approval-status :transaction-approval-status/excluded])
                                      (not [?e :transaction/approval-status :transaction-approval-status/suppressed])
                                      [?li :transaction-account/account ?lia]
                                      [?lia :account/name]
                                      [?e :transaction/amount ?amt]
                                      [(not= ?amt 0.0)]]
                                    db
                                    entities-to-consider))]
     (->> transaction-accounts
          (filter (fn [[e accounts]]
                    (not= accounts (get jel-accounts e))))))))

(defn unbalanced-transactions
  "Returns the ids of transactions whose journal entry was touched in the log
  between the two instants and whose entry amount does not match both the
  summed debits and the summed credits of its line items.

  With no arguments the window is from seven days ago to one hour ago."
  ([]
   (unbalanced-transactions (c/to-date (t/minus (t/now) (t/days 7)))
                            (c/to-date (t/minus (t/now) (t/hours 1)))))
  ([changed-between-start changed-between-end]
   (let [db (d/db conn)
         entities-to-consider (d/q '[:find [?je ...]
                                     :in $ ?log ?start ?end
                                     :where
                                     [(tx-ids ?log ?start ?end) [?tx ...]]
                                     [(tx-data ?log ?tx) [[?je]]]
                                     [?je :journal-entry/amount]
                                     [?je :journal-entry/original-entity ?i]
                                     [?i :transaction/date]]
                                   db
                                   (d/log conn)
                                   changed-between-start
                                   changed-between-end)]
     (log/info "checking" (count entities-to-consider) "transaction journal entries looking for mismatches between" changed-between-start changed-between-end)
     (unbalanced-original-entities db entities-to-consider))))

(defn unbalanced-invoices
  "Returns the ids of invoices whose journal entry was touched in the log
  between the two instants and whose entry amount does not match both the
  summed debits and the summed credits of its line items.

  With no arguments the window is from seven days ago to one hour ago."
  ([]
   (unbalanced-invoices (c/to-date (t/minus (t/now) (t/days 7)))
                        (c/to-date (t/minus (t/now) (t/hours 1)))))
  ([changed-between-start changed-between-end]
   (let [db (d/db conn)
         entities-to-consider (d/q '[:find [?je ...]
                                     :in $ ?log ?start ?end
                                     :where
                                     [(tx-ids ?log ?start ?end) [?tx ...]]
                                     [(tx-data ?log ?tx) [[?je]]]
                                     [?je :journal-entry/amount]
                                     [?je :journal-entry/original-entity ?i]
                                     [?i :invoice/date]]
                                   db
                                   (d/log conn)
                                   changed-between-start
                                   changed-between-end)]
     (log/info "checking" (count entities-to-consider) "invoice journal entries looking for mismatches between" changed-between-start changed-between-end)
     (unbalanced-original-entities db entities-to-consider))))

(defn mismatched-invoices
  "Returns `[invoice-id account-set]` entries for invoices touched in the log
  between the two instants whose expense accounts differ from the accounts on
  their journal entry's line items. Accounts with numeric code 21000 are left
  out of both sides; only imported, non-voided, non-excluded invoices whose
  total is not 0.0 are considered.

  With no arguments the window is from seven days ago to one hour ago."
  ([]
   (mismatched-invoices (c/to-date (t/minus (t/now) (t/days 7)))
                        (c/to-date (t/minus (t/now) (t/hours 1)))))
  ([changed-between-start changed-between-end]
   (let [db (d/db conn)
         entities-to-consider (d/q '[:find [?i ...]
                                     :in $ ?log ?start ?end
                                     :where
                                     [(tx-ids ?log ?start ?end) [?tx ...]]
                                     [(tx-data ?log ?tx) [[?i]]]
                                     [?i :invoice/date]]
                                   db
                                   (d/log conn)
                                   changed-between-start
                                   changed-between-end)
         _ (log/info (count entities-to-consider) "invoices have changed between" changed-between-start "and" changed-between-end)
         jel-accounts (accounts-by-entity
                       (d/q '[:find ?e ?lia
                              :in $ [?e ...]
                              :where
                              [?je :journal-entry/original-entity ?e]
                              [?je :journal-entry/line-items ?li]
                              [?li :journal-entry-line/account ?lia]
                              (not [?lia :account/numeric-code 21000])
                              [?lia :account/name]]
                            db
                            entities-to-consider))
         invoice-accounts (accounts-by-entity
                           (d/q '[:find ?e ?lia
                                  :in $ [?e ...]
                                  :where
                                  [?e :invoice/expense-accounts ?li]
                                  (not [?e :invoice/total 0.0])
                                  [?li :invoice-expense-account/account ?lia]
                                  [?lia :account/name]
                                  (not [?lia :account/numeric-code 21000])
                                  (not [?e :invoice/status :invoice-status/voided])
                                  (not [?e :invoice/exclude-from-ledger true])
                                  [?e :invoice/import-status :import-status/imported]]
                                db
                                entities-to-consider))]
     (filter (fn [[e accounts]]
               (not= accounts (get jel-accounts e)))
             invoice-accounts))))

(defn touch-broken-ledger
  "Runs the four consistency checks (mismatched/unbalanced transactions, then
  mismatched/unbalanced invoices). For each check, every offending entity is
  re-touched and the check is run again to report the remaining count as a
  statsd gauge; when nothing is found the gauge is set to 0.0. A statsd event
  is emitted before and after the whole pass."
  []
  (statsd/event {:title "Reconciling Ledger"
                 :text "This process looks for unbalance ledger entries, or missing ledger entries"
                 :priority :low}
                nil)

  (log/info "Attempting to fix transactions that are in the ledger but are wrong")
  (let [mismatched-ts (mismatched-transactions)]
    (if (seq mismatched-ts)
      (do
        (log/warn (count mismatched-ts) " transactions exist but don't match ledger " (pr-str (take 10 mismatched-ts) ))
        ;; entries are [id account-set]; only the id is needed
        (doseq [[m] mismatched-ts]
          (touch-transaction m))
        (statsd/gauge "data.mismatched_transactions" (count (mismatched-transactions))))
      (statsd/gauge "data.mismatched_transactions" 0.0)))

  (log/info "Attempting to fix transactions that are in the ledger but debits/credits don't add up")
  (let [unbalanced-ts (unbalanced-transactions)]
    (if (seq unbalanced-ts)
      (do
        (log/warn (count unbalanced-ts) " transactions exist but don't have matching debits/credits (" (pr-str (take 10 unbalanced-ts) ) ")")
        (doseq [m unbalanced-ts]
          (touch-transaction m))
        (statsd/gauge "data.unbalanced_transactions" (count (unbalanced-transactions))))
      (statsd/gauge "data.unbalanced_transactions" 0.0)))
  (log/info "Finished fixing transactions that are in the ledger but are wrong")

  (let [mismatched-is (mismatched-invoices)]
    (if (seq mismatched-is)
      (do
        (log/warn (count mismatched-is) " invoice exist but don't match ledger ")
        (doseq [[m] mismatched-is]
          (touch-invoice m))
        (statsd/gauge "data.mismatched_invoices" (count (mismatched-invoices))))
      (statsd/gauge "data.mismatched_invoices" 0.0)))

  (log/info "Attempting to fix invoices that are in the ledger but debits/credits don't add up")
  ;; The local must not be called `unbalanced-invoices`: that would shadow the
  ;; function, and the re-check below would then try to invoke a sequence.
  (let [unbalanced-is (unbalanced-invoices)]
    (if (seq unbalanced-is)
      (do
        (log/warn (count unbalanced-is) " invoices exist but don't have matching debits/credits ")
        (doseq [m unbalanced-is]
          (touch-invoice m))
        (statsd/gauge "data.unbalanced_invoices" (count (unbalanced-invoices))))
      (statsd/gauge "data.unbalanced_invoices" 0.0)))
  (log/info "Finish fixing invoices that are in the ledger but are wrong")

  (statsd/event {:title "Finished Reconciling Ledger"
                 :text "This process looks for unbalance ledger entries, or missing ledger entries"
                 :priority :low}
                nil))

;; Schedules `touch-broken-ledger` (wrapped by `heartbeat`) every
;; `reconciliation-frequency`.
(mount/defstate touch-broken-ledger-worker
  :start (scheduler/every reconciliation-frequency (heartbeat touch-broken-ledger "touch-broken-ledger"))
  :stop (scheduler/stop touch-broken-ledger-worker))