much more background process tracking

This commit is contained in:
2022-06-22 10:43:37 -07:00
parent 480066b202
commit 1db8d7a52c
15 changed files with 379 additions and 486 deletions

View File

@@ -1,37 +1,32 @@
(ns auto-ap.background.invoices
(:require [auto-ap.datomic.invoices :as d-invoices]
[auto-ap.datomic :refer [uri conn]]
[datomic.api :as d]
[auto-ap.time :as time]
[clj-time.coerce :as coerce]
[mount.core :as mount]
[yang.scheduler :as scheduler]
[unilog.context :as lc]
[clojure.tools.logging :as log]))
(:require
[auto-ap.datomic :refer [conn]]
[auto-ap.time :as time]
[auto-ap.utils :refer [heartbeat]]
[clj-time.coerce :as coerce]
[clojure.tools.logging :as log]
[datomic.api :as d]
[mount.core :as mount]
[yang.scheduler :as scheduler]))
(defn close-auto-invoices []
(lc/with-context {:source "close-auto-invoices"}
(try
(let [invoices-to-close (d/query {:query {:find ['?e]
:in ['$ '?today]
:where ['[?e :invoice/scheduled-payment ?d]
'[?e :invoice/status :invoice-status/unpaid]
'[(<= ?d ?today)]]}
:args [(d/db conn) (coerce/to-date (time/local-now))]})]
(log/info "Closing " (count invoices-to-close) "scheduled invoices")
(some->> invoices-to-close
seq
(let [invoices-to-close (d/query {:query {:find ['?e]
:in ['$ '?today]
:where ['[?e :invoice/scheduled-payment ?d]
'[?e :invoice/status :invoice-status/unpaid]
'[(<= ?d ?today)]]}
:args [(d/db conn) (coerce/to-date (time/local-now))]})]
(log/info "Closing " (count invoices-to-close) "scheduled invoices")
(some->> invoices-to-close
seq
(mapv (fn [[i]] {:db/id i
:invoice/outstanding-balance 0.0
:invoice/status :invoice-status/paid}))
(d/transact conn)
deref)
(log/info "Closed " (count invoices-to-close) "scheduled invoices"))
(catch Exception e
(log/error e)))))
(mapv (fn [[i]] {:db/id i
:invoice/outstanding-balance 0.0
:invoice/status :invoice-status/paid}))
(d/transact conn)
deref)
(log/info "Closed " (count invoices-to-close) "scheduled invoices")))
(mount/defstate close-auto-invoices-worker
:start (scheduler/every 60000 close-auto-invoices)
:start (scheduler/every 60000 (heartbeat close-auto-invoices "close-auto-invoices"))
:stop (scheduler/stop close-auto-invoices-worker))

View File

@@ -1,22 +1,24 @@
(ns auto-ap.background.mail
(:require [amazonica.aws.s3 :as s3]
[amazonica.aws.sqs :as sqs]
[amazonica.aws.simpleemail :as ses]
[auto-ap.parse :as parse]
[auto-ap.routes.invoices :as invoices]
[clojure-mail.message :as message]
[clojure.data.json :as json]
[clojure.java.io :as io]
[clojure.string :as str]
[config.core :refer [env]]
[clojure.tools.logging :as log]
[unilog.context :as lc]
[mount.core :as mount]
[yang.scheduler :as scheduler])
(:import (java.util Properties UUID)
(javax.mail Session)
(javax.mail.internet MimeMessage)))
(:require
[amazonica.aws.s3 :as s3]
[amazonica.aws.simpleemail :as ses]
[amazonica.aws.sqs :as sqs]
[auto-ap.parse :as parse]
[auto-ap.routes.invoices :as invoices]
[auto-ap.utils :refer [heartbeat]]
[clojure-mail.message :as message]
[clojure.data.json :as json]
[clojure.java.io :as io]
[clojure.string :as str]
[clojure.tools.logging :as log]
[config.core :refer [env]]
[mount.core :as mount]
[unilog.context :as lc]
[yang.scheduler :as scheduler])
(:import
(java.util Properties UUID)
(javax.mail Session)
(javax.mail.internet MimeMessage)))
(defn send-email-about-failed-message [mail-bucket mail-key]
(let [target-key (str "failed-emails/" mail-key ".eml")
@@ -33,54 +35,49 @@
(defn process-sqs []
(lc/with-context {:source "import-uploaded-invoices"}
(try
(log/info "Fetching messages from sqs...")
(doseq [message (:messages (sqs/receive-message {:queue-url (:invoice-import-queue-url env)
:wait-time-seconds 5
:max-number-of-messages 10
#_#_:attribute-names ["All"]}))]
(let [message-body (json/read-str (:body message)
:key-fn keyword)]
(doseq [r (:Records message-body)]
(log/info "Processing record " r)
(let [props (Session/getDefaultInstance (Properties.))
message-stream (-> (s3/get-object {:key (-> r :s3 :object :key)
:bucket-name (-> r :s3 :bucket :name)})
:input-stream)
mail (message/read-message (MimeMessage. props message-stream))]
(log/info "reading mail" (->> mail :body (filter :content-type) (map :body) ))
(doseq [pdf-stream (->> (-> mail :body)
(filter :content-type)
#_(filter #(re-find #"application/pdf" (:content-type %)) ))
:let [filename (str "/tmp/" (UUID/randomUUID) ".pdf")]]
(try
(let [
_ (io/copy (:body pdf-stream) (io/file filename))
extension (last (str/split (.getName (io/file filename)) #"\."))
s3-location (str "invoice-files/" (str (UUID/randomUUID)) "." extension)
_ (s3/put-object :bucket-name (:data-bucket env)
:key s3-location
:input-stream (io/input-stream filename)
:metadata {:content-type "application/pdf"})
imports (->> (parse/parse-file filename filename)
(map #(assoc %
:source-url (str "http://" (:data-bucket env)
".s3-website-us-east-1.amazonaws.com/"
s3-location)
:import-status :import-status/approved)))]
(log/info "Found imports" imports)
(invoices/import-uploaded-invoice {:user/role "admin"} imports ))
(catch Exception e
(log/warn e)
(send-email-about-failed-message (-> r :s3 :bucket :name) (-> r :s3 :object :key)))
(finally
(io/delete-file filename)))))))
(sqs/delete-message (assoc message :queue-url (:invoice-import-queue-url env) )))
(catch Exception e
(log/error e)))))
(log/info "Fetching messages from sqs...")
(doseq [message (:messages (sqs/receive-message {:queue-url (:invoice-import-queue-url env)
:wait-time-seconds 5
:max-number-of-messages 10
#_#_:attribute-names ["All"]}))]
(let [message-body (json/read-str (:body message)
:key-fn keyword)]
(doseq [r (:Records message-body)]
(log/info "Processing record " r)
(let [props (Session/getDefaultInstance (Properties.))
message-stream (-> (s3/get-object {:key (-> r :s3 :object :key)
:bucket-name (-> r :s3 :bucket :name)})
:input-stream)
mail (message/read-message (MimeMessage. props message-stream))]
(log/info "reading mail" (->> mail :body (filter :content-type) (map :body) ))
(doseq [pdf-stream (->> (-> mail :body)
(filter :content-type)
#_(filter #(re-find #"application/pdf" (:content-type %)) ))
:let [filename (str "/tmp/" (UUID/randomUUID) ".pdf")]]
(try
(let [
_ (io/copy (:body pdf-stream) (io/file filename))
extension (last (str/split (.getName (io/file filename)) #"\."))
s3-location (str "invoice-files/" (str (UUID/randomUUID)) "." extension)
_ (s3/put-object :bucket-name (:data-bucket env)
:key s3-location
:input-stream (io/input-stream filename)
:metadata {:content-type "application/pdf"})
imports (->> (parse/parse-file filename filename)
(map #(assoc %
:source-url (str "http://" (:data-bucket env)
".s3-website-us-east-1.amazonaws.com/"
s3-location)
:import-status :import-status/approved)))]
(log/info "Found imports" imports)
(invoices/import-uploaded-invoice {:user/role "admin"} imports ))
(catch Exception e
(log/warn e)
(send-email-about-failed-message (-> r :s3 :bucket :name) (-> r :s3 :object :key)))
(finally
(io/delete-file filename)))))))
(sqs/delete-message (assoc message :queue-url (:invoice-import-queue-url env) ))))
(mount/defstate import-invoices
:start (scheduler/every (* 60 5000) process-sqs)
:start (scheduler/every (* 60 5000) (heartbeat process-sqs "import-uploaded-invoices"))
:stop (scheduler/stop import-invoices))

View File

@@ -1,62 +1,44 @@
(ns auto-ap.background.requests
(:require
[amazonica.aws.sqs :as sqs]
[config.core :refer [env]]
[mount.core :as mount]
[yang.scheduler :as scheduler]
[auto-ap.yodlee.core2 :as client2]
[clojure.tools.logging :as log]
[auto-ap.import.intuit :as i]
[auto-ap.import.plaid :as p]
[unilog.context :as lc]
[auto-ap.import.yodlee :as y]
[auto-ap.import.yodlee2 :as y2]
))
[auto-ap.utils :refer [heartbeat]]
[clojure.tools.logging :as log]
[config.core :refer [env]]
[mount.core :as mount]
[yang.scheduler :as scheduler]))
(def queue-url (:requests-queue-url env))
(defn process-1 []
(lc/with-context {:source "Request poller"}
(log/info "Checking SQS...")
(let [[{:keys [message-id receipt-handle body]}] (:messages (sqs/receive-message {:queue-url queue-url
:wait-time-seconds 1
:count 1}))]
(when message-id
(sqs/delete-message {:queue-url queue-url
:receipt-handle receipt-handle} )
(log/infof "processing message %s with body %s" message-id body )
(cond
(= ":intuit" body)
(try
(i/import-intuit)
(catch Exception e
(log/error e)))
(let [[{:keys [message-id receipt-handle body]}] (:messages (sqs/receive-message {:queue-url queue-url
:wait-time-seconds 1
:count 1}))]
(when message-id
(sqs/delete-message {:queue-url queue-url
:receipt-handle receipt-handle} )
(log/infof "processing message %s with body %s" message-id body )
(cond
(= ":intuit" body)
(i/import-intuit)
(= ":yodlee" body)
(try
(y/import-yodlee)
(catch Exception e
(log/error e)))
(= ":yodlee" body)
(y/import-yodlee)
(= ":yodlee2" body)
(try
(client2/upsert-accounts)
(y2/import-yodlee2)
(catch Exception e
(log/error e)))
(= ":yodlee2" body)
(y2/import-yodlee2)
(= ":plaid" body)
(try
(p/import-plaid)
(catch Exception e
(log/error e))))
))))
(= ":plaid" body)
(p/import-plaid)))))
(defn fake-message []
(sqs/send-message {:queue-url (:requests-queue-url env)
(sqs/send-message {:queue-url (:requests-queue-url env)
:message-body ":intuit"} ))
(mount/defstate request-listener
:start (scheduler/every (* 1000 30) process-1)
:start (scheduler/every (* 1000 30) (heartbeat process-1 "request-poller"))
:stop (scheduler/stop request-listener))

View File

@@ -6,6 +6,7 @@
[auto-ap.datomic.invoices :refer [code-invoice]]
[auto-ap.parse :as parse]
[auto-ap.time :as t]
[auto-ap.utils :refer [heartbeat]]
[clj-time.coerce :as coerce]
[clojure.data.csv :as csv]
[clojure.java.io :as io]
@@ -15,8 +16,6 @@
[config.core :refer [env]]
[datomic.api :as d]
[mount.core :as mount]
[unilog.context :as lc]
#_{:clj-kondo/ignore [:unused-namespace]}
[yang.scheduler :as scheduler])
(:import
(java.util UUID)))
@@ -111,54 +110,53 @@
(defn import-sysco []
(lc/with-context {:source "sysco-importer"}
(let [sysco-vendor (get-sysco-vendor)
clients (d-clients/get-all)
keys (->> (s3/list-objects-v2 {:bucket-name bucket-name
:prefix "sysco/pending"})
:object-summaries
(map :key))]
(let [sysco-vendor (get-sysco-vendor)
clients (d-clients/get-all)
keys (->> (s3/list-objects-v2 {:bucket-name bucket-name
:prefix "sysco/pending"})
:object-summaries
(map :key))]
(statsd/event {:title "Sysco import started"
:text (format "Found %d sysco invoice to import: %s" (count keys) (pr-str keys))
:priority :low}
nil)
(log/infof "Found %d sysco invoice to import: %s" (count keys) (pr-str keys))
(let [transaction (->> keys
(mapcat (fn [k]
(try
(let [invoice-key (str "invoice-files/" (UUID/randomUUID) ".csv") ;
invoice-url (str "http://" (:data-bucket env) ".s3-website-us-east-1.amazonaws.com/" invoice-key)]
(s3/copy-object {:source-bucket-name (:data-bucket env)
(statsd/event {:title "Sysco import started"
:text (format "Found %d sysco invoice to import: %s" (count keys) (pr-str keys))
:priority :low}
nil)
(log/infof "Found %d sysco invoice to import: %s" (count keys) (pr-str keys))
(let [transaction (->> keys
(mapcat (fn [k]
(try
(let [invoice-key (str "invoice-files/" (UUID/randomUUID) ".csv") ;
invoice-url (str "http://" (:data-bucket env) ".s3-website-us-east-1.amazonaws.com/" invoice-key)]
(s3/copy-object {:source-bucket-name (:data-bucket env)
:destination-bucket-name (:data-bucket env)
:source-key k
:destination-key invoice-key})
[[:propose-invoice
(-> k
read-sysco-csv
(extract-invoice-details clients sysco-vendor)
(assoc :invoice/source-url invoice-url))]])
(catch Exception e
(log/error (str "Cannot load file " k) e)
(log/info
(s3/copy-object {:source-bucket-name (:data-bucket env)
:destination-bucket-name (:data-bucket env)
:source-key k
:destination-key invoice-key})
[[:propose-invoice
(-> k
read-sysco-csv
(extract-invoice-details clients sysco-vendor)
(assoc :invoice/source-url invoice-url))]])
(catch Exception e
(log/error (str "Cannot load file " k) e)
(log/info
(s3/copy-object {:source-bucket-name (:data-bucket env)
:destination-bucket-name (:data-bucket env)
:source-key k
:destination-key (doto (str "sysco/error/"
(.getName (io/file k)))
println)}))
[])))))
result @(d/transact conn transaction)]
(log/infof "Imported %d invoices" (/ (count (:tempids result)) 2)))
(doseq [k keys]
(mark-key k))
(statsd/event {:title "Sysco import ended"
:text "Sysco completed"
:priority :low} nil))))
:destination-key (doto (str "sysco/error/"
(.getName (io/file k)))
println)}))
[])))))
result @(d/transact conn transaction)]
(log/infof "Imported %d invoices" (/ (count (:tempids result)) 2)))
(doseq [k keys]
(mark-key k))
(statsd/event {:title "Sysco import ended"
:text "Sysco completed"
:priority :low} nil)))
(mount/defstate sysco-invoice-importer
:start (scheduler/every (* 1000 60 60) import-sysco)
:start (scheduler/every (* 1000 60 60) (heartbeat import-sysco "sysco-importer"))
:stop (scheduler/stop sysco-invoice-importer))

View File

@@ -1,44 +1,35 @@
(ns auto-ap.background.vendor
(:require [auto-ap.datomic.invoices :as d-invoices]
[auto-ap.datomic :refer [uri conn]]
[datomic.api :as d]
[auto-ap.time :as time]
[clj-time.coerce :as coerce]
[mount.core :as mount]
[yang.scheduler :as scheduler]
[unilog.context :as lc]
[clojure.tools.logging :as log]))
(:require
[auto-ap.datomic :refer [conn]]
[auto-ap.utils :refer [heartbeat]]
[datomic.api :as d]
[mount.core :as mount]
[yang.scheduler :as scheduler]))
(defn refresh-vendor-usages []
(lc/with-context {:source "refreshing vendor-usages"}
(try
(->> {:query {:find ['?v '?c '(count ?e)]
:in ['$]
:where ['[?v :vendor/name]
'(or-join [?v ?c ?e]
(and
[?e :invoice/vendor ?v]
[?e :invoice/client ?c])
(and
[?e :transaction/vendor ?v]
[?e :transaction/client ?c])
(and
[?e :journal-entry/vendor ?v]
[?e :journal-entry/client ?c]))]}
:args [(d/db conn)]}
(d/query)
(map (fn [[v c cnt]]
#:vendor-usage {:vendor v
:client c
:key (str v "-" c)
:count cnt}))
(d/transact conn)
deref)
(log/info "updated vendor usage")
(catch Exception e
(log/error e)))))
(->> {:query {:find ['?v '?c '(count ?e)]
:in ['$]
:where ['[?v :vendor/name]
'(or-join [?v ?c ?e]
(and
[?e :invoice/vendor ?v]
[?e :invoice/client ?c])
(and
[?e :transaction/vendor ?v]
[?e :transaction/client ?c])
(and
[?e :journal-entry/vendor ?v]
[?e :journal-entry/client ?c]))]}
:args [(d/db conn)]}
(d/query)
(map (fn [[v c cnt]]
#:vendor-usage {:vendor v
:client c
:key (str v "-" c)
:count cnt}))
(d/transact conn)
deref))
(mount/defstate refresh-vendor-usages-worker
:start (scheduler/every (* 60 60 1000) refresh-vendor-usages)
:start (scheduler/every (* 60 60 1000) (heartbeat refresh-vendor-usages "vendor-usages"))
:stop (scheduler/stop refresh-vendor-usages))