diff --git a/src/clj/auto_ap/datomic/sales_orders.clj b/src/clj/auto_ap/datomic/sales_orders.clj
index 42736ab5..f47209d4 100644
--- a/src/clj/auto_ap/datomic/sales_orders.clj
+++ b/src/clj/auto_ap/datomic/sales_orders.clj
@@ -80,11 +80,14 @@
 (defn- extract-date-str [v]
   (when v
     (cond
-      (string? v) (if (> (count v) 10) (.substring v 0 10) v)
       (instance? org.joda.time.DateTime v) (atime/unparse-local v atime/normal-date)
       (instance? org.joda.time.LocalDate v) (atime/unparse-local v atime/normal-date)
       (instance? java.util.Date v) (atime/unparse-local (coerce/to-date-time v) atime/normal-date)
       (instance? java.time.LocalDate v) (.toString v)
+      ;; Strings: normalize the UI's MM/dd/yyyy format to ISO yyyy-MM-dd;
+      ;; otherwise truncate timestamp strings down to the date part.
+      (string? v) (if (re-find #"^\d{2}/\d{2}/\d{4}" v)
+                    (-> (java.time.LocalDate/parse v (java.time.format.DateTimeFormatter/ofPattern "MM/dd/yyyy"))
+                        .toString)
+                    (if (> (count v) 10) (.substring v 0 10) v))
       :else (str v))))
 
 (defn- get-date [qp k]
@@ -112,20 +115,10 @@
        true (assoc :limit (or (:per-page qp) 25)
                    :offset (or (:start qp) 0)))))
 
-(defn- last-week-range []
-  (let [today (java.time.LocalDate/now)
-        end (.toString (.minusDays today 1))
-        start (.toString (.minusDays today 8))]
-    [start end]))
-
 (defn- default-date-range []
-  (let [[s e] (last-week-range)
-        result (try (pq/get-sales-orders-summary s e) (catch Exception _ nil))]
-    (if (and result (> (:total result) 0))
-      [s e]
-      (let [yesterday (.toString (.minusDays (java.time.LocalDate/of 2024 4 24) 1))
-            week-before (.toString (.minusDays (java.time.LocalDate/of 2024 4 24) 8))]
-        [week-before yesterday])))
+  ;; Capture "now" once so the range cannot straddle a midnight rollover.
+  (let [today (java.time.LocalDate/now)]
+    [(.toString (.minusDays today 7)) (.toString today)]))
 
 (defn- qp->date-range [qp]
   (let [[default-start default-end] (default-date-range)]
@@ -136,6 +129,11 @@
             (extract-date-str (get-in qp [:date-range :end]))
             default-end)]))
 
+(defn- request->client-codes
+  "Client codes the request is scoped to, or nil when unrestricted."
+  [request]
+  (let [codes (keep :client/code (:clients request))]
+    (when (seq codes) codes)))
+
 (defn fetch-page-ssr
   "Fetch sales orders from parquet for the SSR page."
   [request]
@@ -145,6 +143,8 @@
                  (->> (into {} (remove (fn [[_ v]] (str/blank? v))))))
         [start end] (qp->date-range (merge raw-qp qp))
         opts (qp->opts qp)
+        client-codes (request->client-codes request)
+        opts (if client-codes (assoc opts :client-codes client-codes) opts)
         result (pq/get-sales-orders start end opts)
         rows (mapv <-row (:rows result))]
     {:rows rows :count (:count result)}))
@@ -157,7 +157,9 @@
                  ring-codec/form-decode
                  (->> (into {} (remove (fn [[_ v]] (str/blank? v))))))
         [start end] (qp->date-range (merge raw-qp qp))
