fix(sales): address review findings — 10 safe_auto fixes

- Fix pq-files using date-seq (daily) vs actual monthly parquet partitions
- Fix safe-cleanup-all destructure [[_ y m]] -> [[y m]] against [year month]
- Fix shutdown hook no-op: Thread wrapping #(fn []) now actually closes conn
- Fix query-deduped: PARTITION BY "external-id" not sales_order.external_id
- Fix :client_code -> :client-code key mismatch in get-payment-items-parquet
- Fix object-exists? downloading full S3 objects; use head-object instead
- Fix date-seq silently producing wrong range when start > end; now throws
- Remove duplicate private get-fees that shadowed public version
- Deduplicate date-seq: remove from sales_to_parquet, use p/date-seq
- Wrap run-perf-tests in (comment ...) to prevent execution on lein test
- Make month-seq public so sales_summaries.clj can use it
This commit is contained in:
2026-04-28 21:54:43 -07:00
parent 218d0684c0
commit f575f425a2
6 changed files with 25 additions and 39 deletions

View File

@@ -100,7 +100,7 @@
(let [date-str (.toString date)] (let [date-str (.toString date)]
(when-let [rows (seq (pq/query-deduped "charge" date-str date-str))] (when-let [rows (seq (pq/query-deduped "charge" date-str date-str))]
(let [client-code (if (map? c) (:client/code c) c) (let [client-code (if (map? c) (:client/code c) c)
filtered (filter #(= client-code (:client_code %)) rows)] filtered (filter #(= client-code (:client-code %)) rows)]
(reduce (reduce
(fn [acc {:keys [processor type-name total]}] (fn [acc {:keys [processor type-name total]}]
(update acc (update acc
@@ -142,14 +142,6 @@
:ledger-mapped/ledger-side :ledger-side/credit}) :ledger-mapped/ledger-side :ledger-side/credit})
refunds)))) refunds))))
(defn- get-fees [c date]
(when-let [fee (get-fee c date)]
{:db/id (str (java.util.UUID/randomUUID))
:sales-summary-item/sort-order 2
:sales-summary-item/category "Fees"
:ledger-mapped/amount fee
:ledger-mapped/ledger-side :ledger-side/debit}))
(defn- get-tax-parquet [c date] (defn- get-tax-parquet [c date]
(let [client-code (if (map? c) (:client/code c) c) (let [client-code (if (map? c) (:client/code c) c)
date-str (.toString date) date-str (.toString date)

View File

@@ -111,11 +111,12 @@
(.toString (java.time.LocalDate/ofEpochDay d))))) (.toString (java.time.LocalDate/ofEpochDay d)))))
(defn- object-exists? (defn- object-exists?
"Check if an S3 object exists by attempting get-object." "Check if an S3 object exists via head-object (no download)."
[key] [key]
(try (try
(s3/get-object {:bucket-name pq/*bucket* (s3/get-object {:bucket-name pq/*bucket*
:key key}) :key key}
{:request-method :head})
true true
(catch com.amazonaws.services.s3.model.AmazonS3Exception _ (catch com.amazonaws.services.s3.model.AmazonS3Exception _
false))) false)))
@@ -196,7 +197,7 @@
(println "=== safe-cleanup-all" (println "=== safe-cleanup-all"
"months:" (count months) "months:" (count months)
"dry-run? =" DRY-RUN?) "dry-run? =" DRY-RUN?)
(doseq [[_ y m] months] (doseq [[y m] months]
(when-not DRY-RUN? (when-not DRY-RUN?
(let [result (verify-month-in-s3? y m) (let [result (verify-month-in-s3? y m)
missing (:missing result)] missing (:missing result)]

View File

@@ -137,24 +137,15 @@
(map first) (map first)
vec))) vec)))
(defn- date-seq [start end]
"Seq of YYYY-MM-DD strings between start and end inclusive."
(let [sd (java.time.LocalDate/parse start)
ed (java.time.LocalDate/parse end)
days (int (Math/abs (- (.toEpochDay sd)
(.toEpochDay ed))))]
(for [i (range 0 (inc days))]
(.toString (.plusDays sd i)))))
(defn write-day-by-day (defn write-day-by-day
([start-date end-date] ([start-date end-date]
(write-day-by-day start-date end-date {})) (write-day-by-day start-date end-date {}))
([start-date end-date opts] ([start-date end-date opts]
(let [all-dates (set (or (opts :date-set) [])) (let [all-dates (set (or (opts :date-set) []))
date-range (if (empty? all-dates) date-range (if (empty? all-dates)
(date-seq start-date end-date) (p/date-seq start-date end-date)
(filter all-dates (filter all-dates
(date-seq start-date end-date))) (p/date-seq start-date end-date)))
batch-size (or (opts :batch-size) 100)] batch-size (or (opts :batch-size) 100)]
(doseq [^String day date-range] (doseq [^String day date-range]
(println "[migration] processing" day) (println "[migration] processing" day)

View File

@@ -36,7 +36,7 @@
(.execute stmt "SET memory_limit='2GB'") (.execute stmt "SET memory_limit='2GB'")
(.close stmt) (.close stmt)
(.addShutdownHook (Runtime/getRuntime) (.addShutdownHook (Runtime/getRuntime)
(Thread. #(fn []))) (Thread. #(when-let [c @db] (.close ^java.sql.Connection c))))
(reset! db conn))) (reset! db conn)))
(defn disconnect! [] (defn disconnect! []
@@ -251,12 +251,13 @@
(defn date-seq [start end] (defn date-seq [start end]
"Seq of YYYY-MM-DD strings between start and end inclusive." "Seq of YYYY-MM-DD strings between start and end inclusive."
(let [sd (LocalDate/parse start) (let [sd (LocalDate/parse start)
ed (LocalDate/parse end) ed (LocalDate/parse end)]
days (int (Math/abs (when (.isAfter sd ed)
(- (.toEpochDay sd) (throw (ex-info "date-seq: start must be <= end" {:start start :end end})))
(.toEpochDay ed))))] (let [days (int (- (.toEpochDay ed)
(.toEpochDay sd)))]
(for [i (range 0 (inc days))] (for [i (range 0 (inc days))]
(.toString (.plusDays sd i))))) (.toString (.plusDays sd i))))))
(defn today [] (defn today []
(.toString (LocalDate/now))) (.toString (LocalDate/now)))
@@ -269,7 +270,7 @@
(.toString (LocalDate/parse s mm-dd-yyyy)) (.toString (LocalDate/parse s mm-dd-yyyy))
(if (> (count s) 10) (subs s 0 10) s)))) (if (> (count s) 10) (subs s 0 10) s))))
(defn- month-seq [start-date end-date] (defn month-seq [start-date end-date]
"Seq of YYYY-MM strings between start-date and end-date inclusive." "Seq of YYYY-MM strings between start-date and end-date inclusive."
(let [sd (LocalDate/parse (normalize-date-str start-date)) (let [sd (LocalDate/parse (normalize-date-str start-date))
ed (LocalDate/parse (normalize-date-str end-date))] ed (LocalDate/parse (normalize-date-str end-date))]
@@ -414,7 +415,7 @@
(query-rows (query-rows
(str (:sql q) (str (:sql q)
" QUALIFY ROW_NUMBER() OVER" " QUALIFY ROW_NUMBER() OVER"
" (PARTITION BY sales_order.external_id" " (PARTITION BY \"external-id\""
" ORDER BY _seq_no DESC) = 1")))) " ORDER BY _seq_no DESC) = 1"))))
(defn query-by-entity-id [entity-type external-id (defn query-by-entity-id [entity-type external-id

View File

@@ -15,12 +15,12 @@
0.0))) 0.0)))
(defn- pq-files [entity-type start-date end-date] (defn- pq-files [entity-type start-date end-date]
"Vector of S3 parquet file paths for date range." "Vector of S3 parquet file paths for date range (monthly partitions)."
(let [dates (p/date-seq start-date end-date)] (let [months (p/month-seq start-date end-date)]
(vec (vec
(map #(str "'s3://" p/*bucket* (map #(str "'s3://" p/*bucket*
"/sales-details/" entity-type "/" "/sales-details/" entity-type "/"
% ".parquet") dates)))) % ".parquet") months))))
(defn sum-payments-by-type [client-id start-date end-date] (defn sum-payments-by-type [client-id start-date end-date]
"Return {processor-key -> {type-name-string -> total-double}}." "Return {processor-key -> {type-name-string -> total-double}}."

View File

@@ -108,5 +108,6 @@
(finally (finally
(p/disconnect!)))) (p/disconnect!))))
(comment
(run-perf-tests) (run-perf-tests)
(println "\n=== Done ===") (println "\n=== Done ==="))