;; This buffer is for Clojure experiments and evaluation.
;; Press C-j to evaluate the last expression.
;; You can also press C-u C-j to evaluate the expression and pretty-print its result.

;; REPL scratch namespace for copying the production Datomic database (the
;; datomic:ddb:// URI in `remote-db`) into the local dev-local database
;; "prod-migration", either directly (`migrate`) or through per-entity .ednl
;; backup files (`load-from-backup`). Each copied entity keeps its remote
;; :db/id in the :entity/migration-key attribute, and refs are rewritten to
;; [:entity/migration-key <remote-id>] lookup refs. Because of this, entity
;; types are inserted in dependency order (`order-of-insert`).
;;
;; NOTE(review): this copy of the file is damaged. In several places the text
;; between an `(a/` and the next `>` character is missing; for example,
;; `(a/! result tx-r)` in `tx-pipeline`. It looks as if `<!`/`<!!` forms were
;; stripped like markup. The affected forms do not read as written. The lost
;; text cannot be reconstructed from this file, and it includes the header of
;; the function that writes the .ednl backups. Restore from version control
;; before evaluating.
;;
;; NOTE(review): this is a REPL buffer. Later `def`s replace earlier ones with
;; the same name (`full-dependencies`, `entity-dependencies`). Several names
;; read below are not established by any form in this file: the `d` alias and
;; the `schema` var. They presumably came from ad-hoc REPL evaluation — confirm.
(ns dump-edn
  ;; NOTE(review): `#_` discards the next form. The datomic.api require is
  ;; therefore ignored and no `d` alias is created here, although `d/db`,
  ;; `d/q`, `d/pull-many`, `d/since` and `d/history` are used below. The io,
  ;; s3, env, dca, dl, str and client aliases are not referenced anywhere in
  ;; the surviving text.
  (:require #_`[datomic.api :as d]
            [clojure.java.io :as io]
            [amazonica.aws.s3 :as s3]
            [config.core :refer [env]]
            [clojure.core.async :as a]
            [datomic.client.api :as dc]
            [lambdaisland.edn-lines :as ednl]
            [datomic.client.api.async :as dca]
            [datomic.dev-local :as dl]
            [clojure.set :as set]
            [clojure.string :as str]
            [clj-http.client :as client]))

;; Database value of the production database, taken once when this form is
;; evaluated. Every remote query below reads this same value.
(def remote-db (d/db (datomic.api/connect "datomic:ddb://us-east-1/integreat/integreat-prod")))

;; Client for the local dev-local system "dev". The target database
;; "prod-migration" is connected to through this client.
(def local-client (dc/client {:server-type :dev-local :system "dev"}))

;; Intended to return the remote "schema": every entity that has a :db/ident
;; (attributes as well as other ident entities), pulled with its schema
;; attributes. It keeps idents with no namespace, plus idents whose namespace
;; does not start with "cartographer", "db" or "fressian" (the regex uses
;; negative lookaheads anchored at the start).
;; NOTE(review): `def` has the shape (def name doc-string? init?). Written as
;; `(def get-schema [] ...)` this is expected to fail to compile; `defn` looks
;; intended. No `schema` var is defined in this file, even though most forms
;; below read it. Presumably `(def schema (get-schema))` was evaluated at the
;; REPL — confirm.
;; `meta-schema-schema` is unused. It is also always empty, because the query
;; only matches entities that have :db/ident.
(def get-schema []
  (let [everything (->> (d/q '[:find [(pull ?e [:db/ident
                                                 {:db/valueType [:db/ident]}
                                                 {:db/cardinality [:db/ident]}
                                                 :db.attr/preds
                                                 {:db/unique [:db/ident]}
                                                 :db/isComponent
                                                 :db/id
                                                 :db/noHistory
                                                 :db/tupleAttrs
                                                 :db.entity/attrs
                                                 :db.entity/preds
                                                 :db/doc]) ...]
                              :where [?e :db/ident]]
                            remote-db))
        schema-attrs (->> everything
                          (filter :db/ident)
                          (filter (fn [{:db/keys [ident]}]
                                    (if (namespace ident)
                                      (re-matches #"^(?!cartographer)(?!db)(?!fressian).+" (namespace ident))
                                      true ))))
        meta-schema-schema (filter #(-> % :db/ident not) everything)]
    schema-attrs))

