added sysco poller

This commit is contained in:
2021-12-18 17:10:23 -08:00
parent d8322bf498
commit fcdc00ed16
3 changed files with 149 additions and 0 deletions

View File

@@ -0,0 +1,139 @@
(ns auto-ap.background.sysco
(:require
[amazonica.aws.s3 :as s3]
[auto-ap.datomic :refer [conn]]
[auto-ap.datomic.clients :as d-clients]
[auto-ap.datomic.vendors :as d-vendors]
[auto-ap.parse :as parse]
[auto-ap.time :as t]
[auto-ap.time-utils :refer [next-dom]]
[auto-ap.utils :refer [by]]
[clj-time.coerce :as coerce]
[clojure.data.csv :as csv]
[clojure.java.io :as io]
[clojure.string :as str]
[clojure.tools.logging :as log]
[config.core :refer [env]]
[datomic.api :as d]
[unilog.context :as lc]
[clj-time.core :as time]))
(def bucket-name (:data-bucket env))
(def header-keys ["TransCode" "GroupID" "Company" "CustomerNumber" "InvoiceNumber" "RecordType" "Item" "InvoiceDocument" "AccountName" "AccountDunsNo" "InvoiceDate" "AccountDate" "CustomerPONo" "PaymentTerms" "TermsDescription" "StoreNumber" "CustomerName" "AddressLine1" "AddressLine2" "City1" "State1" "Zip1" "Phone1" "Duns1" "Hin1" "Dea1" "TIDCustomer" "ChainNumber" "BidNumber" "ContractNumber" "CompanyNumber" "BriefName" "Address" "Address2" "City2" "State2" "Zip2" "Phone2" "Duns2" "Hin2" "Dea2" "Tid_OPCO" "ObligationIndicator" "Manifest" "Route" "Stop" "TermsDiscountPercent" "TermsDiscountDueDate" "TermsNetDueDate" "TermsDiscountAmount" "TermsDiscountCode" "OrderDate" "DepartmentCode"])
(def summary-keys ["TranCode" "GroupID" "Company" "CustomerNumber" "InvoiceNumber" "RecordType" "Item" "InvoiceDocument" "TotalLines" "TotalQtyInvoice" "TotalQty" "TotalQtySplit" "TotalQtyPounds" "TotalExtendedPrice" "TotalTaxAmount" "TotalInvoiceAmount" "AccountDate"])
(defn get-sysco-vendor []
(let [db (d/db conn)]
(d/entity
db
(->
(d/q '[:find ?v
:in $
:where [?v :vendor/name "Sysco"]]
db)
first
first))))
(defn read-sysco-csv [k]
(-> (s3/get-object {:bucket-name bucket-name
:key k})
:input-stream
io/reader
csv/read-csv))
(defn extract-invoice-details [csv-rows clients sysco-vendor]
(let [[header-row & csv-rows] csv-rows
header-row (into {} (map vector header-keys header-row))
summary-row (->> csv-rows
(filter (fn [[_ _ _ _ _ row-type]]
(= row-type "SUM")))
first)
summary-row (into {} (map vector summary-keys summary-row))
customer-identifier (header-row "CustomerName")
account-number (header-row "CustomerNumber")
location-hint (str/join " "
[(header-row "CustomerName")
(header-row "Address")
(header-row "Address2")
(header-row "AddressLine1")
(header-row "AddressLine2")
(header-row "City1")
(header-row "City2")])
matching-client (or (and account-number
(parse/best-match clients account-number 0.0))
(and customer-identifier
(parse/best-match clients customer-identifier)))
total (Double/parseDouble (summary-row "TotalExtendedPrice"))
date (t/parse
(header-row "InvoiceDate")
"yyMMdd")
automatically-paid-for (set (map :db/id (:vendor/automatically-paid-when-due sysco-vendor)))
schedule-payment-dom (get (by (comp :db/id :vendor-schedule-payment-dom/client) :vendor-schedule-payment-dom/dom (:vendor/schedule-payment-dom sysco-vendor))
(:db/id matching-client))]
(log/infof "Importing %s for %s" (header-row "InvoiceNumber") (header-row "CustomerName"))
[:propose-invoice (cond-> #:invoice {:invoice-number (header-row "InvoiceNumber")
:total total
:outstanding-balance total
:date (coerce/to-date date)
:vendor (:db/id sysco-vendor )
:client (:db/id matching-client)
:import-status :import-status/completed
:status :invoice-status/unpaid
:client-identifier customer-identifier
:expense-accounts [#:invoice-expense-account {:account (:db/id (:vendor/default-account sysco-vendor))
:amount total
:location (parse/best-location-match matching-client location-hint location-hint )}]}
(:vendor/terms sysco-vendor) (assoc :invoice/due (coerce/to-date
(time/plus date (time/days (d-vendors/terms-for-client-id sysco-vendor (:db/id matching-client))))))
(boolean (automatically-paid-for (:db/id matching-client))) (assoc :invoice/scheduled-payment (coerce/to-date
(time/plus date (time/days (d-vendors/terms-for-client-id sysco-vendor (:db/id matching-client))))))
schedule-payment-dom (assoc :invoice/scheduled-payment (coerce/to-date
(next-dom date schedule-payment-dom))))]))
(defn mark-key [k]
(s3/copy-object {:source-bucket-name bucket-name
:destination-bucket-name bucket-name
:destination-key (str/replace-first k "pending" "imported")
:source-key k})
(s3/delete-object {:bucket-name bucket-name
:key k}))
(defn unmark-key [k]
(s3/copy-object {:source-bucket-name bucket-name
:destination-bucket-name bucket-name
:destination-key (str/replace-first k "imported" "pending")
:source-key k})
(s3/delete-object {:bucket-name bucket-name
:key k}))
(defn import-sysco []
(lc/with-context {:source "sysco-importer"}
(let [sysco-vendor (get-sysco-vendor)
clients (d-clients/get-all)
keys (->> (s3/list-objects-v2 {:bucket-name bucket-name
:prefix "sysco/pending"})
:object-summaries
(map :key))]
(log/infof "Found %d sysco invoice to import: %s" (count keys) (pr-str keys))
(let [result @(d/transact conn (mapv (fn [k]
(-> k
read-sysco-csv
(extract-invoice-details clients sysco-vendor)
))
keys))]
(log/infof "Imported %d invoices" (/ (count (:tempids result)) 2)))
#_(doseq [k keys]
(mark-key k)))))
(mount/defstate sysco-invoice-importer
:start (scheduler/every (* 1000 60 60) import)
:stop (scheduler/stop sysco-invoice-importer))