Made tweaks to support datomic

This commit is contained in:
2023-03-17 23:29:28 -07:00
parent f7fe2b2bee
commit bd658906b5
3 changed files with 264 additions and 95 deletions

View File

@@ -175,12 +175,15 @@
:transaction/yodlee-merchant #{"yodlee-merchant"}
:vendor-account-override/account #{"account"}
:vendor-account-override/client #{"client"}
:vendor/secondary-contact #{"contact"}
:vendor/account-overrides #{"vendor-account-override"}
:client/bank-accounts #{"bank-account"}
:transaction-rule/yodlee-merchant #{"yodlee-merchant"}
:client/forecasted-transactions #{"forecasted-transaction"}
:transaction/forecast-match #{"forecasted-transaction"}
:vendor/automatically-paid-when-due #{"client"}
:vendor/schedule-payment-dom #{"vendor-schedule-payment-dom"}
:vendor/terms-overrides #{"vendor-terms-override"}
:vendor-schedule-payment-dom/client #{"client"}})
(def full-dependencies
@@ -205,6 +208,162 @@
{}
full-dependencies))))
(def full-dependencies
{:invoice/client #{"client"},
:sales-order/client #{"client"},
:transaction-rule/transaction-approval-status #{},
:transaction/forecast-match #{"forecasted-transaction"},
:user/role #{},
:vendor-schedule-payment-dom/client #{"client"},
:invoice-payment/payment #{"payment"},
:transaction-rule/client #{"client"},
:invoice/status #{},
:payment/type #{},
:expected-deposit/client #{"client"},
:transaction/bank-account #{"bank-account"},
:transaction-rule-account/account #{"account"},
:import-batch/status #{},
:user/clients #{"client"},
:payment/client #{"client"},
:expected-deposit/charges #{"charge"},
:vendor/automatically-paid-when-due #{"client"},
:payment/invoices #{"invoice"},
:client/forecasted-transactions #{"forecasted-transaction"},
:transaction/matched-rule #{"transaction-rule"},
:invoice/import-status #{},
:charge/processor #{},
:expected-deposit/vendor #{"vendor"},
:client/square-locations #{"square-location"},
:payment/status #{},
:client/location-matches #{"location-match"},
:saved-query/client #{"client"},
:transaction/payment #{"payment"},
:transaction-rule/vendor #{"vendor"},
:plaid-item/client #{"client"},
:account/applicability #{},
:journal-entry-line/account #{"account" "bank-account"},
:client/bank-accounts #{"bank-account"},
:yodlee-provider-account/client #{"client"},
:account/vendor-allowance #{},
:payment/bank-account #{"bank-account"},
:account/default-allowance #{},
:transaction-rule/yodlee-merchant #{"yodlee-merchant"},
:vendor/account-overrides #{"vendor-account-override"},
:transaction/client #{"client"},
:invoice/vendor #{"vendor"},
:sales-order/vendor #{"vendor"},
:expected-deposit/status #{},
:journal-entry/original-entity #{"transaction" "invoice"},
:vendor-usage/client #{"client"},
:transaction/expected-deposit #{"expected-deposit"},
:client/ezcater-locations #{"ezcater-location"},
:journal-entry/client #{"client"},
:vendor/secondary-contact #{"contact"},
:journal-entry/line-items #{"journal-entry-line"},
:vendor/legal-entity-1099-type #{},
:transaction-rule/bank-account #{"bank-account"},
:transaction-account/account #{"account"},
:vendor/terms-overrides #{"vendor-terms-override"},
:vendor/default-account #{"account"},
:transaction/yodlee-merchant #{"yodlee-merchant"},
:sales-refund/client #{"client"},
:client/emails #{"email-contact"},
:payment/vendor #{"vendor"},
:invoice-payment/invoice #{"invoice"},
:report/client #{"client"},
:transaction-rule/accounts #{"transaction-rule-account"},
:charge/client #{"client"},
:bank-account/type #{},
:invoice-expense-account/account #{"account"},
:vendor/legal-entity-tin-type #{},
:transaction/approval-status #{},
:import-batch/entry #{"transaction"},
:bank-account/intuit-bank-account #{"intuit-bank-account"},
:account/type #{},
:sales-refund/vendor #{"vendor"},
:bank-account/yodlee-account #{"yodlee-account"},
:vendor/address #{"address"},
:integration-status/state #{},
:transaction/accounts #{"transaction-account"},
:sales-order/charges #{"charge"},
:client/address #{"address"},
:ezcater-location/caterer #{"ezcater-caterer"},
:vendor-account-override/client #{"client"},
:bank-account/integration-status #{"integration-status"},
:yodlee-provider-account/accounts #{"yodlee-account"},
:account/invoice-allowance #{},
:journal-entry/vendor #{"vendor"},
:plaid-item/accounts #{"plaid-account"},
:vendor-usage/vendor #{"vendor"},
:sales-order/line-items #{"order-line-item"},
:invoice/expense-accounts #{"invoice-expense-account"},
:account-client-override/client #{"client"},
:vendor/primary-contact #{"contact"},
:vendor/schedule-payment-dom #{"vendor-schedule-payment-dom"},
:account/client-overrides #{"account-client-override"},
:transaction/vendor #{"vendor"},
:client/square-integration-status #{"integration-status"},
:ezcater-integration/caterers #{"ezcater-caterer"},
:vendor-account-override/account #{"account"},
:import-batch/source #{}})
(def entity-dependencies
{"transaction-rule"
#{"vendor" "yodlee-merchant" "transaction-rule-account" "bank-account"
"client"},
"square-location" #{},
"expected-deposit" #{"vendor" "charge" "client"},
"journal-entry-line" #{"account" "bank-account"},
"vendor"
#{"vendor-schedule-payment-dom" "address" "account" "client" "contact"
"vendor-account-override"},
"transaction"
#{"transaction-rule" "expected-deposit" "vendor" "yodlee-merchant"
"transaction-account" "forecasted-transaction" "bank-account" "client"
"payment"},
"yodlee-provider-account" #{"yodlee-account" "client"},
"journal-entry"
#{"journal-entry-line" "vendor" "transaction" "invoice" "client"},
"yodlee-merchant" #{},
"invoice" #{"vendor" "invoice-expense-account" "client"},
"vendor-terms-override" #{},
"integration-status" #{},
"conformity" #{},
"user" #{"client"},
"sales-refund" #{"vendor" "client"},
"plaid-account" #{},
"charge" #{"client"},
"location-match" #{},
"vendor-schedule-payment-dom" #{"client"},
"account-client-override" #{"client"},
"plaid-item" #{"plaid-account" "client"},
"transaction-account" #{"account"},
"address" #{},
"order-line-item" #{},
"ezcater-location" #{"ezcater-caterer"},
"account" #{"account-client-override"},
"intuit-bank-account" #{},
"saved-query" #{"client"},
"ezcater-caterer" #{},
"forecasted-transaction" #{},
"audit" #{},
"yodlee-account" #{},
"transaction-rule-account" #{"account"},
"ezcater-integration" #{"ezcater-caterer"},
"report" #{"client"},
"bank-account" #{"integration-status" "intuit-bank-account" "yodlee-account"},
"vendor-usage" #{"vendor" "client"},
"invoice-expense-account" #{"account"},
"sales-order" #{"vendor" "charge" "order-line-item" "client"},
"client"
#{"square-location" "integration-status" "location-match" "address"
"ezcater-location" "forecasted-transaction" "bank-account" "email-contact"},
"email-contact" #{},
"invoice-payment" #{"invoice" "payment"},
"contact" #{},
"import-batch" #{"transaction"},
"payment" #{"vendor" "invoice" "bank-account" "client"},
"vendor-account-override" #{"account" "client"}})
(def order-of-insert
(loop [entity-dependencies entity-dependencies
order []]
@@ -225,6 +384,8 @@
(recur next-deps (into order next-order))
(into order next-order)))))
#_(def best-attributes
(->> (map :db/ident (filter :db/valueType schema))
(d/q
@@ -259,8 +420,12 @@
transact-data (fn [data result]
(a/go
(try
(a/>! result (a/<! (dca/transact conn {:tx-data (f data)})))
(a/close! result)
(let [tx-r (a/<! (dca/transact conn {:tx-data (f data)}))]
(when (:db/error tx-r)
(throw (ex-info "Invalid transaction" tx-r)))
(a/>! result tx-r)
(a/close! result))
; if exception in a transaction
; will close channels and put error
; on done channel.
@@ -292,105 +457,109 @@
(def loaded (atom #{}))
(defn migrate []
(dc/transact (dc/connect local-client {:db-name "prod-migration"}) {:tx-data [{:db/ident :entity/migration-key
:db/unique :db.unique/identity
:db/cardinality :db.cardinality/one
:db/valueType :db.type/long}]})
(dc/transact (dc/connect local-client {:db-name "prod-migration"}) {:tx-data (map
(fn [s]
(set/rename-keys s {:db/id :entity/migration-key}))
schema)})
(defn migrate
([] (migrate nil))
([item-list]
(dc/transact (dc/connect local-client {:db-name "prod-migration"}) {:tx-data [{:db/ident :entity/migration-key
:db/unique :db.unique/identity
:db/cardinality :db.cardinality/one
:db/valueType :db.type/long}]})
(dc/transact (dc/connect local-client {:db-name "prod-migration"}) {:tx-data (map
(fn [s]
(set/rename-keys s {:db/id :entity/migration-key}))
schema)})
(dc/transact (dc/connect local-client {:db-name "prod-migration"}) {:tx-data [{:entity/migration-key 17592257603901 :vendor/name "unknown"}
{:entity/migration-key 17592232621701}
{:entity/migration-key 17592263907739}
{:entity/migration-key 17592271516922}]
(dc/transact (dc/connect local-client {:db-name "prod-migration"}) {:tx-data [{:entity/migration-key 17592257603901 :vendor/name "unknown"}
{:entity/migration-key 17592232621701}
{:entity/migration-key 17592263907739}
{:entity/migration-key 17592271516922}]
})
})
(dc/transact (dc/connect local-client {:db-name "prod-migration"})
{:tx-data (->> (d/q '[:find ?e
:in $ $$
:where [$$ ?e :transaction-rule/note _ _ true]
(not [$ ?e :transaction-rule/note] )]
(dc/transact (dc/connect local-client {:db-name "prod-migration"})
{:tx-data (->> (d/q '[:find ?e
:in $ $$
:where [$$ ?e :transaction-rule/note _ _ true]
(not [$ ?e :transaction-rule/note] )]
remote-db
(d/history remote-db))
(map (fn [[old-rule]]
{:entity/migration-key old-rule
:db/doc "A transaction rule that was deleted"})))})
(map (fn [[old-rule]]
{:entity/migration-key old-rule
:db/doc "A transaction rule that was deleted"})))})
#_(dc/transact (dc/connect local-client {:db-name "prod-migration"})
{:tx-data (->> (d/q '[:find (count ?e)
:in $
:where
#_[$$ ?e :transaction-account/account _ _ true]
[_ :transaction/accounts ?e]
(not [?e :transaction-account/account _] )
]
remote-db
#_(d/history remote-db))
(map (fn [[old-rule]]
{:entity/migration-key old-rule
:db/doc "A transaction account that was deleted"})))})
#_(dc/transact (dc/connect local-client {:db-name "prod-migration"})
{:tx-data (->> (d/q '[:find (count ?e)
:in $
:where
#_[$$ ?e :transaction-account/account _ _ true]
[_ :transaction/accounts ?e]
(not [?e :transaction-account/account _] )
]
remote-db
#_(d/history remote-db))
(map (fn [[old-rule]]
{:entity/migration-key old-rule
:db/doc "A transaction account that was deleted"})))})
(doseq [entity (filter (complement #{"audit"}) order-of-insert)
:let [_ (swap! loaded conj entity)
_ (println "querying for " entity)
entities (d/q '[:find [?e ...]
:in $ [?a ...]
:where [?e ?a]]
(doseq [entity (or item-list (filter (complement (conj @loaded "audit")) order-of-insert))
:let [_ (swap! loaded conj entity)
_ (println "querying for " entity)
entities (d/q '[:find [?e ...]
:in $ [?a ...]
:where [?e ?a]]
remote-db
(cond-> (entity->best-key entity)
(not (vector? (entity->best-key entity))) vector))
tx-chan (a/chan 400)
entities->transaction (fn [entities]
(->> (d/pull-many remote-db
(->> schema
(filter :db/valueType)
(mapv :db/ident)
(filter #(= entity (namespace %)))
(into [:db/id]))
entities)
(mapv (fn [m ]
(reduce
(fn [m [k v]]
(cond
(= k :db/id)
(-> m
(assoc :entity/migration-key v)
(dissoc :db/id))
(full-dependencies k)
(if (vector? v)
(assoc m k (mapv (fn [r] [:entity/migration-key (:db/id r)]) v))
(assoc m k [:entity/migration-key (:db/id v)]))
:else
(dissoc m :payment/pdf-data
:payment/memo)))
m
m)))))
_ (println "Inserting " entity ": " (count entities))
_ (flush)
pipeline (tx-pipeline (dc/connect local-client {:db-name "prod-migration"})
50
tx-chan
entities->transaction)]]
(doseq [batch (partition-all 500 entities)]
(try
(a/>!! tx-chan batch)
(catch Exception e
(println e)
((:stop pipeline)))))
_ (println "Found some! here's a few: " (take 3 entities))
tx-chan (a/chan 50)
entities->transaction (fn [entities]
(->> (d/pull-many remote-db
(->> schema
(filter :db/valueType)
(mapv :db/ident)
(filter #(= entity (namespace %)))
(into [:db/id]))
entities)
(mapv (fn [m ]
(reduce
(fn [m [k v]]
(cond
(= k :db/id)
(-> m
(assoc :entity/migration-key v)
(dissoc :db/id))
(full-dependencies k)
(if (vector? v)
(assoc m k (mapv (fn [r] [:entity/migration-key (:db/id r)]) v))
(assoc m k [:entity/migration-key (:db/id v)]))
:else
(dissoc m :payment/pdf-data
:payment/memo)))
m
m)))))
_ (println "Inserting " entity ": " (count entities))
_ (flush)
pipeline (tx-pipeline (dc/connect local-client {:db-name "prod-migration"})
10
tx-chan
entities->transaction)]]
(doseq [batch (partition-all 100 entities)]
(try
(a/>!! tx-chan batch)
(catch Exception e
(println e)
((:stop pipeline)))))
(println "waiting for done from" pipeline)
(flush)
(a/close! tx-chan)
(println (a/<!! (:result pipeline)))
((:stop pipeline))
(println)
(println "Done")))
(println "waiting for done from" pipeline)
(flush)
(a/close! tx-chan)
(println (a/<!! (:result pipeline)))
((:stop pipeline))
(println)
(println "Done"))))
(defn reset-migrate []
(dc/delete-database local-client {:db-name "prod-migration"})
@@ -405,7 +574,7 @@
(migrate)
(datomic.dev-local/release-db {:db-name "prod-migration"
:system "dev"})
:system "dev"})
(d/pull remote-db '[*] 17592271182081)
@@ -422,7 +591,7 @@
:where (or [?e :charge/total]
[?e :charge/external-id])
[?e :entity/migration-key ?m]]
(dc/db (dc/connect local-client {:db-name "prod-migration"})))))
(dc/db (dc/connect local-client {:db-name "prod-migration"})))))
(count local-charges)