From f0603d922a9750dc180d52eb90e2ed7d89f38760 Mon Sep 17 00:00:00 2001 From: Bryce Covert Date: Thu, 30 Mar 2023 06:27:04 -0700 Subject: [PATCH] Attempts solution of retrying transaction --- src/clj/auto_ap/jobs/restore_from_backup.clj | 24 ++++++++++++----- src/clj/auto_ap/ledger.clj | 27 +++++++++++++++----- 2 files changed, 38 insertions(+), 13 deletions(-) diff --git a/src/clj/auto_ap/jobs/restore_from_backup.clj b/src/clj/auto_ap/jobs/restore_from_backup.clj index 738f1724..ff555249 100644 --- a/src/clj/auto_ap/jobs/restore_from_backup.clj +++ b/src/clj/auto_ap/jobs/restore_from_backup.clj @@ -43,11 +43,23 @@ (def loaded (atom #{})) -(defn upsert-batch [batch] +(defn upsert-batch-impl + ([batch ] (upsert-batch-impl batch 0)) + ([batch attempt] + (try + (dc/transact auto-ap.datomic/conn {:tx-data batch}) + batch + (catch Exception e + (if (< attempt 10) + (do + (Thread/sleep (* attempt 1000)) + (upsert-batch-impl batch (inc attempt))) + (throw e)))))) + +(defn upsert-batch + [batch] (de/future-with request-pool - (do - (dc/transact auto-ap.datomic/conn {:tx-data batch}) - batch))) + (upsert-batch-impl batch))) @@ -76,7 +88,7 @@ (mu/log ::error :exception e) (throw e))))) - (->> (partition-all 200 entities) + (->> (partition-all 1000 entities) (s/->source) (s/onto buffered) (s/map (fn [entities] @@ -84,7 +96,7 @@ (reset! die? false) (throw (Exception. "dead"))) (upsert-batch entities))) - (s/buffer 20) + (s/buffer 50) (s/realize-each))) (swap! loaded conj entity)))) diff --git a/src/clj/auto_ap/ledger.clj b/src/clj/auto_ap/ledger.clj index 48005177..17bff040 100644 --- a/src/clj/auto_ap/ledger.clj +++ b/src/clj/auto_ap/ledger.clj @@ -504,6 +504,19 @@ (:bank-account/numeric-code (bank-accounts a))) :client_id client-id}))) +(defn upsert-batch-impl + ([batch ] (upsert-batch-impl batch 0)) + ([batch attempt] + (try + (dc/transact auto-ap.datomic/conn {:tx-data batch}) + batch + (catch Exception e + (if (< attempt 10) + (do + (Thread/sleep (* attempt 1000)) + (upsert-batch-impl batch (inc attempt))) + (throw e)))))) + (defn reset-client+account+location+date ([] (reset-client+account+location+date (map first (dc/q '[:find ?c :where [?c :client/code]] (dc/db conn))))) ([clients] @@ -532,13 +545,13 @@ (-> je :journal-entry/date)]}) (:journal-entry/line-items je)))) - (partition-all 200) + (partition-all 500) (s/->source) (s/map (fn [batch] (de/future - (dc/transact conn {:tx-data batch}) + (upsert-batch-impl batch) (count batch)))) - (s/buffer 20) + (s/buffer 50) (s/realize-each) (s/consume (fn [batch-count] (swap! so-far #(+ % batch-count)) @@ -694,13 +707,13 @@ (s/realize-each) (s/mapcat (fn [x] x)) - (s/buffer 20) - (s/transform (partition-all 200)) + (s/buffer 50) + (s/transform (partition-all 500)) (s/map (fn [batch] (de/future - (dc/transact conn {:tx-data batch}) + (upsert-batch-impl batch) (count batch)))) - (s/buffer 20) + (s/buffer 50) (s/realize-each) (s/consume (fn [batch-count] (swap! so-far #(+ % batch-count))