275 lines
14 KiB
Clojure
275 lines
14 KiB
Clojure
(ns auto-ap.jobs.ntg
|
|
(:require
|
|
[amazonica.aws.s3 :as s3]
|
|
[auto-ap.datomic :refer [audit-transact conn]]
|
|
[auto-ap.jobs.core :refer [execute]]
|
|
[auto-ap.logging :as log]
|
|
[auto-ap.parse :as parse]
|
|
[auto-ap.time :as atime]
|
|
[clj-time.coerce :as coerce]
|
|
[clj-time.core :as time]
|
|
[clojure.data.csv :as csv]
|
|
[clojure.java.io :as io]
|
|
[clojure.string :as str]
|
|
[clojure.xml :as xml]
|
|
[clojure.zip :as zip]
|
|
[datomic.api :as d]
|
|
[iol-ion.tx :refer [random-tempid]])
|
|
(:import
|
|
(java.util UUID)))
|
|
|
|
(def bucket-name "data.prod.app.integreatconsult.com" #_(:data-bucket env))
|
|
|
|
(defn read-csv [stream]
|
|
(-> stream
|
|
io/reader
|
|
csv/read-csv))
|
|
|
|
(defn read-xml [stream]
|
|
(-> (slurp stream)
|
|
(.getBytes)
|
|
(java.io.ByteArrayInputStream. )
|
|
xml/parse
|
|
zip/xml-zip))
|
|
|
|
|
|
|
|
(defn mark-key [k]
|
|
(s3/copy-object {:source-bucket-name bucket-name
|
|
:destination-bucket-name bucket-name
|
|
:destination-key (str/replace-first k "pending" "imported")
|
|
:source-key k})
|
|
#_(s3/delete-object {:bucket-name bucket-name
|
|
:key k}))
|
|
|
|
(defn is-csv-file? [x]
|
|
(= "dat" (last (str/split x #"[\\.]"))))
|
|
|
|
(defn decipher-source [k]
|
|
(cond
|
|
(and (str/includes? k "Cintas")
|
|
(str/includes? k "zcic"))
|
|
:cintas
|
|
|
|
(and (str/includes? k "GeneralProduce")
|
|
(str/includes? k "FRANCHISEE")
|
|
(is-csv-file? k))
|
|
:general-produce
|
|
|
|
:else
|
|
:unknown))
|
|
|
|
(defmulti extract-invoice-details (fn [key input-stream clients]
|
|
(decipher-source key)))
|
|
|
|
(defmethod extract-invoice-details :general-produce
|
|
[k input-stream clients]
|
|
(log/info ::parsing-general-produce :key k)
|
|
(try
|
|
(->> (read-csv input-stream)
|
|
(drop 1)
|
|
(filter (fn [[_ _ _ _ _ _ _ _ _ _ _ break-flag]]
|
|
(= "Y" break-flag)))
|
|
(map (fn [[_ location-hint invoice-number ship-date invoice-total ]]
|
|
(let [matching-client (and location-hint
|
|
(parse/exact-match clients location-hint))
|
|
location (parse/best-location-match matching-client location-hint location-hint )
|
|
vendor (d/pull (d/db conn) '[:vendor/default-account] :vendor/general-produce)]
|
|
(when-not matching-client
|
|
(log/warn ::missing-client
|
|
:client-hint location-hint))
|
|
{:invoice/location location
|
|
:invoice/date (coerce/to-date (atime/parse ship-date atime/normal-date))
|
|
:invoice/invoice-number invoice-number
|
|
:invoice/total (Double/parseDouble invoice-total)
|
|
:invoice/vendor :vendor/general-produce
|
|
:invoice/outstanding-balance (Double/parseDouble invoice-total)
|
|
:invoice/client matching-client
|
|
:invoice/import-status :import-status/imported
|
|
:invoice/status :invoice-status/unpaid
|
|
:invoice/client-identifier location-hint
|
|
:invoice/expense-accounts [{:invoice-expense-account/account
|
|
(-> vendor :vendor/default-account :db/id)
|
|
:invoice-expense-account/location location
|
|
:invoice-expense-account/amount (Math/abs (Double/parseDouble invoice-total))
|
|
}]})))
|
|
(filter :invoice/client)
|
|
(into []))
|
|
(catch Exception e
|
|
(log/error ::cant-import-general-produce
|
|
:error e)
|
|
[])))
|
|
|
|
(defmethod extract-invoice-details :unknown
|
|
[k input-stream clients]
|
|
(log/warn ::unknown-invoice-format
|
|
:key k)
|
|
[])
|
|
|
|
(defn zip-seq [zipper]
|
|
(->> (zip/xml-zip (zip/node zipper))
|
|
(iterate zip/next )
|
|
(take-while (complement zip/end?))))
|
|
|
|
(defmethod extract-invoice-details :cintas
|
|
[k input-stream clients]
|
|
(log/info ::parsing-cintas :key k)
|
|
(let [vendor (d/pull (d/db conn) '[:vendor/default-account] :vendor/cintas)
|
|
top (read-xml input-stream)
|
|
node-seq (->> top
|
|
(iterate zip/next)
|
|
(take-while (complement zip/end?)))
|
|
location-hint (->> node-seq
|
|
(filter (fn [z]
|
|
(= (:tag (zip/node z))
|
|
:InvoiceDetailShipping)))
|
|
first
|
|
zip-seq
|
|
(map zip/node)
|
|
(filter (fn [node]
|
|
(= :Street
|
|
(:tag node))))
|
|
first
|
|
:content
|
|
first)
|
|
matching-client (and location-hint (parse/exact-match clients location-hint))]
|
|
(if matching-client
|
|
(let [invoice-date (->> node-seq
|
|
(map zip/node)
|
|
(filter (fn [node]
|
|
(= (:tag node)
|
|
:InvoiceDetailRequestHeader)))
|
|
first
|
|
(#(-> % :attrs :invoiceDate
|
|
coerce/to-date-time
|
|
atime/localize
|
|
(atime/unparse atime/iso-date)
|
|
(atime/parse atime/iso-date))))
|
|
location (parse/best-location-match matching-client location-hint location-hint )
|
|
due (-> invoice-date
|
|
(time/plus (time/days 30))
|
|
(coerce/to-date))
|
|
total (->> node-seq
|
|
(filter (fn [zipper]
|
|
(= (:tag (zip/node zipper))
|
|
:NetAmount)))
|
|
first
|
|
zip-seq
|
|
(map zip/node)
|
|
(filter (fn [node]
|
|
(= :Money
|
|
(:tag node))))
|
|
first
|
|
:content
|
|
first
|
|
Double/parseDouble)
|
|
invoice {:db/id (random-tempid)
|
|
:invoice/vendor :vendor/cintas
|
|
:invoice/import-status :import-status/imported
|
|
:invoice/status :invoice-status/unpaid
|
|
:invoice/location location
|
|
:invoice/client-identifier location-hint
|
|
:invoice/client (:db/id matching-client)
|
|
:invoice/total total
|
|
:invoice/outstanding-balance total
|
|
:invoice/invoice-number (->> node-seq
|
|
(map zip/node)
|
|
(filter (fn [node]
|
|
(= (:tag node)
|
|
:InvoiceDetailRequestHeader)))
|
|
first
|
|
(#(-> % :attrs :invoiceID)))
|
|
:invoice/due due
|
|
|
|
:invoice/scheduled-payment (when-not ((into #{} (->> matching-client
|
|
:client/feature-flags))
|
|
"manually-pay-cintas")
|
|
due)
|
|
|
|
:invoice/date (coerce/to-date invoice-date)
|
|
:invoice/expense-accounts [{:invoice-expense-account/account
|
|
(-> vendor :vendor/default-account :db/id)
|
|
:invoice-expense-account/location location
|
|
:invoice-expense-account/amount (Math/abs total)
|
|
:db/id (random-tempid)
|
|
}]}]
|
|
(log/info ::cintas-invoice-importing
|
|
:invoice invoice)
|
|
[invoice])
|
|
(do
|
|
(log/warn ::missing-client
|
|
:client-hint location-hint)
|
|
[]))))
|
|
|
|
(defn mark-error [k]
|
|
(s3/copy-object {:source-bucket-name bucket-name
|
|
:destination-bucket-name bucket-name
|
|
:source-key k
|
|
:destination-key (str "ntg-invoices/error/"
|
|
(.getName (io/file k)))}))
|
|
|
|
(defn copy-readable-version [k]
|
|
(let [invoice-key (str "invoice-files/" (UUID/randomUUID) "." (last (str/split k #"[\\.]")))]
|
|
(log/info ::assigned-random-key
|
|
:key k
|
|
:invoice-key invoice-key)
|
|
(s3/copy-object {:source-bucket-name bucket-name
|
|
:destination-bucket-name bucket-name
|
|
:source-key k
|
|
:destination-key invoice-key })
|
|
invoice-key))
|
|
|
|
(defn import-ntg-invoices
|
|
([] (import-ntg-invoices (->> (s3/list-objects-v2 {:bucket-name bucket-name
|
|
:prefix "ntg-invoices/pending"})
|
|
:object-summaries
|
|
(map :key))))
|
|
([keys]
|
|
(let [clients (map first (d/q '[:find (pull ?c [:client/code
|
|
:db/id
|
|
:client/feature-flags
|
|
{:client/location-matches [:location-match/matches :location-match/location]}
|
|
:client/name
|
|
:client/matches
|
|
:client/locations])
|
|
:where [?c :client/code]]
|
|
(d/db conn)))]
|
|
(log/info ::found-invoice-keys
|
|
:keys keys )
|
|
(let [transaction (->> keys
|
|
(mapcat (fn [k]
|
|
(try
|
|
(let [invoice-key (copy-readable-version k)
|
|
invoice-url (str "http://" bucket-name ".s3-website-us-east-1.amazonaws.com/" invoice-key)]
|
|
(with-open [is (-> (s3/get-object {:bucket-name bucket-name
|
|
:key k})
|
|
:input-stream)]
|
|
(->> (extract-invoice-details k
|
|
is
|
|
clients)
|
|
(map (fn [i]
|
|
(log/info ::importing-invoice
|
|
:invoice i)
|
|
i))
|
|
(mapv (fn [i]
|
|
(if (= :vendor/cintas (:invoice/vendor i))
|
|
[:propose-invoice (assoc i :invoice/source-url invoice-url)]
|
|
[:propose-invoice i]))))))
|
|
(catch Exception e
|
|
(log/error ::cant-load-file
|
|
:key k
|
|
:exception e)
|
|
(mark-error k)
|
|
[]))))
|
|
(into []))]
|
|
(audit-transact transaction {:user/name "sysco importer" :user/role "admin"})
|
|
(log/info ::success
|
|
:count (count transaction)
|
|
:sample (take 3 transaction)))
|
|
(doseq [k keys]
|
|
(mark-key k)))))
|
|
|
|
|
|
(defn -main [& _]
|
|
(execute "ntg" import-ntg-invoices))
|