From aebf95a87bf066eb7a34fd83b6a4a50f7eef7da6 Mon Sep 17 00:00:00 2001 From: Bryce Covert Date: Fri, 30 Sep 2022 06:41:05 -0700 Subject: [PATCH] updates --- scratch-sessions/dump-edn.clj | 109 +++++++++++++++------------ src/clj/auto_ap/graphql/accounts.clj | 17 ++++- things-to-search-for.txt | 3 +- 3 files changed, 78 insertions(+), 51 deletions(-) diff --git a/scratch-sessions/dump-edn.clj b/scratch-sessions/dump-edn.clj index c44c0310..b8fd7b2f 100644 --- a/scratch-sessions/dump-edn.clj +++ b/scratch-sessions/dump-edn.clj @@ -253,33 +253,37 @@ "Transacts data from from-ch. Returns a map with: :result, a return channel getting {:error t} or {:completed n} :stop, a fn you can use to terminate early." - [conn conc from-ch] - (let [to-ch (a/chan 100) + [conn conc from-ch f] + (let [to-ch (a/chan 400) done-ch (a/chan) - transact-data (fn [data] - (try - (dc/transact conn {:tx-data data}) - ; if exception in a transaction - ; will close channels and put error - ; on done channel. - (catch Throwable t - (.printStackTrace t) - (a/close! from-ch) - (a/close! to-ch) - (a/>!! done-ch {:error t}))))] + transact-data (fn [data result] + (a/go + (try + (a/>! result (a/! done-ch {:error t})))))] ; go block prints a '.' after every 1000 transactions, puts completed ; report on done channel when no value left to be taken. (a/go-loop [total 0] - (print ".") - (flush) + (if (= (mod total 2) 0) + (do + (print ".") + (flush))) (if-let [c (a/! done-ch {:completed total}))) ; pipeline that uses transducer form of map to transact data taken from ; from-ch and puts results on to-ch - (a/pipeline-blocking conc to-ch (map transact-data) from-ch) + (a/pipeline-async conc to-ch transact-data from-ch) ; returns done channel and a function that you can use ; for early termination. @@ -341,41 +345,50 @@ (cond-> (entity->best-key entity) (not (vector? (entity->best-key entity))) vector)) - tx-chan (a/chan) - pipeline (tx-pipeline (dc/connect local-client {:db-name "prod-migration"}) 30 - tx-chan)]] - (println "Inserting " entity ": " (count entities)) - (flush) - (doseq [batch (partition-all 2000 entities)] - (let [transaction {:tx-data (->> (d/pull-many remote-db - (->> schema - (filter :db/valueType) - (mapv :db/ident) - (filter #(= entity (namespace %))) - (into [:db/id])) - batch) - (mapv (fn [m ] - (reduce - (fn [m [k v]] - (cond - (= k :db/id) - (-> m - (assoc :entity/migration-key v) - (dissoc :db/id)) - (full-dependencies k) - (if (vector? v) - (assoc m k (mapv (fn [r] [:entity/migration-key (:db/id r)]) v)) - (assoc m k [:entity/migration-key (:db/id v)])) - :else - (dissoc m :payment/pdf-data - :payment/memo))) - m - m))))}] - (a/>!! tx-chan (:tx-data transaction)))) - (a/close tx-chan) + tx-chan (a/chan 400) + entities->transaction (fn [entities] + (->> (d/pull-many remote-db + (->> schema + (filter :db/valueType) + (mapv :db/ident) + (filter #(= entity (namespace %))) + (into [:db/id])) + entities) + (mapv (fn [m ] + (reduce + (fn [m [k v]] + (cond + (= k :db/id) + (-> m + (assoc :entity/migration-key v) + (dissoc :db/id)) + (full-dependencies k) + (if (vector? v) + (assoc m k (mapv (fn [r] [:entity/migration-key (:db/id r)]) v)) + (assoc m k [:entity/migration-key (:db/id v)])) + :else + (dissoc m :payment/pdf-data + :payment/memo))) + m + m))))) + _ (println "Inserting " entity ": " (count entities)) + _ (flush) + pipeline (tx-pipeline (dc/connect local-client {:db-name "prod-migration"}) + 50 + tx-chan + entities->transaction)]] + (doseq [batch (partition-all 500 entities)] + (try + (a/>!! tx-chan batch) + (catch Exception e + (println e) + ((:stop pipeline))))) + (println "waiting for done from" pipeline) (flush) + (a/close! tx-chan) (println (a/