diff --git a/project.clj b/project.clj index 90f91a06..7d337f84 100644 --- a/project.clj +++ b/project.clj @@ -13,6 +13,9 @@ [kibu/pushy "0.3.8"] [bidi "2.1.2"] [ring/ring-defaults "0.2.1"] + [mount "0.1.16"] + [tolitius/yang "0.1.10"] + [ring "1.6.3" :exclusions [commons-codec commons-io clj-time diff --git a/src/clj/auto_ap/background/invoices.clj b/src/clj/auto_ap/background/invoices.clj index 46e4f383..326b5e46 100644 --- a/src/clj/auto_ap/background/invoices.clj +++ b/src/clj/auto_ap/background/invoices.clj @@ -1,29 +1,35 @@ (ns auto-ap.background.invoices (:require [auto-ap.datomic.invoices :as d-invoices] - [auto-ap.datomic :refer [uri]] + [auto-ap.datomic :refer [uri conn]] [datomic.api :as d] [auto-ap.time :as time] - [clj-time.coerce :as coerce])) - -(def break (atom false)) + [clj-time.coerce :as coerce] + [mount.core :as mount] + [yang.scheduler :as scheduler])) (defn close-auto-invoices [] - (while (and (not @break) - (not (Thread/interrupted))) - (println "Clearing automatic invoices") - (try - (->> (d/query {:query {:find ['?e] - :in ['$ '?today] - :where ['[?e :invoice/automatically-paid-when-due true] - '[?e :invoice/status :invoice-status/unpaid] - '[?e :invoice/due ?d] - '[(<= ?d ?today)]]} - :args [(d/db (d/connect uri)) (coerce/to-date (time/local-now))]}) - (mapv (fn [[i]] {:db/id i - :invoice/outstanding-balance 0.0 - :invoice/status :invoice-status/paid})) - (d/transact (d/connect uri)) - deref) - (Thread/sleep (* 1000 60 60)) - (catch Exception e - (println (.toString e)))))) + (try + + (let [invoices-to-close (d/query {:query {:find ['?e] + :in ['$ '?today] + :where ['[?e :invoice/automatically-paid-when-due true] + '[?e :invoice/status :invoice-status/unpaid] + '[?e :invoice/due ?d] + '[(<= ?d ?today)]]} + :args [(d/db conn) (coerce/to-date (time/local-now))]})] + (println "Closing " (count invoices-to-close) "automatic invoices") + (some->> invoices-to-close + seq + + (mapv (fn [[i]] {:db/id i + :invoice/outstanding-balance 0.0 + :invoice/status :invoice-status/paid})) + (d/transact conn) + deref) + (println "Closed " (count invoices-to-close) "automatic invoices")) + (catch Exception e + (println (.toString e))))) + +(mount/defstate close-auto-invoices-worker + :start (scheduler/every 60000 close-auto-invoices) + :stop (scheduler/stop close-auto-invoices-worker)) diff --git a/src/clj/auto_ap/datomic.clj b/src/clj/auto_ap/datomic.clj index c1b6b321..cafbc571 100644 --- a/src/clj/auto_ap/datomic.clj +++ b/src/clj/auto_ap/datomic.clj @@ -10,10 +10,15 @@ [auto-ap.db.transactions :as transactions] [clojure.string :as str] [clj-time.core :as time] - [clj-time.coerce :as coerce])) + [clj-time.coerce :as coerce] + [mount.core :as mount])) (def uri "datomic:sql://invoices?jdbc:postgresql://database:5432/datomic?user=datomic&password=datomic") +(mount/defstate conn + :start (d/connect uri) + :stop (d/release conn)) + #_(def uri "datomic:mem://datomic-transactor:4334/invoice") (defn create-database [] diff --git a/src/clj/auto_ap/ledger.clj b/src/clj/auto_ap/ledger.clj index 49b4595f..129a8ed9 100644 --- a/src/clj/auto_ap/ledger.clj +++ b/src/clj/auto_ap/ledger.clj @@ -1,7 +1,10 @@ (ns auto-ap.ledger (:require [datomic.api :as d] + [yang.scheduler :as scheduler] + [mount.core :as mount] [auto-ap.datomic.accounts :as a] - [auto-ap.datomic :refer [uri remove-nils]])) + [auto-ap.datomic :refer [uri remove-nils]] + [clojure.spec.alpha :as s])) (defn datums->impacted-entity [db [e changes]] @@ -112,16 +115,21 @@ [db [entity changes]] nil) -#_(defn entity-change->ledger [[entity-id changes]] - [entity-id (infer-entity changes)]) - (defn ledger-entries->transaction [entries] (into [[:replace-general-ledger (:journal-entry/original-entity (first entries))]] entries)) -(defn process-one [report-queue] - - (let [transaction (.take report-queue) +(mount/defstate conn + :start (d/connect uri) + :stop (d/release conn)) + +(mount/defstate tx-report-queue + :start (d/tx-report-queue conn) + :stop (d/remove-tx-report-queue conn)) + + +(defn process-one [] + (let [transaction (.take tx-report-queue) _ (println "processing transaction") db (:db-after transaction) affected-entities (->> (:tx-data transaction) @@ -133,57 +141,57 @@ (group-by :e) (mapcat #(datums->impacted-entity db %)) (set)) - _ (println "affected" (count affected-entities)) + _ (println "processing transaction affected" (count affected-entities)) d-txs (->> affected-entities (map #(entity-change->ledger db %)) (filter seq)) retractions (map (fn [[_ e]] [:db/retractEntity [:journal-entry/original-entity e]]) affected-entities)] (when (seq retractions) - @(d/transact (d/connect uri) retractions)) - - + @(d/transact conn retractions)) + (doseq [d-tx d-txs] - #_(println "updating general-ledger " d-tx) - @(d/transact (d/connect uri) [d-tx])))) + @(d/transact conn [d-tx])))) -(def break (atom false)) +(mount/defstate process-txes-worker + :start (scheduler/run-fun process-one 1) + :stop (-> process-txes-worker :running? (reset! false))) -(defn process-all [] - (println "Starting worker") - (while (and (not @break) - (not (Thread/interrupted))) - (try - (process-one (d/tx-report-queue (d/connect uri) )) - (catch Exception e - (println (.toString e)))))) +(defn reconcile-ledger [] + (try + (println "Attempting to reconcile the ledger") + (let [txes-missing-ledger-entries (->> (d/query {:query {:find ['?t ] + :in ['$] + :where ['[?t :transaction/date] + '(not [?t :transaction/approval-status :transaction-approval-status/excluded]) + '(not-join [?t] [?e :journal-entry/original-entity ?t])]} + :args [(d/db conn)]}) + (map first) + (mapv #(entity-change->ledger (d/db conn) [:transaction %]))) + invoices-missing-ledger-entries (->> (d/query {:query {:find ['?t ] + :in ['$] + :where ['[?t :invoice/date] + '(not [?t :invoice/status :invoice-status/voided]) + '(not [?t :invoice/import-status :import-status/pending]) + '(not [?t :invoice/exclude-from-ledger true]) + '(not-join [?t] [?e :journal-entry/original-entity ?t])]} + :args [(d/db conn)]}) + (map first) + (mapv #(entity-change->ledger (d/db conn) [:invoice %]))) + repairs (vec (concat txes-missing-ledger-entries invoices-missing-ledger-entries))] -#_(process-one (d/tx-report-queue (d/connect uri) )) + + (when (seq repairs) + (println "repairing " (count txes-missing-ledger-entries) " missing transactions, " (count invoices-missing-ledger-entries) " missing invoices that were missing ledger entries") -#_(process-all) + + @(d/transact conn repairs))) + (catch Exception e + (println e)))) -#_(reset! break true) +(mount/defstate reconciliation-frequency :start 60000) - - -(defn keep-up-to-date [] - (while (and (not @break) - (not (Thread/interrupted))) - (try - @(d/transact - (d/connect uri) - (mapv - #(entity-change->ledger (d/db (d/connect uri)) [:transaction %]) - (concat - (->> - (d/query {:query {:find ['?t ] - :in ['$] - :where ['[?t :transaction/date] - '(not [?t :transaction/approval-status :transaction-approval-status/excluded]) - '(not-join [?t] [?e :journal-entry/original-entity ?t])]} - :args [(d/db (d/connect uri))]}) - (map first))))) - (Thread/sleep 60000) - (catch Exception e - (println (.toString e)))))) +(mount/defstate ledger-reconciliation-worker + :start (scheduler/every reconciliation-frequency reconcile-ledger) + :stop (scheduler/stop ledger-reconciliation-worker)) diff --git a/src/clj/auto_ap/server.clj b/src/clj/auto_ap/server.clj index 191bf404..6a34e384 100644 --- a/src/clj/auto_ap/server.clj +++ b/src/clj/auto_ap/server.clj @@ -1,24 +1,20 @@ (ns auto-ap.server (:require #_[auto-ap.background.mail :refer [always-process-sqs]] [auto-ap.handler :refer [app]] - [auto-ap.ledger :refer [process-all keep-up-to-date]] - [auto-ap.yodlee.core :refer [load-in-memory-cache]] - [auto-ap.background.invoices :refer [close-auto-invoices]] + [auto-ap.ledger] + [auto-ap.yodlee.core] + [auto-ap.background.invoices] [nrepl.server :refer [start-server stop-server]] [config.core :refer [env]] - [ring.adapter.jetty :refer [run-jetty]]) + [ring.adapter.jetty :refer [run-jetty]] + [mount.core :as mount]) (:gen-class)) -#_(defn cider-nrepl-handler [] - (require 'cider.nrepl) - (ns-resolve 'cider.nrepl 'cider-nrepl-handler)) - (defn -main [& args] (start-server :port 9000 :bind "0.0.0.0" #_#_:handler (cider-nrepl-handler)) (let [port (Integer/parseInt (or (env :port) "3000"))] - (future (process-all)) - (future (load-in-memory-cache)) - (future (keep-up-to-date)) - (future (close-auto-invoices)) + + + (mount/start) #_(future (always-process-sqs)) (run-jetty app {:port port :join? false}))) diff --git a/src/clj/auto_ap/yodlee/core.clj b/src/clj/auto_ap/yodlee/core.clj index 7e313c0b..814009bb 100644 --- a/src/clj/auto_ap/yodlee/core.clj +++ b/src/clj/auto_ap/yodlee/core.clj @@ -3,7 +3,9 @@ [auto-ap.utils :refer [by]] [cemerick.url :as u] [clojure.data.json :as json] - [config.core :refer [env]])) + [config.core :refer [env]] + [mount.core :as mount] + [yang.scheduler :as scheduler])) (defn auth-header ([cob-session] (str "{cobSession=" cob-session "}")) @@ -253,20 +255,20 @@ (update-in provider-accounts [(:providerAccountId a) :accounts] conj a)) provider-accounts) vals))) -(defonce in-memory-cache (atom [])) +(mount/defstate in-memory-cache + :start (doto (atom (get-provider-accounts-with-accounts)) println)) -(defonce break? (atom false)) +(defn refresh-in-memory-cache [] + (try + (println "Refreshing Yodlee in memory cache") + (reset! in-memory-cache (get-provider-accounts-with-accounts)) + (catch Exception e + (println e)))) + +(mount/defstate in-memory-cache-worker + :start (scheduler/every (* 5 60 1000) refresh-in-memory-cache) + :stop (scheduler/stop in-memory-cache-worker)) -(defn load-in-memory-cache [] - (future - (loop [] - (try - (reset! in-memory-cache (get-provider-accounts-with-accounts)) - (catch Exception e - (println e))) - (Thread/sleep (* 30 1000 5)) - (when-not @break? - (recur))))) (defn refresh-provider-account [id] (swap! in-memory-cache