This commit is contained in:
2022-09-30 06:41:05 -07:00
parent 1f57ed2d1c
commit aebf95a87b
3 changed files with 78 additions and 51 deletions

View File

@@ -253,33 +253,37 @@
"Transacts data from from-ch. Returns a map with:
:result, a return channel getting {:error t} or {:completed n}
:stop, a fn you can use to terminate early."
[conn conc from-ch]
(let [to-ch (a/chan 100)
[conn conc from-ch f]
(let [to-ch (a/chan 400)
done-ch (a/chan)
transact-data (fn [data]
(try
(dc/transact conn {:tx-data data})
; if exception in a transaction
; will close channels and put error
; on done channel.
(catch Throwable t
(.printStackTrace t)
(a/close! from-ch)
(a/close! to-ch)
(a/>!! done-ch {:error t}))))]
transact-data (fn [data result]
(a/go
(try
(a/>! result (a/<! (dca/transact conn {:tx-data (f data)})))
(a/close! result)
; if exception in a transaction
; will close channels and put error
; on done channel.
(catch Throwable t
(.printStackTrace t)
(a/close! from-ch)
(a/close! to-ch)
(a/>! done-ch {:error t})))))]
; go block prints a '.' after every 1000 transactions, puts completed
; report on done channel when no value left to be taken.
(a/go-loop [total 0]
(print ".")
(flush)
(if (= (mod total 2) 0)
(do
(print ".")
(flush)))
(if-let [c (a/<! to-ch)]
(recur (inc total))
(a/>! done-ch {:completed total})))
; pipeline that uses transducer form of map to transact data taken from
; from-ch and puts results on to-ch
(a/pipeline-blocking conc to-ch (map transact-data) from-ch)
(a/pipeline-async conc to-ch transact-data from-ch)
; returns done channel and a function that you can use
; for early termination.
@@ -341,41 +345,50 @@
(cond-> (entity->best-key entity)
(not (vector? (entity->best-key entity))) vector))
tx-chan (a/chan)
pipeline (tx-pipeline (dc/connect local-client {:db-name "prod-migration"}) 30
tx-chan)]]
(println "Inserting " entity ": " (count entities))
(flush)
(doseq [batch (partition-all 2000 entities)]
(let [transaction {:tx-data (->> (d/pull-many remote-db
(->> schema
(filter :db/valueType)
(mapv :db/ident)
(filter #(= entity (namespace %)))
(into [:db/id]))
batch)
(mapv (fn [m ]
(reduce
(fn [m [k v]]
(cond
(= k :db/id)
(-> m
(assoc :entity/migration-key v)
(dissoc :db/id))
(full-dependencies k)
(if (vector? v)
(assoc m k (mapv (fn [r] [:entity/migration-key (:db/id r)]) v))
(assoc m k [:entity/migration-key (:db/id v)]))
:else
(dissoc m :payment/pdf-data
:payment/memo)))
m
m))))}]
(a/>!! tx-chan (:tx-data transaction))))
(a/close tx-chan)
tx-chan (a/chan 400)
entities->transaction (fn [entities]
(->> (d/pull-many remote-db
(->> schema
(filter :db/valueType)
(mapv :db/ident)
(filter #(= entity (namespace %)))
(into [:db/id]))
entities)
(mapv (fn [m ]
(reduce
(fn [m [k v]]
(cond
(= k :db/id)
(-> m
(assoc :entity/migration-key v)
(dissoc :db/id))
(full-dependencies k)
(if (vector? v)
(assoc m k (mapv (fn [r] [:entity/migration-key (:db/id r)]) v))
(assoc m k [:entity/migration-key (:db/id v)]))
:else
(dissoc m :payment/pdf-data
:payment/memo)))
m
m)))))
_ (println "Inserting " entity ": " (count entities))
_ (flush)
pipeline (tx-pipeline (dc/connect local-client {:db-name "prod-migration"})
50
tx-chan
entities->transaction)]]
(doseq [batch (partition-all 500 entities)]
(try
(a/>!! tx-chan batch)
(catch Exception e
(println e)
((:stop pipeline)))))
(println "waiting for done from" pipeline)
(flush)
(a/close! tx-chan)
(println (a/<!! (:result pipeline)))
((:stop pipeline))
(println)
(println "Done")))