fix(sales): fix parquet SQL generation and cleanup formatting
- Fix double ORDER BY in sales_orders raw-graphql-ids (was passing full ORDER BY clause from build-sort-clause into get-sales-orders which prepends its own ORDER BY, producing 'ORDER BY ORDER BY ... DESC DESC') - Fix WHERE clause column names in parquet build-where-clause: external_id.client -> client-code, external_id.vendor -> vendor - Fix parquet-query format string (%%s -> %s with proper format call) - Fix ex-info call signature in flush! (was passing :error as third arg instead of inside the data map) - Add S3 credentials to DuckDB connect! so httpfs can read from S3 - Fix parquet buffer indentation and alignment across square/core3, ezcater/core, ezcater_xls, payments, sales_summaries, migrations - Fix broken Datomic query syntax in ezcater/core (upsert-used-subscriptions, upsert-recent find/where clauses mangled by paren-repair) - Uncomment accidentally commented code block in square/core3 - Fix paren/indentation issues in ssr/payments, jobs/sales_summaries
This commit is contained in:
@@ -22,6 +22,10 @@
|
||||
(let [conn (DriverManager/getConnection "jdbc:duckdb:")
|
||||
stmt (.createStatement conn)]
|
||||
(.execute stmt "INSTALL httpfs; LOAD httpfs;")
|
||||
(when-let [key (:aws-access-key-id env)]
|
||||
(.execute stmt (str "SET s3_access_key_id='" key "'"))
|
||||
(.execute stmt (str "SET s3_secret_access_key='" (:aws-secret-access-key env) "'"))
|
||||
(.execute stmt (str "SET s3_region='" (or (:aws-region env) "us-east-1") "'")))
|
||||
(.close stmt)
|
||||
(.addShutdownHook (Runtime/getRuntime)
|
||||
(Thread. #(fn [])))
|
||||
@@ -107,9 +111,9 @@
|
||||
(str entity-type ".jsonl"))]
|
||||
(io/make-parents wal-file)
|
||||
(with-open [w (io/writer wal-file :append true)]
|
||||
(.write w ^String (json/write-str {:seq-no seq-no
|
||||
:record record}))
|
||||
(.write w (int \newline))))
|
||||
(.write w ^String (json/write-str {:seq-no seq-no
|
||||
:record record}))
|
||||
(.write w (int \newline))))
|
||||
(catch Exception e
|
||||
(println "[parquet/wal]" (.getMessage e))))
|
||||
entry))
|
||||
@@ -138,8 +142,8 @@
|
||||
(try
|
||||
(with-open [w (io/writer jsonl-file :append true)]
|
||||
(doseq [r records]
|
||||
(.write w ^String (json/write-str (dissoc r :_seq-no)))
|
||||
(.write w (int \newline))))
|
||||
(.write w ^String (json/write-str (dissoc r :_seq-no)))
|
||||
(.write w (int \newline))))
|
||||
(execute-to-parquet!
|
||||
(format "SELECT * FROM read_json_auto('%s')"
|
||||
(.getAbsolutePath jsonl-file))
|
||||
@@ -150,8 +154,9 @@
|
||||
(.delete ^java.io.File parquet-file)
|
||||
{:key s3-key :status :ok}
|
||||
(catch Exception e
|
||||
(throw (ex-info "Flush failed" {:entity-type entity-type}
|
||||
:error (.getMessage e)))))))))
|
||||
(throw (ex-info "Flush failed"
|
||||
{:entity-type entity-type
|
||||
:error (.getMessage e)}))))))))
|
||||
|
||||
(defn flush-by-date! []
|
||||
"Flush all entity types for today."
|
||||
@@ -218,8 +223,9 @@
|
||||
Returns map with :sql and :count-sql keys."
|
||||
(let [date-strs (date-seq start-date end-date)
|
||||
urls (vec
|
||||
(map #(format "'s3://%%s/sales-details/%%s/%%s.parquet'"
|
||||
*bucket* entity-type %)
|
||||
(map (fn [d]
|
||||
(format "'s3://%s/sales-details/%s/%s.parquet'"
|
||||
*bucket* entity-type d))
|
||||
date-strs))
|
||||
sql (str "SELECT * FROM read_parquet(["
|
||||
(str/join ", " urls)
|
||||
@@ -244,35 +250,34 @@
|
||||
(get-sales-orders start-date end-date {}))
|
||||
([start-date end-date opts]
|
||||
(let [q (parquet-query "sales-order"
|
||||
start-date end-date)
|
||||
base-sql (:sql q)
|
||||
count-sql (:count-sql q)
|
||||
sort (get opts :sort "date")
|
||||
order (get opts :order "DESC")
|
||||
limit (get opts :limit)
|
||||
offset (get opts :offset)
|
||||
start-date end-date)
|
||||
base-sql (:sql q)
|
||||
count-sql (:count-sql q)
|
||||
sort (get opts :sort "date")
|
||||
order (get opts :order "DESC")
|
||||
limit (get opts :limit)
|
||||
offset (get opts :offset)
|
||||
where-str (build-where-clause
|
||||
|
||||
opts
|
||||
[[:client "external_id.client"]
|
||||
[:vendor "external_id.vendor"]
|
||||
[[:client "client-code"]
|
||||
[:vendor "vendor"]
|
||||
[:location "location"]])
|
||||
full-sql (if where-str
|
||||
(str base-sql where-str)
|
||||
base-sql)
|
||||
result (cond-> full-sql
|
||||
sort (str " ORDER BY " sort
|
||||
" " (name order))
|
||||
limit (str " LIMIT " limit)
|
||||
offset (str " OFFSET " offset))
|
||||
full-count (if where-str
|
||||
(str count-sql where-str)
|
||||
count-sql)]
|
||||
{:rows (query-rows result)
|
||||
:count (or
|
||||
(int
|
||||
(query-scalar
|
||||
full-count)) 0)})))
|
||||
full-sql (if where-str
|
||||
(str base-sql where-str)
|
||||
base-sql)
|
||||
result (cond-> full-sql
|
||||
sort (str " ORDER BY " sort
|
||||
" " (name order))
|
||||
limit (str " LIMIT " limit)
|
||||
offset (str " OFFSET " offset))
|
||||
full-count (if where-str
|
||||
(str count-sql where-str)
|
||||
count-sql)]
|
||||
{:rows (query-rows result)
|
||||
:count (or
|
||||
(int
|
||||
(query-scalar
|
||||
full-count)) 0)})))
|
||||
|
||||
(defn query-deduped [entity-type start-date end-date]
|
||||
"Query records deduplicated by external-id (latest _seq_no wins)."
|
||||
|
||||
@@ -55,7 +55,7 @@
|
||||
tname
|
||||
(+ (get b tname 0.0) total)))))))
|
||||
{}
|
||||
rows)))
|
||||
rows)))
|
||||
(catch Exception e
|
||||
(println "[sales-summaries]" (.getMessage e))
|
||||
{}))))
|
||||
|
||||
Reference in New Issue
Block a user