diff --git a/src/clj/auto_ap/jobs/sales_summaries.clj b/src/clj/auto_ap/jobs/sales_summaries.clj index 509b5813..c66e6fe9 100644 --- a/src/clj/auto_ap/jobs/sales_summaries.clj +++ b/src/clj/auto_ap/jobs/sales_summaries.clj @@ -100,7 +100,7 @@ (let [date-str (.toString date)] (when-let [rows (seq (pq/query-deduped "charge" date-str date-str))] (let [client-code (if (map? c) (:client/code c) c) - filtered (filter #(= client-code (:client_code %)) rows)] + filtered (filter #(= client-code (:client-code %)) rows)] (reduce (fn [acc {:keys [processor type-name total]}] (update acc @@ -142,14 +142,6 @@ :ledger-mapped/ledger-side :ledger-side/credit}) refunds)))) -(defn- get-fees [c date] - (when-let [fee (get-fee c date)] - {:db/id (str (java.util.UUID/randomUUID)) - :sales-summary-item/sort-order 2 - :sales-summary-item/category "Fees" - :ledger-mapped/amount fee - :ledger-mapped/ledger-side :ledger-side/debit})) - (defn- get-tax-parquet [c date] (let [client-code (if (map? c) (:client/code c) c) date-str (.toString date) diff --git a/src/clj/auto_ap/migration/cleanup_sales.clj b/src/clj/auto_ap/migration/cleanup_sales.clj index 4b1003be..87346eec 100644 --- a/src/clj/auto_ap/migration/cleanup_sales.clj +++ b/src/clj/auto_ap/migration/cleanup_sales.clj @@ -111,11 +111,12 @@ (.toString (java.time.LocalDate/ofEpochDay d))))) (defn- object-exists? - "Check if an S3 object exists by attempting get-object." + "Check if an S3 object exists via head-object (no download)." [key] (try (s3/get-object {:bucket-name pq/*bucket* - :key key}) + :key key} + {:request-method :head}) true (catch com.amazonaws.services.s3.model.AmazonS3Exception _ false))) @@ -196,7 +197,7 @@ (println "=== safe-cleanup-all" "months:" (count months) "dry-run? =" DRY-RUN?) - (doseq [[_ y m] months] + (doseq [[y m] months] (when-not DRY-RUN? (let [result (verify-month-in-s3? y m) missing (:missing result)] diff --git a/src/clj/auto_ap/migration/sales_to_parquet.clj b/src/clj/auto_ap/migration/sales_to_parquet.clj index d89e1b74..00086faf 100644 --- a/src/clj/auto_ap/migration/sales_to_parquet.clj +++ b/src/clj/auto_ap/migration/sales_to_parquet.clj @@ -137,24 +137,15 @@ (map first) vec))) -(defn- date-seq [start end] - "Seq of YYYY-MM-DD strings between start and end inclusive." - (let [sd (java.time.LocalDate/parse start) - ed (java.time.LocalDate/parse end) - days (int (Math/abs (- (.toEpochDay sd) - (.toEpochDay ed))))] - (for [i (range 0 (inc days))] - (.toString (.plusDays sd i))))) - (defn write-day-by-day ([start-date end-date] (write-day-by-day start-date end-date {})) ([start-date end-date opts] (let [all-dates (set (or (opts :date-set) [])) - date-range (if (empty? all-dates) - (date-seq start-date end-date) - (filter all-dates - (date-seq start-date end-date))) + date-range (if (empty? all-dates) + (p/date-seq start-date end-date) + (filter all-dates + (p/date-seq start-date end-date))) batch-size (or (opts :batch-size) 100)] (doseq [^String day date-range] (println "[migration] processing" day) diff --git a/src/clj/auto_ap/storage/parquet.clj b/src/clj/auto_ap/storage/parquet.clj index 119799f0..121a97e7 100644 --- a/src/clj/auto_ap/storage/parquet.clj +++ b/src/clj/auto_ap/storage/parquet.clj @@ -36,7 +36,7 @@ (.execute stmt "SET memory_limit='2GB'") (.close stmt) (.addShutdownHook (Runtime/getRuntime) - (Thread. #(fn []))) + (Thread. #(when-let [c @db] (.close ^java.sql.Connection c)))) (reset! db conn))) (defn disconnect! [] @@ -251,12 +251,13 @@ (defn date-seq [start end] "Seq of YYYY-MM-DD strings between start and end inclusive." (let [sd (LocalDate/parse start) - ed (LocalDate/parse end) - days (int (Math/abs - (- (.toEpochDay sd) - (.toEpochDay ed))))] - (for [i (range 0 (inc days))] - (.toString (.plusDays sd i))))) + ed (LocalDate/parse end)] + (when (.isAfter sd ed) + (throw (ex-info "date-seq: start must be <= end" {:start start :end end}))) + (let [days (int (- (.toEpochDay ed) + (.toEpochDay sd)))] + (for [i (range 0 (inc days))] + (.toString (.plusDays sd i)))))) (defn today [] (.toString (LocalDate/now))) @@ -269,7 +270,7 @@ (.toString (LocalDate/parse s mm-dd-yyyy)) (if (> (count s) 10) (subs s 0 10) s)))) -(defn- month-seq [start-date end-date] +(defn month-seq [start-date end-date] "Seq of YYYY-MM strings between start-date and end-date inclusive." (let [sd (LocalDate/parse (normalize-date-str start-date)) ed (LocalDate/parse (normalize-date-str end-date))] @@ -414,7 +415,7 @@ (query-rows (str (:sql q) " QUALIFY ROW_NUMBER() OVER" - " (PARTITION BY sales_order.external_id" + " (PARTITION BY \"external-id\"" " ORDER BY _seq_no DESC) = 1")))) (defn query-by-entity-id [entity-type external-id diff --git a/src/clj/auto_ap/storage/sales_summaries.clj b/src/clj/auto_ap/storage/sales_summaries.clj index 23d45a20..d927ad42 100644 --- a/src/clj/auto_ap/storage/sales_summaries.clj +++ b/src/clj/auto_ap/storage/sales_summaries.clj @@ -15,12 +15,12 @@ 0.0))) (defn- pq-files [entity-type start-date end-date] - "Vector of S3 parquet file paths for date range." - (let [dates (p/date-seq start-date end-date)] + "Vector of S3 parquet file paths for date range (monthly partitions)." + (let [months (p/month-seq start-date end-date)] (vec (map #(str "'s3://" p/*bucket* "/sales-details/" entity-type "/" - % ".parquet") dates)))) + % ".parquet") months)))) (defn sum-payments-by-type [client-id start-date end-date] "Return {processor-key -> {type-name-string -> total-double}}." diff --git a/test/clj/auto_ap/storage/perf_test.clj b/test/clj/auto_ap/storage/perf_test.clj index 187c6a59..a1e7e177 100644 --- a/test/clj/auto_ap/storage/perf_test.clj +++ b/test/clj/auto_ap/storage/perf_test.clj @@ -108,5 +108,6 @@ (finally (p/disconnect!)))) -(run-perf-tests) -(println "\n=== Done ===") \ No newline at end of file +(comment + (run-perf-tests) + (println "\n=== Done ===")) \ No newline at end of file