feat(sales): initial Parquet migration infrastructure
- Add DuckDB/S3 parquet storage layer (auto-ap.storage.parquet)
- Add sales_to_parquet migration script for historical data
- Add cleanup_sales for post-migration Datomic cleanup
- Add sales_orders_new.clj with DuckDB read layer for SSR views
- Add test scaffolding for parquet storage
- Add plan document for move-detailed-sales-to-parquet

feat(sales): redirect production and read flows to Parquet/DuckDB

- U3: Square production (upsert) now buffers to parquet via flatten-order-to-parquet!
- U3: EzCater core import-order now buffers to parquet instead of Datomic transact
- U3: EzCater XLS upload-xls now buffers to parquet instead of audit-transact
- U4: Rewrite sales_orders.clj to read from DuckDB via pq/get-sales-orders
- U5: Rewrite sales_summaries to use parquet aggregation functions
  - get-payment-items-parquet, get-discounts-parquet, get-refund-items-parquet
  - get-tax-parquet, get-tip-parquet, get-sales-parquet
- Add sum-* aggregation functions to storage/sales_summaries.clj
  - sum-discounts, sum-refunds-by-type, sum-taxes, sum-tips, sum-sales-by-category
This commit is contained in:
30
test/clj/auto_ap/storage/parquet_test.clj
Normal file
30
test/clj/auto_ap/storage/parquet_test.clj
Normal file
@@ -0,0 +1,30 @@
|
||||
(ns auto-ap.storage.parquet-test
|
||||
(:require [auto-ap.storage.parquet :as p]
|
||||
[clojure.test :refer [deftest is testing use-fixtures]]))
|
||||
|
||||
;; Smoke test: query-scalar should unwrap a single-value result set.
(deftest test-query-scalar
  (testing "SELECT 1 returns 1"
    (let [actual (p/query-scalar "SELECT 1")]
      (is (= 1 actual)))))
|
||||
|
||||
;; query-scalar should evaluate SQL expressions, not just literals.
(deftest test-query-scalar-with-expression
  (testing "SELECT 2 + 2 returns 4"
    (let [actual (p/query-scalar "SELECT 2 + 2")]
      (is (= 4 actual)))))
|
||||
|
||||
;; buffer! should append one record, observable via buffer-count.
(deftest test-buffer
  (testing "buffer! adds record to buffer"
    (let [record-type "test-type"]
      (p/clear-buffer! record-type)
      (p/buffer! record-type {:id 1 :name "test"})
      (is (= 1 (p/buffer-count record-type))))))
|
||||
|
||||
;; clear-buffer! should drop every buffered record for the given type.
(deftest test-clear-buffer
  (testing "clear-buffer! empties buffer"
    (let [record-type "test-type"]
      (p/clear-buffer! record-type)
      (p/buffer! record-type {:id 2})
      (is (= 1 (p/buffer-count record-type)))
      (p/clear-buffer! record-type)
      (is (zero? (p/buffer-count record-type))))))
|
||||
|
||||
;; date-seq should produce every day from start to end, inclusive of both.
(deftest test-date-seq
  (testing "date-seq generates correct sequence"
    (is (= ["2024-04-01" "2024-04-02" "2024-04-03"]
           (p/date-seq "2024-04-01" "2024-04-03")))))
|
||||
112
test/clj/auto_ap/storage/perf_test.clj
Normal file
112
test/clj/auto_ap/storage/perf_test.clj
Normal file
@@ -0,0 +1,112 @@
|
||||
(ns auto-ap.storage.perf-test
|
||||
(:require [auto-ap.storage.parquet :as p]
|
||||
[amazonica.aws.s3 :as s3]
|
||||
[clojure.java.io :as io]
|
||||
[clojure.string :as str])
|
||||
(:import (java.sql DriverManager)
|
||||
(java.time Instant)))
|
||||
|
||||
(defn timestamp
  "Returns the current wall-clock time in milliseconds since the Unix epoch."
  []
  ;; Instant is imported by this namespace; equivalent to System/currentTimeMillis.
  (.toEpochMilli (Instant/now)))
|
||||
|
||||
(defn timed
  "Invokes the zero-arg fn `sql-fn`, prints \"label: N ms\" with the elapsed
  wall-clock time, and returns sql-fn's result unchanged."
  [label sql-fn]
  (let [start (timestamp)
        result (sql-fn)]
    (println (format "%s: %d ms" label (- (timestamp) start)))
    result))
