;; Changelog:
;; - Fix pq-files using date-seq (daily) vs actual monthly parquet partitions
;; - Fix safe-cleanup-all destructure [[_ y m]] -> [[y m]] against [year month]
;; - Fix shutdown hook no-op: Thread wrapping #(fn []) now actually closes conn
;; - Fix query-deduped: PARTITION BY "external-id" not sales_order.external_id
;; - Fix :client_code -> :client-code key mismatch in get-payment-items-parquet
;; - Fix object-exists? downloading full S3 objects; use head-object instead
;; - Fix date-seq silently producing wrong range when start > end; now throws
;; - Remove duplicate private get-fees that shadowed public version
;; - Deduplicate date-seq: remove from sales_to_parquet, use p/date-seq
;; - Wrap run-perf-tests in (comment ...) to prevent execution on lein test
;; - Make month-seq public so sales_summaries.clj can use it
;;
;; 113 lines, 4.5 KiB, Clojure
(ns auto-ap.storage.perf-test
|
|
(:require [auto-ap.storage.parquet :as p]
|
|
[amazonica.aws.s3 :as s3]
|
|
[clojure.java.io :as io]
|
|
[clojure.string :as str])
|
|
(:import (java.sql DriverManager)
|
|
(java.time Instant)))
|
|
|
|
(defn timestamp
  "Current wall-clock time as milliseconds since the Unix epoch."
  []
  (System/currentTimeMillis))
|
|
|
|
(defn timed
  "Invokes the zero-arg sql-fn, prints \"<label>: <n> ms\" with the elapsed
  wall-clock time, and returns whatever sql-fn returned."
  [label sql-fn]
  (let [t0      (System/currentTimeMillis)
        value   (sql-fn)
        elapsed (- (System/currentTimeMillis) t0)]
    (println (format "%s: %d ms" label elapsed))
    value))
