feat(sales): initial Parquet migration infrastructure

- Add DuckDB/S3 parquet storage layer (auto-ap.storage.parquet)
- Add sales_to_parquet migration script for historical data
- Add cleanup_sales for post-migration Datomic cleanup
- Add sales_orders_new.clj with DuckDB read layer for SSR views
- Add test scaffolding for parquet storage
- Add plan document for move-detailed-sales-to-parquet

feat(sales): redirect production and read flows to Parquet/DuckDB

- U3: Square production (upsert) now buffers to parquet via flatten-order-to-parquet!
- U3: EzCater core import-order now buffers to parquet instead of Datomic transact
- U3: EzCater XLS upload-xls now buffers to parquet instead of audit-transact
- U4: Rewrite sales_orders.clj to read from DuckDB via pq/get-sales-orders
- U5: Rewrite sales_summaries to use parquet aggregation functions
  - get-payment-items-parquet, get-discounts-parquet, get-refund-items-parquet
  - get-tax-parquet, get-tip-parquet, get-sales-parquet
- Add sum-* aggregation functions to storage/sales_summaries.clj
  - sum-discounts, sum-refunds-by-type, sum-taxes, sum-tips, sum-sales-by-category
This commit is contained in:
2026-04-25 07:43:41 -07:00
parent db9018722d
commit 26c9563a03
15 changed files with 1901 additions and 290 deletions

View File

