From 1db8d7a52c80139a93f3a19594bee27b6c129bbb Mon Sep 17 00:00:00 2001 From: Bryce Covert Date: Wed, 22 Jun 2022 10:43:37 -0700 Subject: [PATCH] much more background process tracking --- src/clj/auto_ap/background/invoices.clj | 55 ++++----- src/clj/auto_ap/background/mail.clj | 129 ++++++++++---------- src/clj/auto_ap/background/requests.clj | 66 ++++------ src/clj/auto_ap/background/sysco.clj | 90 +++++++------- src/clj/auto_ap/background/vendor.clj | 67 +++++----- src/clj/auto_ap/graphql/clients.clj | 51 ++++---- src/clj/auto_ap/graphql/ledger.clj | 17 +-- src/clj/auto_ap/import/intuit.clj | 67 +++++----- src/clj/auto_ap/import/plaid.clj | 12 +- src/clj/auto_ap/import/yodlee.clj | 65 +++++----- src/clj/auto_ap/import/yodlee2.clj | 6 +- src/clj/auto_ap/ledger.clj | 156 +++++++++++------------- src/clj/auto_ap/plaid/import.clj | 59 --------- src/clj/auto_ap/square/core.clj | 3 +- src/cljc/auto_ap/utils.cljc | 22 +++- 15 files changed, 379 insertions(+), 486 deletions(-) delete mode 100644 src/clj/auto_ap/plaid/import.clj diff --git a/src/clj/auto_ap/background/invoices.clj b/src/clj/auto_ap/background/invoices.clj index dc7934b0..0aa1acc0 100644 --- a/src/clj/auto_ap/background/invoices.clj +++ b/src/clj/auto_ap/background/invoices.clj @@ -1,37 +1,32 @@ (ns auto-ap.background.invoices - (:require [auto-ap.datomic.invoices :as d-invoices] - [auto-ap.datomic :refer [uri conn]] - [datomic.api :as d] - [auto-ap.time :as time] - [clj-time.coerce :as coerce] - [mount.core :as mount] - [yang.scheduler :as scheduler] - [unilog.context :as lc] - [clojure.tools.logging :as log])) + (:require + [auto-ap.datomic :refer [conn]] + [auto-ap.time :as time] + [auto-ap.utils :refer [heartbeat]] + [clj-time.coerce :as coerce] + [clojure.tools.logging :as log] + [datomic.api :as d] + [mount.core :as mount] + [yang.scheduler :as scheduler])) (defn close-auto-invoices [] - (lc/with-context {:source "close-auto-invoices"} - (try - - (let [invoices-to-close (d/query {:query {:find ['?e] - :in ['$ '?today] - :where ['[?e :invoice/scheduled-payment ?d] - '[?e :invoice/status :invoice-status/unpaid] - '[(<= ?d ?today)]]} - :args [(d/db conn) (coerce/to-date (time/local-now))]})] - (log/info "Closing " (count invoices-to-close) "scheduled invoices") - (some->> invoices-to-close - seq + (let [invoices-to-close (d/query {:query {:find ['?e] + :in ['$ '?today] + :where ['[?e :invoice/scheduled-payment ?d] + '[?e :invoice/status :invoice-status/unpaid] + '[(<= ?d ?today)]]} + :args [(d/db conn) (coerce/to-date (time/local-now))]})] + (log/info "Closing " (count invoices-to-close) "scheduled invoices") + (some->> invoices-to-close + seq - (mapv (fn [[i]] {:db/id i - :invoice/outstanding-balance 0.0 - :invoice/status :invoice-status/paid})) - (d/transact conn) - deref) - (log/info "Closed " (count invoices-to-close) "scheduled invoices")) - (catch Exception e - (log/error e))))) + (mapv (fn [[i]] {:db/id i + :invoice/outstanding-balance 0.0 + :invoice/status :invoice-status/paid})) + (d/transact conn) + deref) + (log/info "Closed " (count invoices-to-close) "scheduled invoices"))) (mount/defstate close-auto-invoices-worker - :start (scheduler/every 60000 close-auto-invoices) + :start (scheduler/every 60000 (heartbeat close-auto-invoices "close-auto-invoices")) :stop (scheduler/stop close-auto-invoices-worker)) diff --git a/src/clj/auto_ap/background/mail.clj b/src/clj/auto_ap/background/mail.clj index 7764b7e5..fa7410b5 100644 --- a/src/clj/auto_ap/background/mail.clj +++ b/src/clj/auto_ap/background/mail.clj @@ -1,22 +1,24 @@ (ns auto-ap.background.mail - (:require [amazonica.aws.s3 :as s3] - [amazonica.aws.sqs :as sqs] - [amazonica.aws.simpleemail :as ses] - [auto-ap.parse :as parse] - [auto-ap.routes.invoices :as invoices] - [clojure-mail.message :as message] - [clojure.data.json :as json] - [clojure.java.io :as io] - [clojure.string :as str] - [config.core :refer [env]] - [clojure.tools.logging :as log] - [unilog.context :as lc] - [mount.core :as mount] - [yang.scheduler :as scheduler]) - (:import (java.util Properties UUID) - (javax.mail Session) - (javax.mail.internet MimeMessage))) - + (:require + [amazonica.aws.s3 :as s3] + [amazonica.aws.simpleemail :as ses] + [amazonica.aws.sqs :as sqs] + [auto-ap.parse :as parse] + [auto-ap.routes.invoices :as invoices] + [auto-ap.utils :refer [heartbeat]] + [clojure-mail.message :as message] + [clojure.data.json :as json] + [clojure.java.io :as io] + [clojure.string :as str] + [clojure.tools.logging :as log] + [config.core :refer [env]] + [mount.core :as mount] + [unilog.context :as lc] + [yang.scheduler :as scheduler]) + (:import + (java.util Properties UUID) + (javax.mail Session) + (javax.mail.internet MimeMessage))) (defn send-email-about-failed-message [mail-bucket mail-key] (let [target-key (str "failed-emails/" mail-key ".eml") @@ -33,54 +35,49 @@ (defn process-sqs [] - (lc/with-context {:source "import-uploaded-invoices"} - - (try - (log/info "Fetching messages from sqs...") - (doseq [message (:messages (sqs/receive-message {:queue-url (:invoice-import-queue-url env) - :wait-time-seconds 5 - :max-number-of-messages 10 - #_#_:attribute-names ["All"]}))] - (let [message-body (json/read-str (:body message) - :key-fn keyword)] - (doseq [r (:Records message-body)] - (log/info "Processing record " r) - (let [props (Session/getDefaultInstance (Properties.)) - message-stream (-> (s3/get-object {:key (-> r :s3 :object :key) - :bucket-name (-> r :s3 :bucket :name)}) - :input-stream) - mail (message/read-message (MimeMessage. props message-stream))] - (log/info "reading mail" (->> mail :body (filter :content-type) (map :body) )) - (doseq [pdf-stream (->> (-> mail :body) - (filter :content-type) - #_(filter #(re-find #"application/pdf" (:content-type %)) )) - :let [filename (str "/tmp/" (UUID/randomUUID) ".pdf")]] - (try - (let [ - _ (io/copy (:body pdf-stream) (io/file filename)) - extension (last (str/split (.getName (io/file filename)) #"\.")) - s3-location (str "invoice-files/" (str (UUID/randomUUID)) "." extension) - _ (s3/put-object :bucket-name (:data-bucket env) - :key s3-location - :input-stream (io/input-stream filename) - :metadata {:content-type "application/pdf"}) - imports (->> (parse/parse-file filename filename) - (map #(assoc % - :source-url (str "http://" (:data-bucket env) - ".s3-website-us-east-1.amazonaws.com/" - s3-location) - :import-status :import-status/approved)))] - (log/info "Found imports" imports) - (invoices/import-uploaded-invoice {:user/role "admin"} imports )) - (catch Exception e - (log/warn e) - (send-email-about-failed-message (-> r :s3 :bucket :name) (-> r :s3 :object :key))) - (finally - (io/delete-file filename))))))) - (sqs/delete-message (assoc message :queue-url (:invoice-import-queue-url env) ))) - (catch Exception e - (log/error e))))) + (log/info "Fetching messages from sqs...") + (doseq [message (:messages (sqs/receive-message {:queue-url (:invoice-import-queue-url env) + :wait-time-seconds 5 + :max-number-of-messages 10 + #_#_:attribute-names ["All"]}))] + (let [message-body (json/read-str (:body message) + :key-fn keyword)] + (doseq [r (:Records message-body)] + (log/info "Processing record " r) + (let [props (Session/getDefaultInstance (Properties.)) + message-stream (-> (s3/get-object {:key (-> r :s3 :object :key) + :bucket-name (-> r :s3 :bucket :name)}) + :input-stream) + mail (message/read-message (MimeMessage. props message-stream))] + (log/info "reading mail" (->> mail :body (filter :content-type) (map :body) )) + (doseq [pdf-stream (->> (-> mail :body) + (filter :content-type) + #_(filter #(re-find #"application/pdf" (:content-type %)) )) + :let [filename (str "/tmp/" (UUID/randomUUID) ".pdf")]] + (try + (let [ + _ (io/copy (:body pdf-stream) (io/file filename)) + extension (last (str/split (.getName (io/file filename)) #"\.")) + s3-location (str "invoice-files/" (str (UUID/randomUUID)) "." extension) + _ (s3/put-object :bucket-name (:data-bucket env) + :key s3-location + :input-stream (io/input-stream filename) + :metadata {:content-type "application/pdf"}) + imports (->> (parse/parse-file filename filename) + (map #(assoc % + :source-url (str "http://" (:data-bucket env) + ".s3-website-us-east-1.amazonaws.com/" + s3-location) + :import-status :import-status/approved)))] + (log/info "Found imports" imports) + (invoices/import-uploaded-invoice {:user/role "admin"} imports )) + (catch Exception e + (log/warn e) + (send-email-about-failed-message (-> r :s3 :bucket :name) (-> r :s3 :object :key))) + (finally + (io/delete-file filename))))))) + (sqs/delete-message (assoc message :queue-url (:invoice-import-queue-url env) )))) (mount/defstate import-invoices - :start (scheduler/every (* 60 5000) process-sqs) + :start (scheduler/every (* 60 5000) (heartbeat process-sqs "import-uploaded-invoices")) :stop (scheduler/stop import-invoices)) diff --git a/src/clj/auto_ap/background/requests.clj b/src/clj/auto_ap/background/requests.clj index 7c7ba6a3..3728dde6 100644 --- a/src/clj/auto_ap/background/requests.clj +++ b/src/clj/auto_ap/background/requests.clj @@ -1,62 +1,44 @@ (ns auto-ap.background.requests (:require [amazonica.aws.sqs :as sqs] - [config.core :refer [env]] - [mount.core :as mount] - [yang.scheduler :as scheduler] - [auto-ap.yodlee.core2 :as client2] - [clojure.tools.logging :as log] [auto-ap.import.intuit :as i] [auto-ap.import.plaid :as p] - [unilog.context :as lc] [auto-ap.import.yodlee :as y] [auto-ap.import.yodlee2 :as y2] - )) + [auto-ap.utils :refer [heartbeat]] + [clojure.tools.logging :as log] + [config.core :refer [env]] + [mount.core :as mount] + [yang.scheduler :as scheduler])) (def queue-url (:requests-queue-url env)) (defn process-1 [] - (lc/with-context {:source "Request poller"} - (log/info "Checking SQS...") - (let [[{:keys [message-id receipt-handle body]}] (:messages (sqs/receive-message {:queue-url queue-url - :wait-time-seconds 1 - :count 1}))] - - (when message-id - (sqs/delete-message {:queue-url queue-url - :receipt-handle receipt-handle} ) - (log/infof "processing message %s with body %s" message-id body ) - (cond - (= ":intuit" body) - (try - (i/import-intuit) - (catch Exception e - (log/error e))) + (let [[{:keys [message-id receipt-handle body]}] (:messages (sqs/receive-message {:queue-url queue-url + :wait-time-seconds 1 + :count 1}))] + + (when message-id + (sqs/delete-message {:queue-url queue-url + :receipt-handle receipt-handle} ) + (log/infof "processing message %s with body %s" message-id body ) + (cond + (= ":intuit" body) + (i/import-intuit) - (= ":yodlee" body) - (try - (y/import-yodlee) - (catch Exception e - (log/error e))) + (= ":yodlee" body) + (y/import-yodlee) - (= ":yodlee2" body) - (try - (client2/upsert-accounts) - (y2/import-yodlee2) - (catch Exception e - (log/error e))) + (= ":yodlee2" body) + (y2/import-yodlee2) - (= ":plaid" body) - (try - (p/import-plaid) - (catch Exception e - (log/error e)))) - )))) + (= ":plaid" body) + (p/import-plaid))))) (defn fake-message [] - (sqs/send-message {:queue-url (:requests-queue-url env) + (sqs/send-message {:queue-url (:requests-queue-url env) :message-body ":intuit"} )) (mount/defstate request-listener - :start (scheduler/every (* 1000 30) process-1) + :start (scheduler/every (* 1000 30) (heartbeat process-1 "request-poller")) :stop (scheduler/stop request-listener)) diff --git a/src/clj/auto_ap/background/sysco.clj b/src/clj/auto_ap/background/sysco.clj index 8d14c363..f9734c16 100644 --- a/src/clj/auto_ap/background/sysco.clj +++ b/src/clj/auto_ap/background/sysco.clj @@ -6,6 +6,7 @@ [auto-ap.datomic.invoices :refer [code-invoice]] [auto-ap.parse :as parse] [auto-ap.time :as t] + [auto-ap.utils :refer [heartbeat]] [clj-time.coerce :as coerce] [clojure.data.csv :as csv] [clojure.java.io :as io] @@ -15,8 +16,6 @@ [config.core :refer [env]] [datomic.api :as d] [mount.core :as mount] - [unilog.context :as lc] - #_{:clj-kondo/ignore [:unused-namespace]} [yang.scheduler :as scheduler]) (:import (java.util UUID))) @@ -111,54 +110,53 @@ (defn import-sysco [] - (lc/with-context {:source "sysco-importer"} - (let [sysco-vendor (get-sysco-vendor) - clients (d-clients/get-all) - keys (->> (s3/list-objects-v2 {:bucket-name bucket-name - :prefix "sysco/pending"}) - :object-summaries - (map :key))] - + (let [sysco-vendor (get-sysco-vendor) + clients (d-clients/get-all) + keys (->> (s3/list-objects-v2 {:bucket-name bucket-name + :prefix "sysco/pending"}) + :object-summaries + (map :key))] + - (statsd/event {:title "Sysco import started" - :text (format "Found %d sysco invoice to import: %s" (count keys) (pr-str keys)) - :priority :low} - nil) - (log/infof "Found %d sysco invoice to import: %s" (count keys) (pr-str keys)) - - (let [transaction (->> keys - (mapcat (fn [k] - (try - (let [invoice-key (str "invoice-files/" (UUID/randomUUID) ".csv") ; - invoice-url (str "http://" (:data-bucket env) ".s3-website-us-east-1.amazonaws.com/" invoice-key)] - (s3/copy-object {:source-bucket-name (:data-bucket env) + (statsd/event {:title "Sysco import started" + :text (format "Found %d sysco invoice to import: %s" (count keys) (pr-str keys)) + :priority :low} + nil) + (log/infof "Found %d sysco invoice to import: %s" (count keys) (pr-str keys)) + + (let [transaction (->> keys + (mapcat (fn [k] + (try + (let [invoice-key (str "invoice-files/" (UUID/randomUUID) ".csv") ; + invoice-url (str "http://" (:data-bucket env) ".s3-website-us-east-1.amazonaws.com/" invoice-key)] + (s3/copy-object {:source-bucket-name (:data-bucket env) + :destination-bucket-name (:data-bucket env) + :source-key k + :destination-key invoice-key}) + [[:propose-invoice + (-> k + read-sysco-csv + (extract-invoice-details clients sysco-vendor) + (assoc :invoice/source-url invoice-url))]]) + (catch Exception e + (log/error (str "Cannot load file " k) e) + (log/info + (s3/copy-object {:source-bucket-name (:data-bucket env) :destination-bucket-name (:data-bucket env) :source-key k - :destination-key invoice-key}) - [[:propose-invoice - (-> k - read-sysco-csv - (extract-invoice-details clients sysco-vendor) - (assoc :invoice/source-url invoice-url))]]) - (catch Exception e - (log/error (str "Cannot load file " k) e) - (log/info - (s3/copy-object {:source-bucket-name (:data-bucket env) - :destination-bucket-name (:data-bucket env) - :source-key k - :destination-key (doto (str "sysco/error/" - (.getName (io/file k))) - println)})) - []))))) - result @(d/transact conn transaction)] - (log/infof "Imported %d invoices" (/ (count (:tempids result)) 2))) - (doseq [k keys] - (mark-key k)) - (statsd/event {:title "Sysco import ended" - :text "Sysco completed" - :priority :low} nil)))) + :destination-key (doto (str "sysco/error/" + (.getName (io/file k))) + println)})) + []))))) + result @(d/transact conn transaction)] + (log/infof "Imported %d invoices" (/ (count (:tempids result)) 2))) + (doseq [k keys] + (mark-key k)) + (statsd/event {:title "Sysco import ended" + :text "Sysco completed" + :priority :low} nil))) (mount/defstate sysco-invoice-importer - :start (scheduler/every (* 1000 60 60) import-sysco) + :start (scheduler/every (* 1000 60 60) (heartbeat import-sysco "sysco-importer")) :stop (scheduler/stop sysco-invoice-importer)) diff --git a/src/clj/auto_ap/background/vendor.clj b/src/clj/auto_ap/background/vendor.clj index cb82ac49..a3f19fd4 100644 --- a/src/clj/auto_ap/background/vendor.clj +++ b/src/clj/auto_ap/background/vendor.clj @@ -1,44 +1,35 @@ (ns auto-ap.background.vendor - (:require [auto-ap.datomic.invoices :as d-invoices] - [auto-ap.datomic :refer [uri conn]] - [datomic.api :as d] - [auto-ap.time :as time] - [clj-time.coerce :as coerce] - [mount.core :as mount] - [yang.scheduler :as scheduler] - [unilog.context :as lc] - [clojure.tools.logging :as log])) + (:require + [auto-ap.datomic :refer [conn]] + [auto-ap.utils :refer [heartbeat]] + [datomic.api :as d] + [mount.core :as mount] + [yang.scheduler :as scheduler])) (defn refresh-vendor-usages [] - (lc/with-context {:source "refreshing vendor-usages"} - - (try - (->> {:query {:find ['?v '?c '(count ?e)] - :in ['$] - :where ['[?v :vendor/name] - '(or-join [?v ?c ?e] - (and - [?e :invoice/vendor ?v] - [?e :invoice/client ?c]) - (and - [?e :transaction/vendor ?v] - [?e :transaction/client ?c]) - (and - [?e :journal-entry/vendor ?v] - [?e :journal-entry/client ?c]))]} - :args [(d/db conn)]} - (d/query) - (map (fn [[v c cnt]] - #:vendor-usage {:vendor v - :client c - :key (str v "-" c) - :count cnt})) - (d/transact conn) - deref) - (log/info "updated vendor usage") - (catch Exception e - (log/error e))))) + (->> {:query {:find ['?v '?c '(count ?e)] + :in ['$] + :where ['[?v :vendor/name] + '(or-join [?v ?c ?e] + (and + [?e :invoice/vendor ?v] + [?e :invoice/client ?c]) + (and + [?e :transaction/vendor ?v] + [?e :transaction/client ?c]) + (and + [?e :journal-entry/vendor ?v] + [?e :journal-entry/client ?c]))]} + :args [(d/db conn)]} + (d/query) + (map (fn [[v c cnt]] + #:vendor-usage {:vendor v + :client c + :key (str v "-" c) + :count cnt})) + (d/transact conn) + deref)) (mount/defstate refresh-vendor-usages-worker - :start (scheduler/every (* 60 60 1000) refresh-vendor-usages) + :start (scheduler/every (* 60 60 1000) (heartbeat refresh-vendor-usages "vendor-usages")) :stop (scheduler/stop refresh-vendor-usages)) diff --git a/src/clj/auto_ap/graphql/clients.clj b/src/clj/auto_ap/graphql/clients.clj index 5f58cfbe..86ca0dd7 100644 --- a/src/clj/auto_ap/graphql/clients.clj +++ b/src/clj/auto_ap/graphql/clients.clj @@ -1,24 +1,26 @@ (ns auto-ap.graphql.clients - (:require [auto-ap.datomic :refer [audit-transact conn remove-nils]] - [auto-ap.datomic.clients :as d-clients] - [auto-ap.graphql.utils :refer [->graphql assert-admin can-see-client? is-admin?]] - [auto-ap.utils :refer [by]] - [auto-ap.yodlee.core :refer [in-memory-cache]] - [auto-ap.routes.queries :as q] - [auto-ap.square.core :as square] - [com.walmartlabs.lacinia.util :refer [attach-resolvers]] - [clj-time.coerce :as coerce] - [clojure.string :as str] - [unilog.context :as lc] - [clojure.tools.logging :as log] - [datomic.api :as d] - [clojure.java.io :as io] - [amazonica.aws.s3 :as s3] - #_{:clj-kondo/ignore [:unused-namespace]} - [yang.scheduler :as scheduler] - [mount.core :as mount]) - (:import [org.apache.commons.codec.binary Base64] - java.util.UUID)) + (:require + [amazonica.aws.s3 :as s3] + [auto-ap.datomic :refer [audit-transact conn remove-nils]] + [auto-ap.datomic.clients :as d-clients] + [auto-ap.graphql.utils + :refer [->graphql assert-admin can-see-client? is-admin?]] + [auto-ap.routes.queries :as q] + [auto-ap.square.core :as square] + [auto-ap.utils :refer [by heartbeat]] + [auto-ap.yodlee.core :refer [in-memory-cache]] + [clj-time.coerce :as coerce] + [clojure.java.io :as io] + [clojure.string :as str] + [clojure.tools.logging :as log] + [com.walmartlabs.lacinia.util :refer [attach-resolvers]] + [datomic.api :as d] + [mount.core :as mount] + [unilog.context :as lc] + [yang.scheduler :as scheduler]) + (:import + (java.util UUID) + (org.apache.commons.codec.binary Base64))) (defn assert-client-code-is-unique [code] (when (seq (d/query {:query {:find '[?id] @@ -245,15 +247,10 @@ (map first))))) (defn refresh-current-balance [] - (lc/with-context {:source "current-balance-cache"} - (try - (log/info "Refreshing running balance cache") - (build-current-balance (bank-accounts-needing-refresh)) - (catch Exception e - (log/error e))))) + (build-current-balance (bank-accounts-needing-refresh))) (mount/defstate current-balance-worker - :start (scheduler/every (* 17 60 1000) refresh-current-balance) + :start (scheduler/every (* 17 60 1000) (heartbeat refresh-current-balance "current-balance-cache")) :stop (scheduler/stop current-balance-worker)) (defn get-client [context _ _] diff --git a/src/clj/auto_ap/graphql/ledger.clj b/src/clj/auto_ap/graphql/ledger.clj index b9ab53a4..daf75023 100644 --- a/src/clj/auto_ap/graphql/ledger.clj +++ b/src/clj/auto_ap/graphql/ledger.clj @@ -8,16 +8,16 @@ [auto-ap.graphql.utils :refer [->graphql <-graphql assert-admin assert-can-see-client result->page]] [auto-ap.parse.util :as parse] - [auto-ap.utils :refer [by dollars=]] - [auto-ap.pdf.ledger :refer [print-pnl print-balance-sheet]] + [auto-ap.pdf.ledger :refer [print-balance-sheet print-pnl]] + [auto-ap.utils :refer [by dollars= heartbeat]] [clj-time.coerce :as coerce] + [clj-time.core :as t] [clojure.tools.logging :as log] [com.walmartlabs.lacinia.util :refer [attach-resolvers]] [datomic.api :as d] [mount.core :as mount] [unilog.context :as lc] - [yang.scheduler :as scheduler] - [clj-time.core :as t])) + [yang.scheduler :as scheduler])) (mount/defstate running-balance-cache :start (atom {})) @@ -525,16 +525,11 @@ (defn refresh-running-balance-cache [] - (lc/with-context {:source "running-balance-cache"} - (try - (log/info "Refreshing running balance cache") - (build-running-balance-cache) - (catch Exception e - (log/error e))))) + (build-running-balance-cache)) (mount/defstate running-balance-cache-worker - :start (scheduler/every (* 15 60 1000) refresh-running-balance-cache) + :start (scheduler/every (* 15 60 1000) (heartbeat refresh-running-balance-cache "running-balance-cache")) :stop (scheduler/stop running-balance-cache-worker)) diff --git a/src/clj/auto_ap/import/intuit.clj b/src/clj/auto_ap/import/intuit.clj index 61499ff7..5714b1f8 100644 --- a/src/clj/auto_ap/import/intuit.clj +++ b/src/clj/auto_ap/import/intuit.clj @@ -2,18 +2,18 @@ (:require [auto-ap.datomic :refer [conn]] [auto-ap.import.transactions :as t] - [auto-ap.time :as atime] [auto-ap.intuit.core :as i] - [auto-ap.utils :refer [allow-once]] + [auto-ap.time :as atime] + [auto-ap.utils :refer [allow-once heartbeat]] [clj-time.coerce :as coerce] [clj-time.core :as time] + [clojure.string :as str] + [clojure.tools.logging :as log] [com.unbounce.dogstatsd.core :as statsd] [datomic.api :as d] [mount.core :as mount] [unilog.context :as lc] - [yang.scheduler :as scheduler] - [clojure.string :as str] - [clojure.tools.logging :as log])) + [yang.scheduler :as scheduler])) (defn get-intuit-bank-accounts [db] (d/q '[:find ?external-id ?ba ?c @@ -47,34 +47,33 @@ (t/apply-synthetic-ids))) (defn import-intuit [] - (lc/with-context {:source "Import intuit transactions"} - (statsd/event {:title "Intuit import started" - :text "Starting" - :priority :low} - nil) - (let [import-batch (t/start-import-batch :import-source/intuit "Automated intuit user") - db (d/db conn) - end (auto-ap.time/local-now) - start (time/plus end (time/days -30))] - (try - (doseq [[external-id bank-account-id client-id] (get-intuit-bank-accounts db) - transaction (-> (i/get-transactions (auto-ap.time/unparse start auto-ap.time/iso-date) - (auto-ap.time/unparse end auto-ap.time/iso-date) - external-id) - (intuits->transactions bank-account-id client-id))] - (t/import-transaction! import-batch transaction)) - (t/finish! import-batch) - (statsd/event {:title "Intuit import Finished" - :text (pr-str (t/get-stats import-batch)) - :priority :low} - nil) - (catch Exception e - (t/fail! import-batch e) - (statsd/event {:title "Intuit import failed" - :text (str e) - :alert-type :warning - :priority :normal} - nil)))))) + (statsd/event {:title "Intuit import started" + :text "Starting" + :priority :low} + nil) + (let [import-batch (t/start-import-batch :import-source/intuit "Automated intuit user") + db (d/db conn) + end (auto-ap.time/local-now) + start (time/plus end (time/days -30))] + (try + (doseq [[external-id bank-account-id client-id] (get-intuit-bank-accounts db) + transaction (-> (i/get-transactions (auto-ap.time/unparse start auto-ap.time/iso-date) + (auto-ap.time/unparse end auto-ap.time/iso-date) + external-id) + (intuits->transactions bank-account-id client-id))] + (t/import-transaction! import-batch transaction)) + (t/finish! import-batch) + (statsd/event {:title "Intuit import Finished" + :text (pr-str (t/get-stats import-batch)) + :priority :low} + nil) + (catch Exception e + (t/fail! import-batch e) + (statsd/event {:title "Intuit import failed" + :text (str e) + :alert-type :warning + :priority :normal} + nil))))) (def upsert-transactions (allow-once upsert-transactions)) @@ -88,7 +87,7 @@ bank-accounts)))) (mount/defstate import-worker - :start (scheduler/every (* 1000 60 60 24) import-intuit) + :start (scheduler/every (* 1000 60 60 24) (heartbeat import-intuit "import-intuit")) :stop (scheduler/stop import-worker)) (mount/defstate account-worker diff --git a/src/clj/auto_ap/import/plaid.clj b/src/clj/auto_ap/import/plaid.clj index 9bf12632..991021c7 100644 --- a/src/clj/auto_ap/import/plaid.clj +++ b/src/clj/auto_ap/import/plaid.clj @@ -1,17 +1,15 @@ (ns auto-ap.import.plaid (:require [auto-ap.datomic :refer [conn]] - [auto-ap.plaid.core :as p] - [auto-ap.utils :refer [allow-once by]] [auto-ap.import.transactions :as t] + [auto-ap.plaid.core :as p] + [auto-ap.utils :refer [allow-once by heartbeat]] + [clj-time.coerce :as coerce] [clj-time.core :as time] - [clojure.tools.logging :as log] [datomic.api :as d] [mount.core :as mount] [unilog.context :as lc] - [yang.scheduler :as scheduler] - [clj-time.coerce :as coerce])) - + [yang.scheduler :as scheduler])) (defn get-plaid-accounts [db] (-> (d/q '[:find ?ba ?c ?external-id ?t @@ -59,7 +57,7 @@ (def import-plaid (allow-once import-plaid)) (mount/defstate import-worker - :start (scheduler/every (* 1000 60 60 3) import-plaid) + :start (scheduler/every (* 1000 60 60 3) (heartbeat import-plaid "import-plaid")) :stop (scheduler/stop import-worker)) diff --git a/src/clj/auto_ap/import/yodlee.clj b/src/clj/auto_ap/import/yodlee.clj index d06fbb54..192303f3 100644 --- a/src/clj/auto_ap/import/yodlee.clj +++ b/src/clj/auto_ap/import/yodlee.clj @@ -3,7 +3,7 @@ [auto-ap.datomic :refer [conn]] [auto-ap.import.transactions :as t] [auto-ap.time :as atime] - [auto-ap.utils :refer [allow-once]] + [auto-ap.utils :refer [allow-once heartbeat]] [auto-ap.yodlee.core :as client] [clj-time.coerce :as coerce] [clojure.string :as str] @@ -45,43 +45,42 @@ :status status})) (defn import-yodlee [] - (lc/with-context {:source "Import yodlee transactions"} - (statsd/event {:title "Yodlee import started" - :text "Starting" - :priority :low} - nil) - (let [import-batch (t/start-import-batch :import-source/yodlee "Automated yodlee user")] - (try - (let [account-lookup (d/q '[:find ?ya ?ba ?c - :in $ - :where [?ba :bank-account/yodlee-account-id ?ya] - [?c :client/bank-accounts ?ba]] - (d/db conn))] - (doseq [[yodlee-account bank-account client-id] account-lookup - transaction (try - (client/get-specific-transactions yodlee-account (client/get-auth-header)) - (catch Exception e - (log/warn e) - []))] - (t/import-transaction! import-batch (assoc (yodlee->transaction transaction) + (statsd/event {:title "Yodlee import started" + :text "Starting" + :priority :low} + nil) + (let [import-batch (t/start-import-batch :import-source/yodlee "Automated yodlee user")] + (try + (let [account-lookup (d/q '[:find ?ya ?ba ?c + :in $ + :where [?ba :bank-account/yodlee-account-id ?ya] + [?c :client/bank-accounts ?ba]] + (d/db conn))] + (doseq [[yodlee-account bank-account client-id] account-lookup + transaction (try + (client/get-specific-transactions yodlee-account (client/get-auth-header)) + (catch Exception e + (log/warn e) + []))] + (t/import-transaction! import-batch (assoc (yodlee->transaction transaction) :transaction/bank-account bank-account :transaction/client client-id))) - (t/finish! import-batch)) - (statsd/event {:title "Yodlee import Finished" - :text (pr-str (t/get-stats import-batch)) - :priority :low} - nil) - (catch Exception e - (t/fail! import-batch e) - (statsd/event {:title "Yodlee import failed" - :text (str e) - :alert-type :warning - :priority :normal} - nil)))))) + (t/finish! import-batch)) + (statsd/event {:title "Yodlee import Finished" + :text (pr-str (t/get-stats import-batch)) + :priority :low} + nil) + (catch Exception e + (t/fail! import-batch e) + (statsd/event {:title "Yodlee import failed" + :text (str e) + :alert-type :warning + :priority :normal} + nil))))) (def import-yodlee (allow-once import-yodlee)) (mount/defstate import-worker - :start (scheduler/every (* 1000 60 60 8) import-yodlee) + :start (scheduler/every (* 1000 60 60 8) (heartbeat import-yodlee "import-yodlee")) :stop (scheduler/stop import-worker)) diff --git a/src/clj/auto_ap/import/yodlee2.clj b/src/clj/auto_ap/import/yodlee2.clj index dd06c009..5e7d6d92 100644 --- a/src/clj/auto_ap/import/yodlee2.clj +++ b/src/clj/auto_ap/import/yodlee2.clj @@ -3,7 +3,7 @@ [auto-ap.datomic :refer [conn]] [auto-ap.import.transactions :as t] [auto-ap.import.yodlee :as y] - [auto-ap.utils :refer [allow-once]] + [auto-ap.utils :refer [allow-once heartbeat]] [auto-ap.yodlee.core2 :as client2] [com.unbounce.dogstatsd.core :as statsd] [datomic.api :as d] @@ -53,9 +53,9 @@ (mount/defstate import-worker - :start (scheduler/every (* 1000 60 60 4) import-yodlee2) + :start (scheduler/every (* 1000 60 60 4) (heartbeat import-yodlee2 "import-yodlee")) :stop (scheduler/stop import-worker)) (mount/defstate account-worker - :start (scheduler/every (* 5 60 1000) client2/upsert-accounts) + :start (scheduler/every (* 5 60 1000) (heartbeat client2/upsert-accounts "upsert-yodlee2-accounts")) :stop (scheduler/stop account-worker)) diff --git a/src/clj/auto_ap/ledger.clj b/src/clj/auto_ap/ledger.clj index 7faa6247..062e0985 100644 --- a/src/clj/auto_ap/ledger.clj +++ b/src/clj/auto_ap/ledger.clj @@ -196,47 +196,41 @@ :stop (-> process-txes-worker :running? (reset! false))) (defn reconcile-ledger [] - (lc/with-context {:source "reconcile-ledger"} - (try - (log/info "Attempting to reconcile the ledger") - (let [txes-missing-ledger-entries (->> (d/query {:query {:find ['?t ] - :in ['$] - :where ['[?t :transaction/date] - '[?t :transaction/amount ?amt] + (let [txes-missing-ledger-entries (->> (d/query {:query {:find ['?t ] + :in ['$] + :where ['[?t :transaction/date] + '[?t :transaction/amount ?amt] + '[(not= 0.0 ?amt)] + '(not [?t :transaction/approval-status :transaction-approval-status/excluded]) + '(not [?t :transaction/approval-status :transaction-approval-status/suppressed]) + '(not-join [?t] [?e :journal-entry/original-entity ?t])]} + :args [(d/db conn)]}) + (map first) + (mapv #(entity-change->ledger (d/db conn) [:transaction %]))) + + + invoices-missing-ledger-entries (->> (d/query {:query {:find ['?t ] + :in ['$] + :where ['[?t :invoice/date] + '[?t :invoice/total ?amt] '[(not= 0.0 ?amt)] - '(not [?t :transaction/approval-status :transaction-approval-status/excluded]) - '(not [?t :transaction/approval-status :transaction-approval-status/suppressed]) + '(not [?t :invoice/status :invoice-status/voided]) + '(not [?t :invoice/import-status :import-status/pending]) + '(not [?t :invoice/exclude-from-ledger true]) '(not-join [?t] [?e :journal-entry/original-entity ?t])]} - :args [(d/db conn)]}) + :args [(d/db conn)]}) (map first) - (mapv #(entity-change->ledger (d/db conn) [:transaction %]))) - - - invoices-missing-ledger-entries (->> (d/query {:query {:find ['?t ] - :in ['$] - :where ['[?t :invoice/date] - '[?t :invoice/total ?amt] - '[(not= 0.0 ?amt)] - '(not [?t :invoice/status :invoice-status/voided]) - '(not [?t :invoice/import-status :import-status/pending]) - '(not [?t :invoice/exclude-from-ledger true]) - '(not-join [?t] [?e :journal-entry/original-entity ?t])]} - :args [(d/db conn)]}) - (map first) - (mapv #(entity-change->ledger (d/db conn) [:invoice %]))) - repairs (vec (concat txes-missing-ledger-entries invoices-missing-ledger-entries))] - (when (seq repairs) - (log/info (take 3 repairs)) - (log/warn "repairing " (count txes-missing-ledger-entries) " missing transactions, " (count invoices-missing-ledger-entries) " missing invoices that were missing ledger entries") - @(d/transact conn repairs)) - (log/info "Finished reconciling ledger")) - (catch Exception e - (log/error e))))) + (mapv #(entity-change->ledger (d/db conn) [:invoice %]))) + repairs (vec (concat txes-missing-ledger-entries invoices-missing-ledger-entries))] + (when (seq repairs) + (log/info (take 3 repairs)) + (log/warn "repairing " (count txes-missing-ledger-entries) " missing transactions, " (count invoices-missing-ledger-entries) " missing invoices that were missing ledger entries") + @(d/transact conn repairs)))) (mount/defstate reconciliation-frequency :start (* 1000 60 60)) (mount/defstate ledger-reconciliation-worker - :start (scheduler/every reconciliation-frequency reconcile-ledger) + :start (scheduler/every reconciliation-frequency (heartbeat reconcile-ledger "reconcile-ledger")) :stop (scheduler/stop ledger-reconciliation-worker)) @@ -371,56 +365,52 @@ invoice-accounts))) (defn touch-broken-ledger [] - (lc/with-context {:source "touch-broken-ledger"} - (statsd/event {:title "Reconciling Ledger" - :text "This process looks for unbalance ledger entries, or missing ledger entries" - :priority :low} - nil) - (try - (log/info "Attempting to fix transactions that are in the ledger but are wrong") - (let [mismatched-ts (mismatched-transactions)] - (if (seq mismatched-ts) - (do - (log/warn (count mismatched-ts) " transactions exist but don't match ledger " (pr-str (take 10 mismatched-ts) )) - (doseq [[m] mismatched-ts] - (touch-transaction m)) - (statsd/gauge "data.mismatched_transactions" (count (mismatched-transactions)))) - (statsd/gauge "data.mismatched_transactions" 0.0))) - (log/info "Attempting to fix transactions that are in the ledger but debits/credits don't add up") - (let [unbalanced-ts (unbalanced-transactions)] - (if (seq unbalanced-ts) - (do - (log/warn (count unbalanced-ts) " transactions exist but don't have matching debits/credits (" (pr-str (take 10 unbalanced-ts) ) ")") - (doseq [m unbalanced-ts] - (touch-transaction m)) - (statsd/gauge "data.unbalanced_transactions" (count (unbalanced-transactions)))) - (statsd/gauge "data.unbalanced_transactions" 0.0))) - (log/info "Finished fixing transactions that are in the ledger but are wrong") - (let [mismatched-is (mismatched-invoices)] - (if (seq mismatched-is) - (do - (log/warn (count mismatched-is) " invoice exist but don't match ledger ") - (doseq [[m] mismatched-is] - (touch-invoice m)) - (statsd/gauge "data.mismatched_invoices" (count (mismatched-invoices)))) - (statsd/gauge "data.mismatched_invoices" 0.0))) - (log/info "Attempting to fix transactions that are in the ledger but debits/credits don't add up") - (let [unbalanced-invoices (unbalanced-invoices)] - (if (seq unbalanced-invoices) - (do - (log/warn (count unbalanced-invoices) " invoices exist but don't have matching debits/credits ") - (doseq [m unbalanced-invoices] - (touch-invoice m)) - (statsd/gauge "data.unbalanced_invoices" (count (unbalanced-invoices)))) - (statsd/gauge "data.unbalanced_invoices" 0.0))) + (statsd/event {:title "Reconciling Ledger" + :text "This process looks for unbalance ledger entries, or missing ledger entries" + :priority :low} + nil) + (log/info "Attempting to fix transactions that are in the ledger but are wrong") + (let [mismatched-ts (mismatched-transactions)] + (if (seq mismatched-ts) + (do + (log/warn (count mismatched-ts) " transactions exist but don't match ledger " (pr-str (take 10 mismatched-ts) )) + (doseq [[m] mismatched-ts] + (touch-transaction m)) + (statsd/gauge "data.mismatched_transactions" (count (mismatched-transactions)))) + (statsd/gauge "data.mismatched_transactions" 0.0))) + (log/info "Attempting to fix transactions that are in the ledger but debits/credits don't add up") + (let [unbalanced-ts (unbalanced-transactions)] + (if (seq unbalanced-ts) + (do + (log/warn (count unbalanced-ts) " transactions exist but don't have matching debits/credits (" (pr-str (take 10 unbalanced-ts) ) ")") + (doseq [m unbalanced-ts] + (touch-transaction m)) + (statsd/gauge "data.unbalanced_transactions" (count (unbalanced-transactions)))) + (statsd/gauge "data.unbalanced_transactions" 0.0))) + (log/info "Finished fixing transactions that are in the ledger but are wrong") + (let [mismatched-is (mismatched-invoices)] + (if (seq mismatched-is) + (do + (log/warn (count mismatched-is) " invoice exist but don't match ledger ") + (doseq [[m] mismatched-is] + (touch-invoice m)) + (statsd/gauge "data.mismatched_invoices" (count (mismatched-invoices)))) + (statsd/gauge "data.mismatched_invoices" 0.0))) + (log/info "Attempting to fix transactions that are in the ledger but debits/credits don't add up") + (let [unbalanced-invoices (unbalanced-invoices)] + (if (seq unbalanced-invoices) + (do + (log/warn (count unbalanced-invoices) " invoices exist but don't have matching debits/credits ") + (doseq [m unbalanced-invoices] + (touch-invoice m)) + (statsd/gauge "data.unbalanced_invoices" (count (unbalanced-invoices)))) + (statsd/gauge "data.unbalanced_invoices" 0.0))) - (log/info "Finish fixing invoices that are in the ledger but are wrong") - (statsd/event {:title "Finished Reconciling Ledger" - :text "This process looks for unbalance ledger entries, or missing ledger entries" - :priority :low} - nil) - (catch Exception e - (log/error e))))) + (log/info "Finish fixing invoices that are in the ledger but are wrong") + (statsd/event {:title "Finished Reconciling Ledger" + :text "This process looks for unbalance ledger entries, or missing ledger entries" + :priority :low} + nil)) (mount/defstate touch-broken-ledger-worker :start (scheduler/every reconciliation-frequency (heartbeat touch-broken-ledger "touch-broken-ledger")) diff --git a/src/clj/auto_ap/plaid/import.clj b/src/clj/auto_ap/plaid/import.clj deleted file mode 100644 index f124d709..00000000 --- a/src/clj/auto_ap/plaid/import.clj +++ /dev/null @@ -1,59 +0,0 @@ -(ns auto-ap.plaid.import - (:require - [auto-ap.datomic :refer [conn]] - [auto-ap.plaid.core :as p] - [auto-ap.utils :refer [allow-once]] - [auto-ap.yodlee.import :as y] - [clj-time.core :as time] - [clojure.tools.logging :as log] - [datomic.api :as d] - [mount.core :as mount] - [unilog.context :as lc] - [yang.scheduler :as scheduler] - [clj-time.coerce :as coerce])) - -(defn get-plaid-accounts [db] - (-> (d/q '[:find ?ba ?c ?external-id ?t - :in $ - :where - [?c :client/bank-accounts ?ba] - [?ba :bank-account/plaid-account ?pa] - [?pa :plaid-account/external-id ?external-id] - [?pi :plaid-item/accounts ?pa] - [?pi :plaid-item/access-token ?t]] - db ))) - - -(defn plaid->transaction [t] - #:transaction {:description-original (:name t) - :raw-id (:transaction_id t) - :id (digest/sha-256 (:transaction_id t)) - :amount (double (:amount t)) - :date (coerce/to-date (auto-ap.time/parse (:date t) auto-ap.time/iso-date)) - :status "POSTED"}) - - -(defn import-plaid [] - (lc/with-context {:source "Import plaid transactions"} - (let [import-batch (y/start-import-batch :import-source/plaid "Automated plaid user") - end (auto-ap.time/local-now) - start (time/plus end (time/days -30))] - (try - (doseq [[bank-account-id client-id external-id access-token] (get-plaid-accounts (d/db conn)) - transaction (:transactions (p/get-transactions access-token external-id start end))] - (when (not (:pending transaction)) - (y/import-transaction! import-batch (assoc (plaid->transaction transaction) - :transaction/bank-account bank-account-id - :transaction/client client-id)))) - (y/finish! import-batch) - (catch Exception e - (y/fail! import-batch e)))))) - -(def import-plaid (allow-once import-plaid)) - -(mount/defstate import-worker - :start (scheduler/every (* 1000 60 60 3) import-plaid) - :stop (scheduler/stop import-worker)) - - - diff --git a/src/clj/auto_ap/square/core.clj b/src/clj/auto_ap/square/core.clj index 3e3893f3..d5e44749 100644 --- a/src/clj/auto_ap/square/core.clj +++ b/src/clj/auto_ap/square/core.clj @@ -491,8 +491,7 @@ (defn upsert-all [] (doseq [client (get-square-clients) :when (seq (filter :square-location/client-location (:client/square-locations client)))] - (lc/with-context {:source "Square loading" - :client (:client/code client)} + (lc/with-context {:client (:client/code client)} (upsert-locations client) (log/info "Loading Orders") (upsert client) diff --git a/src/cljc/auto_ap/utils.cljc b/src/cljc/auto_ap/utils.cljc index 2ec226f5..bd5a49c9 100644 --- a/src/cljc/auto_ap/utils.cljc +++ b/src/cljc/auto_ap/utils.cljc @@ -1,7 +1,9 @@ (ns auto-ap.utils #?@ (:clj - [(:require [com.unbounce.dogstatsd.core :as statsd])])) + [(:require [com.unbounce.dogstatsd.core :as statsd] + [clojure.tools.logging :as log] + [unilog.context :as lc])])) (defn by ([f xs] @@ -87,9 +89,19 @@ (defn heartbeat [f id] (fn [] #?(:clj (do - (f) - (statsd/service-check {:name (str id) - :status :ok} - nil)) + (lc/with-context {:source id} + (try + (log/info "Starting background process " id) + (f) + (log/info "Completed background process " id) + (statsd/service-check {:name (str id) + :status :ok} + nil) + (catch Exception e + (log/error e) + (statsd/service-check {:name (str id) + :status :critical} + nil))))) + :cljs (do (println "Heartbeat for " id) (f)))))