Makes cintas import work
This commit is contained in:
@@ -2,61 +2,37 @@
|
||||
(:require
|
||||
[amazonica.aws.s3 :as s3]
|
||||
[auto-ap.datomic :refer [conn]]
|
||||
[auto-ap.datomic.clients :as d-clients]
|
||||
[auto-ap.datomic.invoices :refer [code-invoice]]
|
||||
[auto-ap.jobs.core :refer [execute]]
|
||||
[auto-ap.ledger :refer [transact-with-ledger]]
|
||||
[auto-ap.logging :as log]
|
||||
[auto-ap.parse :as parse]
|
||||
[auto-ap.time :as t]
|
||||
[auto-ap.time :as atime]
|
||||
[clj-time.coerce :as coerce]
|
||||
[clj-time.core :as time]
|
||||
[clojure.data.csv :as csv]
|
||||
[clojure.java.io :as io]
|
||||
[clojure.string :as str]
|
||||
[auto-ap.logging :as log :refer [capture-context->lc with-context-as]]
|
||||
[com.unbounce.dogstatsd.core :as statsd]
|
||||
[config.core :refer [env]]
|
||||
[datomic.api :as d]
|
||||
[com.brunobonacci.mulog :as mu]
|
||||
|
||||
[auto-ap.time :as atime])
|
||||
[clojure.xml :as xml]
|
||||
[clojure.zip :as zip]
|
||||
[datomic.api :as d])
|
||||
(:import
|
||||
(java.util UUID)))
|
||||
|
||||
|
||||
(def bucket-name "data.prod.app.integreatconsult.com" #_(:data-bucket env))
|
||||
|
||||
(defn read-csv [k]
|
||||
(log/info ::reading-csv :key k)
|
||||
(-> (s3/get-object {:bucket-name bucket-name
|
||||
:key k})
|
||||
:input-stream
|
||||
(defn read-csv [stream]
|
||||
(-> stream
|
||||
io/reader
|
||||
csv/read-csv))
|
||||
|
||||
(defn read-xml [stream]
|
||||
(-> (slurp stream)
|
||||
(.getBytes)
|
||||
(java.io.ByteArrayInputStream. )
|
||||
xml/parse
|
||||
zip/xml-zip))
|
||||
|
||||
|
||||
(defn extract-invoice-details [csv-rows clients]
|
||||
(clojure.pprint/pprint (take 4 csv-rows))
|
||||
(->> csv-rows
|
||||
(drop 1)
|
||||
(filter (fn [[_ _ _ _ _ _ _ _ _ _ _ break-flag]]
|
||||
|
||||
(= "Y" break-flag)))
|
||||
(map (fn [[vendor location-hint invoice-number ship-date invoice-total ]]
|
||||
|
||||
(let [[matching-client similarity] (and location-hint
|
||||
(parse/best-match clients location-hint 0.0))]
|
||||
(clojure.pprint/pprint {:invoice/vendor vendor
|
||||
:invoice/location (parse/best-location-match matching-client location-hint location-hint )
|
||||
:invoice/date (coerce/to-date (atime/parse ship-date atime/normal-date))
|
||||
:invoice/invoice-number invoice-number
|
||||
:invoice/total (Double/parseDouble invoice-total)
|
||||
:invoice/outstanding-balance (Double/parseDouble invoice-total)
|
||||
:invoice/client matching-client
|
||||
:invoice/import-status :import-status/completed
|
||||
:invoice/status :invoice-status/unpaid
|
||||
:invoice/client-identifier location-hint
|
||||
}))))
|
||||
(filter :invoice/client)))
|
||||
|
||||
(defn mark-key [k]
|
||||
(s3/copy-object {:source-bucket-name bucket-name
|
||||
@@ -69,46 +45,212 @@
|
||||
(defn is-csv-file? [x]
|
||||
(= "csv" (last (str/split x #"[\\.]"))))
|
||||
|
||||
(defn decipher-source [k]
|
||||
(cond
|
||||
(str/includes? k "Cintas")
|
||||
:cintas
|
||||
|
||||
(and (str/includes? k "GeneralProduce")
|
||||
(is-csv-file? k))
|
||||
:general-produce
|
||||
|
||||
:else
|
||||
:unknown))
|
||||
|
||||
(defmulti extract-invoice-details (fn [key input-stream clients]
|
||||
(decipher-source key)))
|
||||
|
||||
(defmethod extract-invoice-details :general-produce
|
||||
[k input-stream clients]
|
||||
(log/info ::parsing-general-produce :key k)
|
||||
(->> (read-csv input-stream)
|
||||
(drop 1)
|
||||
(filter (fn [[_ _ _ _ _ _ _ _ _ _ _ break-flag]]
|
||||
(= "Y" break-flag)))
|
||||
(map (fn [[vendor location-hint invoice-number ship-date invoice-total ]]
|
||||
(let [[matching-client similarity] (and location-hint
|
||||
(parse/best-match clients location-hint 0.0))]
|
||||
{:invoice/vendor vendor
|
||||
:invoice/location (parse/best-location-match matching-client location-hint location-hint )
|
||||
:invoice/date (coerce/to-date (atime/parse ship-date atime/normal-date))
|
||||
:invoice/invoice-number invoice-number
|
||||
:invoice/total (Double/parseDouble invoice-total)
|
||||
:invoice/outstanding-balance (Double/parseDouble invoice-total)
|
||||
:invoice/client matching-client
|
||||
:invoice/import-status :import-status/imported
|
||||
:invoice/status :invoice-status/unpaid
|
||||
:invoice/client-identifier location-hint
|
||||
})))
|
||||
(filter :invoice/client)))
|
||||
|
||||
(defmethod extract-invoice-details :unknown
|
||||
[k input-stream clients]
|
||||
(log/warn ::unknown-invoice-format
|
||||
:key k)
|
||||
[])
|
||||
|
||||
(defn zip-seq [zipper]
|
||||
(->> (zip/xml-zip (zip/node zipper))
|
||||
(iterate zip/next )
|
||||
(take-while (complement zip/end?))))
|
||||
|
||||
(defmethod extract-invoice-details :cintas
|
||||
[k input-stream clients]
|
||||
(log/info ::parsing-cintas :key k)
|
||||
(let [vendor (d/pull (d/db conn) '[:vendor/default-account] :vendor/cintas)
|
||||
top (read-xml input-stream)
|
||||
node-seq (->> top
|
||||
(iterate zip/next)
|
||||
(take-while (complement zip/end?)))
|
||||
location-hint (->> node-seq
|
||||
(filter (fn [z]
|
||||
(= (:tag (zip/node z))
|
||||
:InvoiceDetailShipping)))
|
||||
first
|
||||
zip-seq
|
||||
(map zip/node)
|
||||
(filter (fn [node]
|
||||
(= :Street
|
||||
(:tag node))))
|
||||
first
|
||||
:content
|
||||
first)
|
||||
[matching-client similarity] (and location-hint (parse/best-match clients location-hint 0.0))
|
||||
|
||||
]
|
||||
(if matching-client
|
||||
(let [invoice-date (->> node-seq
|
||||
(map zip/node)
|
||||
(filter (fn [node]
|
||||
(= (:tag node)
|
||||
:InvoiceDetailRequestHeader)))
|
||||
first
|
||||
(#(-> % :attrs :invoiceDate
|
||||
coerce/to-date-time
|
||||
atime/localize
|
||||
(atime/unparse atime/iso-date)
|
||||
(atime/parse atime/iso-date))))
|
||||
location (parse/best-location-match matching-client location-hint location-hint )
|
||||
due (-> invoice-date
|
||||
(time/plus (time/days 30))
|
||||
(coerce/to-date))
|
||||
total (->> node-seq
|
||||
(filter (fn [zipper]
|
||||
(= (:tag (zip/node zipper))
|
||||
:NetAmount)))
|
||||
first
|
||||
zip-seq
|
||||
(map zip/node)
|
||||
(filter (fn [node]
|
||||
(= :Money
|
||||
(:tag node))))
|
||||
first
|
||||
:content
|
||||
first
|
||||
Double/parseDouble)
|
||||
invoice {:invoice/vendor :vendor/cintas
|
||||
:invoice/import-status :import-status/imported
|
||||
:invoice/status :invoice-status/unpaid
|
||||
:invoice/location location
|
||||
:invoice/client-identifier location-hint
|
||||
:invoice/client (:db/id matching-client)
|
||||
:invoice/total total
|
||||
:invoice/outstanding-balance total
|
||||
:invoice/invoice-number (->> node-seq
|
||||
(map zip/node)
|
||||
(filter (fn [node]
|
||||
(= (:tag node)
|
||||
:InvoiceDetailRequestHeader)))
|
||||
first
|
||||
(#(-> % :attrs :invoiceID)))
|
||||
:invoice/due due
|
||||
|
||||
:invoice/scheduled-payment (when-not ((into #{} (->> matching-client
|
||||
:client/feature-flags))
|
||||
"manually-pay-cintas")
|
||||
due)
|
||||
|
||||
:invoice/date (coerce/to-date invoice-date)
|
||||
:invoice/expense-accounts [{:invoice-expense-account/account
|
||||
(-> vendor :vendor/default-account :db/id)
|
||||
:invoice-expense-account/location location
|
||||
:invoice-expense-account/amount (Math/abs total)
|
||||
}]}]
|
||||
(log/info ::cintas-invoice-importing
|
||||
:invoice invoice)
|
||||
[invoice])
|
||||
(do
|
||||
(log/info ::missing-client
|
||||
:client-hint location-hint)
|
||||
[]))))
|
||||
|
||||
(defn mark-error [k]
|
||||
(s3/copy-object {:source-bucket-name bucket-name
|
||||
:destination-bucket-name bucket-name
|
||||
:source-key k
|
||||
:destination-key (str "ntg-invoices/error/"
|
||||
(.getName (io/file k)))}))
|
||||
|
||||
(defn copy-readable-version [k]
|
||||
(let [invoice-key (str "invoice-files/" (UUID/randomUUID) "." (last (str/split k #"[\\.]")))]
|
||||
(log/info ::assigned-random-key
|
||||
:key k
|
||||
:invoice-key invoice-key)
|
||||
(s3/copy-object {:source-bucket-name bucket-name
|
||||
:destination-bucket-name bucket-name
|
||||
:source-key k
|
||||
:destination-key invoice-key })
|
||||
invoice-key))
|
||||
|
||||
(defn import-ntg-invoices []
|
||||
(let [clients (d-clients/get-all)
|
||||
keys (->> (s3/list-objects-v2 {:bucket-name bucket-name
|
||||
:prefix "ntg-invoices/pending"})
|
||||
:object-summaries
|
||||
(map :key))]
|
||||
(let [clients (map first (d/q '[:find (pull ?c [:client/code
|
||||
:db/id
|
||||
:client/feature-flags
|
||||
{:client/location-matches [:location-match/matches :location-match/location]}
|
||||
:client/name
|
||||
:client/matches
|
||||
:client/locations])
|
||||
:where [?c :client/code]]
|
||||
(d/db conn)))
|
||||
keys (->> (s3/list-objects-v2 {:bucket-name bucket-name
|
||||
:prefix "ntg-invoices/pending"})
|
||||
:object-summaries
|
||||
(map :key))]
|
||||
|
||||
|
||||
(log/info ::found-invoice-keys
|
||||
:keys keys )
|
||||
|
||||
(let [transaction (->> keys
|
||||
(filter is-csv-file?)
|
||||
(mapcat (fn [k]
|
||||
(try
|
||||
(log/info ::trying-csv :key k)
|
||||
(let [invoice-key (str "invoice-files/" (UUID/randomUUID) ".csv") ;
|
||||
(let [invoice-key (copy-readable-version k)
|
||||
invoice-url (str "http://" bucket-name ".s3-website-us-east-1.amazonaws.com/" invoice-key)]
|
||||
(s3/copy-object {:source-bucket-name bucket-name
|
||||
:destination-bucket-name bucket-name
|
||||
:source-key k
|
||||
:destination-key invoice-key})
|
||||
(->> (extract-invoice-details (read-csv k) clients)
|
||||
(map (fn [i]
|
||||
[:propose-invoice (assoc i :invoice/source-url invoice-url)]))
|
||||
))
|
||||
(with-open [is (-> (s3/get-object {:bucket-name bucket-name
|
||||
:key k})
|
||||
:input-stream)]
|
||||
(doto
|
||||
(->> (extract-invoice-details k
|
||||
is
|
||||
clients)
|
||||
(map (fn [i]
|
||||
(log/info ::importing-invoice
|
||||
:invoice i)
|
||||
i))
|
||||
(mapv (fn [i]
|
||||
[:propose-invoice (assoc i :invoice/source-url invoice-url)])))
|
||||
println)))
|
||||
(catch Exception e
|
||||
(log/error ::cant-load-file
|
||||
:key k
|
||||
:exception e)
|
||||
(log/info
|
||||
(s3/copy-object {:source-bucket-name bucket-name
|
||||
:destination-bucket-name bucket-name
|
||||
:source-key k
|
||||
:destination-key (str "ntg-invoices/error/"
|
||||
(.getName (io/file k)))}))
|
||||
[])))))
|
||||
#_result #_(transact-with-ledger transaction {:user/name "sysco importer" :user/role "admin"})]
|
||||
(clojure.pprint/pprint transaction)
|
||||
#_(log/infof "Imported %d invoices" (/ (count (:tempids result)) 2)))
|
||||
(mark-error k)
|
||||
[]))))
|
||||
(into []))]
|
||||
(transact-with-ledger transaction {:user/name "sysco importer" :user/role "admin"})
|
||||
(log/info ::success
|
||||
:count (count transaction)
|
||||
:sample (take 3 transaction)))
|
||||
(doseq [k keys]
|
||||
(mark-key k))))
|
||||
|
||||
|
||||
Reference in New Issue
Block a user