More background task cleanup
This commit is contained in:
@@ -1,32 +0,0 @@
|
||||
(ns auto-ap.background.invoices
|
||||
(:require
|
||||
[auto-ap.datomic :refer [conn]]
|
||||
[auto-ap.time :as time]
|
||||
[auto-ap.utils :refer [heartbeat]]
|
||||
[clj-time.coerce :as coerce]
|
||||
[clojure.tools.logging :as log]
|
||||
[datomic.api :as d]
|
||||
[mount.core :as mount]
|
||||
[yang.scheduler :as scheduler]))
|
||||
|
||||
(defn close-auto-invoices []
|
||||
(let [invoices-to-close (d/query {:query {:find ['?e]
|
||||
:in ['$ '?today]
|
||||
:where ['[?e :invoice/scheduled-payment ?d]
|
||||
'[?e :invoice/status :invoice-status/unpaid]
|
||||
'[(<= ?d ?today)]]}
|
||||
:args [(d/db conn) (coerce/to-date (time/local-now))]})]
|
||||
(log/info "Closing " (count invoices-to-close) "scheduled invoices")
|
||||
(some->> invoices-to-close
|
||||
seq
|
||||
|
||||
(mapv (fn [[i]] {:db/id i
|
||||
:invoice/outstanding-balance 0.0
|
||||
:invoice/status :invoice-status/paid}))
|
||||
(d/transact conn)
|
||||
deref)
|
||||
(log/info "Closed " (count invoices-to-close) "scheduled invoices")))
|
||||
|
||||
(mount/defstate close-auto-invoices-worker
|
||||
:start (scheduler/every 60000 (heartbeat close-auto-invoices "close-auto-invoices"))
|
||||
:stop (scheduler/stop close-auto-invoices-worker))
|
||||
@@ -1,83 +0,0 @@
|
||||
(ns auto-ap.background.mail
|
||||
(:require
|
||||
[amazonica.aws.s3 :as s3]
|
||||
[amazonica.aws.simpleemail :as ses]
|
||||
[amazonica.aws.sqs :as sqs]
|
||||
[auto-ap.parse :as parse]
|
||||
[auto-ap.routes.invoices :as invoices]
|
||||
[auto-ap.utils :refer [heartbeat]]
|
||||
[clojure-mail.message :as message]
|
||||
[clojure.data.json :as json]
|
||||
[clojure.java.io :as io]
|
||||
[clojure.string :as str]
|
||||
[clojure.tools.logging :as log]
|
||||
[config.core :refer [env]]
|
||||
[mount.core :as mount]
|
||||
[yang.scheduler :as scheduler])
|
||||
(:import
|
||||
(java.util Properties UUID)
|
||||
(javax.mail Session)
|
||||
(javax.mail.internet MimeMessage)))
|
||||
|
||||
(defn send-email-about-failed-message [mail-bucket mail-key]
|
||||
(let [target-key (str "failed-emails/" mail-key ".eml")
|
||||
target-url (str "http://" (:data-bucket env)
|
||||
".s3-website-us-east-1.amazonaws.com/"
|
||||
target-key)]
|
||||
(log/info "sending email to " (:import-failure-destination-emails env))
|
||||
(s3/copy-object mail-bucket mail-key (:data-bucket env) target-key)
|
||||
(ses/send-email {:destination {:to-addresses (:import-failure-destination-emails env)}
|
||||
:source (:invoice-email env)
|
||||
:message {:subject "An email invoice import failed"
|
||||
:body {:html (str "<div>You can download the original email <a href=\"" target-url "\">here</a>.</div>")
|
||||
:text (str "<div>You can download the original email here: " target-url)}}})))
|
||||
|
||||
|
||||
(defn process-sqs []
|
||||
(log/info "Fetching messages from sqs...")
|
||||
(doseq [message (:messages (sqs/receive-message {:queue-url (:invoice-import-queue-url env)
|
||||
:wait-time-seconds 5
|
||||
:max-number-of-messages 10
|
||||
#_#_:attribute-names ["All"]}))]
|
||||
(let [message-body (json/read-str (:body message)
|
||||
:key-fn keyword)]
|
||||
(doseq [r (:Records message-body)]
|
||||
(log/info "Processing record " r)
|
||||
(let [props (Session/getDefaultInstance (Properties.))
|
||||
message-stream (-> (s3/get-object {:key (-> r :s3 :object :key)
|
||||
:bucket-name (-> r :s3 :bucket :name)})
|
||||
:input-stream)
|
||||
mail (message/read-message (MimeMessage. props message-stream))]
|
||||
(log/info "reading mail" (->> mail :body (filter :content-type) (map :body) ))
|
||||
(doseq [pdf-stream (->> (-> mail :body)
|
||||
(filter :content-type)
|
||||
#_(filter #(re-find #"application/pdf" (:content-type %)) ))
|
||||
:let [filename (str "/tmp/" (UUID/randomUUID) ".pdf")]]
|
||||
(try
|
||||
(let [
|
||||
_ (io/copy (:body pdf-stream) (io/file filename))
|
||||
extension (last (str/split (.getName (io/file filename)) #"\."))
|
||||
s3-location (str "invoice-files/" (str (UUID/randomUUID)) "." extension)
|
||||
_ (s3/put-object :bucket-name (:data-bucket env)
|
||||
:key s3-location
|
||||
:input-stream (io/input-stream filename)
|
||||
:metadata {:content-type "application/pdf"
|
||||
:content-length (.length (io/file filename))})
|
||||
imports (->> (parse/parse-file filename filename)
|
||||
(map #(assoc %
|
||||
:source-url (str "http://" (:data-bucket env)
|
||||
".s3-website-us-east-1.amazonaws.com/"
|
||||
s3-location)
|
||||
:import-status :import-status/approved)))]
|
||||
(log/info "Found imports" imports)
|
||||
(invoices/import-uploaded-invoice {:user/role "admin"} imports ))
|
||||
(catch Exception e
|
||||
(log/warn e)
|
||||
(send-email-about-failed-message (-> r :s3 :bucket :name) (-> r :s3 :object :key)))
|
||||
(finally
|
||||
(io/delete-file filename)))))))
|
||||
(sqs/delete-message (assoc message :queue-url (:invoice-import-queue-url env) ))))
|
||||
|
||||
(mount/defstate import-invoices
|
||||
:start (scheduler/every (* 60 5000) (heartbeat process-sqs "import-uploaded-invoices"))
|
||||
:stop (scheduler/stop import-invoices))
|
||||
@@ -1,163 +0,0 @@
|
||||
(ns auto-ap.background.sysco
|
||||
(:require
|
||||
[amazonica.aws.s3 :as s3]
|
||||
[auto-ap.datomic :refer [conn]]
|
||||
[auto-ap.datomic.clients :as d-clients]
|
||||
[auto-ap.datomic.invoices :refer [code-invoice]]
|
||||
[auto-ap.parse :as parse]
|
||||
[auto-ap.time :as t]
|
||||
[auto-ap.utils :refer [heartbeat]]
|
||||
[clj-time.coerce :as coerce]
|
||||
[clojure.data.csv :as csv]
|
||||
[clojure.java.io :as io]
|
||||
[clojure.string :as str]
|
||||
[clojure.tools.logging :as log]
|
||||
[com.unbounce.dogstatsd.core :as statsd]
|
||||
[config.core :refer [env]]
|
||||
[datomic.api :as d]
|
||||
[mount.core :as mount]
|
||||
[yang.scheduler :as scheduler])
|
||||
(:import
|
||||
(java.util UUID)))
|
||||
|
||||
(def bucket-name (:data-bucket env))
|
||||
|
||||
(def header-keys ["TransCode" "GroupID" "Company" "CustomerNumber" "InvoiceNumber" "RecordType" "Item" "InvoiceDocument" "AccountName" "AccountDunsNo" "InvoiceDate" "AccountDate" "CustomerPONo" "PaymentTerms" "TermsDescription" "StoreNumber" "CustomerName" "AddressLine1" "AddressLine2" "City1" "State1" "Zip1" "Phone1" "Duns1" "Hin1" "Dea1" "TIDCustomer" "ChainNumber" "BidNumber" "ContractNumber" "CompanyNumber" "BriefName" "Address" "Address2" "City2" "State2" "Zip2" "Phone2" "Duns2" "Hin2" "Dea2" "Tid_OPCO" "ObligationIndicator" "Manifest" "Route" "Stop" "TermsDiscountPercent" "TermsDiscountDueDate" "TermsNetDueDate" "TermsDiscountAmount" "TermsDiscountCode" "OrderDate" "DepartmentCode"])
|
||||
|
||||
(def summary-keys ["TranCode" "GroupID" "Company" "CustomerNumber" "InvoiceNumber" "RecordType" "Item" "InvoiceDocument" "TotalLines" "TotalQtyInvoice" "TotalQty" "TotalQtySplit" "TotalQtyPounds" "TotalExtendedPrice" "TotalTaxAmount" "TotalInvoiceAmount" "AccountDate"])
|
||||
|
||||
(defn get-sysco-vendor []
|
||||
(let [db (d/db conn)]
|
||||
(d/entity
|
||||
db
|
||||
(->
|
||||
(d/q '[:find ?v
|
||||
:in $
|
||||
:where [?v :vendor/name "Sysco"]]
|
||||
db)
|
||||
first
|
||||
first))))
|
||||
|
||||
|
||||
(defn read-sysco-csv [k]
|
||||
(-> (s3/get-object {:bucket-name bucket-name
|
||||
:key k})
|
||||
:input-stream
|
||||
io/reader
|
||||
csv/read-csv))
|
||||
|
||||
(defn extract-invoice-details [csv-rows clients sysco-vendor]
|
||||
(let [[header-row & csv-rows] csv-rows
|
||||
header-row (into {} (map vector header-keys header-row))
|
||||
summary-row (->> csv-rows
|
||||
(filter (fn [[_ _ _ _ _ row-type]]
|
||||
(= row-type "SUM")))
|
||||
first)
|
||||
summary-row (into {} (map vector summary-keys summary-row))
|
||||
customer-identifier (header-row "CustomerName")
|
||||
account-number (header-row "CustomerNumber")
|
||||
location-hint (str/join " "
|
||||
[(header-row "CustomerName")
|
||||
(header-row "Address")
|
||||
(header-row "Address2")
|
||||
(header-row "AddressLine1")
|
||||
(header-row "AddressLine2")
|
||||
(header-row "City1")
|
||||
(header-row "City2")])
|
||||
|
||||
account-number (some-> account-number Long/parseLong str)
|
||||
[matching-client similarity] (and account-number
|
||||
(parse/best-match clients account-number 0.0))
|
||||
_ (when-not matching-client
|
||||
(throw (ex-info "cannot find matching client"
|
||||
{:account-number account-number
|
||||
:name customer-identifier})))
|
||||
total (Double/parseDouble (summary-row "TotalExtendedPrice"))
|
||||
tax (Double/parseDouble (summary-row "TotalTaxAmount"))
|
||||
date (t/parse
|
||||
(header-row "InvoiceDate")
|
||||
"yyMMdd")]
|
||||
(log/infof "Importing %s for %s" (header-row "InvoiceNumber") (header-row "CustomerName"))
|
||||
|
||||
(cond-> #:invoice {:invoice-number (header-row "InvoiceNumber")
|
||||
:total (+ total tax)
|
||||
:outstanding-balance (+ total tax)
|
||||
:location (parse/best-location-match matching-client location-hint location-hint )
|
||||
:date (coerce/to-date date)
|
||||
:vendor (:db/id sysco-vendor )
|
||||
:client (:db/id matching-client)
|
||||
:import-status :import-status/completed
|
||||
:status :invoice-status/unpaid
|
||||
:client-identifier customer-identifier}
|
||||
similarity (assoc :invoice/similarity (- 1.0 (double similarity)))
|
||||
true (code-invoice))))
|
||||
|
||||
(defn mark-key [k]
|
||||
(s3/copy-object {:source-bucket-name bucket-name
|
||||
:destination-bucket-name bucket-name
|
||||
:destination-key (str/replace-first k "pending" "imported")
|
||||
:source-key k})
|
||||
(s3/delete-object {:bucket-name bucket-name
|
||||
:key k}))
|
||||
|
||||
#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
|
||||
(defn unmark-key [k]
|
||||
(s3/copy-object {:source-bucket-name bucket-name
|
||||
:destination-bucket-name bucket-name
|
||||
:destination-key (str/replace-first k "imported" "pending")
|
||||
:source-key k})
|
||||
(s3/delete-object {:bucket-name bucket-name
|
||||
:key k}))
|
||||
|
||||
|
||||
(defn import-sysco []
|
||||
(let [sysco-vendor (get-sysco-vendor)
|
||||
clients (d-clients/get-all)
|
||||
keys (->> (s3/list-objects-v2 {:bucket-name bucket-name
|
||||
:prefix "sysco/pending"})
|
||||
:object-summaries
|
||||
(map :key))]
|
||||
|
||||
|
||||
(statsd/event {:title "Sysco import started"
|
||||
:text (format "Found %d sysco invoice to import: %s" (count keys) (pr-str keys))
|
||||
:priority :low}
|
||||
nil)
|
||||
(log/infof "Found %d sysco invoice to import: %s" (count keys) (pr-str keys))
|
||||
|
||||
(let [transaction (->> keys
|
||||
(mapcat (fn [k]
|
||||
(try
|
||||
(let [invoice-key (str "invoice-files/" (UUID/randomUUID) ".csv") ;
|
||||
invoice-url (str "http://" (:data-bucket env) ".s3-website-us-east-1.amazonaws.com/" invoice-key)]
|
||||
(s3/copy-object {:source-bucket-name (:data-bucket env)
|
||||
:destination-bucket-name (:data-bucket env)
|
||||
:source-key k
|
||||
:destination-key invoice-key})
|
||||
[[:propose-invoice
|
||||
(-> k
|
||||
read-sysco-csv
|
||||
(extract-invoice-details clients sysco-vendor)
|
||||
(assoc :invoice/source-url invoice-url))]])
|
||||
(catch Exception e
|
||||
(log/error (str "Cannot load file " k) e)
|
||||
(log/info
|
||||
(s3/copy-object {:source-bucket-name (:data-bucket env)
|
||||
:destination-bucket-name (:data-bucket env)
|
||||
:source-key k
|
||||
:destination-key (doto (str "sysco/error/"
|
||||
(.getName (io/file k)))
|
||||
println)}))
|
||||
[])))))
|
||||
result @(d/transact conn transaction)]
|
||||
(log/infof "Imported %d invoices" (/ (count (:tempids result)) 2)))
|
||||
(doseq [k keys]
|
||||
(mark-key k))
|
||||
(statsd/event {:title "Sysco import ended"
|
||||
:text "Sysco completed"
|
||||
:priority :low} nil)))
|
||||
|
||||
|
||||
(mount/defstate sysco-invoice-importer
|
||||
:start (scheduler/every (* 1000 60 60) (heartbeat import-sysco "sysco-importer"))
|
||||
:stop (scheduler/stop sysco-invoice-importer))
|
||||
@@ -1,35 +0,0 @@
|
||||
(ns auto-ap.background.vendor
|
||||
(:require
|
||||
[auto-ap.datomic :refer [conn]]
|
||||
[auto-ap.utils :refer [heartbeat]]
|
||||
[datomic.api :as d]
|
||||
[mount.core :as mount]
|
||||
[yang.scheduler :as scheduler]))
|
||||
|
||||
(defn refresh-vendor-usages []
|
||||
(->> {:query {:find ['?v '?c '(count ?e)]
|
||||
:in ['$]
|
||||
:where ['[?v :vendor/name]
|
||||
'(or-join [?v ?c ?e]
|
||||
(and
|
||||
[?e :invoice/vendor ?v]
|
||||
[?e :invoice/client ?c])
|
||||
(and
|
||||
[?e :transaction/vendor ?v]
|
||||
[?e :transaction/client ?c])
|
||||
(and
|
||||
[?e :journal-entry/vendor ?v]
|
||||
[?e :journal-entry/client ?c]))]}
|
||||
:args [(d/db conn)]}
|
||||
(d/query)
|
||||
(map (fn [[v c cnt]]
|
||||
#:vendor-usage {:vendor v
|
||||
:client c
|
||||
:key (str v "-" c)
|
||||
:count cnt}))
|
||||
(d/transact conn)
|
||||
deref))
|
||||
|
||||
(mount/defstate refresh-vendor-usages-worker
|
||||
:start (scheduler/every (* 60 60 1000) (heartbeat refresh-vendor-usages "vendor-usages"))
|
||||
:stop (scheduler/stop refresh-vendor-usages))
|
||||
Reference in New Issue
Block a user