Attempts solution of retrying transaction

This commit is contained in:
2023-03-30 06:27:04 -07:00
parent 5a6a43f183
commit f0603d922a
2 changed files with 38 additions and 13 deletions

View File

@@ -43,11 +43,23 @@
(def loaded (atom #{}))
(defn upsert-batch [batch]
(defn upsert-batch-impl
([batch ] (upsert-batch-impl batch 0))
([batch attempt]
(try
(dc/transact auto-ap.datomic/conn {:tx-data batch})
batch
(catch Exception e
(if (< attempt 10)
(do
(Thread/sleep (* attempt 1000))
(upsert-batch-impl batch (inc attempt)))
(throw e))))))
(defn upsert-batch
[batch]
(de/future-with request-pool
(do
(dc/transact auto-ap.datomic/conn {:tx-data batch})
batch)))
(upsert-batch-impl batch)))
@@ -76,7 +88,7 @@
(mu/log ::error
:exception e)
(throw e)))))
(->> (partition-all 200 entities)
(->> (partition-all 1000 entities)
(s/->source)
(s/onto buffered)
(s/map (fn [entities]
@@ -84,7 +96,7 @@
(reset! die? false)
(throw (Exception. "dead")))
(upsert-batch entities)))
(s/buffer 20)
(s/buffer 50)
(s/realize-each)))
(swap! loaded conj entity))))

View File

@@ -504,6 +504,19 @@
(:bank-account/numeric-code (bank-accounts a)))
:client_id client-id})))
(defn upsert-batch-impl
([batch ] (upsert-batch-impl batch 0))
([batch attempt]
(try
(dc/transact auto-ap.datomic/conn {:tx-data batch})
batch
(catch Exception e
(if (< attempt 10)
(do
(Thread/sleep (* attempt 1000))
(upsert-batch-impl batch (inc attempt)))
(throw e))))))
(defn reset-client+account+location+date
([] (reset-client+account+location+date (map first (dc/q '[:find ?c :where [?c :client/code]] (dc/db conn)))))
([clients]
@@ -532,13 +545,13 @@
(-> je :journal-entry/date)]})
(:journal-entry/line-items je))))
(partition-all 200)
(partition-all 500)
(s/->source)
(s/map (fn [batch]
(de/future
(dc/transact conn {:tx-data batch})
(upsert-batch-impl batch)
(count batch))))
(s/buffer 20)
(s/buffer 50)
(s/realize-each)
(s/consume (fn [batch-count]
(swap! so-far #(+ % batch-count))
@@ -694,13 +707,13 @@
(s/realize-each)
(s/mapcat (fn [x]
x))
(s/buffer 20)
(s/transform (partition-all 200))
(s/buffer 50)
(s/transform (partition-all 500))
(s/map (fn [batch]
(de/future
(dc/transact conn {:tx-data batch})
(upsert-batch-impl batch)
(count batch))))
(s/buffer 20)
(s/buffer 50)
(s/realize-each)
(s/consume (fn [batch-count]
(swap! so-far #(+ % batch-count))