155 lines
5.7 KiB
Clojure
155 lines
5.7 KiB
Clojure
(ns restore-from-backup
  (:require [clojure.java.io :as io]
            [amazonica.aws.s3 :as s3]
            [config.core :refer [env]]
            [clojure.core.async :as a]
            [datomic.client.api :as dc]
            [lambdaisland.edn-lines :as ednl]
            [datomic.client.api.async :as dca]
            [datomic.dev-local :as dl]
            [clojure.edn :as edn]
            [clojure.set :as set]
            [clojure.string :as str]
            [clj-http.client :as client]))
|
|
|
|
|
|
|
|
;; Local alias for the Datomic client var defined in auto-ap.datomic.
;; NOTE(review): auto-ap.datomic is not listed in this namespace's :require,
;; so this only resolves if that namespace has already been loaded — confirm.
;; NOTE(review): nothing in the visible part of this file reads `client`.
(def client auto-ap.datomic/client)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
(defn order-of-insert
  "Returns a vector of entity names ordered so that every entity appears
  after all the entities it depends on (a topological sort).

  entity-dependencies - map of entity -> set of entities it depends on.
                        The values must support `disj` (sets or nil).

  Each pass emits every entity whose dependency set is empty, removes those
  entities from the remaining dependency sets, and repeats. Progress is
  printed on every pass.

  Throws ex-info when a pass cannot emit anything while entities remain,
  i.e. the dependencies are cyclic or refer to an entity that is not a key
  of the map. The ex-data carries :remaining and :order."
  [entity-dependencies]
  (loop [remaining entity-dependencies
         order []]
    (let [;; entities with nothing left to wait for
          ready (vec (for [[entity deps] remaining
                           :when (not (seq deps))]
                       entity))
          ;; drop the emitted entities, both as keys and as dependencies
          next-deps (into {}
                          (map (fn [[entity deps]]
                                 [entity (reduce disj deps ready)]))
                          (apply dissoc remaining ready))]
      (println order next-deps)
      (cond
        (empty? next-deps)
        (into order ready)

        ;; nothing became ready, so another pass would see identical input
        (empty? ready)
        (throw (ex-info "Cannot order entities: cyclic or unsatisfiable dependencies"
                        {:remaining next-deps
                         :order order}))

        :else
        (recur next-deps (into order ready))))))
|
|
|
|
|
|
|
|
|
|
(defn tx-pipeline
  "Transacts data from from-ch. Returns a map with:
     :result, a return channel getting {:error t} or {:completed n}
     :stop, a fn you can use to terminate early.

  conn    - connection passed to dca/transact
  conc    - parallelism passed to a/pipeline-async
  from-ch - channel of inputs; it is closed here when a transaction fails
  f       - applied to each input to produce the :tx-data of one transaction

  The first report put on :result wins: once a transaction has failed the
  result is {:error t}, even though the progress counter also finishes."
  [conn conc from-ch f]
  (let [to-ch (a/chan 400)
        ;; promise-chan keeps only the first value and never blocks a put, so
        ;; an {:error t} report cannot be overtaken by the {:completed n}
        ;; report that follows once to-ch is closed, and concurrent failures
        ;; do not leave go blocks parked on the channel.
        done-ch (a/promise-chan)
        ;; A failed transaction comes back as a map on the channel. Treat a
        ;; map carrying either :db/error or an anomaly category as a failure.
        failed? (fn [tx-r]
                  (or (:db/error tx-r)
                      (:cognitect.anomalies/category tx-r)))
        transact-data (fn [data result]
                        (a/go
                          (try
                            (let [tx-r (a/<! (dca/transact conn {:tx-data (f data)}))]
                              (when (failed? tx-r)
                                (throw (ex-info "Invalid transaction" tx-r)))
                              (a/>! result tx-r)
                              (a/close! result))
                            ; on any failure: report the error first, then
                            ; shut the pipeline down.
                            (catch Throwable t
                              (.printStackTrace t)
                              (a/>! done-ch {:error t})
                              (a/close! from-ch)
                              (a/close! to-ch)
                              ; pipeline-async waits for result to be closed
                              (a/close! result)))))]

    ; prints a '.' before the first result and after every 5th one, then puts
    ; the completed report on the done channel once to-ch is drained and closed.
    (a/go-loop [total 0]
      (when (zero? (mod total 5))
        (print ".")
        (flush))
      (if (some? (a/<! to-ch))
        (recur (inc total))
        (a/>! done-ch {:completed total})))

    ; runs transact-data on up to conc inputs from from-ch at a time and puts
    ; each transaction result on to-ch
    (a/pipeline-async conc to-ch transact-data from-ch)

    ; returns done channel and a function that you can use
    ; for early termination.
    {:result done-ch
     :stop (fn [] (a/close! to-ch))}))