-        opts (dissoc (qp->opts qp) :limit :offset :sort :order)]
+        opts (dissoc (qp->opts qp) :limit :offset :sort :order)
+        client-codes (request->client-codes request)
+        opts (if client-codes (assoc opts :client-codes client-codes) opts)]
     (pq/get-sales-orders-summary start end opts)))
 
 (defn summarize-orders [rows]
diff --git a/src/clj/auto_ap/storage/parquet.clj b/src/clj/auto_ap/storage/parquet.clj
index d56aed12..119799f0 100644
--- a/src/clj/auto_ap/storage/parquet.clj
+++ b/src/clj/auto_ap/storage/parquet.clj
@@ -3,7 +3,9 @@
    [amazonica.aws.s3 :as s3]
    [clojure.java.io :as io]
    [clojure.string :as str]
-   [clojure.data.json :as json])
+   [clojure.data.json :as json]
+   [clojure.core.cache :as cache]
+   [com.brunobonacci.mulog :as mu])
   (:import (java.sql DriverManager)
            (java.time LocalDate)))
@@ -14,7 +16,10 @@
   (str "s3://" *bucket* "/" filename))
 
 (defn parquet-key [entity-type date-str]
-  (str parquet-prefix "/" entity-type "/" date-str ".parquet"))
+  ;; Monthly partitioning: a full yyyy-MM-dd collapses to its yyyy-MM prefix,
+  ;; so all days of a month share one parquet object.
+  (let [month-str (if (= (count date-str) 10)
+                    (subs date-str 0 7)
+                    date-str)]
+    (str parquet-prefix "/" entity-type "/" month-str ".parquet")))
 
 (def db (atom nil))
@@ -26,6 +31,9 @@
       (.execute stmt (str "SET s3_access_key_id='" key "'"))
       (.execute stmt (str "SET s3_secret_access_key='" (:aws-secret-access-key env) "'"))
       (.execute stmt (str "SET s3_region='" (or (:aws-region env) "us-east-1") "'")))
+    (.execute stmt "PRAGMA enable_object_cache")
+    (.execute stmt "SET temp_directory='/tmp/duckdb-temp'")
+    (.execute stmt "SET memory_limit='2GB'")
     (.close stmt)
     (.addShutdownHook (Runtime/getRuntime)
                       (Thread. #(fn [])))
@@ -53,34 +61,52 @@
       (.execute stmt sql)
       nil)))
 
+(defn- sql-snippet
+  "First 500 chars of sql, for log/trace payloads."
+  [sql]
+  (subs sql 0 (min (count sql) 500)))
+
 (defn query-scalar [sql]
-  (with-duckdb
-    (let [stmt (.createStatement conn)
-          rs (.executeQuery stmt sql)]
-      (when (.next rs)
-        (.getObject rs 1)))))
+  (mu/trace ::query-scalar
+    [:sql (sql-snippet sql)]
+    (with-duckdb
+      (let [stmt (.createStatement conn)
+            rs (.executeQuery stmt sql)]
+        (when (.next rs)
+          (.getObject rs 1))))))
+
+;; NOTE: core.cache factories take a plain base *map*; wrapping a TTL cache in
+;; lru-cache-factory discards the TTL semantics entirely, so use a single TTL
+;; cache and drive it through the cache protocol (has?/through-cache/lookup)
+;; so expiry is actually honoured on lookup.
+(def ^:private count-cache
+  (atom (cache/ttl-cache-factory {} :ttl 1800000)))
+
+(defn- cached-count
+  "COUNT(*) result for sql, cached for 30 minutes."
+  [sql]
+  (let [hit? (cache/has? @count-cache sql)]
+    (mu/log ::count-cache :hit hit? :sql (sql-snippet sql))
+    (-> (swap! count-cache cache/through-cache sql query-scalar)
+        (cache/lookup sql))))
 
 (defn query-rows [sql]
-  (with-duckdb
-    (let [stmt (.createStatement conn)
-          rs (.executeQuery stmt sql)
-          meta (.getMetaData rs)
-          col-count (.getColumnCount meta)
-          cols (vec (for [i (range 1 (inc col-count))]
-                      (keyword (.getColumnLabel meta i))))]
-      (loop [rows []]
-        (if (.next rs)
-          (recur (conj rows
-                       (zipmap cols
-                               (vec (for [i (range 1 (inc col-count))]
-                                      (.getObject rs i))))))
-          rows)))))
+  (mu/trace ::query-rows
+    [:sql (sql-snippet sql)]
+    (with-duckdb
+      (let [stmt (.createStatement conn)
+            rs (.executeQuery stmt sql)
+            meta (.getMetaData rs)
+            col-count (.getColumnCount meta)
+            cols (vec (for [i (range 1 (inc col-count))]
+                        (keyword (.getColumnLabel meta i))))]
+        (loop [rows []]
+          (if (.next rs)
+            (recur (conj rows
+                         (zipmap cols
+                                 (vec (for [i (range 1 (inc col-count))]
+                                        (.getObject rs i))))))
+            rows))))))
 
 (defn execute-to-parquet!
   [sql ^String parquet-path]
   (with-duckdb
     (let [stmt (.createStatement conn)]
       (.execute stmt
-                (format "COPY (%s) TO '%s' (FORMAT PARQUET, OVERWRITE_OR_IGNORE)"
+                ;; Small row groups keep S3 range reads cheap; zstd shrinks payloads.
+                (format "COPY (%s) TO '%s' (FORMAT PARQUET, OVERWRITE_OR_IGNORE, ROW_GROUP_SIZE 10000, COMPRESSION 'zstd')"
                         sql parquet-path))
       (io/file parquet-path))))
@@ -129,31 +155,46 @@
        vals
        (mapcat identity)
        count))
 
