revamped logging!
This commit is contained in:
@@ -4,7 +4,10 @@
|
||||
[mount.core :as mount]
|
||||
[auto-ap.datomic.accounts :as a]
|
||||
[auto-ap.datomic :refer [uri remove-nils]]
|
||||
[clojure.spec.alpha :as s]))
|
||||
[clojure.spec.alpha :as s]
|
||||
[clojure.tools.logging :as log]
|
||||
[auto-ap.logging :refer [info-event]]
|
||||
[unilog.context :as lc]))
|
||||
|
||||
|
||||
(defn datums->impacted-entity [db [e changes]]
|
||||
@@ -129,66 +132,75 @@
|
||||
|
||||
|
||||
(defn process-one []
|
||||
(let [transaction (.take tx-report-queue)
|
||||
_ (println "processing transaction")
|
||||
db (:db-after transaction)
|
||||
affected-entities (->> (:tx-data transaction)
|
||||
(map (fn [^datomic.db.Datum x]
|
||||
{:e (:e x)
|
||||
:a (d/ident db (:a x))
|
||||
:v (:v x)
|
||||
:added (:added x)}))
|
||||
(group-by :e)
|
||||
(mapcat #(datums->impacted-entity db %))
|
||||
(set))
|
||||
_ (println "processing transaction affected" (count affected-entities))
|
||||
d-txs (->> affected-entities
|
||||
(map #(entity-change->ledger db %))
|
||||
(filter seq))
|
||||
retractions (map (fn [[_ e]] [:db/retractEntity [:journal-entry/original-entity e]]) affected-entities)]
|
||||
(lc/with-context {:source "process-txes"}
|
||||
(try
|
||||
(let [transaction (.take tx-report-queue)
|
||||
_ (log/info "Converting tranasction to ledger")
|
||||
db (:db-after transaction)
|
||||
affected-entities (->> (:tx-data transaction)
|
||||
(map (fn [^datomic.db.Datum x]
|
||||
{:e (:e x)
|
||||
:a (d/ident db (:a x))
|
||||
:v (:v x)
|
||||
:added (:added x)}))
|
||||
(group-by :e)
|
||||
(mapcat #(datums->impacted-entity db %))
|
||||
(set))
|
||||
_ (info-event (str "Found " (count affected-entities) " affected entities")
|
||||
{:affected-entities (count affected-entities)})
|
||||
d-txs (->> affected-entities
|
||||
(map #(entity-change->ledger db %))
|
||||
(filter seq))
|
||||
retractions (map (fn [[_ e]] [:db/retractEntity [:journal-entry/original-entity e]]) affected-entities)]
|
||||
|
||||
(when (seq retractions)
|
||||
@(d/transact conn retractions))
|
||||
(when (seq retractions)
|
||||
@(d/transact conn retractions))
|
||||
|
||||
(doseq [d-tx d-txs]
|
||||
@(d/transact conn [d-tx]))))
|
||||
(doseq [d-tx d-txs]
|
||||
@(d/transact conn [d-tx]))
|
||||
(log/info "Succesfully process transaction"))
|
||||
(catch Exception e
|
||||
(log/error e)))))
|
||||
|
||||
(mount/defstate process-txes-worker
|
||||
:start (scheduler/run-fun process-one 1)
|
||||
:stop (-> process-txes-worker :running? (reset! false)))
|
||||
|
||||
(defn reconcile-ledger []
|
||||
(try
|
||||
(println "Attempting to reconcile the ledger")
|
||||
(let [txes-missing-ledger-entries (->> (d/query {:query {:find ['?t ]
|
||||
:in ['$]
|
||||
:where ['[?t :transaction/date]
|
||||
'(not [?t :transaction/approval-status :transaction-approval-status/excluded])
|
||||
'(not-join [?t] [?e :journal-entry/original-entity ?t])]}
|
||||
:args [(d/db conn)]})
|
||||
(map first)
|
||||
(mapv #(entity-change->ledger (d/db conn) [:transaction %])))
|
||||
(lc/with-context {:source "reconcile-ledger"}
|
||||
(try
|
||||
(log/info "Attempting to reconcile the ledger")
|
||||
(let [txes-missing-ledger-entries (->> (d/query {:query {:find ['?t ]
|
||||
:in ['$]
|
||||
:where ['[?t :transaction/date]
|
||||
'(not [?t :transaction/approval-status :transaction-approval-status/excluded])
|
||||
'(not-join [?t] [?e :journal-entry/original-entity ?t])]}
|
||||
:args [(d/db conn)]})
|
||||
(map first)
|
||||
(mapv #(entity-change->ledger (d/db conn) [:transaction %])))
|
||||
|
||||
invoices-missing-ledger-entries (->> (d/query {:query {:find ['?t ]
|
||||
:in ['$]
|
||||
:where ['[?t :invoice/date]
|
||||
'(not [?t :invoice/status :invoice-status/voided])
|
||||
'(not [?t :invoice/import-status :import-status/pending])
|
||||
'(not [?t :invoice/exclude-from-ledger true])
|
||||
'(not-join [?t] [?e :journal-entry/original-entity ?t])]}
|
||||
:args [(d/db conn)]})
|
||||
(map first)
|
||||
(mapv #(entity-change->ledger (d/db conn) [:invoice %])))
|
||||
repairs (vec (concat txes-missing-ledger-entries invoices-missing-ledger-entries))]
|
||||
|
||||
|
||||
(when (seq repairs)
|
||||
(println "repairing " (count txes-missing-ledger-entries) " missing transactions, " (count invoices-missing-ledger-entries) " missing invoices that were missing ledger entries")
|
||||
invoices-missing-ledger-entries (->> (d/query {:query {:find ['?t ]
|
||||
:in ['$]
|
||||
:where ['[?t :invoice/date]
|
||||
'(not [?t :invoice/status :invoice-status/voided])
|
||||
'(not [?t :invoice/import-status :import-status/pending])
|
||||
'(not [?t :invoice/exclude-from-ledger true])
|
||||
'(not-join [?t] [?e :journal-entry/original-entity ?t])]}
|
||||
:args [(d/db conn)]})
|
||||
(map first)
|
||||
(mapv #(entity-change->ledger (d/db conn) [:invoice %])))
|
||||
repairs (vec (concat txes-missing-ledger-entries invoices-missing-ledger-entries))]
|
||||
|
||||
|
||||
@(d/transact conn repairs)))
|
||||
(catch Exception e
|
||||
(println e))))
|
||||
(when (seq repairs)
|
||||
(log/warn "repairing " (count txes-missing-ledger-entries) " missing transactions, " (count invoices-missing-ledger-entries) " missing invoices that were missing ledger entries")
|
||||
|
||||
|
||||
@(d/transact conn repairs))
|
||||
(log/info "Finished reconciling ledger"))
|
||||
(catch Exception e
|
||||
(log/error e)))))
|
||||
|
||||
(mount/defstate reconciliation-frequency :start 60000)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user