(ns auto-ap.jobs.restore-from-backup (:require [amazonica.aws.s3 :as s3] [auto-ap.datomic :refer [transact-with-backoff]] [auto-ap.ledger] [auto-ap.jobs.core :refer [execute]] [clojure.edn :as edn] [clojure.java.io :as io] [clojure.set :as set] [com.brunobonacci.mulog :as mu] [config.core :refer [env]] [datomic.api :as dc] [lambdaisland.edn-lines :as ednl] [manifold.deferred :as de] [manifold.executor :as ex] [manifold.stream :as s])) (def request-pool (ex/fixed-thread-executor 100)) (def buffered (ex/fixed-thread-executor 100)) (defn order-of-insert [entity-dependencies] (loop [entity-dependencies entity-dependencies order []] (let [next-order (for [[entity deps] entity-dependencies :when (not (seq deps))] entity) next-deps (reduce (fn [entity-dependencies next-entity] (into {} (map (fn [[k v]] [k (disj v next-entity)]) entity-dependencies))) (apply dissoc entity-dependencies next-order) next-order)] (if (seq next-deps) (recur next-deps (into order next-order)) (into order next-order))))) (def loaded (atom #{})) (defn upsert-batch [batch context] (de/future-with request-pool (mu/with-context context (transact-with-backoff batch)) batch)) (defn pull-file [backup which] (mu/log ::pulling-file :file (str "/datomic-backup/" backup "/" which)) (with-open [w (io/writer "/tmp/tmp-edn")] (io/copy (:input-stream (s3/get-object {:bucket-name "data.prod.app.integreatconsult.com" :key (str "/datomic-backup/" backup "/" which)})) w)) "/tmp/tmp-edn") (def so-far (atom 0)) (def die? (atom false)) (defn load-entity [entity entities] (mu/trace ::restore-entity [:entity entity] (mu/with-context {:entity entity} (mu/log ::starting) @(s/consume (fn [batch] (mu/with-context {:entity entity} (try (swap! so-far #(+ % (count batch))) (mu/log ::loaded :count (count batch) :so-far @so-far) (catch Exception e (mu/log ::error :exception e) (throw e))))) (->> (partition-all 1000 entities) (s/->source) (s/onto buffered) (s/map (fn [entities] (when @die? (reset! die? false) (throw (Exception. "dead"))) (upsert-batch entities {:entity entity :service "restore-from-backup" :background-job "restore-from-backup"}))) (s/buffer 20) (s/realize-each))) (swap! loaded conj entity)))) (defn load-from-backup ([backup-id connection] (load-from-backup backup-id connection nil)) ([backup-id connection starting-at] (let [schema (edn/read-string (slurp (pull-file backup-id "schema.edn"))) full-dependencies (edn/read-string (slurp (pull-file backup-id "full-dependencies.edn"))) entity-dependencies (edn/read-string (slurp (pull-file backup-id "entity-dependencies.edn")))] @(dc/transact connection [{:db/ident :entity/migration-key :db/unique :db.unique/identity :db/cardinality :db.cardinality/one :db/valueType :db.type/long}]) @(dc/transact connection (map (fn [s] (set/rename-keys s {:db/id :entity/migration-key})) schema)) ;; TEMP - this has been fixed in current export (ezcater-olaciotn) @(dc/transact connection [{:entity/migration-key 17592257603901 :vendor/name "unknown"} {:entity/migration-key 17592232621701} {:entity/migration-key 17592263907739} {:entity/migration-key 17592271516922}]) (doseq [entity (cond->> (order-of-insert entity-dependencies) true (filter #(not= "audit" %)) starting-at (drop-while #(not= starting-at %))) :let [_ (reset! so-far 0) _ (mu/log ::querying :entity entity) entities (mu/trace ::file-pulled [:file (str backup-id "/" entity ".ednl")] (ednl/slurp (pull-file backup-id (str entity ".ednl"))))]] (load-entity entity entities))))) (defn restore-fresh-from-backup [{:keys [backup starting-at]}] (println "beginning") (mu/log ::beginning-restore) (load-from-backup backup auto-ap.datomic/conn starting-at) (mu/log ::restore-complete) (mu/log ::beginning-index-build) (auto-ap.datomic/transact-schema auto-ap.datomic/conn) (mu/log ::index-build-complete) (mu/log ::refresh-running-balance-cache) #_(auto-ap.ledger/rebuild-running-balance-cache) (mu/log ::refresh-running-balance-cache-complete) (mu/log ::done)) (defn -main [& _] (try (println "restore") (execute "restore-from-backup" #(restore-fresh-from-backup (:args env))) (catch Exception e (println e) (mu/log ::quit-error :exception e :background-job "restore-from-backup" :service "restore-from-backup") (Thread/sleep 5000) (throw e)))) ;; cloud load #_(comment ;; /datomic-backup/079df203-eae0-4acf-94d5-8608ba8b8a9a (load-from-backup "079df203-eae0-4acf-94d5-8608ba8b8a9a" auto-ap.datomic/conn ["charge"]) (load-entity "charge" (ednl/slurp "/tmp/tmp-edn"))) ;; => nil