6 Commits

Author SHA1 Message Date
9153494ed7 feat(sales): wire SSR page to parquet/DuckDB layer with full 7.9M-record support
- Add fetch-page-ssr and summarize-page-ssr to read from parquet via DuckDB
- Add get-sales-orders-summary for cross-page totals (SUM across all rows)
- Optimize parquet-query for large ranges (>60 days) with year-level globs
- Add default-date-range with fallback to data's actual range
- Fix migration: flatten-order-to-pieces! vswap!, pull specs, date handling
- Add denormalized columns: payment-methods, processors, categories, source
- Handle schema-enforce middleware stripping dates via raw query-string parsing
- Add graceful fallback for missing parquet files (catch Exception)
- Fix load-unflushed! with .exists check on WAL files
2026-04-27 20:05:13 -07:00
ea7f46ea8a fix(sales): fix parquet SQL generation and cleanup formatting
- Fix double ORDER BY in sales_orders raw-graphql-ids (was passing full
  ORDER BY clause from build-sort-clause into get-sales-orders which
  prepends its own ORDER BY, producing 'ORDER BY ORDER BY ... DESC DESC')
- Fix WHERE clause column names in parquet build-where-clause:
  external_id.client -> client-code, external_id.vendor -> vendor
- Fix parquet-query format string (%%s -> %s with proper format call)
- Fix ex-info call signature in flush! (was passing :error as third arg
  instead of inside the data map)
- Add S3 credentials to DuckDB connect! so httpfs can read from S3
- Fix parquet buffer indentation and alignment across square/core3,
  ezcater/core, ezcater_xls, payments, sales_summaries, migrations
- Fix broken Datomic query syntax in ezcater/core (upsert-used-subscriptions,
  upsert-recent find/where clauses mangled by paren-repair)
- Uncomment accidentally commented code block in square/core3
- Fix paren/indentation issues in ssr/payments, jobs/sales_summaries
2026-04-27 10:33:22 -07:00
4597611655 fix(sales): resolve namespace collision and add missing clojure.string import
- Remove sales_orders_new.clj (unreferenced, duplicate ns)
- Add [clojure.string :as str] to sales_orders.clj ns
2026-04-27 09:41:25 -07:00
26c9563a03 feat(sales): initial Parquet migration infrastructure
- Add DuckDB/S3 parquet storage layer (auto-ap.storage.parquet)
- Add sales_to_parquet migration script for historical data
- Add cleanup_sales for post-migration Datomic cleanup
- Add sales_orders_new.clj with DuckDB read layer for SSR views
- Add test scaffolding for parquet storage
- Add plan document for move-detailed-sales-to-parquet

feat(sales): redirect production and read flows to Parquet/DuckDB

- U3: Square production (upsert) now buffers to parquet via flatten-order-to-parquet!
- U3: EzCater core import-order now buffers to parquet instead of Datomic transact
- U3: EzCater XLS upload-xls now buffers to parquet instead of audit-transact
- U4: Rewrite sales_orders.clj to read from DuckDB via pq/get-sales-orders
- U5: Rewrite sales_summaries to use parquet aggregation functions
  - get-payment-items-parquet, get-discounts-parquet, get-refund-items-parquet
  - get-tax-parquet, get-tip-parquet, get-sales-parquet
- Add sum-* aggregation functions to storage/sales_summaries.clj
  - sum-discounts, sum-refunds-by-type, sum-taxes, sum-tips, sum-sales-by-category