;; Scratch helper: attribute idents (schema entries that have a :db/valueType),
;; grouped by namespace.
(def best-key-helper (->> schema
                          (filter :db/valueType)
                          (map :db/ident)
                          (group-by namespace)
                          #_(map (fn [[k v]] [k ])))
  )

;; For each entity namespace, the attribute (or vector of attributes) used to
;; FIND entities of that type. `migrate` selects every remote entity that has
;; at least one of these attributes. A single keyword is wrapped in a vector
;; before it is used.
(def entity->best-key
  {"transaction-rule" [:transaction-rule/description, :transaction-rule/note :transaction-rule/vendor]
   "square-location" :square-location/square-id,
   "expected-deposit" :expected-deposit/date,
   "journal-entry-line" [:journal-entry-line/account, :journal-entry-line/debit :journal-entry-line/credit]
   "vendor" [:vendor/name,]
   "transaction" :transaction/amount,
   "yodlee-provider-account" :yodlee-provider-account/id,
   "journal-entry" :journal-entry/source,
   "yodlee-merchant" :yodlee-merchant/yodlee-id,
   "invoice" :invoice/invoice-number,
   "vendor-terms-override" :vendor-terms-override/client,
   "integration-status" :integration-status/state,
   "conformity" :conformity/conformed-norms-index,
   "user" :user/provider-id,
   "sales-refund" :sales-refund/total,
   "plaid-account" :plaid-account/name,
   "charge" [:charge/total, :charge/external-id]
   "location-match" :location-match/location,
   "vendor-schedule-payment-dom" :vendor-schedule-payment-dom/dom,
   "account-client-override" :account-client-override/client,
   "plaid-item" :plaid-item/client,
   "transaction-account" :transaction-account/account,
   "address" [:address/street1, :address/city :address/state :address/zip]
   "order-line-item" :order-line-item/total,
   "ezcater-location" :ezcater-location/location,
   "account" [:account/numeric-code, :account/code :account/name :account/type]
   "intuit-bank-account" :intuit-bank-account/name,
   "saved-query" :saved-query/guid,
   "ezcater-caterer" :ezcater-caterer/uuid,
   "forecasted-transaction" :forecasted-transaction/day-of-month,
   "audit" :audit/user,
   "yodlee-account" :yodlee-account/id,
   "transaction-rule-account" [:transaction-rule-account/account, :transaction-rule-account/location]
   "ezcater-integration" :ezcater-integration/subscriber-uuid,
   "report" :report/created,
   "bank-account" :bank-account/code,
   "vendor-usage" :vendor-usage/key,
   "invoice-expense-account" [:invoice-expense-account/expense-account-id, :invoice-expense-account/account :invoice-expense-account/location :invoice-expense-account/amount]
   "sales-order" :sales-order/date,
   "client" :client/code,
   "email-contact" :email-contact/email,
   "invoice-payment" :invoice-payment/amount,
   "contact" [:contact/name, :contact/phone :contact/email]
   "import-batch" :import-batch/date,
   "payment" [:payment/date, :payment/bank-account]
   "vendor-account-override" :vendor-account-override/client})

;; Schema attributes whose value type is :db.type/ref.
(def references (filter (comp #{:db.type/ref} :db/ident :db/valueType) schema ))

;; Map of ref attribute ident -> set of attribute namespaces found on the
;; entities that the ref points to ("db" removed). This approximates which
;; entity types each ref can target.
;; Only ref datoms in `(d/since remote-db #inst "2022-06-01")` are inspected,
;; so refs with no such datoms get no entry here.
;; NOTE(review): the `[(namespace ?v2) ?v3]` clause appears twice; the second
;; copy is redundant.
(def reference->entity (->> (d/q '[:find ?a ?v3
                                   :in $ $$ [?a ...]
                                   :where
                                   [$$ _ ?a ?e]
                                   [$ ?e ?v _ _]
                                   [$ ?v :db/ident ?v2 _ _]
                                   [(namespace ?v2) ?v3]
                                   [(namespace ?v2) ?v3]]
                                 remote-db
                                 (d/since remote-db #inst "2022-06-01")
                                 (map :db/ident references)
                                 )
                            (group-by first)
                            (map (fn [[k v]] [k (disj (set (map second v)) "db")]))
                            (into {})))

;; Namespaces of ref attributes that got no entry in `reference->entity`.
;; Their targets presumably have to be supplied by hand
;; (`manual-dependencies`).
(def entities-that-need-manual (set (map namespace (filter (complement reference->entity) (map :db/ident references)))))

;; Hand-maintained map of ref attribute -> target entity namespaces. It is
;; merged into `reference->entity` below.
(def manual-dependencies {:client/location-matches #{"location-match"}
                          :transaction/yodlee-merchant #{"yodlee-merchant"}
                          :vendor-account-override/account #{"account"}
                          :vendor-account-override/client #{"client"}
                          :vendor/secondary-contact #{"contact"}
                          :vendor/account-overrides #{"vendor-account-override"}
                          :client/bank-accounts #{"bank-account"}
                          :transaction-rule/yodlee-merchant #{"yodlee-merchant"}
                          :client/forecasted-transactions #{"forecasted-transaction"}
                          :transaction/forecast-match #{"forecasted-transaction"}
                          :vendor/automatically-paid-when-due #{"client"}
                          :vendor/schedule-payment-dom #{"vendor-schedule-payment-dom"}
                          :vendor/terms-overrides #{"vendor-terms-override"}
                          :vendor-schedule-payment-dom/client #{"client"}})

;; Map of ref attribute -> set of entity namespaces it may point to (inferred
;; and manual sets unioned). "journal-entry" is removed from
;; :journal-entry/original-entity, which would otherwise make journal-entry
;; depend on itself in `entity-dependencies`.
(def full-dependencies (update (merge-with into reference->entity manual-dependencies) :journal-entry/original-entity #(disj % "journal-entry")))

;; Map of entity namespace -> set of entity namespaces that must be inserted
;; before it. Every attribute namespace starts with #{}, and each ref
;; attribute then adds its targets to its own namespace's set.
(def entity-dependencies
  (let [base-dependencies (into {}
                                (map (fn [i] [i #{}])
                                     (set (map (comp namespace :db/ident)
                                               (filter :db/valueType schema))))
                                ) ]
    (into base-dependencies
          (reduce
           (fn [acc [ref deps]]
             (update acc (namespace ref) (fnil #(into % deps) #{})))
           {}
           full-dependencies))))

;; NOTE(review): this literal replaces the computed `full-dependencies` above
;; (same var name) and looks like a pasted snapshot of that value. Keep the
;; two in sync or delete one. Values that are empty sets (e.g. :payment/type)
;; are still truthy, so `migrate` rewrites those refs to lookup refs as well.
(def full-dependencies {:invoice/client #{"client"},
                        :sales-order/client #{"client"},
                        :transaction-rule/transaction-approval-status #{},
                        :transaction/forecast-match #{"forecasted-transaction"},
                        :user/role #{},
                        :vendor-schedule-payment-dom/client #{"client"},
                        :invoice-payment/payment #{"payment"},
                        :transaction-rule/client #{"client"},
                        :invoice/status #{},
                        :payment/type #{},
                        :expected-deposit/client #{"client"},
                        :transaction/bank-account #{"bank-account"},
                        :transaction-rule-account/account #{"account"},
                        :import-batch/status #{},
                        :user/clients #{"client"},
                        :payment/client #{"client"},
                        :expected-deposit/charges #{"charge"},
                        :vendor/automatically-paid-when-due #{"client"},
                        :payment/invoices #{"invoice"},
                        :client/forecasted-transactions #{"forecasted-transaction"},
                        :transaction/matched-rule #{"transaction-rule"},
                        :invoice/import-status #{},
                        :charge/processor #{},
                        :expected-deposit/vendor #{"vendor"},
                        :client/square-locations #{"square-location"},
                        :payment/status #{},
                        :client/location-matches #{"location-match"},
                        :saved-query/client #{"client"},
                        :transaction/payment #{"payment"},
                        :transaction-rule/vendor #{"vendor"},
                        :plaid-item/client #{"client"},
                        :account/applicability #{},
                        :journal-entry-line/account #{"account" "bank-account"},
                        :client/bank-accounts #{"bank-account"},
                        :yodlee-provider-account/client #{"client"},
                        :account/vendor-allowance #{},
                        :payment/bank-account #{"bank-account"},
                        :account/default-allowance #{},
                        :transaction-rule/yodlee-merchant #{"yodlee-merchant"},
                        :vendor/account-overrides #{"vendor-account-override"},
                        :transaction/client #{"client"},
                        :invoice/vendor #{"vendor"},
                        :sales-order/vendor #{"vendor"},
                        :expected-deposit/status #{},
                        :journal-entry/original-entity #{"transaction" "invoice"},
                        :vendor-usage/client #{"client"},
                        :transaction/expected-deposit #{"expected-deposit"},
                        :client/ezcater-locations #{"ezcater-location"},
                        :journal-entry/client #{"client"},
                        :vendor/secondary-contact #{"contact"},
                        :journal-entry/line-items #{"journal-entry-line"},
                        :vendor/legal-entity-1099-type #{},
                        :transaction-rule/bank-account #{"bank-account"},
                        :transaction-account/account #{"account"},
                        :vendor/terms-overrides #{"vendor-terms-override"},
                        :vendor/default-account #{"account"},
                        :transaction/yodlee-merchant #{"yodlee-merchant"},
                        :sales-refund/client #{"client"},
                        :client/emails #{"email-contact"},
                        :payment/vendor #{"vendor"},
                        :invoice-payment/invoice #{"invoice"},
                        :report/client #{"client"},
                        :transaction-rule/accounts #{"transaction-rule-account"},
                        :charge/client #{"client"},
                        :bank-account/type #{},
                        :invoice-expense-account/account #{"account"},
                        :vendor/legal-entity-tin-type #{},
                        :transaction/approval-status #{},
                        :import-batch/entry #{"transaction"},
                        :bank-account/intuit-bank-account #{"intuit-bank-account"},
                        :account/type #{},
                        :sales-refund/vendor #{"vendor"},
                        :bank-account/yodlee-account #{"yodlee-account"},
                        :vendor/address #{"address"},
                        :integration-status/state #{},
                        :transaction/accounts #{"transaction-account"},
                        :sales-order/charges #{"charge"},
                        :client/address #{"address"},
                        :ezcater-location/caterer #{"ezcater-caterer"},
                        :vendor-account-override/client #{"client"},
                        :bank-account/integration-status #{"integration-status"},
                        :yodlee-provider-account/accounts #{"yodlee-account"},
                        :account/invoice-allowance #{},
                        :journal-entry/vendor #{"vendor"},
                        :plaid-item/accounts #{"plaid-account"},
                        :vendor-usage/vendor #{"vendor"},
                        :sales-order/line-items #{"order-line-item"},
                        :invoice/expense-accounts #{"invoice-expense-account"},
                        :account-client-override/client #{"client"},
                        :vendor/primary-contact #{"contact"},
                        :vendor/schedule-payment-dom #{"vendor-schedule-payment-dom"},
                        :account/client-overrides #{"account-client-override"},
                        :transaction/vendor #{"vendor"},
                        :client/square-integration-status #{"integration-status"},
                        :ezcater-integration/caterers #{"ezcater-caterer"},
                        :vendor-account-override/account #{"account"},
                        :import-batch/source #{}})

;; NOTE(review): this literal likewise replaces the computed
;; `entity-dependencies` above. Every namespace named in a dependency set is
;; also a key here, which `order-of-insert` needs in order to terminate.
(def entity-dependencies {"transaction-rule" #{"vendor" "yodlee-merchant" "transaction-rule-account" "bank-account" "client"},
                          "square-location" #{},
                          "expected-deposit" #{"vendor" "charge" "client"},
                          "journal-entry-line" #{"account" "bank-account"},
                          "vendor" #{"vendor-schedule-payment-dom" "address" "account" "client" "contact" "vendor-account-override"},
                          "transaction" #{"transaction-rule" "expected-deposit" "vendor" "yodlee-merchant" "transaction-account" "forecasted-transaction" "bank-account" "client" "payment"},
                          "yodlee-provider-account" #{"yodlee-account" "client"},
                          "journal-entry" #{"journal-entry-line" "vendor" "transaction" "invoice" "client"},
                          "yodlee-merchant" #{},
                          "invoice" #{"vendor" "invoice-expense-account" "client"},
                          "vendor-terms-override" #{},
                          "integration-status" #{},
                          "conformity" #{},
                          "user" #{"client"},
                          "sales-refund" #{"vendor" "client"},
                          "plaid-account" #{},
                          "charge" #{"client"},
                          "location-match" #{},
                          "vendor-schedule-payment-dom" #{"client"},
                          "account-client-override" #{"client"},
                          "plaid-item" #{"plaid-account" "client"},
                          "transaction-account" #{"account"},
                          "address" #{},
                          "order-line-item" #{},
                          "ezcater-location" #{"ezcater-caterer"},
                          "account" #{"account-client-override"},
                          "intuit-bank-account" #{},
                          "saved-query" #{"client"},
                          "ezcater-caterer" #{},
                          "forecasted-transaction" #{},
                          "audit" #{},
                          "yodlee-account" #{},
                          "transaction-rule-account" #{"account"},
                          "ezcater-integration" #{"ezcater-caterer"},
                          "report" #{"client"},
                          "bank-account" #{"integration-status" "intuit-bank-account" "yodlee-account"},
                          "vendor-usage" #{"vendor" "client"},
                          "invoice-expense-account" #{"account"},
                          "sales-order" #{"vendor" "charge" "order-line-item" "client"},
                          "client" #{"square-location" "integration-status" "location-match" "address" "ezcater-location" "forecasted-transaction" "bank-account" "email-contact"},
                          "email-contact" #{},
                          "invoice-payment" #{"invoice" "payment"},
                          "contact" #{},
                          "import-batch" #{"transaction"},
                          "payment" #{"vendor" "invoice" "bank-account" "client"},
                          "vendor-account-override" #{"account" "client"}})

;; Orders entity namespaces so that each one comes after everything it depends
;; on. Each round does the following:
;;   - takes every entity whose dependency set is empty and appends those
;;     entities to `order`;
;;   - removes them from the map;
;;   - removes them from every remaining dependency set.
;; It prints `order` and the remaining map on every round.
;; NOTE(review): a round may find no dependency-free entity, either because of
;; a cycle or because a dependency is not itself a key. In that case
;; `next-deps` equals the input and the loop recurs forever.
(defn order-of-insert [entity-dependencies]
  (loop [entity-dependencies entity-dependencies
         order []]
    (let [next-order (for [[entity deps] entity-dependencies
                           :when (not (seq deps))]
                       entity)
          next-deps (reduce
                     (fn [entity-dependencies next-entity]
                       (into {}
                             (map (fn [[k v]] [k (disj v next-entity)])
                                  entity-dependencies)))
                     (apply dissoc entity-dependencies next-order)
                     next-order)]
      (println order next-deps)
      (if (seq next-deps)
        (recur next-deps (into order next-order))
        (into order next-order)))))

;; Disabled (`#_`) exploration: for each namespace, finds the attribute that
;; has the most datoms in `remote-db`.
#_(def best-attributes
    (->> (map :db/ident (filter :db/valueType schema))
         (d/q '[:find ?a2 (count ?e)
                :in $ [?a2 ...]
                :where [?a :db/ident ?a2]
                [?e ?a]]
              remote-db)
         (map (fn [[a count]] [(namespace a) [a count]]))
         (reduce (fn [acc [namespace [attr count]]]
                   (update acc namespace
                           (fnil (fn [[curr-attr curr-count]]
                                   (if (> count curr-count)
                                     [attr count]
                                     [curr-attr curr-count]))
                                 ["" 0])))
                 {})
         (map (fn [[k v]] [k (first v)]))))

;; Transacts batches taken from `from-ch` against `conn` through core.async
;; `pipeline-async`, with parallelism `conc`. Callers pass a batch -> tx-data
;; function as `f` (`migrate` passes its `entities->transaction`).
;; NOTE(review): this form is damaged. The `let` inside `transact-data` and the
;; `if-let` in the go-loop lost the text between `(a/` and the next `>`, so
;; the form does not read as written. `f` is not referenced in the surviving
;; text; confirm against the original how it was applied.
(defn tx-pipeline
  "Transacts data from from-ch. Returns a map with: :result, a return channel getting {:error t} or {:completed n} :stop, a fn you can use to terminate early."
  [conn conc from-ch f]
  (let [to-ch (a/chan 400)
        done-ch (a/chan)
        transact-data (fn [data result]
                        (a/go
                          (try
                            ;; NOTE(review): damaged span (see above).
                            (let [tx-r (a/! result tx-r)
                              (a/close! result))
                            ; if exception in a transaction
                            ; will close channels and put error
                            ; on done channel.
                            (catch Throwable t
                              (.printStackTrace t)
                              (a/close! from-ch)
                              (a/close! to-ch)
                              (a/>! done-ch {:error t})))))]
    ; go block prints a '.' each time the running total is a multiple of 2
    ; (`(mod total 2)`), and is meant to put a completed report on done
    ; channel when no value is left to be taken.
    ;; NOTE(review): the `if-let` below is damaged in the same way.
    (a/go-loop [total 0]
      (if (= (mod total 2) 0)
        (do (print ".")
            (flush)))
      (if-let [c (a/! done-ch {:completed total})))
    ; pipeline-async runs transact-data on values taken from from-ch, with
    ; parallelism `conc`, and puts results on to-ch
    (a/pipeline-async conc to-ch transact-data from-ch)
    ; returns done channel and a function that you can use
    ; for early termination (it closes to-ch).
    {:result done-ch
     :stop (fn [] (a/close! to-ch))}))

;; Entity namespaces already processed in this REPL session. When no explicit
;; list is given, `migrate` and `load-from-backup` skip these (and "audit").
;; Both functions share this atom.
(def loaded (atom #{}))

;; Copies `remote-db` into the local "prod-migration" database.
;; With no argument, it processes every namespace from
;; `(order-of-insert entity-dependencies)` that is not yet in `loaded`
;; (excluding "audit"). With `item-list`, it processes exactly those
;; namespaces, in the order given.
;; Assumes `schema` holds the output of `get-schema` — confirm.
;; NOTE(review): `dc/connect` is called again for every transaction and
;; pipeline. The tail of this function is lost in this copy (see the damaged
;; `(println (a/` near the end).
(defn migrate
  ([] (migrate nil))
  ([item-list]
   ;; Identity attribute that stores each entity's remote :db/id.
   (dc/transact (dc/connect local-client {:db-name "prod-migration"})
                {:tx-data [{:db/ident :entity/migration-key
                            :db/unique :db.unique/identity
                            :db/cardinality :db.cardinality/one
                            :db/valueType :db.type/long}]})
   ;; Re-create the schema entities, each keyed by its remote id, so that
   ;; enum refs can later be resolved through [:entity/migration-key id].
   (dc/transact (dc/connect local-client {:db-name "prod-migration"})
                {:tx-data (map (fn [s] (set/rename-keys s {:db/id :entity/migration-key})) schema)})
   ;; Placeholder entities for four specific remote ids, one of them a vendor
   ;; named "unknown".
   ;; NOTE(review): presumably these ids are referenced but missing remotely;
   ;; document why.
   (dc/transact (dc/connect local-client {:db-name "prod-migration"})
                {:tx-data [{:entity/migration-key 17592257603901 :vendor/name "unknown"}
                           {:entity/migration-key 17592232621701}
                           {:entity/migration-key 17592263907739}
                           {:entity/migration-key 17592271516922}] })
   ;; Entities that were asserted with :transaction-rule/note at some point in
   ;; history but have none now get a placeholder entity with a :db/doc,
   ;; presumably so that refs to them still resolve.
   (dc/transact (dc/connect local-client {:db-name "prod-migration"})
                {:tx-data (->> (d/q '[:find ?e
                                      :in $ $$
                                      :where [$$ ?e :transaction-rule/note _ _ true]
                                      (not [$ ?e :transaction-rule/note] )]
                                    remote-db
                                    (d/history remote-db))
                               (map (fn [[old-rule]]
                                      {:entity/migration-key old-rule
                                       :db/doc "A transaction rule that was deleted"})))})
   ;; Disabled (`#_`) variant for transaction-accounts.
   ;; NOTE(review): as written it finds `(count ?e)`, so the `map` would
   ;; receive a single count instead of entity ids.
   #_(dc/transact (dc/connect local-client {:db-name "prod-migration"})
                  {:tx-data (->> (d/q '[:find (count ?e)
                                        :in $
                                        :where #_[$$ ?e :transaction-account/account _ _ true]
                                        [_ :transaction/accounts ?e]
                                        (not [?e :transaction-account/account _] ) ]
                                      remote-db
                                      #_(d/history remote-db))
                                 (map (fn [[old-rule]]
                                        {:entity/migration-key old-rule
                                         :db/doc "A transaction account that was deleted"})))})
   (doseq [entity (or item-list
                      (filter (complement (conj @loaded "audit"))
                              (order-of-insert entity-dependencies)))
           :let [_ (swap! loaded conj entity)
                 _ (println "querying for " entity)
                 ;; Remote ids of every entity that has any of this
                 ;; namespace's best-key attributes.
                 entities (d/q '[:find [?e ...]
                                 :in $ [?a ...]
                                 :where [?e ?a]]
                               remote-db
                               (cond-> (entity->best-key entity)
                                 (not (vector? (entity->best-key entity))) vector))
                 _ (println "Found some! here's a few: " (take 3 entities))
                 tx-chan (a/chan 50)
                 ;; Pulls the attributes in this entity's own namespace plus
                 ;; :db/id, then transforms each pulled map:
                 ;;   - renames :db/id to :entity/migration-key;
                 ;;   - rewrites every key of `full-dependencies` into lookup
                 ;;     ref(s) [:entity/migration-key id], mapping over a
                 ;;     vector of ids when the value is a vector;
                 ;;   - drops :payment/pdf-data and :payment/memo.
                 ;; All other attributes are copied unchanged.
                 entities->transaction (fn [entities]
                                         (->> (d/pull-many remote-db
                                                           (->> schema
                                                                (filter :db/valueType)
                                                                (mapv :db/ident)
                                                                (filter #(= entity (namespace %)))
                                                                (into [:db/id]))
                                                           entities)
                                              (mapv (fn [m ]
                                                      (reduce
                                                       (fn [m [k v]]
                                                         (cond
                                                           (= k :db/id) (-> m
                                                                            (assoc :entity/migration-key v)
                                                                            (dissoc :db/id))
                                                           (full-dependencies k) (if (vector? v)
                                                                                   (assoc m k (mapv (fn [r] [:entity/migration-key (:db/id r)]) v))
                                                                                   (assoc m k [:entity/migration-key (:db/id v)]))
                                                           :else (dissoc m :payment/pdf-data :payment/memo)))
                                                       m
                                                       m)))))
                 _ (println "Inserting " entity ": " (count entities))
                 _ (flush)
                 pipeline (tx-pipeline (dc/connect local-client {:db-name "prod-migration"}) 10 tx-chan entities->transaction)]]
     ;; Feed ids to the pipeline 100 at a time. If a put throws, the
     ;; exception is printed and the pipeline is stopped.
     (doseq [batch (partition-all 100 entities)]
       (try
         (a/>!! tx-chan batch)
         (catch Exception e
           (println e)
           ((:stop pipeline)))))
     (println "waiting for done from" pipeline)
     (flush)
     (a/close! tx-chan)
     ;; NOTE(review): damaged. Everything from here up to the `(cond->` of a
     ;; second copy of the best-key query appears to be missing. That includes
     ;; the end of `migrate` and the header of a function (using `backup-id`)
     ;; that dumps each entity namespace to "<backup-id>/<entity>.ednl". The
     ;; tokens that follow are that function's surviving body: it binds
     ;; `entities` and `entities->transaction` as `migrate` does, then appends
     ;; each transformed entity to the .ednl file.
     (println (a/ (entity->best-key entity) (not (vector? (entity->best-key entity))) vector))
                 entities->transaction (fn [entities]
                                         (->> (d/pull-many remote-db
                                                           (->> schema
                                                                (filter :db/valueType)
                                                                (mapv :db/ident)
                                                                (filter #(= entity (namespace %)))
                                                                (into [:db/id]))
                                                           entities)
                                              (mapv (fn [m ]
                                                      (reduce
                                                       (fn [m [k v]]
                                                         (cond
                                                           (= k :db/id) (-> m
                                                                            (assoc :entity/migration-key v)
                                                                            (dissoc :db/id))
                                                           (full-dependencies k) (if (vector? v)
                                                                                   (assoc m k (mapv (fn [r] [:entity/migration-key (:db/id r)]) v))
                                                                                   (assoc m k [:entity/migration-key (:db/id v)]))
                                                           :else (dissoc m :payment/pdf-data :payment/memo)))
                                                       m
                                                       m)))))
                 _ (println "Inserting " entity ": " (count entities))
                 _ (flush)]]
     ;; Append every transformed entity to "<backup-id>/<entity>.ednl",
     ;; pulling 1000 ids per batch and printing "." per batch. A failed append
     ;; is printed and rethrown.
     (ednl/with-append [append (str backup-id "/" entity ".ednl")]
       (doseq [batch (partition-all 1000 entities)
               :let [_ (do (print ".") (flush))]
               item (entities->transaction batch)]
         (try
           (append item)
           (catch Exception e
             (println e)
             (throw e)))))
     (println "Done")))))

;; Loads the backup directory `backup-id` into `connection`. The directory
;; holds schema.edn, full-dependencies.edn, entity-dependencies.edn and one
;; "<entity>.ednl" file per entity namespace.
;; It mirrors `migrate`, except that entities are read from the .ednl files
;; already transformed (`entities->transaction` is identity).
;; The `schema`, `full-dependencies` and `entity-dependencies` locals shadow
;; the top-level vars of the same names.
;; NOTE(review): `clojure.edn` is used fully qualified but is not required in
;; the ns form, so it relies on being loaded elsewhere. The `full-dependencies`
;; local is not referenced in the surviving text. Because the shared `loaded`
;; atom is used, namespaces already handled in this session are skipped when
;; `item-list` is nil. The tail of this function is lost in this copy.
(defn load-from-backup
  ([backup-id connection]
   (load-from-backup backup-id connection nil))
  ([backup-id connection item-list]
   (let [schema (clojure.edn/read-string (slurp (str backup-id "/schema.edn")))
         full-dependencies (clojure.edn/read-string (slurp (str backup-id "/full-dependencies.edn")))
         entity-dependencies (clojure.edn/read-string (slurp (str backup-id "/entity-dependencies.edn")))]
     (dc/transact connection {:tx-data [{:db/ident :entity/migration-key
                                         :db/unique :db.unique/identity
                                         :db/cardinality :db.cardinality/one
                                         :db/valueType :db.type/long}]})
     (dc/transact connection {:tx-data (map (fn [s] (set/rename-keys s {:db/id :entity/migration-key})) schema)})
     (dc/transact connection {:tx-data [{:entity/migration-key 17592257603901 :vendor/name "unknown"}
                                        {:entity/migration-key 17592232621701}
                                        {:entity/migration-key 17592263907739}
                                        {:entity/migration-key 17592271516922}] })
     (doseq [entity (or item-list
                        (filter (complement (conj @loaded "audit"))
                                (order-of-insert entity-dependencies)))
             :let [_ (swap! loaded conj entity)
                   _ (println "querying for " entity)
                   entities (ednl/slurp (str backup-id "/" entity ".ednl"))
                   _ (println "Found some! here's a few: " (take 3 entities))
                   tx-chan (a/chan 50)
                   entities->transaction (fn [entities] entities)
                   pipeline (tx-pipeline connection 10 tx-chan entities->transaction)]]
       (doseq [batch (partition-all 100 entities)]
         (try
           (a/>!! tx-chan batch)
           (catch Exception e
             (println e)
             ((:stop pipeline)))))
       (println "waiting for done from" pipeline)
       (flush)
       (a/close! tx-chan)
       ;; NOTE(review): damaged. The rest of `load-from-backup` and the start
       ;; of the REPL exploration below are missing. The map that begins with
       ;; `{:db/id 17592271182081,` appears to have been part of a `;; =>`
       ;; result comment; the `;;` lines after it continue that map. The final
       ;; `)` of the file presumably closes a `(comment ...)` form whose opener
       ;; was lost.
       (println (a/ {:db/id 17592271182081,
       ;; :charge/type-name "NO_SALE",
       ;; :charge/total 0.0,
       ;; :charge/tip 0.0,
       ;; :charge/processor #:db{:id 17592237965415},
       ;; :charge/external-id "square/charge/ndHN8rnam4dsfnDtK78LL1KiuaB"}

;; REPL investigation of charges missing from the local copy:
;;   1. compare the migration keys of local charges with the remote charge ids;
;;   2. pull the difference;
;;   3. transact it using the same transformation as `migrate`.
(def local-charges (into #{}
                         (map first)
                         (dc/q '[:find ?m
                                 :where (or [?e :charge/total]
                                            [?e :charge/external-id])
                                 [?e :entity/migration-key ?m]]
                               (dc/db (dc/connect local-client {:db-name "prod-migration"})))))

(count local-charges)
;; => [[2815787]]

(def remote-charges (into #{}
                          (map first)
                          (d/q '[:find ?e
                                 :where (or [?e :charge/total]
                                            [?e :charge/external-id])]
                               remote-db)))
;; => [[2815955]]

(d/pull-many remote-db
             (->> schema
                  (filter :db/valueType)
                  (mapv :db/ident)
                  (filter #(= "charge" (namespace %)))
                  (into [:db/id]))
             (vec (clojure.set/difference remote-charges local-charges)))

(dc/transact (dc/connect local-client {:db-name "prod-migration"})
             {:tx-data (->> (d/pull-many remote-db
                                         (->> schema
                                              (filter :db/valueType)
                                              (mapv :db/ident)
                                              (filter #(= "charge" (namespace %)))
                                              (into [:db/id]))
                                         (vec (clojure.set/difference remote-charges local-charges)))
                            (mapv (fn [m ]
                                    (reduce
                                     (fn [m [k v]]
                                       (cond
                                         (= k :db/id) (-> m
                                                          (assoc :entity/migration-key v)
                                                          (dissoc :db/id))
                                         (full-dependencies k) (if (vector? v)
                                                                 (assoc m k (mapv (fn [r] [:entity/migration-key (:db/id r)]) v))
                                                                 (assoc m k [:entity/migration-key (:db/id v)]))
                                         :else (dissoc m :payment/pdf-data :payment/memo)))
                                     m
                                     m))))})

(dc/pull (dc/db (dc/connect local-client {:db-name "prod-migration"})) '[*] [:entity/migration-key 17592271182081])

(d/pull remote-db '[*] 17592232186542)
)
;; => nil