(ns restore-from-backup
  (:require [amazonica.aws.s3 :as s3]
            [clj-http.client :as client]
            [clojure.core.async :as a]
            [clojure.edn]
            [clojure.java.io :as io]
            [clojure.set :as set]
            [clojure.string :as str]
            [config.core :refer [env]]
            [datomic.client.api :as dc]
            [datomic.client.api.async :as dca]
            [datomic.dev-local :as dl]
            [lambdaisland.edn-lines :as ednl]))

;; NOTE(review): `auto-ap.datomic` is referenced fully qualified but is not in
;; the :require list above, so this only resolves when that namespace has
;; already been loaded (e.g. from a running application REPL) - confirm.
(def client auto-ap.datomic/client)

(defn order-of-insert
  "Returns a vector of entities ordered so that every entity comes after all
  of the entities it depends on.

  `entity-dependencies` maps an entity to the set of entities it depends on.
  The order is built in rounds: each round emits every entity whose dependency
  set is empty, then removes those entities both from the map and from every
  remaining dependency set. The partial order and the remaining dependencies
  are printed after each round.

  Throws ex-info when a round can emit nothing while entries remain, which
  happens for a dependency cycle or for a dependency that is not itself a key
  of the map. Previously that situation looped forever."
  [entity-dependencies]
  (loop [remaining entity-dependencies
         order     []]
    (let [ready     (for [[entity deps] remaining
                          :when (empty? deps)]
                      entity)
          ;; Drop the emitted entities, then strip them from every
          ;; dependency set that is left, in a single pass.
          next-deps (into {}
                          (map (fn [[entity deps]]
                                 [entity (reduce disj deps ready)]))
                          (apply dissoc remaining ready))]
      (println order next-deps)
      (cond
        (empty? next-deps)
        (into order ready)

        ;; Nothing was emitted and nothing was removed: recurring would see
        ;; the exact same map again.
        (empty? ready)
        (throw (ex-info "Unresolvable entity dependencies (cycle or unknown dependency)"
                        {:order order :remaining next-deps}))

        :else
        (recur next-deps (into order ready))))))

(defn tx-pipeline
  "Transacts data from from-ch. Returns a map with:
     :result, a return channel getting {:error t} or {:completed n}
     :stop, a fn you can use to terminate early.

  conc is the parallelism handed to pipeline-async."
  [conn conc from-ch f]
  (let [to-ch   (a/chan 400)
        done-ch (a/chan)
        transact-data
        (fn [data result]
          (a/go
            (try
              ;; NOTE(review): the text between `(a/` and `! result tx-r)` was
              ;; stripped from this file (everything from a `<` to the next
              ;; `>`). The take and the transact call below are reconstructed
              ;; from what survived: `conn`, `f` and `data` are otherwise
              ;; unused, and `dca` is the async client alias. Any additional
              ;; forms that stood here (for example an anomaly check on tx-r)
              ;; are lost - confirm against version control.
              (let [tx-r (a/<! (dca/transact conn {:tx-data (f data)}))]
                (a/>! result tx-r)
                (a/close! result))
              ; if exception in a transaction
              ; will close channels and put error
              ; on done channel.
              (catch Throwable t
                (.printStackTrace t)
                (a/close! from-ch)
                (a/close! to-ch)
                ;; pipeline-async requires the async fn to close its result
                ;; channel; do so on the failure path as well.
                (a/close! result)
                (a/>! done-ch {:error t})))))]

    ; go block prints a '.' for every 5th value taken from to-ch (starting
    ; with the first), puts completed report on done channel when no value
    ; is left to be taken.
    ;; NOTE(review): the take from to-ch and the recur were inside a stripped
    ;; span and are reconstructed - confirm against version control.
    (a/go-loop [total 0]
      (when (zero? (mod total 5))
        (print ".")
        (flush))
      (if-let [c (a/<! to-ch)]
        (recur (inc total))
        (a/>! done-ch {:completed total})))

    ; pipeline that calls transact-data for each value taken from from-ch,
    ; with parallelism conc, and puts the results on to-ch
    (a/pipeline-async conc to-ch transact-data from-ch)

    ; returns done channel and a function that you can use
    ; for early termination.
    {:result done-ch
     :stop   (fn [] (a/close! to-ch))}))

;; Names of the entities load-from-backup has already started loading; it
;; skips these when it is not given an explicit item-list.
(def loaded (atom #{}))

(defn load-from-backup
  "Loads the backup stored under the directory `backup-id` through `connection`.

  Reads schema.edn, full-dependencies.edn and entity-dependencies.edn from that
  directory, transacts the :entity/migration-key attribute, the schema (with
  :db/id renamed to :entity/migration-key) and four hard-coded entities, then
  for each entity reads <entity>.ednl and feeds it to tx-pipeline in batches
  of 200.

  `item-list`, when given, is the list of entities to load; otherwise the
  result of order-of-insert is used, minus \"audit\" and anything in `loaded`.

  NOTE(review): the end of this function lies beyond the visible source, so
  its return value and final steps are not documented here."
  ([backup-id connection]
   (load-from-backup backup-id connection nil))
  ([backup-id connection item-list]
   (let [schema (clojure.edn/read-string (slurp (str backup-id "/schema.edn")))
         full-dependencies (clojure.edn/read-string (slurp (str backup-id "/full-dependencies.edn")))
         entity-dependencies (clojure.edn/read-string (slurp (str backup-id "/entity-dependencies.edn")))]
     ;; The original :db/id of every backed-up entity is kept in this unique
     ;; identity attribute.
     (dc/transact connection {:tx-data [{:db/ident :entity/migration-key
                                         :db/unique :db.unique/identity
                                         :db/cardinality :db.cardinality/one
                                         :db/valueType :db.type/long}]})
     (dc/transact connection {:tx-data (map (fn [s] (set/rename-keys s {:db/id :entity/migration-key})) schema)})
     ;; NOTE(review): hard-coded migration keys; presumably entities that the
     ;; backed-up data refers to but that are not in the backup - confirm.
     (dc/transact connection {:tx-data [{:entity/migration-key 17592257603901 :vendor/name "unknown"}
                                        {:entity/migration-key 17592232621701}
                                        {:entity/migration-key 17592263907739}
                                        {:entity/migration-key 17592271516922}] })
     (doseq [entity (or item-list (filter (complement (conj @loaded "audit")) (order-of-insert entity-dependencies)))
             ;; The `_` bindings exist only for their side effects; the entity
             ;; is recorded in `loaded` before its data is read.
             :let [_ (swap! loaded conj entity)
                   _ (println "querying for " entity)
                   entities (ednl/slurp (str backup-id "/" entity ".ednl"))
                   _ (println "Found some! here's a few: " (take 3 entities))
                   tx-chan (a/chan 50)
                   entities->transaction (fn [entities] entities)
                   pipeline (tx-pipeline connection 10 tx-chan entities->transaction)]]
       (doseq [batch (partition-all 200 entities)]
         (try
           (a/>!! tx-chan batch)
           (catch Exception e
             (println e)
             ((:stop pipeline)))))
       (println "waiting for done from" pipeline)
       (flush)
       ;; Closing the input channel lets the pipeline drain.
       (a/close! tx-chan)
       ;; NOTE(review): the visible source stops inside the next form, and part
       ;; of it was stripped as well. It is reproduced verbatim and left
       ;; unclosed; restore the remainder from version control.
       (println (a/ nil