(ns auto-ap.jobs.sysco (:require [amazonica.aws.s3 :as s3] [auto-ap.datomic :refer [conn]] [auto-ap.jobs.core :refer [execute]] [auto-ap.datomic :refer [audit-transact]] [auto-ap.datomic.clients :as d-clients] [auto-ap.datomic.invoices :refer [code-invoice]] [auto-ap.parse :as parse] [auto-ap.time :as t] [clj-time.coerce :as coerce] [clojure.data.csv :as csv] [clojure.java.io :as io] [com.brunobonacci.mulog :as mu] [clojure.string :as str] [clojure.tools.logging :as log] [com.unbounce.dogstatsd.core :as statsd] [config.core :refer [env]] [datomic.api :as dc] [auto-ap.datomic.vendors :as d-vendors]) (:import (java.util UUID))) (def bucket-name (:data-bucket env)) (def header-keys ["TransCode" "GroupID" "Company" "CustomerNumber" "InvoiceNumber" "RecordType" "Item" "InvoiceDocument" "AccountName" "AccountDunsNo" "InvoiceDate" "AccountDate" "CustomerPONo" "PaymentTerms" "TermsDescription" "StoreNumber" "CustomerName" "AddressLine1" "AddressLine2" "City1" "State1" "Zip1" "Phone1" "Duns1" "Hin1" "Dea1" "TIDCustomer" "ChainNumber" "BidNumber" "ContractNumber" "CompanyNumber" "BriefName" "Address" "Address2" "City2" "State2" "Zip2" "Phone2" "Duns2" "Hin2" "Dea2" "Tid_OPCO" "ObligationIndicator" "Manifest" "Route" "Stop" "TermsDiscountPercent" "TermsDiscountDueDate" "TermsNetDueDate" "TermsDiscountAmount" "TermsDiscountCode" "OrderDate" "DepartmentCode"]) (def summary-keys ["TranCode" "GroupID" "Company" "CustomerNumber" "InvoiceNumber" "RecordType" "Item" "InvoiceDocument" "TotalLines" "TotalQtyInvoice" "TotalQty" "TotalQtySplit" "TotalQtyPounds" "TotalExtendedPrice" "TotalTaxAmount" "TotalInvoiceAmount" "AccountDate"]) (defn get-sysco-vendor [] (let [db (dc/db conn)] (-> (dc/q '[:find (pull ?v r) :in $ r :where [?v :vendor/name "Sysco"]] db d-vendors/default-read) first first))) (defn read-sysco-csv [k] (-> (s3/get-object {:bucket-name bucket-name :key k}) :input-stream io/reader csv/read-csv)) (defn extract-invoice-details [csv-rows sysco-vendor] (let [[header-row & csv-rows] csv-rows header-row (into {} (map vector header-keys header-row)) summary-row (->> csv-rows (filter (fn [[_ _ _ _ _ row-type]] (= row-type "SUM"))) first) summary-row (into {} (map vector summary-keys summary-row)) customer-identifier (header-row "CustomerName") account-number (header-row "CustomerNumber") location-hint (str/join " " [(header-row "CustomerName") (header-row "Address") (header-row "Address2") (header-row "AddressLine1") (header-row "AddressLine2") (header-row "City1") (header-row "City2")]) account-number (some-> account-number Long/parseLong str) matching-client (and account-number (d-clients/exact-match account-number)) _ (when-not matching-client (throw (ex-info "cannot find matching client" {:account-number account-number :name customer-identifier}))) total (Double/parseDouble (summary-row "TotalExtendedPrice")) tax (Double/parseDouble (summary-row "TotalTaxAmount")) date (t/parse (header-row "InvoiceDate") "yyMMdd")] (log/infof "Importing %s for %s" (header-row "InvoiceNumber") (header-row "CustomerName")) (cond-> #:invoice {:invoice-number (header-row "InvoiceNumber") :total (+ total tax) :outstanding-balance (+ total tax) :location (parse/best-location-match (dc/pull (dc/db conn) [{:client/location-matches [:location-match/location :location-match/matches]} :client/default-location :client/locations] matching-client) location-hint location-hint ) :date (coerce/to-date date) :vendor (:db/id sysco-vendor ) :client (:db/id matching-client) :import-status :import-status/completed :status :invoice-status/unpaid :client-identifier customer-identifier} true (code-invoice)))) (defn mark-key [k] (s3/copy-object {:source-bucket-name bucket-name :destination-bucket-name bucket-name :destination-key (str/replace-first k "pending" "imported") :source-key k}) (s3/delete-object {:bucket-name bucket-name :key k})) #_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]} (defn unmark-key [k] (s3/copy-object {:source-bucket-name bucket-name :destination-bucket-name bucket-name :destination-key (str/replace-first k "imported" "pending") :source-key k}) (s3/delete-object {:bucket-name bucket-name :key k})) (defn import-sysco [] (let [sysco-vendor (get-sysco-vendor) keys (->> (s3/list-objects-v2 {:bucket-name bucket-name :prefix "sysco/pending"}) :object-summaries (map :key))] (log/infof "Found %d sysco invoice to import: %s" (count keys) (pr-str keys)) (let [transaction (->> keys (mapcat (fn [k] (try (let [invoice-key (str "invoice-files/" (UUID/randomUUID) ".csv") ; invoice-url (str "http://" (:data-bucket env) ".s3-website-us-east-1.amazonaws.com/" invoice-key)] (s3/copy-object {:source-bucket-name (:data-bucket env) :destination-bucket-name (:data-bucket env) :source-key k :destination-key invoice-key}) [[:propose-invoice (-> k read-sysco-csv (extract-invoice-details sysco-vendor) (assoc :invoice/source-url invoice-url))]]) (catch Exception e (log/error (str "Cannot load file " k) e) (log/info (s3/copy-object {:source-bucket-name (:data-bucket env) :destination-bucket-name (:data-bucket env) :source-key k :destination-key (str "sysco/error/" (.getName (io/file k)))})) []))))) result (audit-transact transaction {:user/name "sysco importer" :user/role "admin"})]) (doseq [k keys] (mark-key k)))) (defn -main [& _] (execute "sysco" import-sysco))