|
||||
|
||||
(defn run-perf-tests
  "Benchmark DuckDB/parquet round-tripping end to end:
  1. build a 100k-row synthetic sales table in DuckDB,
  2. write it to a local parquet file and upload it to S3,
  3. register the S3 object as a DuckDB view and time paginated,
     filtered, and aggregate queries against it.
  Prints timings to stdout via `timed`; always disconnects DuckDB.
  NOTE(review): requires live S3 credentials for the dev bucket below."
  []
  (p/connect!)
  (try
    (let [bucket "data.dev.app.integreatconsult.com"
          prefix "test-duckdb"
          local-parquet "/tmp/test_data.parquet"
          s3-key (str prefix "/data.parquet")]

      ;; Create 100k test rows
      (println "\n=== Creating 100k test rows ===")
      (p/execute! "DROP TABLE IF EXISTS test_data")
      ;; Synthetic data: 5 regions / 8 categories cycled by row index,
      ;; random 2-decimal amounts, dates spread across 2024, and every
      ;; 20th row marked 'voided' so status filters have something to hit.
      (p/execute! (str "
        CREATE TABLE test_data AS
        SELECT
          i AS id,
          'order_' || i AS external_id,
          CASE (i % 5)
            WHEN 0 THEN 'north'
            WHEN 1 THEN 'south'
            WHEN 2 THEN 'east'
            WHEN 3 THEN 'west'
            ELSE 'central'
          END AS region,
          CASE (i % 8)
            WHEN 0 THEN 'food'
            WHEN 1 THEN 'beverage'
            WHEN 2 THEN 'alcohol'
            WHEN 3 THEN 'catering'
            WHEN 4 THEN 'retail'
            WHEN 5 THEN 'dessert'
            WHEN 6 THEN 'merch'
            ELSE 'other'
          END AS category,
          ROUND(1 + ABS(RANDOM() % 10000) / 100.0, 2) AS amount,
          CAST(DATE '2024-01-01' + (i % 365) * INTERVAL '1 day' AS DATE) AS sale_date,
          CASE WHEN i % 20 = 0 THEN 'voided' ELSE 'active' END AS status
        FROM generate_series(1, 100000) AS t(i)"))
      (println "Row count:" (p/query-scalar "SELECT COUNT(*) FROM test_data"))
      (println "Voided count:" (p/query-scalar "SELECT COUNT(*) FROM test_data WHERE status = 'voided'"))
      (println "Amount > 3 count:" (p/query-scalar "SELECT COUNT(*) FROM test_data WHERE amount > 3"))

      ;; Write to local parquet
      (println "\n=== Writing local parquet ===")
      (timed "Write parquet" #(p/execute-to-parquet! "SELECT * FROM test_data" local-parquet))
      (let [f (io/file local-parquet)]
        ;; 1048576.0 = bytes per MiB; float divisor for the %.1f format.
        (println "File size:" (format "%.1f MB" (/ (.length f) 1048576.0))))

      ;; Upload to S3
      (println "\n=== Uploading to S3 ===")
      (timed "S3 upload" #(p/upload-parquet! (io/file local-parquet) prefix))
      (println "S3 URI:" (p/s3-location s3-key))

      ;; Now test reading from S3
      (println "\n=== Performance Tests (reading from S3) ===")
      (let [s3-uri (str "s3://" bucket "/" s3-key)]

        ;; Register S3 parquet as a view/table in DuckDB
        (p/execute! (format "CREATE VIEW s3_test AS SELECT * FROM read_parquet('%s')" s3-uri))
        (println "Total rows in S3:" (p/query-scalar "SELECT COUNT(*) FROM s3_test"))

        ;; Test 1: Page 1 - first 25 rows
        (println "\n--- Test 1: Page 1 (LIMIT 25 OFFSET 0) ---")
        (timed "First page (25 rows)" #(p/query-rows "SELECT * FROM s3_test ORDER BY id LIMIT 25"))
        (println "Sample row:" (first (p/query-rows "SELECT * FROM s3_test ORDER BY id LIMIT 1")))

        ;; Test 2: Page 20 - rows 475-500 (OFFSET 475)
        (println "\n--- Test 2: Page 20 (LIMIT 25 OFFSET 475) ---")
        (timed "Page 20 (25 rows)" #(p/query-rows "SELECT * FROM s3_test ORDER BY id LIMIT 25 OFFSET 475"))

        ;; Test 3: Filter amount > 3 (no pagination)
        (println "\n--- Test 3: Filter amount > 3 (no limit) ---")
        ;; Returns :done so `timed` measures only the scan, not result printing.
        (timed "Filter amount > 3 (all)" #(do (p/query-scalar "SELECT COUNT(*) FROM s3_test WHERE amount > 3") :done))

        ;; Test 4: Filter + pagination
        (println "\n--- Test 4: Filter amount > 3 + LIMIT 25 ---")
        (timed "Filter + paginated (25 rows)" #(p/query-rows "SELECT * FROM s3_test WHERE amount > 3 ORDER BY id LIMIT 25"))

        ;; Test 5: Filter + page 20
        (println "\n--- Test 5: Filter amount > 3 + LIMIT 25 OFFSET 475 ---")
        (timed "Filter + page 20" #(p/query-rows "SELECT * FROM s3_test WHERE amount > 3 ORDER BY id LIMIT 25 OFFSET 475"))

        ;; Test 6: Aggregation on S3 data
        (println "\n--- Test 6: Aggregation (SUM, AVG on amount) ---")
        (timed "Aggregation SUM/AVG" #(p/query-scalar "SELECT SUM(amount), AVG(amount) FROM s3_test WHERE status = 'active'"))

        ;; Cleanup
        (p/execute! "DROP VIEW IF EXISTS s3_test")
        (p/execute! "DROP TABLE IF EXISTS test_data")))

    (finally
      (p/disconnect!))))
|
||||
|
||||
;; Script entry point: run the benchmark suite immediately when this
;; file is loaded (intended to be run ad hoc, not as part of CI).
(run-perf-tests)
(println "\n=== Done ===")
|
||||
Reference in New Issue
Block a user