|
|
|
|
;; Set of entity names that load-from-backup has already processed.
;; When load-from-backup is called without an explicit item list it skips
;; every entity found in this set, so a restore can be resumed in the same
;; REPL session.
(def loaded (atom #{}))
|
|
|
|
|
|
|
|
(defn load-from-backup
  "Loads the backup stored under the directory `backup-id` into `connection`.

  backup-id  - path of the backup directory; it must contain schema.edn,
               entity-dependencies.edn and one <entity>.ednl file per entity
  connection - Datomic connection to transact into
  item-list  - optional collection of entity names to load, in that order.
               When nil, the order comes from order-of-insert, minus the
               entities already in `loaded` and minus \"audit\".

  Steps:
    1. installs the :entity/migration-key attribute (unique identity, long)
    2. transacts the backed-up schema, with :db/id renamed to
       :entity/migration-key
    3. transacts four entities with hard-coded migration keys
    4. for each entity, reads its .ednl file and transacts it in batches of
       200 through tx-pipeline with a parallelism of 10, printing the
       pipeline's report

  An entity is added to `loaded` only when its pipeline did not report an
  error, so a failed entity is picked up again on the next call.
  Returns nil."
  ([backup-id connection] (load-from-backup backup-id connection nil))
  ([backup-id connection item-list]
   (let [read-edn (fn [file-name]
                    (clojure.edn/read-string (slurp (str backup-id "/" file-name))))
         schema (read-edn "schema.edn")
         entity-dependencies (read-edn "entity-dependencies.edn")]
     (dc/transact connection {:tx-data [{:db/ident :entity/migration-key
                                         :db/unique :db.unique/identity
                                         :db/cardinality :db.cardinality/one
                                         :db/valueType :db.type/long}]})
     (dc/transact connection {:tx-data (mapv #(set/rename-keys % {:db/id :entity/migration-key})
                                             schema)})
     ;; NOTE(review): presumably these are entities the backup refers to but
     ;; does not contain — confirm where these ids come from.
     (dc/transact connection {:tx-data [{:entity/migration-key 17592257603901 :vendor/name "unknown"}
                                        {:entity/migration-key 17592232621701}
                                        {:entity/migration-key 17592263907739}
                                        {:entity/migration-key 17592271516922}]})
     (doseq [entity (or item-list
                        (remove (conj @loaded "audit")
                                (order-of-insert entity-dependencies)))]
       (println "querying for " entity)
       (let [entities (ednl/slurp (str backup-id "/" entity ".ednl"))
             _ (println "Found some! here's a few: " (take 3 entities))
             tx-chan (a/chan 50)
             ;; the backup rows are transacted as they are
             pipeline (tx-pipeline connection 10 tx-chan identity)]
         ;; Feed the batches. >!! returns false once tx-chan has been closed
         ;; (tx-pipeline closes it after a failed transaction), so stop then
         ;; instead of walking through the rest of the file.
         (loop [batches (seq (partition-all 200 entities))]
           (when batches
             (let [accepted? (try
                               (a/>!! tx-chan (first batches))
                               (catch Exception e
                                 (println e)
                                 ((:stop pipeline))
                                 false))]
               (when accepted?
                 (recur (next batches))))))
         (println "waiting for done from" pipeline)
         (flush)
         (a/close! tx-chan)
         (let [result (a/<!! (:result pipeline))]
           (println result)
           ;; only a load that did not fail counts as loaded
           (when-not (:error result)
             (swap! loaded conj entity)))
         ((:stop pipeline))
         (println)
         (println "Done"))))))
|
|
|
|
;; cloud load
|
|
(comment
|
|
(load-from-backup "backups/8e245d3d-be7a-4d90-8e9e-e6a110582658" auto-ap.datomic/conn ["journal-entry-line" "journal-entry"]))
|
|
;; => nil
|