Greatly simplified background processes

This commit is contained in:
Bryce Covert
2020-07-31 23:15:43 -07:00
parent be9c789003
commit 49f98522c0
6 changed files with 116 additions and 96 deletions

View File

@@ -1,7 +1,10 @@
(ns auto-ap.ledger
(:require [datomic.api :as d]
[yang.scheduler :as scheduler]
[mount.core :as mount]
[auto-ap.datomic.accounts :as a]
[auto-ap.datomic :refer [uri remove-nils]]))
[auto-ap.datomic :refer [uri remove-nils]]
[clojure.spec.alpha :as s]))
(defn datums->impacted-entity [db [e changes]]
@@ -112,16 +115,21 @@
[db [entity changes]]
nil)
#_(defn entity-change->ledger [[entity-id changes]]
[entity-id (infer-entity changes)])
(defn ledger-entries->transaction [entries]
(into [[:replace-general-ledger (:journal-entry/original-entity (first entries))]]
entries))
(defn process-one [report-queue]
(let [transaction (.take report-queue)
(mount/defstate conn
:start (d/connect uri)
:stop (d/release conn))
(mount/defstate tx-report-queue
:start (d/tx-report-queue conn)
:stop (d/remove-tx-report-queue conn))
(defn process-one []
(let [transaction (.take tx-report-queue)
_ (println "processing transaction")
db (:db-after transaction)
affected-entities (->> (:tx-data transaction)
@@ -133,57 +141,57 @@
(group-by :e)
(mapcat #(datums->impacted-entity db %))
(set))
_ (println "affected" (count affected-entities))
_ (println "processing transaction affected" (count affected-entities))
d-txs (->> affected-entities
(map #(entity-change->ledger db %))
(filter seq))
retractions (map (fn [[_ e]] [:db/retractEntity [:journal-entry/original-entity e]]) affected-entities)]
(when (seq retractions)
@(d/transact (d/connect uri) retractions))
@(d/transact conn retractions))
(doseq [d-tx d-txs]
#_(println "updating general-ledger " d-tx)
@(d/transact (d/connect uri) [d-tx]))))
@(d/transact conn [d-tx]))))
(def break (atom false))
(mount/defstate process-txes-worker
:start (scheduler/run-fun process-one 1)
:stop (-> process-txes-worker :running? (reset! false)))
(defn process-all []
(println "Starting worker")
(while (and (not @break)
(not (Thread/interrupted)))
(try
(process-one (d/tx-report-queue (d/connect uri) ))
(catch Exception e
(println (.toString e))))))
(defn reconcile-ledger []
(try
(println "Attempting to reconcile the ledger")
(let [txes-missing-ledger-entries (->> (d/query {:query {:find ['?t ]
:in ['$]
:where ['[?t :transaction/date]
'(not [?t :transaction/approval-status :transaction-approval-status/excluded])
'(not-join [?t] [?e :journal-entry/original-entity ?t])]}
:args [(d/db conn)]})
(map first)
(mapv #(entity-change->ledger (d/db conn) [:transaction %])))
invoices-missing-ledger-entries (->> (d/query {:query {:find ['?t ]
:in ['$]
:where ['[?t :invoice/date]
'(not [?t :invoice/status :invoice-status/voided])
'(not [?t :invoice/import-status :import-status/pending])
'(not [?t :invoice/exclude-from-ledger true])
'(not-join [?t] [?e :journal-entry/original-entity ?t])]}
:args [(d/db conn)]})
(map first)
(mapv #(entity-change->ledger (d/db conn) [:invoice %])))
repairs (vec (concat txes-missing-ledger-entries invoices-missing-ledger-entries))]
#_(process-one (d/tx-report-queue (d/connect uri) ))
(when (seq repairs)
(println "repairing " (count txes-missing-ledger-entries) " missing transactions, " (count invoices-missing-ledger-entries) " missing invoices that were missing ledger entries")
#_(process-all)
@(d/transact conn repairs)))
(catch Exception e
(println e))))
#_(reset! break true)
(mount/defstate reconciliation-frequency :start 60000)
(defn keep-up-to-date []
(while (and (not @break)
(not (Thread/interrupted)))
(try
@(d/transact
(d/connect uri)
(mapv
#(entity-change->ledger (d/db (d/connect uri)) [:transaction %])
(concat
(->>
(d/query {:query {:find ['?t ]
:in ['$]
:where ['[?t :transaction/date]
'(not [?t :transaction/approval-status :transaction-approval-status/excluded])
'(not-join [?t] [?e :journal-entry/original-entity ?t])]}
:args [(d/db (d/connect uri))]})
(map first)))))
(Thread/sleep 60000)
(catch Exception e
(println (.toString e))))))
(mount/defstate ledger-reconciliation-worker
:start (scheduler/every reconciliation-frequency reconcile-ledger)
:stop (scheduler/stop ledger-reconciliation-worker))