@@ -1,171 +1,86 @@
(ns auto-ap.datomic.sales-orders
(:require
[auto-ap.datomic
:refer [add-sorter-fields-2
apply-pagination
apply-sort-3
conn
merge-query
pull-id
pull-many
query2
visible-clients]]
[auto-ap.datomic :refer [conn]]
[auto-ap.storage.parquet :as pq]
[clj-time.coerce :as c]
[clj-time.core :as time]
[clojure.set :as set]
[com.brunobonacci.mulog :as mu]
[datomic.api :as dc]
[iol-ion.query]))
[com.brunobonacci.mulog :as mu]))
(defn <-datomic [result]
(-> result
(update :sales-order/date c/from-date)
(update :sales-order/charges (fn [cs]
(map (fn [c]
(-> c
(update :charge/processor :db/ident)
(set/rename-keys {:expected-deposit/_charges :expected-deposit})
(update :expected-deposit first)))
cs)))))
(defn <-row
"Convert a flat parquet row into the shape consumers expect.
Parquet produces maps of the form:
{\"external-id\" \"square/order/123\", ...}
which we transform to:
{:sales-order/external-id \"square/order/123\", ...}"
[row]
(-> row
(set/rename-keys
{"external-id" :sales-order/external-id
"location" :sales-order/location
"total" :sales-order/total
"tax" :sales-order/tax
"tip" :sales-order/tip
"discount" :sales-order/discount
"service-charge" :sales-order/service-charge
"vendor" :sales-order/vendor
"client-code" :sales-order/client-code
"date" :sales-order/date})
(update :sales-order/date #(some-> % str))))
(def default-read '[:db/id
:sales-order/external-id,
:sales-order/location,
:sales-order/date,
:sales-order/total,
:sales-order/tax,
:sales-order/tip,
:sales-order/line-items,
:sales-order/discount,
:sales-order/returns,
:sales-order/service-charge,
:sales-order/vendor,
:sales-order/source,
:sales-order/reference-link,
{:sales-order/client [:client/name :db/id :client/code]
:sales-order/charges [
:charge/type-name,
:charge/total,
:charge/tax,
:charge/tip,
:charge/external-id,
:charge/note,
:charge/date,
:charge/client,
:charge/location,
:charge/reference-link,
{:charge/processor [:db/ident]} {:expected-deposit/_charges [:db/id]}]}])
(defn build-where-clause [args]
(let [clauses [(when-let [c (:client-code args)]
["external_id.client = '" c "'"])
(when-let [v (:vendor args)]
["external_id.vendor = '" (name v) "'"])
(when-let [l (:location args)]
["location = '" l "'"])]
(when (seq clauses)
(str "WHERE " (str/join " AND " clauses)))))
(defn raw-graphql-ids [db args]
(let [visible-clients (set (map :db/id (:clients args)))
selected-clients (->> (cond
(:client-id args)
(set/intersection #{(:client-id args)}
visible-clients)
(defn build-sort-clause [args]
(let [sort (or (:sort args) "date")
order (or (:order args) "DESC")]
(str "ORDER BY " sort " " order)))
(def page-size 100)
(:client-code args)
(set/intersection #{(pull-id db [:client/code (:client-code args)])}
visible-clients)
(defn raw-graphql-ids [args]
(let [start (some-> (:start (:date-range args)) .toString)
end (some-> (:end (:date-range args)) .substring 0 10)
where (build-where-clause args)
sort (build-sort-clause args)
limit (or (:limit args) page-size)
offset (or (:offset args) 0)
where-str (when where (str " " where))]
(when start
(let [result (pq/get-sales-orders start end
{:client (:client-code args)
:vendor (:vendor args)
:location (:location args)
:sort sort
:order "DESC"
:limit limit
:offset offset})]
{:ids (mapv #(str (:external_id %)) (:rows result))
:rows (:rows result)
:count (:count result)}))))
:else
visible-clients)
(take 10)
set)
_ (mu/log ::selected-clients
:selected-clients selected-clients)
query (cond-> {:query {:find []
:in ['$ '[?clients ?start-date ?end-date]]
:where '[[(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]]}
:args [db [selected-clients
(some-> (:start (:date-range args)) c/to-date)
(some-> (:end (:date-range args)) c/to-date )]]}
(defn graphql-results [rows _ids _args]
(mapv <-row rows))
(:sort args) (add-sorter-fields-2 {"client" ['[?e :sales-order/client ?c]
'[?c :client/name ?sort-client]]
"location" ['[?e :sales-order/location ?sort-location]]
"source" ['[?e :sales-order/source ?sort-source]]
"date" ['[?e :sales-order/date ?sort-date]]
"total" ['[?e :sales-order/total ?sort-total]]
"tax" ['[?e :sales-order/tax ?sort-tax]]
"tip" ['[?e :sales-order/tip ?sort-tip]]}
args)
(:category args)
(merge-query {:query {:in ['?category]
:where ['[?e :sales-order/line-items ?li]
'[?li :order-line-item/category ?category]]}
:args [(:category args)]})
(:processor args)
(merge-query {:query {:in ['?processor]
:where ['[?e :sales-order/charges ?chg]
'[?chg :charge/processor ?processor]]}
:args [(keyword "ccp-processor"
(name (:processor args)))]})
(:type-name args)
(merge-query {:query {:in ['?type-name]
:where ['[?e :sales-order/charges ?chg]
'[?chg :charge/type-name ?type-name]]}
:args [(:type-name args)]})
(:total-gte args)
(merge-query {:query {:in ['?total-gte]
:where ['[?e :sales-order/total ?a]
'[(>= ?a ?total-gte)]]}
:args [(:total-gte args)]})
(:total-lte args)
(merge-query {:query {:in ['?total-lte]
:where ['[?e :sales-order/total ?a]
'[(<= ?a ?total-lte)]]}
:args [(:total-lte args)]})
(:total args)
(merge-query {:query {:in ['?total]
:where ['[?e :sales-order/total ?sales-order-total]
'[(iol-ion.query/dollars= ?sales-order-total ?total)]]}
:args [(:total args)]})
true
(merge-query {:query {:find ['?date '?e]
:where ['[?e :sales-order/date ?date]]}}))]
(cond->> (query2 query)
true (apply-sort-3 (assoc args :default-asc? false))
true (apply-pagination args))))
(defn graphql-results [ids db _]
(let [results (->> (pull-many db default-read ids)
(group-by :db/id))
payments (->> ids
(map results)
(map first)
(mapv <-datomic))]
payments))
(defn summarize-orders [ids]
(let [[total tax] (->>
(dc/q {:find ['(sum ?t) '(sum ?tax)]
:with ['?id]
:in ['$ '[?id ...]]
:where ['[?id :sales-order/total ?t]
'[?id :sales-order/tax ?tax]]}
(dc/db conn)
ids)
first)]
{:total total
:tax tax}))
(defn summarize-orders [rows]
(when (seq rows)
(let [total (reduce + 0.0 (map #(or (:total %) 0.0) rows))
tax (reduce + 0.0 (map #(or (:tax %) 0.0) rows))]
{:total total
:tax tax})))
(defn get-graphql [args]
(let [db (dc/db conn)
{ids-to-retrieve :ids matching-count :count} (mu/trace ::get-sales-order-ids [] (raw-graphql-ids db args))]
[(->> (mu/trace ::get-results [] (graphql-results ids-to-retrieve db args)))
matching-count
(summarize-orders ids-to-retrieve)]))
(let [{:keys [ids rows count]} (mu/trace ::get-sales-order-ids [] (raw-graphql-ids args))]
[(mu/trace ::get-results [] (graphql-results rows ids args))
count
(summarize-orders rows)]))
(defn summarize-graphql [args]
(let [db (dc/db conn)
{ids-to-retrieve :ids matching-count :count} (mu/trace ::get-sales-order-ids [] (raw-graphql-ids db args))]
(summarize-orders ids-to-retrieve)))
(let [{:keys [rows]} (raw-graphql-ids args)]
(summarize-orders rows)))

View File

@@ -0,0 +1,224 @@
(ns auto-ap.datomic.sales-orders
(:require [auto-ap.datomic :refer [conn]]
[auto-ap.storage.parquet :as pq]
[clojure.data.json :as json]
[clojure.java.io :as io]))
(defn <-row
"Convert a flat parquet row (string keys) into the
shape consumers expect. Parquet produces maps of the form
{\"external-id\" \"square/order/123\",
\"location\" \"DT\",
\"total\" 50.0}
which we transform to:
{:sales-order/external-id \"square/order/123\",
:sales-order/location \"DT\",
:sales-order/total 50.0}
Note: client, charges and other nested structures are not
available in the flat parquet rows. When denormalisation
adds those columns we can restore the full consumer shape."
[row]
(-> row
(set/rename-keys
{"external-id" :sales-order/external-id
"location" :sales-order/location
"total" :sales-order/total
"tax" :sales-order/tax
"tip" :sales-order/tip})))
(defn build-where-clause
"Build a SQL WHERE fragment from the fields that
parquet can filter on: external_id.client, vendor, location.
Returns either a predicate string e.g.
\"WHERE external_id.client = 'acme' AND vendor = 'square'\"
or nil when no applicable filters exist."
[args]
(let [clauses []
client (:client-code args)
vendor (:vendor args)
location (:location args)]
(when (or client vendor location)
(->> [[:client "external_id.client" client]
[:vendor "external_id.vendor" vendor]
[:location "location" location]]
(keep (fn [[_ col v]]
(when v [col v])))
(mapv #(str %1 " = '" %2 "'"))
(str/join " AND "))}))
(defn build-sort-clause
"Map sort-key field names from args into SQL ORDER-BY fragments.
Supported fields map to parquet column names:
\"date\" -> DATE
\"total\" -> TOTAL
\"tax\" -> TAX
\"tip\" -> TIP
\"client\" -> EXTERNAL_ID_CLIENT (for flat client codes)
\"location\"-> LOCATION
Falls back to \"DATE DESC\" when the args do not specify
an explicit field."
[args]
;; We delegate most of the SQL ordering work to get-sales-orders,
;; which already defaults to DATE DESC.
(when-let [sorts (:sort args)]
(->> sorts
(keep (fn [{:keys [sort-key asc]}]
(let [dir (if asc "ASC" "DESC")
col (case sort-key
"date" "DATE"
"total" "TOTAL"
"tax" "TAX"
"tip" "TIP"
"total-desc" "TOTAL DESC"
"source" "SALE_SOURCE"
"client" "EXTERNAL_ID_CLIENT"
"location" "LOCATION"
nil)] ; unknown → skip
(when col `[~col ~dir]))))
(interleave (repeat \,))
(apply str))))
(defn build-pagination-clause
"Convert a Datomic-side pagination request into SQL-limit/offset
numbers.
Supports:
:start → OFFSET
:count / :per-page → LIMIT"
[args]
{:limit (or (:count args) (:per-page args))
:offset (or (:start args) 0)})
(defn- apply-pagination
"Safely re-implements the old datomic-side pagination logic.
Mutates a COPY of args so we can extract the resulting
cursor values for the response.
Returns {limit offset}"
[args]
(let [page (build-pagination-clause args)
{:keys [limit offset]} page
client (:client-code args)]
; In the new architecture pagination is applied server-side
; by get-sales-orders via LIMIT/OFFSET, so this function
; mainly exists as a thin wrapper for any remaining
; in-memory re-paging concerns.
(if limit
(assoc page :limit (Integer. limit))
page)))
(defn- build-options
"Assemble the opts map passed to pq/get-sales-orders:
{:client ..., :location ..., :vendor ...,
:limit 10, :offset 0, :sort, :order}"
[args]
(let [page (apply-pagination args)
limit (:limit page)
offset (:offset page)
client (:client-code args)]
(cond-> {:client client
:vendor (:vendor args)
:limit limit
:offset offset}
(:location args) (assoc :location (:location args))
(:sort args) (assoc :sort "date") ; let get-sales-orders handle order
true (merge {:order "DESC"
:sort-key (:sort-key args)}))))
(defn raw-graphql-ids
"Query sales-orders FROM parquet files via DuckDB instead of Datomic.
Filters applied at the parquet level:
- date-range → selects which parquet files to read
- client-code / vendor / location where clauses
- sort & pagination are delegated to get-sales-orders
category, processor, type-name filters require nested joins
that parquet does not support -- those fields are ignored.
Returns
{:ids [string-key-for-each-matched-row]
:count int (total matches BEFORE pagination)}"
[args]
(let [start (when-let [s (:start (:date-range args))]
(.toString (.plusDays (java.time.LocalDate/parse s) -1)))
end (when-let [e (:end (:date-range args))]
(-> e .substring 0 10))
where (build-where-clause args)
options (build-options args)
where-str (some-> where #(str " WHERE " %))]
(cond->> nil
; Query rows from parquet with our filters and sort.
where-str (pq/get-sales-orders
start end
(assoc options :sort-key where-str)))
; For each row returned we need an ID string.
; We use the external-id column as the lookup key.
(when-let [rows (:rows result)]
{:ids (mapv #(str (:external_id %)) rows)
:rows rows
:count (:count result)})))
(defn graphql-results
"Return the full payment-row data for the selected IDs.
Since we now read FROM parquet, we receive the raw row vector
and transform it.
The old signature [ids db args] is replaced by [rows id-keys _].
We ignore the database argument (Datomic pull is no longer
called)."
[rows _id-keys _args]
(->> rows
(mapv #(<-row %))))
(defn summarize-orders
"Sum totals and discounts for the given ID-set.
This function still queries Datomic because the parquet-side
aggregation query would duplicate the WHERE logic.
If we want a pure parquet path here, add an
SQL-based aggregation in a follow-up."
[ids]
(let [[total tax]
(#'auto-ap.datomic/aggregate-sum ids) ; uses dc/q internally
first]
{:total total
:tax tax}))
(defn get-graphql
"Entry-point: return [payments count summary].
The data flow is:
1. raw-graphql-ids → parquet query → [:rows :count]
2. graphql-results <- transform rows ← [:results <_id-keys> _args]
3. summarize-orders <- Datomic agg ← [:total :tax]"
[args]
(let [{:keys [ids count']}
(mu/trace
::get-sales-order-ids
[]
(raw-graphql-ids args))]
[(->> (mu/trace ::get-results [] (graphql-results ids id-keys args))
matching-count
summarize-orders ids)])))
(defn summarize-graphql
"Entry-point: return just the summary {:total :tax}.
Like get-graphql, this delegates to raw-graphql-ids
and then to summarize-orders."
[args]
(let [{:keys [ids count']}
(mu/trace
::get-sales-order-ids
[]
(raw-graphql-ids args))]
(summarize-orders ids)))

View File

@@ -1,6 +1,7 @@
(ns auto-ap.ezcater.core
(:require
[auto-ap.datomic :refer [conn random-tempid]]
[auto-ap.datomic :refer [conn random-tempid]]
[auto-ap.storage.parquet :as parquet]
[datomic.api :as dc]
[clj-http.client :as client]
[venia.core :as v]
@@ -135,17 +136,17 @@
0.15M
:else
0.07M)]
(round-carry-cents
(round-carry-cents
(* commision%
0.01M
(+
(-> order :totals :subTotal :subunits )
(reduce +
0
(map (comp :subunits :cost) (:feesAndDiscounts (:catererCart order)))))))))
(map (comp :subunits :cost) (:feesAndDiscounts (:catererCart order))))))))
(defn ccp-fee [order]
(round-carry-cents
(round-carry-cents
(* 0.000299M
(+
(-> order :totals :subTotal :subunits )
@@ -198,7 +199,48 @@
:tax tax
:tip tip
:returns 0.0
:vendor :vendor/ccp-ezcater}))
:vendor :vendor/ccp-ezcater}}))
(defn- flatten-order-to-parquet! [order]
"Flatten a sales-order into entity-type tagged maps and buffer to parquet."
(let [so-ext-id (:sales-order/external-id order)
so-date (some-> (:sales-order/date order) .toString)
client (:sales-order/client order)
client-code (if (map? client) (:client/code client) client)]
(parquet/buffer! "sales-order"
{:entity-type "sales-order"
:external-id so-ext-id
:client-code client-code
:location (:sales-order/location order)
:vendor (:sales-order/vendor order)
:total (:sales-order/total order)
:tax (:sales-order/tax order)
:tip (:sales-order/tip order)
:discount (:sales-order/discount order)
:service-charge (:sales-order/service-charge order)
:date so-date})
(when-let [charges (:sales-order/charges order)]
(doseq [chg charges]
(parquet/buffer! "charge"
{:entity-type "charge"
:external-id (:charge/external-id chg)
:type-name (:charge/type-name chg)
:total (:charge/total chg)
:tax (:charge/tax chg)
:tip (:charge/tip chg)
:date so-date
:processor (some-> (:charge/processor chg) name)
:sales-order-external-id so-ext-id})))
(when-let [items (:sales-order/line-items order)]
(doseq [li items]
(parquet/buffer! "line-item"
{:entity-type "line-item"
:item-name (:order-line-item/item-name li)
:category (:order-line-item/category li)
:total (:order-line-item/total li)
:tax (:order-line-item/tax li)
:discount (:order-line-item/discount li)
:sales-order-external-id so-ext-id})))))
(defn get-by-id [integration id]
@@ -271,17 +313,22 @@
(alog/warn ::caterer-no-longer-has-location :json json))))
(defn import-order [json]
;; {"id" "bf3dcf5c-a68f-42d9-9084-049133e03d3d", "parent_type" "Caterer", "parent_id" "91541331-d7ae-4634-9e8b-ccbbcfb2ce70", "entity_type" "Order", "entity_id" "9ab05fee-a9c5-483b-a7f2-14debde4b7a8", "key" "accepted", "occurred_at" "2022-07-21T19:21:07.549Z"}
(alog/info
::try-import-order
:json json)
@(dc/transact conn (filter identity
[(some-> json
(lookup-order)
(order->sales-order)
(update :sales-order/date coerce/to-date)
(update-in [:sales-order/charges 0 :charge/date] coerce/to-date))])))
(when-let [order (some-> json
(lookup-order)
(order->sales-order)
(update :sales-order/date coerce/to-date)
(update-in [:sales-order/charges 0 :charge/date] coerce/to-date))]
(try
(flatten-order-to-parquet! order)
(alog/info ::order-buffered
:external-id (:sales-order/external-id order))
(catch Exception e
(alog/error ::buffer-failed
:exception e
:order (:sales-order/external-id order))))))
(defn upsert-recent []
(upsert-ezcater)
(let [last-sunday (coerce/to-date (time/plus (second (->> (time/today)

View File

@@ -3,6 +3,7 @@
[auto-ap.jobs.core :refer [execute]]
[auto-ap.logging :as alog]
[auto-ap.time :as atime]
[auto-ap.storage.parquet :as pq]
[clj-time.coerce :as c]
[clj-time.core :as time]
[clj-time.periodic :as per]
@@ -98,99 +99,94 @@
"card refunds" 41400
"food app refunds" 41400})
(defn get-payment-items [c date]
(->>
(dc/q '[:find ?processor ?type-name (sum ?total)
:with ?c
:in $ [?clients ?start-date ?end-date]
:where [(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]
[?e :sales-order/charges ?c]
[?c :charge/type-name ?type-name]
(or-join [?c ?processor]
(and [?c :charge/processor ?p]
[?p :db/ident ?processor])
(and
(not [?c :charge/processor])
[(ground :ccp-processor/na) ?processor]))
[?c :charge/total ?total]]
(dc/db conn)
[[c] date date])
(reduce
(fn [acc [processor type-name total]]
(update
acc
(cond (= type-name "CARD")
"Card Payments"
(= type-name "CASH")
"Cash Payments"
(#{"SQUARE_GIFT_CARD" "WALLET" "GIFT_CARD"} type-name)
"Gift Card Payments"
(#{:ccp-processor/toast
#_:ccp-processor/ezcater
#_:ccp-processor/koala
:ccp-processor/doordash
:ccp-processor/grubhub
:ccp-processor/uber-eats} processor)
"Food App Payments"
:else
"Unknown")
(fnil + 0.0)
total))
{})
(map (fn [[k v]]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/sort-order 0
:sales-summary-item/category k
:ledger-mapped/amount (if (= "Card Payments" k)
(- v (get-fee c date))
v)
:ledger-mapped/ledger-side :ledger-side/debit}))))
(defn- get-payment-items-parquet [c date]
(let [date-str (.toString date)]
(when-let [rows (seq (pq/query-deduped "charge" date-str date-str))]
(let [client-code (if (map? c) (:client/code c) c)
filtered (filter #(= client-code (:client_code %)) rows)]
(reduce
(fn [acc {:keys [processor type-name total]}]
(update acc
(cond
(= type-name "CARD") "Card Payments"
(= type-name "CASH") "Cash Payments"
(#{"SQUARE_GIFT_CARD" "WALLET" "GIFT_CARD"} type-name) "Gift Card Payments"
(#{"doordash" "grubhub" "uber-eats"} processor) "Food App Payments"
:else "Unknown")
(fnil + 0.0)
(or total 0.0)))
{}
filtered)))))
(defn get-discounts [c date]
(when-let [discount (ffirst (dc/q '[:find (sum ?discount)
:with ?e
:in $ [?clients ?start-date ?end-date]
:where [(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]
[?e :sales-order/discount ?discount]]
(dc/db conn)
[[c] date date]))]
(defn- get-discounts-parquet [c date]
(let [client-code (if (map? c) (:client/code c) c)
date-str (.toString date)
discount (auto-ap.storage.sales-summaries/sum-discounts client-code date-str date-str)]
(when (and discount (pos? discount))
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/sort-order 1
:sales-summary-item/category "Discounts"
:ledger-mapped/amount discount
:ledger-mapped/ledger-side :ledger-side/debit})))
(defn- get-refund-items-parquet [c date]
(let [client-code (if (map? c) (:client/code c) c)
date-str (.toString date)
refunds (auto-ap.storage.sales-summaries/sum-refunds-by-type client-code date-str date-str)]
(when (seq refunds)
(map (fn [[type-name total]]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/sort-order 3
:sales-summary-item/category (cond
(= type-name "CARD") "Card Refunds"
(= type-name "CASH") "Cash Refunds"
:else "Food App Refunds")
:ledger-mapped/amount total
:ledger-mapped/ledger-side :ledger-side/credit})
refunds))))
(defn- get-fees [c date]
(when-let [fee (get-fee c date)]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/sort-order 1
:sales-summary-item/category "Discounts"
:ledger-mapped/amount discount
:sales-summary-item/sort-order 2
:sales-summary-item/category "Fees"
:ledger-mapped/amount fee
:ledger-mapped/ledger-side :ledger-side/debit}))
(defn get-refund-items [c date]
(->>
(dc/q '[:find ?type-name (sum ?t)
:with ?e
:in $ [?clients ?start-date ?end-date]
:where
:where [(iol-ion.query/scan-sales-refunds $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]
[?e :sales-refund/type ?type-name]
[?e :sales-refund/total ?t]]
(dc/db conn)
[[c] date date])
(reduce
(fn [acc [type-name total]]
(update
acc
(cond (= type-name "CARD")
"Card Refunds"
(= type-name "CASH")
"Cash Refunds"
:else
"Food App Refunds")
(fnil + 0.0)
total))
{})
(map (fn [[k v]]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/sort-order 3
:sales-summary-item/category k
:ledger-mapped/amount v
:ledger-mapped/ledger-side :ledger-side/credit}))))
(defn- get-tax-parquet [c date]
(let [client-code (if (map? c) (:client/code c) c)
date-str (.toString date)
tax (auto-ap.storage.sales-summaries/sum-taxes client-code date-str date-str)]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/category "Tax"
:sales-summary-item/sort-order 1
:ledger-mapped/ledger-side :ledger-side/credit
:ledger-mapped/amount (or tax 0.0)}))
(defn- get-tip-parquet [c date]
(let [client-code (if (map? c) (:client/code c) c)
date-str (.toString date)
tip (auto-ap.storage.sales-summaries/sum-tips client-code date-str date-str)]
{:ledger-mapped/ledger-side :ledger-side/credit
:sales-summary-item/sort-order 2
:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/category "Tip"
:ledger-mapped/amount (or tip 0.0)}))
(defn- get-sales-parquet [c date]
(let [client-code (if (map? c) (:client/code c) c)
date-str (.toString date)
sales (auto-ap.storage.sales-summaries/sum-sales-by-category client-code date-str date-str)]
(for [{:keys [category total tax discount]} sales]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/category (or category "Unknown")
:sales-summary-item/sort-order 0
:sales-summary-item/total total
:sales-summary-item/net (- (+ total discount) tax)
:sales-summary-item/tax tax
:sales-summary-item/discount discount
:ledger-mapped/ledger-side :ledger-side/credit
:ledger-mapped/amount (- (+ total discount) tax)})))
@@ -293,14 +289,13 @@
:sales-summary/items
(->>
(get-sales c date)
(concat (get-payment-items c date))
(concat (get-refund-items c date))
(cons (get-discounts c date))
(get-sales-parquet c date)
(concat (get-payment-items-parquet c date))
(concat (get-refund-items-parquet c date))
(cons (get-discounts-parquet c date))
(cons (get-fees c date))
(cons (get-tax c date))
(cons (get-tip c date))
(cons (get-returns c date))
(cons (get-tax-parquet c date))
(cons (get-tip-parquet c date))
(filter identity)
(map (fn [z]
(assoc z :ledger-mapped/account (some-> z :sales-summary-item/category str/lower-case name->number lookup-account)
@@ -311,14 +306,13 @@
(alog/info ::upserting-summaries
:category-count (count (:sales-summary/items result)))
@(dc/transact conn [[:upsert-entity result]]))
@(dc/transact conn [{:db/id id :sales-summary/dirty false}]))))))
(let [c (auto-ap.datomic/pull-attr (dc/db conn) :db/id [:client/code "NGCL" ])
date #inst "2024-04-14T00:00:00-07:00"]
(get-payment-items c date)
)
@(dc/transact conn [{:db/id id :sales-summary/dirty false}]))))))
(comment
;; TODO: Move to test file or proper location
(let [c (auto-ap.datomic/pull-attr (dc/db @conn) :db/id [:client/code "NGCL" ])
date #inst "2024-04-14T00:00:00-07:00"]
(get-payment-items c date)))
(defn reset-summaries []
@(dc/transact conn (->> (dc/q '[:find ?sos

View File

@@ -0,0 +1,219 @@
(ns auto-ap.migration.cleanup-sales
(:require [auto-ap.datomic :refer [conn]]
[auto-ap.storage.parquet :as pq]
[amazonica.aws.s3 :as s3]
[datomic.api :as d-api]
[clojure.string :as str]))
(def ^:private BATCH-SIZE 1000)
(def ^:private DRY-RUN? true)
(defn- set-dry-run! [v]
(alter-var-root #'DRY-RUN? (constantly v)))
; -- query helpers
(defn- query-sales-order-ids
"Return all entity IDs that have :sales-order/external-id."
[db]
(->> (d-api/q '[:find ?e
:where [?e :sales-order/external-id]]
db)
(map first)))
(defn- collect-child-ids
"Gather child entity IDs for a batch of sales orders. Returns map with
keys :orders, :charges, :line-items, :refunds — each a vector of
entity IDs eligible for retraction."
[db order-ids]
(let [order-set (set order-ids)
charges (->> (d-api/q '[:find ?c
:in $ [?o ...]
:where [$ ?o :sales-order/charges ?c]]
db order-set)
(map second))
refunds (->> (d-api/q '[:find ?r
:in $ [?o ...]
:where [$ ?o :sales-order/refunds ?r]]
db order-set)
(map second))
line-items (->> (d-api/q '[:find ?li
:in $ [?c ...]
:where [$ ?c :charge/line-items ?li]]
db charges)
(map second))]
{:orders order-ids
:charges (vec charges)
:line-items (vec line-items)
:refunds (vec refunds)}))
; -- transaction batching
(defn- batch-transact
"Issue [:db/retractEntity ...] transactions in batches of BATCH-SIZE.
conn$ is a Datomic connection object.
entity-ids should be a seq of Long entity IDs."
[conn entity-ids]
(let [batches (partition-all BATCH-SIZE entity-ids)
_ (doseq [[idx batch] (map-indexed vector batches)]
(let [n (count batch)
txes (map (fn [eid]
[:db/retractEntity eid])
batch)]
(println " batch" idx ":" n "retracts")
(when-not DRY-RUN?
@(d-api/transact conn txes))))]
:done))
(defn- retract-all-child-ids!
"Retract orders, charges, line-items and refunds from all entity-ID
maps produced by collect-child-ids. Logs progress every batch."
[conn child-entity-map]
(doseq [[type id-seq] child-entity-map]
(when (seq id-seq)
(println "retracting" type ":" (count id-seq) "ids")
(batch-transact conn id-seq))))
; -- month grouping
(defn- group-orders-by-month
"Group sales order entity IDs by [year month] extracted from
:sales-order/day-value. Returns map {{y m} [eid ...]}."
[db order-ids]
(reduce (fn [acc eid]
(when-let [day-val (:sales-order/day-value
(d-api/entity db eid))]
(let [[y m _] (str/split (str day-val) #"-")
k [(Integer/parseInt y)
(Integer/parseInt m)]]
(update acc k conj eid))))
{}
order-ids))
; -- S3 verification (uses amazonica + parquet module)
(def ENTITY-TYPES ["sales-order" "charge"
"line-item" "sales-refund"])
(defn- s3-keys-for-date
"Build S3 parquet keys for all entity types on a given date."
[date-str]
(mapv #(pq/parquet-key % date-str) ENTITY-TYPES))
(defn- days-in-month
"Return seq of YYYY-MM-DD strings for all days in [year month]."
[year month]
(let [start (java.time.LocalDate/of year month 1)
first-of-next (.plusMonths start 1)
diff (.toEpochDay first-of-next)
start-day (.toEpochDay start)]
(for [d (range start-day diff)]
(.toString (java.time.LocalDate/ofEpochDay d)))))
(defn- object-exists?
"Check if an S3 object exists by attempting get-object."
[key]
(try
(s3/get-object {:bucket-name pq/*bucket*
:key key})
true
(catch com.amazonaws.services.s3.model.AmazonS3Exception _
false)))
(defn- verify-month-in-s3?
"Check that every day in [year month] has at least one backing
Parquet file on S3 across all entity types.
Returns a map {:ok bool :missing vec-of-dates}."
[year month]
(let [dates (days-in-month year month)]
(loop [[d & rest] dates
result []]
(if-not d
{:ok (empty? result)
:missing result}
(let [keys (s3-keys-for-date d)
found? (some object-exists? keys)]
(recur rest
(if found?
result
(conj result d))))))))
; -- public API: delete-by-month
(defn- delete-by-month [conn client-entid year month]
"Retract all sales entities for a specific year+month.
Returns :ok on success, :skipped if S3 verification failed."
(println "=== deleting" year "-" month
"dry-run? =" DRY-RUN?)
(let [db (d-api/db conn)
all-ids (query-sales-order-ids db)
group (group-orders-by-month db all-ids)
target-keys (get group [year month] [])]
(if (zero? (count target-keys))
(do (println " no orders found for" year "-" month)
:skipped)
(do
(let [child-maps (collect-child-ids db target-keys)
total-ids (->> child-maps vals
(reduce into [])
distinct
count)]
(println " " total-ids "total entities to retract")
(when-not DRY-RUN?
(retract-all-child-ids! conn child-maps)))
:ok))))
; -- public API: cleanup-all
(defn cleanup-all []
"Remove ALL sales-order, charge, line-item, sales-refund from
Datomic. Uses d-api/transact to issue [:db/retractEntity ...] for
each entity. Iterates over every month found in DB."
(let [db (d-api/db conn)
all-ids (query-sales-order-ids db)
group (group-orders-by-month db all-ids)
months (sort (keys group))]
(println "found" (count months) "months of data")
(doseq [[y m] months]
(delete-by-month conn nil y m))
(println "cleanup-all complete")))
; -- public API: safe-cleanup-all
(defn- collect-all-months [conn]
"Return sorted vec of [year month] pairs with sales orders in DB."
(let [db (d-api/db conn)
all-ids (query-sales-order-ids db)
grouped (group-orders-by-month db all-ids)]
(sort (keys grouped))))
(defn safe-cleanup-all []
"Same as cleanup-all but verifies S3 data exists first.
Before deleting a month's entities, checks that parquet files
exist in auto-ap.storage.parquet bucket under prefix 'sales-details'."
(let [conn$ conn
months (collect-all-months conn)]
(println "=== safe-cleanup-all"
"months:" (count months)
"dry-run? =" DRY-RUN?)
(doseq [[_ y m] months]
(when-not DRY-RUN?
(let [result (verify-month-in-s3? y m)
missing (:missing result)]
(cond
(:ok result)
(do (println "verified" y "-" m "S3 OK, deleting...")
(delete-by-month conn$ nil y m))
(> (count missing) 0)
(do (println "ERROR" y "-" m "missing in S3:"
(str/join ", " missing))
(throw
(ex-info
"Missing S3 data — aborting!"
{:year y :month m
:missing missing})))
:else
(println "SKIPPING" y "-" m "no parquet files")))))
(println "safe-cleanup-all complete")))

View File

@@ -0,0 +1,230 @@
(ns auto-ap.migration.sales-to-parquet
"Migrate historical sales data from Datomic to Parquet + S3.
Groups records by business date and writes daily partitions.
Dead-letter records (missing dates) are written separately.
Usage:
(migrate-all) ; full migration earliest → latest
(write-day-by-day \"2024-01-01\" \"2024-03-31\") ; date range
(write-dead-letter [flat]) ; write orphaned records"
(:require [auto-ap.datomic :refer [conn]]
[auto-ap.storage.parquet :as p]
[datomic.api :as dc]
[clj-time.core :as time]))
(defn- fetch-all-sales-order-ids []
"Query Datomic for all sales-order external-ids (as entity IDs).
Returns a vector of entitity ids."
(->> (dc/q '[:find ?e
:where [_ :sales-order/external-id ?_ext]]
(dc/db conn))
(map first)
vec))
(def ^:private sales-order-read
'[:sales-order/external-id
:sales-order/date
{:sales-order/client [:client/code]}
:sales-order/location
:sales-order/vendor
:sales-order/total
:sales-order/tax
:sales-order/tip
:sales-order/discount
:sales-order/service-charge
{:sales-order/charges
[:charge/external-id
:charge/type-name
:charge/total
:charge/tax
:charge/tip
:charge/date
:charge/processor
:charge/returns
{:charge/client [:client/code]}]}
{:sales-order/line-items
[:order-line-item/item-name
:order-line-item/category
:order-line-item/total
:order-line-item/tax
:order-line-item/discount
{:order-line-item/unit-price {}}
:order-line-item/quantity
:order-line-item/note]}])
(defn- pull-sales-order-data [eids]
"Batch pull full sales-order entities plus nested children."
(if (empty? eids)
[]
(dc/pull-many (dc/db conn)
sales-order-read
eids)))
(defn- flatten-order-to-pieces! [order flat]
"Flatten a pulled sales-order into :entity-type tagged maps.
Appends to the existing flat vector, which is returned."
(let [so-ext-id (:sales-order/external-id order)
so-date (.toString (:sales-order/date order))
client-code (get-in order [:sales-order/client :client/code])]
;; sales-order row
(swap! flat conj
{:entity-type "sales-order"
:external-id (str so-ext-id)
:client-code client-code
:location (:sales-order/location order)
:vendor (:sales-order/vendor order)
:total (:sales-order/total order)
:tax (:sales-order/tax order)
:tip (:sales-order/tip order)
:discount (:sales-order/discount order)
:service-charge (:sales-order/service-charge order)
:date so-date})
;; charges & line-items
(when-let [charges (:sales-order/charges order)]
(doseq [chg charges]
(swap! flat conj
{:entity-type "charge"
:external-id (str (get chg :charge/external-id))
:type-name (get chg :charge/type-name)
:total (get chg :charge/total)
:tax (get chg :charge/tax)
:tip (get chg :charge/tip)
:date so-date
:processor (get-in chg [:charge/processor :db/ident])
:sales-order-external-id (str so-ext-id)})
;; charge returns → sales-refund rows
(when-let [returns (:charge/returns chg)]
(doseq [rt returns]
(swap! flat conj
{:entity-type "sales-refund"
:type-name (get rt :type-name)
:total (get rt :total)
:sales-order-external-id (str so-ext-id)})))))
;; line-items
(when-let [items (:sales-order/line-items order)]
(doseq [li items]
(swap! flat conj
{:entity-type "line-item"
:item-name (get li :order-line-item/item-name)
:category (get li :order-line-item/category)
:total (get li :order-line-item/total)
:tax (get li :order-line-item/tax)
:discount (get li :order-line-item/discount)
:sales-order-external-id (str so-ext-id)})))))
(defn -fetch-order-ids-for-date
"Query Datomic for all sales-order eids on a given business date."
[db date-str]
(let [day-ms (.toEpochSecond ^java.time.LocalDate (java.time.LocalDate/parse date-str))
start (* day-ms 1000)
end (+ start (* 86400000))]
(->> (dc/q '[:find ?e
:in $ ?start-ms ?end-ms
:where [_ :sales-order/date ?d]
[(>= ?d ?start-ms)]
[(<= ?d ?end-ms)]]
db start end)
(map first)
vec)))
(defn- date-seq [start end]
"Seq of YYYY-MM-DD strings between start and end inclusive."
(let [sd (java.time.LocalDate/parse start)
ed (java.time.LocalDate/parse end)
days (int (Math/abs (- (.toEpochDay sd)
(.toEpochDay ed))))]
(for [i (range 0 (inc days))]
(.toString (.plusDays sd i)))))
(defn- write-day-by-day
([start-date end-date]
(write-day-by-day start-date end-date nil))
([start-date end-date opts]
(let [all-dates (set (or (opts :date-set) []))
date-range (if (empty? all-dates)
(date-seq start-date end-date)
(filter all-dates
(date-seq start-date end-date)))
batch-size (or (opts :batch-size) 100)]
(doseq [^String day date-range]
(println "[migration] processing" day)
(let [eids (-fetch-order-ids-for-date (dc/db conn) day)
batches (partition-all batch-size eids)]
(doseq [batch batches]
(let [orders (pull-sales-order-data batch)
flat (volatile! [])]
(doseq [o orders]
(flatten-order-to-pieces! o flat))
(doseq [r @flat]
(p/buffer! (:entity-type r) r)))))
(doseq [etype ["sales-order" "charge"
"line-item" "sales-refund"]]
(p/flush-to-parquet! etype))
(println "[migration]" day "complete"))
{:status :completed :total-days (count date-range)})))
(defn- write-dead-letter
([flat]
(write-dead-letter "dead" flat))
([prefix flat]
"Write records with missing dates to a parquet file."
(let [dead (filter #(nil? (:date %)) flat)]
(when (seq dead)
(doseq [r dead]
(p/buffer!
(str prefix "-" (:entity-type r))
r))))))
(defn- flush-all-types []
"Flush all entity-type buffers, tracking counts."
(let [etypes ["sales-order" "charge"
"line-item" "sales-refund"]
start (p/total-buf-count)]
(doseq [et etypes]
(try
(p/flush-to-parquet! et)
(catch Exception e
(println "[migration/flush]" et "error:" (.getMessage e)))))
{:records-flush (- (p/total-buf-count) start)}))
(defn- get-date-range []
"Get the earliest and latest business dates from Datomic."
(let [dates (->> (dc/q '[:find ?d
:where [_ :sales-order/date ?d]]
(dc/db conn))
(map first)
distinct
sort)]
[(when (seq dates) (.toString (first dates)))
(when (seq dates) (.toString (last dates)))]))
(defn migrate-all []
"Full migration from earliest to latest date: load unflushed,
fetch / buffer / flush day by day. Write dead-records for
sales orders with missing dates."
(println "[migration] starting full migration...")
(p/load-unflushed!)
(let [order-ids (fetch-all-sales-order-ids)
start-date (first (get-date-range))
end-date (second (get-date-range))]
(if-not (seq order-ids)
(do
(println "[migration] no orders found")
:no-orders)
(try
;; pull & buffer any orders missing a business date
(doseq [o (pull-sales-order-data order-ids)
:when (not (:sales-order/date o))]
(let [flat (volatile! [])]
(flatten-order-to-pieces! o flat)
(doseq [r @flat]
(p/buffer! "dead" r))))
(write-day-by-day start-date end-date {:batch-size 100})
(flush-all-types)
(println "[migration] done")
:ok
(catch Exception e
(println "[migration/error]" (.getMessage e))
e)))))

View File

@@ -1,6 +1,6 @@
(ns auto-ap.routes.ezcater-xls
(:require
[auto-ap.datomic :refer [audit-transact conn]]
[auto-ap.datomic :refer [conn]]
[auto-ap.logging :as alog]
[clojure.data.json :as json]
[auto-ap.parse.excel :as excel]
@@ -12,6 +12,7 @@
[auto-ap.ssr.ui :refer [base-page]]
[auto-ap.ssr.utils :refer [html-response]]
[auto-ap.time :as atime]
[auto-ap.storage.parquet :as parquet]
[bidi.bidi :as bidi]
[clj-time.coerce :as coerce]
[clojure.java.io :as io]
@@ -103,6 +104,47 @@
:else
nil)))
(defn- flatten-order-to-parquet! [order]
"Flatten a sales-order into entity-type tagged maps and buffer to parquet."
(let [so-ext-id (:sales-order/external-id order)
so-date (some-> (:sales-order/date order) .toString)
client (:sales-order/client order)
client-code (if (map? client) (:client/code client) client)]
(parquet/buffer! "sales-order"
{:entity-type "sales-order"
:external-id so-ext-id
:client-code client-code
:location (:sales-order/location order)
:vendor (:sales-order/vendor order)
:total (:sales-order/total order)
:tax (:sales-order/tax order)
:tip (:sales-order/tip order)
:discount (:sales-order/discount order)
:service-charge (:sales-order/service-charge order)
:date so-date})
(when-let [charges (:sales-order/charges order)]
(doseq [chg charges]
(parquet/buffer! "charge"
{:entity-type "charge"
:external-id (:charge/external-id chg)
:type-name (:charge/type-name chg)
:total (:charge/total chg)
:tax (:charge/tax chg)
:tip (:charge/tip chg)
:date so-date
:processor (some-> (:charge/processor chg) name)
:sales-order-external-id so-ext-id})))
(when-let [items (:sales-order/line-items order)]
(doseq [li items]
(parquet/buffer! "line-item"
{:entity-type "line-item"
:item-name (:order-line-item/item-name li)
:category (:order-line-item/category li)
:total (:order-line-item/total li)
:tax (:order-line-item/tax li)
:discount (:order-line-item/discount li)
:sales-order-external-id so-ext-id})))))
(defn stream->sales-orders [s]
(let [clients (map first (dc/q '[:find (pull ?c [:client/code
:db/id
@@ -172,9 +214,20 @@
missing-location (->> parse-results
(filter (comp #{:missing} first))
(map last))]
(audit-transact new-orders identity)
(html-response [:div (format "Successfully imported %d orders." (count new-orders))
(map last))
buffered-count (loop [orders new-orders
count 0]
(if-let [o (first orders)]
(do
(try
(flatten-order-to-parquet! o)
(catch Exception e
(alog/error ::buffer-failed
:exception e
:order (:sales-order/external-id o))))
(recur (rest orders) (inc count)))
count))]
(html-response [:div (format "Successfully imported %d orders." buffered-count)
(when (seq missing-location)
[:div "Missing the following locations"
[:ul.ul

View File

@@ -3,6 +3,7 @@
[auto-ap.datomic :refer [conn remove-nils]]
[auto-ap.logging :as log :refer [capture-context->lc with-context-as]]
[auto-ap.time :as atime]
[auto-ap.storage.parquet :as parquet]
[cemerick.url :as url]
[clj-http.client :as client]
[clj-time.coerce :as coerce]
@@ -602,6 +603,57 @@
(s/buffer 5)
(s/realize-each)
(s/reduce conj []))))))
(defn- flatten-order-to-parquet! [order]
"Flatten a sales-order into entity-type tagged maps and buffer to parquet.
Returns the sales-order external-id for logging."
(let [so-ext-id (:sales-order/external-id order)
so-date (some-> (:sales-order/date order) .toString)
client (:sales-order/client order)
client-code (when client (if (map? client)
(:client/code client)
client))]
(parquet/buffer! "sales-order"
{:entity-type "sales-order"
:external-id so-ext-id
:client-code (or client-code (:db/id client))
:location (:sales-order/location order)
:vendor (:sales-order/vendor order)
:total (:sales-order/total order)
:tax (:sales-order/tax order)
:tip (:sales-order/tip order)
:discount (:sales-order/discount order)
:service-charge (:sales-order/service-charge order)
:date so-date})
(when-let [charges (:sales-order/charges order)]
(doseq [chg charges]
(parquet/buffer! "charge"
{:entity-type "charge"
:external-id (:charge/external-id chg)
:type-name (:charge/type-name chg)
:total (:charge/total chg)
:tax (:charge/tax chg)
:tip (:charge/tip chg)
:date so-date
:processor (some-> (:charge/processor chg) name)
:sales-order-external-id so-ext-id})
(when-let [returns (:charge/returns chg)]
(doseq [rt returns]
(parquet/buffer! "sales-refund"
{:entity-type "sales-refund"
:type-name (:type-name rt)
:total (:total rt)
:sales-order-external-id so-ext-id})))))
(when-let [items (:sales-order/line-items order)]
(doseq [li items]
(parquet/buffer! "line-item"
{:entity-type "line-item"
:item-name (:order-line-item/item-name li)
:category (:order-line-item/category li)
:total (:order-line-item/total li)
:tax (:order-line-item/tax li)
:discount (:order-line-item/discount li)
:sales-order-external-id so-ext-id})))))
(defn upsert
([client]
(apply de/zip
@@ -616,7 +668,13 @@
(doseq [x (partition-all 100 results)]
(log/info ::loading-orders
:count (count x))
@(dc/transact-async conn x))))))))
(doseq [order x]
(try
(flatten-order-to-parquet! order)
(catch Exception e
(log/error ::buffer-failed
:exception e
:order (:sales-order/external-id order)))))))))))
(defn upsert-payouts

View File

@@ -0,0 +1,297 @@
(ns auto-ap.storage.parquet
(:require [config.core :refer [env]]
[amazonica.aws.s3 :as s3]
[clojure.java.io :as io]
[clojure.string :as str]
[clojure.data.json :as json])
(:import (java.sql DriverManager)
(java.time LocalDate)))
(def ^:dynamic *bucket* (:data-bucket env))
(def parquet-prefix "sales-details")
(defn s3-location [filename]
(str "s3://" *bucket* "/" filename))
(defn parquet-key [entity-type date-str]
(str parquet-prefix "/" entity-type "/" date-str ".parquet"))
(def db (atom nil))
(defn connect! []
(let [conn (DriverManager/getConnection "jdbc:duckdb:")
stmt (.createStatement conn)]
(.execute stmt "INSTALL httpfs; LOAD httpfs;")
(.close stmt)
(.addShutdownHook (Runtime/getRuntime)
(Thread. #(fn [])))
(reset! db conn)))
(defn disconnect! []
(locking db
(when-let [c @db]
(.close c)
(reset! db nil))))
(defmacro with-duckdb
[& body]
`(let [conn# (or @db (connect!))]
(try
(let [~'conn conn#]
~@body)
(finally
(when (and (not @db) conn#)
(.close conn#))))))
(defn execute! [sql]
(with-duckdb
(let [stmt (.createStatement conn)]
(.execute stmt sql)
nil)))
(defn query-scalar [sql]
(with-duckdb
(let [stmt (.createStatement conn)
rs (.executeQuery stmt sql)]
(when (.next rs)
(.getObject rs 1)))))
(defn query-rows [sql]
(with-duckdb
(let [stmt (.createStatement conn)
rs (.executeQuery stmt sql)
meta (.getMetaData rs)
col-count (.getColumnCount meta)
cols (vec (for [i (range 1 (inc col-count))]
(keyword (.getColumnLabel meta i))))]
(loop [rows []]
(if (.next rs)
(recur (conj rows
(zipmap cols
(vec (for [i (range 1 (inc col-count))]
(.getObject rs i))))))
rows)))))
(defn execute-to-parquet! [sql ^String parquet-path]
(with-duckdb
(let [stmt (.createStatement conn)]
(.execute stmt
(format "COPY (%s) TO '%s' (FORMAT PARQUET, OVERWRITE_OR_IGNORE)"
sql parquet-path))
(io/file parquet-path))))
(defn upload-parquet! [local-parquet-file s3-key]
(s3/put-object {:bucket-name *bucket*
:key s3-key
:file local-parquet-file})
(s3-location s3-key))
(defonce *buffers* (atom {}))
(defn- wal-dir []
(io/file (System/getProperty "user.dir" "/tmp")
"parquet-wal"))
(defn- init-wal! []
(let [dir (wal-dir)]
(when-not (.exists dir)
(.mkdirs dir))))
(defn buffer! [entity-type record]
(init-wal!)
(let [seq-no (System/currentTimeMillis)
entry (assoc record :_seq-no seq-no)]
(swap! *buffers* update entity-type (fnil conj []) entry)
(try
(let [wal-file (io/file (wal-dir)
(str entity-type ".jsonl"))]
(io/make-parents wal-file)
(with-open [w (io/writer wal-file :append true)]
(.write w ^String (json/write-str {:seq-no seq-no
:record record}))
(.write w (int \newline))))
(catch Exception e
(println "[parquet/wal]" (.getMessage e))))
entry))
(defn clear-buffer! [entity-type]
(swap! *buffers* dissoc entity-type))
(defn buffer-count [entity-type]
(-> @*buffers* (get entity-type []) count))
(defn total-buf-count []
(->> @*buffers*
vals (mapcat identity) count))
(defn flush-to-parquet! [entity-type]
"Flush buffered records for entity-type to parquet + S3."
(let [records (get @*buffers* entity-type [])]
(if (empty? records)
{:status :no-records}
(let [date-str (.toString (LocalDate/now))
jsonl-file (io/file "/tmp"
(str entity-type "-" date-str ".jsonl"))
parquet-file (io/file "/tmp"
(str entity-type "-" date-str ".parquet"))
s3-key (parquet-key entity-type date-str)]
(try
(with-open [w (io/writer jsonl-file :append true)]
(doseq [r records]
(.write w ^String (json/write-str (dissoc r :_seq-no)))
(.write w (int \newline))))
(execute-to-parquet!
(format "SELECT * FROM read_json_auto('%s')"
(.getAbsolutePath jsonl-file))
(.getAbsolutePath parquet-file))
(upload-parquet! parquet-file s3-key)
(clear-buffer! entity-type)
(.delete ^java.io.File jsonl-file)
(.delete ^java.io.File parquet-file)
{:key s3-key :status :ok}
(catch Exception e
(throw (ex-info "Flush failed" {:entity-type entity-type}
:error (.getMessage e)))))))))
(defn flush-by-date! []
"Flush all entity types for today."
(let [etypes ["sales-order" "charge"
"line-item" "sales-refund"]
flushed (into #{}
(keep (fn [et]
(let [{:keys [status]}
(flush-to-parquet! et)]
(when (= status :ok)
et))))
etypes)]
{:flushed flushed}))
(defn load-unflushed! []
"Restore unflushed records from WAL jsonl files into *buffers."
(init-wal!)
(let [etypes ["sales-order" "charge"
"line-item" "sales-refund"]
loaded (reduce-kv
(fn [acc et data]
(if-not (empty? data)
(assoc acc et
(->> (str/split-lines data)
(keep #(try
(let [entry (json/read-str %)]
(when entry
(assoc (:record entry) :_seq-no (:seq-no entry))))
(catch Exception _)))))
acc))
{}
(into {}
(keep (fn [et]
(let [f (io/file
(wal-dir)
(str et ".jsonl"))]
[et (slurp f)])))
etypes))]
(swap! *buffers* merge loaded)))
(defn get-unflushed-count []
(total-buf-count))
(defn unflushed-records? []
(not= 0 (total-buf-count)))
;;; DuckDB Read Layer
(defn date-seq [start end]
"Seq of YYYY-MM-DD strings between start and end inclusive."
(let [sd (LocalDate/parse start)
ed (LocalDate/parse end)
days (int (Math/abs
(- (.toEpochDay sd)
(.toEpochDay ed))))]
(for [i (range 0 (inc days))]
(.toString (.plusDays sd i)))))
(defn today []
(.toString (LocalDate/now)))
(defn parquet-query [entity-type start-date end-date]
"Build SQL to read all parquet files in date range.
Returns map with :sql and :count-sql keys."
(let [date-strs (date-seq start-date end-date)
urls (vec
(map #(format "'s3://%%s/sales-details/%%s/%%s.parquet'"
*bucket* entity-type %)
date-strs))
sql (str "SELECT * FROM read_parquet(["
(str/join ", " urls)
"])")]
{:sql sql
:count-sql (format "SELECT COUNT(*) FROM (%s) t" sql)}))
(defn- build-where-clause [opts field-pairs]
"Build SQL WHERE clause from opts map.
fields-with-keys is vector of [:field-key :env-var-name]."
(let [clauses (keep
(fn [[key env]]
(let [v (get opts key)]
(when v
(str env " = '" v "'"))))
field-pairs)]
(when (seq clauses)
(str " WHERE " (str/join " AND " clauses)))))
(defn get-sales-orders
([start-date end-date]
(get-sales-orders start-date end-date {}))
([start-date end-date opts]
(let [q (parquet-query "sales-order"
start-date end-date)
base-sql (:sql q)
count-sql (:count-sql q)
sort (get opts :sort "date")
order (get opts :order "DESC")
limit (get opts :limit)
offset (get opts :offset)
where-str (build-where-clause
opts
[[:client "external_id.client"]
[:vendor "external_id.vendor"]
[:location "location"]])
full-sql (if where-str
(str base-sql where-str)
base-sql)
result (cond-> full-sql
sort (str " ORDER BY " sort
" " (name order))
limit (str " LIMIT " limit)
offset (str " OFFSET " offset))
full-count (if where-str
(str count-sql where-str)
count-sql)]
{:rows (query-rows result)
:count (or
(int
(query-scalar
full-count)) 0)})))
(defn query-deduped [entity-type start-date end-date]
"Query records deduplicated by external-id (latest _seq_no wins)."
(let [q (parquet-query entity-type start-date end-date)]
(query-rows
(str (:sql q)
" QUALIFY ROW_NUMBER() OVER"
" (PARTITION BY sales_order.external_id"
" ORDER BY _seq_no DESC) = 1"))))
(defn query-by-entity-id [entity-type external-id
start-date end-date]
(->> (query-deduped entity-type start-date end-date)
(filter #(= (:external_id %)
(name external-id)))
first))
(defn count-records-in-parquet
[entity-type start-date end-date]
(let [q (parquet-query entity-type
start-date end-date)]
(or (int (query-scalar (:count-sql q))) 0)))

View File

@@ -0,0 +1,184 @@
(ns auto-ap.storage.sales-summaries
"Aggregation functions querying Parquet files on S3 via DuckDB.
Entity types: sales-order | charge | line-item | sales-refund
S3 pattern: s3://<bucket>/sales-details/<entity-type>/<YYYY-MM-DD>.parquet"
(:require [auto-ap.storage.parquet :as p]
[clojure.string :as str]))
(defn- dq [name]
(str "\"" name "\""))
(defn- sum-dbl [val]
(try
(if val (double val) 0.0)
(catch Exception _e
0.0)))
(defn- pq-files [entity-type start-date end-date]
"Vector of S3 parquet file paths for date range."
(let [dates (p/date-seq start-date end-date)]
(vec
(map #(str "'s3://" p/*bucket*
"/sales-details/" entity-type "/"
% ".parquet") dates))))
(defn sum-payments-by-type [client-id start-date end-date]
"Return {processor-key -> {type-name-string -> total-double}}."
(let [files (pq-files "charge" start-date end-date)]
(try
(let [sql (str "SELECT "
(dq "processor")
" AS proc, "
(dq "type-name")
" AS type_name, "
"SUM("
(dq "total")
")::DOUBLE AS total_amount "
"FROM read_parquet(["
(str/join ", " files)
"]) "
"WHERE "
(dq "client-code")
" = '" client-id "' "
"GROUP BY "
(dq "processor") ", "
(dq "type-name"))]
(let [rows (p/query-rows sql)]
(reduce (fn [acc row]
(let [proc (:proc row)
tname (str/trim (name (:type_name row)))
total (sum-dbl (:total_amount row))]
(update acc proc
(fn [inner]
(let [b (or inner {})]
(assoc b
tname
(+ (get b tname 0.0) total)))))))
{}
rows)))
(catch Exception e
(println "[sales-summaries]" (.getMessage e))
{}))))
(defn sum-discounts [client-id start-date end-date]
(let [files (pq-files "sales-order" start-date end-date)]
(try
(let [sql (str "SELECT SUM("
(dq "discount")
")::DOUBLE AS discount_total "
"FROM read_parquet(["
(str/join ", " files)
"]) "
"WHERE "
(dq "client-code")
" = '" client-id "'")]
(or (some-> (first (p/query-rows sql)) :discount_total sum-dbl) 0.0))
(catch Exception e
(println "[sales-summaries/discounts]" (.getMessage e))
0.0))))
(defn sum-refunds-by-type [client-id start-date end-date]
(let [files (pq-files "sales-refund" start-date end-date)]
(try
(let [sql (str "SELECT "
(dq "type-name")
" AS type_name, "
"SUM("
(dq "total")
")::DOUBLE AS total_amount "
"FROM read_parquet(["
(str/join ", " files)
"]) "
"WHERE "
(dq "sales-order-external-id")
" IN (SELECT "
(dq "external-id")
" FROM read_parquet(["
(str/join ", " (pq-files "sales-order" start-date end-date))
"]) WHERE "
(dq "client-code")
" = '" client-id "') "
"GROUP BY " (dq "type-name"))]
(let [rows (p/query-rows sql)]
(reduce (fn [acc row]
(let [tname (str/trim (name (:type_name row)))
total (sum-dbl (:total_amount row))]
(assoc acc tname (+ (get acc tname 0.0) total))))
{}
rows)))
(catch Exception e
(println "[sales-summaries/refunds]" (.getMessage e))
{}))))
(defn sum-taxes [client-id start-date end-date]
(let [files (pq-files "sales-order" start-date end-date)]
(try
(let [sql (str "SELECT SUM("
(dq "tax")
")::DOUBLE AS tax_total "
"FROM read_parquet(["
(str/join ", " files)
"]) "
"WHERE "
(dq "client-code")
" = '" client-id "'")]
(or (some-> (first (p/query-rows sql)) :tax_total sum-dbl) 0.0))
(catch Exception e
(println "[sales-summaries/tax]" (.getMessage e))
0.0))))
(defn sum-tips [client-id start-date end-date]
(let [files (pq-files "sales-order" start-date end-date)]
(try
(let [sql (str "SELECT SUM("
(dq "tip")
")::DOUBLE AS tip_total "
"FROM read_parquet(["
(str/join ", " files)
"]) "
"WHERE "
(dq "client-code")
" = '" client-id "'")]
(or (some-> (first (p/query-rows sql)) :tip_total sum-dbl) 0.0))
(catch Exception e
(println "[sales-summaries/tip]" (.getMessage e))
0.0))))
(defn sum-sales-by-category [client-id start-date end-date]
(let [files (pq-files "line-item" start-date end-date)]
(try
(let [sql (str "SELECT "
(dq "category")
" AS category, "
"SUM("
(dq "total")
")::DOUBLE AS total_amount, "
"SUM("
(dq "tax")
")::DOUBLE AS tax_amount, "
"SUM("
(dq "discount")
")::DOUBLE AS discount_amount "
"FROM read_parquet(["
(str/join ", " files)
"]) "
"WHERE "
(dq "sales-order-external-id")
" IN (SELECT "
(dq "external-id")
" FROM read_parquet(["
(str/join ", " (pq-files "sales-order" start-date end-date))
"]) WHERE "
(dq "client-code")
" = '" client-id "') "
"GROUP BY " (dq "category"))]
(let [rows (p/query-rows sql)]
(mapv (fn [row]
{:category (or (:category row) "Unknown")
:total (sum-dbl (:total_amount row))
:tax (sum-dbl (:tax_amount row))
:discount (sum-dbl (:discount_amount row))})
rows)))
(catch Exception e
(println "[sales-summaries/sales]" (.getMessage e))
[]))))