(ns auto-ap.jobs.ntg
  (:require
   [amazonica.aws.s3 :as s3]
   [auto-ap.datomic :refer [audit-transact conn]]
   [auto-ap.jobs.core :refer [execute]]
   [auto-ap.logging :as log]
   [auto-ap.parse :as parse]
   [auto-ap.time :as atime]
   [clj-time.coerce :as coerce]
   [clj-time.core :as time]
   [clojure.data.csv :as csv]
   [clojure.java.io :as io]
   [clojure.string :as str]
   [clojure.xml :as xml]
   [clojure.zip :as zip]
   [datomic.api :as d]
   [iol-ion.tx :refer [random-tempid]])
  (:import
   (java.util UUID)))

;; S3 bucket that every read, list and copy in this namespace operates on.
(def bucket-name "data.prod.app.integreatconsult.com" #_(:data-bucket env))

(defn read-csv
  "Parses `stream` as CSV and returns the (lazy) sequence of rows produced by
  `clojure.data.csv/read-csv`."
  [stream]
  (-> stream
      io/reader
      csv/read-csv))

(defn read-xml
  "Reads `stream` fully, parses it as XML and returns an xml-zip zipper
  positioned at the root element."
  [stream]
  (-> (slurp stream)
      ;; slurp decodes as UTF-8 by default; re-encode with the same charset
      ;; instead of the platform default so non-ASCII text survives intact.
      (.getBytes "UTF-8")
      (java.io.ByteArrayInputStream.)
      xml/parse
      zip/xml-zip))

(defn mark-key
  "Copies the object at key `k` to the key obtained by replacing the first
  occurrence of \"pending\" in `k` with \"imported\". The delete of the
  original object is currently disabled."
  [k]
  (s3/copy-object {:source-bucket-name bucket-name
                   :destination-bucket-name bucket-name
                   :destination-key (str/replace-first k "pending" "imported")
                   :source-key k})
  #_(s3/delete-object {:bucket-name bucket-name
                       :key k}))

