starts logging in the event of an issue.
This commit is contained in:
@@ -831,3 +831,25 @@
|
|||||||
;; this is temporary for any new stuff that needs to be asserted for cloud migration.
|
;; this is temporary for any new stuff that needs to be asserted for cloud migration.
|
||||||
(dc/transact conn
|
(dc/transact conn
|
||||||
{:tx-data (edn/read-string (slurp (io/resource "cloud-migration-schema.edn")))}))
|
{:tx-data (edn/read-string (slurp (io/resource "cloud-migration-schema.edn")))}))
|
||||||
|
|
||||||
|
(defn backoff [n]
|
||||||
|
(let [base-timeout 500
|
||||||
|
max-timeout 300000 ; 5 minutes
|
||||||
|
max-retries 10
|
||||||
|
backoff-time (* base-timeout (Math/pow 2 (min n max-retries)))]
|
||||||
|
(min (+ backoff-time (rand-int base-timeout)) max-timeout)))
|
||||||
|
|
||||||
|
(defn transact-with-backoff
|
||||||
|
([tx ] (transact-with-backoff tx 0))
|
||||||
|
([tx attempt]
|
||||||
|
(try
|
||||||
|
(dc/transact conn {:tx-data tx})
|
||||||
|
(catch Exception e
|
||||||
|
(if (< attempt 10)
|
||||||
|
(do
|
||||||
|
(Thread/sleep (backoff attempt))
|
||||||
|
(mu/log ::transact-failed
|
||||||
|
:exception e
|
||||||
|
:status "WARN")
|
||||||
|
(transact-with-backoff tx (inc attempt)))
|
||||||
|
(throw e))))))
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
(ns auto-ap.jobs.restore-from-backup
|
(ns auto-ap.jobs.restore-from-backup
|
||||||
(:require
|
(:require
|
||||||
[amazonica.aws.s3 :as s3]
|
[amazonica.aws.s3 :as s3]
|
||||||
[auto-ap.datomic]
|
[auto-ap.datomic :refer [transact-with-backoff]]
|
||||||
[auto-ap.ledger]
|
[auto-ap.ledger]
|
||||||
[auto-ap.jobs.core :refer [execute]]
|
[auto-ap.jobs.core :refer [execute]]
|
||||||
[clojure.edn :as edn]
|
[clojure.edn :as edn]
|
||||||
@@ -43,32 +43,11 @@
|
|||||||
|
|
||||||
(def loaded (atom #{}))
|
(def loaded (atom #{}))
|
||||||
|
|
||||||
(defn backoff [n]
|
|
||||||
(let [base-timeout 500
|
|
||||||
max-timeout 300000 ; 5 minutes
|
|
||||||
max-retries 10
|
|
||||||
backoff-time (* base-timeout (Math/pow 2 (min n max-retries)))]
|
|
||||||
(min (+ backoff-time (rand-int base-timeout)) max-timeout)))
|
|
||||||
|
|
||||||
(defn upsert-batch-impl
|
|
||||||
([batch ] (upsert-batch-impl batch 0))
|
|
||||||
([batch attempt]
|
|
||||||
(try
|
|
||||||
(dc/transact auto-ap.datomic/conn {:tx-data batch})
|
|
||||||
batch
|
|
||||||
(catch Exception e
|
|
||||||
(if (< attempt 10)
|
|
||||||
(do
|
|
||||||
(Thread/sleep (backoff attempt))
|
|
||||||
(upsert-batch-impl batch (inc attempt)))
|
|
||||||
(throw e))))))
|
|
||||||
|
|
||||||
(defn upsert-batch
|
(defn upsert-batch
|
||||||
[batch]
|
[batch]
|
||||||
(de/future-with request-pool
|
(de/future-with request-pool
|
||||||
(upsert-batch-impl batch)))
|
(transact-with-backoff batch)
|
||||||
|
batch))
|
||||||
|
|
||||||
|
|
||||||
(defn pull-file [backup which]
|
(defn pull-file [backup which]
|
||||||
(mu/log ::pulling-file
|
(mu/log ::pulling-file
|
||||||
|
|||||||
@@ -3,6 +3,7 @@
|
|||||||
[auto-ap.datomic
|
[auto-ap.datomic
|
||||||
:refer [audit-transact
|
:refer [audit-transact
|
||||||
audit-transact-batch
|
audit-transact-batch
|
||||||
|
transact-with-backoff
|
||||||
conn
|
conn
|
||||||
pull-id
|
pull-id
|
||||||
pull-ref
|
pull-ref
|
||||||
@@ -504,18 +505,6 @@
|
|||||||
(:bank-account/numeric-code (bank-accounts a)))
|
(:bank-account/numeric-code (bank-accounts a)))
|
||||||
:client_id client-id})))
|
:client_id client-id})))
|
||||||
|
|
||||||
(defn upsert-batch-impl
|
|
||||||
([batch ] (upsert-batch-impl batch 0))
|
|
||||||
([batch attempt]
|
|
||||||
(try
|
|
||||||
(dc/transact auto-ap.datomic/conn {:tx-data batch})
|
|
||||||
batch
|
|
||||||
(catch Exception e
|
|
||||||
(if (< attempt 10)
|
|
||||||
(do
|
|
||||||
(Thread/sleep (* attempt 1000))
|
|
||||||
(upsert-batch-impl batch (inc attempt)))
|
|
||||||
(throw e))))))
|
|
||||||
|
|
||||||
(defn reset-client+account+location+date
|
(defn reset-client+account+location+date
|
||||||
([] (reset-client+account+location+date (map first (dc/q '[:find ?c :where [?c :client/code]] (dc/db conn)))))
|
([] (reset-client+account+location+date (map first (dc/q '[:find ?c :where [?c :client/code]] (dc/db conn)))))
|
||||||
@@ -549,7 +538,7 @@
|
|||||||
(s/->source)
|
(s/->source)
|
||||||
(s/map (fn [batch]
|
(s/map (fn [batch]
|
||||||
(de/future
|
(de/future
|
||||||
(upsert-batch-impl batch)
|
(transact-with-backoff batch)
|
||||||
(count batch))))
|
(count batch))))
|
||||||
(s/buffer 50)
|
(s/buffer 50)
|
||||||
(s/realize-each)
|
(s/realize-each)
|
||||||
@@ -711,7 +700,7 @@
|
|||||||
(s/transform (partition-all 500))
|
(s/transform (partition-all 500))
|
||||||
(s/map (fn [batch]
|
(s/map (fn [batch]
|
||||||
(de/future
|
(de/future
|
||||||
(upsert-batch-impl batch)
|
(transact-with-backoff batch)
|
||||||
(count batch))))
|
(count batch))))
|
||||||
(s/buffer 50)
|
(s/buffer 50)
|
||||||
(s/realize-each)
|
(s/realize-each)
|
||||||
|
|||||||
Reference in New Issue
Block a user