From 420bd126bd85383d3f8bea7b8925892d318b1a7b Mon Sep 17 00:00:00 2001 From: Bryce Covert Date: Wed, 29 Mar 2023 16:46:36 -0700 Subject: [PATCH] Adds the ability to launch a job that restores a database --- config/prod-cloud.edn | 2 +- src/clj/auto_ap/jobs/restore_from_backup.clj | 189 +++++++++--------- src/clj/auto_ap/ledger.clj | 193 +++++++++++++------ src/clj/auto_ap/server.clj | 4 + 4 files changed, 236 insertions(+), 152 deletions(-) diff --git a/config/prod-cloud.edn b/config/prod-cloud.edn index f3a34d09..a1bfb18c 100644 --- a/config/prod-cloud.edn +++ b/config/prod-cloud.edn @@ -1,5 +1,5 @@ {:scheme "https" - :db-name "prod-mirror" + :db-name "prod-mirror2" :client-config {:server-type :cloud :region "us-east-1" :system "iol-cloud" diff --git a/src/clj/auto_ap/jobs/restore_from_backup.clj b/src/clj/auto_ap/jobs/restore_from_backup.clj index 6a21febb..d2f4407c 100644 --- a/src/clj/auto_ap/jobs/restore_from_backup.clj +++ b/src/clj/auto_ap/jobs/restore_from_backup.clj @@ -1,16 +1,23 @@ (ns auto-ap.jobs.restore-from-backup - (:require [clojure.java.io :as io] - [amazonica.aws.s3 :as s3] - [config.core :refer [env]] - [clojure.core.async :as a] - [datomic.client.api :as dc] - [auto-ap.datomic :refer [client]] - [lambdaisland.edn-lines :as ednl] - [datomic.client.api.async :as dca] - [datomic.dev-local :as dl] - [clojure.set :as set] - [clojure.string :as str] - [clj-http.client :as client])) + (:require + [amazonica.aws.s3 :as s3] + [auto-ap.datomic] + [auto-ap.ledger] + [auto-ap.jobs.core :refer [execute]] + [clojure.edn :as edn] + [clojure.java.io :as io] + [clojure.set :as set] + [com.brunobonacci.mulog :as mu] + [config.core :refer [env]] + [datomic.client.api :as dc] + [lambdaisland.edn-lines :as ednl] + [manifold.deferred :as de] + [manifold.executor :as ex] + [manifold.stream :as s])) + +(def request-pool (ex/fixed-thread-executor 100)) + +(def buffered (ex/fixed-thread-executor 100)) (defn order-of-insert [entity-dependencies] (loop [entity-dependencies entity-dependencies @@ -27,7 +34,6 @@ entity-dependencies))) (apply dissoc entity-dependencies next-order) next-order)] - (println order next-deps) (if (seq next-deps) (recur next-deps (into order next-order)) (into order next-order))))) @@ -35,53 +41,14 @@ -(defn tx-pipeline - "Transacts data from from-ch. Returns a map with: - :result, a return channel getting {:error t} or {:completed n} - :stop, a fn you can use to terminate early." - [conn conc from-ch f] - (let [to-ch (a/chan 400) - done-ch (a/chan) - transact-data (fn [data result] - (a/go - (try - - (let [tx-r (a/! result tx-r) - (a/close! result)) - ; if exception in a transaction - ; will close channels and put error - ; on done channel. - (catch Throwable t - (.printStackTrace t) - (a/close! from-ch) - (a/close! to-ch) - (a/>! done-ch {:error t})))))] - - ; go block prints a '.' after every 1000 transactions, puts completed - ; report on done channel when no value left to be taken. - (a/go-loop [total 0] - (if (= (mod total 5) 0) - (do - (print ".") - (flush))) - (if-let [c (a/! done-ch {:completed total}))) - - ; pipeline that uses transducer form of map to transact data taken from - ; from-ch and puts results on to-ch - (a/pipeline-async conc to-ch transact-data from-ch) - - ; returns done channel and a function that you can use - ; for early termination. - {:result done-ch - :stop (fn [] (a/close! to-ch))})) - (def loaded (atom #{})) +(defn upsert-batch [batch] + (de/future-with request-pool + (do + (dc/transact auto-ap.datomic/conn {:tx-data batch}) + batch))) + (defn pull-file [backup which] @@ -90,14 +57,43 @@ :key (str "/datomic-backup/" backup "/" which)})) w)) "/tmp/tmp-edn") - + +(def so-far (atom 0)) +(def die? (atom false)) + +(defn load-entity [entity entities] + (mu/trace ::restore-entity + [:entity entity] + (mu/with-context {:entity entity} + (mu/log ::starting) + @(s/consume (fn [batch] + (mu/with-context {:entity entity} + (try + (swap! so-far #(+ % (count batch))) + (mu/log ::loaded :count (count batch) + :so-far @so-far) + (catch Exception e + (mu/log ::error + :exception e) + (throw e))))) + (->> (partition-all 1000 entities) + (s/->source) + (s/onto buffered) + (s/map (fn [entities] + (when @die? + (reset! die? false) + (throw (Exception. "dead"))) + (upsert-batch entities))) + (s/buffer 50) + (s/realize-each))) + (swap! loaded conj entity)))) (defn load-from-backup ([backup-id connection] (load-from-backup backup-id connection nil)) ([backup-id connection item-list] - (let [schema (clojure.edn/read-string (slurp (pull-file backup-id "schema.edn"))) - full-dependencies (clojure.edn/read-string (slurp (pull-file backup-id "full-dependencies.edn"))) - entity-dependencies (clojure.edn/read-string (slurp (pull-file backup-id "entity-dependencies.edn")))] + (let [schema (edn/read-string (slurp (pull-file backup-id "schema.edn"))) + full-dependencies (edn/read-string (slurp (pull-file backup-id "full-dependencies.edn"))) + entity-dependencies (edn/read-string (slurp (pull-file backup-id "entity-dependencies.edn")))] (dc/transact connection {:tx-data [{:db/ident :entity/migration-key :db/unique :db.unique/identity :db/cardinality :db.cardinality/one @@ -108,47 +104,54 @@ schema)}) ;; TEMP - this has been fixed in current export (ezcater-olaciotn) - (dc/transact auto-ap.datomic/conn {:tx-data [{:entity/migration-key 17592291325318}]}) (dc/transact connection {:tx-data [{:entity/migration-key 17592257603901 :vendor/name "unknown"} {:entity/migration-key 17592232621701} {:entity/migration-key 17592263907739} - {:entity/migration-key 17592271516922}] - - }) - + {:entity/migration-key 17592271516922}]}) (doseq [entity (or item-list (filter (complement (conj @loaded "audit")) (order-of-insert entity-dependencies))) - :let [_ (swap! loaded conj entity) - _ (println "querying for " entity) - entities (ednl/slurp (pull-file backup-id (str entity ".ednl"))) + :let [_ (reset! so-far 0) + _ (mu/log ::querying :entity entity) + entities (mu/trace ::pulling-file + [:file (str backup-id "/" entity ".ednl")] + (ednl/slurp (pull-file backup-id (str entity ".ednl"))))]] + (load-entity entity entities))))) - _ (println "Found some! here's a few: " (take 3 entities)) - tx-chan (a/chan 50) - entities->transaction (fn [entities] - entities) - pipeline (tx-pipeline connection - 10 - tx-chan - entities->transaction)]] - (doseq [batch (partition-all 200 entities)] - (try - (a/>!! tx-chan batch) - (catch Exception e - (println e) - ((:stop pipeline))))) +(defn restore-fresh-from-backup [{:keys [backup]}] + (mu/log ::beginning-restore) + (load-from-backup backup auto-ap.datomic/conn) + (mu/log ::restore-complete) + (mu/log ::beginning-index-build) + (auto-ap.datomic/transact-schema auto-ap.datomic/conn) + (auto-ap.ledger/reset-client+account+location+date) + (mu/log ::index-build-complete) + (mu/log ::refresh-running-balance-cache) + (auto-ap.ledger/rebuild-running-balance-cache) + (mu/log ::refresh-running-balance-cache-complete) + (mu/log ::done)) - (println "waiting for done from" pipeline) - (flush) - (a/close! tx-chan) - (println (a/ nil diff --git a/src/clj/auto_ap/ledger.clj b/src/clj/auto_ap/ledger.clj index 20e1c186..4ea60b2c 100644 --- a/src/clj/auto_ap/ledger.clj +++ b/src/clj/auto_ap/ledger.clj @@ -14,7 +14,9 @@ [com.brunobonacci.mulog :as mu] [com.unbounce.dogstatsd.core :as statsd] [datomic.client.api :as dc] - [iol-ion.tx :refer [upsert-ledger]])) + [iol-ion.tx :refer [upsert-ledger]] + [manifold.deferred :as de] + [manifold.stream :as s])) (defn datums->impacted-entity [db [e changes]] (let [entity (dc/pull db '[{:invoice/_expense-accounts [:db/id] :transaction/_accounts [:db/id]}] e) @@ -505,50 +507,65 @@ (defn reset-client+account+location+date ([] (reset-client+account+location+date (map first (dc/q '[:find ?c :where [?c :client/code]] (dc/db conn))))) ([clients] - (doseq [client clients - :let [_ (mu/log ::reseting-index-for :client client)] - batch - (->> (dc/qseq '[:find (pull ?je [:journal-entry/date :journal-entry/client {:journal-entry/line-items [:journal-entry-line/account :journal-entry-line/location :db/id]}]) - :in $ ?c - :where [?je :journal-entry/client ?c]] - (dc/db conn) - client - - ) - (map first) - (mapcat (fn [je] - (map (fn [jel] - {:db/id (:db/id jel) - :journal-entry-line/client+account+location+date - [(-> je :journal-entry/client :db/id) - (-> jel :journal-entry-line/account :db/id) + (doseq [[client i] (map vector clients (range))] + (mu/trace ::reset-index + [:client client] + (mu/with-context {:client client + :client-index i + :client-count (count clients)} + (mu/log ::reseting-index) + (let [so-far (atom 0)] + @(->> (dc/qseq '[:find (pull ?je [:journal-entry/date :journal-entry/client {:journal-entry/line-items [:journal-entry-line/account :journal-entry-line/location :db/id]}]) + :in $ ?c + :where [?je :journal-entry/client ?c]] + (dc/db conn) + client) + (map first) + (mapcat (fn [je] + (map (fn [jel] + {:db/id (:db/id jel) + :journal-entry-line/client+account+location+date + [(-> je :journal-entry/client :db/id) + (-> jel :journal-entry-line/account :db/id) - (-> jel :journal-entry-line/location) + (-> jel :journal-entry-line/location) - (-> je :journal-entry/date)]}) - (:journal-entry/line-items je)))) - (partition-all 500) - )] - (mu/log ::batch-completed) - (dc/transact conn {:tx-data batch})))) + (-> je :journal-entry/date)]}) + (:journal-entry/line-items je)))) + (partition-all 500) + (s/->source) + (s/map (fn [batch] + (de/future + (dc/transact conn {:tx-data batch}) + (count batch)))) + (s/buffer 50) + (s/realize-each) + (s/consume (fn [batch-count] + (swap! so-far #(+ % batch-count)) + (mu/log ::reset :count batch-count + :so-far @so-far + :client client + :client-index i + :client-count (count clients)))))) + (mu/log ::client-completed)))))) (defn find-mismatch-index [] (reduce + 0 - (for [c (map first (dc/q '[:find ?c :where [?c :client/code]] (dc/db conn))) - :let [_ (println "searching for" c) - a (->> (dc/index-pull (dc/db conn) - {:index :avet - :selector [:db/id :journal-entry-line/location :journal-entry-line/account :journal-entry-line/client+account+location+date {:journal-entry/_line-items [:journal-entry/date :journal-entry/client]}] - :start [:journal-entry-line/client+account+location+date [c]]}) - (take-while (fn [result] - (= c (first (:journal-entry-line/client+account+location+date result))) - )) - (filter (fn [{index :journal-entry-line/client+account+location+date :as result}] - (not= index - [(-> result :journal-entry/_line-items :journal-entry/client :db/id) - (-> result :journal-entry-line/account :db/id) - (-> result :journal-entry-line/location) - (-> result :journal-entry/_line-items :journal-entry/date)]))))]] + (for [c (map first (dc/q '[:find ?c :where [?c :client/code]] (dc/db conn))) + :let [_ (println "searching for" c) + a (->> (dc/index-pull (dc/db conn) + {:index :avet + :selector [:db/id :journal-entry-line/location :journal-entry-line/account :journal-entry-line/client+account+location+date {:journal-entry/_line-items [:journal-entry/date :journal-entry/client]}] + :start [:journal-entry-line/client+account+location+date [c]]}) + (take-while (fn [result] + (= c (first (:journal-entry-line/client+account+location+date result))) + )) + (filter (fn [{index :journal-entry-line/client+account+location+date :as result}] + (not= index + [(-> result :journal-entry/_line-items :journal-entry/client :db/id) + (-> result :journal-entry-line/account :db/id) + (-> result :journal-entry-line/location) + (-> result :journal-entry/_line-items :journal-entry/date)]))))]] (do (println (count a)) (count a))))) @@ -573,6 +590,24 @@ :starting-at starting-at :location location}))))) +(defn all-accounts-needing-rebuild [ db client] + (let [client (pull-id db client)] + (->> (dc/qseq '[:find ?c ?a ?l (min ?d) + :in $ ?c + :where + [?je :journal-entry/client ?c] + [?je :journal-entry/line-items ?jel] + [?jel :journal-entry-line/account ?a] + [?jel :journal-entry-line/location ?l] + [?je :journal-entry/date ?d]] + db + client) + (map (fn [[client account location starting-at ]] + {:client client + :account account + :starting-at starting-at + :location location}))))) + (defn find-running-balance-start [{:keys [client account location starting-at]} db ] (let [client (pull-id db client) account (pull-id db account)] @@ -612,10 +647,6 @@ (take-while identity) (mapcat identity) (take-while (fn [{[result-client result-account result-location] :journal-entry-line/client+account+location+date}] - #_(println - [[ client result-client] - [ account result-account] - [ location result-location]]) (and (= client result-client) (= account result-account) @@ -646,31 +677,77 @@ (:dirty-entries account-needing-refresh))))) +(defn refresh-running-balance-accounts [accounts-needing-rebuild clients i db] + (mu/log ::found-accounts-needing-rebuild + :accounts (count accounts-needing-rebuild)) + (let [so-far (atom 0)] + @(->> accounts-needing-rebuild + (s/->source) + (s/map (fn [account-needing-rebuild] + (de/future + (mu/with-context {:account account-needing-rebuild} + (-> account-needing-rebuild + (assoc :build-from (find-running-balance-start account-needing-rebuild db)) + (assoc :dirty-entries (get-dirty-entries account-needing-rebuild db)) + (assoc :account-type (:account_type ((build-account-lookup (:client account-needing-rebuild)) (:account account-needing-rebuild)))) + (compute-running-balance)))))) + (s/realize-each) + (s/mapcat (fn [x] + x)) + (s/buffer 50) + (s/transform (partition-all 500)) + (s/map (fn [batch] + (de/future + (dc/transact conn {:tx-data batch}) + (count batch)))) + (s/buffer 50) + (s/realize-each) + (s/consume (fn [batch-count] + (swap! so-far #(+ % batch-count)) + (mu/log ::reset + :count batch-count + :so-far @so-far + :client c + :client-index i + :client-count (count clients))))))) + (defn refresh-running-balance-cache ([] (refresh-running-balance-cache (shuffle (map first (dc/q '[:find (pull ?c [:client/code :db/id]) :where [?c :client/code]] (dc/db conn)))))) ([clients] - (doseq [c clients] + (doseq [[c i] (map vector clients (range))] (mu/trace ::building-running-balance [:client c] - (mu/with-context {:client c} + (mu/with-context {:client c + :client-index i + :client-count (count clients)} + (mu/log ::searching-for-accounts) (let [db (dc/db conn) accounts-needing-rebuild (accounts-needing-rebuild db (:db/id c))] (when (seq accounts-needing-rebuild) - (mu/log ::found-accounts-needing-rebuild - :accounts (count accounts-needing-rebuild)) - (audit-transact-batch - (->> accounts-needing-rebuild - (mapcat (fn [account-needing-rebuild] - (mu/with-context {:account account-needing-rebuild} - (-> account-needing-rebuild - (assoc :build-from (find-running-balance-start account-needing-rebuild db)) - (assoc :dirty-entries (get-dirty-entries account-needing-rebuild db)) - (assoc :account-type (:account_type ((build-account-lookup (:client account-needing-rebuild)) (:account account-needing-rebuild)))) - (compute-running-balance)))))) - {:user/name "running-balance-cache"})))))))) + (refresh-running-balance-accounts accounts-needing-rebuild clients i db) + (mu/log ::client-completed)))))))) + +(defn rebuild-running-balance-cache + ([] (rebuild-running-balance-cache (shuffle (map first + (dc/q '[:find (pull ?c [:client/code :db/id]) + :where [?c :client/code]] + (dc/db conn)))))) + ([clients] + (doseq [[c i] (map vector clients (range))] + (mu/trace ::building-running-balance + [:client c] + (mu/with-context {:client c + :client-index i + :client-count (count clients)} + (mu/log ::searching-for-accounts) + (let [db (dc/db conn) + accounts-needing-rebuild (all-accounts-needing-rebuild db (:db/id c))] + (when (seq accounts-needing-rebuild) + (refresh-running-balance-accounts accounts-needing-rebuild clients i db) + (mu/log ::client-completed)))))))) ;; TODO only enable once IOL is set up in clod diff --git a/src/clj/auto_ap/server.clj b/src/clj/auto_ap/server.clj index 0f6d74d6..1480f7a3 100644 --- a/src/clj/auto_ap/server.clj +++ b/src/clj/auto_ap/server.clj @@ -2,6 +2,7 @@ (:gen-class) (:require [auto-ap.handler :refer [app]] + [auto-ap.jobs.restore-from-backup :as job-restore-from-backup] [auto-ap.jobs.bulk-journal-import :as job-bulk-journal-import] [auto-ap.jobs.close-auto-invoices :as job-close-auto-invoices] [auto-ap.jobs.current-balance-cache :as job-current-balance-cache] @@ -136,6 +137,9 @@ (= job "bulk-journal-import") (job-bulk-journal-import/-main) + (= job "restore-from-backup") + (job-restore-from-backup/-main) + :else (do (add-shutdown-hook! shutdown-mount)