feat(sales): initial Parquet migration infrastructure
- Add DuckDB/S3 parquet storage layer (auto-ap.storage.parquet) - Add sales_to_parquet migration script for historical data - Add cleanup_sales for post-migration Datomic cleanup - Add sales_orders_new.clj with DuckDB read layer for SSR views - Add test scaffolding for parquet storage - Add plan document for move-detailed-sales-to-parquet
This commit is contained in:
224
src/clj/auto_ap/datomic/sales_orders_new.clj
Normal file
224
src/clj/auto_ap/datomic/sales_orders_new.clj
Normal file
@@ -0,0 +1,224 @@
|
||||
(ns auto-ap.datomic.sales-orders
  ;; NOTE(review): `mu/trace` is used further down but no `mu` alias is
  ;; required here -- presumably com.brunobonacci.mulog; confirm and add it.
  (:require [auto-ap.datomic :refer [conn]]
            [auto-ap.storage.parquet :as pq]
            [clojure.data.json :as json]
            [clojure.java.io :as io]
            [clojure.set :as set]
            [clojure.string :as str]))
|
||||
|
||||
(defn <-row
  "Convert a flat parquet row into the namespaced shape consumers expect.

  Columns may arrive keyed either by their string name or by the keyword
  form of that name (pq/query-rows keywordizes column labels), so both
  spellings are renamed:

    {\"external-id\" \"square/order/123\", :location \"DT\", \"total\" 50.0}

  becomes

    {:sales-order/external-id \"square/order/123\",
     :sales-order/location    \"DT\",
     :sales-order/total       50.0}

  Keys that are not listed below are passed through untouched.

  Note: client, charges and other nested structures are not available in
  the flat parquet rows. When denormalisation adds those columns we can
  restore the full consumer shape."
  [row]
  (let [columns {"external-id" :sales-order/external-id
                 "location"    :sales-order/location
                 "total"       :sales-order/total
                 "tax"         :sales-order/tax
                 "tip"         :sales-order/tip}
        ;; accept the keyword spelling of every column as well
        renames (into columns
                      (map (fn [[col target]] [(keyword col) target]))
                      columns)]
    (set/rename-keys row renames)))
|
||||
|
||||
(defn build-where-clause
  "Build a SQL predicate from the filters parquet can apply:
  :client-code, :vendor and :location.

  Returns the bare predicate (no leading WHERE keyword), e.g.

    \"external_id.client = 'acme' AND external_id.vendor = 'square'\"

  or nil when none of the filters is present. Values are emitted as SQL
  string literals with embedded single quotes doubled.

  NOTE(review): the migration in this commit writes flat columns such as
  client-code / vendor; confirm that external_id.client and
  external_id.vendor really exist in the parquet schema."
  [args]
  (let [sql-literal (fn [v]
                      (str "'" (str/replace (str v) "'" "''") "'"))
        clauses     (for [[col v] [["external_id.client" (:client-code args)]
                                   ["external_id.vendor" (:vendor args)]
                                   ["location"           (:location args)]]
                          :when v]
                      (str col " = " (sql-literal v)))]
    (when (seq clauses)
      (str/join " AND " clauses))))
|
||||
|
||||
(defn build-sort-clause
  "Map the :sort entries of args into a SQL ORDER BY fragment.

  Each entry is a map with :sort-key (string) and :asc (truthy for
  ascending, otherwise descending). Supported sort keys and the column
  text they produce:

    \"date\"       -> DATE
    \"total\"      -> TOTAL
    \"tax\"        -> TAX
    \"tip\"        -> TIP
    \"total-desc\" -> TOTAL (always DESC, regardless of :asc)
    \"source\"     -> SALE_SOURCE
    \"client\"     -> EXTERNAL_ID_CLIENT
    \"location\"   -> LOCATION

  Unknown sort keys are skipped. Returns a string such as
  \"DATE ASC, TOTAL DESC\" (without the ORDER BY keyword), or nil when
  args has no :sort or none of its entries is recognised."
  [args]
  (let [columns   {"date"     "DATE"
                   "total"    "TOTAL"
                   "tax"      "TAX"
                   "tip"      "TIP"
                   "source"   "SALE_SOURCE"
                   "client"   "EXTERNAL_ID_CLIENT"
                   "location" "LOCATION"}
        fragments (keep (fn [{:keys [sort-key asc]}]
                          (if (= "total-desc" sort-key)
                            ;; this key carries its own direction
                            "TOTAL DESC"
                            (when-let [col (get columns sort-key)]
                              (str col " " (if asc "ASC" "DESC")))))
                        (:sort args))]
    (when (seq fragments)
      (str/join ", " fragments))))
|
||||
|
||||
(defn build-pagination-clause
  "Translate a pagination request into the numbers used for SQL
  LIMIT / OFFSET.

  :count, falling back to :per-page, becomes :limit (nil when neither is
  given); :start becomes :offset and defaults to 0."
  [args]
  (let [page-size (or (get args :count) (get args :per-page))
        skip      (or (get args :start) 0)]
    {:limit  page-size
     :offset skip}))
|
||||
|
||||
(defn- apply-pagination
  "Compute the {:limit .. :offset ..} map for args.

  Delegates to build-pagination-clause and, when a limit is present,
  coerces it to an Integer (numeric strings are parsed). Pagination itself
  is applied by pq/get-sales-orders via LIMIT/OFFSET; this is only a thin
  normalising wrapper. args is not modified."
  [args]
  (let [{:keys [limit] :as page} (build-pagination-clause args)]
    (cond-> page
      limit (assoc :limit (if (string? limit)
                            (Integer/parseInt limit)
                            (int limit))))))
|
||||
|
||||
(defn- build-options
  "Assemble the opts map handed to pq/get-sales-orders.

  Always contains :client (from :client-code), :vendor, :limit, :offset,
  :order (fixed to \"DESC\") and :sort-key (copied from args). :location is
  added only when present, and :sort is set to \"date\" whenever args
  carries any :sort value."
  [args]
  (let [{:keys [limit offset]} (apply-pagination args)
        {:keys [client-code vendor location sort-key]
         requested-sort :sort} args
        base {:client   client-code
              :vendor   vendor
              :limit    limit
              :offset   offset
              :order    "DESC"
              :sort-key sort-key}]
    (cond-> base
      location       (assoc :location location)
      requested-sort (assoc :sort "date")))) ; let get-sales-orders handle order
|
||||
|
||||
(defn raw-graphql-ids
  "Query sales orders from parquet files via DuckDB instead of Datomic.

  Filters applied at the parquet level:
    - :date-range {:start .. :end ..} selects which daily parquet files
      are read; the start is widened by one day
    - :client-code / :vendor / :location, sort and pagination are passed
      to pq/get-sales-orders through build-options

  category, processor and type-name filters require nested joins that
  parquet does not support -- those fields are ignored.

  Returns
    {:ids   [external-id string for each returned row]
     :rows  the raw rows
     :count int (value reported by pq/get-sales-orders)}
  or nil when the query yields no :rows entry.

  Throws ex-info when :date-range lacks :start or :end, because the file
  list cannot be built without both bounds.

  NOTE(review): assumes the bounds are ISO-8601 strings beginning with
  YYYY-MM-DD -- confirm against callers."
  [args]
  (let [{start-raw :start end-raw :end} (:date-range args)
        iso-day (fn [s] (subs (str s) 0 10))]
    (when-not (and start-raw end-raw)
      (throw (ex-info "raw-graphql-ids requires :date-range with :start and :end"
                      {:date-range (:date-range args)})))
    (let [start  (-> (iso-day start-raw)
                     java.time.LocalDate/parse
                     (.plusDays -1)
                     str)
          end    (iso-day end-raw)
          result (pq/get-sales-orders start end (build-options args))]
      ;; For each row returned we need an ID string; the external-id
      ;; column is the lookup key. Both keyword spellings are accepted
      ;; because the label depends on how the column was written.
      (when-let [rows (:rows result)]
        {:ids   (mapv #(str (or (:external-id %) (:external_id %))) rows)
         :rows  rows
         :count (:count result)}))))
|
||||
|
||||
(defn graphql-results
  "Shape raw parquet rows for GraphQL consumers.

  Every row is passed through <-row and the results are returned as a
  vector. The second and third arguments exist only to keep the arity of
  the former Datomic-backed version and are ignored; no database is
  consulted."
  [rows _id-keys _args]
  (into [] (map <-row) rows))
|
||||
|
||||
(defn summarize-orders
  "Return {:total .. :tax ..} for the given ids.

  This still goes through Datomic because a parquet-side aggregation
  would duplicate the WHERE logic. If we want a pure parquet path here,
  add a SQL-based aggregation in a follow-up.

  NOTE(review): relies on auto-ap.datomic/aggregate-sum returning a
  collection whose first element is a [total tax] tuple -- confirm."
  [ids]
  (let [[total tax] (first (#'auto-ap.datomic/aggregate-sum ids))] ; uses dc/q internally
    {:total total
     :tax   tax}))
|
||||
|
||||
(defn get-graphql
  "Entry-point: return [payments count summary].

  The data flow is:
    1. raw-graphql-ids  -> parquet query, yielding :ids, :rows and :count
    2. graphql-results  -> rows reshaped with <-row
    3. summarize-orders -> {:total :tax} for the matched ids"
  [args]
  (let [{:keys [ids rows] matching-count :count}
        (mu/trace ::get-sales-order-ids
                  []
                  (raw-graphql-ids args))]
    [(mu/trace ::get-results [] (graphql-results rows ids args))
     matching-count
     (summarize-orders ids)]))
|
||||
|
||||
(defn summarize-graphql
  "Entry-point: return only the summary map produced by summarize-orders
  for the ids that raw-graphql-ids matches for args."
  [args]
  (-> (mu/trace ::get-sales-order-ids
                []
                (raw-graphql-ids args))
      :ids
      summarize-orders))
|
||||
@@ -311,14 +311,13 @@
|
||||
(alog/info ::upserting-summaries
|
||||
:category-count (count (:sales-summary/items result)))
|
||||
@(dc/transact conn [[:upsert-entity result]]))
|
||||
@(dc/transact conn [{:db/id id :sales-summary/dirty false}]))))))
|
||||
|
||||
(let [c (auto-ap.datomic/pull-attr (dc/db conn) :db/id [:client/code "NGCL" ])
|
||||
date #inst "2024-04-14T00:00:00-07:00"]
|
||||
(get-payment-items c date)
|
||||
|
||||
)
|
||||
@(dc/transact conn [{:db/id id :sales-summary/dirty false}]))))))
|
||||
|
||||
(comment
|
||||
;; TODO: Move to test file or proper location
|
||||
(let [c (auto-ap.datomic/pull-attr (dc/db @conn) :db/id [:client/code "NGCL" ])
|
||||
date #inst "2024-04-14T00:00:00-07:00"]
|
||||
(get-payment-items c date)))
|
||||
|
||||
(defn reset-summaries []
|
||||
@(dc/transact conn (->> (dc/q '[:find ?sos
|
||||
|
||||
219
src/clj/auto_ap/migration/cleanup_sales.clj
Normal file
219
src/clj/auto_ap/migration/cleanup_sales.clj
Normal file
@@ -0,0 +1,219 @@
|
||||
(ns auto-ap.migration.cleanup-sales
|
||||
(:require [auto-ap.datomic :refer [conn]]
|
||||
[auto-ap.storage.parquet :as pq]
|
||||
[amazonica.aws.s3 :as s3]
|
||||
[datomic.api :as d-api]
|
||||
[clojure.string :as str]))
|
||||
|
||||
;; Number of entity ids placed in each transaction by batch-transact.
(def ^:private BATCH-SIZE 1000)
|
||||
;; While true, nothing is transacted: batch-transact only prints what it
;; would retract. Toggle with set-dry-run!.
(def ^:private DRY-RUN? true)
|
||||
|
||||
(defn- set-dry-run!
  "Replace the root value of DRY-RUN? with v."
  [v]
  (alter-var-root #'DRY-RUN? (fn [_previous] v)))
|
||||
|
||||
; -- query helpers
|
||||
|
||||
(defn- query-sales-order-ids
  "Return the entity ids of every entity in db that has a
  :sales-order/external-id attribute."
  [db]
  (let [tuples (d-api/q '[:find ?e
                          :where [?e :sales-order/external-id]]
                        db)]
    (map first tuples)))
|
||||
|
||||
(defn- collect-child-ids
  "Gather the entity ids that hang off a batch of sales orders.

  Returns a map with keys :orders, :charges, :line-items and :refunds.
  :orders is order-ids as given; the other values are vectors of entity
  ids eligible for retraction.

  Line items are looked up through :sales-order/line-items, matching the
  pull pattern used by the parquet migration.
  NOTE(review): confirm :sales-order/refunds and :sales-order/line-items
  against the schema before running outside dry-run."
  [db order-ids]
  (let [orders   (vec (distinct order-ids))
        ;; each result tuple holds exactly one value, the child entity id
        children (fn [query]
                   (mapv first (d-api/q query db orders)))]
    {:orders     order-ids
     :charges    (children '[:find ?c
                             :in $ [?o ...]
                             :where [?o :sales-order/charges ?c]])
     :line-items (children '[:find ?li
                             :in $ [?o ...]
                             :where [?o :sales-order/line-items ?li]])
     :refunds    (children '[:find ?r
                             :in $ [?o ...]
                             :where [?o :sales-order/refunds ?r]])}))
|
||||
|
||||
; -- transaction batching
|
||||
|
||||
(defn- batch-transact
  "Retract entity-ids in chunks of BATCH-SIZE.

  conn is a Datomic connection and entity-ids a seq of entity ids. Each
  chunk is announced on stdout; the [:db/retractEntity id] transaction is
  only submitted (and waited for) when DRY-RUN? is false.
  Returns :done."
  [conn entity-ids]
  (doseq [[idx batch] (map-indexed vector (partition-all BATCH-SIZE entity-ids))]
    (println " batch" idx ":" (count batch) "retracts")
    (when-not DRY-RUN?
      @(d-api/transact conn (for [eid batch]
                              [:db/retractEntity eid]))))
  :done)
|
||||
|
||||
(defn- retract-all-child-ids!
  "Walk a map of entity type -> entity ids (the shape produced by
  collect-child-ids) and hand every non-empty id collection to
  batch-transact, printing the type and id count first."
  [conn child-entity-map]
  (doseq [[type id-seq] child-entity-map
          :when (seq id-seq)]
    (println "retracting" type ":" (count id-seq) "ids")
    (batch-transact conn id-seq)))
|
||||
|
||||
; -- month grouping
|
||||
|
||||
(defn- group-orders-by-month
  "Group sales order entity ids by the [year month] found in their
  :sales-order/day-value.

  Returns a map {[year month] [eid ...]} with integer year and month.
  Orders without a day-value are left out of the result.

  NOTE(review): assumes the string form of day-value starts with
  YYYY-MM -- confirm against the schema."
  [db order-ids]
  (reduce (fn [acc eid]
            (if-let [day-val (:sales-order/day-value (d-api/entity db eid))]
              (let [[y m] (str/split (str day-val) #"-")]
                (update acc
                        [(Integer/parseInt y) (Integer/parseInt m)]
                        (fnil conj [])
                        eid))
              ;; keep what has been grouped so far when an order has no date
              acc))
          {}
          order-ids))
|
||||
|
||||
; -- S3 verification (uses amazonica + parquet module)
|
||||
|
||||
;; Entity type names used as the per-type segment of the S3 parquet keys
;; built by s3-keys-for-date.
(def ENTITY-TYPES
  ["sales-order"
   "charge"
   "line-item"
   "sales-refund"])
|
||||
|
||||
(defn- s3-keys-for-date
  "Return a vector holding the parquet S3 key of every entity type in
  ENTITY-TYPES for date-str."
  [date-str]
  (vec (for [entity-type ENTITY-TYPES]
         (pq/parquet-key entity-type date-str))))
|
||||
|
||||
(defn- days-in-month
  "Return a seq of YYYY-MM-DD strings, one per calendar day of the given
  year and month, in ascending order."
  [year month]
  (let [first-day  (java.time.LocalDate/of year month 1)
        next-month (.plusMonths first-day 1)]
    (->> (iterate #(.plusDays ^java.time.LocalDate % 1) first-day)
         (take-while #(.isBefore ^java.time.LocalDate % next-month))
         (map str))))
|
||||
|
||||
(defn- object-exists?
  "Return true when an object named key exists in pq/*bucket*.

  Only the object's metadata is requested, so no object content is
  downloaded and no content stream is left open. A 404 from S3 means the
  object is missing and yields false; any other S3 error (permissions,
  throttling, ...) is rethrown rather than being reported as a missing
  object."
  [key]
  (try
    (s3/get-object-metadata {:bucket-name pq/*bucket*
                             :key         key})
    true
    (catch com.amazonaws.services.s3.model.AmazonS3Exception e
      (if (= 404 (.getStatusCode e))
        false
        (throw e)))))
|
||||
|
||||
(defn- verify-month-in-s3?
  "Check every calendar day of year/month for backing parquet data.

  A day counts as present when at least one of its per-entity-type keys
  exists on S3. Returns {:ok bool :missing [date-string ...]}, where
  :missing lists the days for which no key was found and :ok is true
  when that list is empty."
  [year month]
  (let [present? (fn [day]
                   (some object-exists? (s3-keys-for-date day)))
        missing  (into [] (remove present?) (days-in-month year month))]
    {:ok      (empty? missing)
     :missing missing}))
|
||||
|
||||
; -- public API: delete-by-month
|
||||
|
||||
(defn- delete-by-month
  "Retract all sales entities whose order falls in year/month.

  Looks up every sales order in the current db, keeps those grouped under
  [year month], collects their child ids and hands them to
  retract-all-child-ids! unless DRY-RUN? is set.

  Returns :ok after processing, or :skipped when the month has no orders.
  No S3 verification happens here; callers that need it must verify
  first. client-entid is accepted but not used."
  [conn client-entid year month]
  (println "=== deleting" year "-" month
           "dry-run? =" DRY-RUN?)
  (let [db         (d-api/db conn)
        target-ids (-> (group-orders-by-month db (query-sales-order-ids db))
                       (get [year month] []))]
    (if (empty? target-ids)
      (do (println " no orders found for" year "-" month)
          :skipped)
      (let [child-maps (collect-child-ids db target-ids)
            total-ids  (->> child-maps
                            vals
                            (reduce into [])
                            distinct
                            count)]
        (println " " total-ids "total entities to retract")
        (when-not DRY-RUN?
          (retract-all-child-ids! conn child-maps))
        :ok))))
|
||||
|
||||
; -- public API: cleanup-all
|
||||
|
||||
(defn cleanup-all
  "Remove ALL sales-order, charge, line-item and sales-refund entities
  from Datomic by running delete-by-month for every [year month] that
  has sales orders. Retractions are issued as [:db/retractEntity ...]
  transactions and are skipped while DRY-RUN? is true.

  Unlike safe-cleanup-all this does not check that parquet data exists."
  []
  (let [db     (d-api/db conn)
        months (sort (keys (group-orders-by-month db (query-sales-order-ids db))))]
    (println "found" (count months) "months of data")
    (doseq [[y m] months]
      (delete-by-month conn nil y m))
    (println "cleanup-all complete")))
|
||||
|
||||
; -- public API: safe-cleanup-all
|
||||
|
||||
(defn- collect-all-months
  "Return the sorted seq of [year month] pairs for which the current db
  of conn holds sales orders."
  [conn]
  (let [db      (d-api/db conn)
        grouped (group-orders-by-month db (query-sales-order-ids db))]
    (sort (keys grouped))))
|
||||
|
||||
(defn safe-cleanup-all
  "Same as cleanup-all, but verifies that S3 data exists first.

  For each [year month] with sales orders, verify-month-in-s3? is run
  and the month is deleted only when it reports :ok. The first month with
  missing days aborts the whole run by throwing ex-info carrying :year,
  :month and :missing.

  While DRY-RUN? is true no verification or deletion is performed; only
  the header and footer lines are printed."
  []
  (let [months (collect-all-months conn)]
    (println "=== safe-cleanup-all"
             "months:" (count months)
             "dry-run? =" DRY-RUN?)
    (doseq [[y m] months]
      (when-not DRY-RUN?
        (let [{:keys [ok missing]} (verify-month-in-s3? y m)]
          (if ok
            (do (println "verified" y "-" m "S3 OK, deleting...")
                (delete-by-month conn nil y m))
            (do (println "ERROR" y "-" m "missing in S3:"
                         (str/join ", " missing))
                (throw
                 (ex-info "Missing S3 data — aborting!"
                          {:year    y
                           :month   m
                           :missing missing})))))))
    (println "safe-cleanup-all complete")))
|
||||
230
src/clj/auto_ap/migration/sales_to_parquet.clj
Normal file
230
src/clj/auto_ap/migration/sales_to_parquet.clj
Normal file
@@ -0,0 +1,230 @@
|
||||
(ns auto-ap.migration.sales-to-parquet
|
||||
"Migrate historical sales data from Datomic to Parquet + S3.
|
||||
|
||||
Groups records by business date and writes daily partitions.
|
||||
Dead-letter records (missing dates) are written separately.
|
||||
|
||||
Usage:
|
||||
(migrate-all) ; full migration earliest → latest
|
||||
(write-day-by-day \"2024-01-01\" \"2024-03-31\") ; date range
|
||||
(write-dead-letter [flat]) ; write orphaned records"
|
||||
(:require [auto-ap.datomic :refer [conn]]
|
||||
[auto-ap.storage.parquet :as p]
|
||||
[datomic.api :as dc]
|
||||
[clj-time.core :as time]))
|
||||
|
||||
(defn- fetch-all-sales-order-ids
  "Return a vector with the entity id of every entity that has a
  :sales-order/external-id, read from the current db of conn."
  []
  (->> (dc/q '[:find ?e
               :where [?e :sales-order/external-id]]
             (dc/db conn))
       (mapv first)))
|
||||
|
||||
;; Pull pattern for a sales order together with its charges and line
;; items. The processor is pulled with its :db/ident because the
;; flattening step reads [:charge/processor :db/ident].
;; NOTE(review): :charge/returns and :sales-order/vendor are pulled as
;; plain attributes -- confirm their value types against the schema.
(def ^:private sales-order-read
  '[:sales-order/external-id
    :sales-order/date
    {:sales-order/client [:client/code]}
    :sales-order/location
    :sales-order/vendor
    :sales-order/total
    :sales-order/tax
    :sales-order/tip
    :sales-order/discount
    :sales-order/service-charge
    {:sales-order/charges
     [:charge/external-id
      :charge/type-name
      :charge/total
      :charge/tax
      :charge/tip
      :charge/date
      {:charge/processor [:db/ident]}
      :charge/returns
      {:charge/client [:client/code]}]}
    {:sales-order/line-items
     [:order-line-item/item-name
      :order-line-item/category
      :order-line-item/total
      :order-line-item/tax
      :order-line-item/discount
      :order-line-item/unit-price
      :order-line-item/quantity
      :order-line-item/note]}])
|
||||
|
||||
(defn- pull-sales-order-data
  "Pull the sales-order-read pattern for every entity id in eids against
  the current db of conn. Returns [] for an empty eids without touching
  the database."
  [eids]
  (if (seq eids)
    (dc/pull-many (dc/db conn) sales-order-read eids)
    []))
|
||||
|
||||
(defn- flatten-order-to-pieces!
  "Flatten a pulled sales order into :entity-type tagged maps.

  Rows are appended to flat, which may be an atom or a volatile holding a
  vector, in this order: one \"sales-order\" row, then for each charge a
  \"charge\" row followed by one \"sales-refund\" row per entry of its
  :charge/returns, and finally one \"line-item\" row per line item.

  :date is the string form of :sales-order/date, or nil when the order
  has no date. Returns the contents of flat after appending."
  [order flat]
  (let [so-ext-id   (str (:sales-order/external-id order))
        so-date     (some-> (:sales-order/date order) str)
        client-code (get-in order [:sales-order/client :client/code])
        ;; swap! does not accept a volatile, so pick the matching updater
        add!        (fn [row]
                      (if (instance? clojure.lang.Volatile flat)
                        (vswap! flat conj row)
                        (swap! flat conj row)))]
    ;; sales-order row
    (add! {:entity-type    "sales-order"
           :external-id    so-ext-id
           :client-code    client-code
           :location       (:sales-order/location order)
           :vendor         (:sales-order/vendor order)
           :total          (:sales-order/total order)
           :tax            (:sales-order/tax order)
           :tip            (:sales-order/tip order)
           :discount       (:sales-order/discount order)
           :service-charge (:sales-order/service-charge order)
           :date           so-date})
    ;; charges, each followed by its returns as sales-refund rows
    (doseq [chg (:sales-order/charges order)]
      (add! {:entity-type             "charge"
             :external-id             (str (:charge/external-id chg))
             :type-name               (:charge/type-name chg)
             :total                   (:charge/total chg)
             :tax                     (:charge/tax chg)
             :tip                     (:charge/tip chg)
             :date                    so-date
             :processor               (get-in chg [:charge/processor :db/ident])
             :sales-order-external-id so-ext-id})
      (doseq [rt (:charge/returns chg)]
        (add! {:entity-type             "sales-refund"
               :type-name               (get rt :type-name)
               :total                   (get rt :total)
               :sales-order-external-id so-ext-id})))
    ;; line-items
    (doseq [li (:sales-order/line-items order)]
      (add! {:entity-type             "line-item"
             :item-name               (:order-line-item/item-name li)
             :category                (:order-line-item/category li)
             :total                   (:order-line-item/total li)
             :tax                     (:order-line-item/tax li)
             :discount                (:order-line-item/discount li)
             :sales-order-external-id so-ext-id}))
    @flat))
|
||||
|
||||
(defn -fetch-order-ids-for-date
  "Return a vector of the entity ids of all sales orders whose
  :sales-order/date lies within the calendar day date-str (YYYY-MM-DD).

  The day is the half-open interval from midnight of date-str up to, but
  excluding, the following midnight, so an order dated exactly at
  midnight belongs to one day only.

  NOTE(review): day boundaries are computed in UTC -- confirm that this
  matches how business dates are stored."
  [db date-str]
  (let [day     (java.time.LocalDate/parse date-str)
        ->inst  (fn [^java.time.LocalDate d]
                  (java.util.Date/from
                   (.toInstant (.atStartOfDay d java.time.ZoneOffset/UTC))))
        start   (->inst day)
        end     (->inst (.plusDays day 1))]
    (->> (dc/q '[:find ?e
                 :in $ ?start ?end
                 :where [?e :sales-order/date ?d]
                        [(>= ?d ?start)]
                        [(< ?d ?end)]]
               db start end)
         (mapv first))))
|
||||
|
||||
|
||||
(defn- date-seq
  "Seq of YYYY-MM-DD strings for every day between start and end,
  inclusive, in ascending order. The bounds may be given in either order."
  [start end]
  (let [a         (java.time.LocalDate/parse start)
        b         (java.time.LocalDate/parse end)
        [from to] (if (.isAfter a b) [b a] [a b])]
    (->> (iterate #(.plusDays ^java.time.LocalDate % 1) from)
         (take-while #(not (.isAfter ^java.time.LocalDate % to)))
         (map str))))
|
||||
|
||||
(defn- write-day-by-day
  "Buffer and flush the sales data of every day from start-date to
  end-date (YYYY-MM-DD strings, inclusive).

  opts may be nil or a map with:
    :date-set   collection of date strings; when non-empty only these
                days of the range are processed
    :batch-size number of orders pulled per batch (default 100)

  For each day the orders are pulled in batches, flattened, buffered per
  entity type and then all four entity types are flushed.
  Returns {:status :completed :total-days n}.

  NOTE(review): p/flush-to-parquet! is called with the entity type only,
  so the S3 key it writes is chosen by that function rather than by the
  day being processed here -- confirm that this yields daily partitions."
  ([start-date end-date]
   (write-day-by-day start-date end-date nil))
  ([start-date end-date opts]
   (let [wanted     (set (:date-set opts))
         days       (vec (cond->> (date-seq start-date end-date)
                           (seq wanted) (filter wanted)))
         batch-size (or (:batch-size opts) 100)]
     (doseq [^String day days]
       (println "[migration] processing" day)
       (let [eids (-fetch-order-ids-for-date (dc/db conn) day)]
         (doseq [batch (partition-all batch-size eids)]
           ;; an atom, because the flattening helper appends with swap!
           (let [flat (atom [])]
             (doseq [o (pull-sales-order-data batch)]
               (flatten-order-to-pieces! o flat))
             (doseq [r @flat]
               (p/buffer! (:entity-type r) r)))))
       (doseq [etype ["sales-order" "charge"
                      "line-item" "sales-refund"]]
         (p/flush-to-parquet! etype))
       (println "[migration]" day "complete"))
     {:status :completed :total-days (count days)})))
|
||||
|
||||
(defn- write-dead-letter
  "Buffer the records of flat that have no :date.

  Each such record is buffered under the entity type
  \"<prefix>-<entity-type>\"; prefix defaults to \"dead\". Records are
  only buffered here, not flushed. Returns nil."
  ([flat]
   (write-dead-letter "dead" flat))
  ([prefix flat]
   (doseq [r flat
           :when (nil? (:date r))]
     (p/buffer! (str prefix "-" (:entity-type r)) r))))
|
||||
|
||||
(defn- flush-all-types
  "Flush the buffers of all four entity types.

  A failing flush is reported on stdout and does not stop the remaining
  types. Returns {:records-flush n}, where n is how many buffered records
  disappeared from the buffers during the call."
  []
  (let [before (p/total-buf-count)]
    (doseq [et ["sales-order" "charge"
                "line-item" "sales-refund"]]
      (try
        (p/flush-to-parquet! et)
        (catch Exception e
          (println "[migration/flush]" et "error:" (.getMessage e)))))
    {:records-flush (- before (p/total-buf-count))}))
|
||||
|
||||
(defn- get-date-range
  "Return [earliest latest] :sales-order/date values as YYYY-MM-DD
  strings, or [nil nil] when no sales order has a date.

  NOTE(review): assumes the attribute holds java.util.Date instants and
  renders the calendar day in UTC -- confirm against how business dates
  are stored."
  []
  (let [[[earliest latest]] (seq (dc/q '[:find (min ?d) (max ?d)
                                         :where [_ :sales-order/date ?d]]
                                       (dc/db conn)))
        ->day (fn [^java.util.Date d]
                (when d
                  (-> (.toInstant d)
                      (.atZone java.time.ZoneOffset/UTC)
                      .toLocalDate
                      str)))]
    [(->day earliest) (->day latest)]))
|
||||
|
||||
(defn migrate-all
  "Full migration from the earliest to the latest sales-order date.

  Steps: restore unflushed WAL records, buffer orders that have no date
  under the \"dead\" entity type, process the whole date range day by
  day, then flush all entity types.

  Returns :no-orders when Datomic holds no sales orders, :ok on success,
  and the caught Exception (after printing its message) when any step
  fails.

  NOTE(review): records buffered under \"dead\" are not among the types
  flushed by flush-all-types -- confirm where they are meant to be
  written."
  []
  (println "[migration] starting full migration...")
  (p/load-unflushed!)
  (let [order-ids             (fetch-all-sales-order-ids)
        [start-date end-date] (get-date-range)]
    (if (empty? order-ids)
      (do
        (println "[migration] no orders found")
        :no-orders)
      (try
        ;; pull & buffer any orders missing a business date
        (doseq [o (pull-sales-order-data order-ids)
                :when (not (:sales-order/date o))]
          ;; an atom, because the flattening helper appends with swap!
          (let [flat (atom [])]
            (flatten-order-to-pieces! o flat)
            (doseq [r @flat]
              (p/buffer! "dead" r))))
        ;; without both bounds there is no dated data to walk through
        (when (and start-date end-date)
          (write-day-by-day start-date end-date {:batch-size 100}))
        (flush-all-types)
        (println "[migration] done")
        :ok
        (catch Exception e
          (println "[migration/error]" (.getMessage e))
          e)))))
|
||||
297
src/clj/auto_ap/storage/parquet.clj
Normal file
297
src/clj/auto_ap/storage/parquet.clj
Normal file
@@ -0,0 +1,297 @@
|
||||
(ns auto-ap.storage.parquet
|
||||
(:require [config.core :refer [env]]
|
||||
[amazonica.aws.s3 :as s3]
|
||||
[clojure.java.io :as io]
|
||||
[clojure.string :as str]
|
||||
[clojure.data.json :as json])
|
||||
(:import (java.sql DriverManager)
|
||||
(java.time LocalDate)))
|
||||
|
||||
;; S3 bucket used for all parquet objects; read from :data-bucket of the
;; environment config and rebindable with `binding`.
(def ^:dynamic *bucket* (:data-bucket env))
|
||||
;; First path segment of every key built by parquet-key.
(def parquet-prefix "sales-details")
|
||||
|
||||
(defn s3-location
  "Return the s3:// URL of filename inside *bucket*."
  [filename]
  (let [bucket *bucket*]
    (str "s3://" bucket \/ filename)))
|
||||
|
||||
(defn parquet-key
  "Return the S3 key of the parquet file for entity-type on date-str:
  <parquet-prefix>/<entity-type>/<date-str>.parquet"
  [entity-type date-str]
  (let [file-name (str date-str ".parquet")]
    (str parquet-prefix \/ entity-type \/ file-name)))
|
||||
|
||||
;; Holds the shared DuckDB JDBC connection; nil until connect! has run
;; and again after disconnect!.
(def db (atom nil))
|
||||
|
||||
(defn connect!
  "Open an in-memory DuckDB connection, install and load the httpfs
  extension, store the connection in db and return it.

  A JVM shutdown hook is registered that closes this connection. If the
  extension setup fails the connection is closed before the error
  propagates."
  []
  (let [conn (DriverManager/getConnection "jdbc:duckdb:")]
    (try
      (with-open [stmt (.createStatement conn)]
        (.execute stmt "INSTALL httpfs; LOAD httpfs;"))
      (catch Throwable t
        (.close conn)
        (throw t)))
    (.addShutdownHook (Runtime/getRuntime)
                      (Thread. ^Runnable (fn []
                                           (try
                                             (.close conn)
                                             (catch Exception _ nil)))))
    (reset! db conn)))
|
||||
|
||||
(defn disconnect!
  "Close the connection held in db, if any, and reset db to nil. The
  check and the close happen while holding the monitor of db."
  []
  (locking db
    (let [current @db]
      (when current
        (.close current)
        (reset! db nil)))))
|
||||
|
||||
(defmacro with-duckdb
  "Evaluate body with the symbol `conn` bound to the DuckDB connection
  held in db, calling connect! first when db is empty.

  `conn` is introduced deliberately unhygienically so that body can refer
  to it. After body, the connection is closed only when db no longer
  holds a connection."
  [& body]
  `(let [conn#   (or @db (connect!))
         ~'conn conn#]
     (try
       ~@body
       (finally
         (when (and conn# (not @db))
           (.close conn#))))))
|
||||
|
||||
(defn execute!
  "Run the SQL statement sql on the DuckDB connection for its side
  effects. The statement is closed afterwards. Returns nil."
  [sql]
  (with-duckdb
    (with-open [stmt (.createStatement conn)]
      (.execute stmt sql)
      nil)))
|
||||
|
||||
(defn query-scalar
  "Run sql and return the first column of the first result row, or nil
  when the query yields no rows. Statement and result set are closed
  before returning."
  [sql]
  (with-duckdb
    (with-open [stmt (.createStatement conn)
                rs   (.executeQuery stmt sql)]
      (when (.next rs)
        (.getObject rs 1)))))
|
||||
|
||||
(defn query-rows
  "Run sql and return all result rows as a vector of maps.

  Map keys are the column labels turned into keywords; values are
  whatever the JDBC driver returns from getObject. The rows are fully
  realised before the statement and result set are closed."
  [sql]
  (with-duckdb
    (with-open [stmt (.createStatement conn)
                rs   (.executeQuery stmt sql)]
      (let [md      (.getMetaData rs)
            indexes (range 1 (inc (.getColumnCount md)))
            cols    (mapv #(keyword (.getColumnLabel md (int %))) indexes)]
        (loop [rows []]
          (if (.next rs)
            (recur (conj rows
                         (zipmap cols
                                 (mapv #(.getObject rs (int %)) indexes))))
            rows))))))
|
||||
|
||||
(defn execute-to-parquet!
  "Write the result of the query sql to a parquet file at parquet-path
  using DuckDB COPY, and return that path as a java.io.File.

  Single quotes in parquet-path are doubled so the path stays a valid SQL
  string literal; sql itself is embedded verbatim. The statement is
  closed afterwards."
  [sql ^String parquet-path]
  (with-duckdb
    (with-open [stmt (.createStatement conn)]
      (.execute stmt
                (format "COPY (%s) TO '%s' (FORMAT PARQUET, OVERWRITE_OR_IGNORE)"
                        sql
                        (str/replace parquet-path "'" "''")))
      (io/file parquet-path))))
|
||||
|
||||
(defn upload-parquet!
  "Upload local-parquet-file to *bucket* under s3-key and return the
  s3:// URL of the uploaded object."
  [local-parquet-file s3-key]
  (let [request {:bucket-name *bucket*
                 :key         s3-key
                 :file        local-parquet-file}]
    (s3/put-object request)
    (s3-location s3-key)))
|
||||
|
||||
;; In-memory buffers: a map from entity type to the vector of records
;; added by buffer! and not yet cleared. The var is not dynamic despite
;; the earmuffs in its name.
(defonce *buffers* (atom {}))
|
||||
|
||||
(defn- wal-dir
  "Return the write-ahead-log directory: parquet-wal under the JVM
  working directory (user.dir), or under /tmp when that property is not
  set."
  []
  (let [base (System/getProperty "user.dir" "/tmp")]
    (io/file base "parquet-wal")))
|
||||
|
||||
(defn- init-wal!
  "Create the WAL directory, including missing parents, unless it
  already exists."
  []
  (let [dir (wal-dir)]
    (if (.exists dir)
      nil
      (.mkdirs dir))))
|
||||
|
||||
(defn buffer!
  "Add record to the in-memory buffer of entity-type and append it to the
  WAL file <wal-dir>/<entity-type>.jsonl.

  The buffered entry is record with :_seq-no set to the current time in
  milliseconds; the WAL line is the JSON of {:seq-no .. :record ..}. A
  failing WAL write is printed and otherwise ignored, so the record stays
  buffered in memory. Returns the buffered entry."
  [entity-type record]
  (init-wal!)
  (let [stamp   (System/currentTimeMillis)
        stamped (assoc record :_seq-no stamp)]
    (swap! *buffers* update entity-type (fnil conj []) stamped)
    (try
      (let [target (io/file (wal-dir) (str entity-type ".jsonl"))]
        (io/make-parents target)
        (with-open [out (io/writer target :append true)]
          (.write out ^String (json/write-str {:seq-no stamp
                                               :record record}))
          (.write out (int \newline))))
      (catch Exception ex
        (println "[parquet/wal]" (.getMessage ex))))
    stamped))
|
||||
|
||||
(defn clear-buffer!
  "Drop the in-memory buffer of entity-type. Returns the remaining
  buffers map."
  [entity-type]
  (swap! *buffers* (fn [buffers] (dissoc buffers entity-type))))
|
||||
|
||||
(defn buffer-count
  "Number of records currently buffered for entity-type (0 when none)."
  [entity-type]
  (count (get @*buffers* entity-type [])))
|
||||
|
||||
(defn total-buf-count
  "Number of records currently buffered across all entity types."
  []
  (count (apply concat (vals @*buffers*))))
|
||||
|
||||
(defn flush-to-parquet!
  "Flush the buffered records of entity-type to a parquet file on S3.

  date-str (YYYY-MM-DD) selects the S3 key via parquet-key and defaults
  to today's date, so callers writing historical data should pass the
  business date explicitly.

  The records are written as JSON lines to a fresh temp file under /tmp,
  converted to parquet with DuckDB, uploaded, and then the in-memory
  buffer and the WAL file of entity-type are cleared. Temp files are
  removed whether or not the flush succeeds.

  Returns {:status :no-records} when nothing is buffered, otherwise
  {:key s3-key :status :ok}. Any failure is rethrown as ex-info
  \"Flush failed\" carrying :entity-type and :error, with the original
  exception as cause; the buffer and WAL are left intact in that case.

  NOTE(review): records buffered between the snapshot taken here and the
  clear at the end are dropped with the rest -- confirm that no other
  thread buffers during a flush."
  ([entity-type]
   (flush-to-parquet! entity-type (.toString (LocalDate/now))))
  ([entity-type date-str]
   (let [records (get @*buffers* entity-type [])]
     (if (empty? records)
       {:status :no-records}
       (let [jsonl-file   (io/file "/tmp"
                                   (str entity-type "-" date-str ".jsonl"))
             parquet-file (io/file "/tmp"
                                   (str entity-type "-" date-str ".parquet"))
             s3-key       (parquet-key entity-type date-str)]
         (try
           ;; no :append -- leftovers of an earlier failed flush must not
           ;; be written a second time
           (with-open [w (io/writer jsonl-file)]
             (doseq [r records]
               (.write w ^String (json/write-str (dissoc r :_seq-no)))
               (.write w (int \newline))))
           (execute-to-parquet!
            (format "SELECT * FROM read_json_auto('%s')"
                    (.getAbsolutePath jsonl-file))
            (.getAbsolutePath parquet-file))
           (upload-parquet! parquet-file s3-key)
           (clear-buffer! entity-type)
           ;; the records are durable on S3 now, so the WAL must not
           ;; replay them on the next load-unflushed!
           (.delete (io/file (wal-dir) (str entity-type ".jsonl")))
           {:key s3-key :status :ok}
           (catch Exception e
             (throw (ex-info "Flush failed"
                             {:entity-type entity-type
                              :error       (.getMessage e)}
                             e)))
           (finally
             (.delete ^java.io.File jsonl-file)
             (.delete ^java.io.File parquet-file))))))))
|
||||
|
||||
(defn flush-by-date!
  "Flush the buffers of all four entity types via flush-to-parquet!.

  Returns {:flushed #{entity-type ...}} naming the types whose flush
  reported :ok; types with nothing buffered are left out. An exception
  from any flush propagates."
  []
  (let [etypes  ["sales-order" "charge"
                 "line-item" "sales-refund"]
        flushed (into #{}
                      (filter (fn [et]
                                (= :ok (:status (flush-to-parquet! et)))))
                      etypes)]
    {:flushed flushed}))
|
||||
|
||||
(defn load-unflushed!
  "Restore unflushed records from the WAL jsonl files into *buffers*.

  For each of the four entity types the file <wal-dir>/<type>.jsonl is
  read when it exists. Every line is parsed as JSON with keyword keys and
  turned back into the buffered form: the :record map with :_seq-no set
  from :seq-no. Lines that cannot be parsed, or that carry no record, are
  skipped. Types without any restorable record are left untouched.

  The restored vectors are merged into *buffers*, replacing an existing
  in-memory buffer of the same type. Returns the new buffers map.

  NOTE(review): only top-level and nested map keys are restored as
  keywords; values that were keywords when written come back as strings."
  []
  (init-wal!)
  (let [etypes     ["sales-order" "charge"
                    "line-item" "sales-refund"]
        parse-line (fn [line]
                     (try
                       (let [{:keys [record seq-no]}
                             (json/read-str line :key-fn keyword)]
                         (when record
                           (assoc record :_seq-no seq-no)))
                       (catch Exception _ nil)))
        restore    (fn [et]
                     (let [f (io/file (wal-dir) (str et ".jsonl"))]
                       ;; a type that was never buffered has no WAL file
                       (when (.exists f)
                         (let [records (into []
                                             (keep parse-line)
                                             (str/split-lines (slurp f)))]
                           (when (seq records)
                             [et records])))))
        loaded     (into {} (keep restore) etypes)]
    (swap! *buffers* merge loaded)))
|
||||
|
||||
(defn get-unflushed-count
  "Number of buffered records not yet flushed; same value as
  total-buf-count."
  []
  (let [pending (total-buf-count)]
    pending))
|
||||
|
||||
(defn unflushed-records?
  "True when at least one record is still buffered."
  []
  (not (= 0 (total-buf-count))))
|
||||
|
||||
;;; DuckDB Read Layer
|
||||
|
||||
(defn date-seq
  "Seq of YYYY-MM-DD strings for every day between start and end,
  inclusive, in ascending order. The bounds may be given in either order."
  [start end]
  (let [a         (LocalDate/parse start)
        b         (LocalDate/parse end)
        [from to] (if (.isAfter a b) [b a] [a b])]
    (->> (iterate #(.plusDays ^LocalDate % 1) from)
         (take-while #(not (.isAfter ^LocalDate % to)))
         (map str))))
|
||||
|
||||
(defn today
  "Today's date in the JVM default time zone as a YYYY-MM-DD string."
  []
  (str (LocalDate/now)))
|
||||
|
||||
(defn parquet-query
  "Build the SQL that reads the daily parquet files of entity-type for
  every day from start-date to end-date (YYYY-MM-DD, inclusive).

  Returns {:sql .. :count-sql ..}: :sql selects all rows through
  read_parquet over one s3:// URL per day, and :count-sql counts the rows
  of that query. File locations come from parquet-key and s3-location, so
  they match what the writer uploads.

  NOTE(review): every day in the range is listed whether or not its file
  exists -- confirm how read_parquet should treat missing days."
  [entity-type start-date end-date]
  (let [urls (for [day (date-seq start-date end-date)]
               (str "'" (s3-location (parquet-key entity-type day)) "'"))
        sql  (str "SELECT * FROM read_parquet(["
                  (str/join ", " urls)
                  "])")]
    {:sql       sql
     :count-sql (format "SELECT COUNT(*) FROM (%s) t" sql)}))
|
||||
|
||||
(defn- build-where-clause
  "Build a SQL WHERE clause from opts.

  field-pairs is a seq of [opts-key column-expression] pairs. For every
  pair whose key has a truthy value in opts an equality test against that
  value is emitted; the tests are joined with AND. Values are written as
  SQL string literals with embedded single quotes doubled.

  Returns the clause including its leading \" WHERE \", or nil when no
  pair applies."
  [opts field-pairs]
  (let [sql-literal (fn [v]
                      (str "'" (str/replace (str v) "'" "''") "'"))
        clauses     (for [[k column] field-pairs
                          :let [v (get opts k)]
                          :when v]
                      (str column " = " (sql-literal v)))]
    (when (seq clauses)
      (str " WHERE " (str/join " AND " clauses)))))
|
||||
|
||||
(defn get-sales-orders
  "Read sales-order rows for the days from start-date to end-date
  (YYYY-MM-DD, inclusive).

  opts may contain:
    :client / :vendor / :location  equality filters
    :sort    column to order by (default \"date\"); pass nil for no
             ORDER BY
    :order   \"ASC\" or \"DESC\", string or keyword (default \"DESC\")
    :limit / :offset  integers or numeric strings; omitted when nil

  Returns {:rows [row-map ...] :count n}, where n counts the rows
  matching the filters before LIMIT/OFFSET (0 when the count query
  yields nothing).

  Throws ex-info for an :order other than ASC/DESC. A :sort value that
  is not a plain identifier is emitted as a double-quoted identifier."
  ([start-date end-date]
   (get-sales-orders start-date end-date {}))
  ([start-date end-date opts]
   (let [{base-sql :sql count-sql :count-sql}
         (parquet-query "sales-order" start-date end-date)
         sort-col  (some-> (get opts :sort "date") name)
         order     (str/upper-case (name (get opts :order "DESC")))
         _         (when-not (contains? #{"ASC" "DESC"} order)
                     (throw (ex-info "Invalid sort order" {:order (:order opts)})))
         ;; plain identifiers go through unchanged; anything else is
         ;; quoted so it cannot inject further SQL
         order-by  (when sort-col
                     (if (re-matches #"[A-Za-z_][A-Za-z0-9_.]*" sort-col)
                       sort-col
                       (str "\"" (str/replace sort-col "\"" "\"\"") "\"")))
         ->long    (fn [v] (when (some? v) (Long/parseLong (str v))))
         limit     (->long (get opts :limit))
         offset    (->long (get opts :offset))
         where-str (build-where-clause
                    opts
                    [[:client "external_id.client"]
                     [:vendor "external_id.vendor"]
                     [:location "location"]])
         rows-sql  (cond-> (str base-sql where-str)
                     order-by (str " ORDER BY " order-by " " order)
                     limit    (str " LIMIT " limit)
                     offset   (str " OFFSET " offset))
         total-sql (str count-sql where-str)]
     {:rows  (query-rows rows-sql)
      ;; coerce after defaulting: (int nil) would throw
      :count (int (or (query-scalar total-sql) 0))})))
|
||||
|
||||
(defn query-deduped
  "Query the records of entity-type between start-date and end-date,
  keeping for each external id only the row with the highest _seq_no.

  NOTE(review): the QUALIFY clause refers to sales_order.external_id and
  _seq_no, while the writer in this namespace strips :_seq-no before
  writing and names its column external-id -- confirm that both columns
  exist in the parquet files before relying on this query."
  [entity-type start-date end-date]
  (let [{base-sql :sql} (parquet-query entity-type start-date end-date)]
    (query-rows
     (str base-sql
          " QUALIFY ROW_NUMBER() OVER"
          " (PARTITION BY sales_order.external_id"
          " ORDER BY _seq_no DESC) = 1"))))
|
||||
|
||||
(defn query-by-entity-id
  "Return the first deduplicated record of entity-type in the date range
  whose :external_id equals the name of external-id, or nil when there is
  none. Matching is done in memory over the result of query-deduped."
  [entity-type external-id start-date end-date]
  (let [candidates (query-deduped entity-type start-date end-date)
        match?     (fn [row]
                     (= (:external_id row) (name external-id)))]
    (first (filter match? candidates))))
|
||||
|
||||
(defn count-records-in-parquet
  "Count the rows stored for entity-type in the parquet files of the
  days from start-date to end-date. Returns an int, 0 when the count
  query yields nothing."
  [entity-type start-date end-date]
  (let [{:keys [count-sql]} (parquet-query entity-type start-date end-date)]
    ;; default before coercing: (int nil) would throw
    (int (or (query-scalar count-sql) 0))))
|
||||
61
src/clj/auto_ap/storage/sales_summaries.clj
Normal file
61
src/clj/auto_ap/storage/sales_summaries.clj
Normal file
@@ -0,0 +1,61 @@
|
||||
(ns auto-ap.storage.sales-summaries
|
||||
"Aggregation functions querying Parquet files on S3 via DuckDB.
|
||||
Entity types: sales-order | charge | line-item | sales-refund
|
||||
S3 pattern: s3://<bucket>/sales-details/<entity-type>/<YYYY-MM-DD>.parquet"
|
||||
(:require [auto-ap.storage.parquet :as p]
|
||||
[clojure.string :as str]))
|
||||
|
||||
(defn- dq
  "Wrap name in double quotes, for use as a quoted SQL identifier.
  Embedded double quotes are not escaped."
  [name]
  (str \" name \"))
|
||||
|
||||
(defn- sum-dbl
  "Coerce val to a double. nil, false and values that cannot be coerced
  all yield 0.0."
  [val]
  (if-not val
    0.0
    (try
      (double val)
      (catch Exception _e
        0.0))))
|
||||
|
||||
(defn- pq-files
  "Vector of single-quoted S3 parquet paths, one per day from start-date
  to end-date, for entity-type. Each element is a complete SQL string
  literal ready to be joined into a read_parquet list."
  [entity-type start-date end-date]
  (mapv (fn [day]
          (str "'s3://" p/*bucket*
               "/sales-details/" entity-type "/"
               day ".parquet'"))
        (p/date-seq start-date end-date)))
|
||||
|
||||
(defn sum-payments-by-type
  "Return {processor -> {type-name-string -> total-double}} summed over
  the charge parquet files of the days from start-date to end-date.

  Rows are grouped by processor and type-name in SQL and folded into a
  nested map; a missing type-name is reported under the empty string.
  Any failure while querying is printed and yields {}.

  NOTE(review): client-id is accepted but not used -- the query only
  requires client-code to be non-null, so totals span all clients.
  Confirm the intended filter (and that charge rows carry client-code)."
  [client-id start-date end-date]
  (let [files (pq-files "charge" start-date end-date)]
    (try
      (let [sql (str "SELECT "
                     (dq "processor")
                     " AS proc, "
                     (dq "type-name")
                     " AS type_name, "
                     "SUM("
                     (dq "total")
                     ")::DOUBLE AS total_amount "
                     "FROM read_parquet(["
                     (str/join ", " files)
                     "]) "
                     "WHERE "
                     (dq "client-code")
                     " IS NOT NULL "
                     "GROUP BY "
                     (dq "processor") ", "
                     (dq "type-name"))]
        (reduce (fn [acc {:keys [proc type_name total_amount]}]
                  (let [tname (str/trim (if type_name (name type_name) ""))]
                    (update-in acc [proc tname]
                               (fnil + 0.0)
                               (sum-dbl total_amount))))
                {}
                (p/query-rows sql)))
      (catch Exception e
        (println "[sales-summaries]" (.getMessage e))
        {}))))
|
||||
Reference in New Issue
Block a user