Dialing things down to avoid service unavailable.

This commit is contained in:
2023-03-29 20:45:30 -07:00
parent 97298dbeb4
commit 5a6a43f183
2 changed files with 7 additions and 7 deletions

View File

@@ -76,7 +76,7 @@
(mu/log ::error (mu/log ::error
:exception e) :exception e)
(throw e))))) (throw e)))))
(->> (partition-all 1000 entities) (->> (partition-all 200 entities)
(s/->source) (s/->source)
(s/onto buffered) (s/onto buffered)
(s/map (fn [entities] (s/map (fn [entities]
@@ -84,7 +84,7 @@
(reset! die? false) (reset! die? false)
(throw (Exception. "dead"))) (throw (Exception. "dead")))
(upsert-batch entities))) (upsert-batch entities)))
(s/buffer 50) (s/buffer 20)
(s/realize-each))) (s/realize-each)))
(swap! loaded conj entity)))) (swap! loaded conj entity))))

View File

@@ -532,13 +532,13 @@
(-> je :journal-entry/date)]}) (-> je :journal-entry/date)]})
(:journal-entry/line-items je)))) (:journal-entry/line-items je))))
(partition-all 500) (partition-all 200)
(s/->source) (s/->source)
(s/map (fn [batch] (s/map (fn [batch]
(de/future (de/future
(dc/transact conn {:tx-data batch}) (dc/transact conn {:tx-data batch})
(count batch)))) (count batch))))
(s/buffer 50) (s/buffer 20)
(s/realize-each) (s/realize-each)
(s/consume (fn [batch-count] (s/consume (fn [batch-count]
(swap! so-far #(+ % batch-count)) (swap! so-far #(+ % batch-count))
@@ -694,13 +694,13 @@
(s/realize-each) (s/realize-each)
(s/mapcat (fn [x] (s/mapcat (fn [x]
x)) x))
(s/buffer 50) (s/buffer 20)
(s/transform (partition-all 500)) (s/transform (partition-all 200))
(s/map (fn [batch] (s/map (fn [batch]
(de/future (de/future
(dc/transact conn {:tx-data batch}) (dc/transact conn {:tx-data batch})
(count batch)))) (count batch))))
(s/buffer 50) (s/buffer 20)
(s/realize-each) (s/realize-each)
(s/consume (fn [batch-count] (s/consume (fn [batch-count]
(swap! so-far #(+ % batch-count)) (swap! so-far #(+ % batch-count))