(defn is-csv-file?
  "True when the last dot-separated segment of `x` is \"dat\"."
  [x]
  (= "dat" (last (str/split x #"[\\.]"))))

(defn decipher-source
  "Classifies the S3 key `k` by the substrings it contains. Returns `:cintas`,
  `:general-produce` or `:unknown`."
  [k]
  (cond
    (and (str/includes? k "Cintas")
         (str/includes? k "zcic"))
    :cintas

    (and (str/includes? k "GeneralProduce")
         (str/includes? k "FRANCHISEE")
         (is-csv-file? k))
    :general-produce

    :else
    :unknown))

(defmulti extract-invoice-details
  "Given an S3 key, an open input stream for that object and the collection of
  client maps, returns a vector of invoice maps. Dispatches on
  `(decipher-source key)`."
  (fn [k _input-stream _clients]
    (decipher-source k)))

;; General Produce files are CSV: the first row is skipped, and columns 2-5 of
;; every other row are read as location hint, invoice number, ship date and
;; invoice total. Rows without a matching client are dropped, as are repeated
;; [invoice-number client] pairs. Any exception yields an empty vector.
(defmethod extract-invoice-details :general-produce
  [k input-stream clients]
  (log/info ::parsing-general-produce :key k)
  (let [missing-client-hints (atom #{})]
    (try
      ;; The vendor lookup does not depend on the row, so do it once per file.
      (let [vendor (d/pull (d/db conn) '[:vendor/default-account] :vendor/general-produce)]
        (->> (read-csv input-stream)
             (drop 1)
             #_(filter (fn [[_ _ _ _ _ _ _ _ _ _ _ break-flag]]
                         (= "Y" break-flag)))
             (map (fn [[_ location-hint invoice-number ship-date invoice-total]]
                    (let [matching-client (and location-hint
                                               (parse/exact-match clients location-hint))
                          location (parse/best-location-match matching-client
                                                              location-hint
                                                              location-hint)]
                      ;; Warn once per distinct, non-blank hint that has no
                      ;; matching client.
                      (when (and (not matching-client)
                                 (not (str/blank? location-hint))
                                 (not (@missing-client-hints location-hint)))
                        (log/warn ::missing-client
                                  :client-hint location-hint
                                  :invoice-number invoice-number
                                  :invoice-total invoice-total)
                        (swap! missing-client-hints conj location-hint))
                      {:db/id (random-tempid)
                       :invoice/location location
                       :invoice/date (coerce/to-date (atime/parse ship-date atime/normal-date))
                       :invoice/invoice-number invoice-number
                       :invoice/total (Double/parseDouble invoice-total)
                       :invoice/vendor :vendor/general-produce
                       :invoice/outstanding-balance (Double/parseDouble invoice-total)
                       :invoice/client (:db/id matching-client)
                       :invoice/import-status :import-status/imported
                       :invoice/status :invoice-status/unpaid
                       :invoice/client-identifier location-hint
                       :invoice/expense-accounts
                       [{:invoice-expense-account/account (-> vendor :vendor/default-account :db/id)
                         :invoice-expense-account/location location
                         :invoice-expense-account/amount (Math/abs (Double/parseDouble invoice-total))
                         :db/id (random-tempid)}]})))
             (filter :invoice/client)
             ;; Keep only the first invoice for each [invoice-number client].
             (reduce (fn [[seen-so-far list] i]
                       (let [k [(:invoice/invoice-number i) (:invoice/client i)]]
                         (if (seen-so-far k)
                           [seen-so-far list]
                           [(conj seen-so-far k) (conj list i)])))
                     [#{} []])
             (second)))
      (catch Exception e
        (log/error ::cant-import-general-produce :error e)
        []))))

;; Unrecognized keys produce no invoices.
(defmethod extract-invoice-details :unknown
  [k input-stream clients]
  (log/warn ::unknown-invoice-format :key k)
  [])

(defn zip-seq
  "Returns the sequence of zipper locations visited by walking, with
  `zip/next`, a fresh zipper rooted at the node `zipper` points to."
  [zipper]
  (->> (zip/xml-zip (zip/node zipper))
       (iterate zip/next)
       (take-while (complement zip/end?))))

;; Cintas files are XML. The client is matched on the content of the first
;; :Street element under the first :InvoiceDetailShipping element; when no
;; client matches, no invoice is produced. Otherwise a single invoice is built
;; from the :InvoiceDetailRequestHeader attributes and the first :Money element
;; under the first :NetAmount element.
(defmethod extract-invoice-details :cintas
  [k input-stream clients]
  (log/info ::parsing-cintas :key k)
  (let [vendor (d/pull (d/db conn) '[:vendor/default-account] :vendor/cintas)
        top (read-xml input-stream)
        node-seq (->> top
                      (iterate zip/next)
                      (take-while (complement zip/end?)))
        location-hint (->> node-seq
                           (filter (fn [z]
                                     (= (:tag (zip/node z)) :InvoiceDetailShipping)))
                           first
                           zip-seq
                           (map zip/node)
                           (filter (fn [node]
                                     (= :Street (:tag node))))
                           first
                           :content
                           first)
        matching-client (and location-hint
                             (parse/exact-match clients location-hint))]
    (if matching-client
      (let [header (->> node-seq
                        (map zip/node)
                        (filter (fn [node]
                                  (= (:tag node) :InvoiceDetailRequestHeader)))
                        first)
            invoice-date (-> header
                             :attrs
                             :invoiceDate
                             coerce/to-date-time
                             atime/localize
                             (atime/unparse atime/iso-date)
                             (atime/parse atime/iso-date))
            location (parse/best-location-match matching-client
                                                location-hint
                                                location-hint)
            ;; Due date is 30 days after the invoice date.
            due (-> invoice-date
                    (time/plus (time/days 30))
                    (coerce/to-date))
            total (->> node-seq
                       (filter (fn [zipper]
                                 (= (:tag (zip/node zipper)) :NetAmount)))
                       first
                       zip-seq
                       (map zip/node)
                       (filter (fn [node]
                                 (= :Money (:tag node))))
                       first
                       :content
                       first
                       Double/parseDouble)
            invoice {:db/id (random-tempid)
                     :invoice/vendor :vendor/cintas
                     :invoice/import-status :import-status/imported
                     :invoice/status :invoice-status/unpaid
                     :invoice/location location
                     :invoice/client-identifier location-hint
                     :invoice/client (:db/id matching-client)
                     :invoice/total total
                     :invoice/outstanding-balance total
                     :invoice/invoice-number (-> header :attrs :invoiceID)
                     :invoice/due due
                     ;; No scheduled payment for clients carrying the
                     ;; "manually-pay-cintas" feature flag.
                     :invoice/scheduled-payment
                     (when-not (contains? (set (:client/feature-flags matching-client))
                                          "manually-pay-cintas")
                       due)
                     :invoice/date (coerce/to-date invoice-date)
                     :invoice/expense-accounts
                     [{:invoice-expense-account/account (-> vendor :vendor/default-account :db/id)
                       :invoice-expense-account/location location
                       :invoice-expense-account/amount (Math/abs total)
                       :db/id (random-tempid)}]}]
        (log/info ::cintas-invoice-importing :invoice invoice)
        [invoice])
      (do
        ;; disabling logging for cintas
        #_(log/warn ::missing-client :client-hint location-hint)
        []))))

(defn mark-error
  "Copies the object at key `k` to \"ntg-invoices/error/<file name of k>\"."
  [k]
  (s3/copy-object {:source-bucket-name bucket-name
                   :destination-bucket-name bucket-name
                   :source-key k
                   :destination-key (str "ntg-invoices/error/"
                                         (.getName (io/file k)))}))

(defn copy-readable-version
  "Copies the object at key `k` to \"invoice-files/<random uuid>.<extension of
  k>\" and returns the new key."
  [k]
  (let [invoice-key (str "invoice-files/"
                         (UUID/randomUUID)
                         "."
                         (last (str/split k #"[\\.]")))]
    (log/info ::assigned-random-key :key k :invoice-key invoice-key)
    (s3/copy-object {:source-bucket-name bucket-name
                     :destination-bucket-name bucket-name
                     :source-key k
                     :destination-key invoice-key})
    invoice-key))

(defn get-all-keys
  "Returns a lazy sequence of the object summaries listed under the
  \"ntg-invoices/pending\" prefix, following continuation tokens page by page.
  The one-argument arity continues a listing from `next-token` and returns nil
  when the token is nil or the fetched page has no summaries."
  ([]
   (let [first-page-result (s3/list-objects-v2 {:bucket-name bucket-name
                                                :prefix "ntg-invoices/pending"})]
     (lazy-seq
      (concat (:object-summaries first-page-result)
              (get-all-keys (:next-continuation-token first-page-result))))))
  ([next-token]
   (when next-token
     (let [page-result (s3/list-objects-v2 {:bucket-name bucket-name
                                            :prefix "ntg-invoices/pending"
                                            :continuation-token next-token})]
       (log/info ::getting-next-page :continuation-token next-token)
       (when (seq (:object-summaries page-result))
         (lazy-seq
          (concat (:object-summaries page-result)
                  (get-all-keys (:next-continuation-token page-result)))))))))

(defn recent?
  "True when the `:last-modified` of object summary `k` is after the instant 15
  days before now."
  [k]
  (time/after? (:last-modified k)
               (time/plus (time/now) (time/days -15))))

(defn import-ntg-invoices
  "Imports invoices from the given S3 `keys`; with no arguments, from the keys
  of all pending objects that are `recent?`.

  For each key the object is copied to a readable key, parsed with
  `extract-invoice-details`, and every resulting invoice becomes a
  `:propose-invoice` transaction (Cintas invoices also get
  `:invoice/source-url`). A key whose processing throws is logged, passed to
  `mark-error` and contributes no invoices. Each transaction is submitted
  individually through `audit-transact`, after which every key is passed to
  `mark-key`."
  ([]
   (import-ntg-invoices (->> (get-all-keys)
                             (filter recent?)
                             (map :key))))
  ([keys]
   (let [clients (map first
                      (d/q '[:find (pull ?c [:client/code
                                             :db/id
                                             :client/feature-flags
                                             {:client/location-matches [:location-match/matches
                                                                        :location-match/location]}
                                             :client/name
                                             :client/matches
                                             :client/locations])
                             :where [?c :client/code]]
                           (d/db conn)))]
     (log/info ::found-invoice-keys :keys keys)
     (let [transaction (->> keys
                            (mapcat (fn [k]
                                      (try
                                        (let [invoice-key (copy-readable-version k)
                                              invoice-url (str "https://" bucket-name "/" invoice-key)]
                                          (with-open [is (-> (s3/get-object {:bucket-name bucket-name
                                                                             :key k})
                                                             :input-stream)]
                                            ;; mapv forces everything while the
                                            ;; stream is still open.
                                            (->> (extract-invoice-details k is clients)
                                                 (set)
                                                 (map (fn [i]
                                                        (log/info ::importing-invoice :invoice i)
                                                        i))
                                                 (mapv (fn [i]
                                                         (if (= :vendor/cintas (:invoice/vendor i))
                                                           [:propose-invoice (assoc i :invoice/source-url invoice-url)]
                                                           [:propose-invoice i]))))))
                                        (catch Exception e
                                          (log/error ::cant-load-file :key k :exception e)
                                          (mark-error k)
                                          []))))
                            (into []))]
       ;; NOTE(review): the audit user is named "sysco importer" although this
       ;; is the ntg job -- confirm before changing.
       (doseq [t transaction]
         (audit-transact [t] {:user/name "sysco importer"
                              :user/role "admin"}))
       (log/info ::success :count (count transaction) :sample (take 3 transaction)))
     ;; NOTE(review): keys that failed above were already copied to the error
     ;; prefix by mark-error and are still copied to the imported prefix here
     ;; -- confirm this is intended.
     (doseq [k keys]
       (mark-key k)))))

(defn -main
  "Entry point: runs `import-ntg-invoices` through `execute` under the job
  name \"ntg\"."
  [& _]
  (execute "ntg" import-ntg-invoices))