Files
integreat/src/clj/auto_ap/jobs/sales_summaries.clj
Bryce 3ad1ed4dd9 feat(sales): redirect production and read flows to Parquet/DuckDB
- U3: Square production (upsert) now buffers to parquet via flatten-order-to-parquet!
- U3: EzCater core import-order now buffers to parquet instead of Datomic transact
- U3: EzCater XLS upload-xls now buffers to parquet instead of audit-transact
- U4: Rewrite sales_orders.clj to read from DuckDB via pq/get-sales-orders
- U5: Rewrite sales_summaries to use parquet aggregation functions
  - get-payment-items-parquet, get-discounts-parquet, get-refund-items-parquet
  - get-tax-parquet, get-tip-parquet, get-sales-parquet
- Add sum-* aggregation functions to storage/sales_summaries.clj
  - sum-discounts, sum-refunds-by-type, sum-taxes, sum-tips, sum-sales-by-category
2026-04-25 07:56:17 -07:00

385 lines
15 KiB
Clojure

(ns auto-ap.jobs.sales-summaries
(:require [auto-ap.datomic :refer [conn]]
[auto-ap.jobs.core :refer [execute]]
[auto-ap.logging :as alog]
[auto-ap.time :as atime]
[auto-ap.storage.parquet :as pq]
[clj-time.coerce :as c]
[clj-time.core :as time]
[clj-time.periodic :as per]
[clojure.string :as str]
[com.brunobonacci.mulog :as mu]
[datomic.api :as dc]))
(defn mark-dirty [client start end]
(let [client (dc/entid (dc/db conn) client)]
@(dc/transact conn
(for [s (per/periodic-seq start
end
(time/days 1))]
{:sales-summary/client client
:sales-summary/date (c/to-date s)
:sales-summary/dirty true
:sales-summary/client+date [client (c/to-date s)]}))))
(defn last-n-days [n]
[(.toDateMidnight (atime/localize (time/plus (time/now) (time/days (- n)))))
(.toDateMidnight (atime/localize (time/now)))])
(defn mark-all-dirty [days]
(doseq [[c] (dc/q '[:find ?c
:in $
:where [_ :sales-order/client ?c]]
(dc/db conn))]
(apply mark-dirty c (last-n-days days))))
(defn lookup-account [number]
(ffirst (dc/q '[:find ?a
:in $ ?number
:where [?a :account/numeric-code ?number]]
(dc/db conn)
number)))
(defn delete-all []
@(dc/transact-async conn
(->>
(dc/q '[:find ?ss
:where [?ss :sales-summary/date]]
(dc/db conn))
(map (fn [[ ss]]
[:db/retractEntity ss])))))
(defn dirty-sales-summaries [c]
(let [client-id (dc/entid (dc/db conn) c)]
(->> (dc/index-pull (dc/db conn)
{:index :avet
:selector '[:sales-summary/date :sales-summary/client :db/id]
:start [:sales-summary/client+dirty [client-id true]]})
(filter (fn [sales-summary]
(= client-id (:db/id (:sales-summary/client sales-summary))))))))
(defn- get-fee [c date]
(- (or (ffirst (dc/q '[:find ?f
:in $ ?client ?d
:where
[?e :expected-deposit/client ?client]
[?e :expected-deposit/sales-date ?d]
[?e :expected-deposit/fee ?f]]
(dc/db conn)
c
date))
0.0)))
(def name->number
{"gyros and pitas" 40111
"returns" 41300
"card payments" 75460
"cash payments" 75452
"cash refunds" 41400
"food app payments" 72350
"unknown" 40000
"discounts" 41000
"fees" 75400
"alcohol" 46900
"beverages" 42000
"bowls" 40118
"catering" 43000
"ezcater catering" 43010
"desserts" 40116
"fries" 40117
"plates" 40113
"sides" 40115
"soup & salads" 40114
"uncategorized" 40000
"tax" 25700
"tip" 25500
"card refunds" 41400
"food app refunds" 41400})
(defn- get-payment-items-parquet [c date]
(let [date-str (.toString date)]
(when-let [rows (seq (pq/query-deduped "charge" date-str date-str))]
(let [client-code (if (map? c) (:client/code c) c)
filtered (filter #(= client-code (:client_code %)) rows)]
(reduce
(fn [acc {:keys [processor type-name total]}]
(update acc
(cond
(= type-name "CARD") "Card Payments"
(= type-name "CASH") "Cash Payments"
(#{"SQUARE_GIFT_CARD" "WALLET" "GIFT_CARD"} type-name) "Gift Card Payments"
(#{"doordash" "grubhub" "uber-eats"} processor) "Food App Payments"
:else "Unknown")
(fnil + 0.0)
(or total 0.0)))
{}
filtered)))))
(defn- get-discounts-parquet [c date]
(let [client-code (if (map? c) (:client/code c) c)
date-str (.toString date)
discount (auto-ap.storage.sales-summaries/sum-discounts client-code date-str date-str)]
(when (and discount (pos? discount))
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/sort-order 1
:sales-summary-item/category "Discounts"
:ledger-mapped/amount discount
:ledger-mapped/ledger-side :ledger-side/debit})))
(defn- get-refund-items-parquet [c date]
(let [client-code (if (map? c) (:client/code c) c)
date-str (.toString date)
refunds (auto-ap.storage.sales-summaries/sum-refunds-by-type client-code date-str date-str)]
(when (seq refunds)
(map (fn [[type-name total]]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/sort-order 3
:sales-summary-item/category (cond
(= type-name "CARD") "Card Refunds"
(= type-name "CASH") "Cash Refunds"
:else "Food App Refunds")
:ledger-mapped/amount total
:ledger-mapped/ledger-side :ledger-side/credit})
refunds))))
(defn- get-fees [c date]
(when-let [fee (get-fee c date)]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/sort-order 2
:sales-summary-item/category "Fees"
:ledger-mapped/amount fee
:ledger-mapped/ledger-side :ledger-side/debit}))
(defn- get-tax-parquet [c date]
(let [client-code (if (map? c) (:client/code c) c)
date-str (.toString date)
tax (auto-ap.storage.sales-summaries/sum-taxes client-code date-str date-str)]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/category "Tax"
:sales-summary-item/sort-order 1
:ledger-mapped/ledger-side :ledger-side/credit
:ledger-mapped/amount (or tax 0.0)}))
(defn- get-tip-parquet [c date]
(let [client-code (if (map? c) (:client/code c) c)
date-str (.toString date)
tip (auto-ap.storage.sales-summaries/sum-tips client-code date-str date-str)]
{:ledger-mapped/ledger-side :ledger-side/credit
:sales-summary-item/sort-order 2
:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/category "Tip"
:ledger-mapped/amount (or tip 0.0)}))
(defn- get-sales-parquet [c date]
(let [client-code (if (map? c) (:client/code c) c)
date-str (.toString date)
sales (auto-ap.storage.sales-summaries/sum-sales-by-category client-code date-str date-str)]
(for [{:keys [category total tax discount]} sales]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/category (or category "Unknown")
:sales-summary-item/sort-order 0
:sales-summary-item/total total
:sales-summary-item/net (- (+ total discount) tax)
:sales-summary-item/tax tax
:sales-summary-item/discount discount
:ledger-mapped/ledger-side :ledger-side/credit
:ledger-mapped/amount (- (+ total discount) tax)})))
(defn get-fees [c date]
(when-let [fee (get-fee c date)]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/sort-order 2
:sales-summary-item/category "Fees"
:ledger-mapped/amount fee
:ledger-mapped/ledger-side :ledger-side/debit}))
(defn- get-tax [c date]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/category "Tax"
:sales-summary-item/sort-order 1
:ledger-mapped/ledger-side :ledger-side/credit
:ledger-mapped/amount
(or (ffirst (dc/q '[:find (sum ?tax)
:with ?e
:in $ [?clients ?start-date ?end-date]
:where [(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]
[?e :sales-order/tax ?tax]
#_[?e :sales-order/charges ?c]
#_[?c :charge/tax ?tax]]
(dc/db conn)
[[c] date date]))
0.0)})
(defn- get-tip [c date]
{:ledger-mapped/ledger-side :ledger-side/credit
:sales-summary-item/sort-order 2
:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/category "Tip"
:ledger-mapped/amount (or (ffirst (dc/q '[:find (sum ?tip)
:with ?c
:in $ [?clients ?start-date ?end-date]
:where [(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]
[?e :sales-order/charges ?c]
[?c :charge/tip ?tip]]
(dc/db conn)
[[c] date date]))
0.0)})
(defn- get-sales [c date]
(let [sales (->> (dc/q '[:find ?category (sum ?total) (sum ?tax) (sum ?discount)
:with ?e ?li
:in $ [?clients ?start-date ?end-date]
:where [(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]
[?e :sales-order/line-items ?li]
[(get-else $ ?li :order-line-item/category "Unknown") ?category]
[?li :order-line-item/total ?total]
[?li :order-line-item/tax ?tax]
[?li :order-line-item/discount ?discount]]
(dc/db conn)
[[c] date date]))]
(for [[category total tax discount] sales]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/category category
:sales-summary-item/sort-order 0
:sales-summary-item/total total
:sales-summary-item/net (- (+ total discount) tax)
:sales-summary-item/tax tax
:sales-summary-item/discount discount
:ledger-mapped/ledger-side :ledger-side/credit
:ledger-mapped/amount (- (+ total discount) tax)
#_#_:ledger-mapped/account nil})))
(defn- get-returns [c date]
(when-let [amount (ffirst (dc/q '[:find (sum ?r)
:with ?e
:in $ [?clients ?start-date ?end-date]
:where [(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]
[?e :sales-order/returns ?r]
#_[?e :sales-order/charges ?c]
#_[?c :charge/tax ?tax]]
(dc/db conn)
[[c] date date]))]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/category "Returns"
:ledger-mapped/amount amount
:ledger-mapped/ledger-side :ledger-side/debit}))
(defn sales-summaries-v2 []
(doseq [[c client-code] (dc/q '[:find ?c ?client-code
:in $
:where [?c :client/code ?client-code]]
(dc/db conn))
{:sales-summary/keys [date] :db/keys [id]} (dirty-sales-summaries c)]
(mu/with-context {:client-code client-code
:date date}
(alog/info ::updating)
(let [result {:db/id id
:sales-summary/client c
:sales-summary/date date
:sales-summary/dirty false
:sales-summary/client+date [c date]
:sales-summary/items
(->>
(get-sales-parquet c date)
(concat (get-payment-items-parquet c date))
(concat (get-refund-items-parquet c date))
(cons (get-discounts-parquet c date))
(cons (get-fees c date))
(cons (get-tax-parquet c date))
(cons (get-tip-parquet c date))
(filter identity)
(map (fn [z]
(assoc z :ledger-mapped/account (some-> z :sales-summary-item/category str/lower-case name->number lookup-account)
:sales-summary-item/manual? false))
)) }]
(if (seq (:sales-summary/items result))
(do
(alog/info ::upserting-summaries
:category-count (count (:sales-summary/items result)))
@(dc/transact conn [[:upsert-entity result]]))
@(dc/transact conn [{:db/id id :sales-summary/dirty false}]))))))
(comment
;; TODO: Move to test file or proper location
(let [c (auto-ap.datomic/pull-attr (dc/db @conn) :db/id [:client/code "NGCL" ])
date #inst "2024-04-14T00:00:00-07:00"]
(get-payment-items c date)))
(defn reset-summaries []
@(dc/transact conn (->> (dc/q '[:find ?sos
:in $
:where [?sos :sales-summary/client]]
(dc/db conn))
(map (fn [[sos]]
[:db/retractEntity sos])))))
(comment
(auto-ap.datomic/transact-schema conn)
@(dc/transact conn [{:db/ident :sales-summary/total-unknown-processor-payments
:db/noHistory true,
:db/valueType :db.type/double
:db/cardinality :db.cardinality/one}])
(apply mark-dirty [:client/code "NGCL"] (last-n-days 30))
(apply mark-dirty [:client/code "NGDG"] (last-n-days 30))
(apply mark-dirty [:client/code "NGPG"] (last-n-days 30))
(mark-all-dirty 50)
(delete-all)
(sales-summaries-v2)
(dc/q '[:find (pull ?sos [* {:sales-summary/sales-items [*]}])
:in $
:where [?sos :sales-summary/client [:client/code "NGHW"]]
[?sos :sales-summary/date ?d]
[(= ?d #inst "2024-04-10T00:00:00-07:00")]]
(dc/db conn))
(dc/q '[:find ?n ?p2 (sum ?total)
:with ?c
:in $ [?clients ?start-date ?end-date]
:where [(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]
[?e :sales-order/charges ?c]
[?c :charge/type-name ?n]
[?c :charge/processor ?p]
[?p :db/ident ?p2]
[?c :charge/total ?total]]
(dc/db conn)
[[(auto-ap.datomic/pull-attr (dc/db conn) :db/id [:client/code "NGHW"])] #inst "2024-04-11T00:00:00-07:00" #inst "2024-04-11T00:00:00-07:00"])
(dc/q '[:find ?n
:in $ [?clients ?start-date ?end-date]
:where [(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]
[?e :sales-order/line-items ?li]
[?li :order-line-item/item-name ?n] ]
(dc/db conn)
[[(auto-ap.datomic/pull-attr (dc/db conn) :db/id [:client/code "NGCL"])] #inst "2024-04-11T00:00:00-07:00" #inst "2024-04-24T00:00:00-07:00"])
@(dc/transact conn [{:db/id :sales-summary/total-tax :db/ident :sales-summary/total-tax-legacy}
{:db/id :sales-summary/total-tip :db/ident :sales-summary/total-tip-legacy}])
(auto-ap.datomic/transact-schema conn)
)
(defn -main [& _]
(execute "sales-summaries" sales-summaries-v2))