1 Commits

Author SHA1 Message Date
cc838adfac feat(sales): initial Parquet migration infrastructure
- Add DuckDB/S3 parquet storage layer (auto-ap.storage.parquet)
- Add sales_to_parquet migration script for historical data
- Add cleanup_sales for post-migration Datomic cleanup
- Add sales_orders_new.clj with DuckDB read layer for SSR views
- Add test scaffolding for parquet storage
- Add plan document for move-detailed-sales-to-parquet

feat(sales): redirect production and read flows to Parquet/DuckDB

- U3: Square production (upsert) now buffers to parquet via flatten-order-to-parquet!
- U3: EzCater core import-order now buffers to parquet instead of Datomic transact
- U3: EzCater XLS upload-xls now buffers to parquet instead of audit-transact
- U4: Rewrite sales_orders.clj to read from DuckDB via pq/get-sales-orders
- U5: Rewrite sales_summaries to use parquet aggregation functions
  - get-payment-items-parquet, get-discounts-parquet, get-refund-items-parquet
  - get-tax-parquet, get-tip-parquet, get-sales-parquet
- Add sum-* aggregation functions to storage/sales_summaries.clj
  - sum-discounts, sum-refunds-by-type, sum-taxes, sum-tips, sum-sales-by-category
2026-04-25 08:37:08 -07:00
17 changed files with 989 additions and 985 deletions

File diff suppressed because one or more lines are too long

View File

@@ -1,127 +0,0 @@
---
name: gitea-tea
description: Use tea CLI to create, manage, and checkout Gitea pull requests. Use this when opening a PR, managing PRs, or checking out PRs on the gitea remote (gitea.story-basking.ts.net).
---
# Gitea Tea CLI Skill
This skill covers using `tea` (Gitea's official CLI) for pull request workflows in this project.
## When to Use This Skill
Use this skill when you need to:
- Create a PR from a working branch to master on the gitea remote
- Open, list, or view PRs
- Checkout a PR locally for review or iteration
- Manage PR state (close, reopen, merge)
## Project Setup
The gitea remote is `gitea.story-basking.ts.net` with repo slug `notid/integreat`. The default push remote is **gitea**, NOT origin and NOT deploy.
In this project's environment:
- Gitea login is pre-configured for `gitea.story-basking.ts.net`
- Repo slug: `notid/integreat`
- Target branch for PRs: `master`
- The git remote named `gitea` points to this instance
## Creating a PR
Use `tea pulls create` to open a PR from the current branch to master. Always specify `-r notid/integreat -b master`:
```bash
tea pulls create -r notid/integreat -b master --title "Title" --description "Body"
```
Common flags:
- `-t, --title` - PR title
- `-d, --description` - PR body/description (use heredoc or file for long descriptions)
- `-a, --assignees` - Comma-separated usernames to assign
- `-L, --labels` - Comma-separated labels to apply
- `-m, --milestone` - Milestone to assign
**Writing a multiline description:**
```bash
tea pulls create -r notid/integreat -b master \
-t "feat: add feature" \
-d "$(cat <<'EOF'
## Summary
- Bullet point one
- Bullet point two
EOF
)"
```
Or write the body to a temp file first and reference it.
## Listing PRs
```bash
tea pulls list -r notid/integreat # List open PRs
tea pulls list -r notid/integreat --state all # All PRs
tea pulls list -r notid/integreat --limit 10 -o simple # Limit output, simple format
```
## Opening a PR in Browser
```bash
tea open pr <number> -r notid/integreat
tea open pr create -r notid/integreat # Open web UI to create a PR
```
## Checking Out a PR Locally
```bash
tea pulls checkout <number> -r notid/integreat
```
This fetches and checks out the PR branch locally.
## Managing PR State
**Close a PR:**
```bash
tea pulls close <number> -r notid/integreat --confirm
```
**Reopen a closed PR:**
```bash
tea pulls reopen <number> -r notid/integreat --confirm
```
**Merge a PR:**
```bash
tea pulls merge <number> -r notid/integreat --confirm
```
**Edit a PR (title, description, etc.):**
```bash
tea pulls edit <number> -r notid/integreat --title "New title" --description "New body"
```
## Full PR Creation Workflow
1. Ensure the branch is pushed to gitea:
```bash
git push gitea <branch-name>
```
2. Create the PR with tea:
```bash
tea pulls create -r notid/integreat -b master \
--title "feat: description of change" \
--description "Detailed PR body here"
```
3. Open the PR in browser to verify:
```bash
tea open pr <number> -r notid/integreat
```
## Tips
- Always use `-r notid/integreat` to specify the repo explicitly
- Use `-b master` to set the target branch (default may differ)
- The `--confirm` flag is required for destructive actions (close, merge)
- Use `-o simple`, `-o json`, `-o table`, etc. to control output format

View File

@@ -1,9 +1,54 @@
## Pull Requests on Gitea # Agent Instructions
This project uses **gitea.story-basking.ts.net** as the primary remote for PRs. Use `tea` (the Gitea CLI) to create and manage pull requests. The gitea remote is the one you push to, NOT origin and NOT deploy. This project uses **bd** (beads) for issue tracking. Run `bd onboard` to get started.
**When opening a PR**, load and follow the **gitea-tea** skill. In short: ## Issue Tracking
- Target branch is always `master`
- Use `tea pulls create -r notid/integreat -b master --title "..." --description "..."` This project uses **bd (beads)** for issue tracking.
Run `bd prime` for workflow context, or install hooks (`bd hooks install`) for auto-injection.
**Quick reference:**
- `bd ready` - Find unblocked work
- `bd create "Title" --type task --priority 2` - Create issue
- `bd close <id>` - Complete work
- `bd sync` - Sync with git (run at session end)
For full workflow details: `bd prime`
## Quick Reference
```bash
bd ready # Find available work
bd show <id> # View issue details
bd update <id> --status in_progress # Claim work
bd close <id> # Complete work
bd sync # Sync with git
```
## Landing the Plane (Session Completion)
**When ending a work session**, you MUST complete ALL steps below. Work is NOT complete until `git push` succeeds.
**MANDATORY WORKFLOW:**
1. **File issues for remaining work** - Create issues for anything that needs follow-up
2. **Run quality gates** (if code changed) - Tests, linters, builds
3. **Update issue status** - Close finished work, update in-progress items
4. **PUSH TO REMOTE** - This is MANDATORY:
```bash
git pull --rebase
bd sync
git push
git status # MUST show "up to date with origin"
```
5. **Clean up** - Clear stashes, prune remote branches
6. **Verify** - All changes committed AND pushed
7. **Hand off** - Provide context for next session
**CRITICAL RULES:**
- Work is NOT complete until `git push` succeeds
- NEVER stop before pushing - that leaves work stranded locally
- NEVER say "ready to push when you are" - YOU must push
- If push fails, resolve and retry until it succeeds
Use 'bd' for task tracking Use 'bd' for task tracking

View File

@@ -1,51 +1,39 @@
(ns auto-ap.datomic.sales-orders (ns auto-ap.datomic.sales-orders
(:require (:require
[auto-ap.datomic :refer [conn]]
[auto-ap.storage.parquet :as pq] [auto-ap.storage.parquet :as pq]
[auto-ap.time :as atime] [clj-time.coerce :as c]
[clj-time.coerce :as coerce]
[clojure.set :as set] [clojure.set :as set]
[clojure.string :as str] [com.brunobonacci.mulog :as mu]))
[com.brunobonacci.mulog :as mu]
[ring.util.codec :as ring-codec]))
(defn- payment-methods->charges [pm-str]
(when (not-empty pm-str)
(mapv (fn [pm] {:charge/type-name pm})
(str/split pm-str #","))))
(defn <-row (defn <-row
"Convert a flat parquet row into the shape consumers expect." "Convert a flat parquet row into the shape consumers expect.
Parquet produces maps of the form:
{\"external-id\" \"square/order/123\", ...}
which we transform to:
{:sales-order/external-id \"square/order/123\", ...}"
[row] [row]
(let [pm (:payment-methods row)]
(-> row (-> row
(set/rename-keys (set/rename-keys
{:external-id :sales-order/external-id {"external-id" :sales-order/external-id
:location :sales-order/location "location" :sales-order/location
:total :sales-order/total "total" :sales-order/total
:tax :sales-order/tax "tax" :sales-order/tax
:tip :sales-order/tip "tip" :sales-order/tip
:discount :sales-order/discount "discount" :sales-order/discount
:service-charge :sales-order/service-charge "service-charge" :sales-order/service-charge
:vendor :sales-order/vendor "vendor" :sales-order/vendor
:client-code :sales-order/client-code "client-code" :sales-order/client-code
:date :sales-order/date "date" :sales-order/date})
:source :sales-order/source (update :sales-order/date #(some-> % str))))
:reference-link :sales-order/reference-link
:payment-methods :sales-order/payment-methods
:processors :sales-order/processors
:categories :sales-order/categories})
(update :sales-order/date #(some-> % str))
(dissoc :entity-type :_seq-no)
(assoc :sales-order/charges (payment-methods->charges pm)))))
(defn build-where-clause [args] (defn build-where-clause [args]
(let [clauses (keep identity (let [clauses [(when-let [c (:client-code args)]
[(when-let [c (:client-code args)] ["external_id.client = '" c "'"])
(str "external_id.client = '" c "'"))
(when-let [v (:vendor args)] (when-let [v (:vendor args)]
(str "external_id.vendor = '" (name v) "'")) ["external_id.vendor = '" (name v) "'"])
(when-let [l (:location args)] (when-let [l (:location args)]
(str "location = '" l "'"))])] ["location = '" l "'"])]
(when (seq clauses) (when (seq clauses)
(str "WHERE " (str/join " AND " clauses))))) (str "WHERE " (str/join " AND " clauses)))))
@@ -58,114 +46,32 @@
(defn raw-graphql-ids [args] (defn raw-graphql-ids [args]
(let [start (some-> (:start (:date-range args)) .toString) (let [start (some-> (:start (:date-range args)) .toString)
end (some-> (:end (:date-range args)) (.substring 0 10)) end (some-> (:end (:date-range args)) .substring 0 10)
where (build-where-clause args)
sort (build-sort-clause args)
limit (or (:limit args) page-size) limit (or (:limit args) page-size)
offset (or (:offset args) 0)] offset (or (:offset args) 0)
where-str (when where (str " " where))]
(when start (when start
(let [result (pq/get-sales-orders start end (let [result (pq/get-sales-orders start end
{:client (:client-code args) {:client (:client-code args)
:vendor (:vendor args) :vendor (:vendor args)
:location (:location args) :location (:location args)
:sort (or (:sort args) "date") :sort sort
:order "DESC" :order "DESC"
:limit limit :limit limit
:offset offset})] :offset offset})]
{:ids (mapv #(str (:external-id %)) (:rows result)) {:ids (mapv #(str (:external_id %)) (:rows result))
:rows (:rows result) :rows (:rows result)
:count (:count result)})))) :count (:count result)}))))
(defn graphql-results [rows _ids _args] (defn graphql-results [rows _ids _args]
(mapv <-row rows)) (mapv <-row rows))
(defn- extract-date-str [v]
(when v
(cond
(instance? org.joda.time.DateTime v) (atime/unparse-local v atime/normal-date)
(instance? org.joda.time.LocalDate v) (atime/unparse-local v atime/normal-date)
(instance? java.util.Date v) (atime/unparse-local (coerce/to-date-time v) atime/normal-date)
(instance? java.time.LocalDate v) (.toString v)
(string? v) (if (re-find #"^\d{2}/\d{2}/\d{4}" v)
(-> (java.time.LocalDate/parse v (java.time.format.DateTimeFormatter/ofPattern "MM/dd/yyyy"))
.toString)
(if (> (count v) 10) (.substring v 0 10) v))
:else (str v))))
(defn- get-date [qp k]
(or (extract-date-str (get qp k))
(extract-date-str (get qp (name k)))))
(defn- kw->str [v]
(when (some? v)
(if (keyword? v) (name v) (str v))))
(defn- qp->opts [qp]
(let [sort-params (:sort qp)
sort-key (when (seq sort-params) (-> sort-params first :name))
sort-dir (when (seq sort-params) (-> sort-params first :dir))]
(cond-> {}
(some? (:client-code qp)) (assoc :client (kw->str (:client-code qp)))
(some? (:location qp)) (assoc :location (kw->str (:location qp)))
(not-empty (:payment-method qp)) (assoc :payment-method (:payment-method qp))
(some? (:processor qp)) (assoc :processor (kw->str (:processor qp)))
(not-empty (:category qp)) (assoc :category (:category qp))
(:total-gte qp) (assoc :total-gte (:total-gte qp))
(:total-lte qp) (assoc :total-lte (:total-lte qp))
sort-key (assoc :sort sort-key)
sort-dir (assoc :order (or sort-dir "DESC"))
true (assoc :limit (or (:per-page qp) 25)
:offset (or (:start qp) 0)))))
(defn- default-date-range []
(let [today (.toString (java.time.LocalDate/now))
week-ago (.toString (.minusDays (java.time.LocalDate/now) 7))]
[week-ago today]))
(defn- qp->date-range [qp]
(let [[default-start default-end] (default-date-range)]
[(or (get-date qp :start-date)
(extract-date-str (get-in qp [:date-range :start]))
default-start)
(or (get-date qp :end-date)
(extract-date-str (get-in qp [:date-range :end]))
default-end)]))
(defn- request->client-codes [request]
(let [clients (:clients request)
codes (keep :client/code clients)]
(when (seq codes) codes)))
(defn fetch-page-ssr
"Fetch sales orders from parquet for the SSR page."
[request]
(let [qp (:query-params request)
raw-qp (some-> (:query-string request)
ring-codec/form-decode
(->> (into {} (remove (fn [[_ v]] (str/blank? v))))))
[start end] (qp->date-range (merge raw-qp qp))
opts (qp->opts qp)
client-codes (request->client-codes request)
opts (if client-codes (assoc opts :client-codes client-codes) opts)
result (pq/get-sales-orders start end opts)
rows (mapv <-row (:rows result))]
{:rows rows :count (:count result)}))
(defn summarize-page-ssr
"Summarize all matching sales orders via parquet."
[request]
(let [qp (:query-params request)
raw-qp (some-> (:query-string request)
ring-codec/form-decode
(->> (into {} (remove (fn [[_ v]] (str/blank? v))))))
[start end] (qp->date-range (merge raw-qp qp))
opts (dissoc (qp->opts qp) :limit :offset :sort :order)
client-codes (request->client-codes request)
opts (if client-codes (assoc opts :client-codes client-codes) opts)]
(pq/get-sales-orders-summary start end opts)))
(defn summarize-orders [rows] (defn summarize-orders [rows]
(when (seq rows) (when (seq rows)
(let [total (reduce + 0.0 (map #(or (:sales-order/total %) 0.0) rows)) (let [total (reduce + 0.0 (map #(or (:total %) 0.0) rows))
tax (reduce + 0.0 (map #(or (:sales-order/tax %) 0.0) rows))] tax (reduce + 0.0 (map #(or (:tax %) 0.0) rows))]
{:total total {:total total
:tax tax}))) :tax tax})))
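
The two columns of `build-where-clause` above differ in how each clause is built: one wraps it in `str`, the other leaves it as a vector, which `str/join` would print as a vector literal rather than SQL. A minimal sketch of the string-joining approach follows. `where-clause` and `sql-quote` are hypothetical names, and doubling embedded single quotes is an added assumption, not something the diff does.

```clojure
(require '[clojure.string :as str])

(defn sql-quote
  "Wrap v in single quotes, doubling any embedded single quote.
  Keywords are reduced to their name (hypothetical helper)."
  [v]
  (let [s (if (keyword? v) (name v) (str v))]
    (str "'" (str/replace s "'" "''") "'")))

(defn where-clause
  "Return a WHERE fragment for the filters present in args, or nil."
  [{:keys [client-code vendor location]}]
  (let [clauses (keep (fn [[col v]]
                        (when v (str col " = " (sql-quote v))))
                      [["external_id.client" client-code]
                       ["external_id.vendor" vendor]
                       ["location" location]])]
    ;; keep drops the nil entries, so seq is falsey when no filter applies
    (when (seq clauses)
      (str "WHERE " (str/join " AND " clauses)))))

(where-clause {:client-code "NGCL" :vendor :vendor/ccp-ezcater})
;; => "WHERE external_id.client = 'NGCL' AND external_id.vendor = 'ccp-ezcater'"
```

Using `keep` rather than a vector literal of `when-let` forms matters here: a literal vector still contains the `nil` entries, so `(seq clauses)` would be truthy even when no filter was supplied.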

View File

@@ -0,0 +1,224 @@
(ns auto-ap.datomic.sales-orders
(:require [auto-ap.datomic :refer [conn]]
[auto-ap.storage.parquet :as pq]
[clojure.data.json :as json]
[clojure.java.io :as io]
[clojure.set :as set]
[clojure.string :as str]
[com.brunobonacci.mulog :as mu]))
(defn <-row
"Convert a flat parquet row (string keys) into the
shape consumers expect. Parquet produces maps of the form
{\"external-id\" \"square/order/123\",
\"location\" \"DT\",
\"total\" 50.0}
which we transform to:
{:sales-order/external-id \"square/order/123\",
:sales-order/location \"DT\",
:sales-order/total 50.0}
Note: client, charges and other nested structures are not
available in the flat parquet rows. When denormalisation
adds those columns we can restore the full consumer shape."
[row]
(-> row
(set/rename-keys
{"external-id" :sales-order/external-id
"location" :sales-order/location
"total" :sales-order/total
"tax" :sales-order/tax
"tip" :sales-order/tip})))
(defn build-where-clause
  "Build a SQL WHERE fragment from the fields that
  parquet can filter on: external_id.client, external_id.vendor, location.
  Returns either a predicate string e.g.
  \"WHERE external_id.client = 'acme' AND external_id.vendor = 'square'\"
  or nil when no applicable filters exist."
  [args]
  ;; Values are interpolated unescaped; callers must pass trusted input.
  (let [clauses (keep (fn [[col v]]
                        (when v
                          (str col " = '" (if (keyword? v) (name v) v) "'")))
                      [["external_id.client" (:client-code args)]
                       ["external_id.vendor" (:vendor args)]
                       ["location" (:location args)]])]
    (when (seq clauses)
      (str "WHERE " (str/join " AND " clauses)))))
(defn build-sort-clause
"Map sort-key field names from args into SQL ORDER-BY fragments.
Supported fields map to parquet column names:
\"date\" -> DATE
\"total\" -> TOTAL
\"tax\" -> TAX
\"tip\" -> TIP
\"client\" -> EXTERNAL_ID_CLIENT (for flat client codes)
\"location\"-> LOCATION
  Returns nil when the args do not specify a supported field,
  leaving get-sales-orders to apply its DATE DESC default."
  [args]
  ;; We delegate most of the SQL ordering work to get-sales-orders,
  ;; which already defaults to DATE DESC.
  (when-let [sorts (seq (:sort args))]
    (->> sorts
         (keep (fn [{:keys [sort-key asc]}]
                 (let [;; "total-desc" always sorts descending
                       dir (if (and asc (not= sort-key "total-desc")) "ASC" "DESC")
                       col (case sort-key
                             "date" "DATE"
                             "total" "TOTAL"
                             "tax" "TAX"
                             "tip" "TIP"
                             "total-desc" "TOTAL"
                             "source" "SALE_SOURCE"
                             "client" "EXTERNAL_ID_CLIENT"
                             "location" "LOCATION"
                             nil)] ; unknown → skip
                   (when col (str col " " dir)))))
         (str/join ", ")
         not-empty)))
(defn build-pagination-clause
"Convert a Datomic-side pagination request into SQL-limit/offset
numbers.
Supports:
:start → OFFSET
:count / :per-page → LIMIT"
[args]
{:limit (or (:count args) (:per-page args))
:offset (or (:start args) 0)})
(defn- apply-pagination
"Safely re-implements the old datomic-side pagination logic.
Mutates a COPY of args so we can extract the resulting
cursor values for the response.
Returns {limit offset}"
[args]
(let [page (build-pagination-clause args)
{:keys [limit offset]} page
client (:client-code args)]
; In the new architecture pagination is applied server-side
; by get-sales-orders via LIMIT/OFFSET, so this function
; mainly exists as a thin wrapper for any remaining
; in-memory re-paging concerns.
(if limit
(assoc page :limit (Integer. limit))
page)))
(defn- build-options
"Assemble the opts map passed to pq/get-sales-orders:
{:client ..., :location ..., :vendor ...,
:limit 10, :offset 0, :sort, :order}"
[args]
(let [page (apply-pagination args)
limit (:limit page)
offset (:offset page)
client (:client-code args)]
(cond-> {:client client
:vendor (:vendor args)
:limit limit
:offset offset}
(:location args) (assoc :location (:location args))
(:sort args) (assoc :sort "date") ; let get-sales-orders handle order
true (merge {:order "DESC"
:sort-key (:sort-key args)}))))
(defn raw-graphql-ids
"Query sales-orders FROM parquet files via DuckDB instead of Datomic.
Filters applied at the parquet level:
- date-range → selects which parquet files to read
- client-code / vendor / location where clauses
- sort & pagination are delegated to get-sales-orders
category, processor, type-name filters require nested joins
that parquet does not support -- those fields are ignored.
  Returns
  {:ids [string-key-for-each-matched-row]
  :rows [raw parquet rows]
  :count int (total matches BEFORE pagination)}"
  [args]
  (let [start (when-let [s (:start (:date-range args))]
                (.toString (.plusDays (java.time.LocalDate/parse s) -1)))
        end (when-let [e (:end (:date-range args))]
              (subs e 0 10))
        options (build-options args)
        ;; Query rows from parquet; filters, sort and pagination
        ;; travel in the options map.
        result (when start
                 (pq/get-sales-orders start end options))]
    ;; For each row returned we need an ID string.
    ;; We use the external-id column as the lookup key.
    (when-let [rows (:rows result)]
      {:ids (mapv #(str (:external_id %)) rows)
       :rows rows
       :count (:count result)})))
(defn graphql-results
"Return the full payment-row data for the selected IDs.
Since we now read FROM parquet, we receive the raw row vector
and transform it.
The old signature [ids db args] is replaced by [rows id-keys _].
We ignore the database argument (Datomic pull is no longer
called)."
[rows _id-keys _args]
(->> rows
(mapv #(<-row %))))
(defn summarize-orders
  "Sum totals and tax for the given ID-set.
  This function still queries Datomic because the parquet-side
  aggregation query would duplicate the WHERE logic.
  If we want a pure parquet path here, add an
  SQL-based aggregation in a follow-up."
  [ids]
  (let [[total tax]
        (first (#'auto-ap.datomic/aggregate-sum ids))] ; uses dc/q internally
    {:total total
     :tax tax}))
(defn get-graphql
"Entry-point: return [payments count summary].
The data flow is:
1. raw-graphql-ids → parquet query → [:rows :count]
2. graphql-results <- transform rows ← [:results <_id-keys> _args]
3. summarize-orders <- Datomic agg ← [:total :tax]"
  [args]
  (let [{:keys [ids rows] matching-count :count}
        (mu/trace
         ::get-sales-order-ids
         []
         (raw-graphql-ids args))]
    [(mu/trace ::get-results [] (graphql-results rows ids args))
     matching-count
     (summarize-orders ids)]))
(defn summarize-graphql
"Entry-point: return just the summary {:total :tax}.
Like get-graphql, this delegates to raw-graphql-ids
and then to summarize-orders."
[args]
(let [{:keys [ids]}
(mu/trace
::get-sales-order-ids
[]
(raw-graphql-ids args))]
(summarize-orders ids)))
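
The `<-row` docstring in this file assumes string-keyed rows, while the other `sales_orders.clj` variant renames keyword keys. The sketch below shows why that distinction matters: `set/rename-keys` only touches keys that match exactly and silently passes everything else through. `key-map` and `rename-row` are hypothetical names used for illustration only.

```clojure
(require '[clojure.set :as set])

;; A cut-down version of the mapping in <-row (illustrative subset).
(def key-map
  {"external-id" :sales-order/external-id
   "total"       :sales-order/total
   "tax"         :sales-order/tax})

(defn rename-row
  "Rename the string keys of a flat row to namespaced keywords."
  [row]
  (set/rename-keys row key-map))

;; String keys match the mapping and are renamed.
(rename-row {"external-id" "square/order/123" "total" 50.0})
;; => {:sales-order/external-id "square/order/123", :sales-order/total 50.0}

;; Keyword keys do not match, so the row comes back unchanged.
(rename-row {:external-id "square/order/123" :total 50.0})
;; => {:external-id "square/order/123", :total 50.0}
```

Because a mismatch raises no error, a quick check on one real row from the storage layer is the safest way to confirm which key style it actually returns.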

View File

@@ -21,15 +21,16 @@
:body (json/write-str {"query" (v/graphql-query q)}) :body (json/write-str {"query" (v/graphql-query q)})
:as :json}) :as :json})
:body :body
:data)) :data
))
(defn get-caterers [integration] (defn get-caterers [integration]
(:caterers (query integration {:venia/queries [{:query/data (:caterers (query integration {:venia/queries [{:query/data
[:caterers [:name :uuid [:address [:name :street]]]]}]}))) [:caterers [:name :uuid [:address [:name :street]]]]}]} )))
(defn get-subscriptions [integration] (defn get-subscriptions [integration]
(->> (query integration {:venia/queries [{:query/data (->> (query integration {:venia/queries [{:query/data
[:subscribers [:id [:subscriptions [:parentId :parentEntity :eventEntity :eventKey]]]]}]}) [:subscribers [:id [:subscriptions [:parentId :parentEntity :eventEntity :eventKey]] ]]}]} )
:subscribers :subscribers
first first
:subscriptions)) :subscriptions))
@@ -38,7 +39,7 @@
(map first (dc/q '[:find (pull ?i [:ezcater-integration/api-key (map first (dc/q '[:find (pull ?i [:ezcater-integration/api-key
:ezcater-integration/subscriber-uuid :ezcater-integration/subscriber-uuid
:db/id :db/id
{:ezcater-integration/integration-status [:db/id]}]) :ezcater-integration/integration-status [:db/id]])
:in $ :in $
:where [?i :ezcater-integration/api-key]] :where [?i :ezcater-integration/api-key]]
(dc/db conn)))) (dc/db conn))))
@@ -94,6 +95,7 @@
:eventKey 'cancelled}} :eventKey 'cancelled}}
[[:subscription [:parentId :parentEntity :eventEntity :eventKey]]]]]}))))) [[:subscription [:parentId :parentEntity :eventEntity :eventKey]]]]]})))))
#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]} #_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(defn upsert-ezcater (defn upsert-ezcater
([] (upsert-ezcater (get-integrations))) ([] (upsert-ezcater (get-integrations)))
@@ -120,6 +122,7 @@
{:client/_ezcater-locations [:client/code]}]}] {:client/_ezcater-locations [:client/code]}]}]
[:ezcater-caterer/uuid caterer-uuid])) [:ezcater-caterer/uuid caterer-uuid]))
(defn round-carry-cents [f] (defn round-carry-cents [f]
(with-precision 2 (double (.setScale (bigdec f) 2 java.math.RoundingMode/HALF_UP)))) (with-precision 2 (double (.setScale (bigdec f) 2 java.math.RoundingMode/HALF_UP))))
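
As a worked example of the rounding in `round-carry-cents`, the sketch below reproduces the function standalone. It omits the `with-precision` wrapper on the assumption that `.setScale` already fixes both the scale and the rounding mode, so treat it as an illustration rather than a drop-in replacement.

```clojure
(defn round-carry-cents
  "Round f to two decimal places, half up, and return a double."
  [f]
  (double (.setScale (bigdec f) 2 java.math.RoundingMode/HALF_UP)))

(round-carry-cents 2.345) ; => 2.35 (the half cent rounds up)
(round-carry-cents 2.344) ; => 2.34
(round-carry-cents 10)    ; => 10.0
```

Going through `bigdec` first means the half-up decision is made on the decimal value, not on the binary representation of the double.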
@@ -137,17 +140,17 @@
(* commision% (* commision%
0.01M 0.01M
(+ (+
(-> order :totals :subTotal :subunits) (-> order :totals :subTotal :subunits )
(reduce + (reduce +
0 0
(map (comp :subunits :cost) (:feesAndDiscounts (:catererCart order))))))))) (map (comp :subunits :cost) (:feesAndDiscounts (:catererCart order))))))))
(defn ccp-fee [order] (defn ccp-fee [order]
(round-carry-cents (round-carry-cents
(* 0.000299M (* 0.000299M
(+ (+
(-> order :totals :subTotal :subunits) (-> order :totals :subTotal :subunits )
(-> order :totals :salesTax :subunits) (-> order :totals :salesTax :subunits )
(reduce + (reduce +
0 0
(map (comp :subunits :cost) (:feesAndDiscounts (:catererCart order)))))))) (map (comp :subunits :cost) (:feesAndDiscounts (:catererCart order))))))))
@@ -155,7 +158,7 @@
(defn order->sales-order [{{:keys [timestamp]} :event {:keys [orderItems]} :catererCart :keys [client-code client-location uuid] :as order}] (defn order->sales-order [{{:keys [timestamp]} :event {:keys [orderItems]} :catererCart :keys [client-code client-location uuid] :as order}]
(let [adjustment (round-carry-cents (- (+ (-> order :totals :subTotal :subunits (* 0.01)) (let [adjustment (round-carry-cents (- (+ (-> order :totals :subTotal :subunits (* 0.01))
(-> order :totals :salesTax :subunits (* 0.01))) (-> order :totals :salesTax :subunits (* 0.01)))
(-> order :catererCart :totals :catererTotalDue) (-> order :catererCart :totals :catererTotalDue )
(commision order) (commision order)
(ccp-fee order))) (ccp-fee order)))
service-charge (+ (commision order) (ccp-fee order)) service-charge (+ (commision order) (ccp-fee order))
@@ -166,7 +169,7 @@
:external-id (str "ezcater/order/" client-code "-" client-location "-" uuid) :external-id (str "ezcater/order/" client-code "-" client-location "-" uuid)
:client [:client/code client-code] :client [:client/code client-code]
:location client-location :location client-location
:reference-link (str (url/url "https://ezmanage.ezcater.com/orders/" uuid)) :reference-link (str (url/url "https://ezmanage.ezcater.com/orders/" uuid ))
:line-items [#:order-line-item :line-items [#:order-line-item
{:external-id (str "ezcater/order/" client-code "-" client-location "-" uuid "-" 0) {:external-id (str "ezcater/order/" client-code "-" client-location "-" uuid "-" 0)
:item-name "EZCater Catering" :item-name "EZCater Catering"
@@ -196,7 +199,7 @@
:tax tax :tax tax
:tip tip :tip tip
:returns 0.0 :returns 0.0
:vendor :vendor/ccp-ezcater})) :vendor :vendor/ccp-ezcater}}))
(defn- flatten-order-to-parquet! [order] (defn- flatten-order-to-parquet! [order]
"Flatten a sales-order into entity-type tagged maps and buffer to parquet." "Flatten a sales-order into entity-type tagged maps and buffer to parquet."
@@ -239,6 +242,7 @@
:discount (:order-line-item/discount li) :discount (:order-line-item/discount li)
:sales-order-external-id so-ext-id}))))) :sales-order-external-id so-ext-id})))))
(defn get-by-id [integration id] (defn get-by-id [integration id]
(query (query
integration integration
@@ -269,20 +273,27 @@
[:currency [:currency
:subunits]]]]]] :subunits]]]]]]
[:totals [[:customerTotalDue [:totals [[:customerTotalDue
[:currency [
:subunits]] :currency
:subunits
]]
[:pointOfSaleIntegrationFee [:pointOfSaleIntegrationFee
[:currency [
:subunits]] :currency
:subunits
]]
[:tip [:tip
[:currency [:currency
:subunits]] :subunits]]
[:salesTax [:salesTax
[:currency [
:subunits]] :currency
:subunits
]]
[:salesTaxRemittance [:salesTaxRemittance
[:currency [:currency
:subunits]] :subunits
]]
[:subTotal [:subTotal
[:currency [:currency
:subunits]]]]]]]})) :subunits]]]]]]]}))
@@ -348,7 +359,7 @@
"key" "accepted", "key" "accepted",
"occurred_at" "2022-07-21T19:21:07.549Z"} "occurred_at" "2022-07-21T19:21:07.549Z"}
ezcater-order (lookup-order lookup-map) ezcater-order (lookup-order lookup-map)
extant-order (dc/pull (dc/db conn '[:sales-order/total] extant-order (dc/pull (dc/db conn) '[:sales-order/total
:sales-order/tax :sales-order/tax
:sales-order/tip :sales-order/tip
:sales-order/discount :sales-order/discount
@@ -360,7 +371,7 @@
:sales-order/line-items [:order-line-item/external-id :sales-order/line-items [:order-line-item/external-id
:order-line-item/total :order-line-item/total
:order-line-item/tax :order-line-item/tax
:order-line-item/discount]}) :order-line-item/discount]}]
[:sales-order/external-id order]) [:sales-order/external-id order])
updated-order (-> (order->sales-order ezcater-order) updated-order (-> (order->sales-order ezcater-order)

View File

@@ -40,15 +40,18 @@
(dc/db conn) (dc/db conn)
number))) number)))
(defn delete-all [] (defn delete-all []
@(dc/transact-async conn @(dc/transact-async conn
(->> (->>
(dc/q '[:find ?ss (dc/q '[:find ?ss
:where [?ss :sales-summary/date]] :where [?ss :sales-summary/date]]
(dc/db conn)) (dc/db conn))
(map (fn [[ss]] (map (fn [[ ss]]
[:db/retractEntity ss]))))) [:db/retractEntity ss])))))
(defn dirty-sales-summaries [c] (defn dirty-sales-summaries [c]
(let [client-id (dc/entid (dc/db conn) c)] (let [client-id (dc/entid (dc/db conn) c)]
(->> (dc/index-pull (dc/db conn) (->> (dc/index-pull (dc/db conn)
@@ -100,7 +103,7 @@
(let [date-str (.toString date)] (let [date-str (.toString date)]
(when-let [rows (seq (pq/query-deduped "charge" date-str date-str))] (when-let [rows (seq (pq/query-deduped "charge" date-str date-str))]
(let [client-code (if (map? c) (:client/code c) c) (let [client-code (if (map? c) (:client/code c) c)
filtered (filter #(= client-code (:client-code %)) rows)] filtered (filter #(= client-code (:client_code %)) rows)]
(reduce (reduce
(fn [acc {:keys [processor type-name total]}] (fn [acc {:keys [processor type-name total]}]
(update acc (update acc
@@ -142,6 +145,14 @@
:ledger-mapped/ledger-side :ledger-side/credit}) :ledger-mapped/ledger-side :ledger-side/credit})
refunds)))) refunds))))
(defn- get-fees [c date]
(when-let [fee (get-fee c date)]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/sort-order 2
:sales-summary-item/category "Fees"
:ledger-mapped/amount fee
:ledger-mapped/ledger-side :ledger-side/debit}))
(defn- get-tax-parquet [c date] (defn- get-tax-parquet [c date]
(let [client-code (if (map? c) (:client/code c) c) (let [client-code (if (map? c) (:client/code c) c)
date-str (.toString date) date-str (.toString date)
@@ -177,6 +188,8 @@
:ledger-mapped/ledger-side :ledger-side/credit :ledger-mapped/ledger-side :ledger-side/credit
:ledger-mapped/amount (- (+ total discount) tax)}))) :ledger-mapped/amount (- (+ total discount) tax)})))
(defn get-fees [c date] (defn get-fees [c date]
(when-let [fee (get-fee c date)] (when-let [fee (get-fee c date)]
{:db/id (str (java.util.UUID/randomUUID)) {:db/id (str (java.util.UUID/randomUUID))
@@ -286,7 +299,8 @@
(filter identity) (filter identity)
(map (fn [z] (map (fn [z]
(assoc z :ledger-mapped/account (some-> z :sales-summary-item/category str/lower-case name->number lookup-account) (assoc z :ledger-mapped/account (some-> z :sales-summary-item/category str/lower-case name->number lookup-account)
:sales-summary-item/manual? false))))}] :sales-summary-item/manual? false))
)) }]
(if (seq (:sales-summary/items result)) (if (seq (:sales-summary/items result))
(do (do
(alog/info ::upserting-summaries (alog/info ::upserting-summaries
@@ -296,7 +310,7 @@
(comment (comment
;; TODO: Move to test file or proper location ;; TODO: Move to test file or proper location
(let [c (auto-ap.datomic/pull-attr (dc/db @conn) :db/id [:client/code "NGCL"]) (let [c (auto-ap.datomic/pull-attr (dc/db @conn) :db/id [:client/code "NGCL" ])
date #inst "2024-04-14T00:00:00-07:00"] date #inst "2024-04-14T00:00:00-07:00"]
(get-payment-items c date))) (get-payment-items c date)))
@@ -308,6 +322,9 @@
(map (fn [[sos]] (map (fn [[sos]]
[:db/retractEntity sos]))))) [:db/retractEntity sos])))))
(comment (comment
(auto-ap.datomic/transact-schema conn) (auto-ap.datomic/transact-schema conn)
@@ -350,14 +367,19 @@
:in $ [?clients ?start-date ?end-date] :in $ [?clients ?start-date ?end-date]
:where [(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]] :where [(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]
[?e :sales-order/line-items ?li] [?e :sales-order/line-items ?li]
[?li :order-line-item/item-name ?n]] [?li :order-line-item/item-name ?n] ]
(dc/db conn) (dc/db conn)
[[(auto-ap.datomic/pull-attr (dc/db conn) :db/id [:client/code "NGCL"])] #inst "2024-04-11T00:00:00-07:00" #inst "2024-04-24T00:00:00-07:00"]) [[(auto-ap.datomic/pull-attr (dc/db conn) :db/id [:client/code "NGCL"])] #inst "2024-04-11T00:00:00-07:00" #inst "2024-04-24T00:00:00-07:00"])
@(dc/transact conn [{:db/id :sales-summary/total-tax :db/ident :sales-summary/total-tax-legacy} @(dc/transact conn [{:db/id :sales-summary/total-tax :db/ident :sales-summary/total-tax-legacy}
{:db/id :sales-summary/total-tip :db/ident :sales-summary/total-tip-legacy}]) {:db/id :sales-summary/total-tip :db/ident :sales-summary/total-tip-legacy}])
(auto-ap.datomic/transact-schema conn)) (auto-ap.datomic/transact-schema conn)
)
(defn -main [& _] (defn -main [& _]
(execute "sales-summaries" sales-summaries-v2)) (execute "sales-summaries" sales-summaries-v2))
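
The charge hunk above filters rows by client code and then reduces them by `processor` and `type-name`; the two columns differ only in the key spelling (`:client-code` versus `:client_code`). Since the reduce body is cut off in the diff, the following is only a sketch of that shape. `sum-charges`, the `[processor type-name]` grouping key, and the sample rows are assumptions.

```clojure
(defn sum-charges
  "Total the charges of one client, grouped by [processor type-name].
  client-key is the row key holding the client code."
  [client-key client-code rows]
  (->> rows
       (filter #(= client-code (get % client-key)))
       (reduce (fn [acc {:keys [processor type-name total]}]
                 ;; fnil supplies 0.0 the first time a group is seen
                 (update acc [processor type-name] (fnil + 0.0) (or total 0.0)))
               {})))

(def sample-rows
  [{:client-code "NGCL" :processor "square"  :type-name "CARD" :total 10.0}
   {:client-code "NGCL" :processor "square"  :type-name "CARD" :total 5.0}
   {:client-code "NGCL" :processor "ezcater" :type-name "CARD" :total 7.5}
   {:client-code "OTHR" :processor "square"  :type-name "CASH" :total 99.0}])

(sum-charges :client-code "NGCL" sample-rows)
;; => {["square" "CARD"] 15.0, ["ezcater" "CARD"] 7.5}

;; With the wrong key spelling the filter matches nothing and no error is raised.
(sum-charges :client_code "NGCL" sample-rows)
;; => {}
```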

View File

@@ -111,12 +111,11 @@
(.toString (java.time.LocalDate/ofEpochDay d))))) (.toString (java.time.LocalDate/ofEpochDay d)))))
(defn- object-exists? (defn- object-exists?
"Check if an S3 object exists via head-object (no download)." "Check if an S3 object exists by attempting get-object."
[key] [key]
(try (try
(s3/get-object {:bucket-name pq/*bucket* (s3/get-object {:bucket-name pq/*bucket*
:key key} :key key})
{:request-method :head})
true true
(catch com.amazonaws.services.s3.model.AmazonS3Exception _ (catch com.amazonaws.services.s3.model.AmazonS3Exception _
false))) false)))
@@ -197,7 +196,7 @@
(println "=== safe-cleanup-all" (println "=== safe-cleanup-all"
"months:" (count months) "months:" (count months)
"dry-run? =" DRY-RUN?) "dry-run? =" DRY-RUN?)
(doseq [[y m] months] (doseq [[_ y m] months]
(when-not DRY-RUN? (when-not DRY-RUN?
(let [result (verify-month-in-s3? y m) (let [result (verify-month-in-s3? y m)
missing (:missing result)] missing (:missing result)]


@@ -10,14 +10,14 @@
(write-dead-letter [flat]) ; write orphaned records" (write-dead-letter [flat]) ; write orphaned records"
(:require [auto-ap.datomic :refer [conn]] (:require [auto-ap.datomic :refer [conn]]
[auto-ap.storage.parquet :as p] [auto-ap.storage.parquet :as p]
[clojure.string :as str] [datomic.api :as dc]
[datomic.api :as dc])) [clj-time.core :as time]))
(defn- fetch-all-sales-order-ids [] (defn- fetch-all-sales-order-ids []
"Query Datomic for all sales-order external-ids (as entity IDs). "Query Datomic for all sales-order external-ids (as entity IDs).
Returns a vector of entity ids." Returns a vector of entity ids."
(->> (dc/q '[:find ?e (->> (dc/q '[:find ?e
:where [?e :sales-order/external-id _]] :where [_ :sales-order/external-id ?_ext]]
(dc/db conn)) (dc/db conn))
(map first) (map first)
vec)) vec))
@@ -25,16 +25,14 @@
(def ^:private sales-order-read (def ^:private sales-order-read
'[:sales-order/external-id '[:sales-order/external-id
:sales-order/date :sales-order/date
{:sales-order/client [:client/code :client/name]} {:sales-order/client [:client/code]}
:sales-order/location :sales-order/location
{:sales-order/vendor [:vendor/name]} :sales-order/vendor
:sales-order/total :sales-order/total
:sales-order/tax :sales-order/tax
:sales-order/tip :sales-order/tip
:sales-order/discount :sales-order/discount
:sales-order/service-charge :sales-order/service-charge
:sales-order/source
:sales-order/reference-link
{:sales-order/charges {:sales-order/charges
[:charge/external-id [:charge/external-id
:charge/type-name :charge/type-name
@@ -42,7 +40,7 @@
:charge/tax :charge/tax
:charge/tip :charge/tip
:charge/date :charge/date
{:charge/processor [:db/ident]} :charge/processor
:charge/returns :charge/returns
{:charge/client [:client/code]}]} {:charge/client [:client/code]}]}
{:sales-order/line-items {:sales-order/line-items
@@ -51,7 +49,7 @@
:order-line-item/total :order-line-item/total
:order-line-item/tax :order-line-item/tax
:order-line-item/discount :order-line-item/discount
:order-line-item/unit-price {:order-line-item/unit-price {}}
:order-line-item/quantity :order-line-item/quantity
:order-line-item/note]}]) :order-line-item/note]}])
@@ -63,38 +61,29 @@
sales-order-read sales-order-read
eids))) eids)))
(defn- flatten-order-to-pieces! [order date-str flat] (defn- flatten-order-to-pieces! [order flat]
"Flatten a pulled sales-order into :entity-type tagged maps. "Flatten a pulled sales-order into :entity-type tagged maps.
Appends to the existing flat vector, which is returned." Appends to the existing flat vector, which is returned."
(let [so-ext-id (:sales-order/external-id order) (let [so-ext-id (:sales-order/external-id order)
so-date date-str so-date (.toString (:sales-order/date order))
client-code (get-in order [:sales-order/client :client/code]) client-code (get-in order [:sales-order/client :client/code])]
vendor-name (get-in order [:sales-order/vendor :vendor/name]) ;; sales-order row
charges (:sales-order/charges order) (swap! flat conj
items (:sales-order/line-items order)
payment-methods (->> charges (map :charge/type-name) distinct (str/join ","))
processors (->> charges (map #(get-in % [:charge/processor :db/ident])) (remove nil?) distinct (map name) (str/join ","))
categories (->> items (map :order-line-item/category) (remove nil?) distinct (str/join ","))]
(vswap! flat conj
{:entity-type "sales-order" {:entity-type "sales-order"
:external-id (str so-ext-id) :external-id (str so-ext-id)
:client-code client-code :client-code client-code
:location (:sales-order/location order) :location (:sales-order/location order)
:vendor vendor-name :vendor (:sales-order/vendor order)
:total (:sales-order/total order) :total (:sales-order/total order)
:tax (:sales-order/tax order) :tax (:sales-order/tax order)
:tip (:sales-order/tip order) :tip (:sales-order/tip order)
:discount (:sales-order/discount order) :discount (:sales-order/discount order)
:service-charge (:sales-order/service-charge order) :service-charge (:sales-order/service-charge order)
:date so-date :date so-date})
:source (:sales-order/source order) ;; charges & line-items
:reference-link (:sales-order/reference-link order)
:payment-methods payment-methods
:processors processors
:categories categories})
(when-let [charges (:sales-order/charges order)] (when-let [charges (:sales-order/charges order)]
(doseq [chg charges] (doseq [chg charges]
(vswap! flat conj (swap! flat conj
{:entity-type "charge" {:entity-type "charge"
:external-id (str (get chg :charge/external-id)) :external-id (str (get chg :charge/external-id))
:type-name (get chg :charge/type-name) :type-name (get chg :charge/type-name)
@@ -104,16 +93,18 @@
:date so-date :date so-date
:processor (get-in chg [:charge/processor :db/ident]) :processor (get-in chg [:charge/processor :db/ident])
:sales-order-external-id (str so-ext-id)}) :sales-order-external-id (str so-ext-id)})
;; charge returns → sales-refund rows
(when-let [returns (:charge/returns chg)] (when-let [returns (:charge/returns chg)]
(doseq [rt returns] (doseq [rt returns]
(vswap! flat conj (swap! flat conj
{:entity-type "sales-refund" {:entity-type "sales-refund"
:type-name (get rt :type-name) :type-name (get rt :type-name)
:total (get rt :total) :total (get rt :total)
:sales-order-external-id (str so-ext-id)}))))) :sales-order-external-id (str so-ext-id)})))))
;; line-items
(when-let [items (:sales-order/line-items order)] (when-let [items (:sales-order/line-items order)]
(doseq [li items] (doseq [li items]
(vswap! flat conj (swap! flat conj
{:entity-type "line-item" {:entity-type "line-item"
:item-name (get li :order-line-item/item-name) :item-name (get li :order-line-item/item-name)
:category (get li :order-line-item/category) :category (get li :order-line-item/category)
@@ -125,27 +116,37 @@
(defn -fetch-order-ids-for-date (defn -fetch-order-ids-for-date
"Query Datomic for all sales-order eids on a given business date." "Query Datomic for all sales-order eids on a given business date."
[db date-str] [db date-str]
(let [ld (java.time.LocalDate/parse date-str) (let [day-ms (.toEpochSecond ^java.time.LocalDate (java.time.LocalDate/parse date-str))
start (-> ld (.atStartOfDay (java.time.ZoneId/of "America/Los_Angeles")) .toInstant java.util.Date/from) start (* day-ms 1000)
end (-> ld (.plusDays 1) (.atStartOfDay (java.time.ZoneId/of "America/Los_Angeles")) .toInstant java.util.Date/from)] end (+ start (* 86400000))]
(->> (dc/q '[:find ?e (->> (dc/q '[:find ?e
:in $ ?start ?end :in $ ?start-ms ?end-ms
:where [?e :sales-order/date ?d] :where [_ :sales-order/date ?d]
[(>= ?d ?start)] [(>= ?d ?start-ms)]
[(< ?d ?end)]] [(<= ?d ?end-ms)]]
db start end) db start end)
(map first) (map first)
vec))) vec)))
(defn write-day-by-day
(defn- date-seq [start end]
"Seq of YYYY-MM-DD strings between start and end inclusive."
(let [sd (java.time.LocalDate/parse start)
ed (java.time.LocalDate/parse end)
days (int (Math/abs (- (.toEpochDay sd)
(.toEpochDay ed))))]
(for [i (range 0 (inc days))]
(.toString (.plusDays sd i)))))
(defn- write-day-by-day
([start-date end-date] ([start-date end-date]
(write-day-by-day start-date end-date {})) (write-day-by-day start-date end-date nil))
([start-date end-date opts] ([start-date end-date opts]
(let [all-dates (set (or (opts :date-set) [])) (let [all-dates (set (or (opts :date-set) []))
date-range (if (empty? all-dates) date-range (if (empty? all-dates)
(p/date-seq start-date end-date) (date-seq start-date end-date)
(filter all-dates (filter all-dates
(p/date-seq start-date end-date))) (date-seq start-date end-date)))
batch-size (or (opts :batch-size) 100)] batch-size (or (opts :batch-size) 100)]
(doseq [^String day date-range] (doseq [^String day date-range]
(println "[migration] processing" day) (println "[migration] processing" day)
@@ -155,12 +156,12 @@
(let [orders (pull-sales-order-data batch) (let [orders (pull-sales-order-data batch)
flat (volatile! [])] flat (volatile! [])]
(doseq [o orders] (doseq [o orders]
(flatten-order-to-pieces! o day flat)) (flatten-order-to-pieces! o flat))
(doseq [r @flat] (doseq [r @flat]
(p/buffer! (:entity-type r) r))))) (p/buffer! (:entity-type r) r)))))
(doseq [etype ["sales-order" "charge" (doseq [etype ["sales-order" "charge"
"line-item" "sales-refund"]] "line-item" "sales-refund"]]
(p/flush-to-parquet! etype day)) (p/flush-to-parquet! etype))
(println "[migration]" day "complete")) (println "[migration]" day "complete"))
{:status :completed :total-days (count date-range)}))) {:status :completed :total-days (count date-range)})))
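The loop above accumulates rows in a `(volatile! [])`, so whatever appends to it has to use `vswap!`; `swap!` accepts only atoms and throws a `ClassCastException` when handed a volatile. A minimal sketch of that accumulator pattern, using a hypothetical `append-rows!` helper and made-up row maps rather than the real `flatten-order-to-pieces!`:

```clojure
;; Hypothetical helper (not in this repo): appends tagged rows to a
;; volatile accumulator, mirroring the `flat` vector used by the loop.
(defn append-rows!
  "Conj each row onto the vector held in the volatile `acc`; returns the vector."
  [acc rows]
  (doseq [r rows]
    (vswap! acc conj r))
  @acc)

(def flat (volatile! []))

;; Made-up rows for illustration only.
(append-rows! flat [{:entity-type "sales-order" :external-id "so-1"}
                    {:entity-type "charge" :external-id "ch-1"}])

;; Rows can then be routed by :entity-type, which is the key the loop
;; passes to p/buffer!.
(def rows-by-type (group-by :entity-type @flat))
```

A volatile is a reasonable fit here because the accumulator never leaves the thread that runs the batch.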
@@ -180,11 +181,10 @@
"Flush all entity-type buffers, tracking counts." "Flush all entity-type buffers, tracking counts."
(let [etypes ["sales-order" "charge" (let [etypes ["sales-order" "charge"
"line-item" "sales-refund"] "line-item" "sales-refund"]
today (.toString (java.time.LocalDate/now))
start (p/total-buf-count)] start (p/total-buf-count)]
(doseq [et etypes] (doseq [et etypes]
(try (try
(p/flush-to-parquet! et today) (p/flush-to-parquet! et)
(catch Exception e (catch Exception e
(println "[migration/flush]" et "error:" (.getMessage e))))) (println "[migration/flush]" et "error:" (.getMessage e)))))
{:records-flush (- (p/total-buf-count) start)})) {:records-flush (- (p/total-buf-count) start)}))
@@ -218,7 +218,7 @@
(doseq [o (pull-sales-order-data order-ids) (doseq [o (pull-sales-order-data order-ids)
:when (not (:sales-order/date o))] :when (not (:sales-order/date o))]
(let [flat (volatile! [])] (let [flat (volatile! [])]
(flatten-order-to-pieces! o "unknown" flat) (flatten-order-to-pieces! o flat)
(doseq [r @flat] (doseq [r @flat]
(p/buffer! "dead" r)))) (p/buffer! "dead" r))))
(write-day-by-day start-date end-date {:batch-size 100}) (write-day-by-day start-date end-date {:batch-size 100})
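`-fetch-order-ids-for-date` needs a half-open `[start, end)` window covering one business day. One way to compute it, sketched with plain `java.time` and assuming the business zone is `America/Los_Angeles` (the zone named on the left-hand side of that hunk); `day-window` and `window-millis` are hypothetical names, not functions in this repo:

```clojure
(import '(java.time LocalDate ZoneId)
        '(java.util Date))

;; Assumption: all business dates are interpreted in this zone.
(def business-zone (ZoneId/of "America/Los_Angeles"))

(defn day-window
  "Return [start end] as java.util.Date values, where start is local midnight
   of `date-str` (YYYY-MM-DD) and end is local midnight of the following day.
   Intended to be used as a half-open interval: start <= d < end."
  [date-str]
  (let [ld     (LocalDate/parse date-str)
        ->date (fn [^LocalDate d]
                 (-> d (.atStartOfDay business-zone) .toInstant Date/from))]
    [(->date ld) (->date (.plusDays ld 1))]))

(defn window-millis
  "Length of a [start end] window in milliseconds."
  [[^Date start ^Date end]]
  (- (.getTime end) (.getTime start)))
```

On a daylight-saving transition the window is 23 or 25 hours long, which a fixed `86400000` ms offset would not capture.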


@@ -55,8 +55,8 @@
event-date (some-> (excel/xls-date->date event-date) event-date (some-> (excel/xls-date->date event-date)
coerce/to-date-time coerce/to-date-time
atime/as-local-time atime/as-local-time
coerce/to-date)] coerce/to-date )]
(cond (and event-date client-id location) (cond (and event-date client-id location )
[:order #:sales-order [:order #:sales-order
{:date event-date {:date event-date
:external-id (str "ezcater/order/" client-id "-" location "-" order-number) :external-id (str "ezcater/order/" client-id "-" location "-" order-number)


@@ -28,9 +28,11 @@
"Authorization" (str "Bearer " (:client/square-auth-token client)) "Authorization" (str "Bearer " (:client/square-auth-token client))
"Content-Type" "application/json"})) "Content-Type" "application/json"}))
(defn ->square-date [d] (defn ->square-date [d]
(f/unparse (f/formatter "YYYY-MM-dd'T'HH:mm:ssZZ") d)) (f/unparse (f/formatter "YYYY-MM-dd'T'HH:mm:ssZZ") d))
(def manifold-api-stream (def manifold-api-stream
(let [stream (s/stream 100)] (let [stream (s/stream 100)]
(->> stream (->> stream
@@ -103,6 +105,7 @@
:exception error)) :exception error))
[])))) []))))
(def item-cache (atom {})) (def item-cache (atom {}))
(defn fetch-catalog [client i v] (defn fetch-catalog [client i v]
@@ -122,11 +125,13 @@
#(do (swap! item-cache assoc i %) #(do (swap! item-cache assoc i %)
%)))) %))))
(defn fetch-catalog-cache [client i version] (defn fetch-catalog-cache [client i version]
(if (get @item-cache i) (if (get @item-cache i)
(de/success-deferred (get @item-cache i)) (de/success-deferred (get @item-cache i))
(fetch-catalog client i version))) (fetch-catalog client i version)))
(defn item->category-name-impl [client item version] (defn item->category-name-impl [client item version]
(capture-context->lc (capture-context->lc
(cond (:item_id (:item_variation_data item)) (cond (:item_id (:item_variation_data item))
@@ -157,6 +162,7 @@
:item item) :item item)
"Uncategorized")))) "Uncategorized"))))
(defn item-id->category-name [client i version] (defn item-id->category-name [client i version]
(capture-context->lc (capture-context->lc
(-> [client i] (-> [client i]
@@ -221,6 +227,7 @@
(concat (:orders result) continued-results)))) (concat (:orders result) continued-results))))
(:orders result))))))) (:orders result)))))))
(defn search (defn search
([client location start end] ([client location start end]
(capture-context->lc (capture-context->lc
@@ -244,9 +251,11 @@
(concat (:orders result) continued-results)))) (concat (:orders result) continued-results))))
(:orders result)))))))) (:orders result))))))))
(defn amount->money [amt] (defn amount->money [amt]
(* 0.01 (or (:amount amt) 0.0))) (* 0.01 (or (:amount amt) 0.0)))
;; to get totals: ;; to get totals:
(comment (comment
(reduce (reduce
@@ -407,6 +416,7 @@
:client client :client client
:location location))))))) :location location)))))))
(defn get-payment [client p] (defn get-payment [client p]
(de/chain (manifold-api-call (de/chain (manifold-api-call
{:url (str "https://connect.squareup.com/v2/payments/" p) {:url (str "https://connect.squareup.com/v2/payments/" p)
@@ -415,6 +425,7 @@
:body :body
:payment)) :payment))
(defn continue-payout-entry-list [c l poi cursor] (defn continue-payout-entry-list [c l poi cursor]
(capture-context->lc lc (capture-context->lc lc
(de/chain (de/chain
@@ -663,7 +674,8 @@
(catch Exception e (catch Exception e
(log/error ::buffer-failed (log/error ::buffer-failed
:exception e :exception e
:order (:sales-order/external-id order)))))))))))) :order (:sales-order/external-id order)))))))))))
(defn upsert-payouts (defn upsert-payouts
([client] ([client]
@@ -713,6 +725,7 @@
(log/info ::done-loading-refunds))))))) (log/info ::done-loading-refunds)))))))
(defn get-cash-shift [client id] (defn get-cash-shift [client id]
(de/chain (manifold-api-call {:url (str (url/url "https://connect.squareup.com/v2/cash-drawers/shifts" id)) (de/chain (manifold-api-call {:url (str (url/url "https://connect.squareup.com/v2/cash-drawers/shifts" id))
:method :get :method :get
@@ -871,6 +884,8 @@
d1 d1
d2)) d2))
(defn remove-voided-orders (defn remove-voided-orders
([client] ([client]
(apply de/zip (apply de/zip
@@ -905,20 +920,22 @@
(doseq [x (partition-all 100 results)] (doseq [x (partition-all 100 results)]
(log/info ::removing-orders (log/info ::removing-orders
:count (count x)) :count (count x))
@(dc/transact-async conn x) @(dc/transact-async conn x)))))
(de/catch (fn [e] (de/catch (fn [e]
(log/warn ::couldnt-remove :error e) (log/warn ::couldnt-remove :error e)
nil))))))))))) nil) ))))))
#_(comment #_(comment
(require 'auto-ap.time-reader) (require 'auto-ap.time-reader)
@(let [[c [l]] (get-square-client-and-location "DBFS")] @(let [[c [l]] (get-square-client-and-location "DBFS") ]
(log/peek :x [c l]) (log/peek :x [ c l])
(search c l #clj-time/date-time "2026-03-28" #clj-time/date-time "2026-03-29")) (search c l #clj-time/date-time "2026-03-28" #clj-time/date-time "2026-03-29")
@(let [[c [l]] (get-square-client-and-location "NGAK")] )
(log/peek :x [c l])
@(let [[c [l]] (get-square-client-and-location "NGAK") ]
(log/peek :x [ c l])
(remove-voided-orders c l #clj-time/date-time "2024-04-11" #clj-time/date-time "2024-04-15")) (remove-voided-orders c l #clj-time/date-time "2024-04-11" #clj-time/date-time "2024-04-15"))
(doseq [c (get-square-clients)] (doseq [c (get-square-clients)]
@@ -926,6 +943,8 @@
@(remove-voided-orders c) @(remove-voided-orders c)
(catch Exception e (catch Exception e
nil))) nil)))
) )
(defn upsert-all [& clients] (defn upsert-all [& clients]
@@ -995,6 +1014,8 @@
[:clients clients] [:clients clients]
@(apply upsert-all clients))) @(apply upsert-all clients)))
(comment (comment
(defn refunds-raw-cont (defn refunds-raw-cont
([client l cursor so-far] ([client l cursor so-far]
@@ -1024,6 +1045,7 @@
(->> (->>
@(let [[c [l]] (get-square-client-and-location "NGGG")] @(let [[c [l]] (get-square-client-and-location "NGGG")]
(search c l (time/now) (time/plus (time/now) (time/days -1)))) (search c l (time/now) (time/plus (time/now) (time/days -1))))
(filter (fn [r] (filter (fn [r]
@@ -1033,6 +1055,7 @@
(->> (->>
@(let [[c [l]] (get-square-client-and-location "NGGG")] @(let [[c [l]] (get-square-client-and-location "NGGG")]
(refunds-raw-cont c l nil [])) (refunds-raw-cont c l nil []))
(filter (fn [r] (filter (fn [r]
(str/starts-with? (:created_at r) "2024-03-14"))))) (str/starts-with? (:created_at r) "2024-03-14")))))
@@ -1067,7 +1090,12 @@
[(:client/code c) (atime/unparse-local (clj-time.coerce/to-date-time (:sales-order/date bad-row)) atime/normal-date) (:sales-order/total bad-row) (:sales-order/tax bad-row) (:sales-order/tip bad-row) (:db/id bad-row)]) [(:client/code c) (atime/unparse-local (clj-time.coerce/to-date-time (:sales-order/date bad-row)) atime/normal-date) (:sales-order/total bad-row) (:sales-order/tax bad-row) (:sales-order/tip bad-row) (:db/id bad-row)])
:separator \tab) :separator \tab)
;; =>
;; =>
(require 'auto-ap.time-reader) (require 'auto-ap.time-reader)
@@ -1076,15 +1104,26 @@
(clojure.pprint/pprint (let [[c [l]] (get-square-client-and-location "NGVT")] (clojure.pprint/pprint (let [[c [l]] (get-square-client-and-location "NGVT")]
l l
(def z @(search c l #clj-time/date-time "2025-02-23T00:00:00-08:00" (def z @(search c l #clj-time/date-time "2025-02-23T00:00:00-08:00"
#clj-time/date-time "2025-02-28T00:00:00-08:00")) #clj-time/date-time "2025-02-28T00:00:00-08:00"))
(take 10 (map #(first (deref (order->sales-order c l %))) z)))) (take 10 (map #(first (deref (order->sales-order c l %))) z)))
)
(->> z (->> z
(filter (fn [o] (filter (fn [o]
(seq (filter (comp #{"OTHER"} :type) (:tenders o))))) (seq (filter (comp #{"OTHER"} :type) (:tenders o)))))
(filter #(not (:name (:source %)))) (filter #(not (:name (:source %))))
(count)) (count)
)
(doseq [[code] (seq (dc/q '[:find ?code (doseq [[code] (seq (dc/q '[:find ?code
:in $ :in $
@@ -1094,22 +1133,32 @@
[?o :sales-order/client ?c] [?o :sales-order/client ?c]
[?c :client/code ?code]] [?c :client/code ?code]]
(dc/db conn))) (dc/db conn)))
:let [[c [l]] (get-square-client-and-location code)] :let [[c [l]] (get-square-client-and-location code)
]
order @(search c l #clj-time/date-time "2026-01-01T00:00:00-08:00" (time/now)) order @(search c l #clj-time/date-time "2026-01-01T00:00:00-08:00" (time/now))
:when (= "Invoices" (:name (:source order))) :when (= "Invoices" (:name (:source order) ))
:let [[sales-order] @(order->sales-order c l order)]] :let [[sales-order] @(order->sales-order c l order)]]
(when (should-import-order? order) (when (should-import-order? order)
(println "DATE IS" (:sales-order/date sales-order)) (println "DATE IS" (:sales-order/date sales-order))
(when (some-> (:sales-order/date sales-order) coerce/to-date-time (time/after? #clj-time/date-time "2026-2-16T00:00:00-08:00")) (when (some-> (:sales-order/date sales-order) coerce/to-date-time (time/after? #clj-time/date-time "2026-2-16T00:00:00-08:00"))
(println "WOULD UPDATE" sales-order) (println "WOULD UPDATE" sales-order)
@(dc/transact auto-ap.datomic/conn [sales-order])) @(dc/transact auto-ap.datomic/conn [sales-order])
#_@(dc/transact) )
(println "DONE"))) #_@(dc/transact )
(println "DONE"))
)
#_(filter (comp #{"OTHER"} :type) (mapcat :tenders z)) #_(filter (comp #{"OTHER"} :type) (mapcat :tenders z))
@(let [[c [l]] (get-square-client-and-location "NGRY")] @(let [[c [l]] (get-square-client-and-location "NGRY")]
#_(search c l (clj-time.coerce/from-date #inst "2025-02-28") (clj-time.coerce/from-date #inst "2025-03-01")) #_(search c l (clj-time.coerce/from-date #inst "2025-02-28") (clj-time.coerce/from-date #inst "2025-03-01"))
(order->sales-order c l (:order (get-order c l "KdvwntmfMNTKBu8NOocbxatOs18YY"))))) (order->sales-order c l (:order (get-order c l "KdvwntmfMNTKBu8NOocbxatOs18YY" )))
)
)


@@ -116,6 +116,7 @@
:content "Debit"}]})) :content "Debit"}]}))
(exact-match-id* request)]]) (exact-match-id* request)]])
(def default-read '[* (def default-read '[*
[:payment/date :xform clj-time.coerce/from-date] [:payment/date :xform clj-time.coerce/from-date]
{:invoice-payment/_payment [* {:invoice-payment/invoice [*]}]} {:invoice-payment/_payment [* {:invoice-payment/invoice [*]}]}
@@ -211,6 +212,7 @@
'[(iol-ion.query/dollars= ?transaction-amount ?amount)]]} '[(iol-ion.query/dollars= ?transaction-amount ?amount)]]}
:args [(:amount query-params)]}) :args [(:amount query-params)]})
(:status route-params) (:status route-params)
(merge-query {:query {:in ['?status] (merge-query {:query {:in ['?status]
:where ['[?e :payment/status ?status]]} :where ['[?e :payment/status ?status]]}
@@ -275,6 +277,7 @@
{ids-to-retrieve :ids matching-count :count {ids-to-retrieve :ids matching-count :count
all-ids :all-ids} (fetch-ids db request)] all-ids :all-ids} (fetch-ids db request)]
[(->> (hydrate-results ids-to-retrieve db request)) [(->> (hydrate-results ids-to-retrieve db request))
matching-count matching-count
(sum-visible-pending all-ids) (sum-visible-pending all-ids)
@@ -282,7 +285,8 @@
(:client request) (:client request)
(:client-id (:query-params request)) (:client-id (:query-params request))
(when (:client-code (:query-params request)) (when (:client-code (:query-params request))
[:client/code (:client-code (:query-params request))])))])) [:client/code (:client-code (:query-params request))])))
]))
(def query-schema (mc/schema (def query-schema (mc/schema
[:maybe [:map {:date-range [:date-range :start-date :end-date]} [:maybe [:map {:date-range [:date-range :start-date :end-date]}
@@ -323,7 +327,7 @@
(assoc-in (exact-match-id* request) [1 :hx-swap-oob] true)]) (assoc-in (exact-match-id* request) [1 :hx-swap-oob] true)])
:query-schema query-schema :query-schema query-schema
:action-buttons (fn [request] :action-buttons (fn [request]
(let [[_ _ visible-in-float total-in-float] (:page-results request)] (let [[_ _ visible-in-float total-in-float ] (:page-results request)]
[(com/pill {:color :primary} " Visible in float " [(com/pill {:color :primary} " Visible in float "
(format "$%,.2f" visible-in-float)) (format "$%,.2f" visible-in-float))
(com/pill {:color :secondary} " Total in float " (com/pill {:color :secondary} " Total in float "
@@ -430,6 +434,8 @@
(def row* (partial helper/row* grid-page)) (def row* (partial helper/row* grid-page))
(comment (comment
(mc/decode query-schema {"exact-match-id" "123"} (mt/transformer main-transformer mt/strip-extra-keys-transformer)) (mc/decode query-schema {"exact-match-id" "123"} (mt/transformer main-transformer mt/strip-extra-keys-transformer))
(mc/decode query-schema {} (mt/transformer main-transformer mt/strip-extra-keys-transformer)) (mc/decode query-schema {} (mt/transformer main-transformer mt/strip-extra-keys-transformer))
@@ -439,6 +445,7 @@
(mc/decode query-schema {"payment-type" "food"} (mt/transformer main-transformer mt/strip-extra-keys-transformer)) (mc/decode query-schema {"payment-type" "food"} (mt/transformer main-transformer mt/strip-extra-keys-transformer))
(mc/decode query-schema {"vendor" "87"} (mt/transformer main-transformer mt/strip-extra-keys-transformer)) (mc/decode query-schema {"vendor" "87"} (mt/transformer main-transformer mt/strip-extra-keys-transformer))
(mc/decode query-schema {"start-date" #inst "2023-12-21T08:00:00.000-00:00"} (mt/transformer main-transformer mt/strip-extra-keys-transformer))) (mc/decode query-schema {"start-date" #inst "2023-12-21T08:00:00.000-00:00"} (mt/transformer main-transformer mt/strip-extra-keys-transformer)))
(defn delete [{check :entity :as request identity :identity}] (defn delete [{check :entity :as request identity :identity}]
@@ -452,7 +459,7 @@
#(assert-can-see-client identity (:db/id (:payment/client check)))) #(assert-can-see-client identity (:db/id (:payment/client check))))
(notify-if-locked (:db/id (:payment/client check)) (notify-if-locked (:db/id (:payment/client check))
(:payment/date check)) (:payment/date check))
(let [removing-payments (mapcat (fn [x] (let [ removing-payments (mapcat (fn [x]
(let [invoice (:invoice-payment/invoice x) (let [invoice (:invoice-payment/invoice x)
new-balance (+ (:invoice/outstanding-balance invoice) new-balance (+ (:invoice/outstanding-balance invoice)
(:invoice-payment/amount x))] (:invoice-payment/amount x))]
@@ -571,6 +578,7 @@
(assoc-in [:query-params :start] 0) (assoc-in [:query-params :start] 0)
(assoc-in [:query-params :per-page] 250)))) (assoc-in [:query-params :per-page] 250))))
:else :else
selected) selected)
updated-count (void-payments-internal ids (:identity request))] updated-count (void-payments-internal ids (:identity request))]
@@ -583,7 +591,7 @@
(defn wrap-status-from-source [handler] (defn wrap-status-from-source [handler]
(fn [{:keys [matched-current-page-route] :as request}] (fn [{:keys [matched-current-page-route] :as request}]
(let [request (cond-> request (let [ request (cond-> request
(= ::route/cleared-page matched-current-page-route) (assoc-in [:route-params :status] :payment-status/cleared) (= ::route/cleared-page matched-current-page-route) (assoc-in [:route-params :status] :payment-status/cleared)
(= ::route/pending-page matched-current-page-route) (assoc-in [:route-params :status] :payment-status/pending) (= ::route/pending-page matched-current-page-route) (assoc-in [:route-params :status] :payment-status/pending)
(= ::route/voided-page matched-current-page-route) (assoc-in [:route-params :status] :payment-status/voided) (= ::route/voided-page matched-current-page-route) (assoc-in [:route-params :status] :payment-status/voided)
@@ -610,6 +618,7 @@
::route/bulk-delete (-> bulk-delete-dialog ::route/bulk-delete (-> bulk-delete-dialog
(wrap-admin)) (wrap-admin))
::route/table (helper/table-route grid-page)} ::route/table (helper/table-route grid-page)}
(fn [h] (fn [h]
(-> h (-> h


@@ -1,7 +1,7 @@
(ns auto-ap.ssr.pos.sales-orders (ns auto-ap.ssr.pos.sales-orders
(:require (:require
[auto-ap.datomic [auto-ap.datomic
:refer [add-sorter-fields apply-pagination apply-sort-3 merge-query :refer [add-sorter-fields apply-pagination apply-sort-3 conn merge-query
pull-many query2]] pull-many query2]]
[auto-ap.datomic.sales-orders :as d-sales] [auto-ap.datomic.sales-orders :as d-sales]
[auto-ap.query-params :as query-params :refer [wrap-copy-qp-pqp]] [auto-ap.query-params :as query-params :refer [wrap-copy-qp-pqp]]
@@ -17,6 +17,7 @@
[auto-ap.time :as atime] [auto-ap.time :as atime]
[bidi.bidi :as bidi] [bidi.bidi :as bidi]
[clj-time.coerce :as c] [clj-time.coerce :as c]
[datomic.api :as dc]
[malli.core :as mc])) [malli.core :as mc]))
(def query-schema (mc/schema (def query-schema (mc/schema
@@ -171,8 +172,11 @@
charges)) charges))
(defn fetch-page [request] (defn fetch-page [request]
(let [{:keys [rows count]} (d-sales/fetch-page-ssr request)] (let [db (dc/db conn)
[rows count])) {ids-to-retrieve :ids matching-count :count} (fetch-ids db request)]
[(->> (hydrate-results ids-to-retrieve db request))
matching-count]))
(def grid-page (def grid-page
@@ -197,7 +201,7 @@
:entity-name "Sales orders" :entity-name "Sales orders"
:route :pos-sales-table :route :pos-sales-table
:action-buttons (fn [request] :action-buttons (fn [request]
(let [{:keys [total tax]} (d-sales/summarize-page-ssr request)] (let [{:keys [total tax]} (d-sales/summarize-orders (:ids (fetch-ids (dc/db conn) request)))]
(when (and total tax) (when (and total tax)
[(com/pill {:color :primary} [(com/pill {:color :primary}
(format "Total $%.2f" total)) (format "Total $%.2f" total))


@@ -3,9 +3,7 @@
[amazonica.aws.s3 :as s3] [amazonica.aws.s3 :as s3]
[clojure.java.io :as io] [clojure.java.io :as io]
[clojure.string :as str] [clojure.string :as str]
[clojure.data.json :as json] [clojure.data.json :as json])
[clojure.core.cache :as cache]
[com.brunobonacci.mulog :as mu])
(:import (java.sql DriverManager) (:import (java.sql DriverManager)
(java.time LocalDate))) (java.time LocalDate)))
@@ -16,10 +14,7 @@
(str "s3://" *bucket* "/" filename)) (str "s3://" *bucket* "/" filename))
(defn parquet-key [entity-type date-str] (defn parquet-key [entity-type date-str]
(let [month-str (if (= (count date-str) 10) (str parquet-prefix "/" entity-type "/" date-str ".parquet"))
(subs date-str 0 7)
date-str)]
(str parquet-prefix "/" entity-type "/" month-str ".parquet")))
(def db (atom nil)) (def db (atom nil))
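The left-hand `parquet-key` maps both a day string (`YYYY-MM-DD`) and a month string (`YYYY-MM`) to the same monthly object key. A small standalone sketch of that mapping follows; the `parquet-prefix` value is a placeholder, since its real definition is not shown in this diff, and `month-of` / `monthly-parquet-key` are hypothetical names:

```clojure
;; Placeholder value, assumed for illustration only.
(def parquet-prefix "sales-details")

(defn month-of
  "Truncate a 10-character YYYY-MM-DD string to YYYY-MM;
   any other string is passed through unchanged."
  [date-str]
  (if (= 10 (count date-str))
    (subs date-str 0 7)
    date-str))

(defn monthly-parquet-key
  "Object key for the monthly parquet file holding `entity-type` rows."
  [entity-type date-str]
  (str parquet-prefix "/" entity-type "/" (month-of date-str) ".parquet"))
```

Every day of a month resolves to one key, so a flush for any day in that month targets the same object.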
@@ -27,16 +22,9 @@
(let [conn (DriverManager/getConnection "jdbc:duckdb:") (let [conn (DriverManager/getConnection "jdbc:duckdb:")
stmt (.createStatement conn)] stmt (.createStatement conn)]
(.execute stmt "INSTALL httpfs; LOAD httpfs;") (.execute stmt "INSTALL httpfs; LOAD httpfs;")
(when-let [key (:aws-access-key-id env)]
(.execute stmt (str "SET s3_access_key_id='" key "'"))
(.execute stmt (str "SET s3_secret_access_key='" (:aws-secret-access-key env) "'"))
(.execute stmt (str "SET s3_region='" (or (:aws-region env) "us-east-1") "'")))
(.execute stmt "PRAGMA enable_object_cache")
(.execute stmt "SET temp_directory='/tmp/duckdb-temp'")
(.execute stmt "SET memory_limit='2GB'")
(.close stmt) (.close stmt)
(.addShutdownHook (Runtime/getRuntime) (.addShutdownHook (Runtime/getRuntime)
(Thread. #(when-let [c @db] (.close ^java.sql.Connection c)))) (Thread. #(fn [])))
(reset! db conn))) (reset! db conn)))
(defn disconnect! [] (defn disconnect! []
@@ -61,32 +49,14 @@
(.execute stmt sql) (.execute stmt sql)
nil))) nil)))
(defn- sql-snippet [sql] (subs sql 0 (min (count sql) 500)))
(defn query-scalar [sql] (defn query-scalar [sql]
(mu/trace ::query-scalar
[:sql (sql-snippet sql)]
(with-duckdb (with-duckdb
(let [stmt (.createStatement conn) (let [stmt (.createStatement conn)
rs (.executeQuery stmt sql)] rs (.executeQuery stmt sql)]
(when (.next rs) (when (.next rs)
(.getObject rs 1)))))) (.getObject rs 1)))))
(def ^:private count-cache
(atom (-> (cache/ttl-cache-factory {} :ttl 1800000)
(cache/lru-cache-factory :threshold 256))))
(defn- cached-count [sql]
(if-let [v (find @count-cache sql)]
(do (mu/log ::count-cache :hit true :sql (sql-snippet sql)) (val v))
(do (mu/log ::count-cache :hit false :sql (sql-snippet sql))
(let [result (query-scalar sql)]
(swap! count-cache assoc sql result)
result))))
(defn query-rows [sql] (defn query-rows [sql]
(mu/trace ::query-rows
[:sql (sql-snippet sql)]
(with-duckdb (with-duckdb
(let [stmt (.createStatement conn) (let [stmt (.createStatement conn)
rs (.executeQuery stmt sql) rs (.executeQuery stmt sql)
@@ -100,13 +70,13 @@
(zipmap cols (zipmap cols
(vec (for [i (range 1 (inc col-count))] (vec (for [i (range 1 (inc col-count))]
(.getObject rs i)))))) (.getObject rs i))))))
rows)))))) rows)))))
(defn execute-to-parquet! [sql ^String parquet-path] (defn execute-to-parquet! [sql ^String parquet-path]
(with-duckdb (with-duckdb
(let [stmt (.createStatement conn)] (let [stmt (.createStatement conn)]
(.execute stmt (.execute stmt
(format "COPY (%s) TO '%s' (FORMAT PARQUET, OVERWRITE_OR_IGNORE, ROW_GROUP_SIZE 10000, COMPRESSION 'zstd')" (format "COPY (%s) TO '%s' (FORMAT PARQUET, OVERWRITE_OR_IGNORE)"
sql parquet-path)) sql parquet-path))
(io/file parquet-path)))) (io/file parquet-path))))
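`execute-to-parquet!` interpolates the output path straight into the `COPY` statement. A hedged sketch of building that statement as a pure function, with single quotes in the path doubled (standard SQL string-literal escaping); `copy-to-parquet-sql` and its option map are hypothetical, and the option names are the ones that appear in the left-hand format string:

```clojure
(require '[clojure.string :as str])

(defn copy-to-parquet-sql
  "Build a COPY ... TO statement that writes `select-sql` to `path` as parquet.
   `row-group-size` and `compression` are optional and omitted when nil."
  [select-sql path {:keys [row-group-size compression]}]
  (let [quoted-path (str "'" (str/replace path "'" "''") "'")
        opts        (cond-> ["FORMAT PARQUET" "OVERWRITE_OR_IGNORE"]
                      row-group-size (conj (str "ROW_GROUP_SIZE " row-group-size))
                      compression    (conj (str "COMPRESSION '" compression "'")))]
    (format "COPY (%s) TO %s (%s)" select-sql quoted-path (str/join ", " opts))))
```

Keeping the string construction separate from the JDBC call makes both variants of the statement in this hunk easy to compare and to check without a DuckDB connection.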
@@ -154,60 +124,43 @@
(->> @*buffers* (->> @*buffers*
vals (mapcat identity) count)) vals (mapcat identity) count))
(defn flush-to-parquet! [entity-type date-str] (defn flush-to-parquet! [entity-type]
"Flush buffered records for entity-type to monthly parquet + S3. "Flush buffered records for entity-type to parquet + S3."
Reads existing monthly file (if any), merges with new records, and uploads.
Uses temp table to ensure ROW_GROUP_SIZE is respected (DuckDB ignores it
when reading directly from S3 via COPY)."
(let [records (get @*buffers* entity-type [])] (let [records (get @*buffers* entity-type [])]
(if (empty? records) (if (empty? records)
{:status :no-records} {:status :no-records}
(let [date-str (or date-str (.toString (LocalDate/now))) (let [date-str (.toString (LocalDate/now))
s3-key (parquet-key entity-type date-str)
s3-url (s3-location s3-key)
jsonl-file (io/file "/tmp" jsonl-file (io/file "/tmp"
(str entity-type "-" date-str ".jsonl")) (str entity-type "-" date-str ".jsonl"))
parquet-file (io/file "/tmp" parquet-file (io/file "/tmp"
(str entity-type "-" date-str ".parquet")) (str entity-type "-" date-str ".parquet"))
tbl (format "\"_flush_%s_%s\"" s3-key (parquet-key entity-type date-str)]
(clojure.string/replace entity-type "-" "_")
(subs date-str 0 7))]
(try (try
(with-open [w (io/writer jsonl-file :append true)] (with-open [w (io/writer jsonl-file :append true)]
(doseq [r records] (doseq [r records]
(.write w ^String (json/write-str (dissoc r :_seq-no))) (.write w ^String (json/write-str (dissoc r :_seq-no)))
(.write w (int \newline)))) (.write w (int \newline))))
(let [existing-sql (format (execute-to-parquet!
"SELECT * FROM read_parquet('%s', union_by_name=true)" (format "SELECT * FROM read_json_auto('%s')"
s3-url) (.getAbsolutePath jsonl-file))
new-sql (format (.getAbsolutePath parquet-file))
"SELECT * FROM read_json_auto('%s')"
(.getAbsolutePath jsonl-file))]
(execute! (format "CREATE OR REPLACE TABLE %s AS SELECT * FROM (%s UNION ALL %s) ORDER BY \"client-code\", date"
tbl existing-sql new-sql))
(execute! (format "COPY (SELECT * FROM %s) TO '%s' (FORMAT PARQUET, OVERWRITE_OR_IGNORE, ROW_GROUP_SIZE 10000, COMPRESSION 'zstd')"
tbl (.getAbsolutePath parquet-file)))
(execute! (format "DROP TABLE IF EXISTS %s" tbl))
(upload-parquet! parquet-file s3-key) (upload-parquet! parquet-file s3-key)
(clear-buffer! entity-type) (clear-buffer! entity-type)
(.delete ^java.io.File jsonl-file) (.delete ^java.io.File jsonl-file)
(.delete ^java.io.File parquet-file) (.delete ^java.io.File parquet-file)
{:key s3-key :status :ok}) {:key s3-key :status :ok}
(catch Exception e (catch Exception e
(execute! (format "DROP TABLE IF EXISTS %s" tbl)) (throw (ex-info "Flush failed" {:entity-type entity-type}
(throw (ex-info "Flush failed" :error (.getMessage e)))))))))
{:entity-type entity-type
:error (.getMessage e)}))))))))
(defn flush-by-date! [] (defn flush-by-date! []
"Flush all entity types for today." "Flush all entity types for today."
(let [etypes ["sales-order" "charge" (let [etypes ["sales-order" "charge"
"line-item" "sales-refund"] "line-item" "sales-refund"]
today (.toString (LocalDate/now))
flushed (into #{} flushed (into #{}
(keep (fn [et] (keep (fn [et]
(let [{:keys [status]} (let [{:keys [status]}
(flush-to-parquet! et today)] (flush-to-parquet! et)]
(when (= status :ok) (when (= status :ok)
et)))) et))))
etypes)] etypes)]
@@ -235,8 +188,7 @@
(let [f (io/file (let [f (io/file
(wal-dir) (wal-dir)
(str et ".jsonl"))] (str et ".jsonl"))]
(when (.exists f) [et (slurp f)])))
[et (slurp f)]))))
etypes))] etypes))]
(swap! *buffers* merge loaded))) (swap! *buffers* merge loaded)))
@@ -251,163 +203,76 @@
(defn date-seq [start end] (defn date-seq [start end]
"Seq of YYYY-MM-DD strings between start and end inclusive." "Seq of YYYY-MM-DD strings between start and end inclusive."
(let [sd (LocalDate/parse start) (let [sd (LocalDate/parse start)
ed (LocalDate/parse end)] ed (LocalDate/parse end)
(when (.isAfter sd ed) days (int (Math/abs
(throw (ex-info "date-seq: start must be <= end" {:start start :end end}))) (- (.toEpochDay sd)
(let [days (int (- (.toEpochDay ed) (.toEpochDay ed))))]
(.toEpochDay sd)))]
(for [i (range 0 (inc days))] (for [i (range 0 (inc days))]
(.toString (.plusDays sd i)))))) (.toString (.plusDays sd i)))))
(defn today [] (defn today []
(.toString (LocalDate/now))) (.toString (LocalDate/now)))
(def ^:private mm-dd-yyyy (java.time.format.DateTimeFormatter/ofPattern "MM/dd/yyyy"))
(defn- normalize-date-str [s]
(when s
(if (re-find #"^\d{2}/\d{2}/\d{4}" s)
(.toString (LocalDate/parse s mm-dd-yyyy))
(if (> (count s) 10) (subs s 0 10) s))))
(defn month-seq [start-date end-date]
"Seq of YYYY-MM strings between start-date and end-date inclusive."
(let [sd (LocalDate/parse (normalize-date-str start-date))
ed (LocalDate/parse (normalize-date-str end-date))]
(loop [months [] cur sd]
(if (.isAfter cur ed)
months
(recur (conj months (.toString (.withDayOfMonth cur 1)))
(.plusMonths cur 1))))))
(defn- parquet-glob [entity-type start-date end-date]
"Build explicit file list for the date range using monthly partitions.
Monthly files mean only 1-3 files for typical queries, 12 for a full year."
(let [prefix (format "s3://%s/sales-details/%s/" *bucket* entity-type)]
(vec
(map (fn [m]
(format "'%s%s.parquet'" prefix m))
(month-seq start-date end-date)))))
(defn parquet-query [entity-type start-date end-date] (defn parquet-query [entity-type start-date end-date]
"Build SQL to read monthly parquet files in date range. "Build SQL to read all parquet files in date range.
Uses explicit file list (monthly = few files) + WHERE date filter. Returns map with :sql and :count-sql keys."
Normalizes date formats (handles MM/dd/yyyy from UI)." (let [date-strs (date-seq start-date end-date)
(let [sd (normalize-date-str start-date) urls (vec
ed (normalize-date-str end-date) (map #(format "'s3://%%s/sales-details/%%s/%%s.parquet'"
files (parquet-glob entity-type sd ed) *bucket* entity-type %)
base (format "SELECT * FROM read_parquet([%s], union_by_name=true)" date-strs))
(str/join ", " files)) sql (str "SELECT * FROM read_parquet(["
sql (format "%s WHERE date >= '%s' AND date <= '%s'" (str/join ", " urls)
base sd ed)] "])")]
{:sql sql {:sql sql
:count-sql (format "SELECT COUNT(*) FROM (%s) t" sql)})) :count-sql (format "SELECT COUNT(*) FROM (%s) t" sql)}))
(defn- like-clause [col v] (defn- build-where-clause [opts field-pairs]
(str "\"" col "\" LIKE '%" v "%'")) "Build SQL WHERE clause from opts map.
fields-with-keys is vector of [:field-key :env-var-name]."
(defn- build-sales-orders-where [opts] (let [clauses (keep
(let [eq-clauses (keep (fn [[key env]]
(fn [[key col]]
(let [v (get opts key)] (let [v (get opts key)]
(when v (when v
(str "\"" col "\" = '" v "'")))) (str env " = '" v "'"))))
[[:client "client-code"] field-pairs)]
[:vendor "vendor"] (when (seq clauses)
[:location "location"]]) (str " WHERE " (str/join " AND " clauses)))))
in-clauses (keep
(fn [[key col]]
(let [vs (get opts key)]
(when (seq vs)
(str "\"" col "\" IN ("
(str/join ", " (map #(str "'" % "'") vs))
")"))))
[[:client-codes "client-code"]])
like-clauses (keep
(fn [[key col]]
(let [v (get opts key)]
(when v
(like-clause col v))))
[[:payment-method "payment-methods"]
[:processor "processors"]
[:category "categories"]])
range-clauses (keep
(fn [[key col op]]
(let [v (get opts key)]
(when v
(str "\"" col "\" " op " " v))))
[[:total-gte "total" ">="]
[:total-lte "total" "<="]])
all-clauses (concat eq-clauses in-clauses like-clauses range-clauses)]
(when (seq all-clauses)
(str/join " AND " all-clauses))))
(defn get-sales-orders (defn get-sales-orders
([start-date end-date] ([start-date end-date]
(get-sales-orders start-date end-date {})) (get-sales-orders start-date end-date {}))
([start-date end-date opts] ([start-date end-date opts]
(mu/trace ::get-sales-orders (let [q (parquet-query "sales-order"
[:start-date start-date :end-date end-date :opts opts] start-date end-date)
(try
(let [q (parquet-query "sales-order" start-date end-date)
base-sql (:sql q) base-sql (:sql q)
has-where? (str/includes? base-sql " WHERE ") count-sql (:count-sql q)
sort (get opts :sort "date") sort (get opts :sort "date")
order (get opts :order "DESC") order (get opts :order "DESC")
limit (get opts :limit) limit (get opts :limit)
offset (get opts :offset) offset (get opts :offset)
extra-clauses (build-sales-orders-where opts) where-str (build-where-clause
full-sql (if extra-clauses
(str base-sql (if has-where? " AND " " WHERE ") extra-clauses) opts
[[:client "external_id.client"]
[:vendor "external_id.vendor"]
[:location "location"]])
full-sql (if where-str
(str base-sql where-str)
base-sql) base-sql)
data-sql (cond-> full-sql result (cond-> full-sql
sort (str " ORDER BY " sort " " (name order)) sort (str " ORDER BY " sort
" " (name order))
limit (str " LIMIT " limit) limit (str " LIMIT " limit)
offset (str " OFFSET " offset)) offset (str " OFFSET " offset))
count-sql (format "SELECT COUNT(*) FROM (%s) t" full-sql)] full-count (if where-str
(mu/log ::get-sales-orders :data-sql data-sql :count-sql count-sql) (str count-sql where-str)
(let [cnt (cached-count count-sql) count-sql)]
rows (query-rows data-sql)] {:rows (query-rows result)
{:rows rows :count (or
:count (or (int cnt) 0)})) (int
(catch Exception e (query-scalar
(mu/log ::get-sales-orders :error e :start-date start-date :end-date end-date :opts opts) full-count)) 0)})))
(throw e))))))
(def ^:private summary-cache
(atom (-> (cache/ttl-cache-factory {} :ttl 1800000)
(cache/lru-cache-factory :threshold 256))))
(defn- cached-summary [sql]
(if-let [v (find @summary-cache sql)]
(do (mu/log ::summary-cache :hit true :sql (sql-snippet sql)) v)
(do (mu/log ::summary-cache :hit false :sql (sql-snippet sql))
(let [result (let [row (first (query-rows sql))]
{:total (or (:total row) 0.0)
:tax (or (:tax row) 0.0)})]
(swap! summary-cache assoc sql result)
result))))
(defn get-sales-orders-summary
([start-date end-date]
(get-sales-orders-summary start-date end-date {}))
([start-date end-date opts]
(mu/trace ::get-sales-orders-summary
[:start-date start-date :end-date end-date :opts opts]
(try
(let [q (parquet-query "sales-order" start-date end-date)
base-sql (:sql q)
has-where? (str/includes? base-sql " WHERE ")
extra-clauses (build-sales-orders-where opts)
full-sql (if extra-clauses
(str base-sql (if has-where? " AND " " WHERE ") extra-clauses)
base-sql)
sum-sql (format "SELECT COALESCE(SUM(total), 0) as total, COALESCE(SUM(tax), 0) as tax FROM (%s) t" full-sql)]
(mu/log ::get-sales-orders-summary :sum-sql sum-sql)
(cached-summary sum-sql))
(catch Exception e
(mu/log ::get-sales-orders-summary :error e :start-date start-date :end-date end-date :opts opts)
(throw e))))))
(defn query-deduped [entity-type start-date end-date] (defn query-deduped [entity-type start-date end-date]
"Query records deduplicated by external-id (latest _seq_no wins)." "Query records deduplicated by external-id (latest _seq_no wins)."
@@ -415,7 +280,7 @@
(query-rows (query-rows
(str (:sql q) (str (:sql q)
" QUALIFY ROW_NUMBER() OVER" " QUALIFY ROW_NUMBER() OVER"
" (PARTITION BY \"external-id\"" " (PARTITION BY sales_order.external_id"
" ORDER BY _seq_no DESC) = 1")))) " ORDER BY _seq_no DESC) = 1"))))
(defn query-by-entity-id [entity-type external-id (defn query-by-entity-id [entity-type external-id
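
Review note on the monthly partition listing in the hunks above: `month-seq` is documented as returning YYYY-MM strings, but as written it steps from the start date with `.plusMonths` and emits `(.toString (.withDayOfMonth cur 1))`, which yields YYYY-MM-DD values. Stepping from a late day of the month also appears able to skip the final month (for example, 2026-01-31 through 2026-03-01). The following is a minimal sketch of one alternative, assuming partitions are meant to be keyed by YYYY-MM as the docstring says. The name `month-keys` is hypothetical and does not appear in the diff, and the sketch expects ISO dates that have already been normalized.

```clojure
(import '(java.time LocalDate YearMonth))

;; Hypothetical helper, not part of the diff: lists every calendar month
;; touched by the inclusive range [start, end] as "YYYY-MM" strings.
;; Assumes both arguments are ISO dates (YYYY-MM-DD).
(defn month-keys
  [start end]
  (let [first-month (YearMonth/from (LocalDate/parse start))
        last-month  (YearMonth/from (LocalDate/parse end))]
    ;; Iterate over YearMonth values rather than dates, so the day of the
    ;; month in `start` cannot cause the final month to be skipped.
    (->> (iterate #(.plusMonths ^YearMonth % 1) first-month)
         (take-while #(not (.isAfter ^YearMonth % last-month)))
         (mapv str))))

(month-keys "2026-01-31" "2026-03-01")
;; => ["2026-01" "2026-02" "2026-03"]
```

Working in `YearMonth` keeps the comparison at month granularity. A reversed range returns an empty vector here rather than throwing; whether that is preferable to the explicit check in `date-seq` is a design choice left to the author.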


@@ -15,12 +15,12 @@
0.0))) 0.0)))
(defn- pq-files [entity-type start-date end-date] (defn- pq-files [entity-type start-date end-date]
"Vector of S3 parquet file paths for date range (monthly partitions)." "Vector of S3 parquet file paths for date range."
(let [months (p/month-seq start-date end-date)] (let [dates (p/date-seq start-date end-date)]
(vec (vec
(map #(str "'s3://" p/*bucket* (map #(str "'s3://" p/*bucket*
"/sales-details/" entity-type "/" "/sales-details/" entity-type "/"
% ".parquet") months)))) % ".parquet") dates))))
(defn sum-payments-by-type [client-id start-date end-date] (defn sum-payments-by-type [client-id start-date end-date]
"Return {processor-key -> {type-name-string -> total-double}}." "Return {processor-key -> {type-name-string -> total-double}}."


@@ -108,6 +108,5 @@
(finally (finally
(p/disconnect!)))) (p/disconnect!))))
(comment (run-perf-tests)
(run-perf-tests) (println "\n=== Done ===")
(println "\n=== Done ==="))