(ns auto-ap.jobs.sysco (:require [amazonica.aws.s3 :as s3] [auto-ap.datomic :refer [conn]] [auto-ap.datomic :refer [audit-transact pull-attr random-tempid]] [auto-ap.datomic.clients :as d-clients] [auto-ap.datomic.invoices :refer [code-invoice]] [auto-ap.datomic.vendors :as d-vendors] [auto-ap.jobs.core :refer [execute]] [auto-ap.logging :as alog] [auto-ap.parse :as parse] [auto-ap.time :as t] [auto-ap.utils :refer [dollars=]] [clj-time.coerce :as coerce] [clojure.data.csv :as csv] [clojure.java.io :as io] [clojure.string :as str] [config.core :refer [env]] [datomic.api :as dc]) (:import (java.util UUID))) (def sysco-name->line (atom nil)) (defn get-sysco->line [] (when (nil? @sysco-name->line) (reset! sysco-name->line (with-open [data (io/reader (io/resource "sysco_line_item_mapping.csv"))] (let [data (csv/read-csv data)] (->> data (drop 1) (map (fn [[_ _ name _ account-number]] [name (ffirst (dc/q '[:find ?a :in $ ?an :where [?a :account/numeric-code ?an]] (dc/db conn) (Long/parseLong account-number)))])) (into {})))))) @sysco-name->line) (defn get-line-account [item-name] (get (get-sysco->line) item-name (ffirst (dc/q '[:find ?a :in $ ?an :where [?a :account/numeric-code ?an]] (dc/db conn) 50000)))) (def ^:dynamic bucket-name (:data-bucket env)) (def header-keys ["TransCode" "GroupID" "Company" "CustomerNumber" "InvoiceNumber" "RecordType" "Item" "InvoiceDocument" "AccountName" "AccountDunsNo" "InvoiceDate" "AccountDate" "CustomerPONo" "PaymentTerms" "TermsDescription" "StoreNumber" "CustomerName" "AddressLine1" "AddressLine2" "City1" "State1" "Zip1" "Phone1" "Duns1" "Hin1" "Dea1" "TIDCustomer" "ChainNumber" "BidNumber" "ContractNumber" "CompanyNumber" "BriefName" "Address" "Address2" "City2" "State2" "Zip2" "Phone2" "Duns2" "Hin2" "Dea2" "Tid_OPCO" "ObligationIndicator" "Manifest" "Route" "Stop" "TermsDiscountPercent" "TermsDiscountDueDate" "TermsNetDueDate" "TermsDiscountAmount" "TermsDiscountCode" "OrderDate" "DepartmentCode"]) (def item-price-index 15) (def item-name-index 29) (def summary-keys ["TranCode" "GroupID" "Company" "CustomerNumber" "InvoiceNumber" "RecordType" "Item" "InvoiceDocument" "TotalLines" "TotalQtyInvoice" "TotalQty" "TotalQtySplit" "TotalQtyPounds" "TotalExtendedPrice" "TotalTaxAmount" "TotalInvoiceAmount" "AccountDate"]) (defn get-sysco-vendor [] (let [db (dc/db conn)] (-> (dc/q '[:find (pull ?v r) :in $ r :where [?v :vendor/name "Sysco"]] db d-vendors/default-read) first first))) (defn read-sysco-csv [k] (-> (s3/get-object {:bucket-name bucket-name :key k}) :input-stream io/reader csv/read-csv)) (defn check-okay-amount? [i] (dollars= (:invoice/total i) (reduce + 0.0 (map :invoice-expense-account/amount (:invoice/expense-accounts i))))) (defn code-individual-items [invoice csv-rows tax] (let [items (->> csv-rows butlast (reduce (fn [acc row] (update acc (get-line-account (nth row item-name-index)) (fnil + 0.0) (Double/parseDouble (nth row item-price-index)))) {})) items-with-tax (update items (get-line-account "TAX") (fnil + 0.0) tax) updated-invoice (assoc invoice :invoice/expense-accounts (for [[account amount] items-with-tax] #:invoice-expense-account {:db/id (random-tempid) :account account :location (:invoice/location invoice) :amount amount}))] (if (check-okay-amount? updated-invoice) updated-invoice (do (alog/warn ::itemized-expenses-not-adding-up :invoice updated-invoice) invoice)))) (defn code-invoices-list-items [invoice] (with-precision 2 (let [line-items (:line-items invoice) invoice-total (if (string? (:invoice/total invoice)) (Double/parseDouble (:invoice/total invoice)) (:invoice/total invoice)) abs-total (Math/abs invoice-total) expense-accounts (reduce (fn [acc {:keys [description amount]}] (let [account (get-line-account description)] (update acc account (fnil + 0.0) amount))) {} line-items) total-line-amount (reduce + 0.0 (vals expense-accounts)) leftover (- abs-total total-line-amount) food-cost-account (get-line-account nil) adjusted-accounts (if (zero? leftover) expense-accounts (update expense-accounts food-cost-account (fnil + 0.0) leftover)) accounts (vec (for [[account amount] adjusted-accounts] #:invoice-expense-account {:db/id (random-tempid) :account account :location (:invoice/location invoice) :amount (with-precision 2 (double (.setScale (bigdec amount) 2 java.math.RoundingMode/HALF_UP)))}))] (dissoc (assoc invoice :invoice/expense-accounts accounts) :line-items)))) (defn maybe-code-line-items [invoice] (if (and (seq (:line-items invoice)) (= "Sysco" (-> (dc/pull (dc/db conn) [:vendor/name] (:invoice/vendor invoice)) :vendor/name))) (code-invoices-list-items invoice) (dissoc invoice :line-items))) (defn extract-invoice-details [csv-rows sysco-vendor] (let [[header-row & csv-rows] csv-rows header-row (into {} (map vector header-keys header-row)) summary-row (->> csv-rows (filter (fn [[_ _ _ _ _ row-type]] (= row-type "SUM"))) first) summary-row (into {} (map vector summary-keys summary-row)) customer-identifier (header-row "CustomerName") account-number (header-row "CustomerNumber") location-hint (str/join " " [(header-row "CustomerName") (header-row "Address") (header-row "Address2") (header-row "AddressLine1") (header-row "AddressLine2") (header-row "City1") (header-row "City2")]) account-number (some-> account-number Long/parseLong str) matching-client (and account-number (d-clients/exact-match account-number)) _ (when-not matching-client (throw (ex-info "cannot find matching client" {:account-number account-number :name customer-identifier}))) code-items (get (into #{} (pull-attr (dc/db conn) :client/feature-flags (:db/id matching-client))) "code-sysco-items") total (Double/parseDouble (summary-row "TotalExtendedPrice")) tax (Double/parseDouble (summary-row "TotalTaxAmount")) date (t/parse (header-row "InvoiceDate") "yyMMdd")] (alog/info ::importing :account-number account-number :invoice-number (header-row "InvoiceNumber") :customer-name (header-row "CustomerName")) (cond-> #:invoice {:invoice-number (header-row "InvoiceNumber") :db/id (random-tempid) :total (+ total tax) :outstanding-balance (+ total tax) :location (parse/best-location-match (dc/pull (dc/db conn) [{:client/location-matches [:location-match/location :location-match/matches]} :client/default-location :client/locations] (:db/id matching-client)) location-hint location-hint) :date (coerce/to-date date) :vendor (:db/id sysco-vendor) :client (:db/id matching-client) :import-status :import-status/imported :status :invoice-status/unpaid :client-identifier customer-identifier} true (code-invoice) code-items (code-individual-items csv-rows tax)))) (defn mark-key [k] (s3/copy-object {:source-bucket-name bucket-name :destination-bucket-name bucket-name :destination-key (str/replace-first k "pending" "imported") :source-key k}) (s3/delete-object {:bucket-name bucket-name :key k})) #_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]} (defn unmark-key [k] (s3/copy-object {:source-bucket-name bucket-name :destination-bucket-name bucket-name :destination-key (str/replace-first k "imported" "pending") :source-key k}) (s3/delete-object {:bucket-name bucket-name :key k})) (defn get-test-invoice-file ([] (get-test-invoice-file 999)) ([i] (nth (->> (s3/list-objects-v2 {:bucket-name "data.prod.app.integreatconsult.com" :prefix "sysco/imported"}) :object-summaries (map :key)) i))) (comment (with-bindings {#'bucket-name "data.prod.app.integreatconsult.com"} (doall (for [n (range 930 940) :let [result (-> (get-test-invoice-file n) read-sysco-csv (extract-invoice-details (get-sysco-vendor)))] #_#_:when (not (check-okay-amount? result))] result))) (with-bindings {#'bucket-name "data.prod.app.integreatconsult.com"} (let [result (-> "sysco/error/SYSCO050_00175962_20241010122639019.csv" read-sysco-csv (extract-invoice-details (get-sysco-vendor)))] result))) (defn import-sysco [] (let [sysco-vendor (get-sysco-vendor) keys (->> (s3/list-objects-v2 {:bucket-name bucket-name :prefix "sysco/pending"}) :object-summaries (map :key))] (alog/info ::importing-sysco :count (count keys) :keys (pr-str keys)) (let [transaction (->> keys (mapcat (fn [k] (try (let [invoice-key (str "invoice-files/" (UUID/randomUUID) ".csv") ; invoice-url (str "https://" (:data-bucket env) "/" invoice-key)] (s3/copy-object {:source-bucket-name (:data-bucket env) :destination-bucket-name (:data-bucket env) :source-key k :destination-key invoice-key}) [[:propose-invoice (-> k read-sysco-csv (extract-invoice-details sysco-vendor) (assoc :invoice/source-url invoice-url))]]) (catch Exception e (alog/error ::cant-load-file :file k :error e e) (s3/copy-object {:source-bucket-name (:data-bucket env) :destination-bucket-name (:data-bucket env) :source-key k :destination-key (str "sysco/error/" (.getName (io/file k)))}) []))))) result (audit-transact transaction {:user/name "sysco importer" :user/role "admin"})]) (doseq [k keys] (mark-key k)))) (defn -main [& _] (execute "sysco" import-sysco))