2026-04-27 09:41:25 -07:00
db9018722d Merge pull request 'feat: add gitea-tea skill and update AGENTS.md for PR workflow' (#4) from alluring-houseboat into master
Reviewed-on: #4
2026-04-25 20:05:38 -07:00
0e57550b3c stuff 2026-04-25 19:41:40 -07:00
16 changed files with 870 additions and 920 deletions

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View File

@@ -0,0 +1,127 @@
---
name: gitea-tea
description: Use tea CLI to create, manage, and checkout Gitea pull requests. Use this when opening a PR, managing PRs, or checking out PRs on the gitea remote (gitea.story-basking.ts.net).
---
# Gitea Tea CLI Skill
This skill covers using `tea` (Gitea's official CLI) for pull request workflows in this project.
## When to Use This Skill
Use this skill when you need to:
- Create a PR from a working branch to master on the gitea remote
- Open, list, or view PRs
- Checkout a PR locally for review or iteration
- Manage PR state (close, reopen, merge)
## Project Setup
The gitea remote is `gitea.story-basking.ts.net` with repo slug `notid/integreat`. The default push remote is **gitea**, NOT origin and NOT deploy.
In this project's environment:
- Gitea login is pre-configured for `gitea.story-basking.ts.net`
- Repo slug: `notid/integreat`
- Target branch for PRs: `master`
- The git remote named `gitea` points to this instance
## Creating a PR
Use `tea pulls create` to open a PR from the current branch to master. Always specify `-r notid/integreat -b master`:
```bash
tea pulls create -r notid/integreat -b master --title "Title" --description "Body"
```
Common flags:
- `-t, --title` - PR title
- `-d, --description` - PR body/description (use heredoc or file for long descriptions)
- `-a, --assignees` - Comma-separated usernames to assign
- `-L, --labels` - Comma-separated labels to apply
- `-m, --milestone` - Milestone to assign
**Writing a multiline description:**
```bash
tea pulls create -r notid/integreat -b master \
-t "feat: add feature" \
-d "$(cat <<'EOF'
## Summary
- Bullet point one
- Bullet point two
EOF
)"
```
Or write the body to a temp file first and reference it.
## Listing PRs
```bash
tea pulls list -r notid/integreat # List open PRs
tea pulls list -r notid/integreat --state all # All PRs
tea pulls list -r notid/integreat --limit 10 -o simple # Limit output, simple format
```
## Opening a PR in Browser
```bash
tea open pr <number> -r notid/integreat
tea open pr create -r notid/integreat # Open web UI to create a PR
```
## Checking Out a PR Locally
```bash
tea pulls checkout <number> -r notid/integreat
```
This fetches and checks out the PR branch locally.
## Managing PR State
**Close a PR:**
```bash
tea pulls close <number> -r notid/integreat --confirm
```
**Reopen a closed PR:**
```bash
tea pulls reopen <number> -r notid/integreat --confirm
```
**Merge a PR:**
```bash
tea pulls merge <number> -r notid/integreat --confirm
```
**Edit a PR (title, description, etc.):**
```bash
tea pulls edit <number> -r notid/integreat --title "New title" --description "New body"
```
## Full PR Creation Workflow
1. Ensure the branch is pushed to gitea:
```bash
git push gitea <branch-name>
```
2. Create the PR with tea:
```bash
tea pulls create -r notid/integreat -b master \
--title "feat: description of change" \
--description "Detailed PR body here"
```
3. Open the PR in browser to verify:
```bash
tea open pr <number> -r notid/integreat
```
## Tips
- Always use `-r notid/integreat` to specify the repo explicitly
- Use `-b master` to set the target branch (default may differ)
- The `--confirm` flag is required for destructive actions (close, merge)
- Use `-o simple`, `-o json`, `-o table`, etc. to control output format

View File

@@ -1,54 +1,9 @@
# Agent Instructions ## Pull Requests on Gitea
This project uses **bd** (beads) for issue tracking. Run `bd onboard` to get started. This project uses **gitea story-basking.ts net** as the primary remote for PRs. Use `tea` (the Gitea CLI) to create and manage pull requests. The gitea remote is the one you push to, NOT origin and NOT deploy.
## Issue Tracking **When opening a PR**, load and follow the **gitea-tea** skill. In short:
- Target branch is always `master`
This project uses **bd (beads)** for issue tracking. - Use `tea pulls create -r notid/integreat -b master --title "..." --description "..."`
Run `bd prime` for workflow context, or install hooks (`bd hooks install`) for auto-injection.
**Quick reference:**
- `bd ready` - Find unblocked work
- `bd create "Title" --type task --priority 2` - Create issue
- `bd close <id>` - Complete work
- `bd sync` - Sync with git (run at session end)
For full workflow details: `bd prime`
## Quick Reference
```bash
bd ready # Find available work
bd show <id> # View issue details
bd update <id> --status in_progress # Claim work
bd close <id> # Complete work
bd sync # Sync with git
```
## Landing the Plane (Session Completion)
**When ending a work session**, you MUST complete ALL steps below. Work is NOT complete until `git push` succeeds.
**MANDATORY WORKFLOW:**
1. **File issues for remaining work** - Create issues for anything that needs follow-up
2. **Run quality gates** (if code changed) - Tests, linters, builds
3. **Update issue status** - Close finished work, update in-progress items
4. **PUSH TO REMOTE** - This is MANDATORY:
```bash
git pull --rebase
bd sync
git push
git status # MUST show "up to date with origin"
```
5. **Clean up** - Clear stashes, prune remote branches
6. **Verify** - All changes committed AND pushed
7. **Hand off** - Provide context for next session
**CRITICAL RULES:**
- Work is NOT complete until `git push` succeeds
- NEVER stop before pushing - that leaves work stranded locally
- NEVER say "ready to push when you are" - YOU must push
- If push fails, resolve and retry until it succeeds
Use 'bd' for task tracking Use 'bd' for task tracking

View File

@@ -1,39 +1,51 @@
(ns auto-ap.datomic.sales-orders (ns auto-ap.datomic.sales-orders
(:require (:require
[auto-ap.datomic :refer [conn]]
[auto-ap.storage.parquet :as pq] [auto-ap.storage.parquet :as pq]
[clj-time.coerce :as c] [auto-ap.time :as atime]
[clj-time.coerce :as coerce]
[clojure.set :as set] [clojure.set :as set]
[com.brunobonacci.mulog :as mu])) [clojure.string :as str]
[com.brunobonacci.mulog :as mu]
[ring.util.codec :as ring-codec]))
(defn- payment-methods->charges [pm-str]
(when (not-empty pm-str)
(mapv (fn [pm] {:charge/type-name pm})
(str/split pm-str #","))))
(defn <-row (defn <-row
"Convert a flat parquet row into the shape consumers expect. "Convert a flat parquet row into the shape consumers expect."
Parquet produces maps of the form:
{\"external-id\" \"square/order/123\", ...}
which we transform to:
{:sales-order/external-id \"square/order/123\", ...}"
[row] [row]
(-> row (let [pm (:payment-methods row)]
(set/rename-keys (-> row
{"external-id" :sales-order/external-id (set/rename-keys
"location" :sales-order/location {:external-id :sales-order/external-id
"total" :sales-order/total :location :sales-order/location
"tax" :sales-order/tax :total :sales-order/total
"tip" :sales-order/tip :tax :sales-order/tax
"discount" :sales-order/discount :tip :sales-order/tip
"service-charge" :sales-order/service-charge :discount :sales-order/discount
"vendor" :sales-order/vendor :service-charge :sales-order/service-charge
"client-code" :sales-order/client-code :vendor :sales-order/vendor
"date" :sales-order/date}) :client-code :sales-order/client-code
(update :sales-order/date #(some-> % str)))) :date :sales-order/date
:source :sales-order/source
:reference-link :sales-order/reference-link
:payment-methods :sales-order/payment-methods
:processors :sales-order/processors
:categories :sales-order/categories})
(update :sales-order/date #(some-> % str))
(dissoc :entity-type :_seq-no)
(assoc :sales-order/charges (payment-methods->charges pm)))))
(defn build-where-clause [args] (defn build-where-clause [args]
(let [clauses [(when-let [c (:client-code args)] (let [clauses (keep identity
["external_id.client = '" c "'"]) [(when-let [c (:client-code args)]
(when-let [v (:vendor args)] (str "external_id.client = '" c "'"))
["external_id.vendor = '" (name v) "'"]) (when-let [v (:vendor args)]
(when-let [l (:location args)] (str "external_id.vendor = '" (name v) "'"))
["location = '" l "'"])] (when-let [l (:location args)]
(str "location = '" l "'"))])]
(when (seq clauses) (when (seq clauses)
(str "WHERE " (str/join " AND " clauses))))) (str "WHERE " (str/join " AND " clauses)))))
@@ -46,32 +58,112 @@
(defn raw-graphql-ids [args] (defn raw-graphql-ids [args]
(let [start (some-> (:start (:date-range args)) .toString) (let [start (some-> (:start (:date-range args)) .toString)
end (some-> (:end (:date-range args)) .substring 0 10) end (some-> (:end (:date-range args)) (.substring 0 10))
where (build-where-clause args)
sort (build-sort-clause args)
limit (or (:limit args) page-size) limit (or (:limit args) page-size)
offset (or (:offset args) 0) offset (or (:offset args) 0)]
where-str (when where (str " " where))]
(when start (when start
(let [result (pq/get-sales-orders start end (let [result (pq/get-sales-orders start end
{:client (:client-code args) {:client (:client-code args)
:vendor (:vendor args) :vendor (:vendor args)
:location (:location args) :location (:location args)
:sort sort :sort (or (:sort args) "date")
:order "DESC" :order "DESC"
:limit limit :limit limit
:offset offset})] :offset offset})]
{:ids (mapv #(str (:external_id %)) (:rows result)) {:ids (mapv #(str (:external-id %)) (:rows result))
:rows (:rows result) :rows (:rows result)
:count (:count result)})))) :count (:count result)}))))
(defn graphql-results [rows _ids _args] (defn graphql-results [rows _ids _args]
(mapv <-row rows)) (mapv <-row rows))
(defn- extract-date-str [v]
(when v
(cond
(string? v) (if (> (count v) 10) (.substring v 0 10) v)
(instance? org.joda.time.DateTime v) (atime/unparse-local v atime/normal-date)
(instance? org.joda.time.LocalDate v) (atime/unparse-local v atime/normal-date)
(instance? java.util.Date v) (atime/unparse-local (coerce/to-date-time v) atime/normal-date)
(instance? java.time.LocalDate v) (.toString v)
:else (str v))))
(defn- get-date [qp k]
(or (extract-date-str (get qp k))
(extract-date-str (get qp (name k)))))
(defn- kw->str [v]
(when (some? v)
(if (keyword? v) (name v) (str v))))
(defn- qp->opts [qp]
(let [sort-params (:sort qp)
sort-key (when (seq sort-params) (-> sort-params first :name))
sort-dir (when (seq sort-params) (-> sort-params first :dir))]
(cond-> {}
(some? (:client-code qp)) (assoc :client (kw->str (:client-code qp)))
(some? (:location qp)) (assoc :location (kw->str (:location qp)))
(not-empty (:payment-method qp)) (assoc :payment-method (:payment-method qp))
(some? (:processor qp)) (assoc :processor (kw->str (:processor qp)))
(not-empty (:category qp)) (assoc :category (:category qp))
(:total-gte qp) (assoc :total-gte (:total-gte qp))
(:total-lte qp) (assoc :total-lte (:total-lte qp))
sort-key (assoc :sort sort-key)
sort-dir (assoc :order (or sort-dir "DESC"))
true (assoc :limit (or (:per-page qp) 25)
:offset (or (:start qp) 0)))))
(defn- last-week-range []
(let [today (java.time.LocalDate/now)
end (.toString (.minusDays today 1))
start (.toString (.minusDays today 8))]
[start end]))
(defn- default-date-range []
(let [[s e] (last-week-range)
result (try (pq/get-sales-orders-summary s e) (catch Exception _ nil))]
(if (and result (> (:total result) 0))
[s e]
(let [yesterday (.toString (.minusDays (java.time.LocalDate/of 2024 4 24) 1))
week-before (.toString (.minusDays (java.time.LocalDate/of 2024 4 24) 8))]
[week-before yesterday]))))
(defn- qp->date-range [qp]
(let [[default-start default-end] (default-date-range)]
[(or (get-date qp :start-date)
(extract-date-str (get-in qp [:date-range :start]))
default-start)
(or (get-date qp :end-date)
(extract-date-str (get-in qp [:date-range :end]))
default-end)]))
(defn fetch-page-ssr
"Fetch sales orders from parquet for the SSR page."
[request]
(let [qp (:query-params request)
raw-qp (some-> (:query-string request)
ring-codec/form-decode
(->> (into {} (remove (fn [[_ v]] (str/blank? v))))))
[start end] (qp->date-range (merge raw-qp qp))
opts (qp->opts qp)
result (pq/get-sales-orders start end opts)
rows (mapv <-row (:rows result))]
{:rows rows :count (:count result)}))
(defn summarize-page-ssr
"Summarize all matching sales orders via parquet."
[request]
(let [qp (:query-params request)
raw-qp (some-> (:query-string request)
ring-codec/form-decode
(->> (into {} (remove (fn [[_ v]] (str/blank? v))))))
[start end] (qp->date-range (merge raw-qp qp))
opts (dissoc (qp->opts qp) :limit :offset :sort :order)]
(pq/get-sales-orders-summary start end opts)))
(defn summarize-orders [rows] (defn summarize-orders [rows]
(when (seq rows) (when (seq rows)
(let [total (reduce + 0.0 (map #(or (:total %) 0.0) rows)) (let [total (reduce + 0.0 (map #(or (:sales-order/total %) 0.0) rows))
tax (reduce + 0.0 (map #(or (:tax %) 0.0) rows))] tax (reduce + 0.0 (map #(or (:sales-order/tax %) 0.0) rows))]
{:total total {:total total
:tax tax}))) :tax tax})))

View File

@@ -1,224 +0,0 @@
(ns auto-ap.datomic.sales-orders
(:require [auto-ap.datomic :refer [conn]]
[auto-ap.storage.parquet :as pq]
[clojure.data.json :as json]
[clojure.java.io :as io]))
(defn <-row
"Convert a flat parquet row (string keys) into the
shape consumers expect. Parquet produces maps of the form
{\"external-id\" \"square/order/123\",
\"location\" \"DT\",
\"total\" 50.0}
which we transform to:
{:sales-order/external-id \"square/order/123\",
:sales-order/location \"DT\",
:sales-order/total 50.0}
Note: client, charges and other nested structures are not
available in the flat parquet rows. When denormalisation
adds those columns we can restore the full consumer shape."
[row]
(-> row
(set/rename-keys
{"external-id" :sales-order/external-id
"location" :sales-order/location
"total" :sales-order/total
"tax" :sales-order/tax
"tip" :sales-order/tip})))
(defn build-where-clause
"Build a SQL WHERE fragment from the fields that
parquet can filter on: external_id.client, vendor, location.
Returns either a predicate string e.g.
\"WHERE external_id.client = 'acme' AND vendor = 'square'\"
or nil when no applicable filters exist."
[args]
(let [clauses []
client (:client-code args)
vendor (:vendor args)
location (:location args)]
(when (or client vendor location)
(->> [[:client "external_id.client" client]
[:vendor "external_id.vendor" vendor]
[:location "location" location]]
(keep (fn [[_ col v]]
(when v [col v])))
(mapv #(str %1 " = '" %2 "'"))
(str/join " AND "))}))
(defn build-sort-clause
"Map sort-key field names from args into SQL ORDER-BY fragments.
Supported fields map to parquet column names:
\"date\" -> DATE
\"total\" -> TOTAL
\"tax\" -> TAX
\"tip\" -> TIP
\"client\" -> EXTERNAL_ID_CLIENT (for flat client codes)
\"location\"-> LOCATION
Falls back to \"DATE DESC\" when the args do not specify
an explicit field."
[args]
;; We delegate most of the SQL ordering work to get-sales-orders,
;; which already defaults to DATE DESC.
(when-let [sorts (:sort args)]
(->> sorts
(keep (fn [{:keys [sort-key asc]}]
(let [dir (if asc "ASC" "DESC")
col (case sort-key
"date" "DATE"
"total" "TOTAL"
"tax" "TAX"
"tip" "TIP"
"total-desc" "TOTAL DESC"
"source" "SALE_SOURCE"
"client" "EXTERNAL_ID_CLIENT"
"location" "LOCATION"
nil)] ; unknown → skip
(when col `[~col ~dir]))))
(interleave (repeat \,))
(apply str))))
(defn build-pagination-clause
"Convert a Datomic-side pagination request into SQL-limit/offset
numbers.
Supports:
:start → OFFSET
:count / :per-page → LIMIT"
[args]
{:limit (or (:count args) (:per-page args))
:offset (or (:start args) 0)})
(defn- apply-pagination
"Safely re-implements the old datomic-side pagination logic.
Mutates a COPY of args so we can extract the resulting
cursor values for the response.
Returns {limit offset}"
[args]
(let [page (build-pagination-clause args)
{:keys [limit offset]} page
client (:client-code args)]
; In the new architecture pagination is applied server-side
; by get-sales-orders via LIMIT/OFFSET, so this function
; mainly exists as a thin wrapper for any remaining
; in-memory re-paging concerns.
(if limit
(assoc page :limit (Integer. limit))
page)))
(defn- build-options
"Assemble the opts map passed to pq/get-sales-orders:
{:client ..., :location ..., :vendor ...,
:limit 10, :offset 0, :sort, :order}"
[args]
(let [page (apply-pagination args)
limit (:limit page)
offset (:offset page)
client (:client-code args)]
(cond-> {:client client
:vendor (:vendor args)
:limit limit
:offset offset}
(:location args) (assoc :location (:location args))
(:sort args) (assoc :sort "date") ; let get-sales-orders handle order
true (merge {:order "DESC"
:sort-key (:sort-key args)}))))
(defn raw-graphql-ids
"Query sales-orders FROM parquet files via DuckDB instead of Datomic.
Filters applied at the parquet level:
- date-range → selects which parquet files to read
- client-code / vendor / location where clauses
- sort & pagination are delegated to get-sales-orders
category, processor, type-name filters require nested joins
that parquet does not support -- those fields are ignored.
Returns
{:ids [string-key-for-each-matched-row]
:count int (total matches BEFORE pagination)}"
[args]
(let [start (when-let [s (:start (:date-range args))]
(.toString (.plusDays (java.time.LocalDate/parse s) -1)))
end (when-let [e (:end (:date-range args))]
(-> e .substring 0 10))
where (build-where-clause args)
options (build-options args)
where-str (some-> where #(str " WHERE " %))]
(cond->> nil
; Query rows from parquet with our filters and sort.
where-str (pq/get-sales-orders
start end
(assoc options :sort-key where-str)))
; For each row returned we need an ID string.
; We use the external-id column as the lookup key.
(when-let [rows (:rows result)]
{:ids (mapv #(str (:external_id %)) rows)
:rows rows
:count (:count result)})))
(defn graphql-results
"Return the full payment-row data for the selected IDs.
Since we now read FROM parquet, we receive the raw row vector
and transform it.
The old signature [ids db args] is replaced by [rows id-keys _].
We ignore the database argument (Datomic pull is no longer
called)."
[rows _id-keys _args]
(->> rows
(mapv #(<-row %))))
(defn summarize-orders
"Sum totals and discounts for the given ID-set.
This function still queries Datomic because the parquet-side
aggregation query would duplicate the WHERE logic.
If we want a pure parquet path here, add an
SQL-based aggregation in a follow-up."
[ids]
(let [[total tax]
(#'auto-ap.datomic/aggregate-sum ids) ; uses dc/q internally
first]
{:total total
:tax tax}))
(defn get-graphql
"Entry-point: return [payments count summary].
The data flow is:
1. raw-graphql-ids → parquet query → [:rows :count]
2. graphql-results <- transform rows ← [:results <_id-keys> _args]
3. summarize-orders <- Datomic agg ← [:total :tax]"
[args]
(let [{:keys [ids count']}
(mu/trace
::get-sales-order-ids
[]
(raw-graphql-ids args))]
[(->> (mu/trace ::get-results [] (graphql-results ids id-keys args))
matching-count
summarize-orders ids)])))
(defn summarize-graphql
"Entry-point: return just the summary {:total :tax}.
Like get-graphql, this delegates to raw-graphql-ids
and then to summarize-orders."
[args]
(let [{:keys [ids count']}
(mu/trace
::get-sales-order-ids
[]
(raw-graphql-ids args))]
(summarize-orders ids)))

View File

@@ -21,35 +21,34 @@
:body (json/write-str {"query" (v/graphql-query q)}) :body (json/write-str {"query" (v/graphql-query q)})
:as :json}) :as :json})
:body :body
:data :data))
))
(defn get-caterers [integration] (defn get-caterers [integration]
(:caterers (query integration {:venia/queries [{:query/data (:caterers (query integration {:venia/queries [{:query/data
[:caterers [:name :uuid [:address [:name :street]]]]}]} ))) [:caterers [:name :uuid [:address [:name :street]]]]}]})))
(defn get-subscriptions [integration] (defn get-subscriptions [integration]
(->> (query integration {:venia/queries [{:query/data (->> (query integration {:venia/queries [{:query/data
[:subscribers [:id [:subscriptions [:parentId :parentEntity :eventEntity :eventKey]] ]]}]} ) [:subscribers [:id [:subscriptions [:parentId :parentEntity :eventEntity :eventKey]]]]}]})
:subscribers :subscribers
first first
:subscriptions)) :subscriptions))
(defn get-integrations [] (defn get-integrations []
(map first (dc/q '[:find (pull ?i [:ezcater-integration/api-key (map first (dc/q '[:find (pull ?i [:ezcater-integration/api-key
:ezcater-integration/subscriber-uuid :ezcater-integration/subscriber-uuid
:db/id :db/id
:ezcater-integration/integration-status [:db/id]]) {:ezcater-integration/integration-status [:db/id]}])
:in $ :in $
:where [?i :ezcater-integration/api-key]] :where [?i :ezcater-integration/api-key]]
(dc/db conn)))) (dc/db conn))))
(defn mark-integration-status [integration integration-status] (defn mark-integration-status [integration integration-status]
@(dc/transact conn @(dc/transact conn
[{:db/id (:db/id integration) [{:db/id (:db/id integration)
:ezcater-integration/integration-status (assoc integration-status :ezcater-integration/integration-status (assoc integration-status
:db/id (or (-> integration :ezcater-integration/integration-status :db/id) :db/id (or (-> integration :ezcater-integration/integration-status :db/id)
(random-tempid)))}])) (random-tempid)))}]))
(defn upsert-caterers (defn upsert-caterers
([integration] ([integration]
@@ -65,14 +64,14 @@
([integration] ([integration]
(let [extant (get-subscriptions integration) (let [extant (get-subscriptions integration)
to-ensure (set (map first (dc/q '[:find ?cu to-ensure (set (map first (dc/q '[:find ?cu
:in $ :in $
:where [_ :client/ezcater-locations ?el] :where [_ :client/ezcater-locations ?el]
[?el :ezcater-location/caterer ?c] [?el :ezcater-location/caterer ?c]
[?c :ezcater-caterer/uuid ?cu]] [?c :ezcater-caterer/uuid ?cu]]
(dc/db conn)))) (dc/db conn))))
to-create (set/difference to-create (set/difference
to-ensure to-ensure
(set (map :parentId extant)))] (set (map :parentId extant)))]
(doseq [parentId to-create] (doseq [parentId to-create]
(query integration (query integration
{:venia/operation {:operation/type :mutation {:venia/operation {:operation/type :mutation
@@ -95,7 +94,6 @@
:eventKey 'cancelled}} :eventKey 'cancelled}}
[[:subscription [:parentId :parentEntity :eventEntity :eventKey]]]]]}))))) [[:subscription [:parentId :parentEntity :eventEntity :eventKey]]]]]})))))
#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]} #_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(defn upsert-ezcater (defn upsert-ezcater
([] (upsert-ezcater (get-integrations))) ([] (upsert-ezcater (get-integrations)))
@@ -116,12 +114,11 @@
(defn get-caterer [caterer-uuid] (defn get-caterer [caterer-uuid]
(dc/pull (dc/db conn) (dc/pull (dc/db conn)
'[:ezcater-caterer/name '[:ezcater-caterer/name
{:ezcater-integration/_caterers [:ezcater-integration/api-key]} {:ezcater-integration/_caterers [:ezcater-integration/api-key]}
{:ezcater-location/_caterer [:ezcater-location/location {:ezcater-location/_caterer [:ezcater-location/location
{:client/_ezcater-locations [:client/code]}]}] {:client/_ezcater-locations [:client/code]}]}]
[:ezcater-caterer/uuid caterer-uuid])) [:ezcater-caterer/uuid caterer-uuid]))
(defn round-carry-cents [f] (defn round-carry-cents [f]
(with-precision 2 (double (.setScale (bigdec f) 2 java.math.RoundingMode/HALF_UP)))) (with-precision 2 (double (.setScale (bigdec f) 2 java.math.RoundingMode/HALF_UP))))
@@ -137,69 +134,69 @@
:else :else
0.07M)] 0.07M)]
(round-carry-cents (round-carry-cents
(* commision% (* commision%
0.01M 0.01M
(+ (+
(-> order :totals :subTotal :subunits ) (-> order :totals :subTotal :subunits)
(reduce + (reduce +
0 0
(map (comp :subunits :cost) (:feesAndDiscounts (:catererCart order)))))))) (map (comp :subunits :cost) (:feesAndDiscounts (:catererCart order)))))))))
(defn ccp-fee [order] (defn ccp-fee [order]
(round-carry-cents (round-carry-cents
(* 0.000299M (* 0.000299M
(+ (+
(-> order :totals :subTotal :subunits ) (-> order :totals :subTotal :subunits)
(-> order :totals :salesTax :subunits ) (-> order :totals :salesTax :subunits)
(reduce + (reduce +
0 0
(map (comp :subunits :cost) (:feesAndDiscounts (:catererCart order)))))))) (map (comp :subunits :cost) (:feesAndDiscounts (:catererCart order))))))))
(defn order->sales-order [{{:keys [timestamp]} :event {:keys [orderItems]} :catererCart :keys [client-code client-location uuid] :as order}] (defn order->sales-order [{{:keys [timestamp]} :event {:keys [orderItems]} :catererCart :keys [client-code client-location uuid] :as order}]
(let [adjustment (round-carry-cents (- (+ (-> order :totals :subTotal :subunits (* 0.01)) (let [adjustment (round-carry-cents (- (+ (-> order :totals :subTotal :subunits (* 0.01))
(-> order :totals :salesTax :subunits (* 0.01))) (-> order :totals :salesTax :subunits (* 0.01)))
(-> order :catererCart :totals :catererTotalDue ) (-> order :catererCart :totals :catererTotalDue)
(commision order) (commision order)
(ccp-fee order))) (ccp-fee order)))
service-charge (+ (commision order) (ccp-fee order)) service-charge (+ (commision order) (ccp-fee order))
tax (-> order :totals :salesTax :subunits (* 0.01)) tax (-> order :totals :salesTax :subunits (* 0.01))
tip (-> order :totals :tip :subunits (* 0.01))] tip (-> order :totals :tip :subunits (* 0.01))]
#:sales-order #:sales-order
{:date (atime/localize (coerce/to-date-time timestamp)) {:date (atime/localize (coerce/to-date-time timestamp))
:external-id (str "ezcater/order/" client-code "-" client-location "-" uuid) :external-id (str "ezcater/order/" client-code "-" client-location "-" uuid)
:client [:client/code client-code] :client [:client/code client-code]
:location client-location :location client-location
:reference-link (str (url/url "https://ezmanage.ezcater.com/orders/" uuid )) :reference-link (str (url/url "https://ezmanage.ezcater.com/orders/" uuid))
:line-items [#:order-line-item :line-items [#:order-line-item
{:external-id (str "ezcater/order/" client-code "-" client-location "-" uuid "-" 0) {:external-id (str "ezcater/order/" client-code "-" client-location "-" uuid "-" 0)
:item-name "EZCater Catering" :item-name "EZCater Catering"
:category "EZCater Catering" :category "EZCater Catering"
:discount adjustment :discount adjustment
:tax tax :tax tax
:total (+ (-> order :totals :subTotal :subunits (* 0.01)) :total (+ (-> order :totals :subTotal :subunits (* 0.01))
tax tax
tip)}] tip)}]
:charges [#:charge :charges [#:charge
{:type-name "CARD" {:type-name "CARD"
:date (atime/localize (coerce/to-date-time timestamp)) :date (atime/localize (coerce/to-date-time timestamp))
:client [:client/code client-code] :client [:client/code client-code]
:location client-location :location client-location
:external-id (str "ezcater/charge/" uuid) :external-id (str "ezcater/charge/" uuid)
:processor :ccp-processor/ezcater :processor :ccp-processor/ezcater
:total (+ (-> order :totals :subTotal :subunits (* 0.01)) :total (+ (-> order :totals :subTotal :subunits (* 0.01))
tax tax
tip) tip)
:tip tip}] :tip tip}]
:total (+ (-> order :totals :subTotal :subunits (* 0.01)) :total (+ (-> order :totals :subTotal :subunits (* 0.01))
tax tax
tip) tip)
:discount adjustment :discount adjustment
:service-charge service-charge :service-charge service-charge
:tax tax :tax tax
:tip tip :tip tip
:returns 0.0 :returns 0.0
:vendor :vendor/ccp-ezcater}})) :vendor :vendor/ccp-ezcater}))
(defn- flatten-order-to-parquet! [order] (defn- flatten-order-to-parquet! [order]
"Flatten a sales-order into entity-type tagged maps and buffer to parquet." "Flatten a sales-order into entity-type tagged maps and buffer to parquet."
@@ -222,81 +219,73 @@
(when-let [charges (:sales-order/charges order)] (when-let [charges (:sales-order/charges order)]
(doseq [chg charges] (doseq [chg charges]
(parquet/buffer! "charge" (parquet/buffer! "charge"
{:entity-type "charge" {:entity-type "charge"
:external-id (:charge/external-id chg) :external-id (:charge/external-id chg)
:type-name (:charge/type-name chg) :type-name (:charge/type-name chg)
:total (:charge/total chg) :total (:charge/total chg)
:tax (:charge/tax chg) :tax (:charge/tax chg)
:tip (:charge/tip chg) :tip (:charge/tip chg)
:date so-date :date so-date
:processor (some-> (:charge/processor chg) name) :processor (some-> (:charge/processor chg) name)
:sales-order-external-id so-ext-id}))) :sales-order-external-id so-ext-id})))
(when-let [items (:sales-order/line-items order)] (when-let [items (:sales-order/line-items order)]
(doseq [li items] (doseq [li items]
(parquet/buffer! "line-item" (parquet/buffer! "line-item"
{:entity-type "line-item" {:entity-type "line-item"
:item-name (:order-line-item/item-name li) :item-name (:order-line-item/item-name li)
:category (:order-line-item/category li) :category (:order-line-item/category li)
:total (:order-line-item/total li) :total (:order-line-item/total li)
:tax (:order-line-item/tax li) :tax (:order-line-item/tax li)
:discount (:order-line-item/discount li) :discount (:order-line-item/discount li)
:sales-order-external-id so-ext-id}))))) :sales-order-external-id so-ext-id})))))
(defn get-by-id [integration id] (defn get-by-id [integration id]
(query (query
integration integration
{:venia/queries [[:order {:id id} {:venia/queries [[:order {:id id}
[:uuid [:uuid
:orderNumber :orderNumber
:orderSourceType :orderSourceType
[:caterer [:caterer
[:name [:name
:uuid :uuid
[:address [:street]]]] [:address [:street]]]]
[:event [:event
[:timestamp [:timestamp
:catererHandoffFoodTime :catererHandoffFoodTime
:orderType]] :orderType]]
[:catererCart [[:orderItems [:catererCart [[:orderItems
[:name [:name
:quantity :quantity
:posItemId :posItemId
[:totalInSubunits [:totalInSubunits
[:currency [:currency
:subunits]]]] :subunits]]]]
[:totals [:totals
[:catererTotalDue]] [:catererTotalDue]]
[:feesAndDiscounts [:feesAndDiscounts
{:type 'DELIVERY_FEE} {:type 'DELIVERY_FEE}
[[:cost [[:cost
[:currency [:currency
:subunits]]]]]] :subunits]]]]]]
[:totals [[:customerTotalDue [:totals [[:customerTotalDue
[ [:currency
:currency :subunits]]
:subunits [:pointOfSaleIntegrationFee
]] [:currency
[:pointOfSaleIntegrationFee :subunits]]
[ [:tip
:currency [:currency
:subunits :subunits]]
]] [:salesTax
[:tip [:currency
[:currency :subunits]]
:subunits]] [:salesTaxRemittance
[:salesTax [:currency
[ :subunits]]
:currency [:subTotal
:subunits [:currency
]] :subunits]]]]]]]}))
[:salesTaxRemittance
[:currency
:subunits
]]
[:subTotal
[:currency
:subunits]]]]]]]}))
(defn lookup-order [json] (defn lookup-order [json]
(let [caterer (get-caterer (get json "parent_id")) (let [caterer (get-caterer (get json "parent_id"))
@@ -305,17 +294,17 @@
location (-> caterer :ezcater-location/_caterer first :ezcater-location/location)] location (-> caterer :ezcater-location/_caterer first :ezcater-location/location)]
(if (and client location) (if (and client location)
(doto (doto
(-> (get-by-id integration (get json "entity_id")) (-> (get-by-id integration (get json "entity_id"))
(:order) (:order)
(assoc :client-code client (assoc :client-code client
:client-location location)) :client-location location))
(#(alog/info ::order-details :detail %))) (#(alog/info ::order-details :detail %)))
(alog/warn ::caterer-no-longer-has-location :json json)))) (alog/warn ::caterer-no-longer-has-location :json json))))
(defn import-order [json] (defn import-order [json]
(alog/info (alog/info
::try-import-order ::try-import-order
:json json) :json json)
(when-let [order (some-> json (when-let [order (some-> json
(lookup-order) (lookup-order)
(order->sales-order) (order->sales-order)
@@ -324,7 +313,7 @@
(try (try
(flatten-order-to-parquet! order) (flatten-order-to-parquet! order)
(alog/info ::order-buffered (alog/info ::order-buffered
:external-id (:sales-order/external-id order)) :external-id (:sales-order/external-id order))
(catch Exception e (catch Exception e
(alog/error ::buffer-failed (alog/error ::buffer-failed
:exception e :exception e
@@ -359,30 +348,30 @@
"key" "accepted", "key" "accepted",
"occurred_at" "2022-07-21T19:21:07.549Z"} "occurred_at" "2022-07-21T19:21:07.549Z"}
ezcater-order (lookup-order lookup-map) ezcater-order (lookup-order lookup-map)
extant-order (dc/pull (dc/db conn) '[:sales-order/total extant-order (dc/pull (dc/db conn '[:sales-order/total]
:sales-order/tax :sales-order/tax
:sales-order/tip :sales-order/tip
:sales-order/discount :sales-order/discount
:sales-order/external-id :sales-order/external-id
{:sales-order/charges [:charge/tax {:sales-order/charges [:charge/tax
:charge/tip :charge/tip
:charge/total :charge/total
:charge/external-id] :charge/external-id]
:sales-order/line-items [:order-line-item/external-id :sales-order/line-items [:order-line-item/external-id
:order-line-item/total :order-line-item/total
:order-line-item/tax :order-line-item/tax
:order-line-item/discount]}] :order-line-item/discount]})
[:sales-order/external-id order]) [:sales-order/external-id order])
updated-order (-> (order->sales-order ezcater-order) updated-order (-> (order->sales-order ezcater-order)
(select-keys (select-keys
#{:sales-order/total #{:sales-order/total
:sales-order/tax :sales-order/tax
:sales-order/tip :sales-order/tip
:sales-order/discount :sales-order/discount
:sales-order/charges :sales-order/charges
:sales-order/external-id :sales-order/external-id
:sales-order/line-items}) :sales-order/line-items})
(update :sales-order/line-items (update :sales-order/line-items
(fn [c] (fn [c]
(map #(select-keys % #{:order-line-item/external-id (map #(select-keys % #{:order-line-item/external-id

View File

@@ -40,17 +40,14 @@
(dc/db conn) (dc/db conn)
number))) number)))
(defn delete-all [] (defn delete-all []
@(dc/transact-async conn @(dc/transact-async conn
(->> (->>
(dc/q '[:find ?ss (dc/q '[:find ?ss
:where [?ss :sales-summary/date]] :where [?ss :sales-summary/date]]
(dc/db conn)) (dc/db conn))
(map (fn [[ ss]] (map (fn [[ss]]
[:db/retractEntity ss]))))) [:db/retractEntity ss])))))
(defn dirty-sales-summaries [c] (defn dirty-sales-summaries [c]
(let [client-id (dc/entid (dc/db conn) c)] (let [client-id (dc/entid (dc/db conn) c)]
@@ -138,9 +135,9 @@
{:db/id (str (java.util.UUID/randomUUID)) {:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/sort-order 3 :sales-summary-item/sort-order 3
:sales-summary-item/category (cond :sales-summary-item/category (cond
(= type-name "CARD") "Card Refunds" (= type-name "CARD") "Card Refunds"
(= type-name "CASH") "Cash Refunds" (= type-name "CASH") "Cash Refunds"
:else "Food App Refunds") :else "Food App Refunds")
:ledger-mapped/amount total :ledger-mapped/amount total
:ledger-mapped/ledger-side :ledger-side/credit}) :ledger-mapped/ledger-side :ledger-side/credit})
refunds)))) refunds))))
@@ -188,8 +185,6 @@
:ledger-mapped/ledger-side :ledger-side/credit :ledger-mapped/ledger-side :ledger-side/credit
:ledger-mapped/amount (- (+ total discount) tax)}))) :ledger-mapped/amount (- (+ total discount) tax)})))
(defn get-fees [c date] (defn get-fees [c date]
(when-let [fee (get-fee c date)] (when-let [fee (get-fee c date)]
{:db/id (str (java.util.UUID/randomUUID)) {:db/id (str (java.util.UUID/randomUUID))
@@ -299,18 +294,17 @@
(filter identity) (filter identity)
(map (fn [z] (map (fn [z]
(assoc z :ledger-mapped/account (some-> z :sales-summary-item/category str/lower-case name->number lookup-account) (assoc z :ledger-mapped/account (some-> z :sales-summary-item/category str/lower-case name->number lookup-account)
:sales-summary-item/manual? false)) :sales-summary-item/manual? false))))}]
)) }]
(if (seq (:sales-summary/items result)) (if (seq (:sales-summary/items result))
(do (do
(alog/info ::upserting-summaries (alog/info ::upserting-summaries
:category-count (count (:sales-summary/items result))) :category-count (count (:sales-summary/items result)))
@(dc/transact conn [[:upsert-entity result]])) @(dc/transact conn [[:upsert-entity result]]))
@(dc/transact conn [{:db/id id :sales-summary/dirty false}])))))) @(dc/transact conn [{:db/id id :sales-summary/dirty false}]))))))
(comment (comment
;; TODO: Move to test file or proper location ;; TODO: Move to test file or proper location
(let [c (auto-ap.datomic/pull-attr (dc/db @conn) :db/id [:client/code "NGCL" ]) (let [c (auto-ap.datomic/pull-attr (dc/db @conn) :db/id [:client/code "NGCL"])
date #inst "2024-04-14T00:00:00-07:00"] date #inst "2024-04-14T00:00:00-07:00"]
(get-payment-items c date))) (get-payment-items c date)))
@@ -322,16 +316,13 @@
(map (fn [[sos]] (map (fn [[sos]]
[:db/retractEntity sos]))))) [:db/retractEntity sos])))))
(comment (comment
(auto-ap.datomic/transact-schema conn) (auto-ap.datomic/transact-schema conn)
@(dc/transact conn [{:db/ident :sales-summary/total-unknown-processor-payments @(dc/transact conn [{:db/ident :sales-summary/total-unknown-processor-payments
:db/noHistory true, :db/noHistory true,
:db/valueType :db.type/double :db/valueType :db.type/double
:db/cardinality :db.cardinality/one}]) :db/cardinality :db.cardinality/one}])
(apply mark-dirty [:client/code "NGCL"] (last-n-days 30)) (apply mark-dirty [:client/code "NGCL"] (last-n-days 30))
@@ -363,23 +354,18 @@
(dc/db conn) (dc/db conn)
[[(auto-ap.datomic/pull-attr (dc/db conn) :db/id [:client/code "NGHW"])] #inst "2024-04-11T00:00:00-07:00" #inst "2024-04-11T00:00:00-07:00"]) [[(auto-ap.datomic/pull-attr (dc/db conn) :db/id [:client/code "NGHW"])] #inst "2024-04-11T00:00:00-07:00" #inst "2024-04-11T00:00:00-07:00"])
(dc/q '[:find ?n (dc/q '[:find ?n
:in $ [?clients ?start-date ?end-date] :in $ [?clients ?start-date ?end-date]
:where [(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]] :where [(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]
[?e :sales-order/line-items ?li] [?e :sales-order/line-items ?li]
[?li :order-line-item/item-name ?n] ] [?li :order-line-item/item-name ?n]]
(dc/db conn) (dc/db conn)
[[(auto-ap.datomic/pull-attr (dc/db conn) :db/id [:client/code "NGCL"])] #inst "2024-04-11T00:00:00-07:00" #inst "2024-04-24T00:00:00-07:00"]) [[(auto-ap.datomic/pull-attr (dc/db conn) :db/id [:client/code "NGCL"])] #inst "2024-04-11T00:00:00-07:00" #inst "2024-04-24T00:00:00-07:00"])
@(dc/transact conn [{:db/id :sales-summary/total-tax :db/ident :sales-summary/total-tax-legacy} @(dc/transact conn [{:db/id :sales-summary/total-tax :db/ident :sales-summary/total-tax-legacy}
{:db/id :sales-summary/total-tip :db/ident :sales-summary/total-tip-legacy}]) {:db/id :sales-summary/total-tip :db/ident :sales-summary/total-tip-legacy}])
(auto-ap.datomic/transact-schema conn)
)
(auto-ap.datomic/transact-schema conn))
(defn -main [& _] (defn -main [& _]
(execute "sales-summaries" sales-summaries-v2)) (execute "sales-summaries" sales-summaries-v2))

View File

@@ -17,8 +17,8 @@
"Return all entity IDs that have :sales-order/external-id." "Return all entity IDs that have :sales-order/external-id."
[db] [db]
(->> (d-api/q '[:find ?e (->> (d-api/q '[:find ?e
:where [?e :sales-order/external-id]] :where [?e :sales-order/external-id]]
db) db)
(map first))) (map first)))
(defn- collect-child-ids (defn- collect-child-ids
@@ -28,19 +28,19 @@
[db order-ids] [db order-ids]
(let [order-set (set order-ids) (let [order-set (set order-ids)
charges (->> (d-api/q '[:find ?c charges (->> (d-api/q '[:find ?c
:in $ [?o ...] :in $ [?o ...]
:where [$ ?o :sales-order/charges ?c]] :where [$ ?o :sales-order/charges ?c]]
db order-set) db order-set)
(map second)) (map second))
refunds (->> (d-api/q '[:find ?r refunds (->> (d-api/q '[:find ?r
:in $ [?o ...] :in $ [?o ...]
:where [$ ?o :sales-order/refunds ?r]] :where [$ ?o :sales-order/refunds ?r]]
db order-set) db order-set)
(map second)) (map second))
line-items (->> (d-api/q '[:find ?li line-items (->> (d-api/q '[:find ?li
:in $ [?c ...] :in $ [?c ...]
:where [$ ?c :charge/line-items ?li]] :where [$ ?c :charge/line-items ?li]]
db charges) db charges)
(map second))] (map second))]
{:orders order-ids {:orders order-ids
:charges (vec charges) :charges (vec charges)
@@ -149,7 +149,7 @@
all-ids (query-sales-order-ids db) all-ids (query-sales-order-ids db)
group (group-orders-by-month db all-ids) group (group-orders-by-month db all-ids)
target-keys (get group [year month] [])] target-keys (get group [year month] [])]
(if (zero? (count target-keys)) (if (zero? (count target-keys))
(do (println " no orders found for" year "-" month) (do (println " no orders found for" year "-" month)
:skipped) :skipped)
(do (do
@@ -202,18 +202,18 @@
missing (:missing result)] missing (:missing result)]
(cond (cond
(:ok result) (:ok result)
(do (println "verified" y "-" m "S3 OK, deleting...") (do (println "verified" y "-" m "S3 OK, deleting...")
(delete-by-month conn$ nil y m)) (delete-by-month conn$ nil y m))
(> (count missing) 0) (> (count missing) 0)
(do (println "ERROR" y "-" m "missing in S3:" (do (println "ERROR" y "-" m "missing in S3:"
(str/join ", " missing)) (str/join ", " missing))
(throw (throw
(ex-info (ex-info
"Missing S3 data — aborting!" "Missing S3 data — aborting!"
{:year y :month m {:year y :month m
:missing missing}))) :missing missing})))
:else :else
(println "SKIPPING" y "-" m "no parquet files"))))) (println "SKIPPING" y "-" m "no parquet files")))))
(println "safe-cleanup-all complete"))) (println "safe-cleanup-all complete")))

View File

@@ -10,14 +10,14 @@
(write-dead-letter [flat]) ; write orphaned records" (write-dead-letter [flat]) ; write orphaned records"
(:require [auto-ap.datomic :refer [conn]] (:require [auto-ap.datomic :refer [conn]]
[auto-ap.storage.parquet :as p] [auto-ap.storage.parquet :as p]
[datomic.api :as dc] [clojure.string :as str]
[clj-time.core :as time])) [datomic.api :as dc]))
(defn- fetch-all-sales-order-ids [] (defn- fetch-all-sales-order-ids []
"Query Datomic for all sales-order external-ids (as entity IDs). "Query Datomic for all sales-order external-ids (as entity IDs).
Returns a vector of entitity ids." Returns a vector of entitity ids."
(->> (dc/q '[:find ?e (->> (dc/q '[:find ?e
:where [_ :sales-order/external-id ?_ext]] :where [?e :sales-order/external-id _]]
(dc/db conn)) (dc/db conn))
(map first) (map first)
vec)) vec))
@@ -25,14 +25,16 @@
(def ^:private sales-order-read (def ^:private sales-order-read
'[:sales-order/external-id '[:sales-order/external-id
:sales-order/date :sales-order/date
{:sales-order/client [:client/code]} {:sales-order/client [:client/code :client/name]}
:sales-order/location :sales-order/location
:sales-order/vendor {:sales-order/vendor [:vendor/name]}
:sales-order/total :sales-order/total
:sales-order/tax :sales-order/tax
:sales-order/tip :sales-order/tip
:sales-order/discount :sales-order/discount
:sales-order/service-charge :sales-order/service-charge
:sales-order/source
:sales-order/reference-link
{:sales-order/charges {:sales-order/charges
[:charge/external-id [:charge/external-id
:charge/type-name :charge/type-name
@@ -40,16 +42,16 @@
:charge/tax :charge/tax
:charge/tip :charge/tip
:charge/date :charge/date
:charge/processor {:charge/processor [:db/ident]}
:charge/returns :charge/returns
{:charge/client [:client/code]}]} {:charge/client [:client/code]}]}
{:sales-order/line-items {:sales-order/line-items
[:order-line-item/item-name [:order-line-item/item-name
:order-line-item/category :order-line-item/category
:order-line-item/total :order-line-item/total
:order-line-item/tax :order-line-item/tax
:order-line-item/discount :order-line-item/discount
{:order-line-item/unit-price {}} :order-line-item/unit-price
:order-line-item/quantity :order-line-item/quantity
:order-line-item/note]}]) :order-line-item/note]}])
@@ -61,74 +63,80 @@
sales-order-read sales-order-read
eids))) eids)))
(defn- flatten-order-to-pieces! [order flat] (defn- flatten-order-to-pieces! [order date-str flat]
"Flatten a pulled sales-order into :entity-type tagged maps. "Flatten a pulled sales-order into :entity-type tagged maps.
Appends to the existing flat vector, which is returned." Appends to the existing flat vector, which is returned."
(let [so-ext-id (:sales-order/external-id order) (let [so-ext-id (:sales-order/external-id order)
so-date (.toString (:sales-order/date order)) so-date date-str
client-code (get-in order [:sales-order/client :client/code])] client-code (get-in order [:sales-order/client :client/code])
;; sales-order row vendor-name (get-in order [:sales-order/vendor :vendor/name])
(swap! flat conj charges (:sales-order/charges order)
{:entity-type "sales-order" items (:sales-order/line-items order)
:external-id (str so-ext-id) payment-methods (->> charges (map :charge/type-name) distinct (str/join ","))
:client-code client-code processors (->> charges (map #(get-in % [:charge/processor :db/ident])) (remove nil?) distinct (map name) (str/join ","))
:location (:sales-order/location order) categories (->> items (map :order-line-item/category) (remove nil?) distinct (str/join ","))]
:vendor (:sales-order/vendor order) (vswap! flat conj
:total (:sales-order/total order) {:entity-type "sales-order"
:tax (:sales-order/tax order) :external-id (str so-ext-id)
:tip (:sales-order/tip order) :client-code client-code
:discount (:sales-order/discount order) :location (:sales-order/location order)
:service-charge (:sales-order/service-charge order) :vendor vendor-name
:date so-date}) :total (:sales-order/total order)
;; charges & line-items :tax (:sales-order/tax order)
:tip (:sales-order/tip order)
:discount (:sales-order/discount order)
:service-charge (:sales-order/service-charge order)
:date so-date
:source (:sales-order/source order)
:reference-link (:sales-order/reference-link order)
:payment-methods payment-methods
:processors processors
:categories categories})
(when-let [charges (:sales-order/charges order)] (when-let [charges (:sales-order/charges order)]
(doseq [chg charges] (doseq [chg charges]
(swap! flat conj (vswap! flat conj
{:entity-type "charge" {:entity-type "charge"
:external-id (str (get chg :charge/external-id)) :external-id (str (get chg :charge/external-id))
:type-name (get chg :charge/type-name) :type-name (get chg :charge/type-name)
:total (get chg :charge/total) :total (get chg :charge/total)
:tax (get chg :charge/tax) :tax (get chg :charge/tax)
:tip (get chg :charge/tip) :tip (get chg :charge/tip)
:date so-date :date so-date
:processor (get-in chg [:charge/processor :db/ident]) :processor (get-in chg [:charge/processor :db/ident])
:sales-order-external-id (str so-ext-id)}) :sales-order-external-id (str so-ext-id)})
;; charge returns → sales-refund rows
(when-let [returns (:charge/returns chg)] (when-let [returns (:charge/returns chg)]
(doseq [rt returns] (doseq [rt returns]
(swap! flat conj (vswap! flat conj
{:entity-type "sales-refund" {:entity-type "sales-refund"
:type-name (get rt :type-name) :type-name (get rt :type-name)
:total (get rt :total) :total (get rt :total)
:sales-order-external-id (str so-ext-id)}))))) :sales-order-external-id (str so-ext-id)})))))
;; line-items
(when-let [items (:sales-order/line-items order)] (when-let [items (:sales-order/line-items order)]
(doseq [li items] (doseq [li items]
(swap! flat conj (vswap! flat conj
{:entity-type "line-item" {:entity-type "line-item"
:item-name (get li :order-line-item/item-name) :item-name (get li :order-line-item/item-name)
:category (get li :order-line-item/category) :category (get li :order-line-item/category)
:total (get li :order-line-item/total) :total (get li :order-line-item/total)
:tax (get li :order-line-item/tax) :tax (get li :order-line-item/tax)
:discount (get li :order-line-item/discount) :discount (get li :order-line-item/discount)
:sales-order-external-id (str so-ext-id)}))))) :sales-order-external-id (str so-ext-id)})))))
(defn -fetch-order-ids-for-date (defn -fetch-order-ids-for-date
"Query Datomic for all sales-order eids on a given business date." "Query Datomic for all sales-order eids on a given business date."
[db date-str] [db date-str]
(let [day-ms (.toEpochSecond ^java.time.LocalDate (java.time.LocalDate/parse date-str)) (let [ld (java.time.LocalDate/parse date-str)
start (* day-ms 1000) start (-> ld (.atStartOfDay (java.time.ZoneId/of "America/Los_Angeles")) .toInstant java.util.Date/from)
end (+ start (* 86400000))] end (-> ld (.plusDays 1) (.atStartOfDay (java.time.ZoneId/of "America/Los_Angeles")) .toInstant java.util.Date/from)]
(->> (dc/q '[:find ?e (->> (dc/q '[:find ?e
:in $ ?start-ms ?end-ms :in $ ?start ?end
:where [_ :sales-order/date ?d] :where [?e :sales-order/date ?d]
[(>= ?d ?start-ms)] [(>= ?d ?start)]
[(<= ?d ?end-ms)]] [(< ?d ?end)]]
db start end) db start end)
(map first) (map first)
vec))) vec)))
(defn- date-seq [start end] (defn- date-seq [start end]
"Seq of YYYY-MM-DD strings between start and end inclusive." "Seq of YYYY-MM-DD strings between start and end inclusive."
(let [sd (java.time.LocalDate/parse start) (let [sd (java.time.LocalDate/parse start)
@@ -138,9 +146,9 @@
(for [i (range 0 (inc days))] (for [i (range 0 (inc days))]
(.toString (.plusDays sd i))))) (.toString (.plusDays sd i)))))
(defn- write-day-by-day (defn write-day-by-day
([start-date end-date] ([start-date end-date]
(write-day-by-day start-date end-date nil)) (write-day-by-day start-date end-date {}))
([start-date end-date opts] ([start-date end-date opts]
(let [all-dates (set (or (opts :date-set) [])) (let [all-dates (set (or (opts :date-set) []))
date-range (if (empty? all-dates) date-range (if (empty? all-dates)
@@ -156,14 +164,14 @@
(let [orders (pull-sales-order-data batch) (let [orders (pull-sales-order-data batch)
flat (volatile! [])] flat (volatile! [])]
(doseq [o orders] (doseq [o orders]
(flatten-order-to-pieces! o flat)) (flatten-order-to-pieces! o day flat))
(doseq [r @flat] (doseq [r @flat]
(p/buffer! (:entity-type r) r))))) (p/buffer! (:entity-type r) r)))))
(doseq [etype ["sales-order" "charge" (doseq [etype ["sales-order" "charge"
"line-item" "sales-refund"]] "line-item" "sales-refund"]]
(p/flush-to-parquet! etype)) (p/flush-to-parquet! etype day))
(println "[migration]" day "complete")) (println "[migration]" day "complete"))
{:status :completed :total-days (count date-range)}))) {:status :completed :total-days (count date-range)})))
(defn- write-dead-letter (defn- write-dead-letter
([flat] ([flat]
@@ -181,10 +189,11 @@
"Flush all entity-type buffers, tracking counts." "Flush all entity-type buffers, tracking counts."
(let [etypes ["sales-order" "charge" (let [etypes ["sales-order" "charge"
"line-item" "sales-refund"] "line-item" "sales-refund"]
today (.toString (java.time.LocalDate/now))
start (p/total-buf-count)] start (p/total-buf-count)]
(doseq [et etypes] (doseq [et etypes]
(try (try
(p/flush-to-parquet! et) (p/flush-to-parquet! et today)
(catch Exception e (catch Exception e
(println "[migration/flush]" et "error:" (.getMessage e))))) (println "[migration/flush]" et "error:" (.getMessage e)))))
{:records-flush (- (p/total-buf-count) start)})) {:records-flush (- (p/total-buf-count) start)}))
@@ -192,11 +201,11 @@
(defn- get-date-range [] (defn- get-date-range []
"Get the earliest and latest business dates from Datomic." "Get the earliest and latest business dates from Datomic."
(let [dates (->> (dc/q '[:find ?d (let [dates (->> (dc/q '[:find ?d
:where [_ :sales-order/date ?d]] :where [_ :sales-order/date ?d]]
(dc/db conn)) (dc/db conn))
(map first) (map first)
distinct distinct
sort)] sort)]
[(when (seq dates) (.toString (first dates))) [(when (seq dates) (.toString (first dates)))
(when (seq dates) (.toString (last dates)))])) (when (seq dates) (.toString (last dates)))]))
@@ -218,9 +227,9 @@
(doseq [o (pull-sales-order-data order-ids) (doseq [o (pull-sales-order-data order-ids)
:when (not (:sales-order/date o))] :when (not (:sales-order/date o))]
(let [flat (volatile! [])] (let [flat (volatile! [])]
(flatten-order-to-pieces! o flat) (flatten-order-to-pieces! o "unknown" flat)
(doseq [r @flat] (doseq [r @flat]
(p/buffer! "dead" r)))) (p/buffer! "dead" r))))
(write-day-by-day start-date end-date {:batch-size 100}) (write-day-by-day start-date end-date {:batch-size 100})
(flush-all-types) (flush-all-types)
(println "[migration] done") (println "[migration] done")

View File

@@ -55,54 +55,54 @@
event-date (some-> (excel/xls-date->date event-date) event-date (some-> (excel/xls-date->date event-date)
coerce/to-date-time coerce/to-date-time
atime/as-local-time atime/as-local-time
coerce/to-date )] coerce/to-date)]
(cond (and event-date client-id location ) (cond (and event-date client-id location)
[:order #:sales-order [:order #:sales-order
{:date event-date {:date event-date
:external-id (str "ezcater/order/" client-id "-" location "-" order-number) :external-id (str "ezcater/order/" client-id "-" location "-" order-number)
:client client-id :client client-id
:location location :location location
:reference-link (str order-number) :reference-link (str order-number)
:line-items [#:order-line-item :line-items [#:order-line-item
{:external-id (str "ezcater/order/" client-id "-" location "-" order-number "-" 0) {:external-id (str "ezcater/order/" client-id "-" location "-" order-number "-" 0)
:item-name "EZCater Catering" :item-name "EZCater Catering"
:category "EZCater Catering" :category "EZCater Catering"
:discount (fmt-amount (or adjustments 0.0)) :discount (fmt-amount (or adjustments 0.0))
:tax (fmt-amount tax) :tax (fmt-amount tax)
:total (fmt-amount (+ food-total :total (fmt-amount (+ food-total
tax))}] tax))}]
:charges [#:charge :charges [#:charge
{:type-name "CARD" {:type-name "CARD"
:date event-date :date event-date
:client client-id :client client-id
:location location :location location
:external-id (str "ezcater/charge/" client-id "-" location "-" order-number "-" 0) :external-id (str "ezcater/charge/" client-id "-" location "-" order-number "-" 0)
:processor :ccp-processor/ezcater :processor :ccp-processor/ezcater
:total (fmt-amount (+ food-total :total (fmt-amount (+ food-total
tax tax
tip)) tip))
:tip (fmt-amount tip)}] :tip (fmt-amount tip)}]
:total (fmt-amount (+ food-total :total (fmt-amount (+ food-total
tax tax
(or adjustments 0.0))) (or adjustments 0.0)))
:discount (fmt-amount (or adjustments 0.0)) :discount (fmt-amount (or adjustments 0.0))
:service-charge (fmt-amount (+ fee commission)) :service-charge (fmt-amount (+ fee commission))
:tax (fmt-amount tax) :tax (fmt-amount tax)
:tip (fmt-amount tip) :tip (fmt-amount tip)
:returns 0.0 :returns 0.0
:vendor :vendor/ccp-ezcater}] :vendor :vendor/ccp-ezcater}]
caterer-name caterer-name
(do (do
(alog/warn ::missing-client (alog/warn ::missing-client
:order order-number :order order-number
:store-name store-name :store-name store-name
:caterer-name caterer-name) :caterer-name caterer-name)
[:missing caterer-name]) [:missing caterer-name])
:else :else
nil))) nil)))
(defn- flatten-order-to-parquet! [order] (defn- flatten-order-to-parquet! [order]
"Flatten a sales-order into entity-type tagged maps and buffer to parquet." "Flatten a sales-order into entity-type tagged maps and buffer to parquet."
@@ -125,25 +125,25 @@
(when-let [charges (:sales-order/charges order)] (when-let [charges (:sales-order/charges order)]
(doseq [chg charges] (doseq [chg charges]
(parquet/buffer! "charge" (parquet/buffer! "charge"
{:entity-type "charge" {:entity-type "charge"
:external-id (:charge/external-id chg) :external-id (:charge/external-id chg)
:type-name (:charge/type-name chg) :type-name (:charge/type-name chg)
:total (:charge/total chg) :total (:charge/total chg)
:tax (:charge/tax chg) :tax (:charge/tax chg)
:tip (:charge/tip chg) :tip (:charge/tip chg)
:date so-date :date so-date
:processor (some-> (:charge/processor chg) name) :processor (some-> (:charge/processor chg) name)
:sales-order-external-id so-ext-id}))) :sales-order-external-id so-ext-id})))
(when-let [items (:sales-order/line-items order)] (when-let [items (:sales-order/line-items order)]
(doseq [li items] (doseq [li items]
(parquet/buffer! "line-item" (parquet/buffer! "line-item"
{:entity-type "line-item" {:entity-type "line-item"
:item-name (:order-line-item/item-name li) :item-name (:order-line-item/item-name li)
:category (:order-line-item/category li) :category (:order-line-item/category li)
:total (:order-line-item/total li) :total (:order-line-item/total li)
:tax (:order-line-item/tax li) :tax (:order-line-item/tax li)
:discount (:order-line-item/discount li) :discount (:order-line-item/discount li)
:sales-order-external-id so-ext-id}))))) :sales-order-external-id so-ext-id})))))
(defn stream->sales-orders [s] (defn stream->sales-orders [s]
(let [clients (map first (dc/q '[:find (pull ?c [:client/code (let [clients (map first (dc/q '[:find (pull ?c [:client/code
@@ -235,7 +235,7 @@
[:li ml])]])])) [:li ml])]])]))
(catch Exception e (catch Exception e
(alog/error ::import-error (alog/error ::import-error
:error e) :error e)
(html-response [:div (.getMessage e)])))))) (html-response [:div (.getMessage e)]))))))
(defn page [{:keys [matched-route request-method] :as request}] (defn page [{:keys [matched-route request-method] :as request}]

View File

@@ -28,11 +28,9 @@
"Authorization" (str "Bearer " (:client/square-auth-token client)) "Authorization" (str "Bearer " (:client/square-auth-token client))
"Content-Type" "application/json"})) "Content-Type" "application/json"}))
(defn ->square-date [d] (defn ->square-date [d]
(f/unparse (f/formatter "YYYY-MM-dd'T'HH:mm:ssZZ") d)) (f/unparse (f/formatter "YYYY-MM-dd'T'HH:mm:ssZZ") d))
(def manifold-api-stream (def manifold-api-stream
(let [stream (s/stream 100)] (let [stream (s/stream 100)]
(->> stream (->> stream
@@ -43,10 +41,10 @@
(de/loop [attempt 0] (de/loop [attempt 0]
(-> (de/chain (de/future-with (ex/execute-pool) (-> (de/chain (de/future-with (ex/execute-pool)
#_(log/info ::request-started #_(log/info ::request-started
:url (:url request) :url (:url request)
:attempt attempt :attempt attempt
:source "Square 3" :source "Square 3"
:background-job "Square 3") :background-job "Square 3")
(try (try
(client/request (assoc request (client/request (assoc request
:socket-timeout 10000 :socket-timeout 10000
@@ -105,7 +103,6 @@
:exception error)) :exception error))
[])))) []))))
(def item-cache (atom {})) (def item-cache (atom {}))
(defn fetch-catalog [client i v] (defn fetch-catalog [client i v]
@@ -125,13 +122,11 @@
#(do (swap! item-cache assoc i %) #(do (swap! item-cache assoc i %)
%)))) %))))
(defn fetch-catalog-cache [client i version] (defn fetch-catalog-cache [client i version]
(if (get @item-cache i) (if (get @item-cache i)
(de/success-deferred (get @item-cache i)) (de/success-deferred (get @item-cache i))
(fetch-catalog client i version))) (fetch-catalog client i version)))
(defn item->category-name-impl [client item version] (defn item->category-name-impl [client item version]
(capture-context->lc (capture-context->lc
(cond (:item_id (:item_variation_data item)) (cond (:item_id (:item_variation_data item))
@@ -162,7 +157,6 @@
:item item) :item item)
"Uncategorized")))) "Uncategorized"))))
(defn item-id->category-name [client i version] (defn item-id->category-name [client i version]
(capture-context->lc (capture-context->lc
(-> [client i] (-> [client i]
@@ -227,7 +221,6 @@
(concat (:orders result) continued-results)))) (concat (:orders result) continued-results))))
(:orders result))))))) (:orders result)))))))
(defn search (defn search
([client location start end] ([client location start end]
(capture-context->lc (capture-context->lc
@@ -251,11 +244,9 @@
(concat (:orders result) continued-results)))) (concat (:orders result) continued-results))))
(:orders result)))))))) (:orders result))))))))
(defn amount->money [amt] (defn amount->money [amt]
(* 0.01 (or (:amount amt) 0.0))) (* 0.01 (or (:amount amt) 0.0)))
;; to get totals: ;; to get totals:
(comment (comment
(reduce (reduce
@@ -416,7 +407,6 @@
:client client :client client
:location location))))))) :location location)))))))
(defn get-payment [client p] (defn get-payment [client p]
(de/chain (manifold-api-call (de/chain (manifold-api-call
{:url (str "https://connect.squareup.com/v2/payments/" p) {:url (str "https://connect.squareup.com/v2/payments/" p)
@@ -425,7 +415,6 @@
:body :body
:payment)) :payment))
(defn continue-payout-entry-list [c l poi cursor] (defn continue-payout-entry-list [c l poi cursor]
(capture-context->lc lc (capture-context->lc lc
(de/chain (de/chain
@@ -610,8 +599,8 @@
so-date (some-> (:sales-order/date order) .toString) so-date (some-> (:sales-order/date order) .toString)
client (:sales-order/client order) client (:sales-order/client order)
client-code (when client (if (map? client) client-code (when client (if (map? client)
(:client/code client) (:client/code client)
client))] client))]
(parquet/buffer! "sales-order" (parquet/buffer! "sales-order"
{:entity-type "sales-order" {:entity-type "sales-order"
:external-id so-ext-id :external-id so-ext-id
@@ -627,32 +616,32 @@
(when-let [charges (:sales-order/charges order)] (when-let [charges (:sales-order/charges order)]
(doseq [chg charges] (doseq [chg charges]
(parquet/buffer! "charge" (parquet/buffer! "charge"
{:entity-type "charge" {:entity-type "charge"
:external-id (:charge/external-id chg) :external-id (:charge/external-id chg)
:type-name (:charge/type-name chg) :type-name (:charge/type-name chg)
:total (:charge/total chg) :total (:charge/total chg)
:tax (:charge/tax chg) :tax (:charge/tax chg)
:tip (:charge/tip chg) :tip (:charge/tip chg)
:date so-date :date so-date
:processor (some-> (:charge/processor chg) name) :processor (some-> (:charge/processor chg) name)
:sales-order-external-id so-ext-id}) :sales-order-external-id so-ext-id})
(when-let [returns (:charge/returns chg)] (when-let [returns (:charge/returns chg)]
(doseq [rt returns] (doseq [rt returns]
(parquet/buffer! "sales-refund" (parquet/buffer! "sales-refund"
{:entity-type "sales-refund" {:entity-type "sales-refund"
:type-name (:type-name rt) :type-name (:type-name rt)
:total (:total rt) :total (:total rt)
:sales-order-external-id so-ext-id}))))) :sales-order-external-id so-ext-id})))))
(when-let [items (:sales-order/line-items order)] (when-let [items (:sales-order/line-items order)]
(doseq [li items] (doseq [li items]
(parquet/buffer! "line-item" (parquet/buffer! "line-item"
{:entity-type "line-item" {:entity-type "line-item"
:item-name (:order-line-item/item-name li) :item-name (:order-line-item/item-name li)
:category (:order-line-item/category li) :category (:order-line-item/category li)
:total (:order-line-item/total li) :total (:order-line-item/total li)
:tax (:order-line-item/tax li) :tax (:order-line-item/tax li)
:discount (:order-line-item/discount li) :discount (:order-line-item/discount li)
:sales-order-external-id so-ext-id}))))) :sales-order-external-id so-ext-id})))))
(defn upsert (defn upsert
([client] ([client]
@@ -674,8 +663,7 @@
(catch Exception e (catch Exception e
(log/error ::buffer-failed (log/error ::buffer-failed
:exception e :exception e
:order (:sales-order/external-id order))))))))))) :order (:sales-order/external-id order))))))))))))
(defn upsert-payouts (defn upsert-payouts
([client] ([client]
@@ -725,7 +713,6 @@
(log/info ::done-loading-refunds))))))) (log/info ::done-loading-refunds)))))))
(defn get-cash-shift [client id] (defn get-cash-shift [client id]
(de/chain (manifold-api-call {:url (str (url/url "https://connect.squareup.com/v2/cash-drawers/shifts" id)) (de/chain (manifold-api-call {:url (str (url/url "https://connect.squareup.com/v2/cash-drawers/shifts" id))
:method :get :method :get
@@ -884,8 +871,6 @@
d1 d1
d2)) d2))
(defn remove-voided-orders (defn remove-voided-orders
([client] ([client]
(apply de/zip (apply de/zip
@@ -920,32 +905,28 @@
(doseq [x (partition-all 100 results)] (doseq [x (partition-all 100 results)]
(log/info ::removing-orders (log/info ::removing-orders
:count (count x)) :count (count x))
@(dc/transact-async conn x))))) @(dc/transact-async conn x)
(de/catch (fn [e] (de/catch (fn [e]
(log/warn ::couldnt-remove :error e) (log/warn ::couldnt-remove :error e)
nil) )))))) nil)))))))))))
#_(comment #_(comment
(require 'auto-ap.time-reader) (require 'auto-ap.time-reader)
@(let [[c [l]] (get-square-client-and-location "DBFS") ] @(let [[c [l]] (get-square-client-and-location "DBFS")]
(log/peek :x [ c l]) (log/peek :x [c l])
(search c l #clj-time/date-time "2026-03-28" #clj-time/date-time "2026-03-29") (search c l #clj-time/date-time "2026-03-28" #clj-time/date-time "2026-03-29"))
) @(let [[c [l]] (get-square-client-and-location "NGAK")]
(log/peek :x [c l])
@(let [[c [l]] (get-square-client-and-location "NGAK") ] (remove-voided-orders c l #clj-time/date-time "2024-04-11" #clj-time/date-time "2024-04-15"))
(log/peek :x [ c l]) (doseq [c (get-square-clients)]
(try
(remove-voided-orders c l #clj-time/date-time "2024-04-11" #clj-time/date-time "2024-04-15")) @(remove-voided-orders c)
(doseq [c (get-square-clients)] (catch Exception e
(try nil)))
@(remove-voided-orders c) )
(catch Exception e
nil)))
)
(defn upsert-all [& clients] (defn upsert-all [& clients]
(capture-context->lc (capture-context->lc
@@ -1014,8 +995,6 @@
[:clients clients] [:clients clients]
@(apply upsert-all clients))) @(apply upsert-all clients)))
(comment (comment
(defn refunds-raw-cont (defn refunds-raw-cont
([client l cursor so-far] ([client l cursor so-far]
@@ -1045,7 +1024,6 @@
(->> (->>
@(let [[c [l]] (get-square-client-and-location "NGGG")] @(let [[c [l]] (get-square-client-and-location "NGGG")]
(search c l (time/now) (time/plus (time/now) (time/days -1)))) (search c l (time/now) (time/plus (time/now) (time/days -1))))
(filter (fn [r] (filter (fn [r]
@@ -1055,7 +1033,6 @@
(->> (->>
@(let [[c [l]] (get-square-client-and-location "NGGG")] @(let [[c [l]] (get-square-client-and-location "NGGG")]
(refunds-raw-cont c l nil [])) (refunds-raw-cont c l nil []))
(filter (fn [r] (filter (fn [r]
(str/starts-with? (:created_at r) "2024-03-14"))))) (str/starts-with? (:created_at r) "2024-03-14")))))
@@ -1090,12 +1067,7 @@
[(:client/code c) (atime/unparse-local (clj-time.coerce/to-date-time (:sales-order/date bad-row)) atime/normal-date) (:sales-order/total bad-row) (:sales-order/tax bad-row) (:sales-order/tip bad-row) (:db/id bad-row)]) [(:client/code c) (atime/unparse-local (clj-time.coerce/to-date-time (:sales-order/date bad-row)) atime/normal-date) (:sales-order/total bad-row) (:sales-order/tax bad-row) (:sales-order/tip bad-row) (:db/id bad-row)])
:separator \tab) :separator \tab)
;; =>
;; =>
(require 'auto-ap.time-reader) (require 'auto-ap.time-reader)
@@ -1104,26 +1076,15 @@
(clojure.pprint/pprint (let [[c [l]] (get-square-client-and-location "NGVT")] (clojure.pprint/pprint (let [[c [l]] (get-square-client-and-location "NGVT")]
l l
(def z @(search c l #clj-time/date-time "2025-02-23T00:00:00-08:00" (def z @(search c l #clj-time/date-time "2025-02-23T00:00:00-08:00"
#clj-time/date-time "2025-02-28T00:00:00-08:00")) #clj-time/date-time "2025-02-28T00:00:00-08:00"))
(take 10 (map #(first (deref (order->sales-order c l %))) z))) (take 10 (map #(first (deref (order->sales-order c l %))) z))))
)
(->> z (->> z
(filter (fn [o] (filter (fn [o]
(seq (filter (comp #{"OTHER"} :type) (:tenders o))))) (seq (filter (comp #{"OTHER"} :type) (:tenders o)))))
(filter #(not (:name (:source %)))) (filter #(not (:name (:source %))))
(count) (count))
)
(doseq [[code] (seq (dc/q '[:find ?code (doseq [[code] (seq (dc/q '[:find ?code
:in $ :in $
@@ -1133,32 +1094,22 @@
[?o :sales-order/client ?c] [?o :sales-order/client ?c]
[?c :client/code ?code]] [?c :client/code ?code]]
(dc/db conn))) (dc/db conn)))
:let [[c [l]] (get-square-client-and-location code) :let [[c [l]] (get-square-client-and-location code)]
]
order @(search c l #clj-time/date-time "2026-01-01T00:00:00-08:00" (time/now)) order @(search c l #clj-time/date-time "2026-01-01T00:00:00-08:00" (time/now))
:when (= "Invoices" (:name (:source order) )) :when (= "Invoices" (:name (:source order)))
:let [[sales-order] @(order->sales-order c l order)]] :let [[sales-order] @(order->sales-order c l order)]]
(when (should-import-order? order) (when (should-import-order? order)
(println "DATE IS" (:sales-order/date sales-order)) (println "DATE IS" (:sales-order/date sales-order))
(when (some-> (:sales-order/date sales-order) coerce/to-date-time (time/after? #clj-time/date-time "2026-2-16T00:00:00-08:00")) (when (some-> (:sales-order/date sales-order) coerce/to-date-time (time/after? #clj-time/date-time "2026-2-16T00:00:00-08:00"))
(println "WOULD UPDATE" sales-order) (println "WOULD UPDATE" sales-order)
@(dc/transact auto-ap.datomic/conn [sales-order]) @(dc/transact auto-ap.datomic/conn [sales-order]))
) #_@(dc/transact)
#_@(dc/transact ) (println "DONE")))
(println "DONE"))
)
#_(filter (comp #{"OTHER"} :type) (mapcat :tenders z)) #_(filter (comp #{"OTHER"} :type) (mapcat :tenders z))
@(let [[c [l]] (get-square-client-and-location "NGRY")] @(let [[c [l]] (get-square-client-and-location "NGRY")]
#_(search c l (clj-time.coerce/from-date #inst "2025-02-28") (clj-time.coerce/from-date #inst "2025-03-01")) #_(search c l (clj-time.coerce/from-date #inst "2025-02-28") (clj-time.coerce/from-date #inst "2025-03-01"))
(order->sales-order c l (:order (get-order c l "KdvwntmfMNTKBu8NOocbxatOs18YY" ))) (order->sales-order c l (:order (get-order c l "KdvwntmfMNTKBu8NOocbxatOs18YY")))))
)
)

View File

@@ -104,19 +104,18 @@
:size :small})]) :size :small})])
(com/field {:label "Payment Type"} (com/field {:label "Payment Type"}
(com/radio-card {:size :small (com/radio-card {:size :small
:name "payment-type" :name "payment-type"
:value (:payment-type (:query-params request)) :value (:payment-type (:query-params request))
:options [{:value "" :options [{:value ""
:content "All"} :content "All"}
{:value "cash" {:value "cash"
:content "Cash"} :content "Cash"}
{:value "check" {:value "check"
:content "Check"} :content "Check"}
{:value "debit" {:value "debit"
:content "Debit"}]})) :content "Debit"}]}))
(exact-match-id* request)]]) (exact-match-id* request)]])
(def default-read '[* (def default-read '[*
[:payment/date :xform clj-time.coerce/from-date] [:payment/date :xform clj-time.coerce/from-date]
{:invoice-payment/_payment [* {:invoice-payment/invoice [*]}]} {:invoice-payment/_payment [* {:invoice-payment/invoice [*]}]}
@@ -212,7 +211,6 @@
'[(iol-ion.query/dollars= ?transaction-amount ?amount)]]} '[(iol-ion.query/dollars= ?transaction-amount ?amount)]]}
:args [(:amount query-params)]}) :args [(:amount query-params)]})
(:status route-params) (:status route-params)
(merge-query {:query {:in ['?status] (merge-query {:query {:in ['?status]
:where ['[?e :payment/status ?status]]} :where ['[?e :payment/status ?status]]}
@@ -244,12 +242,12 @@
(defn sum-visible-pending [ids] (defn sum-visible-pending [ids]
(->> (->>
(dc/q {:find ['?id '?o] (dc/q {:find ['?id '?o]
:in ['$ '[?id ...]] :in ['$ '[?id ...]]
:where ['[?id :payment/amount ?o] :where ['[?id :payment/amount ?o]
'[?id :payment/status :payment-status/pending]]} '[?id :payment/status :payment-status/pending]]}
(dc/db conn) (dc/db conn)
ids) ids)
(map last) (map last)
(reduce (reduce
+ +
@@ -257,15 +255,15 @@
(defn sum-client-pending [clients] (defn sum-client-pending [clients]
(->> (->>
(dc/q {:find '[?e ?a] (dc/q {:find '[?e ?a]
:in '[$ [?clients ?start ?end]] :in '[$ [?clients ?start ?end]]
:where '[[(iol-ion.query/scan-payments $ ?clients ?start ?end) [[?e _ ?sort-default] ...]] :where '[[(iol-ion.query/scan-payments $ ?clients ?start ?end) [[?e _ ?sort-default] ...]]
[?e :payment/status :payment-status/pending] [?e :payment/status :payment-status/pending]
[?e :payment/amount ?a]]} [?e :payment/amount ?a]]}
(dc/db conn) (dc/db conn)
[clients [clients
nil nil
nil]) nil])
(map last) (map last)
(reduce (reduce
@@ -277,16 +275,14 @@
{ids-to-retrieve :ids matching-count :count {ids-to-retrieve :ids matching-count :count
all-ids :all-ids} (fetch-ids db request)] all-ids :all-ids} (fetch-ids db request)]
[(->> (hydrate-results ids-to-retrieve db request)) [(->> (hydrate-results ids-to-retrieve db request))
matching-count matching-count
(sum-visible-pending all-ids) (sum-visible-pending all-ids)
(sum-client-pending (extract-client-ids (:clients request) (sum-client-pending (extract-client-ids (:clients request)
(:client request) (:client request)
(:client-id (:query-params request)) (:client-id (:query-params request))
(when (:client-code (:query-params request)) (when (:client-code (:query-params request))
[:client/code (:client-code (:query-params request))]))) [:client/code (:client-code (:query-params request))])))]))
]))
(def query-schema (mc/schema (def query-schema (mc/schema
[:maybe [:map {:date-range [:date-range :start-date :end-date]} [:maybe [:map {:date-range [:date-range :start-date :end-date]}
@@ -327,7 +323,7 @@
(assoc-in (exact-match-id* request) [1 :hx-swap-oob] true)]) (assoc-in (exact-match-id* request) [1 :hx-swap-oob] true)])
:query-schema query-schema :query-schema query-schema
:action-buttons (fn [request] :action-buttons (fn [request]
(let [[_ _ visible-in-float total-in-float ] (:page-results request)] (let [[_ _ visible-in-float total-in-float] (:page-results request)]
[(com/pill {:color :primary} " Visible in float " [(com/pill {:color :primary} " Visible in float "
(format "$%,.2f" visible-in-float)) (format "$%,.2f" visible-in-float))
(com/pill {:color :secondary} " Total in float " (com/pill {:color :secondary} " Total in float "
@@ -409,7 +405,7 @@
:render (fn [{:payment/keys [date]}] :render (fn [{:payment/keys [date]}]
(some-> date (atime/unparse-local atime/normal-date)))} (some-> date (atime/unparse-local atime/normal-date)))}
{:key "amount" {:key "amount"
:sort-key "amount" :sort-key "amount"
:name "Amount" :name "Amount"
:render (fn [{:payment/keys [amount]}] :render (fn [{:payment/keys [amount]}]
(some->> amount (format "$%.2f")))} (some->> amount (format "$%.2f")))}
@@ -421,10 +417,10 @@
(map :invoice-payment/invoice) (map :invoice-payment/invoice)
(filter identity) (filter identity)
(map (fn [invoice] (map (fn [invoice]
{:link (hu/url (bidi/path-for ssr-routes/only-routes {:link (hu/url (bidi/path-for ssr-routes/only-routes
::invoice-route/all-page) ::invoice-route/all-page)
{:exact-match-id (:db/id invoice)}) {:exact-match-id (:db/id invoice)})
:content (str "Inv. " (:invoice/invoice-number invoice))}))) :content (str "Inv. " (:invoice/invoice-number invoice))})))
(some-> p :transaction/_payment ((fn [t] (some-> p :transaction/_payment ((fn [t]
[{:link (hu/url (bidi/path-for client-routes/routes [{:link (hu/url (bidi/path-for client-routes/routes
:transactions) :transactions)
@@ -434,8 +430,6 @@
(def row* (partial helper/row* grid-page)) (def row* (partial helper/row* grid-page))
(comment (comment
(mc/decode query-schema {"exact-match-id" "123"} (mt/transformer main-transformer mt/strip-extra-keys-transformer)) (mc/decode query-schema {"exact-match-id" "123"} (mt/transformer main-transformer mt/strip-extra-keys-transformer))
(mc/decode query-schema {} (mt/transformer main-transformer mt/strip-extra-keys-transformer)) (mc/decode query-schema {} (mt/transformer main-transformer mt/strip-extra-keys-transformer))
@@ -445,7 +439,6 @@
(mc/decode query-schema {"payment-type" "food"} (mt/transformer main-transformer mt/strip-extra-keys-transformer)) (mc/decode query-schema {"payment-type" "food"} (mt/transformer main-transformer mt/strip-extra-keys-transformer))
(mc/decode query-schema {"vendor" "87"} (mt/transformer main-transformer mt/strip-extra-keys-transformer)) (mc/decode query-schema {"vendor" "87"} (mt/transformer main-transformer mt/strip-extra-keys-transformer))
(mc/decode query-schema {"start-date" #inst "2023-12-21T08:00:00.000-00:00"} (mt/transformer main-transformer mt/strip-extra-keys-transformer))) (mc/decode query-schema {"start-date" #inst "2023-12-21T08:00:00.000-00:00"} (mt/transformer main-transformer mt/strip-extra-keys-transformer)))
(defn delete [{check :entity :as request identity :identity}] (defn delete [{check :entity :as request identity :identity}]
@@ -459,7 +452,7 @@
#(assert-can-see-client identity (:db/id (:payment/client check)))) #(assert-can-see-client identity (:db/id (:payment/client check))))
(notify-if-locked (:db/id (:payment/client check)) (notify-if-locked (:db/id (:payment/client check))
(:payment/date check)) (:payment/date check))
(let [ removing-payments (mapcat (fn [x] (let [removing-payments (mapcat (fn [x]
(let [invoice (:invoice-payment/invoice x) (let [invoice (:invoice-payment/invoice x)
new-balance (+ (:invoice/outstanding-balance invoice) new-balance (+ (:invoice/outstanding-balance invoice)
(:invoice-payment/amount x))] (:invoice-payment/amount x))]
@@ -578,7 +571,6 @@
(assoc-in [:query-params :start] 0) (assoc-in [:query-params :start] 0)
(assoc-in [:query-params :per-page] 250)))) (assoc-in [:query-params :per-page] 250))))
:else :else
selected) selected)
updated-count (void-payments-internal ids (:identity request))] updated-count (void-payments-internal ids (:identity request))]
@@ -591,7 +583,7 @@
(defn wrap-status-from-source [handler] (defn wrap-status-from-source [handler]
(fn [{:keys [matched-current-page-route] :as request}] (fn [{:keys [matched-current-page-route] :as request}]
(let [ request (cond-> request (let [request (cond-> request
(= ::route/cleared-page matched-current-page-route) (assoc-in [:route-params :status] :payment-status/cleared) (= ::route/cleared-page matched-current-page-route) (assoc-in [:route-params :status] :payment-status/cleared)
(= ::route/pending-page matched-current-page-route) (assoc-in [:route-params :status] :payment-status/pending) (= ::route/pending-page matched-current-page-route) (assoc-in [:route-params :status] :payment-status/pending)
(= ::route/voided-page matched-current-page-route) (assoc-in [:route-params :status] :payment-status/voided) (= ::route/voided-page matched-current-page-route) (assoc-in [:route-params :status] :payment-status/voided)
@@ -605,7 +597,7 @@
::route/pending-page (-> (helper/page-route grid-page) ::route/pending-page (-> (helper/page-route grid-page)
(wrap-implied-route-param :status :payment-status/pending)) (wrap-implied-route-param :status :payment-status/pending))
::route/voided-page (-> (helper/page-route grid-page) ::route/voided-page (-> (helper/page-route grid-page)
(wrap-implied-route-param :status :payment-status/voided)) (wrap-implied-route-param :status :payment-status/voided))
::route/all-page (-> (helper/page-route grid-page) ::route/all-page (-> (helper/page-route grid-page)
(wrap-implied-route-param :status nil)) (wrap-implied-route-param :status nil))
@@ -618,7 +610,6 @@
::route/bulk-delete (-> bulk-delete-dialog ::route/bulk-delete (-> bulk-delete-dialog
(wrap-admin)) (wrap-admin))
::route/table (helper/table-route grid-page)} ::route/table (helper/table-route grid-page)}
(fn [h] (fn [h]
(-> h (-> h

View File

@@ -1,7 +1,7 @@
(ns auto-ap.ssr.pos.sales-orders (ns auto-ap.ssr.pos.sales-orders
(:require (:require
[auto-ap.datomic [auto-ap.datomic
:refer [add-sorter-fields apply-pagination apply-sort-3 conn merge-query :refer [add-sorter-fields apply-pagination apply-sort-3 merge-query
pull-many query2]] pull-many query2]]
[auto-ap.datomic.sales-orders :as d-sales] [auto-ap.datomic.sales-orders :as d-sales]
[auto-ap.query-params :as query-params :refer [wrap-copy-qp-pqp]] [auto-ap.query-params :as query-params :refer [wrap-copy-qp-pqp]]
@@ -17,7 +17,6 @@
[auto-ap.time :as atime] [auto-ap.time :as atime]
[bidi.bidi :as bidi] [bidi.bidi :as bidi]
[clj-time.coerce :as c] [clj-time.coerce :as c]
[datomic.api :as dc]
[malli.core :as mc])) [malli.core :as mc]))
(def query-schema (mc/schema (def query-schema (mc/schema
@@ -172,11 +171,8 @@
charges)) charges))
(defn fetch-page [request] (defn fetch-page [request]
(let [db (dc/db conn) (let [{:keys [rows count]} (d-sales/fetch-page-ssr request)]
{ids-to-retrieve :ids matching-count :count} (fetch-ids db request)] [rows count]))
[(->> (hydrate-results ids-to-retrieve db request))
matching-count]))
(def grid-page (def grid-page
@@ -200,13 +196,13 @@
:title "Sales orders" :title "Sales orders"
:entity-name "Sales orders" :entity-name "Sales orders"
:route :pos-sales-table :route :pos-sales-table
:action-buttons (fn [request] :action-buttons (fn [request]
(let [{:keys [total tax]} (d-sales/summarize-orders (:ids (fetch-ids (dc/db conn) request)))] (let [{:keys [total tax]} (d-sales/summarize-page-ssr request)]
(when (and total tax) (when (and total tax)
[(com/pill {:color :primary} [(com/pill {:color :primary}
(format "Total $%.2f" total)) (format "Total $%.2f" total))
(com/pill {:color :secondary} (com/pill {:color :secondary}
(format "Tax $%.2f" tax))]))) (format "Tax $%.2f" tax))])))
:row-buttons (fn [_ e] :row-buttons (fn [_ e]
(when (:sales-order/reference-link e) (when (:sales-order/reference-link e)
[(com/a-icon-button {:href (:sales-order/reference-link e)} [(com/a-icon-button {:href (:sales-order/reference-link e)}

View File

@@ -22,6 +22,10 @@
(let [conn (DriverManager/getConnection "jdbc:duckdb:") (let [conn (DriverManager/getConnection "jdbc:duckdb:")
stmt (.createStatement conn)] stmt (.createStatement conn)]
(.execute stmt "INSTALL httpfs; LOAD httpfs;") (.execute stmt "INSTALL httpfs; LOAD httpfs;")
(when-let [key (:aws-access-key-id env)]
(.execute stmt (str "SET s3_access_key_id='" key "'"))
(.execute stmt (str "SET s3_secret_access_key='" (:aws-secret-access-key env) "'"))
(.execute stmt (str "SET s3_region='" (or (:aws-region env) "us-east-1") "'")))
(.close stmt) (.close stmt)
(.addShutdownHook (Runtime/getRuntime) (.addShutdownHook (Runtime/getRuntime)
(Thread. #(fn []))) (Thread. #(fn [])))
@@ -107,9 +111,9 @@
(str entity-type ".jsonl"))] (str entity-type ".jsonl"))]
(io/make-parents wal-file) (io/make-parents wal-file)
(with-open [w (io/writer wal-file :append true)] (with-open [w (io/writer wal-file :append true)]
(.write w ^String (json/write-str {:seq-no seq-no (.write w ^String (json/write-str {:seq-no seq-no
:record record})) :record record}))
(.write w (int \newline)))) (.write w (int \newline))))
(catch Exception e (catch Exception e
(println "[parquet/wal]" (.getMessage e)))) (println "[parquet/wal]" (.getMessage e))))
entry)) entry))
@@ -124,12 +128,12 @@
(->> @*buffers* (->> @*buffers*
vals (mapcat identity) count)) vals (mapcat identity) count))
(defn flush-to-parquet! [entity-type] (defn flush-to-parquet! [entity-type date-str]
"Flush buffered records for entity-type to parquet + S3." "Flush buffered records for entity-type to parquet + S3."
(let [records (get @*buffers* entity-type [])] (let [records (get @*buffers* entity-type [])]
(if (empty? records) (if (empty? records)
{:status :no-records} {:status :no-records}
(let [date-str (.toString (LocalDate/now)) (let [date-str (or date-str (.toString (LocalDate/now)))
jsonl-file (io/file "/tmp" jsonl-file (io/file "/tmp"
(str entity-type "-" date-str ".jsonl")) (str entity-type "-" date-str ".jsonl"))
parquet-file (io/file "/tmp" parquet-file (io/file "/tmp"
@@ -138,8 +142,8 @@
(try (try
(with-open [w (io/writer jsonl-file :append true)] (with-open [w (io/writer jsonl-file :append true)]
(doseq [r records] (doseq [r records]
(.write w ^String (json/write-str (dissoc r :_seq-no))) (.write w ^String (json/write-str (dissoc r :_seq-no)))
(.write w (int \newline)))) (.write w (int \newline))))
(execute-to-parquet! (execute-to-parquet!
(format "SELECT * FROM read_json_auto('%s')" (format "SELECT * FROM read_json_auto('%s')"
(.getAbsolutePath jsonl-file)) (.getAbsolutePath jsonl-file))
@@ -150,17 +154,19 @@
(.delete ^java.io.File parquet-file) (.delete ^java.io.File parquet-file)
{:key s3-key :status :ok} {:key s3-key :status :ok}
(catch Exception e (catch Exception e
(throw (ex-info "Flush failed" {:entity-type entity-type} (throw (ex-info "Flush failed"
:error (.getMessage e))))))))) {:entity-type entity-type
:error (.getMessage e)}))))))))
(defn flush-by-date! [] (defn flush-by-date! []
"Flush all entity types for today." "Flush all entity types for today."
(let [etypes ["sales-order" "charge" (let [etypes ["sales-order" "charge"
"line-item" "sales-refund"] "line-item" "sales-refund"]
today (.toString (LocalDate/now))
flushed (into #{} flushed (into #{}
(keep (fn [et] (keep (fn [et]
(let [{:keys [status]} (let [{:keys [status]}
(flush-to-parquet! et)] (flush-to-parquet! et today)]
(when (= status :ok) (when (= status :ok)
et)))) et))))
etypes)] etypes)]
@@ -185,11 +191,12 @@
{} {}
(into {} (into {}
(keep (fn [et] (keep (fn [et]
(let [f (io/file (let [f (io/file
(wal-dir) (wal-dir)
(str et ".jsonl"))] (str et ".jsonl"))]
[et (slurp f)]))) (when (.exists f)
etypes))] [et (slurp f)])))
etypes)))]
(swap! *buffers* merge loaded))) (swap! *buffers* merge loaded)))
(defn get-unflushed-count [] (defn get-unflushed-count []
@@ -213,66 +220,135 @@
(defn today [] (defn today []
(.toString (LocalDate/now))) (.toString (LocalDate/now)))
(defn- parquet-glob [entity-type start-date end-date]
"Build a glob pattern or explicit file list for the date range.
Uses glob patterns for ranges > 60 days; explicit list otherwise."
(let [days (-> (LocalDate/parse end-date)
(.toEpochDay)
(- (.toEpochDay (LocalDate/parse start-date)))
inc)]
(if (> days 60)
(let [prefix (format "s3://%s/sales-details/%s/" *bucket* entity-type)
sy (-> (LocalDate/parse start-date) .getYear)
ey (-> (LocalDate/parse end-date) .getYear)]
(if (= sy ey)
[(format "%s%d-*.parquet" prefix sy)]
(vec
(for [y (range sy (inc ey))]
(format "%s%d-*.parquet" prefix y)))))
(vec
(map (fn [d]
(format "'s3://%s/sales-details/%s/%s.parquet'"
*bucket* entity-type d))
(date-seq start-date end-date))))))
(defn parquet-query [entity-type start-date end-date] (defn parquet-query [entity-type start-date end-date]
"Build SQL to read all parquet files in date range. "Build SQL to read all parquet files in date range.
Returns map with :sql and :count-sql keys." Returns map with :sql and :count-sql keys."
(let [date-strs (date-seq start-date end-date) (let [globs (parquet-glob entity-type start-date end-date)
urls (vec use-glob? (some #(.endsWith ^String % "*.parquet") globs)
(map #(format "'s3://%%s/sales-details/%%s/%%s.parquet'" base (if use-glob?
*bucket* entity-type %) (format "SELECT * FROM read_parquet(%s, union_by_name=true)"
date-strs)) (if (= (count globs) 1)
sql (str "SELECT * FROM read_parquet([" (format "'%s'" (first globs))
(str/join ", " urls) (format "[%s]"
"])")] (str/join ", " (map #(format "'%s'" %) globs)))))
(format "SELECT * FROM read_parquet([%s])"
(str/join ", " globs)))
add-date-filter (fn [sql]
(if (> (-> (LocalDate/parse end-date)
(.toEpochDay)
(- (.toEpochDay (LocalDate/parse start-date)))
inc)
60)
(format "%s WHERE date >= '%s' AND date <= '%s'"
sql start-date end-date)
sql))
sql (add-date-filter base)]
{:sql sql {:sql sql
:count-sql (format "SELECT COUNT(*) FROM (%s) t" sql)})) :count-sql (format "SELECT COUNT(*) FROM (%s) t" sql)}))
(defn- build-where-clause [opts field-pairs] (defn- like-clause [col v]
"Build SQL WHERE clause from opts map. (str "\"" col "\" LIKE '%" v "%'"))
fields-with-keys is vector of [:field-key :env-var-name]."
(let [clauses (keep (defn- build-sales-orders-where [opts]
(fn [[key env]] (let [eq-clauses (keep
(let [v (get opts key)] (fn [[key col]]
(when v (let [v (get opts key)]
(str env " = '" v "'")))) (when v
field-pairs)] (str "\"" col "\" = '" v "'"))))
(when (seq clauses) [[:client "client-code"]
(str " WHERE " (str/join " AND " clauses))))) [:vendor "vendor"]
[:location "location"]])
like-clauses (keep
(fn [[key col]]
(let [v (get opts key)]
(when v
(like-clause col v))))
[[:payment-method "payment-methods"]
[:processor "processors"]
[:category "categories"]])
range-clauses (keep
(fn [[key col op]]
(let [v (get opts key)]
(when v
(str "\"" col "\" " op " " v))))
[[:total-gte "total" ">="]
[:total-lte "total" "<="]])
all-clauses (concat eq-clauses like-clauses range-clauses)]
(when (seq all-clauses)
(str " WHERE " (str/join " AND " all-clauses)))))
(defn get-sales-orders (defn get-sales-orders
([start-date end-date] ([start-date end-date]
(get-sales-orders start-date end-date {})) (get-sales-orders start-date end-date {}))
([start-date end-date opts] ([start-date end-date opts]
(let [q (parquet-query "sales-order" (try
start-date end-date) (let [q (parquet-query "sales-order"
base-sql (:sql q) start-date end-date)
count-sql (:count-sql q) base-sql (:sql q)
sort (get opts :sort "date") count-sql (:count-sql q)
order (get opts :order "DESC") sort (get opts :sort "date")
limit (get opts :limit) order (get opts :order "DESC")
offset (get opts :offset) limit (get opts :limit)
where-str (build-where-clause offset (get opts :offset)
where-str (build-sales-orders-where opts)
full-sql (if where-str
(str base-sql where-str)
base-sql)
result (cond-> full-sql
sort (str " ORDER BY " sort
" " (name order))
limit (str " LIMIT " limit)
offset (str " OFFSET " offset))
full-count (if where-str
(str count-sql where-str)
count-sql)]
{:rows (query-rows result)
:count (or
(int
(query-scalar
full-count)) 0)})
(catch Exception _
{:rows [] :count 0}))))
opts (defn get-sales-orders-summary
[[:client "external_id.client"] ([start-date end-date]
[:vendor "external_id.vendor"] (get-sales-orders-summary start-date end-date {}))
[:location "location"]]) ([start-date end-date opts]
full-sql (if where-str (try
(str base-sql where-str) (let [q (parquet-query "sales-order" start-date end-date)
base-sql) base-sql (:sql q)
result (cond-> full-sql where-str (build-sales-orders-where opts)
sort (str " ORDER BY " sort full-sql (if where-str
" " (name order)) (str base-sql where-str)
limit (str " LIMIT " limit) base-sql)
offset (str " OFFSET " offset)) sum-sql (format "SELECT COALESCE(SUM(total), 0) as total, COALESCE(SUM(tax), 0) as tax FROM (%s) t" full-sql)
full-count (if where-str row (first (query-rows sum-sql))]
(str count-sql where-str) {:total (or (:total row) 0.0)
count-sql)] :tax (or (:tax row) 0.0)})
{:rows (query-rows result) (catch Exception _
:count (or {:total 0.0 :tax 0.0}))))
(int
(query-scalar
full-count)) 0)})))
(defn query-deduped [entity-type start-date end-date] (defn query-deduped [entity-type start-date end-date]
"Query records deduplicated by external-id (latest _seq_no wins)." "Query records deduplicated by external-id (latest _seq_no wins)."

View File

@@ -55,7 +55,7 @@
tname tname
(+ (get b tname 0.0) total))))))) (+ (get b tname 0.0) total)))))))
{} {}
rows))) rows)))
(catch Exception e (catch Exception e
(println "[sales-summaries]" (.getMessage e)) (println "[sales-summaries]" (.getMessage e))
{})))) {}))))