Compare commits
2 Commits
9153494ed7
...
feat/sales
| Author | SHA1 | Date | |
|---|---|---|---|
| f575f425a2 | |||
| 218d0684c0 |
@@ -80,11 +80,14 @@
|
|||||||
(defn- extract-date-str [v]
|
(defn- extract-date-str [v]
|
||||||
(when v
|
(when v
|
||||||
(cond
|
(cond
|
||||||
(string? v) (if (> (count v) 10) (.substring v 0 10) v)
|
|
||||||
(instance? org.joda.time.DateTime v) (atime/unparse-local v atime/normal-date)
|
(instance? org.joda.time.DateTime v) (atime/unparse-local v atime/normal-date)
|
||||||
(instance? org.joda.time.LocalDate v) (atime/unparse-local v atime/normal-date)
|
(instance? org.joda.time.LocalDate v) (atime/unparse-local v atime/normal-date)
|
||||||
(instance? java.util.Date v) (atime/unparse-local (coerce/to-date-time v) atime/normal-date)
|
(instance? java.util.Date v) (atime/unparse-local (coerce/to-date-time v) atime/normal-date)
|
||||||
(instance? java.time.LocalDate v) (.toString v)
|
(instance? java.time.LocalDate v) (.toString v)
|
||||||
|
(string? v) (if (re-find #"^\d{2}/\d{2}/\d{4}" v)
|
||||||
|
(-> (java.time.LocalDate/parse v (java.time.format.DateTimeFormatter/ofPattern "MM/dd/yyyy"))
|
||||||
|
.toString)
|
||||||
|
(if (> (count v) 10) (.substring v 0 10) v))
|
||||||
:else (str v))))
|
:else (str v))))
|
||||||
|
|
||||||
(defn- get-date [qp k]
|
(defn- get-date [qp k]
|
||||||
@@ -112,20 +115,10 @@
|
|||||||
true (assoc :limit (or (:per-page qp) 25)
|
true (assoc :limit (or (:per-page qp) 25)
|
||||||
:offset (or (:start qp) 0)))))
|
:offset (or (:start qp) 0)))))
|
||||||
|
|
||||||
(defn- last-week-range []
|
|
||||||
(let [today (java.time.LocalDate/now)
|
|
||||||
end (.toString (.minusDays today 1))
|
|
||||||
start (.toString (.minusDays today 8))]
|
|
||||||
[start end]))
|
|
||||||
|
|
||||||
(defn- default-date-range []
|
(defn- default-date-range []
|
||||||
(let [[s e] (last-week-range)
|
(let [today (.toString (java.time.LocalDate/now))
|
||||||
result (try (pq/get-sales-orders-summary s e) (catch Exception _ nil))]
|
week-ago (.toString (.minusDays (java.time.LocalDate/now) 7))]
|
||||||
(if (and result (> (:total result) 0))
|
[week-ago today]))
|
||||||
[s e]
|
|
||||||
(let [yesterday (.toString (.minusDays (java.time.LocalDate/of 2024 4 24) 1))
|
|
||||||
week-before (.toString (.minusDays (java.time.LocalDate/of 2024 4 24) 8))]
|
|
||||||
[week-before yesterday]))))
|
|
||||||
|
|
||||||
(defn- qp->date-range [qp]
|
(defn- qp->date-range [qp]
|
||||||
(let [[default-start default-end] (default-date-range)]
|
(let [[default-start default-end] (default-date-range)]
|
||||||
@@ -136,6 +129,11 @@
|
|||||||
(extract-date-str (get-in qp [:date-range :end]))
|
(extract-date-str (get-in qp [:date-range :end]))
|
||||||
default-end)]))
|
default-end)]))
|
||||||
|
|
||||||
|
(defn- request->client-codes [request]
|
||||||
|
(let [clients (:clients request)
|
||||||
|
codes (keep :client/code clients)]
|
||||||
|
(when (seq codes) codes)))
|
||||||
|
|
||||||
(defn fetch-page-ssr
|
(defn fetch-page-ssr
|
||||||
"Fetch sales orders from parquet for the SSR page."
|
"Fetch sales orders from parquet for the SSR page."
|
||||||
[request]
|
[request]
|
||||||
@@ -145,6 +143,8 @@
|
|||||||
(->> (into {} (remove (fn [[_ v]] (str/blank? v))))))
|
(->> (into {} (remove (fn [[_ v]] (str/blank? v))))))
|
||||||
[start end] (qp->date-range (merge raw-qp qp))
|
[start end] (qp->date-range (merge raw-qp qp))
|
||||||
opts (qp->opts qp)
|
opts (qp->opts qp)
|
||||||
|
client-codes (request->client-codes request)
|
||||||
|
opts (if client-codes (assoc opts :client-codes client-codes) opts)
|
||||||
result (pq/get-sales-orders start end opts)
|
result (pq/get-sales-orders start end opts)
|
||||||
rows (mapv <-row (:rows result))]
|
rows (mapv <-row (:rows result))]
|
||||||
{:rows rows :count (:count result)}))
|
{:rows rows :count (:count result)}))
|
||||||
@@ -157,7 +157,9 @@
|
|||||||
ring-codec/form-decode
|
ring-codec/form-decode
|
||||||
(->> (into {} (remove (fn [[_ v]] (str/blank? v))))))
|
(->> (into {} (remove (fn [[_ v]] (str/blank? v))))))
|
||||||
[start end] (qp->date-range (merge raw-qp qp))
|
[start end] (qp->date-range (merge raw-qp qp))
|
||||||
opts (dissoc (qp->opts qp) :limit :offset :sort :order)]
|
opts (dissoc (qp->opts qp) :limit :offset :sort :order)
|
||||||
|
client-codes (request->client-codes request)
|
||||||
|
opts (if client-codes (assoc opts :client-codes client-codes) opts)]
|
||||||
(pq/get-sales-orders-summary start end opts)))
|
(pq/get-sales-orders-summary start end opts)))
|
||||||
|
|
||||||
(defn summarize-orders [rows]
|
(defn summarize-orders [rows]
|
||||||
|
|||||||
@@ -100,7 +100,7 @@
|
|||||||
(let [date-str (.toString date)]
|
(let [date-str (.toString date)]
|
||||||
(when-let [rows (seq (pq/query-deduped "charge" date-str date-str))]
|
(when-let [rows (seq (pq/query-deduped "charge" date-str date-str))]
|
||||||
(let [client-code (if (map? c) (:client/code c) c)
|
(let [client-code (if (map? c) (:client/code c) c)
|
||||||
filtered (filter #(= client-code (:client_code %)) rows)]
|
filtered (filter #(= client-code (:client-code %)) rows)]
|
||||||
(reduce
|
(reduce
|
||||||
(fn [acc {:keys [processor type-name total]}]
|
(fn [acc {:keys [processor type-name total]}]
|
||||||
(update acc
|
(update acc
|
||||||
@@ -142,14 +142,6 @@
|
|||||||
:ledger-mapped/ledger-side :ledger-side/credit})
|
:ledger-mapped/ledger-side :ledger-side/credit})
|
||||||
refunds))))
|
refunds))))
|
||||||
|
|
||||||
(defn- get-fees [c date]
|
|
||||||
(when-let [fee (get-fee c date)]
|
|
||||||
{:db/id (str (java.util.UUID/randomUUID))
|
|
||||||
:sales-summary-item/sort-order 2
|
|
||||||
:sales-summary-item/category "Fees"
|
|
||||||
:ledger-mapped/amount fee
|
|
||||||
:ledger-mapped/ledger-side :ledger-side/debit}))
|
|
||||||
|
|
||||||
(defn- get-tax-parquet [c date]
|
(defn- get-tax-parquet [c date]
|
||||||
(let [client-code (if (map? c) (:client/code c) c)
|
(let [client-code (if (map? c) (:client/code c) c)
|
||||||
date-str (.toString date)
|
date-str (.toString date)
|
||||||
|
|||||||
@@ -111,11 +111,12 @@
|
|||||||
(.toString (java.time.LocalDate/ofEpochDay d)))))
|
(.toString (java.time.LocalDate/ofEpochDay d)))))
|
||||||
|
|
||||||
(defn- object-exists?
|
(defn- object-exists?
|
||||||
"Check if an S3 object exists by attempting get-object."
|
"Check if an S3 object exists via head-object (no download)."
|
||||||
[key]
|
[key]
|
||||||
(try
|
(try
|
||||||
(s3/get-object {:bucket-name pq/*bucket*
|
(s3/get-object {:bucket-name pq/*bucket*
|
||||||
:key key})
|
:key key}
|
||||||
|
{:request-method :head})
|
||||||
true
|
true
|
||||||
(catch com.amazonaws.services.s3.model.AmazonS3Exception _
|
(catch com.amazonaws.services.s3.model.AmazonS3Exception _
|
||||||
false)))
|
false)))
|
||||||
@@ -196,7 +197,7 @@
|
|||||||
(println "=== safe-cleanup-all"
|
(println "=== safe-cleanup-all"
|
||||||
"months:" (count months)
|
"months:" (count months)
|
||||||
"dry-run? =" DRY-RUN?)
|
"dry-run? =" DRY-RUN?)
|
||||||
(doseq [[_ y m] months]
|
(doseq [[y m] months]
|
||||||
(when-not DRY-RUN?
|
(when-not DRY-RUN?
|
||||||
(let [result (verify-month-in-s3? y m)
|
(let [result (verify-month-in-s3? y m)
|
||||||
missing (:missing result)]
|
missing (:missing result)]
|
||||||
|
|||||||
@@ -137,24 +137,15 @@
|
|||||||
(map first)
|
(map first)
|
||||||
vec)))
|
vec)))
|
||||||
|
|
||||||
(defn- date-seq [start end]
|
|
||||||
"Seq of YYYY-MM-DD strings between start and end inclusive."
|
|
||||||
(let [sd (java.time.LocalDate/parse start)
|
|
||||||
ed (java.time.LocalDate/parse end)
|
|
||||||
days (int (Math/abs (- (.toEpochDay sd)
|
|
||||||
(.toEpochDay ed))))]
|
|
||||||
(for [i (range 0 (inc days))]
|
|
||||||
(.toString (.plusDays sd i)))))
|
|
||||||
|
|
||||||
(defn write-day-by-day
|
(defn write-day-by-day
|
||||||
([start-date end-date]
|
([start-date end-date]
|
||||||
(write-day-by-day start-date end-date {}))
|
(write-day-by-day start-date end-date {}))
|
||||||
([start-date end-date opts]
|
([start-date end-date opts]
|
||||||
(let [all-dates (set (or (opts :date-set) []))
|
(let [all-dates (set (or (opts :date-set) []))
|
||||||
date-range (if (empty? all-dates)
|
date-range (if (empty? all-dates)
|
||||||
(date-seq start-date end-date)
|
(p/date-seq start-date end-date)
|
||||||
(filter all-dates
|
(filter all-dates
|
||||||
(date-seq start-date end-date)))
|
(p/date-seq start-date end-date)))
|
||||||
batch-size (or (opts :batch-size) 100)]
|
batch-size (or (opts :batch-size) 100)]
|
||||||
(doseq [^String day date-range]
|
(doseq [^String day date-range]
|
||||||
(println "[migration] processing" day)
|
(println "[migration] processing" day)
|
||||||
|
|||||||
@@ -3,7 +3,9 @@
|
|||||||
[amazonica.aws.s3 :as s3]
|
[amazonica.aws.s3 :as s3]
|
||||||
[clojure.java.io :as io]
|
[clojure.java.io :as io]
|
||||||
[clojure.string :as str]
|
[clojure.string :as str]
|
||||||
[clojure.data.json :as json])
|
[clojure.data.json :as json]
|
||||||
|
[clojure.core.cache :as cache]
|
||||||
|
[com.brunobonacci.mulog :as mu])
|
||||||
(:import (java.sql DriverManager)
|
(:import (java.sql DriverManager)
|
||||||
(java.time LocalDate)))
|
(java.time LocalDate)))
|
||||||
|
|
||||||
@@ -14,7 +16,10 @@
|
|||||||
(str "s3://" *bucket* "/" filename))
|
(str "s3://" *bucket* "/" filename))
|
||||||
|
|
||||||
(defn parquet-key [entity-type date-str]
|
(defn parquet-key [entity-type date-str]
|
||||||
(str parquet-prefix "/" entity-type "/" date-str ".parquet"))
|
(let [month-str (if (= (count date-str) 10)
|
||||||
|
(subs date-str 0 7)
|
||||||
|
date-str)]
|
||||||
|
(str parquet-prefix "/" entity-type "/" month-str ".parquet")))
|
||||||
|
|
||||||
(def db (atom nil))
|
(def db (atom nil))
|
||||||
|
|
||||||
@@ -26,9 +31,12 @@
|
|||||||
(.execute stmt (str "SET s3_access_key_id='" key "'"))
|
(.execute stmt (str "SET s3_access_key_id='" key "'"))
|
||||||
(.execute stmt (str "SET s3_secret_access_key='" (:aws-secret-access-key env) "'"))
|
(.execute stmt (str "SET s3_secret_access_key='" (:aws-secret-access-key env) "'"))
|
||||||
(.execute stmt (str "SET s3_region='" (or (:aws-region env) "us-east-1") "'")))
|
(.execute stmt (str "SET s3_region='" (or (:aws-region env) "us-east-1") "'")))
|
||||||
|
(.execute stmt "PRAGMA enable_object_cache")
|
||||||
|
(.execute stmt "SET temp_directory='/tmp/duckdb-temp'")
|
||||||
|
(.execute stmt "SET memory_limit='2GB'")
|
||||||
(.close stmt)
|
(.close stmt)
|
||||||
(.addShutdownHook (Runtime/getRuntime)
|
(.addShutdownHook (Runtime/getRuntime)
|
||||||
(Thread. #(fn [])))
|
(Thread. #(when-let [c @db] (.close ^java.sql.Connection c))))
|
||||||
(reset! db conn)))
|
(reset! db conn)))
|
||||||
|
|
||||||
(defn disconnect! []
|
(defn disconnect! []
|
||||||
@@ -53,34 +61,52 @@
|
|||||||
(.execute stmt sql)
|
(.execute stmt sql)
|
||||||
nil)))
|
nil)))
|
||||||
|
|
||||||
|
(defn- sql-snippet [sql] (subs sql 0 (min (count sql) 500)))
|
||||||
|
|
||||||
(defn query-scalar [sql]
|
(defn query-scalar [sql]
|
||||||
(with-duckdb
|
(mu/trace ::query-scalar
|
||||||
(let [stmt (.createStatement conn)
|
[:sql (sql-snippet sql)]
|
||||||
rs (.executeQuery stmt sql)]
|
(with-duckdb
|
||||||
(when (.next rs)
|
(let [stmt (.createStatement conn)
|
||||||
(.getObject rs 1)))))
|
rs (.executeQuery stmt sql)]
|
||||||
|
(when (.next rs)
|
||||||
|
(.getObject rs 1))))))
|
||||||
|
|
||||||
|
(def ^:private count-cache
|
||||||
|
(atom (-> (cache/ttl-cache-factory {} :ttl 1800000)
|
||||||
|
(cache/lru-cache-factory :threshold 256))))
|
||||||
|
|
||||||
|
(defn- cached-count [sql]
|
||||||
|
(if-let [v (find @count-cache sql)]
|
||||||
|
(do (mu/log ::count-cache :hit true :sql (sql-snippet sql)) (val v))
|
||||||
|
(do (mu/log ::count-cache :hit false :sql (sql-snippet sql))
|
||||||
|
(let [result (query-scalar sql)]
|
||||||
|
(swap! count-cache assoc sql result)
|
||||||
|
result))))
|
||||||
|
|
||||||
(defn query-rows [sql]
|
(defn query-rows [sql]
|
||||||
(with-duckdb
|
(mu/trace ::query-rows
|
||||||
(let [stmt (.createStatement conn)
|
[:sql (sql-snippet sql)]
|
||||||
rs (.executeQuery stmt sql)
|
(with-duckdb
|
||||||
meta (.getMetaData rs)
|
(let [stmt (.createStatement conn)
|
||||||
col-count (.getColumnCount meta)
|
rs (.executeQuery stmt sql)
|
||||||
cols (vec (for [i (range 1 (inc col-count))]
|
meta (.getMetaData rs)
|
||||||
(keyword (.getColumnLabel meta i))))]
|
col-count (.getColumnCount meta)
|
||||||
(loop [rows []]
|
cols (vec (for [i (range 1 (inc col-count))]
|
||||||
(if (.next rs)
|
(keyword (.getColumnLabel meta i))))]
|
||||||
(recur (conj rows
|
(loop [rows []]
|
||||||
(zipmap cols
|
(if (.next rs)
|
||||||
(vec (for [i (range 1 (inc col-count))]
|
(recur (conj rows
|
||||||
(.getObject rs i))))))
|
(zipmap cols
|
||||||
rows)))))
|
(vec (for [i (range 1 (inc col-count))]
|
||||||
|
(.getObject rs i))))))
|
||||||
|
rows))))))
|
||||||
|
|
||||||
(defn execute-to-parquet! [sql ^String parquet-path]
|
(defn execute-to-parquet! [sql ^String parquet-path]
|
||||||
(with-duckdb
|
(with-duckdb
|
||||||
(let [stmt (.createStatement conn)]
|
(let [stmt (.createStatement conn)]
|
||||||
(.execute stmt
|
(.execute stmt
|
||||||
(format "COPY (%s) TO '%s' (FORMAT PARQUET, OVERWRITE_OR_IGNORE)"
|
(format "COPY (%s) TO '%s' (FORMAT PARQUET, OVERWRITE_OR_IGNORE, ROW_GROUP_SIZE 10000, COMPRESSION 'zstd')"
|
||||||
sql parquet-path))
|
sql parquet-path))
|
||||||
(io/file parquet-path))))
|
(io/file parquet-path))))
|
||||||
|
|
||||||
@@ -129,31 +155,46 @@
|
|||||||
vals (mapcat identity) count))
|
vals (mapcat identity) count))
|
||||||
|
|
||||||
(defn flush-to-parquet! [entity-type date-str]
|
(defn flush-to-parquet! [entity-type date-str]
|
||||||
"Flush buffered records for entity-type to parquet + S3."
|
"Flush buffered records for entity-type to monthly parquet + S3.
|
||||||
|
Reads existing monthly file (if any), merges with new records, and uploads.
|
||||||
|
Uses temp table to ensure ROW_GROUP_SIZE is respected (DuckDB ignores it
|
||||||
|
when reading directly from S3 via COPY)."
|
||||||
(let [records (get @*buffers* entity-type [])]
|
(let [records (get @*buffers* entity-type [])]
|
||||||
(if (empty? records)
|
(if (empty? records)
|
||||||
{:status :no-records}
|
{:status :no-records}
|
||||||
(let [date-str (or date-str (.toString (LocalDate/now)))
|
(let [date-str (or date-str (.toString (LocalDate/now)))
|
||||||
|
s3-key (parquet-key entity-type date-str)
|
||||||
|
s3-url (s3-location s3-key)
|
||||||
jsonl-file (io/file "/tmp"
|
jsonl-file (io/file "/tmp"
|
||||||
(str entity-type "-" date-str ".jsonl"))
|
(str entity-type "-" date-str ".jsonl"))
|
||||||
parquet-file (io/file "/tmp"
|
parquet-file (io/file "/tmp"
|
||||||
(str entity-type "-" date-str ".parquet"))
|
(str entity-type "-" date-str ".parquet"))
|
||||||
s3-key (parquet-key entity-type date-str)]
|
tbl (format "\"_flush_%s_%s\""
|
||||||
|
(clojure.string/replace entity-type "-" "_")
|
||||||
|
(subs date-str 0 7))]
|
||||||
(try
|
(try
|
||||||
(with-open [w (io/writer jsonl-file :append true)]
|
(with-open [w (io/writer jsonl-file :append true)]
|
||||||
(doseq [r records]
|
(doseq [r records]
|
||||||
(.write w ^String (json/write-str (dissoc r :_seq-no)))
|
(.write w ^String (json/write-str (dissoc r :_seq-no)))
|
||||||
(.write w (int \newline))))
|
(.write w (int \newline))))
|
||||||
(execute-to-parquet!
|
(let [existing-sql (format
|
||||||
(format "SELECT * FROM read_json_auto('%s')"
|
"SELECT * FROM read_parquet('%s', union_by_name=true)"
|
||||||
(.getAbsolutePath jsonl-file))
|
s3-url)
|
||||||
(.getAbsolutePath parquet-file))
|
new-sql (format
|
||||||
(upload-parquet! parquet-file s3-key)
|
"SELECT * FROM read_json_auto('%s')"
|
||||||
(clear-buffer! entity-type)
|
(.getAbsolutePath jsonl-file))]
|
||||||
(.delete ^java.io.File jsonl-file)
|
(execute! (format "CREATE OR REPLACE TABLE %s AS SELECT * FROM (%s UNION ALL %s) ORDER BY \"client-code\", date"
|
||||||
(.delete ^java.io.File parquet-file)
|
tbl existing-sql new-sql))
|
||||||
{:key s3-key :status :ok}
|
(execute! (format "COPY (SELECT * FROM %s) TO '%s' (FORMAT PARQUET, OVERWRITE_OR_IGNORE, ROW_GROUP_SIZE 10000, COMPRESSION 'zstd')"
|
||||||
|
tbl (.getAbsolutePath parquet-file)))
|
||||||
|
(execute! (format "DROP TABLE IF EXISTS %s" tbl))
|
||||||
|
(upload-parquet! parquet-file s3-key)
|
||||||
|
(clear-buffer! entity-type)
|
||||||
|
(.delete ^java.io.File jsonl-file)
|
||||||
|
(.delete ^java.io.File parquet-file)
|
||||||
|
{:key s3-key :status :ok})
|
||||||
(catch Exception e
|
(catch Exception e
|
||||||
|
(execute! (format "DROP TABLE IF EXISTS %s" tbl))
|
||||||
(throw (ex-info "Flush failed"
|
(throw (ex-info "Flush failed"
|
||||||
{:entity-type entity-type
|
{:entity-type entity-type
|
||||||
:error (.getMessage e)}))))))))
|
:error (.getMessage e)}))))))))
|
||||||
@@ -191,12 +232,12 @@
|
|||||||
{}
|
{}
|
||||||
(into {}
|
(into {}
|
||||||
(keep (fn [et]
|
(keep (fn [et]
|
||||||
(let [f (io/file
|
(let [f (io/file
|
||||||
(wal-dir)
|
(wal-dir)
|
||||||
(str et ".jsonl"))]
|
(str et ".jsonl"))]
|
||||||
(when (.exists f)
|
(when (.exists f)
|
||||||
[et (slurp f)])))
|
[et (slurp f)]))))
|
||||||
etypes)))]
|
etypes))]
|
||||||
(swap! *buffers* merge loaded)))
|
(swap! *buffers* merge loaded)))
|
||||||
|
|
||||||
(defn get-unflushed-count []
|
(defn get-unflushed-count []
|
||||||
@@ -210,61 +251,55 @@
|
|||||||
(defn date-seq [start end]
|
(defn date-seq [start end]
|
||||||
"Seq of YYYY-MM-DD strings between start and end inclusive."
|
"Seq of YYYY-MM-DD strings between start and end inclusive."
|
||||||
(let [sd (LocalDate/parse start)
|
(let [sd (LocalDate/parse start)
|
||||||
ed (LocalDate/parse end)
|
ed (LocalDate/parse end)]
|
||||||
days (int (Math/abs
|
(when (.isAfter sd ed)
|
||||||
(- (.toEpochDay sd)
|
(throw (ex-info "date-seq: start must be <= end" {:start start :end end})))
|
||||||
(.toEpochDay ed))))]
|
(let [days (int (- (.toEpochDay ed)
|
||||||
(for [i (range 0 (inc days))]
|
(.toEpochDay sd)))]
|
||||||
(.toString (.plusDays sd i)))))
|
(for [i (range 0 (inc days))]
|
||||||
|
(.toString (.plusDays sd i))))))
|
||||||
|
|
||||||
(defn today []
|
(defn today []
|
||||||
(.toString (LocalDate/now)))
|
(.toString (LocalDate/now)))
|
||||||
|
|
||||||
|
(def ^:private mm-dd-yyyy (java.time.format.DateTimeFormatter/ofPattern "MM/dd/yyyy"))
|
||||||
|
|
||||||
|
(defn- normalize-date-str [s]
|
||||||
|
(when s
|
||||||
|
(if (re-find #"^\d{2}/\d{2}/\d{4}" s)
|
||||||
|
(.toString (LocalDate/parse s mm-dd-yyyy))
|
||||||
|
(if (> (count s) 10) (subs s 0 10) s))))
|
||||||
|
|
||||||
|
(defn month-seq [start-date end-date]
|
||||||
|
"Seq of YYYY-MM strings between start-date and end-date inclusive."
|
||||||
|
(let [sd (LocalDate/parse (normalize-date-str start-date))
|
||||||
|
ed (LocalDate/parse (normalize-date-str end-date))]
|
||||||
|
(loop [months [] cur sd]
|
||||||
|
(if (.isAfter cur ed)
|
||||||
|
months
|
||||||
|
(recur (conj months (.toString (.withDayOfMonth cur 1)))
|
||||||
|
(.plusMonths cur 1))))))
|
||||||
|
|
||||||
(defn- parquet-glob [entity-type start-date end-date]
|
(defn- parquet-glob [entity-type start-date end-date]
|
||||||
"Build a glob pattern or explicit file list for the date range.
|
"Build explicit file list for the date range using monthly partitions.
|
||||||
Uses glob patterns for ranges > 60 days; explicit list otherwise."
|
Monthly files mean only 1-3 files for typical queries, 12 for a full year."
|
||||||
(let [days (-> (LocalDate/parse end-date)
|
(let [prefix (format "s3://%s/sales-details/%s/" *bucket* entity-type)]
|
||||||
(.toEpochDay)
|
(vec
|
||||||
(- (.toEpochDay (LocalDate/parse start-date)))
|
(map (fn [m]
|
||||||
inc)]
|
(format "'%s%s.parquet'" prefix m))
|
||||||
(if (> days 60)
|
(month-seq start-date end-date)))))
|
||||||
(let [prefix (format "s3://%s/sales-details/%s/" *bucket* entity-type)
|
|
||||||
sy (-> (LocalDate/parse start-date) .getYear)
|
|
||||||
ey (-> (LocalDate/parse end-date) .getYear)]
|
|
||||||
(if (= sy ey)
|
|
||||||
[(format "%s%d-*.parquet" prefix sy)]
|
|
||||||
(vec
|
|
||||||
(for [y (range sy (inc ey))]
|
|
||||||
(format "%s%d-*.parquet" prefix y)))))
|
|
||||||
(vec
|
|
||||||
(map (fn [d]
|
|
||||||
(format "'s3://%s/sales-details/%s/%s.parquet'"
|
|
||||||
*bucket* entity-type d))
|
|
||||||
(date-seq start-date end-date))))))
|
|
||||||
|
|
||||||
(defn parquet-query [entity-type start-date end-date]
|
(defn parquet-query [entity-type start-date end-date]
|
||||||
"Build SQL to read all parquet files in date range.
|
"Build SQL to read monthly parquet files in date range.
|
||||||
Returns map with :sql and :count-sql keys."
|
Uses explicit file list (monthly = few files) + WHERE date filter.
|
||||||
(let [globs (parquet-glob entity-type start-date end-date)
|
Normalizes date formats (handles MM/dd/yyyy from UI)."
|
||||||
use-glob? (some #(.endsWith ^String % "*.parquet") globs)
|
(let [sd (normalize-date-str start-date)
|
||||||
base (if use-glob?
|
ed (normalize-date-str end-date)
|
||||||
(format "SELECT * FROM read_parquet(%s, union_by_name=true)"
|
files (parquet-glob entity-type sd ed)
|
||||||
(if (= (count globs) 1)
|
base (format "SELECT * FROM read_parquet([%s], union_by_name=true)"
|
||||||
(format "'%s'" (first globs))
|
(str/join ", " files))
|
||||||
(format "[%s]"
|
sql (format "%s WHERE date >= '%s' AND date <= '%s'"
|
||||||
(str/join ", " (map #(format "'%s'" %) globs)))))
|
base sd ed)]
|
||||||
(format "SELECT * FROM read_parquet([%s])"
|
|
||||||
(str/join ", " globs)))
|
|
||||||
add-date-filter (fn [sql]
|
|
||||||
(if (> (-> (LocalDate/parse end-date)
|
|
||||||
(.toEpochDay)
|
|
||||||
(- (.toEpochDay (LocalDate/parse start-date)))
|
|
||||||
inc)
|
|
||||||
60)
|
|
||||||
(format "%s WHERE date >= '%s' AND date <= '%s'"
|
|
||||||
sql start-date end-date)
|
|
||||||
sql))
|
|
||||||
sql (add-date-filter base)]
|
|
||||||
{:sql sql
|
{:sql sql
|
||||||
:count-sql (format "SELECT COUNT(*) FROM (%s) t" sql)}))
|
:count-sql (format "SELECT COUNT(*) FROM (%s) t" sql)}))
|
||||||
|
|
||||||
@@ -280,6 +315,14 @@
|
|||||||
[[:client "client-code"]
|
[[:client "client-code"]
|
||||||
[:vendor "vendor"]
|
[:vendor "vendor"]
|
||||||
[:location "location"]])
|
[:location "location"]])
|
||||||
|
in-clauses (keep
|
||||||
|
(fn [[key col]]
|
||||||
|
(let [vs (get opts key)]
|
||||||
|
(when (seq vs)
|
||||||
|
(str "\"" col "\" IN ("
|
||||||
|
(str/join ", " (map #(str "'" % "'") vs))
|
||||||
|
")"))))
|
||||||
|
[[:client-codes "client-code"]])
|
||||||
like-clauses (keep
|
like-clauses (keep
|
||||||
(fn [[key col]]
|
(fn [[key col]]
|
||||||
(let [v (get opts key)]
|
(let [v (get opts key)]
|
||||||
@@ -295,60 +338,76 @@
|
|||||||
(str "\"" col "\" " op " " v))))
|
(str "\"" col "\" " op " " v))))
|
||||||
[[:total-gte "total" ">="]
|
[[:total-gte "total" ">="]
|
||||||
[:total-lte "total" "<="]])
|
[:total-lte "total" "<="]])
|
||||||
all-clauses (concat eq-clauses like-clauses range-clauses)]
|
all-clauses (concat eq-clauses in-clauses like-clauses range-clauses)]
|
||||||
(when (seq all-clauses)
|
(when (seq all-clauses)
|
||||||
(str " WHERE " (str/join " AND " all-clauses)))))
|
(str/join " AND " all-clauses))))
|
||||||
|
|
||||||
(defn get-sales-orders
|
(defn get-sales-orders
|
||||||
([start-date end-date]
|
([start-date end-date]
|
||||||
(get-sales-orders start-date end-date {}))
|
(get-sales-orders start-date end-date {}))
|
||||||
([start-date end-date opts]
|
([start-date end-date opts]
|
||||||
(try
|
(mu/trace ::get-sales-orders
|
||||||
(let [q (parquet-query "sales-order"
|
[:start-date start-date :end-date end-date :opts opts]
|
||||||
start-date end-date)
|
(try
|
||||||
base-sql (:sql q)
|
(let [q (parquet-query "sales-order" start-date end-date)
|
||||||
count-sql (:count-sql q)
|
base-sql (:sql q)
|
||||||
sort (get opts :sort "date")
|
has-where? (str/includes? base-sql " WHERE ")
|
||||||
order (get opts :order "DESC")
|
sort (get opts :sort "date")
|
||||||
limit (get opts :limit)
|
order (get opts :order "DESC")
|
||||||
offset (get opts :offset)
|
limit (get opts :limit)
|
||||||
where-str (build-sales-orders-where opts)
|
offset (get opts :offset)
|
||||||
full-sql (if where-str
|
extra-clauses (build-sales-orders-where opts)
|
||||||
(str base-sql where-str)
|
full-sql (if extra-clauses
|
||||||
base-sql)
|
(str base-sql (if has-where? " AND " " WHERE ") extra-clauses)
|
||||||
result (cond-> full-sql
|
base-sql)
|
||||||
sort (str " ORDER BY " sort
|
data-sql (cond-> full-sql
|
||||||
" " (name order))
|
sort (str " ORDER BY " sort " " (name order))
|
||||||
limit (str " LIMIT " limit)
|
limit (str " LIMIT " limit)
|
||||||
offset (str " OFFSET " offset))
|
offset (str " OFFSET " offset))
|
||||||
full-count (if where-str
|
count-sql (format "SELECT COUNT(*) FROM (%s) t" full-sql)]
|
||||||
(str count-sql where-str)
|
(mu/log ::get-sales-orders :data-sql data-sql :count-sql count-sql)
|
||||||
count-sql)]
|
(let [cnt (cached-count count-sql)
|
||||||
{:rows (query-rows result)
|
rows (query-rows data-sql)]
|
||||||
:count (or
|
{:rows rows
|
||||||
(int
|
:count (or (int cnt) 0)}))
|
||||||
(query-scalar
|
(catch Exception e
|
||||||
full-count)) 0)})
|
(mu/log ::get-sales-orders :error e :start-date start-date :end-date end-date :opts opts)
|
||||||
(catch Exception _
|
(throw e))))))
|
||||||
{:rows [] :count 0}))))
|
|
||||||
|
(def ^:private summary-cache
|
||||||
|
(atom (-> (cache/ttl-cache-factory {} :ttl 1800000)
|
||||||
|
(cache/lru-cache-factory :threshold 256))))
|
||||||
|
|
||||||
|
(defn- cached-summary [sql]
|
||||||
|
(if-let [v (find @summary-cache sql)]
|
||||||
|
(do (mu/log ::summary-cache :hit true :sql (sql-snippet sql)) v)
|
||||||
|
(do (mu/log ::summary-cache :hit false :sql (sql-snippet sql))
|
||||||
|
(let [result (let [row (first (query-rows sql))]
|
||||||
|
{:total (or (:total row) 0.0)
|
||||||
|
:tax (or (:tax row) 0.0)})]
|
||||||
|
(swap! summary-cache assoc sql result)
|
||||||
|
result))))
|
||||||
|
|
||||||
(defn get-sales-orders-summary
|
(defn get-sales-orders-summary
|
||||||
([start-date end-date]
|
([start-date end-date]
|
||||||
(get-sales-orders-summary start-date end-date {}))
|
(get-sales-orders-summary start-date end-date {}))
|
||||||
([start-date end-date opts]
|
([start-date end-date opts]
|
||||||
(try
|
(mu/trace ::get-sales-orders-summary
|
||||||
(let [q (parquet-query "sales-order" start-date end-date)
|
[:start-date start-date :end-date end-date :opts opts]
|
||||||
base-sql (:sql q)
|
(try
|
||||||
where-str (build-sales-orders-where opts)
|
(let [q (parquet-query "sales-order" start-date end-date)
|
||||||
full-sql (if where-str
|
base-sql (:sql q)
|
||||||
(str base-sql where-str)
|
has-where? (str/includes? base-sql " WHERE ")
|
||||||
base-sql)
|
extra-clauses (build-sales-orders-where opts)
|
||||||
sum-sql (format "SELECT COALESCE(SUM(total), 0) as total, COALESCE(SUM(tax), 0) as tax FROM (%s) t" full-sql)
|
full-sql (if extra-clauses
|
||||||
row (first (query-rows sum-sql))]
|
(str base-sql (if has-where? " AND " " WHERE ") extra-clauses)
|
||||||
{:total (or (:total row) 0.0)
|
base-sql)
|
||||||
:tax (or (:tax row) 0.0)})
|
sum-sql (format "SELECT COALESCE(SUM(total), 0) as total, COALESCE(SUM(tax), 0) as tax FROM (%s) t" full-sql)]
|
||||||
(catch Exception _
|
(mu/log ::get-sales-orders-summary :sum-sql sum-sql)
|
||||||
{:total 0.0 :tax 0.0}))))
|
(cached-summary sum-sql))
|
||||||
|
(catch Exception e
|
||||||
|
(mu/log ::get-sales-orders-summary :error e :start-date start-date :end-date end-date :opts opts)
|
||||||
|
(throw e))))))
|
||||||
|
|
||||||
(defn query-deduped [entity-type start-date end-date]
|
(defn query-deduped [entity-type start-date end-date]
|
||||||
"Query records deduplicated by external-id (latest _seq_no wins)."
|
"Query records deduplicated by external-id (latest _seq_no wins)."
|
||||||
@@ -356,7 +415,7 @@
|
|||||||
(query-rows
|
(query-rows
|
||||||
(str (:sql q)
|
(str (:sql q)
|
||||||
" QUALIFY ROW_NUMBER() OVER"
|
" QUALIFY ROW_NUMBER() OVER"
|
||||||
" (PARTITION BY sales_order.external_id"
|
" (PARTITION BY \"external-id\""
|
||||||
" ORDER BY _seq_no DESC) = 1"))))
|
" ORDER BY _seq_no DESC) = 1"))))
|
||||||
|
|
||||||
(defn query-by-entity-id [entity-type external-id
|
(defn query-by-entity-id [entity-type external-id
|
||||||
|
|||||||
@@ -15,12 +15,12 @@
|
|||||||
0.0)))
|
0.0)))
|
||||||
|
|
||||||
(defn- pq-files [entity-type start-date end-date]
|
(defn- pq-files [entity-type start-date end-date]
|
||||||
"Vector of S3 parquet file paths for date range."
|
"Vector of S3 parquet file paths for date range (monthly partitions)."
|
||||||
(let [dates (p/date-seq start-date end-date)]
|
(let [months (p/month-seq start-date end-date)]
|
||||||
(vec
|
(vec
|
||||||
(map #(str "'s3://" p/*bucket*
|
(map #(str "'s3://" p/*bucket*
|
||||||
"/sales-details/" entity-type "/"
|
"/sales-details/" entity-type "/"
|
||||||
% ".parquet") dates))))
|
% ".parquet") months))))
|
||||||
|
|
||||||
(defn sum-payments-by-type [client-id start-date end-date]
|
(defn sum-payments-by-type [client-id start-date end-date]
|
||||||
"Return {processor-key -> {type-name-string -> total-double}}."
|
"Return {processor-key -> {type-name-string -> total-double}}."
|
||||||
|
|||||||
@@ -108,5 +108,6 @@
|
|||||||
(finally
|
(finally
|
||||||
(p/disconnect!))))
|
(p/disconnect!))))
|
||||||
|
|
||||||
(run-perf-tests)
|
(comment
|
||||||
(println "\n=== Done ===")
|
(run-perf-tests)
|
||||||
|
(println "\n=== Done ==="))
|
||||||
Reference in New Issue
Block a user