Getting closer to datomic cloud

This commit is contained in:
2023-03-18 20:07:34 -07:00
parent bd658906b5
commit 78519663ac
17 changed files with 299 additions and 177 deletions

View File

@@ -5,47 +5,51 @@
;; You can also press C-u C-j to evaluate the expression and pretty-print its result.
(ns dump-edn
(:require [datomic.api :as d]
(:require #_`[datomic.api :as d]
[clojure.java.io :as io]
[amazonica.aws.s3 :as s3]
[config.core :refer [env]]
[clojure.core.async :as a]
[datomic.client.api :as dc]
[lambdaisland.edn-lines :as ednl]
[datomic.client.api.async :as dca]
[datomic.dev-local :as dl]
[clojure.set :as set]))
[clojure.set :as set]
[clojure.string :as str]
[clj-http.client :as client]))
(def remote-db (d/db (datomic.api/connect "datomic:ddb://us-east-1/integreat/integreat-prod")))
(def local-client (dc/client {:server-type :dev-local
:system "dev"}))
(dc/list-databases local-client {})
(def schema (let [everything (->> (d/q '[:find [(pull ?e [:db/ident
{:db/valueType [:db/ident]}
{:db/cardinality [:db/ident]}
:db.attr/preds
{:db/unique [:db/ident]}
:db/isComponent
:db/id
:db/noHistory
:db/tupleAttrs
:db.entity/attrs
:db.entity/preds
:db/doc]) ...]
:where [?e :db/ident]]
remote-db))
schema-attrs (->> everything
(filter :db/ident)
(filter (fn [{:db/keys [ident]}]
(if (namespace ident)
(re-matches #"^(?!cartographer)(?!db)(?!fressian).+" (namespace ident))
true
))))
meta-schema-schema (filter #(-> % :db/ident not) everything)]
schema-attrs))
(def get-schema []
(let [everything (->> (d/q '[:find [(pull ?e [:db/ident
{:db/valueType [:db/ident]}
{:db/cardinality [:db/ident]}
:db.attr/preds
{:db/unique [:db/ident]}
:db/isComponent
:db/id
:db/noHistory
:db/tupleAttrs
:db.entity/attrs
:db.entity/preds
:db/doc]) ...]
:where [?e :db/ident]]
remote-db))
schema-attrs (->> everything
(filter :db/ident)
(filter (fn [{:db/keys [ident]}]
(if (namespace ident)
(re-matches #"^(?!cartographer)(?!db)(?!fressian).+" (namespace ident))
true
))))
meta-schema-schema (filter #(-> % :db/ident not) everything)]
schema-attrs))
(def best-key-helper
(->> schema
@@ -364,7 +368,8 @@
"import-batch" #{"transaction"},
"payment" #{"vendor" "invoice" "bank-account" "client"},
"vendor-account-override" #{"account" "client"}})
(def order-of-insert
(defn order-of-insert [entity-dependencies]
(loop [entity-dependencies entity-dependencies
order []]
(let [next-order (for [[entity deps] entity-dependencies
@@ -502,7 +507,7 @@
:db/doc "A transaction account that was deleted"})))})
(doseq [entity (or item-list (filter (complement (conj @loaded "audit")) order-of-insert))
(doseq [entity (or item-list (filter (complement (conj @loaded "audit")) (order-of-insert entity-dependencies)))
:let [_ (swap! loaded conj entity)
_ (println "querying for " entity)
entities (d/q '[:find [?e ...]
@@ -561,6 +566,127 @@
(println)
(println "Done"))))
(def dumped (atom #{}))
(defn dump-schema [backup]
(spit (io/file (str backup "/schema.edn"))
(with-out-str (prn (map
(fn [s]
(set/rename-keys s {:db/id :entity/migration-key}))
schema))))
(spit (io/file (str backup "/full-dependencies.edn"))
(with-out-str (prn full-dependencies)))
(spit (io/file (str backup "/entity-dependencies.edn"))
(with-out-str (prn entity-dependencies))))
(defn dump-all
([] (dump-all nil))
([item-list]
(let [backup-id (str "backups/" (java.util.UUID/randomUUID))]
(.mkdir (io/file backup-id))
(dump-schema backup-id)
(doseq [entity (or item-list (filter (complement (conj @loaded "audit")) (order-of-insert entity-dependencies)))
:let [_ (swap! dumped conj entity)
_ (println "querying for " entity)
entities (d/q '[:find [?e ...]
:in $ [?a ...]
:where [?e ?a]]
remote-db
(cond-> (entity->best-key entity)
(not (vector? (entity->best-key entity))) vector))
entities->transaction (fn [entities]
(->> (d/pull-many remote-db
(->> schema
(filter :db/valueType)
(mapv :db/ident)
(filter #(= entity (namespace %)))
(into [:db/id]))
entities)
(mapv (fn [m ]
(reduce
(fn [m [k v]]
(cond
(= k :db/id)
(-> m
(assoc :entity/migration-key v)
(dissoc :db/id))
(full-dependencies k)
(if (vector? v)
(assoc m k (mapv (fn [r] [:entity/migration-key (:db/id r)]) v))
(assoc m k [:entity/migration-key (:db/id v)]))
:else
(dissoc m :payment/pdf-data
:payment/memo)))
m
m)))))
_ (println "Inserting " entity ": " (count entities))
_ (flush)]]
(ednl/with-append [append (str backup-id "/" entity ".ednl")]
(doseq [batch (partition-all 1000 entities)
:let [_ (do (print ".") (flush))]
item (entities->transaction batch)]
(try
(append item)
(catch Exception e
(println e)
(throw e)))))
(println "Done")))))
(defn load-from-backup
([backup-id connection] (load-from-backup backup-id connection nil))
([backup-id connection item-list]
(let [schema (clojure.edn/read-string (slurp (str backup-id "/schema.edn")))
full-dependencies (clojure.edn/read-string (slurp (str backup-id "/full-dependencies.edn")))
entity-dependencies (clojure.edn/read-string (slurp (str backup-id "/entity-dependencies.edn")))]
(dc/transact connection {:tx-data [{:db/ident :entity/migration-key
:db/unique :db.unique/identity
:db/cardinality :db.cardinality/one
:db/valueType :db.type/long}]})
(dc/transact connection {:tx-data (map
(fn [s]
(set/rename-keys s {:db/id :entity/migration-key}))
schema)})
(dc/transact connection {:tx-data [{:entity/migration-key 17592257603901 :vendor/name "unknown"}
{:entity/migration-key 17592232621701}
{:entity/migration-key 17592263907739}
{:entity/migration-key 17592271516922}]
})
(doseq [entity (or item-list (filter (complement (conj @loaded "audit")) (order-of-insert entity-dependencies)))
:let [_ (swap! loaded conj entity)
_ (println "querying for " entity)
entities (ednl/slurp (str backup-id "/" entity ".ednl"))
_ (println "Found some! here's a few: " (take 3 entities))
tx-chan (a/chan 50)
entities->transaction (fn [entities]
entities)
pipeline (tx-pipeline connection
10
tx-chan
entities->transaction)]]
(doseq [batch (partition-all 100 entities)]
(try
(a/>!! tx-chan batch)
(catch Exception e
(println e)
((:stop pipeline)))))
(println "waiting for done from" pipeline)
(flush)
(a/close! tx-chan)
(println (a/<!! (:result pipeline)))
((:stop pipeline))
(println)
(println "Done")))))
(defn reset-migrate []
(dc/delete-database local-client {:db-name "prod-migration"})
(dc/create-database local-client {:db-name "prod-migration"})
@@ -569,6 +695,19 @@
(comment
;; cloud load
(do
(let [client (dc/client {:server-type :cloud
:region "us-east-1"
:system "iol-cloud"
:endpoint "https://53syis8n1m.execute-api.us-east-1.amazonaws.com"})
;; _ (dc/create-database client {:db-name "prod-mirror"})
connection (dc/connect client {:db-name "prod-mirror"})]
(load-from-backup "backups/8e245d3d-be7a-4d90-8e9e-e6a110582658" connection)
)
)
(reset-migrate)
(migrate)