(ns auto-ap.storage.parquet
  "Buffers sales records in memory (backed by a JSON-lines write-ahead log),
  flushes them to Parquet files on S3 through DuckDB, and reads them back."
  (:require [config.core :refer [env]]
            [amazonica.aws.s3 :as s3]
            [clojure.java.io :as io]
            [clojure.string :as str]
            [clojure.data.json :as json])
  (:import (java.io File)
           (java.sql Connection DriverManager ResultSet ResultSetMetaData Statement)
           (java.time LocalDate)))

(def ^:dynamic *bucket* (:data-bucket env))

(def parquet-prefix "sales-details")

(defn s3-location
  "Returns the s3:// URI of `filename` inside the current `*bucket*`."
  [filename]
  (str "s3://" *bucket* "/" filename))

(defn parquet-key
  "S3 object key of the Parquet file for `entity-type` on `date-str`."
  [entity-type date-str]
  (str parquet-prefix "/" entity-type "/" date-str ".parquet"))

(defn- sql-escape
  "Stringifies `v` and doubles single quotes so it can sit inside a
  single-quoted SQL literal."
  [v]
  (str/replace (str v) "'" "''"))

(defn- quote-ident
  "Quotes `ident` as a SQL identifier, doubling embedded double quotes."
  [ident]
  (str "\"" (str/replace (str ident) "\"" "\"\"") "\""))

;;; DuckDB connection

;; Holds the shared DuckDB JDBC connection, or nil when disconnected.
(def db (atom nil))

(defonce ^:private shutdown-hook-installed? (atom false))

(defn connect!
  "Returns the shared in-memory DuckDB connection, opening it if needed.
  A new connection loads the httpfs extension and, when
  `:aws-access-key-id` is configured, sets the S3 credentials/region.
  The connection is cached in `db` and closed by a JVM shutdown hook that is
  installed once."
  []
  (locking db
    (or @db
        (let [^Connection conn (DriverManager/getConnection "jdbc:duckdb:")]
          (try
            (with-open [^Statement stmt (.createStatement conn)]
              (.execute stmt "INSTALL httpfs; LOAD httpfs;")
              (when-let [key (:aws-access-key-id env)]
                (.execute stmt (str "SET s3_access_key_id='" (sql-escape key) "'"))
                (.execute stmt (str "SET s3_secret_access_key='"
                                    (sql-escape (:aws-secret-access-key env)) "'"))
                (.execute stmt (str "SET s3_region='"
                                    (sql-escape (or (:aws-region env) "us-east-1")) "'"))))
            (catch Exception e
              ;; Don't leak a half-initialised connection.
              (.close conn)
              (throw e)))
          (when (compare-and-set! shutdown-hook-installed? false true)
            (.addShutdownHook (Runtime/getRuntime)
                              (Thread. ^Runnable
                                       (fn []
                                         (when-let [^Connection c @db]
                                           (.close c))))))
          (reset! db conn)))))

(defn disconnect!
  "Closes the shared DuckDB connection, if any, and clears `db`."
  []
  (locking db
    (when-let [^Connection c @db]
      (try
        (.close c)
        (finally
          (reset! db nil))))))

