feat(sales): initial Parquet migration infrastructure #3
feat(sales): redirect production and read flows to Parquet/DuckDB
Code Review: feat/sales-parquet-migration
Scope: 15 files, ~2200 lines added (base: `297464c1`, HEAD: `cc838adf`)
Reviewers: correctness, testing, maintainability, project-standards, agent-native, learnings
Plan coverage: R1-R5 all addressed across U1-U7. Plan file at `docs/plans/2026-04-24-001-refactor-detailed-sales-to-parquet-plan.md`.

P0: Critical (Must fix before merge)
1. `sales_orders_new.clj` namespace collision: may break ALL sales queries
`src/clj/auto_ap/datomic/sales_orders_new.clj:1` · correctness, maintainability

Both `sales_orders.clj` and the new `sales_orders_new.clj` declare `(ns auto-ap.datomic.sales-orders)`, the same namespace. Depending on classpath load order, one silently shadows the other. The old file is already wired to use parquet (`:require [auto-ap.storage.parquet :as pq]`, see lines 4, 56). The new file is also broken internally:

- `raw-graphql-ids` (line 157) threads from `nil` → `pq/get-sales-orders` but discards the result, then references unbound `result` at line 165
- No `:require [clojure.string :as str]` despite using `str/join` at line 52; won't compile
- `clojure.set` and `com.brunobonacci.mulog` requires
- `get-graphql` (line 209) references undefined `id-keys` and `matching-count`

Suggested fix: Delete `sales_orders_new.clj`. The existing `sales_orders.clj` has been correctly updated to use the parquet read path. If you intended to replace it, rename the new namespace and fix all undefined references before landing.

2. `safe-cleanup-all` destructure bug: passes wrong year/month, deletes unverified data
`src/clj/auto_ap/migration/cleanup_sales.clj:199` · correctness

The destructure pattern `[_ y m]` against a 2-element `[year month]` vector binds `_` to the year (discarding it), `y` to the month, and `m` to `nil`. Both the S3 verification and the delete calls receive garbage arguments.

Suggested fix: Change `[_ y m]` → `[y m]`.

3. SQL injection in WHERE clause construction
`src/clj/auto_ap/storage/parquet.clj:236`, `sales_summaries.clj` (all aggregation functions) · correctness, maintainability

User-supplied filter values (`:client`, `:vendor`, `:location`) are concatenated directly into SQL strings.

Suggested fix: At minimum, escape single quotes by doubling them: `(str/replace v "'" "''")`. Long term, use DuckDB `PreparedStatement` with parameter binding.

4. `query-deduped` generates invalid SQL
`src/clj/auto_ap/storage/parquet.clj:279-284` · correctness

The generated SQL has a space between `QUALIFY` and `ROW_NUMBER()` breaking the syntax. The partition column `sales_order.external_id` doesn't exist; parquet records use `external_id`. The function always fails at runtime.

Suggested fix: Remove leading space, correct column name: `" QUALIFY ROW_NUMBER() OVER (PARTITION BY external_id ORDER BY _seq_no DESC) = 1"`.

P1: High (Should fix)
5. DuckDB shutdown hook is a no-op
`src/clj/auto_ap/storage/parquet.clj:26-27` · correctness, maintainability

`(Thread. #(fn []))` creates a thread whose body only constructs an empty function and never calls `disconnect!`. DuckDB connections stay open until the JVM process ends.

Fix: `(Thread. #(.close ^java.sql.Connection @db))`

6. Query results/statements leak: not closed in `with-open`
`src/clj/auto_ap/storage/parquet.clj:53-73` · correctness, performance

Neither `query-scalar` nor `query-rows` closes its `Statement` or `ResultSet` objects. Under sustained load this exhausts DuckDB internal cursors and file handles.

Fix: Wrap in `(with-open [stmt (.createStatement conn) rs (.executeQuery stmt sql)] ...)`

7. `flush-to-parquet!` clears buffer before S3 upload completes
`src/clj/auto_ap/storage/parquet.clj:148` · correctness, maintainability

The buffer is cleared at line 148, and files are then deleted at lines 149-150. `(upload-parquet! ...)` does run before the clear, and on upload failure the catch rethrows. But if an unexpected exception occurs between upload success and buffer clear, in-memory records are lost (the WAL has them, but they won't re-flush until restart).

Fix: Use explicit ordering: verify S3 200 response → confirm file persisted → THEN clear buffer.
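The ordering suggested in finding 7 can be sketched roughly as follows. This is a minimal illustration, not the project's code: `buffers`, `flush-entity!`, and the `upload!` callback are hypothetical names standing in for the real buffer atom and the Parquet-write-plus-S3-upload step, and the sketch assumes records are only ever appended to a buffer.

```clojure
;; Hypothetical in-memory buffer: entity-type -> vector of pending records.
(defonce buffers (atom {}))

(defn flush-entity!
  "Uploads the pending records for entity-type and clears them only after
  upload! returns truthy. upload! is an assumed stand-in for the real
  write-and-verify step; if it throws, the buffer is left untouched."
  [entity-type upload!]
  (let [records (get @buffers entity-type [])]
    (cond
      (empty? records)
      :nothing-to-flush

      (upload! entity-type records)
      (do
        ;; Drop only the records that were uploaded. Anything appended
        ;; while the upload was running stays queued for the next flush.
        (swap! buffers update entity-type
               (fn [current] (vec (drop (count records) current))))
        :flushed)

      :else
      :upload-not-confirmed)))
```

Because the clear happens in the same branch that observed a confirmed upload, there is no window in which the buffer is empty while the upload is unverified. Local file deletion would follow the same rule and run only after the `:flushed` branch.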
8. `get-payment-items-parquet` key mismatch: silently returns empty payment data
`src/clj/auto_ap/jobs/sales_summaries.clj:106` · correctness

The filter looks for `:client_code` (underscore), but all write paths (`ezcater/core.clj`, `square/core3.clj`, `sales_to_parquet.clj`) store the value as `:client-code`. Payment aggregation silently returns zero across all clients.

Fix: Change `:client_code` → `:client-code`.

9. `date-seq` produces forward sequence when `start > end`
`src/clj/auto_ap/storage/parquet.clj:203-211` · correctness

`(Math/abs ...)` on the epoch diff means reversing the arguments still produces a valid-looking sequence going forward from `start` by |diff| days. Queries against non-existent S3 keys silently return empty results.

Fix: Assert `start <= end` or reverse iteration direction based on comparison.

10. Entity-type array duplicated in 5+ locations
parquet.clj (×2), sales_to_parquet.clj (×2), cleanup_sales.clj (×1) · maintainability, testing
The `["sales-order" "charge" "line-item" "sales-refund"]` vector is hardcoded in five places with no single source of truth. Adding or renaming an entity type risks silent data gaps.

Fix: Define `ENTITY-TYPES` once in `auto-ap.storage.parquet` and require it elsewhere.

P2: Moderate (Fix if straightforward)
11. `object-exists?` downloads full S3 objects: use `s3/head-object` instead of `s3/get-object`. It downloads entire parquet files just to check existence (~3000+ GET requests per cleanup run). `src/clj/auto_ap/migration/cleanup_sales.clj:113-121`
12. Perf test runs at namespace load: `(run-perf-tests)` at line 111 triggers 100k row generation and S3 upload on every `lein test`. Wrap in `deftest` or `(comment ...)`. `test/clj/auto_ap/storage/perf_test.clj:111`
13. `DRY-RUN?` mutable var, not thread-safe: global boolean toggled with `alter-var-root`. Concurrent invocations race. `src/clj/auto_ap/migration/cleanup_sales.clj:8-12`
14. Migration pollutes live app buffers: `sales_to_parquet.clj` writes to the global `p/*buffers*`. Running the migration alongside live traffic mixes historical with new records. `src/clj/auto_ap/migration/sales_to_parquet.clj:160`
15. Dead code, `get-fees` defined twice: private at line 148, public shadowing copy at line 193. `src/clj/auto_ap/jobs/sales_summaries.clj:148,193`
16. Old Datomic query functions left alongside parquet versions: `get-tax`, `get-tip`, `get-sales`, `get-refunds` (Datomic versions at lines 201-273) are dead after migration but still loadable. `src/clj/auto_ap/jobs/sales_summaries.clj:201+`
17. WAL append not atomic under concurrency: multiple `buffer!` calls to the same entity-type `.jsonl` file can interleave bytes in a multi-threaded server context. `src/clj/auto_ap/storage/parquet.clj:105-114`

P3: Low (Minor)
18. Test coverage: 5 tests for ~800 lines of new code
`test/clj/auto_ap/storage/parquet_test.clj` · testing

The existing 5 tests cover only `query-scalar`, `buffer!`, `clear-buffer!`, and `date-seq`. Missing critical paths:

- `load-unflushed!`, verify data recovered
- `sales_summaries.clj` functions return correct results after imports go through parquet path
- `sales_orders_new.clj`

19. Naming inconsistency
`buffer-count`, `total-buf-count`, `get-unflushed-count`: three names for conceptually similar metrics. Unify to `buffer-count` (per-type) and `total-buffer-count` (all). `src/clj/auto_ap/storage/parquet.clj:120,123,195`

Summary
Top blocking items: The `sales_orders_new.clj` namespace collision (finding #1) and the `safe-cleanup-all` destructure bug (#2) are the most consequential; they can cause data loss in production before anyone notices. SQL injection (#3) is exploitable if unsanitized filter values from GraphQL endpoints reach these queries.

Positive notes: