(ns auto-ap.migration.sales-to-parquet
  "Migrate historical sales data from Datomic to Parquet + S3.

  Groups records by business date and writes daily partitions.
  Dead-letter records (missing dates) are written separately.

  Usage:
    (migrate-all)                                   ; full migration earliest → latest
    (write-day-by-day \"2024-01-01\" \"2024-03-31\")    ; date range
    (write-dead-letter [flat])                      ; write orphaned records"
  (:require [auto-ap.datomic :refer [conn]]
            [auto-ap.storage.parquet :as p]
            [datomic.api :as dc]
            [clj-time.core :as time]))

;; Buffer names used for the regular (dated) partitions.
(def ^:private entity-types
  ["sales-order" "charge" "line-item" "sales-refund"])

(defn- fetch-all-sales-order-ids
  "Query Datomic for every entity that has a :sales-order/external-id.
  Returns a vector of entity ids."
  []
  ;; ?e must be bound in the :where clause; binding `_` there leaves the
  ;; :find variable unbound and the query is rejected.
  (->> (dc/q '[:find ?e
               :where [?e :sales-order/external-id]]
             (dc/db conn))
       (mapv first)))

(defn- fetch-undated-sales-order-ids
  "Query Datomic for sales-order entity ids that have no :sales-order/date.
  Returns a vector of entity ids."
  []
  (->> (dc/q '[:find ?e
               :where
               [?e :sales-order/external-id]
               [(missing? $ ?e :sales-order/date)]]
             (dc/db conn))
       (mapv first)))

;; Pull pattern for a sales order and its nested charges / line items.
(def ^:private sales-order-read
  '[:sales-order/external-id
    :sales-order/date
    {:sales-order/client [:client/code]}
    :sales-order/location
    :sales-order/vendor
    :sales-order/total
    :sales-order/tax
    :sales-order/tip
    :sales-order/discount
    :sales-order/service-charge
    {:sales-order/charges
     [:charge/external-id
      :charge/type-name
      :charge/total
      :charge/tax
      :charge/tip
      :charge/date
      ;; The flattener reads [:charge/processor :db/ident], so the ident
      ;; has to be pulled explicitly.
      ;; NOTE(review): assumes :charge/processor is a ref to an ident entity.
      {:charge/processor [:db/ident]}
      :charge/returns
      {:charge/client [:client/code]}]}
    {:sales-order/line-items
     [:order-line-item/item-name
      :order-line-item/category
      :order-line-item/total
      :order-line-item/tax
      :order-line-item/discount
      ;; An empty map is not a valid pull sub-pattern; pull the attribute
      ;; itself instead.
      :order-line-item/unit-price
      :order-line-item/quantity
      :order-line-item/note]}])

(defn- pull-sales-order-data
  "Batch pull the sales-order entities in `eids` (with nested charges and
  line items) using `sales-order-read`. Returns [] for an empty `eids`."
  [eids]
  (if (empty? eids)
    []
    (dc/pull-many (dc/db conn) sales-order-read eids)))

(defn- charge->rows
  "Rows for one pulled charge: the charge row followed by one
  \"sales-refund\" row per entry in :charge/returns."
  [so-ext-id so-date chg]
  (into [{:entity-type             "charge"
          :external-id             (str (get chg :charge/external-id))
          :type-name               (get chg :charge/type-name)
          :total                   (get chg :charge/total)
          :tax                     (get chg :charge/tax)
          :tip                     (get chg :charge/tip)
          :date                    so-date
          :processor               (get-in chg [:charge/processor :db/ident])
          :sales-order-external-id so-ext-id}]
        ;; NOTE(review): the refund rows read un-namespaced keys
        ;; (:type-name / :total); pulled entities normally carry namespaced
        ;; keys, so these may always be nil — confirm against the schema.
        (map (fn [rt]
               {:entity-type             "sales-refund"
                :type-name               (get rt :type-name)
                :total                   (get rt :total)
                :sales-order-external-id so-ext-id}))
        (:charge/returns chg)))

(defn- order->rows
  "Pure flattening of one pulled sales order into a vector of
  :entity-type tagged maps, in this order: the sales-order row, each
  charge followed by its refunds, then the line items.

  A missing :sales-order/date yields a nil :date instead of throwing, so
  undated orders can be routed to the dead-letter buffer."
  [order]
  (let [so-ext-id (str (:sales-order/external-id order))
        so-date   (some-> (:sales-order/date order) .toString)
        order-row {:entity-type    "sales-order"
                   :external-id    so-ext-id
                   :client-code    (get-in order [:sales-order/client :client/code])
                   :location       (:sales-order/location order)
                   :vendor         (:sales-order/vendor order)
                   :total          (:sales-order/total order)
                   :tax            (:sales-order/tax order)
                   :tip            (:sales-order/tip order)
                   :discount       (:sales-order/discount order)
                   :service-charge (:sales-order/service-charge order)
                   :date           so-date}]
    ;; NOTE(review): line-item and sales-refund rows carry no :date key, so
    ;; `write-dead-letter` treats them as dead records — confirm intent.
    (-> [order-row]
        (into (mapcat #(charge->rows so-ext-id so-date %))
              (:sales-order/charges order))
        (into (map (fn [li]
                     {:entity-type             "line-item"
                      :item-name               (get li :order-line-item/item-name)
                      :category                (get li :order-line-item/category)
                      :total                   (get li :order-line-item/total)
                      :tax                     (get li :order-line-item/tax)
                      :discount                (get li :order-line-item/discount)
                      :sales-order-external-id so-ext-id}))
              (:sales-order/line-items order)))))

(defn- flatten-order-to-pieces!
  "Flatten a pulled sales order into :entity-type tagged maps and append
  them to `flat`, a mutable reference holding a vector.

  `flat` may be an atom or a volatile (callers in this namespace pass a
  volatile, on which `swap!` would throw). Returns the updated vector."
  [order flat]
  (let [rows (order->rows order)]
    (if (instance? clojure.lang.Volatile flat)
      (vswap! flat into rows)
      (swap! flat into rows))))

(defn- day-bounds
  "Half-open [start end) bounds of the calendar day `date-str`
  (YYYY-MM-DD) as java.util.Date instances.
  NOTE(review): the day boundary is taken at UTC — confirm this matches
  how business dates are stored."
  [date-str]
  (let [day    (java.time.LocalDate/parse date-str)
        ->date (fn [^java.time.LocalDate d]
                 (java.util.Date/from
                  (.toInstant (.atStartOfDay d java.time.ZoneOffset/UTC))))]
    [(->date day) (->date (.plusDays day 1))]))

(defn -fetch-order-ids-for-date
  "Query `db` for the entity ids of all sales orders whose
  :sales-order/date falls on the business date `date-str` (YYYY-MM-DD).
  Returns a vector of entity ids.

  NOTE(review): assumes :sales-order/date is a Datomic instant; the
  bounds are passed as java.util.Date so the comparison is like-typed."
  [db date-str]
  (let [[start end] (day-bounds date-str)]
    (->> (dc/q '[:find ?e
                 :in $ ?start ?end
                 :where
                 [?e :sales-order/date ?d]
                 [(>= ?d ?start)]
                 ;; exclusive upper bound: midnight of the next day belongs
                 ;; to the next partition only
                 [(< ?d ?end)]]
               db start end)
         (mapv first))))

(defn- date-seq
  "Vector of YYYY-MM-DD strings for every day between `start` and `end`,
  inclusive, in ascending order. The two arguments may be given in either
  order."
  [start end]
  (let [a                         (java.time.LocalDate/parse start)
        b                         (java.time.LocalDate/parse end)
        [^java.time.LocalDate lo
         ^java.time.LocalDate hi] (if (.isAfter a b) [b a] [a b])
        days                      (- (.toEpochDay hi) (.toEpochDay lo))]
    (mapv #(str (.plusDays lo (long %))) (range (inc days)))))

(defn- write-day-by-day
  "Migrate every day from `start-date` to `end-date` (YYYY-MM-DD,
  inclusive): fetch that day's order ids, pull them in batches, buffer the
  flattened rows by entity type, then flush each entity-type buffer.

  `opts` (optional map):
    :date-set   collection of YYYY-MM-DD strings; when non-empty only those
                days within the range are processed
    :batch-size number of orders pulled per batch (default 100)

  Returns {:status :completed :total-days n}."
  ([start-date end-date]
   (write-day-by-day start-date end-date nil))
  ([start-date end-date opts]
   ;; Keyword lookup is nil-safe; invoking `opts` as a function throws when
   ;; the 2-arity passes nil.
   (let [wanted     (set (:date-set opts))
         days       (vec (cond->> (date-seq start-date end-date)
                           (seq wanted) (filter wanted)))
         batch-size (or (:batch-size opts) 100)]
     (doseq [day days]
       (println "[migration] processing" day)
       (doseq [batch (partition-all batch-size
                                    (-fetch-order-ids-for-date (dc/db conn) day))
               order (pull-sales-order-data batch)
               row   (order->rows order)]
         (p/buffer! (:entity-type row) row))
       (doseq [etype entity-types]
         (p/flush-to-parquet! etype))
       (println "[migration]" day "complete"))
     {:status :completed :total-days (count days)})))

(defn- write-dead-letter
  "Buffer every record in `flat` whose :date is nil under the buffer name
  \"<prefix>-<entity-type>\" (prefix defaults to \"dead\"). Records are only
  buffered here; nothing is flushed. Returns nil."
  ([flat]
   (write-dead-letter "dead" flat))
  ([prefix flat]
   (doseq [r flat
           :when (nil? (:date r))]
     (p/buffer! (str prefix "-" (:entity-type r)) r))))

(defn- flush-all-types
  "Flush every entity-type buffer, logging (not rethrowing) per-type
  failures so one bad type does not block the others.

  Returns {:records-flush n}, where n is the drop in `p/total-buf-count`
  across the flush.
  NOTE(review): assumes `p/total-buf-count` reports the number of records
  still buffered."
  []
  (let [before (p/total-buf-count)]
    (doseq [et entity-types]
      (try
        (p/flush-to-parquet! et)
        (catch Exception e
          (println "[migration/flush]" et "error:" (.getMessage e)))))
    ;; before - after: the buffered count goes down as records are flushed
    {:records-flush (- before (p/total-buf-count))}))

(defn- inst->date-str
  "Format an instant as a YYYY-MM-DD string (UTC) so it can be parsed by
  java.time.LocalDate/parse."
  [^java.util.Date d]
  (-> (.toInstant d)
      (.atZone java.time.ZoneOffset/UTC)
      (.toLocalDate)
      str))

(defn- get-date-range
  "Earliest and latest :sales-order/date in Datomic as
  [\"YYYY-MM-DD\" \"YYYY-MM-DD\"], or [nil nil] when no order has a date.
  NOTE(review): assumes :sales-order/date is a Datomic instant."
  []
  ;; Aggregate in the query rather than sorting every date in memory.
  (let [[earliest latest] (first (dc/q '[:find (min ?d) (max ?d)
                                         :where [_ :sales-order/date ?d]]
                                       (dc/db conn)))]
    [(some-> earliest inst->date-str)
     (some-> latest inst->date-str)]))

(defn- buffer-undated-orders!
  "Pull the sales orders that have no business date and buffer their
  flattened rows under \"dead\". Returns the number of rows buffered."
  []
  (reduce (fn [n batch]
            (let [rows (mapcat order->rows (pull-sales-order-data batch))]
              (doseq [r rows]
                (p/buffer! "dead" r))
              (+ n (count rows))))
          0
          (partition-all 100 (fetch-undated-sales-order-ids))))

(defn migrate-all
  "Full migration from the earliest to the latest business date: load
  unflushed buffers, write dead-letter rows for sales orders with no date,
  then fetch / buffer / flush day by day.

  Returns :no-orders when Datomic has no sales orders, :ok on success, or
  the caught Exception (after logging its message) on failure."
  []
  (println "[migration] starting full migration...")
  (p/load-unflushed!)
  (let [order-ids (fetch-all-sales-order-ids)]
    (if-not (seq order-ids)
      (do (println "[migration] no orders found")
          :no-orders)
      (try
        ;; Only the undated orders are pulled here; pulling the entire
        ;; history just to filter it would hold every order in memory.
        (when (pos? (buffer-undated-orders!))
          ;; The regular flush only covers `entity-types`, so the "dead"
          ;; buffer must be flushed explicitly or it is never written.
          ;; NOTE(review): assumes flush-to-parquet! accepts the "dead" name.
          (try
            (p/flush-to-parquet! "dead")
            (catch Exception e
              (println "[migration/flush]" "dead" "error:" (.getMessage e)))))
        ;; One query for both ends of the range; skip the daily pass when no
        ;; order has a date, since there is nothing to parse.
        (let [[start-date end-date] (get-date-range)]
          (if (and start-date end-date)
            (write-day-by-day start-date end-date {:batch-size 100})
            (println "[migration] no dated orders; skipping daily partitions")))
        (flush-all-types)
        (println "[migration] done")
        :ok
        (catch Exception e
          (println "[migration/error]" (.getMessage e))
          e)))))