-(defn flush-to-parquet! [entity-type date-str]
-  "Flush buffered records for entity-type to parquet + S3."
+(defn flush-to-parquet!
+  "Flush buffered records for entity-type to the monthly parquet file + S3.
+  Reads the existing monthly file (if any), merges with new records, uploads.
+  Uses a temp table so ROW_GROUP_SIZE is respected (DuckDB ignores it when
+  copying straight from an S3 read)."
+  [entity-type date-str]
   (let [records (get @*buffers* entity-type [])]
     (if (empty? records)
       {:status :no-records}
       (let [date-str (or date-str (.toString (LocalDate/now)))
+            s3-key (parquet-key entity-type date-str)
+            s3-url (s3-location s3-key)
             jsonl-file (io/file "/tmp" (str entity-type "-" date-str ".jsonl"))
             parquet-file (io/file "/tmp" (str entity-type "-" date-str ".parquet"))
-            s3-key (parquet-key entity-type date-str)]
+            tbl (format "\"_flush_%s_%s\""
+                        (clojure.string/replace entity-type "-" "_")
+                        (subs date-str 0 7))]
         (try
           (with-open [w (io/writer jsonl-file :append true)]
             (doseq [r records]
               (.write w ^String (json/write-str (dissoc r :_seq-no)))
               (.write w (int \newline))))
-          (execute-to-parquet!
-           (format "SELECT * FROM read_json_auto('%s')"
-                   (.getAbsolutePath jsonl-file))
-           (.getAbsolutePath parquet-file))
-          (upload-parquet! parquet-file s3-key)
-          (clear-buffer! entity-type)
-          (.delete ^java.io.File jsonl-file)
-          (.delete ^java.io.File parquet-file)
-          {:key s3-key :status :ok}
+          (let [existing-sql (format
+                              "SELECT * FROM read_parquet('%s', union_by_name=true)"
+                              s3-url)
+                new-sql (format
+                         "SELECT * FROM read_json_auto('%s')"
+                         (.getAbsolutePath jsonl-file))]
+            ;; First flush of a month: the monthly S3 object does not exist yet
+            ;; and read_parquet raises, so fall back to the new records alone
+            ;; instead of failing the whole flush.
+            (try
+              (execute! (format "CREATE OR REPLACE TABLE %s AS SELECT * FROM (%s UNION ALL %s) ORDER BY \"client-code\", date"
+                                tbl existing-sql new-sql))
+              (catch Exception _
+                (execute! (format "CREATE OR REPLACE TABLE %s AS SELECT * FROM (%s) ORDER BY \"client-code\", date"
+                                  tbl new-sql))))
+            (execute! (format "COPY (SELECT * FROM %s) TO '%s' (FORMAT PARQUET, OVERWRITE_OR_IGNORE, ROW_GROUP_SIZE 10000, COMPRESSION 'zstd')"
+                              tbl (.getAbsolutePath parquet-file)))
+            (execute! (format "DROP TABLE IF EXISTS %s" tbl))
+            (upload-parquet! parquet-file s3-key)
+            (clear-buffer! entity-type)
+            (.delete ^java.io.File jsonl-file)
+            (.delete ^java.io.File parquet-file)
+            {:key s3-key :status :ok})
           (catch Exception e
+            ;; Best-effort cleanup of the temp table before surfacing the error.
+            (execute! (format "DROP TABLE IF EXISTS %s" tbl))
            (throw (ex-info "Flush failed"
                            {:entity-type entity-type
                             :error (.getMessage e)}))))))))
