perf(sales): add LRU+TTL caching, client filtering, and date normalization

- Cache COUNT and summary queries with LRU (256 entries) + TTL (30 min) caches
- Pass session client codes to parquet queries via an IN clause (results previously included all clients)
- Normalize MM/dd/yyyy date strings from the UI in parquet.clj (month-seq, parquet-query)
- Partition sales parquet files by month instead of by day (parquet-key, flush-to-parquet!, parquet-glob); see the query sketch below
- Remove the expensive get-sales-orders-summary call from default-date-range
- Add mu/trace and mu/log instrumentation throughout the parquet query layer
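
For illustration, roughly the data SQL the new path issues for a UI date range of 03/15/2026 - 04/28/2026 with session clients ACME and GLOBEX (bucket name and client codes are made up; the real statement is built on one line and then gets ORDER BY / LIMIT / OFFSET appended from the query params):

    SELECT * FROM read_parquet(
        ['s3://<bucket>/sales-details/sales-order/2026-03.parquet',
         's3://<bucket>/sales-details/sales-order/2026-04.parquet'],
        union_by_name=true)
    WHERE date >= '2026-03-15' AND date <= '2026-04-28'
      AND "client-code" IN ('ACME', 'GLOBEX')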
2026-04-28 21:09:23 -07:00
parent 9153494ed7
commit 218d0684c0
2 changed files with 199 additions and 139 deletions

View File

@@ -80,11 +80,14 @@
(defn- extract-date-str [v]
(when v
(cond
(string? v) (if (> (count v) 10) (.substring v 0 10) v)
(instance? org.joda.time.DateTime v) (atime/unparse-local v atime/normal-date)
(instance? org.joda.time.LocalDate v) (atime/unparse-local v atime/normal-date)
(instance? java.util.Date v) (atime/unparse-local (coerce/to-date-time v) atime/normal-date)
(instance? java.time.LocalDate v) (.toString v)
(string? v) (if (re-find #"^\d{2}/\d{2}/\d{4}" v)
(-> (java.time.LocalDate/parse v (java.time.format.DateTimeFormatter/ofPattern "MM/dd/yyyy"))
.toString)
(if (> (count v) 10) (.substring v 0 10) v))
:else (str v))))
(defn- get-date [qp k]
@@ -112,20 +115,10 @@
true (assoc :limit (or (:per-page qp) 25)
:offset (or (:start qp) 0)))))
(defn- last-week-range []
(let [today (java.time.LocalDate/now)
end (.toString (.minusDays today 1))
start (.toString (.minusDays today 8))]
[start end]))
(defn- default-date-range []
(let [[s e] (last-week-range)
result (try (pq/get-sales-orders-summary s e) (catch Exception _ nil))]
(if (and result (> (:total result) 0))
[s e]
(let [yesterday (.toString (.minusDays (java.time.LocalDate/of 2024 4 24) 1))
week-before (.toString (.minusDays (java.time.LocalDate/of 2024 4 24) 8))]
[week-before yesterday]))))
(let [today (.toString (java.time.LocalDate/now))
week-ago (.toString (.minusDays (java.time.LocalDate/now) 7))]
[week-ago today]))
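;; e.g. on 2026-04-28 default-date-range returns ["2026-04-21" "2026-04-28"]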
(defn- qp->date-range [qp]
(let [[default-start default-end] (default-date-range)]
@@ -136,6 +129,11 @@
(extract-date-str (get-in qp [:date-range :end]))
default-end)]))
(defn- request->client-codes [request]
(let [clients (:clients request)
codes (keep :client/code clients)]
(when (seq codes) codes)))
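;; e.g. a session whose :clients contain {:client/code "ACME"} and {:client/code "GLOBEX"} (made-up codes)
;; yields ("ACME" "GLOBEX"); a session with no clients yields nil, leaving opts unchanged.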
(defn fetch-page-ssr
"Fetch sales orders from parquet for the SSR page."
[request]
@@ -145,6 +143,8 @@
(->> (into {} (remove (fn [[_ v]] (str/blank? v))))))
[start end] (qp->date-range (merge raw-qp qp))
opts (qp->opts qp)
client-codes (request->client-codes request)
opts (if client-codes (assoc opts :client-codes client-codes) opts)
result (pq/get-sales-orders start end opts)
rows (mapv <-row (:rows result))]
{:rows rows :count (:count result)}))
@@ -157,7 +157,9 @@
ring-codec/form-decode
(->> (into {} (remove (fn [[_ v]] (str/blank? v))))))
[start end] (qp->date-range (merge raw-qp qp))
opts (dissoc (qp->opts qp) :limit :offset :sort :order)]
opts (dissoc (qp->opts qp) :limit :offset :sort :order)
client-codes (request->client-codes request)
opts (if client-codes (assoc opts :client-codes client-codes) opts)]
(pq/get-sales-orders-summary start end opts)))
(defn summarize-orders [rows]

View File

@@ -3,7 +3,9 @@
[amazonica.aws.s3 :as s3]
[clojure.java.io :as io]
[clojure.string :as str]
[clojure.data.json :as json])
[clojure.data.json :as json]
[clojure.core.cache :as cache]
[com.brunobonacci.mulog :as mu])
(:import (java.sql DriverManager)
(java.time LocalDate)))
@@ -14,7 +16,10 @@
(str "s3://" *bucket* "/" filename))
(defn parquet-key [entity-type date-str]
(str parquet-prefix "/" entity-type "/" date-str ".parquet"))
(let [month-str (if (= (count date-str) 10)
(subs date-str 0 7)
date-str)]
(str parquet-prefix "/" entity-type "/" month-str ".parquet")))
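;; e.g. (parquet-key "sales-order" "2026-04-28") => "sales-details/sales-order/2026-04.parquet",
;; assuming parquet-prefix is "sales-details" as in parquet-glob below.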
(def db (atom nil))
@@ -26,6 +31,9 @@
(.execute stmt (str "SET s3_access_key_id='" key "'"))
(.execute stmt (str "SET s3_secret_access_key='" (:aws-secret-access-key env) "'"))
(.execute stmt (str "SET s3_region='" (or (:aws-region env) "us-east-1") "'")))
(.execute stmt "PRAGMA enable_object_cache")
(.execute stmt "SET temp_directory='/tmp/duckdb-temp'")
(.execute stmt "SET memory_limit='2GB'")
(.close stmt)
(.addShutdownHook (Runtime/getRuntime)
(Thread. #(fn [])))
@@ -53,34 +61,52 @@
(.execute stmt sql)
nil)))
(defn- sql-snippet [sql] (subs sql 0 (min (count sql) 500)))
(defn query-scalar [sql]
(with-duckdb
(let [stmt (.createStatement conn)
rs (.executeQuery stmt sql)]
(when (.next rs)
(.getObject rs 1)))))
(mu/trace ::query-scalar
[:sql (sql-snippet sql)]
(with-duckdb
(let [stmt (.createStatement conn)
rs (.executeQuery stmt sql)]
(when (.next rs)
(.getObject rs 1))))))
(def ^:private count-cache
(atom (-> (cache/ttl-cache-factory {} :ttl 1800000)
(cache/lru-cache-factory :threshold 256))))
(defn- cached-count [sql]
(if-let [v (find @count-cache sql)]
(do (mu/log ::count-cache :hit true :sql (sql-snippet sql)) (val v))
(do (mu/log ::count-cache :hit false :sql (sql-snippet sql))
(let [result (query-scalar sql)]
(swap! count-cache assoc sql result)
result))))
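;; First call for a given SQL string misses and runs query-scalar; repeat calls within the
;; 30-minute TTL (up to 256 distinct statements) hit the cache.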
(defn query-rows [sql]
(with-duckdb
(let [stmt (.createStatement conn)
rs (.executeQuery stmt sql)
meta (.getMetaData rs)
col-count (.getColumnCount meta)
cols (vec (for [i (range 1 (inc col-count))]
(keyword (.getColumnLabel meta i))))]
(loop [rows []]
(if (.next rs)
(recur (conj rows
(zipmap cols
(vec (for [i (range 1 (inc col-count))]
(.getObject rs i))))))
rows)))))
(mu/trace ::query-rows
[:sql (sql-snippet sql)]
(with-duckdb
(let [stmt (.createStatement conn)
rs (.executeQuery stmt sql)
meta (.getMetaData rs)
col-count (.getColumnCount meta)
cols (vec (for [i (range 1 (inc col-count))]
(keyword (.getColumnLabel meta i))))]
(loop [rows []]
(if (.next rs)
(recur (conj rows
(zipmap cols
(vec (for [i (range 1 (inc col-count))]
(.getObject rs i))))))
rows))))))
(defn execute-to-parquet! [sql ^String parquet-path]
(with-duckdb
(let [stmt (.createStatement conn)]
(.execute stmt
(format "COPY (%s) TO '%s' (FORMAT PARQUET, OVERWRITE_OR_IGNORE)"
(format "COPY (%s) TO '%s' (FORMAT PARQUET, OVERWRITE_OR_IGNORE, ROW_GROUP_SIZE 10000, COMPRESSION 'zstd')"
sql parquet-path))
(io/file parquet-path))))
@@ -129,31 +155,46 @@
vals (mapcat identity) count))
(defn flush-to-parquet! [entity-type date-str]
"Flush buffered records for entity-type to parquet + S3."
"Flush buffered records for entity-type to monthly parquet + S3.
Reads existing monthly file (if any), merges with new records, and uploads.
Uses a temp table to ensure ROW_GROUP_SIZE is respected (DuckDB ignores it
when reading directly from S3 via COPY)."
(let [records (get @*buffers* entity-type [])]
(if (empty? records)
{:status :no-records}
(let [date-str (or date-str (.toString (LocalDate/now)))
s3-key (parquet-key entity-type date-str)
s3-url (s3-location s3-key)
jsonl-file (io/file "/tmp"
(str entity-type "-" date-str ".jsonl"))
parquet-file (io/file "/tmp"
(str entity-type "-" date-str ".parquet"))
s3-key (parquet-key entity-type date-str)]
tbl (format "\"_flush_%s_%s\""
(clojure.string/replace entity-type "-" "_")
(subs date-str 0 7))]
(try
(with-open [w (io/writer jsonl-file :append true)]
(doseq [r records]
(.write w ^String (json/write-str (dissoc r :_seq-no)))
(.write w (int \newline))))
(execute-to-parquet!
(format "SELECT * FROM read_json_auto('%s')"
(.getAbsolutePath jsonl-file))
(.getAbsolutePath parquet-file))
(upload-parquet! parquet-file s3-key)
(clear-buffer! entity-type)
(.delete ^java.io.File jsonl-file)
(.delete ^java.io.File parquet-file)
{:key s3-key :status :ok}
(let [existing-sql (format
"SELECT * FROM read_parquet('%s', union_by_name=true)"
s3-url)
new-sql (format
"SELECT * FROM read_json_auto('%s')"
(.getAbsolutePath jsonl-file))]
(execute! (format "CREATE OR REPLACE TABLE %s AS SELECT * FROM (%s UNION ALL %s) ORDER BY \"client-code\", date"
tbl existing-sql new-sql))
(execute! (format "COPY (SELECT * FROM %s) TO '%s' (FORMAT PARQUET, OVERWRITE_OR_IGNORE, ROW_GROUP_SIZE 10000, COMPRESSION 'zstd')"
tbl (.getAbsolutePath parquet-file)))
(execute! (format "DROP TABLE IF EXISTS %s" tbl))
(upload-parquet! parquet-file s3-key)
(clear-buffer! entity-type)
(.delete ^java.io.File jsonl-file)
(.delete ^java.io.File parquet-file)
{:key s3-key :status :ok})
(catch Exception e
(execute! (format "DROP TABLE IF EXISTS %s" tbl))
(throw (ex-info "Flush failed"
{:entity-type entity-type
:error (.getMessage e)}))))))))
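;; e.g. (flush-to-parquet! "sales-order" "2026-04-28") merges the currently buffered sales-order
;; records into the 2026-04 monthly file and re-uploads it to S3.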
@@ -191,12 +232,12 @@
{}
(into {}
(keep (fn [et]
(let [f (io/file
(wal-dir)
(str et ".jsonl"))]
(when (.exists f)
[et (slurp f)])))
etypes)))]
(let [f (io/file
(wal-dir)
(str et ".jsonl"))]
(when (.exists f)
[et (slurp f)]))))
etypes))]
(swap! *buffers* merge loaded)))
(defn get-unflushed-count []
@@ -220,51 +261,44 @@
(defn today []
(.toString (LocalDate/now)))
(def ^:private mm-dd-yyyy (java.time.format.DateTimeFormatter/ofPattern "MM/dd/yyyy"))
(defn- normalize-date-str [s]
(when s
(if (re-find #"^\d{2}/\d{2}/\d{4}" s)
(.toString (LocalDate/parse s mm-dd-yyyy))
(if (> (count s) 10) (subs s 0 10) s))))
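;; e.g. (normalize-date-str "04/28/2026") => "2026-04-28"
;;      (normalize-date-str "2026-04-28T21:09:23Z") => "2026-04-28"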
(defn- month-seq [start-date end-date]
"Seq of YYYY-MM strings between start-date and end-date inclusive."
(let [sd (LocalDate/parse (normalize-date-str start-date))
ed (LocalDate/parse (normalize-date-str end-date))]
(loop [months [] cur (.withDayOfMonth sd 1)]
(if (.isAfter cur ed)
months
(recur (conj months (subs (.toString cur) 0 7))
(.plusMonths cur 1))))))
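;; e.g. (month-seq "03/15/2026" "2026-04-28") => ["2026-03" "2026-04"]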
(defn- parquet-glob [entity-type start-date end-date]
"Build a glob pattern or explicit file list for the date range.
Uses glob patterns for ranges > 60 days; explicit list otherwise."
(let [days (-> (LocalDate/parse end-date)
(.toEpochDay)
(- (.toEpochDay (LocalDate/parse start-date)))
inc)]
(if (> days 60)
(let [prefix (format "s3://%s/sales-details/%s/" *bucket* entity-type)
sy (-> (LocalDate/parse start-date) .getYear)
ey (-> (LocalDate/parse end-date) .getYear)]
(if (= sy ey)
[(format "%s%d-*.parquet" prefix sy)]
(vec
(for [y (range sy (inc ey))]
(format "%s%d-*.parquet" prefix y)))))
(vec
(map (fn [d]
(format "'s3://%s/sales-details/%s/%s.parquet'"
*bucket* entity-type d))
(date-seq start-date end-date))))))
"Build explicit file list for the date range using monthly partitions.
Monthly files mean only 1-3 files for typical queries, 12 for a full year."
(let [prefix (format "s3://%s/sales-details/%s/" *bucket* entity-type)]
(vec
(map (fn [m]
(format "'%s%s.parquet'" prefix m))
(month-seq start-date end-date)))))
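;; e.g. (parquet-glob "sales-order" "2026-03-15" "2026-04-28") yields two single-quoted S3 paths,
;; .../sales-order/2026-03.parquet and .../sales-order/2026-04.parquet, ready to splice into read_parquet([...]).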
(defn parquet-query [entity-type start-date end-date]
"Build SQL to read all parquet files in date range.
Returns map with :sql and :count-sql keys."
(let [globs (parquet-glob entity-type start-date end-date)
use-glob? (some #(.endsWith ^String % "*.parquet") globs)
base (if use-glob?
(format "SELECT * FROM read_parquet(%s, union_by_name=true)"
(if (= (count globs) 1)
(format "'%s'" (first globs))
(format "[%s]"
(str/join ", " (map #(format "'%s'" %) globs)))))
(format "SELECT * FROM read_parquet([%s])"
(str/join ", " globs)))
add-date-filter (fn [sql]
(if (> (-> (LocalDate/parse end-date)
(.toEpochDay)
(- (.toEpochDay (LocalDate/parse start-date)))
inc)
60)
(format "%s WHERE date >= '%s' AND date <= '%s'"
sql start-date end-date)
sql))
sql (add-date-filter base)]
"Build SQL to read monthly parquet files in date range.
Uses explicit file list (monthly = few files) + WHERE date filter.
Normalizes date formats (handles MM/dd/yyyy from UI)."
(let [sd (normalize-date-str start-date)
ed (normalize-date-str end-date)
files (parquet-glob entity-type sd ed)
base (format "SELECT * FROM read_parquet([%s], union_by_name=true)"
(str/join ", " files))
sql (format "%s WHERE date >= '%s' AND date <= '%s'"
base sd ed)]
{:sql sql
:count-sql (format "SELECT COUNT(*) FROM (%s) t" sql)}))
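;; e.g. (parquet-query "sales-order" "03/15/2026" "04/28/2026") targets the 2026-03 and 2026-04 monthly
;; files with WHERE date >= '2026-03-15' AND date <= '2026-04-28'; :count-sql wraps the same SELECT in COUNT(*).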
@@ -280,6 +314,14 @@
[[:client "client-code"]
[:vendor "vendor"]
[:location "location"]])
in-clauses (keep
(fn [[key col]]
(let [vs (get opts key)]
(when (seq vs)
(str "\"" col "\" IN ("
(str/join ", " (map #(str "'" % "'") vs))
")"))))
[[:client-codes "client-code"]])
like-clauses (keep
(fn [[key col]]
(let [v (get opts key)]
@@ -295,60 +337,76 @@
(str "\"" col "\" " op " " v))))
[[:total-gte "total" ">="]
[:total-lte "total" "<="]])
all-clauses (concat eq-clauses like-clauses range-clauses)]
all-clauses (concat eq-clauses in-clauses like-clauses range-clauses)]
(when (seq all-clauses)
(str " WHERE " (str/join " AND " all-clauses)))))
(str/join " AND " all-clauses))))
(defn get-sales-orders
([start-date end-date]
(get-sales-orders start-date end-date {}))
([start-date end-date opts]
(try
(let [q (parquet-query "sales-order"
start-date end-date)
base-sql (:sql q)
count-sql (:count-sql q)
sort (get opts :sort "date")
order (get opts :order "DESC")
limit (get opts :limit)
offset (get opts :offset)
where-str (build-sales-orders-where opts)
full-sql (if where-str
(str base-sql where-str)
base-sql)
result (cond-> full-sql
sort (str " ORDER BY " sort
" " (name order))
limit (str " LIMIT " limit)
offset (str " OFFSET " offset))
full-count (if where-str
(str count-sql where-str)
count-sql)]
{:rows (query-rows result)
:count (or
(int
(query-scalar
full-count)) 0)})
(catch Exception _
{:rows [] :count 0}))))
(mu/trace ::get-sales-orders
[:start-date start-date :end-date end-date :opts opts]
(try
(let [q (parquet-query "sales-order" start-date end-date)
base-sql (:sql q)
has-where? (str/includes? base-sql " WHERE ")
sort (get opts :sort "date")
order (get opts :order "DESC")
limit (get opts :limit)
offset (get opts :offset)
extra-clauses (build-sales-orders-where opts)
full-sql (if extra-clauses
(str base-sql (if has-where? " AND " " WHERE ") extra-clauses)
base-sql)
data-sql (cond-> full-sql
sort (str " ORDER BY " sort " " (name order))
limit (str " LIMIT " limit)
offset (str " OFFSET " offset))
count-sql (format "SELECT COUNT(*) FROM (%s) t" full-sql)]
(mu/log ::get-sales-orders :data-sql data-sql :count-sql count-sql)
(let [cnt (cached-count count-sql)
rows (query-rows data-sql)]
{:rows rows
:count (if cnt (int cnt) 0)}))
(catch Exception e
(mu/log ::get-sales-orders :error e :start-date start-date :end-date end-date :opts opts)
(throw e))))))
(def ^:private summary-cache
(atom (-> (cache/ttl-cache-factory {} :ttl 1800000)
(cache/lru-cache-factory :threshold 256))))
(defn- cached-summary [sql]
(if-let [v (find @summary-cache sql)]
(do (mu/log ::summary-cache :hit true :sql (sql-snippet sql)) (val v))
(do (mu/log ::summary-cache :hit false :sql (sql-snippet sql))
(let [result (let [row (first (query-rows sql))]
{:total (or (:total row) 0.0)
:tax (or (:tax row) 0.0)})]
(swap! summary-cache assoc sql result)
result))))
(defn get-sales-orders-summary
([start-date end-date]
(get-sales-orders-summary start-date end-date {}))
([start-date end-date opts]
(try
(let [q (parquet-query "sales-order" start-date end-date)
base-sql (:sql q)
where-str (build-sales-orders-where opts)
full-sql (if where-str
(str base-sql where-str)
base-sql)
sum-sql (format "SELECT COALESCE(SUM(total), 0) as total, COALESCE(SUM(tax), 0) as tax FROM (%s) t" full-sql)
row (first (query-rows sum-sql))]
{:total (or (:total row) 0.0)
:tax (or (:tax row) 0.0)})
(catch Exception _
{:total 0.0 :tax 0.0}))))
(mu/trace ::get-sales-orders-summary
[:start-date start-date :end-date end-date :opts opts]
(try
(let [q (parquet-query "sales-order" start-date end-date)
base-sql (:sql q)
has-where? (str/includes? base-sql " WHERE ")
extra-clauses (build-sales-orders-where opts)
full-sql (if extra-clauses
(str base-sql (if has-where? " AND " " WHERE ") extra-clauses)
base-sql)
sum-sql (format "SELECT COALESCE(SUM(total), 0) as total, COALESCE(SUM(tax), 0) as tax FROM (%s) t" full-sql)]
(mu/log ::get-sales-orders-summary :sum-sql sum-sql)
(cached-summary sum-sql))
(catch Exception e
(mu/log ::get-sales-orders-summary :error e :start-date start-date :end-date end-date :opts opts)
(throw e))))))
(defn query-deduped [entity-type start-date end-date]
"Query records deduplicated by external-id (latest _seq_no wins)."