389 lines
21 KiB
Clojure
389 lines
21 KiB
Clojure
(ns auto-ap.ledger
|
|
(:require [datomic.api :as d]
|
|
[yang.scheduler :as scheduler]
|
|
[mount.core :as mount]
|
|
[auto-ap.datomic.accounts :as a]
|
|
[auto-ap.datomic :refer [uri remove-nils conn]]
|
|
[clojure.spec.alpha :as s]
|
|
[auto-ap.utils :refer [dollars-0? dollars=] ]
|
|
[clojure.tools.logging :as log]
|
|
[auto-ap.logging :refer [info-event]]
|
|
[unilog.context :as lc]))
|
|
|
|
|
|
(defn datums->impacted-entity [db [e changes]]
|
|
(let [entity (d/pull db '[* {:invoice/_expense-accounts [:db/id] :transaction/_accounts [:db/id]}] e)
|
|
namespaces (->> changes
|
|
(map :a)
|
|
(map namespace)
|
|
set)]
|
|
(cond (namespaces "invoice" ) [[:invoice e]]
|
|
(namespaces "invoice-expense-account" ) [[:invoice (:db/id (:invoice/_expense-accounts entity))]]
|
|
(namespaces "transaction-account" ) [[:transaction (:db/id (:transaction/_accounts entity))]]
|
|
(namespaces "transaction" ) [[:transaction e]]
|
|
:else nil)))
|
|
|
|
|
|
|
|
(defn infer-entity [_ [_ changes]]
|
|
(let [namespaces (->> changes
|
|
(map :a)
|
|
(map namespace)
|
|
set)]
|
|
(cond (namespaces "invoice" ) :invoice
|
|
(namespaces "invoice-expense-account" ) :invoice-expense-account
|
|
(namespaces "transaction-account" ) :transaction-account
|
|
:else nil)))
|
|
|
|
(defmulti entity-change->ledger (fn [_ [type]]
|
|
type))
|
|
|
|
(defmethod entity-change->ledger :invoice
|
|
[db [type id]]
|
|
(let [entity (d/pull db ['* {:invoice/vendor '[*]
|
|
:invoice/payment '[*]
|
|
:invoice/status '[:db/ident]
|
|
:invoice/import-status '[:db/ident]}] id)
|
|
credit-invoice? (< (:invoice/total entity) 0.0)]
|
|
(when-not (or
|
|
(not (:invoice/total entity))
|
|
(= true (:invoice/exclude-from-ledger entity))
|
|
(= :import-status/pending (:db/ident (:invoice/import-status entity)))
|
|
(= :invoice-status/voided (:db/ident (:invoice/status entity)))
|
|
(dollars-0? (:invoice/total entity)))
|
|
(remove-nils
|
|
{:journal-entry/source "invoice"
|
|
:journal-entry/client (:db/id (:invoice/client entity))
|
|
:journal-entry/date (:invoice/date entity)
|
|
:journal-entry/original-entity (:db/id entity)
|
|
:journal-entry/vendor (:db/id (:invoice/vendor entity))
|
|
:journal-entry/amount (Math/abs (:invoice/total entity))
|
|
|
|
:journal-entry/line-items (into [(cond-> {:journal-entry-line/account (:db/id (a/get-account-by-numeric-code-and-sets 21000 ["default"]))
|
|
:journal-entry-line/location "A"
|
|
}
|
|
credit-invoice? (assoc :journal-entry-line/debit (Math/abs (:invoice/total entity)))
|
|
(not credit-invoice?) (assoc :journal-entry-line/credit (Math/abs (:invoice/total entity))))]
|
|
(map (fn [ea]
|
|
(cond->
|
|
{:journal-entry-line/account (:db/id (:invoice-expense-account/account ea))
|
|
:journal-entry-line/location (or (:invoice-expense-account/location ea) "HQ")
|
|
}
|
|
credit-invoice? (assoc :journal-entry-line/credit (Math/abs (:invoice-expense-account/amount ea)))
|
|
(not credit-invoice?) (assoc :journal-entry-line/debit (Math/abs (:invoice-expense-account/amount ea)))))
|
|
(:invoice/expense-accounts entity)))
|
|
:journal-entry/cleared (and (< (:invoice/outstanding-balance entity) 0.01)
|
|
(every? #(= :payment-status/cleared (:payment/status %)) (:invoice/payments entity))
|
|
)}))))
|
|
|
|
(defmethod entity-change->ledger :transaction
|
|
[db [type id]]
|
|
(let [entity (d/pull db ['* {:transaction/vendor '[*]
|
|
:transaction/client '[*]
|
|
:transaction/approval-status '[*]
|
|
:transaction/bank-account '[* {:bank-account/type [:db/ident]}]
|
|
:transaction/accounts '[*
|
|
{:transaction-account/account [*]}] }] id)
|
|
bank-account-type (-> entity :transaction/bank-account :bank-account/type :db/ident)
|
|
decreasing? (< (:transaction/amount entity) 0.0)
|
|
credit-from-bank? decreasing?
|
|
debit-from-bank? (not decreasing?)]
|
|
(when-not (or (= :transaction-approval-status/excluded (:db/ident (:transaction/approval-status entity)))
|
|
(= :transaction-approval-status/suppressed (:db/ident (:transaction/approval-status entity)))
|
|
(dollars-0? (:transaction/amount entity)))
|
|
(remove-nils
|
|
{:journal-entry/source "transaction"
|
|
:journal-entry/client (:db/id (:transaction/client entity))
|
|
:journal-entry/date (:transaction/date entity)
|
|
:journal-entry/original-entity (:db/id entity)
|
|
:journal-entry/alternate-description (:transaction/description-original entity)
|
|
:journal-entry/vendor (:db/id (:transaction/vendor entity))
|
|
:journal-entry/amount (Math/abs (:transaction/amount entity))
|
|
:journal-entry/cleared-against (:transaction/cleared-against entity)
|
|
|
|
:journal-entry/line-items (into [(remove-nils {:journal-entry-line/account (:db/id (:transaction/bank-account entity))
|
|
:journal-entry-line/location "A"
|
|
:journal-entry-line/credit (when credit-from-bank?
|
|
(Math/abs (:transaction/amount entity)))
|
|
:journal-entry-line/debit (when debit-from-bank?
|
|
(Math/abs (:transaction/amount entity)))})
|
|
]
|
|
(map
|
|
(fn [a]
|
|
(remove-nils{:journal-entry-line/account (:db/id (:transaction-account/account a))
|
|
:journal-entry-line/location (:transaction-account/location a)
|
|
:journal-entry-line/debit (when credit-from-bank?
|
|
(Math/abs (:transaction-account/amount a)))
|
|
:journal-entry-line/credit (when debit-from-bank?
|
|
(Math/abs (:transaction-account/amount a)))}))
|
|
(if (seq (:transaction/accounts entity))
|
|
(:transaction/accounts entity)
|
|
[{:transaction-account/amount (:transaction/amount entity)}])))
|
|
|
|
:journal-entry/cleared true}))))
|
|
|
|
(defmethod entity-change->ledger :invoice-expense-account
|
|
[db [entity changes]]
|
|
nil
|
|
)
|
|
|
|
(defmethod entity-change->ledger nil
|
|
[db [entity changes]]
|
|
nil)
|
|
|
|
(defn ledger-entries->transaction [entries]
|
|
(into [[:replace-general-ledger (:journal-entry/original-entity (first entries))]]
|
|
entries))
|
|
|
|
|
|
|
|
(mount/defstate tx-report-queue
|
|
:start (d/tx-report-queue conn)
|
|
:stop (d/remove-tx-report-queue conn))
|
|
|
|
|
|
(defn process-one []
|
|
(lc/with-context {:source "process-txes"}
|
|
(try
|
|
(let [transaction (.take tx-report-queue)
|
|
_ (log/info "Converting tranasction to ledger")
|
|
db (:db-after transaction)
|
|
affected-entities (->> (:tx-data transaction)
|
|
(map (fn [^datomic.db.Datum x]
|
|
{:e (:e x)
|
|
:a (d/ident db (:a x))
|
|
:v (:v x)
|
|
:added (:added x)}))
|
|
(group-by :e)
|
|
(mapcat #(datums->impacted-entity db %))
|
|
(set))
|
|
_ (info-event (str "Found " (count affected-entities) " affected entities")
|
|
{:affected-entities (count affected-entities)})
|
|
d-txs (->> affected-entities
|
|
(map #(entity-change->ledger db %))
|
|
(filter seq))
|
|
retractions (map (fn [[_ e]] [:db/retractEntity [:journal-entry/original-entity e]]) affected-entities)]
|
|
|
|
(when (seq retractions)
|
|
@(d/transact conn retractions))
|
|
|
|
(doseq [d-tx d-txs]
|
|
@(d/transact conn [d-tx]))
|
|
(log/info "Succesfully process transaction"))
|
|
(catch Exception e
|
|
(log/error e)))))
|
|
|
|
(mount/defstate process-txes-worker
|
|
:start (scheduler/run-fun process-one 1)
|
|
:stop (-> process-txes-worker :running? (reset! false)))
|
|
|
|
(defn reconcile-ledger []
|
|
(lc/with-context {:source "reconcile-ledger"}
|
|
(try
|
|
(log/info "Attempting to reconcile the ledger")
|
|
(let [txes-missing-ledger-entries (->> (d/query {:query {:find ['?t ]
|
|
:in ['$]
|
|
:where ['[?t :transaction/date]
|
|
'[?t :transaction/amount ?amt]
|
|
'[(not= 0.0 ?amt)]
|
|
'(not [?t :transaction/approval-status :transaction-approval-status/excluded])
|
|
'(not [?t :transaction/approval-status :transaction-approval-status/suppressed])
|
|
'(not-join [?t] [?e :journal-entry/original-entity ?t])]}
|
|
:args [(d/db conn)]})
|
|
(map first)
|
|
(mapv #(entity-change->ledger (d/db conn) [:transaction %])))
|
|
|
|
|
|
invoices-missing-ledger-entries (->> (d/query {:query {:find ['?t ]
|
|
:in ['$]
|
|
:where ['[?t :invoice/date]
|
|
'[?t :invoice/total ?amt]
|
|
'[(not= 0.0 ?amt)]
|
|
'(not [?t :invoice/status :invoice-status/voided])
|
|
'(not [?t :invoice/import-status :import-status/pending])
|
|
'(not [?t :invoice/exclude-from-ledger true])
|
|
'(not-join [?t] [?e :journal-entry/original-entity ?t])]}
|
|
:args [(d/db conn)]})
|
|
(map first)
|
|
(mapv #(entity-change->ledger (d/db conn) [:invoice %])))
|
|
repairs (vec (concat txes-missing-ledger-entries invoices-missing-ledger-entries))]
|
|
|
|
|
|
(when (seq repairs)
|
|
(log/info (take 3 repairs))
|
|
(log/warn "repairing " (count txes-missing-ledger-entries) " missing transactions, " (count invoices-missing-ledger-entries) " missing invoices that were missing ledger entries")
|
|
|
|
|
|
@(d/transact conn repairs))
|
|
(log/info "Finished reconciling ledger"))
|
|
(catch Exception e
|
|
(log/error e)))))
|
|
|
|
(mount/defstate reconciliation-frequency :start (* 1000 60 60))
|
|
|
|
(mount/defstate ledger-reconciliation-worker
|
|
:start (scheduler/every reconciliation-frequency reconcile-ledger)
|
|
:stop (scheduler/stop ledger-reconciliation-worker))
|
|
|
|
|
|
(defn touch-transaction [e]
|
|
@(d/transact conn [[:db/retractEntity [:journal-entry/original-entity e]]])
|
|
@(d/transact conn [{:db/id "datomic.tx"
|
|
:db/doc "touching transaction to update ledger"}
|
|
(entity-change->ledger (d/db conn)
|
|
[:transaction e])]))
|
|
|
|
(defn touch-invoice [e]
|
|
@(d/transact conn [[:db/retractEntity [:journal-entry/original-entity e]]])
|
|
@(d/transact conn [{:db/id "datomic.tx"
|
|
:db/doc "touching invoice to update ledger"}
|
|
(entity-change->ledger (d/db conn)
|
|
[:invoice e])]))
|
|
(defn mismatched-transactions []
|
|
(let [jel-accounts (reduce
|
|
(fn [acc [e lia]]
|
|
(update acc e (fnil conj #{} ) lia))
|
|
{}
|
|
(d/query {:query {:find ['?e '?lia]
|
|
:in ['$]
|
|
:where ['[?je :journal-entry/line-items ?li]
|
|
'[?je :journal-entry/original-entity ?e]
|
|
'[?li :journal-entry-line/account ?lia]
|
|
'[?lia :account/name]]}
|
|
:args [(d/db auto-ap.datomic/conn)]}))
|
|
transaction-accounts (reduce
|
|
(fn [acc [e lia]]
|
|
(update acc e (fnil conj #{} ) lia))
|
|
{}
|
|
(d/query {:query {:find ['?e '?lia]
|
|
:in ['$]
|
|
:where ['[?e :transaction/accounts ?li]
|
|
'(not [?e :transaction/approval-status :transaction-approval-status/excluded])
|
|
'(not [?e :transaction/approval-status :transaction-approval-status/suppressed])
|
|
'[?li :transaction-account/account ?lia]
|
|
|
|
'[?lia :account/name]
|
|
'[?e :transaction/amount ?amt]
|
|
'[(not= ?amt 0.0)]]}
|
|
:args [(d/db auto-ap.datomic/conn)]}))
|
|
]
|
|
(filter
|
|
(fn [[e accounts]] (not= accounts (get jel-accounts e)))
|
|
transaction-accounts)))
|
|
|
|
(defn unbalanced-transactions []
|
|
(->> (d/query {:query {:find ['?je '?a '(sum ?debit) '(sum ?credit)]
|
|
:with ['?jel]
|
|
:in '[$]
|
|
:where ['[?je :journal-entry/amount ?a]
|
|
'[?je :journal-entry/line-items ?jel]
|
|
'[(get-else $ ?jel :journal-entry-line/debit 0.0) ?debit]
|
|
'[(get-else $ ?jel :journal-entry-line/credit 0.0) ?credit]]
|
|
}
|
|
:args [(d/db auto-ap.datomic/conn)]})
|
|
(filter (fn [[_ a d c]]
|
|
(or (not (dollars= a d))
|
|
(not (dollars= a c)))))
|
|
(map first)
|
|
(map (fn [je]
|
|
(:journal-entry/original-entity (d/entity (d/db auto-ap.datomic/conn)
|
|
je))))
|
|
(filter (fn [transaction?]
|
|
(:transaction/amount transaction?)))
|
|
(map :db/id)))
|
|
|
|
(defn unbalanced-invoices []
|
|
(->> (d/query {:query {:find ['?je '?a '(sum ?debit) '(sum ?credit)]
|
|
:with ['?jel]
|
|
:in '[$]
|
|
:where ['[?je :journal-entry/amount ?a]
|
|
'[?je :journal-entry/line-items ?jel]
|
|
'[(get-else $ ?jel :journal-entry-line/debit 0.0) ?debit]
|
|
'[(get-else $ ?jel :journal-entry-line/credit 0.0) ?credit]]
|
|
}
|
|
:args [(d/db auto-ap.datomic/conn)]})
|
|
(filter (fn [[_ a d c]]
|
|
(or (not (dollars= a d))
|
|
(not (dollars= a c)))))
|
|
(map first)
|
|
(map (fn [je]
|
|
(:journal-entry/original-entity (d/entity (d/db auto-ap.datomic/conn)
|
|
je))))
|
|
(filter (fn [invoice?]
|
|
(:invoice/total invoice?)))
|
|
(map :db/id)))
|
|
|
|
(defn mismatched-invoices []
|
|
(let [jel-accounts (reduce
|
|
(fn [acc [e lia]]
|
|
(update acc e (fnil conj #{} ) lia))
|
|
{}
|
|
(d/query {:query {:find ['?e '?lia]
|
|
:in ['$]
|
|
:where ['[?je :journal-entry/line-items ?li]
|
|
'[?je :journal-entry/original-entity ?e]
|
|
'[?li :journal-entry-line/account ?lia]
|
|
'(not [?lia :account/numeric-code 21000])
|
|
'[?lia :account/name]]}
|
|
:args [(d/db auto-ap.datomic/conn)]}))
|
|
invoice-accounts (reduce
|
|
(fn [acc [e lia]]
|
|
(update acc e (fnil conj #{} ) lia))
|
|
{}
|
|
(d/query {:query {:find ['?e '?lia]
|
|
:in ['$]
|
|
:where ['[?e :invoice/expense-accounts ?li]
|
|
'(not [?e :invoice/total 0.0])
|
|
'[?li :invoice-expense-account/account ?lia]
|
|
'[?lia :account/name]
|
|
'(not [?lia :account/numeric-code 21000])
|
|
'(not [?e :invoice/status :invoice-status/voided])
|
|
'(not [?e :invoice/exclude-from-ledger true])
|
|
'[?e :invoice/import-status :import-status/imported]]}
|
|
:args [(d/db auto-ap.datomic/conn)]}))
|
|
]
|
|
(filter
|
|
(fn [[e accounts]]
|
|
(not= accounts (get jel-accounts e)))
|
|
invoice-accounts)))
|
|
|
|
(defn touch-broken-ledger []
|
|
(lc/with-context {:source "touch-broken-ledger"}
|
|
(try
|
|
(log/info "Attempting to fix transactions that are in the ledger but are wrong")
|
|
(let [mismatched-ts (mismatched-transactions)]
|
|
(if (seq mismatched-ts)
|
|
(do
|
|
(log/warn (count mismatched-ts) " transactions exist but don't match ledger ")
|
|
(doseq [[m] mismatched-ts]
|
|
(touch-transaction m)))))
|
|
(log/info "Attempting to fix transactions that are in the ledger but debits/credits don't add up")
|
|
(let [unbalanced-ts (unbalanced-transactions)]
|
|
(if (seq unbalanced-ts)
|
|
(do
|
|
(log/warn (count unbalanced-ts) " transactions exist but don't have matching debits/credits ")
|
|
(doseq [m unbalanced-ts]
|
|
(touch-transaction m)))))
|
|
(log/info "Finished fixing transactions that are in the ledger but are wrong")
|
|
(let [mismatched-is (mismatched-invoices)]
|
|
(if (seq mismatched-is)
|
|
(do
|
|
(log/warn (count mismatched-is) " invoice exist but don't match ledger ")
|
|
(doseq [[m] mismatched-is]
|
|
(touch-invoice m)))))
|
|
(log/info "Attempting to fix transactions that are in the ledger but debits/credits don't add up")
|
|
(let [unbalanced-invoices (unbalanced-invoices)]
|
|
(if (seq unbalanced-invoices)
|
|
(do
|
|
(log/warn (count unbalanced-invoices) " invoices exist but don't have matching debits/credits ")
|
|
(doseq [m unbalanced-invoices]
|
|
(touch-invoice m)))))
|
|
|
|
(log/info "Finished fixing invoices that are in the ledger but are wrong")
|
|
(catch Exception e
|
|
(log/error e)))))
|
|
|
|
(mount/defstate touch-broken-ledger-worker
|
|
:start (scheduler/every reconciliation-frequency touch-broken-ledger)
|
|
:stop (scheduler/stop touch-broken-ledger-worker))
|