@@ -191,12 +232,12 @@
                    {}
                    (into {}
                          (keep (fn [et]
-                                 (let [f (io/file
-                                          (wal-dir)
-                                          (str et ".jsonl"))]
-                                   (when (.exists f)
-                                     [et (slurp f)])))
-                          etypes)))]
+                                 (let [f (io/file
+                                           (wal-dir)
+                                           (str et ".jsonl"))]
+                                   (when (.exists f)
+                                     [et (slurp f)]))))
+                         etypes))]
     (swap! *buffers* merge loaded)))
 
 (defn get-unflushed-count []
@@ -220,51 +261,44 @@
 (defn today []
   (.toString (LocalDate/now)))
 
+(def ^:private mm-dd-yyyy (java.time.format.DateTimeFormatter/ofPattern "MM/dd/yyyy"))
+
+(defn- normalize-date-str
+  "Coerce UI date strings to ISO yyyy-MM-dd: parses MM/dd/yyyy, truncates
+  timestamp strings, passes anything else through unchanged."
+  [s]
+  (when s
+    (if (re-find #"^\d{2}/\d{2}/\d{4}" s)
+      (.toString (LocalDate/parse s mm-dd-yyyy))
+      (if (> (count s) 10) (subs s 0 10) s))))
+
+(defn- month-seq
+  "Seq of YYYY-MM strings for every month touched by [start-date, end-date]."
+  [start-date end-date]
+  (let [sd (LocalDate/parse (normalize-date-str start-date))
+        ed (LocalDate/parse (normalize-date-str end-date))]
+    ;; Walk first-of-month dates so no month is skipped when start-date falls
+    ;; late in a month (e.g. Jan 31 + 1 month = Feb 28, overshooting Mar 01).
+    (loop [months [] cur (.withDayOfMonth sd 1)]
+      (if (.isAfter cur ed)
+        months
+        ;; parquet-key writes YYYY-MM.parquet, so emit the 7-char month prefix
+        ;; (LocalDate.toString would yield YYYY-MM-01 and miss the files).
+        (recur (conj months (subs (.toString cur) 0 7))
+               (.plusMonths cur 1))))))
+
-(defn- parquet-glob [entity-type start-date end-date]
-  "Build a glob pattern or explicit file list for the date range.
-   Uses glob patterns for ranges > 60 days; explicit list otherwise."
-  (let [days (-> (LocalDate/parse end-date)
-                 (.toEpochDay)
-                 (- (.toEpochDay (LocalDate/parse start-date)))
-                 inc)]
-    (if (> days 60)
-      (let [prefix (format "s3://%s/sales-details/%s/" *bucket* entity-type)
-            sy (-> (LocalDate/parse start-date) .getYear)
-            ey (-> (LocalDate/parse end-date) .getYear)]
-        (if (= sy ey)
-          [(format "%s%d-*.parquet" prefix sy)]
-          (vec
-           (for [y (range sy (inc ey))]
-             (format "%s%d-*.parquet" prefix y)))))
-      (vec
-       (map (fn [d]
-              (format "'s3://%s/sales-details/%s/%s.parquet'"
-                      *bucket* entity-type d))
-            (date-seq start-date end-date))))))
+(defn- parquet-glob
+  "Explicit monthly file list for the date range. Monthly partitions mean
+  only 1-3 files for typical queries, 12 for a full year."
+  [entity-type start-date end-date]
+  (let [prefix (format "s3://%s/sales-details/%s/" *bucket* entity-type)]
+    (mapv #(format "'%s%s.parquet'" prefix %)
+          (month-seq start-date end-date))))
 
-(defn parquet-query [entity-type start-date end-date]
-  "Build SQL to read all parquet files in date range.
-   Returns map with :sql and :count-sql keys."
-  (let [globs (parquet-glob entity-type start-date end-date)
-        use-glob? (some #(.endsWith ^String % "*.parquet") globs)
-        base (if use-glob?
-               (format "SELECT * FROM read_parquet(%s, union_by_name=true)"
-                       (if (= (count globs) 1)
-                         (format "'%s'" (first globs))
-                         (format "[%s]"
-                                 (str/join ", " (map #(format "'%s'" %) globs)))))
-               (format "SELECT * FROM read_parquet([%s])"
-                       (str/join ", " globs)))
-        add-date-filter (fn [sql]
-                          (if (> (-> (LocalDate/parse end-date)
-                                     (.toEpochDay)
-                                     (- (.toEpochDay (LocalDate/parse start-date)))
-                                     inc)
-                                 60)
-                            (format "%s WHERE date >= '%s' AND date <= '%s'"
-                                    sql start-date end-date)
-                            sql))
-        sql (add-date-filter base)]
+(defn parquet-query
+  "Build SQL to read the monthly parquet files covering [start-date, end-date].
+  Normalizes date formats (handles MM/dd/yyyy from the UI), then a WHERE
+  clause filters rows back down to the exact range. Returns a map with
+  :sql and :count-sql keys."
+  [entity-type start-date end-date]
+  (let [sd (normalize-date-str start-date)
+        ed (normalize-date-str end-date)
+        files (parquet-glob entity-type sd ed)
+        base (format "SELECT * FROM read_parquet([%s], union_by_name=true)"
+                     (str/join ", " files))
+        sql (format "%s WHERE date >= '%s' AND date <= '%s'"
+                    base sd ed)]
     {:sql sql
      :count-sql (format "SELECT COUNT(*) FROM (%s) t" sql)}))
@@ -280,6 +314,14 @@
                       [[:client "client-code"]
                        [:vendor "vendor"]
                        [:location "location"]])
+        in-clauses (keep
+                    (fn [[key col]]
+                      (let [vs (get opts key)]
+                        (when (seq vs)
+                          (str "\"" col "\" IN ("
+                               ;; values originate from request data -- escape
+                               ;; embedded quotes; string-built SQL, review for
+                               ;; parameterization when the driver allows it
+                               (str/join ", " (map #(str "'" (str/replace (str %) "'" "''") "'") vs))
+                               ")"))))
+                    [[:client-codes "client-code"]])
         like-clauses (keep
                       (fn [[key col]]
                         (let [v (get opts key)]
@@ -295,60 +337,76 @@
                            (str "\"" col "\" " op " " v))))
                        [[:total-gte "total" ">="]
                         [:total-lte "total" "<="]])
-        all-clauses (concat eq-clauses like-clauses range-clauses)]
+        all-clauses (concat eq-clauses in-clauses like-clauses range-clauses)]
     (when (seq all-clauses)
-      (str " WHERE " (str/join " AND " all-clauses)))))
+      ;; No WHERE prefix: the caller decides between WHERE and AND, since the
+      ;; base query from parquet-query already carries a WHERE date filter.
+      (str/join " AND " all-clauses))))
 
 (defn get-sales-orders
   ([start-date end-date]
    (get-sales-orders start-date end-date {}))
   ([start-date end-date opts]
-   (try
-     (let [q (parquet-query "sales-order"
-                            start-date end-date)
-           base-sql (:sql q)
-           count-sql (:count-sql q)
-           sort (get opts :sort "date")
-           order (get opts :order "DESC")
-           limit (get opts :limit)
-           offset (get opts :offset)
-           where-str (build-sales-orders-where opts)
-           full-sql (if where-str
-                      (str base-sql where-str)
-                      base-sql)
-           result (cond-> full-sql
-                    sort (str " ORDER BY " sort
-                              " " (name order))
-                    limit (str " LIMIT " limit)
-                    offset (str " OFFSET " offset))
-           full-count (if where-str
-                        (str count-sql where-str)
-                        count-sql)]
-       {:rows (query-rows result)
-        :count (or
-                (int
-                 (query-scalar
-                  full-count)) 0)})
-     (catch Exception _
-       {:rows [] :count 0})))
+   (mu/trace ::get-sales-orders
+     [:start-date start-date :end-date end-date :opts opts]
+     (try
+       (let [q (parquet-query "sales-order" start-date end-date)
+             base-sql (:sql q)
+             has-where? (str/includes? base-sql " WHERE ")
+             sort (get opts :sort "date")
+             order (get opts :order "DESC")
+             limit (get opts :limit)
+             offset (get opts :offset)
+             extra-clauses (build-sales-orders-where opts)
+             full-sql (if extra-clauses
+                        (str base-sql (if has-where? " AND " " WHERE ") extra-clauses)
+                        base-sql)
+             data-sql (cond-> full-sql
+                        sort (str " ORDER BY " sort " " (name order))
+                        limit (str " LIMIT " limit)
+                        offset (str " OFFSET " offset))
+             count-sql (format "SELECT COUNT(*) FROM (%s) t" full-sql)]
+         (mu/log ::get-sales-orders :data-sql data-sql :count-sql count-sql)
+         (let [cnt (cached-count count-sql)
+               rows (query-rows data-sql)]
+           {:rows rows
+            ;; explicit nil check: (or (int nil) 0) would NPE before or sees it
+            :count (if cnt (int cnt) 0)}))
+       (catch Exception e
+         (mu/log ::get-sales-orders :error e :start-date start-date :end-date end-date :opts opts)
+         (throw e))))))
+
+;; Same rationale as count-cache: a single TTL cache, driven via the cache
+;; protocol so the 30-minute expiry is actually enforced.
+(def ^:private summary-cache
+  (atom (cache/ttl-cache-factory {} :ttl 1800000)))
+
+(defn- cached-summary
+  "Summary map {:total .. :tax ..} for sql, cached for 30 minutes."
+  [sql]
+  (let [hit? (cache/has? @summary-cache sql)]
+    (mu/log ::summary-cache :hit hit? :sql (sql-snippet sql))
+    ;; through-cache computes on miss, no-ops on hit; lookup returns the VALUE
+    ;; (returning the map entry itself was a bug -- callers got [sql result]).
+    (-> (swap! summary-cache cache/through-cache sql
+               (fn [q]
+                 (let [row (first (query-rows q))]
+                   {:total (or (:total row) 0.0)
+                    :tax (or (:tax row) 0.0)})))
+        (cache/lookup sql))))
 
 (defn get-sales-orders-summary
   ([start-date end-date]
    (get-sales-orders-summary start-date end-date {}))
   ([start-date end-date opts]
-   (try
-     (let [q (parquet-query "sales-order" start-date end-date)
-           base-sql (:sql q)
-           where-str (build-sales-orders-where opts)
-           full-sql (if where-str
-                      (str base-sql where-str)
-                      base-sql)
-           sum-sql (format "SELECT COALESCE(SUM(total), 0) as total, COALESCE(SUM(tax), 0) as tax FROM (%s) t" full-sql)
-           row (first (query-rows sum-sql))]
-       {:total (or (:total row) 0.0)
-        :tax (or (:tax row) 0.0)})
-     (catch Exception _
-       {:total 0.0 :tax 0.0})))
+   (mu/trace ::get-sales-orders-summary
+     [:start-date start-date :end-date end-date :opts opts]
+     (try
+       (let [q (parquet-query "sales-order" start-date end-date)
+             base-sql (:sql q)
+             has-where? (str/includes? base-sql " WHERE ")
+             extra-clauses (build-sales-orders-where opts)
+             full-sql (if extra-clauses
+                        (str base-sql (if has-where? " AND " " WHERE ") extra-clauses)
+                        base-sql)
+             sum-sql (format "SELECT COALESCE(SUM(total), 0) as total, COALESCE(SUM(tax), 0) as tax FROM (%s) t" full-sql)]
+         (mu/log ::get-sales-orders-summary :sum-sql sum-sql)
+         (cached-summary sum-sql))
+       (catch Exception e
+         (mu/log ::get-sales-orders-summary :error e :start-date start-date :end-date end-date :opts opts)
+         (throw e))))))
 
 (defn query-deduped [entity-type start-date end-date]
   "Query records deduplicated by external-id (latest _seq_no wins)."