(defmacro with-duckdb
  "Evaluates `body` with the symbol `conn` bound to the shared DuckDB
  connection (opening it first if necessary). If `db` has been cleared by the
  time `body` finishes, the connection used here is closed."
  [& body]
  `(let [conn# (or @db (connect!))]
     (try
       (let [~'conn conn#] ~@body)
       (finally
         (when (and (not @db) conn#)
           (.close conn#))))))

(defn execute!
  "Executes one SQL statement for its side effects; returns nil."
  [sql]
  (with-duckdb
    (with-open [^Statement stmt (.createStatement ^Connection conn)]
      (.execute stmt ^String sql)
      nil)))

(defn query-scalar
  "Runs `sql` and returns the first column of the first row, or nil when the
  result is empty."
  [sql]
  (with-duckdb
    (with-open [^Statement stmt (.createStatement ^Connection conn)
                ^ResultSet rs (.executeQuery stmt ^String sql)]
      (when (.next rs)
        (.getObject rs 1)))))

(defn query-rows
  "Runs `sql` and returns a vector of row maps keyed by the keywordized
  column labels."
  [sql]
  (with-duckdb
    (with-open [^Statement stmt (.createStatement ^Connection conn)
                ^ResultSet rs (.executeQuery stmt ^String sql)]
      (let [^ResultSetMetaData md (.getMetaData rs)
            idxs (range 1 (inc (.getColumnCount md)))
            cols (mapv #(keyword (.getColumnLabel md (int %))) idxs)]
        (loop [rows (transient [])]
          (if (.next rs)
            ;; zipmap realises the values eagerly, before the cursor moves.
            (recur (conj! rows (zipmap cols (map #(.getObject rs (int %)) idxs))))
            (persistent! rows)))))))

(defn execute-to-parquet!
  "Runs DuckDB `COPY (sql) TO parquet-path (FORMAT PARQUET,
  OVERWRITE_OR_IGNORE)` and returns `parquet-path` as a java.io.File."
  [sql ^String parquet-path]
  (with-duckdb
    (with-open [^Statement stmt (.createStatement ^Connection conn)]
      (.execute stmt (format "COPY (%s) TO '%s' (FORMAT PARQUET, OVERWRITE_OR_IGNORE)"
                             sql (sql-escape parquet-path)))
      (io/file parquet-path))))

(defn upload-parquet!
  "Uploads `local-parquet-file` to `*bucket*` under `s3-key`; returns its
  s3:// URI."
  [local-parquet-file s3-key]
  (s3/put-object {:bucket-name *bucket* :key s3-key :file local-parquet-file})
  (s3-location s3-key))

;;; Write buffer + write-ahead log

;; entity-type -> vector of buffered records, each carrying :_seq-no.
(defonce *buffers* (atom {}))

;; Serialises buffer mutation together with WAL appends/rewrites, so the WAL
;; file never diverges from the in-memory buffer.
(defonce ^:private wal-lock (Object.))

(defonce ^:private last-seq-no (atom 0))

(defn- next-seq-no!
  "Returns a strictly increasing number based on wall-clock millis, so two
  records buffered in the same millisecond still get distinct, ordered ids."
  []
  (swap! last-seq-no (fn [prev] (max (inc prev) (System/currentTimeMillis)))))

(defn- wal-dir []
  (io/file (System/getProperty "user.dir" "/tmp") "parquet-wal"))

(defn- init-wal! []
  (let [dir (wal-dir)]
    (when-not (.exists dir)
      (.mkdirs dir))))

(defn- wal-file
  "WAL file holding unflushed records of `entity-type`."
  ^File [entity-type]
  (io/file (wal-dir) (str entity-type ".jsonl")))

(defn- wal-line
  "One newline-terminated WAL entry: {\"seq-no\": ..., \"record\": ...}."
  ^String [seq-no record]
  (str (json/write-str {:seq-no seq-no :record record}) "\n"))

(defn- rewrite-wal!
  "Replaces the WAL of `entity-type` with `entries` (buffer entries carrying
  :_seq-no), deleting it when `entries` is empty. It writes a temp file and
  then renames it over the WAL. Best effort: errors are logged, not thrown.
  Callers must hold `wal-lock`."
  [entity-type entries]
  (try
    (let [f (wal-file entity-type)]
      (if (empty? entries)
        (io/delete-file f true)
        (let [tmp (io/file (str (.getPath f) ".tmp"))]
          (with-open [^java.io.Writer w (io/writer tmp)]
            (doseq [entry entries]
              (.write w (wal-line (:_seq-no entry) (dissoc entry :_seq-no)))))
          (when-not (.renameTo tmp f)
            ;; Some platforms refuse to rename over an existing file.
            (io/delete-file f true)
            (when-not (.renameTo tmp f)
              (throw (ex-info "WAL rename failed" {:file (str f)})))))))
    (catch Exception e
      (println "[parquet/wal]" (.getMessage e)))))

(defn buffer!
  "Buffers `record` for `entity-type` and appends it to that type's WAL.
  Returns the buffered entry, i.e. `record` with :_seq-no added. A WAL write
  failure is logged and does not fail the call."
  [entity-type record]
  (init-wal!)
  (let [seq-no (next-seq-no!)
        entry (assoc record :_seq-no seq-no)]
    (locking wal-lock
      (swap! *buffers* update entity-type (fnil conj []) entry)
      (try
        (let [f (wal-file entity-type)]
          (io/make-parents f)
          (with-open [^java.io.Writer w (io/writer f :append true)]
            (.write w (wal-line seq-no record))))
        (catch Exception e
          (println "[parquet/wal]" (.getMessage e)))))
    entry))

(defn clear-buffer!
  "Drops every in-memory buffered record of `entity-type` (the WAL is left
  untouched)."
  [entity-type]
  (swap! *buffers* dissoc entity-type))

(defn buffer-count [entity-type]
  (-> @*buffers* (get entity-type []) count))

(defn total-buf-count []
  (->> @*buffers* vals (mapcat identity) count))

(defn- drop-flushed
  "Removes the first `n` buffered entries of `entity-type` from `buffers`,
  dissoc'ing the key when nothing is left."
  [buffers entity-type n]
  (let [remaining (vec (drop n (get buffers entity-type)))]
    (if (seq remaining)
      (assoc buffers entity-type remaining)
      (dissoc buffers entity-type))))

(defn flush-to-parquet!
  "Flush buffered records for `entity-type` to Parquet and upload it to S3
  under (parquet-key entity-type date-str); `date-str` defaults to today.

  Each record is written with its sequence number as column `_seq_no`.
  Returns {:status :no-records} when nothing is buffered, otherwise
  {:key s3-key :status :ok}. Only the records that were written are removed
  from the buffer and the WAL, so records buffered meanwhile are kept.
  Throws ex-info \"Flush failed\" (original exception as cause) on error.

  NOTE(review): the S3 object for an entity type and date is replaced rather
  than merged, so flushing twice with the same `date-str` leaves only the
  second batch on S3 — confirm this is intended."
  [entity-type date-str]
  (let [records (get @*buffers* entity-type [])]
    (if (empty? records)
      {:status :no-records}
      (let [date-str (or date-str (str (LocalDate/now)))
            base-name (str entity-type "-" date-str)
            jsonl-file (io/file "/tmp" (str base-name ".jsonl"))
            parquet-file (io/file "/tmp" (str base-name ".parquet"))
            s3-key (parquet-key entity-type date-str)]
        (try
          ;; Overwrite, not append: leftovers of a failed run must not be re-uploaded.
          (with-open [^java.io.Writer w (io/writer jsonl-file)]
            (doseq [r records]
              (.write w (str (json/write-str (-> r
                                                 (dissoc :_seq-no)
                                                 (assoc :_seq_no (:_seq-no r))))
                             "\n"))))
          (execute-to-parquet!
           (format "SELECT * FROM read_json_auto('%s')"
                   (sql-escape (.getAbsolutePath jsonl-file)))
           (.getAbsolutePath parquet-file))
          (upload-parquet! parquet-file s3-key)
          (locking wal-lock
            (let [buffers (swap! *buffers* drop-flushed entity-type (count records))]
              (rewrite-wal! entity-type (get buffers entity-type))))
          {:key s3-key :status :ok}
          (catch Exception e
            (throw (ex-info "Flush failed"
                            {:entity-type entity-type :error (.getMessage e)}
                            e)))
          (finally
            (.delete jsonl-file)
            (.delete parquet-file)))))))

(def ^:private entity-types ["sales-order" "charge" "line-item" "sales-refund"])

(defn flush-by-date!
  "Flush all entity types for today. Returns {:flushed #{entity types that
  were flushed}}. An exception from one entity type propagates and skips
  the remaining ones."
  []
  (let [today-str (str (LocalDate/now))]
    {:flushed (into #{}
                    (keep (fn [et]
                            (when (= :ok (:status (flush-to-parquet! et today-str)))
                              et)))
                    entity-types)}))

(defn- parse-wal-line
  "Parses one WAL line back into a buffer entry (record + :_seq-no), or nil
  when the line is blank or unreadable."
  [line]
  (try
    (let [{:keys [seq-no record]} (json/read-str line :key-fn keyword)]
      (when (map? record)
        (assoc record :_seq-no seq-no)))
    (catch Exception _ nil)))

(defn load-unflushed!
  "Restore unflushed records from the WAL jsonl files into `*buffers*`.
  For each entity type with readable WAL entries, this replaces its buffer
  with a vector of those entries. Returns the new buffers map."
  []
  (init-wal!)
  (let [loaded (into {}
                     (keep (fn [et]
                             (let [f (wal-file et)]
                               (when (.exists f)
                                 (let [entries (into [] (keep parse-wal-line)
                                                     (str/split-lines (slurp f)))]
                                   (when (seq entries)
                                     [et entries]))))))
                     entity-types)]
    (swap! *buffers* merge loaded)))

(defn get-unflushed-count [] (total-buf-count))

(defn unflushed-records? [] (not= 0 (total-buf-count)))

;;; DuckDB Read Layer

(defn- ordered-bounds
  "Returns [earlier later] of two YYYY-MM-DD strings."
  [start-date end-date]
  (if (.isAfter (LocalDate/parse start-date) (LocalDate/parse end-date))
    [end-date start-date]
    [start-date end-date]))

(defn- day-span
  "Number of calendar days in the inclusive range between two YYYY-MM-DD
  strings (argument order does not matter)."
  [start-date end-date]
  (inc (Math/abs (- (.toEpochDay (LocalDate/parse start-date))
                    (.toEpochDay (LocalDate/parse end-date))))))

(defn date-seq
  "Seq of YYYY-MM-DD strings between start and end inclusive, ascending.
  The two dates may be given in either order."
  [start end]
  (let [[lo hi] (ordered-bounds start end)
        ^LocalDate first-day (LocalDate/parse lo)]
    (map #(str (.plusDays first-day (long %))) (range (day-span lo hi)))))

(defn today
  "Today's date as a YYYY-MM-DD string."
  []
  (.toString (LocalDate/now)))

;; Ranges longer than this many days are read with per-year globs.
(def ^:private glob-threshold-days 60)

(defn- parquet-glob
  "Unquoted S3 paths to read for `entity-type` over the date range.
  Ranges longer than `glob-threshold-days` get one YYYY-*.parquet glob per
  calendar year; shorter ranges get one explicit file per day."
  [entity-type start-date end-date]
  (let [[lo hi] (ordered-bounds start-date end-date)
        prefix (format "s3://%s/%s/%s/" *bucket* parquet-prefix entity-type)]
    (if (> (day-span lo hi) glob-threshold-days)
      (mapv #(format "%s%d-*.parquet" prefix %)
            (range (.getYear (LocalDate/parse lo))
                   (inc (.getYear (LocalDate/parse hi)))))
      (mapv #(str prefix % ".parquet") (date-seq lo hi)))))

(defn parquet-query
  "Build SQL to read all parquet files in the date range.
  Returns a map with :sql (SELECT * over the files, union_by_name) and
  :count-sql (row count of :sql). When per-year globs are used, :sql filters
  the `date` column to the range.

  NOTE(review): in the per-day branch, a day with no file on S3 may make
  DuckDB fail the whole query — confirm."
  [entity-type start-date end-date]
  (let [[lo hi] (ordered-bounds start-date end-date)
        globbed? (> (day-span lo hi) glob-threshold-days)
        sources (str/join ", " (map #(str "'" (sql-escape %) "'")
                                    (parquet-glob entity-type lo hi)))
        base (format "SELECT * FROM read_parquet([%s], union_by_name=true)" sources)
        sql (if globbed?
              (format "%s WHERE date >= '%s' AND date <= '%s'"
                      base (sql-escape lo) (sql-escape hi))
              base)]
    {:sql sql
     :count-sql (format "SELECT COUNT(*) FROM (%s) t" sql)}))

(defn- like-clause
  "SQL predicate: column `col` contains the text of `v`."
  [col v]
  (str (quote-ident col) " LIKE '%" (sql-escape v) "%'"))

(defn- sql-number
  "Renders `v` (a number or numeric string) as a plain SQL numeric literal.
  Throws NumberFormatException for anything else."
  [v]
  (.toPlainString ^java.math.BigDecimal (bigdec v)))

(defn- sql-long
  "Coerces `v` (a number or numeric string) to a long."
  [v]
  (long (bigdec v)))

(defn- sort-direction
  "\"ASC\" when `order` names ascending order (any case), else \"DESC\"."
  [order]
  (if (= "ASC" (some-> order name str/upper-case)) "ASC" "DESC"))

(defn- build-sales-orders-where
  "Builds \" WHERE ...\" from the truthy filter keys in `opts`, or nil when
  there are none. :client/:vendor/:location match exactly;
  :payment-method/:processor/:category match by substring;
  :total-gte/:total-lte bound the total."
  [opts]
  (let [present (fn [specs render]
                  (for [[k col & more] specs
                        :let [v (get opts k)]
                        :when v]
                    (apply render col v more)))
        clauses (concat
                 (present [[:client "client-code"] [:vendor "vendor"] [:location "location"]]
                          (fn [col v] (str (quote-ident col) " = '" (sql-escape v) "'")))
                 (present [[:payment-method "payment-methods"]
                           [:processor "processors"]
                           [:category "categories"]]
                          like-clause)
                 (present [[:total-gte "total" ">="] [:total-lte "total" "<="]]
                          (fn [col v op] (str (quote-ident col) " " op " " (sql-number v)))))]
    (when (seq clauses)
      (str " WHERE " (str/join " AND " clauses)))))

(defn- filter-sales-orders-sql
  "Wraps `base-sql` in a subquery filtered by the sales-order `opts`, so the
  filter composes with any WHERE already present in `base-sql`."
  [base-sql opts]
  (if-let [where-str (build-sales-orders-where opts)]
    (str "SELECT * FROM (" base-sql ") t" where-str)
    base-sql))

(defn get-sales-orders
  "Sales orders in the date range, filtered by `opts` (see
  build-sales-orders-where) and paged by :sort (column, default \"date\"),
  :order (default DESC), :limit and :offset.
  Returns {:rows [...] :count total-matching-rows}. On any error, logs the
  message and returns {:rows [] :count 0}."
  ([start-date end-date] (get-sales-orders start-date end-date {}))
  ([start-date end-date opts]
   (try
     (let [{base-sql :sql} (parquet-query "sales-order" start-date end-date)
           filtered-sql (filter-sales-orders-sql base-sql opts)
           sort-col (get opts :sort "date")
           order (get opts :order "DESC")
           limit (get opts :limit)
           offset (get opts :offset)
           page-sql (cond-> filtered-sql
                      sort-col (str " ORDER BY " (quote-ident (name sort-col))
                                    " " (sort-direction order))
                      limit (str " LIMIT " (sql-long limit))
                      offset (str " OFFSET " (sql-long offset)))]
       {:rows (query-rows page-sql)
        :count (or (some-> (query-scalar (format "SELECT COUNT(*) FROM (%s) t" filtered-sql))
                           int)
                   0)})
     (catch Exception e
       (println "[parquet/read]" (.getMessage e))
       {:rows [] :count 0}))))

(defn get-sales-orders-summary
  "Sum of `total` and `tax` over the sales orders matching `opts` in the date
  range, as {:total t :tax x}. On any error, logs the message and returns
  {:total 0.0 :tax 0.0}."
  ([start-date end-date] (get-sales-orders-summary start-date end-date {}))
  ([start-date end-date opts]
   (try
     (let [{base-sql :sql} (parquet-query "sales-order" start-date end-date)
           sum-sql (format "SELECT COALESCE(SUM(total), 0) as total, COALESCE(SUM(tax), 0) as tax FROM (%s) t"
                           (filter-sales-orders-sql base-sql opts))
           row (first (query-rows sum-sql))]
       {:total (or (:total row) 0.0)
        :tax (or (:tax row) 0.0)})
     (catch Exception e
       (println "[parquet/read]" (.getMessage e))
       {:total 0.0 :tax 0.0}))))

(defn query-deduped
  "Query records deduplicated by external id: for each
  `sales_order.external_id` only the row with the highest `_seq_no` is kept.

  NOTE(review): `sales_order.external_id` is used for every entity type —
  confirm it resolves (as a qualified column or struct field) in each
  entity's schema."
  [entity-type start-date end-date]
  (let [{:keys [sql]} (parquet-query entity-type start-date end-date)]
    (query-rows (str sql
                     " QUALIFY ROW_NUMBER() OVER"
                     " (PARTITION BY sales_order.external_id"
                     " ORDER BY _seq_no DESC) = 1"))))

(defn query-by-entity-id
  "First deduplicated row whose :external_id equals `external-id` (keywords
  and symbols compared by name, other values via str), or nil."
  [entity-type external-id start-date end-date]
  (let [target (if (instance? clojure.lang.Named external-id)
                 (name external-id)
                 (str external-id))]
    (some #(when (= (:external_id %) target) %)
          (query-deduped entity-type start-date end-date))))

(defn count-records-in-parquet
  "Number of rows in the Parquet files of `entity-type` for the date range."
  [entity-type start-date end-date]
  (let [q (parquet-query entity-type start-date end-date)]
    (or (some-> (query-scalar (:count-sql q)) int) 0)))