;; This buffer is for Clojure experiments and evaluation.
;; Press C-j to evaluate the last expression.
;; You can also press C-u C-j to evaluate the expression and pretty-print its result.
(ns dump-edn
  (:require [datomic.api :as d]
            [clojure.java.io :as io]
            [amazonica.aws.s3 :as s3]
            [config.core :refer [env]]
            [clojure.core.async :as a]
            [datomic.client.api :as dc]
            [datomic.client.api.async :as dca]
            [datomic.dev-local :as dl]
            [clojure.set :as set]))

(def remote-db
  "Database value taken from the peer connection to the source system.
  Captured once, when this namespace is loaded."
  (d/db (d/connect "datomic:ddb://us-east-1/integreat/integreat-prod")))

(def local-client
  "dev-local client (system \"dev\") that receives the migrated data."
  (dc/client {:server-type :dev-local
              :system      "dev"}))

(dc/list-databases local-client {})

(def schema
  "Every entity in `remote-db` that has a :db/ident, pulled with its schema
  attributes, minus idents whose namespace starts with `cartographer`, `db`
  or `fressian`. Idents without a namespace are kept."
  (let [everything (d/q '[:find [(pull ?e [:db/ident
                                           {:db/valueType [:db/ident]}
                                           {:db/cardinality [:db/ident]}
                                           :db.attr/preds
                                           {:db/unique [:db/ident]}
                                           :db/isComponent
                                           :db/id
                                           :db/noHistory
                                           :db/tupleAttrs
                                           :db.entity/attrs
                                           :db.entity/preds
                                           :db/doc])
                                 ...]
                          :where [?e :db/ident]]
                        remote-db)]
    (->> everything
         (filter :db/ident)
         (filter (fn [{:db/keys [ident]}]
                   (if (namespace ident)
                     (re-matches #"^(?!cartographer)(?!db)(?!fressian).+" (namespace ident))
                     true))))))