|
|
|
|
(defn run-perf-tests
  "Manual performance benchmark: builds a 100k-row synthetic sales table in
  DuckDB, writes it to a local parquet file, uploads that file to S3, then
  times a series of paginated / filtered / aggregated reads against the S3
  copy via a DuckDB view. Prints timings to stdout; returns nothing useful.
  Intended to be invoked only from the rich comment block at the bottom of
  this file, never from CI.
  NOTE(review): targets the dev bucket and requires AWS credentials plus
  DuckDB's S3 support to be configured by p/connect! — confirm before running."
  []
  ;; Open the shared DuckDB connection used by all p/* helpers; the matching
  ;; p/disconnect! in the finally below guarantees it is closed even on error.
  (p/connect!)
  (try
    (let [bucket "data.dev.app.integreatconsult.com"
          prefix "test-duckdb"
          local-parquet "/tmp/test_data.parquet"
          s3-key (str prefix "/data.parquet")]

      ;; Create 100k test rows: deterministic id/external_id, region and
      ;; category cycled via modulo, random amounts, dates spread over 2024,
      ;; and every 20th row marked 'voided'.
      (println "\n=== Creating 100k test rows ===")
      (p/execute! "DROP TABLE IF EXISTS test_data")
      (p/execute! (str "
        CREATE TABLE test_data AS
        SELECT
          i AS id,
          'order_' || i AS external_id,
          CASE (i % 5)
            WHEN 0 THEN 'north'
            WHEN 1 THEN 'south'
            WHEN 2 THEN 'east'
            WHEN 3 THEN 'west'
            ELSE 'central'
          END AS region,
          CASE (i % 8)
            WHEN 0 THEN 'food'
            WHEN 1 THEN 'beverage'
            WHEN 2 THEN 'alcohol'
            WHEN 3 THEN 'catering'
            WHEN 4 THEN 'retail'
            WHEN 5 THEN 'dessert'
            WHEN 6 THEN 'merch'
            ELSE 'other'
          END AS category,
          ROUND(1 + ABS(RANDOM() % 10000) / 100.0, 2) AS amount,
          CAST(DATE '2024-01-01' + (i % 365) * INTERVAL '1 day' AS DATE) AS sale_date,
          CASE WHEN i % 20 = 0 THEN 'voided' ELSE 'active' END AS status
        FROM generate_series(1, 100000) AS t(i)"))
      ;; Sanity counts so the timings below are interpretable.
      (println "Row count:" (p/query-scalar "SELECT COUNT(*) FROM test_data"))
      (println "Voided count:" (p/query-scalar "SELECT COUNT(*) FROM test_data WHERE status = 'voided'"))
      (println "Amount > 3 count:" (p/query-scalar "SELECT COUNT(*) FROM test_data WHERE amount > 3"))

      ;; Write to local parquet and report the file size.
      (println "\n=== Writing local parquet ===")
      (timed "Write parquet" #(p/execute-to-parquet! "SELECT * FROM test_data" local-parquet))
      (let [f (io/file local-parquet)]
        (println "File size:" (format "%.1f MB" (/ (.length f) 1048576.0))))

      ;; Upload to S3.
      (println "\n=== Uploading to S3 ===")
      (timed "S3 upload" #(p/upload-parquet! (io/file local-parquet) prefix))
      (println "S3 URI:" (p/s3-location s3-key))

      ;; Now test reading from S3. All timed reads below go through a DuckDB
      ;; view over the remote parquet, so each query hits S3.
      (println "\n=== Performance Tests (reading from S3) ===")
      (let [s3-uri (str "s3://" bucket "/" s3-key)]

        ;; Register S3 parquet as a view/table in DuckDB.
        (p/execute! (format "CREATE VIEW s3_test AS SELECT * FROM read_parquet('%s')" s3-uri))
        (println "Total rows in S3:" (p/query-scalar "SELECT COUNT(*) FROM s3_test"))

        ;; Test 1: Page 1 - first 25 rows.
        (println "\n--- Test 1: Page 1 (LIMIT 25 OFFSET 0) ---")
        (timed "First page (25 rows)" #(p/query-rows "SELECT * FROM s3_test ORDER BY id LIMIT 25"))
        (println "Sample row:" (first (p/query-rows "SELECT * FROM s3_test ORDER BY id LIMIT 1")))

        ;; Test 2: Page 20 - rows 476-500 (OFFSET 475) — measures deep-offset cost.
        (println "\n--- Test 2: Page 20 (LIMIT 25 OFFSET 475) ---")
        (timed "Page 20 (25 rows)" #(p/query-rows "SELECT * FROM s3_test ORDER BY id LIMIT 25 OFFSET 475"))

        ;; Test 3: Filter amount > 3, full scan (no pagination).
        (println "\n--- Test 3: Filter amount > 3 (no limit) ---")
        (timed "Filter amount > 3 (all)" #(do (p/query-scalar "SELECT COUNT(*) FROM s3_test WHERE amount > 3") :done))

        ;; Test 4: Filter + pagination.
        (println "\n--- Test 4: Filter amount > 3 + LIMIT 25 ---")
        (timed "Filter + paginated (25 rows)" #(p/query-rows "SELECT * FROM s3_test WHERE amount > 3 ORDER BY id LIMIT 25"))

        ;; Test 5: Filter + deep page.
        (println "\n--- Test 5: Filter amount > 3 + LIMIT 25 OFFSET 475 ---")
        (timed "Filter + page 20" #(p/query-rows "SELECT * FROM s3_test WHERE amount > 3 ORDER BY id LIMIT 25 OFFSET 475"))

        ;; Test 6: Aggregation on S3 data.
        (println "\n--- Test 6: Aggregation (SUM, AVG on amount) ---")
        (timed "Aggregation SUM/AVG" #(p/query-scalar "SELECT SUM(amount), AVG(amount) FROM s3_test WHERE status = 'active'"))

        ;; Cleanup local DuckDB objects. NOTE(review): the uploaded S3 object
        ;; and /tmp parquet file are intentionally left behind — confirm that
        ;; is acceptable for the dev bucket.
        (p/execute! "DROP VIEW IF EXISTS s3_test")
        (p/execute! "DROP TABLE IF EXISTS test_data"))

    (finally
      ;; Always release the DuckDB connection, even if a step above threw.
      (p/disconnect!))))
|
|
|
|
;; Rich comment block: forms inside (comment ...) are read but never
;; evaluated at load time, so the perf suite only runs when a developer
;; sends these forms to a REPL — it cannot fire during `lein test`.
(comment
  (run-perf-tests)
  (println "\n=== Done ==="))