Adds the ability to launch a job that restores a database

This commit is contained in:
2023-03-29 16:46:36 -07:00
parent 1ec694a5d6
commit 420bd126bd
4 changed files with 236 additions and 152 deletions

View File

@@ -1,5 +1,5 @@
{:scheme "https" {:scheme "https"
:db-name "prod-mirror" :db-name "prod-mirror2"
:client-config {:server-type :cloud :client-config {:server-type :cloud
:region "us-east-1" :region "us-east-1"
:system "iol-cloud" :system "iol-cloud"

View File

@@ -1,16 +1,23 @@
(ns auto-ap.jobs.restore-from-backup (ns auto-ap.jobs.restore-from-backup
(:require [clojure.java.io :as io] (:require
[amazonica.aws.s3 :as s3] [amazonica.aws.s3 :as s3]
[config.core :refer [env]] [auto-ap.datomic]
[clojure.core.async :as a] [auto-ap.ledger]
[datomic.client.api :as dc] [auto-ap.jobs.core :refer [execute]]
[auto-ap.datomic :refer [client]] [clojure.edn :as edn]
[lambdaisland.edn-lines :as ednl] [clojure.java.io :as io]
[datomic.client.api.async :as dca] [clojure.set :as set]
[datomic.dev-local :as dl] [com.brunobonacci.mulog :as mu]
[clojure.set :as set] [config.core :refer [env]]
[clojure.string :as str] [datomic.client.api :as dc]
[clj-http.client :as client])) [lambdaisland.edn-lines :as ednl]
[manifold.deferred :as de]
[manifold.executor :as ex]
[manifold.stream :as s]))
(def request-pool (ex/fixed-thread-executor 100))
(def buffered (ex/fixed-thread-executor 100))
(defn order-of-insert [entity-dependencies] (defn order-of-insert [entity-dependencies]
(loop [entity-dependencies entity-dependencies (loop [entity-dependencies entity-dependencies
@@ -27,7 +34,6 @@
entity-dependencies))) entity-dependencies)))
(apply dissoc entity-dependencies next-order) (apply dissoc entity-dependencies next-order)
next-order)] next-order)]
(println order next-deps)
(if (seq next-deps) (if (seq next-deps)
(recur next-deps (into order next-order)) (recur next-deps (into order next-order))
(into order next-order))))) (into order next-order)))))
@@ -35,53 +41,14 @@
(defn tx-pipeline
"Transacts data from from-ch. Returns a map with:
:result, a return channel getting {:error t} or {:completed n}
:stop, a fn you can use to terminate early."
[conn conc from-ch f]
(let [to-ch (a/chan 400)
done-ch (a/chan)
transact-data (fn [data result]
(a/go
(try
(let [tx-r (a/<! (dca/transact conn {:tx-data (f data)}))]
(when (:db/error tx-r)
(throw (ex-info "Invalid transaction" tx-r)))
(a/>! result tx-r)
(a/close! result))
; if exception in a transaction
; will close channels and put error
; on done channel.
(catch Throwable t
(.printStackTrace t)
(a/close! from-ch)
(a/close! to-ch)
(a/>! done-ch {:error t})))))]
; go block prints a '.' after every 1000 transactions, puts completed
; report on done channel when no value left to be taken.
(a/go-loop [total 0]
(if (= (mod total 5) 0)
(do
(print ".")
(flush)))
(if-let [c (a/<! to-ch)]
(recur (inc total))
(a/>! done-ch {:completed total})))
; pipeline that uses transducer form of map to transact data taken from
; from-ch and puts results on to-ch
(a/pipeline-async conc to-ch transact-data from-ch)
; returns done channel and a function that you can use
; for early termination.
{:result done-ch
:stop (fn [] (a/close! to-ch))}))
(def loaded (atom #{})) (def loaded (atom #{}))
(defn upsert-batch [batch]
(de/future-with request-pool
(do
(dc/transact auto-ap.datomic/conn {:tx-data batch})
batch)))
(defn pull-file [backup which] (defn pull-file [backup which]
@@ -91,13 +58,42 @@
w)) w))
"/tmp/tmp-edn") "/tmp/tmp-edn")
(def so-far (atom 0))
(def die? (atom false))
(defn load-entity [entity entities]
(mu/trace ::restore-entity
[:entity entity]
(mu/with-context {:entity entity}
(mu/log ::starting)
@(s/consume (fn [batch]
(mu/with-context {:entity entity}
(try
(swap! so-far #(+ % (count batch)))
(mu/log ::loaded :count (count batch)
:so-far @so-far)
(catch Exception e
(mu/log ::error
:exception e)
(throw e)))))
(->> (partition-all 1000 entities)
(s/->source)
(s/onto buffered)
(s/map (fn [entities]
(when @die?
(reset! die? false)
(throw (Exception. "dead")))
(upsert-batch entities)))
(s/buffer 50)
(s/realize-each)))
(swap! loaded conj entity))))
(defn load-from-backup (defn load-from-backup
([backup-id connection] (load-from-backup backup-id connection nil)) ([backup-id connection] (load-from-backup backup-id connection nil))
([backup-id connection item-list] ([backup-id connection item-list]
(let [schema (clojure.edn/read-string (slurp (pull-file backup-id "schema.edn"))) (let [schema (edn/read-string (slurp (pull-file backup-id "schema.edn")))
full-dependencies (clojure.edn/read-string (slurp (pull-file backup-id "full-dependencies.edn"))) full-dependencies (edn/read-string (slurp (pull-file backup-id "full-dependencies.edn")))
entity-dependencies (clojure.edn/read-string (slurp (pull-file backup-id "entity-dependencies.edn")))] entity-dependencies (edn/read-string (slurp (pull-file backup-id "entity-dependencies.edn")))]
(dc/transact connection {:tx-data [{:db/ident :entity/migration-key (dc/transact connection {:tx-data [{:db/ident :entity/migration-key
:db/unique :db.unique/identity :db/unique :db.unique/identity
:db/cardinality :db.cardinality/one :db/cardinality :db.cardinality/one
@@ -108,47 +104,54 @@
schema)}) schema)})
;; TEMP - this has been fixed in current export (ezcater-olaciotn) ;; TEMP - this has been fixed in current export (ezcater-olaciotn)
(dc/transact auto-ap.datomic/conn {:tx-data [{:entity/migration-key 17592291325318}]})
(dc/transact connection {:tx-data [{:entity/migration-key 17592257603901 :vendor/name "unknown"} (dc/transact connection {:tx-data [{:entity/migration-key 17592257603901 :vendor/name "unknown"}
{:entity/migration-key 17592232621701} {:entity/migration-key 17592232621701}
{:entity/migration-key 17592263907739} {:entity/migration-key 17592263907739}
{:entity/migration-key 17592271516922}] {:entity/migration-key 17592271516922}]})
})
(doseq [entity (or item-list (filter (complement (conj @loaded "audit")) (order-of-insert entity-dependencies))) (doseq [entity (or item-list (filter (complement (conj @loaded "audit")) (order-of-insert entity-dependencies)))
:let [_ (swap! loaded conj entity) :let [_ (reset! so-far 0)
_ (println "querying for " entity) _ (mu/log ::querying :entity entity)
entities (ednl/slurp (pull-file backup-id (str entity ".ednl"))) entities (mu/trace ::pulling-file
[:file (str backup-id "/" entity ".ednl")]
(ednl/slurp (pull-file backup-id (str entity ".ednl"))))]]
(load-entity entity entities)))))
_ (println "Found some! here's a few: " (take 3 entities)) (defn restore-fresh-from-backup [{:keys [backup]}]
tx-chan (a/chan 50) (mu/log ::beginning-restore)
entities->transaction (fn [entities] (load-from-backup backup auto-ap.datomic/conn)
entities) (mu/log ::restore-complete)
pipeline (tx-pipeline connection (mu/log ::beginning-index-build)
10 (auto-ap.datomic/transact-schema auto-ap.datomic/conn)
tx-chan (auto-ap.ledger/reset-client+account+location+date)
entities->transaction)]] (mu/log ::index-build-complete)
(doseq [batch (partition-all 200 entities)] (mu/log ::refresh-running-balance-cache)
(try (auto-ap.ledger/rebuild-running-balance-cache)
(a/>!! tx-chan batch) (mu/log ::refresh-running-balance-cache-complete)
(catch Exception e (mu/log ::done))
(println e)
((:stop pipeline)))))
(println "waiting for done from" pipeline)
(flush) (defn -main [& _]
(a/close! tx-chan) (try
(println (a/<!! (:result pipeline))) (execute "restore-from-backup" #(restore-fresh-from-backup (:args env)))
((:stop pipeline)) (catch Exception e
(println) (println e)
(println "Done"))))) (mu/log ::quit-error
:exception e
:background-job "restore-from-backup"
:service "restore-from-backup")
(Thread/sleep 5000)
(throw e))))
;; cloud load ;; cloud load
(comment #_(comment
(load-from-backup "a1975512-9091-49d1-a348-ee445363ba34" auto-ap.datomic/conn )
;; /datomic-backup/079df203-eae0-4acf-94d5-8608ba8b8a9a
(load-from-backup "079df203-eae0-4acf-94d5-8608ba8b8a9a" auto-ap.datomic/conn ["charge"])
(load-entity "charge" (ednl/slurp "/tmp/tmp-edn"))
) )
;; => nil ;; => nil

View File

@@ -14,7 +14,9 @@
[com.brunobonacci.mulog :as mu] [com.brunobonacci.mulog :as mu]
[com.unbounce.dogstatsd.core :as statsd] [com.unbounce.dogstatsd.core :as statsd]
[datomic.client.api :as dc] [datomic.client.api :as dc]
[iol-ion.tx :refer [upsert-ledger]])) [iol-ion.tx :refer [upsert-ledger]]
[manifold.deferred :as de]
[manifold.stream :as s]))
(defn datums->impacted-entity [db [e changes]] (defn datums->impacted-entity [db [e changes]]
(let [entity (dc/pull db '[{:invoice/_expense-accounts [:db/id] :transaction/_accounts [:db/id]}] e) (let [entity (dc/pull db '[{:invoice/_expense-accounts [:db/id] :transaction/_accounts [:db/id]}] e)
@@ -505,50 +507,65 @@
(defn reset-client+account+location+date (defn reset-client+account+location+date
([] (reset-client+account+location+date (map first (dc/q '[:find ?c :where [?c :client/code]] (dc/db conn))))) ([] (reset-client+account+location+date (map first (dc/q '[:find ?c :where [?c :client/code]] (dc/db conn)))))
([clients] ([clients]
(doseq [client clients (doseq [[client i] (map vector clients (range))]
:let [_ (mu/log ::reseting-index-for :client client)] (mu/trace ::reset-index
batch [:client client]
(->> (dc/qseq '[:find (pull ?je [:journal-entry/date :journal-entry/client {:journal-entry/line-items [:journal-entry-line/account :journal-entry-line/location :db/id]}]) (mu/with-context {:client client
:in $ ?c :client-index i
:where [?je :journal-entry/client ?c]] :client-count (count clients)}
(dc/db conn) (mu/log ::reseting-index)
client (let [so-far (atom 0)]
@(->> (dc/qseq '[:find (pull ?je [:journal-entry/date :journal-entry/client {:journal-entry/line-items [:journal-entry-line/account :journal-entry-line/location :db/id]}])
:in $ ?c
:where [?je :journal-entry/client ?c]]
(dc/db conn)
client)
(map first)
(mapcat (fn [je]
(map (fn [jel]
{:db/id (:db/id jel)
:journal-entry-line/client+account+location+date
[(-> je :journal-entry/client :db/id)
(-> jel :journal-entry-line/account :db/id)
) (-> jel :journal-entry-line/location)
(map first)
(mapcat (fn [je]
(map (fn [jel]
{:db/id (:db/id jel)
:journal-entry-line/client+account+location+date
[(-> je :journal-entry/client :db/id)
(-> jel :journal-entry-line/account :db/id)
(-> jel :journal-entry-line/location) (-> je :journal-entry/date)]})
(:journal-entry/line-items je))))
(-> je :journal-entry/date)]}) (partition-all 500)
(:journal-entry/line-items je)))) (s/->source)
(partition-all 500) (s/map (fn [batch]
)] (de/future
(mu/log ::batch-completed) (dc/transact conn {:tx-data batch})
(dc/transact conn {:tx-data batch})))) (count batch))))
(s/buffer 50)
(s/realize-each)
(s/consume (fn [batch-count]
(swap! so-far #(+ % batch-count))
(mu/log ::reset :count batch-count
:so-far @so-far
:client client
:client-index i
:client-count (count clients))))))
(mu/log ::client-completed))))))
(defn find-mismatch-index [] (defn find-mismatch-index []
(reduce + 0 (reduce + 0
(for [c (map first (dc/q '[:find ?c :where [?c :client/code]] (dc/db conn))) (for [c (map first (dc/q '[:find ?c :where [?c :client/code]] (dc/db conn)))
:let [_ (println "searching for" c) :let [_ (println "searching for" c)
a (->> (dc/index-pull (dc/db conn) a (->> (dc/index-pull (dc/db conn)
{:index :avet {:index :avet
:selector [:db/id :journal-entry-line/location :journal-entry-line/account :journal-entry-line/client+account+location+date {:journal-entry/_line-items [:journal-entry/date :journal-entry/client]}] :selector [:db/id :journal-entry-line/location :journal-entry-line/account :journal-entry-line/client+account+location+date {:journal-entry/_line-items [:journal-entry/date :journal-entry/client]}]
:start [:journal-entry-line/client+account+location+date [c]]}) :start [:journal-entry-line/client+account+location+date [c]]})
(take-while (fn [result] (take-while (fn [result]
(= c (first (:journal-entry-line/client+account+location+date result))) (= c (first (:journal-entry-line/client+account+location+date result)))
)) ))
(filter (fn [{index :journal-entry-line/client+account+location+date :as result}] (filter (fn [{index :journal-entry-line/client+account+location+date :as result}]
(not= index (not= index
[(-> result :journal-entry/_line-items :journal-entry/client :db/id) [(-> result :journal-entry/_line-items :journal-entry/client :db/id)
(-> result :journal-entry-line/account :db/id) (-> result :journal-entry-line/account :db/id)
(-> result :journal-entry-line/location) (-> result :journal-entry-line/location)
(-> result :journal-entry/_line-items :journal-entry/date)]))))]] (-> result :journal-entry/_line-items :journal-entry/date)]))))]]
(do (println (count a)) (do (println (count a))
(count a))))) (count a)))))
@@ -573,6 +590,24 @@
:starting-at starting-at :starting-at starting-at
:location location}))))) :location location})))))
(defn all-accounts-needing-rebuild [ db client]
(let [client (pull-id db client)]
(->> (dc/qseq '[:find ?c ?a ?l (min ?d)
:in $ ?c
:where
[?je :journal-entry/client ?c]
[?je :journal-entry/line-items ?jel]
[?jel :journal-entry-line/account ?a]
[?jel :journal-entry-line/location ?l]
[?je :journal-entry/date ?d]]
db
client)
(map (fn [[client account location starting-at ]]
{:client client
:account account
:starting-at starting-at
:location location})))))
(defn find-running-balance-start [{:keys [client account location starting-at]} db ] (defn find-running-balance-start [{:keys [client account location starting-at]} db ]
(let [client (pull-id db client) (let [client (pull-id db client)
account (pull-id db account)] account (pull-id db account)]
@@ -612,10 +647,6 @@
(take-while identity) (take-while identity)
(mapcat identity) (mapcat identity)
(take-while (fn [{[result-client result-account result-location] :journal-entry-line/client+account+location+date}] (take-while (fn [{[result-client result-account result-location] :journal-entry-line/client+account+location+date}]
#_(println
[[ client result-client]
[ account result-account]
[ location result-location]])
(and (and
(= client result-client) (= client result-client)
(= account result-account) (= account result-account)
@@ -646,31 +677,77 @@
(:dirty-entries account-needing-refresh))))) (:dirty-entries account-needing-refresh)))))
(defn refresh-running-balance-accounts [accounts-needing-rebuild clients i db]
(mu/log ::found-accounts-needing-rebuild
:accounts (count accounts-needing-rebuild))
(let [so-far (atom 0)]
@(->> accounts-needing-rebuild
(s/->source)
(s/map (fn [account-needing-rebuild]
(de/future
(mu/with-context {:account account-needing-rebuild}
(-> account-needing-rebuild
(assoc :build-from (find-running-balance-start account-needing-rebuild db))
(assoc :dirty-entries (get-dirty-entries account-needing-rebuild db))
(assoc :account-type (:account_type ((build-account-lookup (:client account-needing-rebuild)) (:account account-needing-rebuild))))
(compute-running-balance))))))
(s/realize-each)
(s/mapcat (fn [x]
x))
(s/buffer 50)
(s/transform (partition-all 500))
(s/map (fn [batch]
(de/future
(dc/transact conn {:tx-data batch})
(count batch))))
(s/buffer 50)
(s/realize-each)
(s/consume (fn [batch-count]
(swap! so-far #(+ % batch-count))
(mu/log ::reset
:count batch-count
:so-far @so-far
:client c
:client-index i
:client-count (count clients)))))))
(defn refresh-running-balance-cache (defn refresh-running-balance-cache
([] (refresh-running-balance-cache (shuffle (map first ([] (refresh-running-balance-cache (shuffle (map first
(dc/q '[:find (pull ?c [:client/code :db/id]) (dc/q '[:find (pull ?c [:client/code :db/id])
:where [?c :client/code]] :where [?c :client/code]]
(dc/db conn)))))) (dc/db conn))))))
([clients] ([clients]
(doseq [c clients] (doseq [[c i] (map vector clients (range))]
(mu/trace ::building-running-balance (mu/trace ::building-running-balance
[:client c] [:client c]
(mu/with-context {:client c} (mu/with-context {:client c
:client-index i
:client-count (count clients)}
(mu/log ::searching-for-accounts)
(let [db (dc/db conn) (let [db (dc/db conn)
accounts-needing-rebuild (accounts-needing-rebuild db (:db/id c))] accounts-needing-rebuild (accounts-needing-rebuild db (:db/id c))]
(when (seq accounts-needing-rebuild) (when (seq accounts-needing-rebuild)
(mu/log ::found-accounts-needing-rebuild (refresh-running-balance-accounts accounts-needing-rebuild clients i db)
:accounts (count accounts-needing-rebuild)) (mu/log ::client-completed))))))))
(audit-transact-batch
(->> accounts-needing-rebuild (defn rebuild-running-balance-cache
(mapcat (fn [account-needing-rebuild] ([] (rebuild-running-balance-cache (shuffle (map first
(mu/with-context {:account account-needing-rebuild} (dc/q '[:find (pull ?c [:client/code :db/id])
(-> account-needing-rebuild :where [?c :client/code]]
(assoc :build-from (find-running-balance-start account-needing-rebuild db)) (dc/db conn))))))
(assoc :dirty-entries (get-dirty-entries account-needing-rebuild db)) ([clients]
(assoc :account-type (:account_type ((build-account-lookup (:client account-needing-rebuild)) (:account account-needing-rebuild)))) (doseq [[c i] (map vector clients (range))]
(compute-running-balance)))))) (mu/trace ::building-running-balance
{:user/name "running-balance-cache"})))))))) [:client c]
(mu/with-context {:client c
:client-index i
:client-count (count clients)}
(mu/log ::searching-for-accounts)
(let [db (dc/db conn)
accounts-needing-rebuild (all-accounts-needing-rebuild db (:db/id c))]
(when (seq accounts-needing-rebuild)
(refresh-running-balance-accounts accounts-needing-rebuild clients i db)
(mu/log ::client-completed))))))))
;; TODO only enable once IOL is set up in clod ;; TODO only enable once IOL is set up in clod

View File

@@ -2,6 +2,7 @@
(:gen-class) (:gen-class)
(:require (:require
[auto-ap.handler :refer [app]] [auto-ap.handler :refer [app]]
[auto-ap.jobs.restore-from-backup :as job-restore-from-backup]
[auto-ap.jobs.bulk-journal-import :as job-bulk-journal-import] [auto-ap.jobs.bulk-journal-import :as job-bulk-journal-import]
[auto-ap.jobs.close-auto-invoices :as job-close-auto-invoices] [auto-ap.jobs.close-auto-invoices :as job-close-auto-invoices]
[auto-ap.jobs.current-balance-cache :as job-current-balance-cache] [auto-ap.jobs.current-balance-cache :as job-current-balance-cache]
@@ -136,6 +137,9 @@
(= job "bulk-journal-import") (= job "bulk-journal-import")
(job-bulk-journal-import/-main) (job-bulk-journal-import/-main)
(= job "restore-from-backup")
(job-restore-from-backup/-main)
:else :else
(do (do
(add-shutdown-hook! shutdown-mount) (add-shutdown-hook! shutdown-mount)