(def best-key-helper
  "Attribute idents (schema entries that have a :db/valueType) grouped by
  namespace. Only useful at the REPL when choosing `entity->best-key` values."
  (->> schema
       (filter :db/valueType)
       (map :db/ident)
       (group-by namespace)
       #_(map (fn [[k v]] [k ]))))

(def entity->best-key
  "Entity namespace -> attribute (or vector of attributes) whose presence is
  used by `migrate` to find the entities of that kind in `remote-db`."
  {"transaction-rule"           [:transaction-rule/description
                                 :transaction-rule/note
                                 :transaction-rule/vendor]
   "square-location"            :square-location/square-id
   "expected-deposit"           :expected-deposit/date
   "journal-entry-line"         [:journal-entry-line/account
                                 :journal-entry-line/debit
                                 :journal-entry-line/credit]
   "vendor"                     [:vendor/name]
   "transaction"                :transaction/amount
   "yodlee-provider-account"    :yodlee-provider-account/id
   "journal-entry"              :journal-entry/source
   "yodlee-merchant"            :yodlee-merchant/yodlee-id
   "invoice"                    :invoice/invoice-number
   "vendor-terms-override"      :vendor-terms-override/client
   "integration-status"         :integration-status/state
   "conformity"                 :conformity/conformed-norms-index
   "user"                       :user/provider-id
   "sales-refund"               :sales-refund/total
   "plaid-account"              :plaid-account/name
   "charge"                     [:charge/total
                                 :charge/external-id]
   "location-match"             :location-match/location
   "vendor-schedule-payment-dom" :vendor-schedule-payment-dom/dom
   "account-client-override"    :account-client-override/client
   "plaid-item"                 :plaid-item/client
   "transaction-account"        :transaction-account/account
   "address"                    [:address/street1
                                 :address/city
                                 :address/state
                                 :address/zip]
   "order-line-item"            :order-line-item/total
   "ezcater-location"           :ezcater-location/location
   "account"                    [:account/numeric-code
                                 :account/code
                                 :account/name
                                 :account/type]
   "intuit-bank-account"        :intuit-bank-account/name
   "saved-query"                :saved-query/guid
   "ezcater-caterer"            :ezcater-caterer/uuid
   "forecasted-transaction"     :forecasted-transaction/day-of-month
   "audit"                      :audit/user
   "yodlee-account"             :yodlee-account/id
   "transaction-rule-account"   [:transaction-rule-account/account
                                 :transaction-rule-account/location]
   "ezcater-integration"        :ezcater-integration/subscriber-uuid
   "report"                     :report/created
   "bank-account"               :bank-account/code
   "vendor-usage"               :vendor-usage/key
   "invoice-expense-account"    [:invoice-expense-account/expense-account-id
                                 :invoice-expense-account/account
                                 :invoice-expense-account/location
                                 :invoice-expense-account/amount]
   "sales-order"                :sales-order/date
   "client"                     :client/code
   "email-contact"              :email-contact/email
   "invoice-payment"            :invoice-payment/amount
   "contact"                    [:contact/name
                                 :contact/phone
                                 :contact/email]
   "import-batch"               :import-batch/date
   "payment"                    [:payment/date
                                 :payment/bank-account]
   "vendor-account-override"    :vendor-account-override/client})

(def references
  "Schema entries whose :db/valueType is :db.type/ref."
  (filter (comp #{:db.type/ref} :db/ident :db/valueType) schema))

(def reference->entity
  "Ref attribute ident -> set of attribute namespaces found on the entities it
  points at. Targets are sampled from datoms asserted since 2022-06-01; the
  \"db\" namespace is dropped from every set."
  (->> (d/q '[:find ?a ?v3
              :in $ $$ [?a ...]
              :where
              [$$ _ ?a ?e]
              [$ ?e ?v _ _]
              [$ ?v :db/ident ?v2 _ _]
              ;; the original listed this clause twice; one copy is enough
              [(namespace ?v2) ?v3]]
            remote-db
            (d/since remote-db #inst "2022-06-01")
            (map :db/ident references))
       (group-by first)
       (map (fn [[k v]] [k (disj (set (map second v)) "db")]))
       (into {})))

(def entities-that-need-manual
  "Namespaces of ref attributes that have no entry in `reference->entity`."
  (set (map namespace
            (filter (complement reference->entity)
                    (map :db/ident references)))))

(def manual-dependencies
  "Hand-written ref attribute -> target entity namespaces, merged on top of
  `reference->entity`."
  {:client/location-matches           #{"location-match"}
   :transaction/yodlee-merchant       #{"yodlee-merchant"}
   :vendor-account-override/account   #{"account"}
   :vendor-account-override/client    #{"client"}
   :vendor/account-overrides          #{"vendor-account-override"}
   :transaction-rule/yodlee-merchant  #{"yodlee-merchant"}
   :client/forecasted-transactions    #{"forecasted-transaction"}
   :transaction/forecast-match        #{"forecasted-transaction"}
   :vendor/automatically-paid-when-due #{"client"}
   :vendor/schedule-payment-dom       #{"vendor-schedule-payment-dom"}
   :vendor-schedule-payment-dom/client #{"client"}})

(def full-dependencies
  "`reference->entity` merged with `manual-dependencies`. \"journal-entry\" is
  removed from :journal-entry/original-entity so a journal entry does not
  depend on its own kind."
  (update (merge-with into reference->entity manual-dependencies)
          :journal-entry/original-entity
          #(disj % "journal-entry")))

(def entity-dependencies
  "Entity namespace -> set of entity namespaces it references. Every namespace
  that owns at least one attribute is present, with #{} when it has no refs."
  (let [base-dependencies (into {}
                                (map (fn [i] [i #{}]))
                                (set (map (comp namespace :db/ident)
                                          (filter :db/valueType schema))))]
    (into base-dependencies
          (reduce (fn [acc [ref deps]]
                    (update acc (namespace ref) (fnil #(into % deps) #{})))
                  {}
                  full-dependencies))))

(defn- insertion-order
  "Orders the keys of `dependencies` (entity -> set of entities it depends on)
  so that every entity comes after everything it depends on.

  Prints the order so far and the remaining map on each round, as the original
  loop did. Throws ex-info when entities remain but none of them is free of
  dependencies (a cycle, a self-reference, or a dependency that is not a key);
  the original loop recurred forever in that situation."
  [dependencies]
  (loop [pending dependencies
         order   []]
    (let [ready     (for [[entity deps] pending
                          :when (empty? deps)]
                      entity)
          remaining (into {}
                          (map (fn [[entity deps]]
                                 [entity (reduce disj deps ready)]))
                          (apply dissoc pending ready))]
      (println order remaining)
      (cond
        (empty? remaining)
        (into order ready)

        (empty? ready)
        (throw (ex-info "Cannot order entities: unresolved or cyclic dependencies"
                        {:order     order
                         :remaining remaining}))

        :else
        (recur remaining (into order ready))))))

(def order-of-insert
  "Entity namespaces in the order `migrate` loads them."
  (insertion-order entity-dependencies))

#_(def best-attributes
    (->> (map :db/ident (filter :db/valueType schema))
         (d/q '[:find ?a2 (count ?e)
                :in $ [?a2 ...]
                :where
                [?a :db/ident ?a2]
                [?e ?a]]
              remote-db)
         (map (fn [[a count]] [(namespace a) [a count]]))
         (reduce (fn [acc [namespace [attr count]]]
                   (update acc namespace
                           (fnil (fn [[curr-attr curr-count]]
                                   (if (> count curr-count)
                                     [attr count]
                                     [curr-attr curr-count]))
                                 ["" 0])))
                 {})
         (map (fn [[k v]] [k (first v)]))))

(defn tx-pipeline
  "Transacts data from from-ch. Returns a map with:
     :result, a return channel getting {:error t} or {:completed n}
     :stop, a fn you can use to terminate early.

  conn    - connection handed to the async transact call
  conc    - parallelism passed to `a/pipeline-async`
  from-ch - channel of work items
  f       - turns one work item into the :tx-data of a transaction"
  [conn conc from-ch f]
  (let [to-ch         (a/chan 400)
        done-ch       (a/chan)
        ;; NOTE(review): the middle of this fn was lost in the copy this file
        ;; was recovered from (everything between `(a/` and `! done-ch`).
        ;; It is rebuilt here from what survived - the `result` argument, the
        ;; otherwise unused `conn`, `f` and `dca` alias, and the closing
        ;; parens - following the usual Datomic tx-pipeline shape. Confirm
        ;; against the original before relying on it.
        transact-data (fn [data result]
                        (a/go
                          (try
                            (a/>! result (a/<! (dca/transact conn {:tx-data (f data)})))
                            ;; pipeline-async requires the result channel to be closed
                            (a/close! result)
                            ; if exception in a transaction
                            ; will close channels and put error
                            ; on done channel.
                            (catch Throwable t
                              (.printStackTrace t)
                              (a/close! from-ch)
                              (a/close! to-ch)
                              (a/>! done-ch {:error t})))))]

    ; go block prints a '.' for every second result taken from to-ch, puts
    ; completed report on done channel when no value left to be taken.
    (a/go-loop [total 0]
      (when (even? total)
        (print ".")
        (flush))
      (if-let [c (a/<! to-ch)]
        (recur (inc total))
        (a/>! done-ch {:completed total})))

    ; pipeline that runs transact-data on each item taken from from-ch and
    ; puts results on to-ch
    (a/pipeline-async conc to-ch transact-data from-ch)

    ; returns done channel and a function that you can use
    ; for early termination.
    {:result done-ch
     :stop   (fn [] (a/close! to-ch))}))

(def loaded
  "Entity namespaces `migrate` has started loading."
  (atom #{}))

(defn- entity-pull-pattern
  "Pull pattern for one entity namespace: :db/id plus every attribute of
  `schema` that lives in that namespace."
  [entity]
  (->> schema
       (filter :db/valueType)
       (mapv :db/ident)
       (filter #(= entity (namespace %)))
       (into [:db/id])))

(defn- ->migration-tx
  "Rewrites one pulled entity map into transaction data for the local db:
  :db/id becomes :entity/migration-key, values of attributes listed in
  `full-dependencies` become [:entity/migration-key id] lookup refs, and
  :payment/pdf-data / :payment/memo are dropped."
  [m]
  (reduce (fn [acc [k v]]
            (cond
              (= k :db/id)
              (-> acc
                  (assoc :entity/migration-key v)
                  (dissoc :db/id))

              (full-dependencies k)
              (if (vector? v)
                (assoc acc k (mapv (fn [r] [:entity/migration-key (:db/id r)]) v))
                (assoc acc k [:entity/migration-key (:db/id v)]))

              :else
              (dissoc acc :payment/pdf-data :payment/memo)))
          m
          m))

(defn- pull-for-migration
  "Pulls `eids` of kind `entity` from `remote-db` and converts each result
  with `->migration-tx`."
  [entity eids]
  (mapv ->migration-tx
        (d/pull-many remote-db (entity-pull-pattern entity) eids)))

(defn migrate
  "Copies `remote-db` into the local \"prod-migration\" database: installs
  :entity/migration-key, the schema, a few placeholder entities, then every
  entity kind in `order-of-insert` except \"audit\", in batches of 500."
  []
  (let [conn (dc/connect local-client {:db-name "prod-migration"})]
    (dc/transact conn
                 {:tx-data [{:db/ident       :entity/migration-key
                             :db/unique      :db.unique/identity
                             :db/cardinality :db.cardinality/one
                             :db/valueType   :db.type/long}]})
    ;; schema entities keep their source :db/id as the migration key
    (dc/transact conn
                 {:tx-data (map (fn [s] (set/rename-keys s {:db/id :entity/migration-key}))
                                schema)})
    (dc/transact conn
                 {:tx-data [{:entity/migration-key 17592257603901
                             :vendor/name          "unknown"}
                            {:entity/migration-key 17592232621701}
                            {:entity/migration-key 17592263907739}
                            {:entity/migration-key 17592271516922}]})
    ;; placeholders for rules that exist in history but no longer have a note
    (dc/transact conn
                 {:tx-data (->> (d/q '[:find ?e
                                       :in $ $$
                                       :where
                                       [$$ ?e :transaction-rule/note _ _ true]
                                       (not [$ ?e :transaction-rule/note])]
                                     remote-db
                                     (d/history remote-db))
                                (map (fn [[old-rule]]
                                       {:entity/migration-key old-rule
                                        :db/doc               "A transaction rule that was deleted"})))})
    #_(dc/transact conn
                   {:tx-data (->> (d/q '[:find (count ?e)
                                         :in $
                                         :where
                                         #_[$$ ?e :transaction-account/account _ _ true]
                                         [_ :transaction/accounts ?e]
                                         (not [?e :transaction-account/account _])]
                                       remote-db
                                       #_(d/history remote-db))
                                  (map (fn [[old-rule]]
                                         {:entity/migration-key old-rule
                                          :db/doc               "A transaction account that was deleted"})))})
    (doseq [entity (remove #{"audit"} order-of-insert)]
      (swap! loaded conj entity)
      (println "querying for " entity)
      (let [best-key              (entity->best-key entity)
            entities              (d/q '[:find [?e ...]
                                         :in $ [?a ...]
                                         :where [?e ?a]]
                                       remote-db
                                       (if (vector? best-key) best-key [best-key]))
            tx-chan               (a/chan 400)
            entities->transaction (fn [eids] (pull-for-migration entity eids))]
        (println "Inserting " entity ": " (count entities))
        (flush)
        (let [pipeline (tx-pipeline conn 50 tx-chan entities->transaction)]
          (doseq [batch (partition-all 500 entities)]
            (try
              (a/>!! tx-chan batch)
              (catch Exception e
                (println e)
                ((:stop pipeline)))))
          (println "waiting for done from" pipeline)
          (flush)
          (a/close! tx-chan)
          ;; blocks until the pipeline reports {:completed n} or {:error t}
          (println (a/<!! (:result pipeline))))))))

;; NOTE(review): the text between the end of `migrate` and the result comment
;; below was lost in the copy this file was recovered from. The scratch forms
;; that follow are wrapped in `comment` because the surviving text ends with a
;; lone `)` evaluating to nil - confirm against the original.
(comment
  ;; => {:db/id 17592271182081,
  ;;     :charge/type-name "NO_SALE",
  ;;     :charge/total 0.0,
  ;;     :charge/tip 0.0,
  ;;     :charge/processor #:db{:id 17592237965415},
  ;;     :charge/external-id "square/charge/ndHN8rnam4dsfnDtK78LL1KiuaB"}

  (def local-charges
    (into #{}
          (map first)
          (dc/q '[:find ?m
                  :where
                  (or [?e :charge/total]
                      [?e :charge/external-id])
                  [?e :entity/migration-key ?m]]
                (dc/db (dc/connect local-client {:db-name "prod-migration"})))))

  (count local-charges)
  ;; => [[2815787]]

  (def remote-charges
    (into #{}
          (map first)
          (d/q '[:find ?e
                 :where
                 (or [?e :charge/total]
                     [?e :charge/external-id])]
               remote-db)))
  ;; => [[2815955]]

  ;; charges present remotely but missing locally
  (d/pull-many remote-db
               (entity-pull-pattern "charge")
               (vec (set/difference remote-charges local-charges)))

  (dc/transact (dc/connect local-client {:db-name "prod-migration"})
               {:tx-data (pull-for-migration
                          "charge"
                          (vec (set/difference remote-charges local-charges)))})

  (dc/pull (dc/db (dc/connect local-client {:db-name "prod-migration"}))
           '[*]
           [:entity/migration-key 17592271182081])

  (d/pull remote-db '[*] 17592232186542)
  )
;; => nil