Files
integreat/scratch-sessions/backup.repl
2023-06-01 14:39:03 -07:00

531 lines
24 KiB
Plaintext

;; This buffer is for Clojure experiments and evaluation.
;; Press C-j to evaluate the last expression.
;; You can also press C-u C-j to evaluate the expression and pretty-print its result.
(comment
(ns auto-ap.backup
  (:require [amazonica.aws.s3 :as s3]
            [auto-ap.jobs.core :refer [execute]]
            [clojure.core.async :as a]
            [clojure.java.io :as io]
            [clojure.pprint :as pprint]
            [clojure.set :as set]
            [com.brunobonacci.mulog :as mu]
            [config.core :refer [env]]
            [datomic.api :as d]
            [lambdaisland.edn-lines :as ednl]
            [manifold.deferred :as de]
            [manifold.executor :as ex]
            [manifold.stream :as s]
            [manifold.time :as mt]))
(def request-pool
  "Fixed executor with 30 threads; `pull-batch` runs its futures on it."
  (ex/fixed-thread-executor 30))
(def buffered
  "Fixed executor with 30 threads; `dump-all` moves its batch stream onto it
  with `s/onto`."
  (ex/fixed-thread-executor 30))
(defn get-schema
  "Returns a lazy seq of the schema entities of `remote-db` that are worth
  backing up.

  Every entity carrying a `:db/ident` is pulled with its schema-related
  attributes. An entity is kept when its ident has no namespace, or when
  that namespace does not start with `cartographer`, `db` or `fressian`."
  [remote-db]
  (let [everything (d/q '[:find [(pull ?e [:db/ident
                                           {:db/valueType [:db/ident]}
                                           {:db/cardinality [:db/ident]}
                                           :db.attr/preds
                                           {:db/unique [:db/ident]}
                                           :db/isComponent
                                           :db/id
                                           :db/noHistory
                                           :db/tupleAttrs
                                           :db.entity/attrs
                                           :db.entity/preds
                                           :db/doc]) ...]
                          :where [?e :db/ident]]
                        remote-db)]
    (->> everything
         (filter :db/ident)
         (filter (fn [{:db/keys [ident]}]
                   (if-let [ident-ns (namespace ident)]
                     ;; The negative lookaheads reject namespaces that begin
                     ;; with one of the excluded prefixes.
                     (re-matches #"^(?!cartographer)(?!db)(?!fressian).+" ident-ns)
                     true))))))
;; Maps an entity name (the namespace shared by its attributes) to the
;; attribute, or vector of attributes, that `dump-all` queries to enumerate
;; entities of that kind. `dump-all` wraps a single keyword in a vector and
;; selects every entity that has at least one of the listed attributes.
(def entity->best-key
{"transaction-rule"
[:transaction-rule/description, :transaction-rule/note :transaction-rule/vendor]
"square-location"
:square-location/square-id,
"expected-deposit"
:expected-deposit/date,
"journal-entry-line"
[:journal-entry-line/account, :journal-entry-line/debit :journal-entry-line/credit]
"vendor"
[:vendor/name, :vendor/default-account, :vendor/hidden]
"transaction"
:transaction/amount,
"yodlee-provider-account"
:yodlee-provider-account/id,
"journal-entry"
:journal-entry/source,
"yodlee-merchant" :yodlee-merchant/yodlee-id,
"invoice"
:invoice/invoice-number,
"vendor-terms-override"
:vendor-terms-override/client,
"integration-status"
:integration-status/state,
"conformity" :conformity/conformed-norms-index,
"user"
:user/provider-id,
"sales-refund"
:sales-refund/total,
"plaid-account"
:plaid-account/name,
"charge"
[:charge/total, :charge/external-id]
"location-match" :location-match/location,
"vendor-schedule-payment-dom"
:vendor-schedule-payment-dom/dom,
"account-client-override"
:account-client-override/client,
"plaid-item"
:plaid-item/client,
"transaction-account"
:transaction-account/account,
"address"
[:address/street1, :address/city :address/state :address/zip]
"order-line-item"
:order-line-item/total,
"ezcater-location" [:ezcater-location/location, :ezcater-location/caterer]
"account"
[:account/numeric-code, :account/code :account/name :account/type]
"intuit-bank-account"
:intuit-bank-account/name,
"saved-query"
:saved-query/guid,
"ezcater-caterer"
:ezcater-caterer/uuid,
"forecasted-transaction"
:forecasted-transaction/day-of-month,
"audit" :audit/user,
"yodlee-account"
:yodlee-account/id,
"transaction-rule-account"
[:transaction-rule-account/account, :transaction-rule-account/location]
"ezcater-integration"
:ezcater-integration/subscriber-uuid,
"report"
:report/created,
"bank-account"
:bank-account/code,
"vendor-usage"
:vendor-usage/key,
"invoice-expense-account"
[:invoice-expense-account/expense-account-id, :invoice-expense-account/account :invoice-expense-account/location :invoice-expense-account/amount]
"sales-order"
:sales-order/date,
"client"
:client/code,
"email-contact" :email-contact/email,
"invoice-payment"
:invoice-payment/amount,
"contact"
[:contact/name, :contact/phone :contact/email]
"import-batch"
:import-batch/date,
"payment"
[:payment/date, :payment/bank-account]
"vendor-account-override"
:vendor-account-override/client})
;; Disabled with `#_` (the reader discards the whole form).
;; Keeps the schema entries whose `:db/valueType` has the ident
;; `:db.type/ref`, i.e. the reference attributes.
#_(defn references [schema]
(filter (comp #{:db.type/ref} :db/ident :db/valueType) schema ))
;; Disabled with `#_` (the reader discards the whole form).
;; For each attribute `?a`, looks at the values `?e` it has in the "since
;; 2022-06-01" view of the database (`$$`), then finds the attributes present
;; on `?e` in the full database (`$`) and takes the namespace of each
;; attribute ident. The result maps attribute -> set of those namespaces,
;; with "db" removed.
;; NOTE(review): `references` is passed to `map` as if it were a collection,
;; but the only visible definition of `references` is a function of `schema`
;; and this function takes no schema argument. Confirm before re-enabling.
;; NOTE(review): the `[(namespace ?v2) ?v3]` clause appears twice; the second
;; copy looks redundant.
#_(defn reference->entity [remote-db]
(->> (d/q '[:find ?a ?v3
:in $ $$ [?a ...]
:where [$$ _ ?a ?e]
[$ ?e ?v _ _]
[$ ?v :db/ident ?v2 _ _]
[(namespace ?v2) ?v3]
[(namespace ?v2) ?v3]]
remote-db
(d/since remote-db #inst "2022-06-01")
(map :db/ident references)
)
(group-by first)
(map (fn [[k v]]
[k (disj (set (map second v)) "db")]))
(into {})))
;; Disabled with `#_` (the reader discards the whole form).
;; Hand-written map from reference attribute to the set of entity names its
;; values belong to. The disabled `full-dependencies` function merges it
;; into the query-derived map from `reference->entity`.
#_(def manual-dependencies
{:client/location-matches #{"location-match"}
:transaction/yodlee-merchant #{"yodlee-merchant"}
:vendor-account-override/account #{"account"}
:vendor-account-override/client #{"client"}
:vendor/secondary-contact #{"contact"}
:vendor/account-overrides #{"vendor-account-override"}
:client/bank-accounts #{"bank-account"}
:transaction-rule/yodlee-merchant #{"yodlee-merchant"}
:client/forecasted-transactions #{"forecasted-transaction"}
:transaction/forecast-match #{"forecasted-transaction"}
:vendor/automatically-paid-when-due #{"client"}
:vendor/schedule-payment-dom #{"vendor-schedule-payment-dom"}
:vendor/terms-overrides #{"vendor-terms-override"}
:vendor-schedule-payment-dom/client #{"client"}})
;; Disabled with `#_` (the reader discards the whole form).
;; Combines the query-derived map from `reference->entity` with
;; `manual-dependencies` (sets for the same attribute are unioned via
;; `merge-with into`), then removes "journal-entry" from the targets of
;; `:journal-entry/original-entity`.
;; Shares its name with the literal `(def full-dependencies ...)` map that
;; is active further down.
#_(defn full-dependencies [remote-db]
(update (merge-with into (reference->entity remote-db) manual-dependencies)
:journal-entry/original-entity
#(disj % "journal-entry")))
;; Disabled with `#_` (the reader discards the whole form).
;; Builds entity-name -> set of entity names it depends on. Every attribute
;; namespace found in `schema` starts with an empty set; the per-attribute
;; targets from `full-dependencies` are then folded in under the namespace
;; of the referring attribute.
;; NOTE(review): `remote-db` is used below but is neither a parameter nor a
;; local binding of this function. Confirm before re-enabling.
;; Shares its name with the literal `(def entity-dependencies ...)` map that
;; is active further down.
#_(defn entity-dependencies [schema]
(let [base-dependencies
(into
{}
(map (fn [i]
[i #{}])
(set (map (comp namespace :db/ident)
(filter :db/valueType
schema))))
)
]
(into base-dependencies (reduce
(fn [acc [ref deps]]
(update acc (namespace ref) (fnil #(into % deps) #{})))
{}
(full-dependencies remote-db)))))
;; Literal map from reference attribute to the set of entity names its
;; values belong to. Some attributes map to an empty set (presumably their
;; targets are enum idents rather than backed-up entities; confirm).
;; `pull-batch` uses this map as a predicate: any key present here, even one
;; mapped to an empty set (which is truthy), has its pulled value rewritten
;; to `[:entity/migration-key <id>]` lookup refs.
;; `dump-schema` also writes this map to S3 as full-dependencies.edn.
(def full-dependencies
{:invoice/client #{"client"},
:sales-order/client #{"client"},
:transaction-rule/transaction-approval-status #{},
:transaction/forecast-match #{"forecasted-transaction"},
:user/role #{},
:vendor-schedule-payment-dom/client #{"client"},
:invoice-payment/payment #{"payment"},
:transaction-rule/client #{"client"},
:invoice/status #{},
:payment/type #{},
:expected-deposit/client #{"client"},
:transaction/bank-account #{"bank-account"},
:transaction-rule-account/account #{"account"},
:import-batch/status #{},
:user/clients #{"client"},
:payment/client #{"client"},
:expected-deposit/charges #{"charge"},
:vendor/automatically-paid-when-due #{"client"},
:payment/invoices #{"invoice"},
:client/forecasted-transactions #{"forecasted-transaction"},
:transaction/matched-rule #{"transaction-rule"},
:invoice/import-status #{},
:charge/processor #{},
:expected-deposit/vendor #{"vendor"},
:client/square-locations #{"square-location"},
:payment/status #{},
:client/location-matches #{"location-match"},
:saved-query/client #{"client"},
:transaction/payment #{"payment"},
:transaction-rule/vendor #{"vendor"},
:plaid-item/client #{"client"},
:account/applicability #{},
:journal-entry-line/account #{"account" "bank-account"},
:client/bank-accounts #{"bank-account"},
:yodlee-provider-account/client #{"client"},
:account/vendor-allowance #{},
:payment/bank-account #{"bank-account"},
:account/default-allowance #{},
:transaction-rule/yodlee-merchant #{"yodlee-merchant"},
:vendor/account-overrides #{"vendor-account-override"},
:transaction/client #{"client"},
:invoice/vendor #{"vendor"},
:sales-order/vendor #{"vendor"},
:expected-deposit/status #{},
:journal-entry/original-entity #{"transaction" "invoice"},
:vendor-usage/client #{"client"},
:transaction/expected-deposit #{"expected-deposit"},
:client/ezcater-locations #{"ezcater-location"},
:journal-entry/client #{"client"},
:vendor/secondary-contact #{"contact"},
:journal-entry/line-items #{"journal-entry-line"},
:vendor/legal-entity-1099-type #{},
:transaction-rule/bank-account #{"bank-account"},
:transaction-account/account #{"account"},
:vendor/terms-overrides #{"vendor-terms-override"},
:vendor/default-account #{"account"},
:transaction/yodlee-merchant #{"yodlee-merchant"},
:sales-refund/client #{"client"},
:client/emails #{"email-contact"},
:payment/vendor #{"vendor"},
:invoice-payment/invoice #{"invoice"},
:report/client #{"client"},
:transaction-rule/accounts #{"transaction-rule-account"},
:charge/client #{"client"},
:bank-account/type #{},
:invoice-expense-account/account #{"account"},
:vendor/legal-entity-tin-type #{},
:transaction/approval-status #{},
:import-batch/entry #{"transaction"},
:bank-account/intuit-bank-account #{"intuit-bank-account"},
:account/type #{},
:sales-refund/vendor #{"vendor"},
:bank-account/yodlee-account #{"yodlee-account"},
:vendor/address #{"address"},
:integration-status/state #{},
:transaction/accounts #{"transaction-account"},
:sales-order/charges #{"charge"},
:client/address #{"address"},
:ezcater-location/caterer #{"ezcater-caterer"},
:vendor-account-override/client #{"client"},
:bank-account/integration-status #{"integration-status"},
:yodlee-provider-account/accounts #{"yodlee-account"},
:account/invoice-allowance #{},
:journal-entry/vendor #{"vendor"},
:plaid-item/accounts #{"plaid-account"},
:vendor-usage/vendor #{"vendor"},
:sales-order/line-items #{"order-line-item"},
:invoice/expense-accounts #{"invoice-expense-account"},
:account-client-override/client #{"client"},
:vendor/primary-contact #{"contact"},
:vendor/schedule-payment-dom #{"vendor-schedule-payment-dom"},
:account/client-overrides #{"account-client-override"},
:transaction/vendor #{"vendor"},
:client/square-integration-status #{"integration-status"},
:ezcater-integration/caterers #{"ezcater-caterer"},
:ezcater-integration/integration-status #{"integration-status"}
:vendor-account-override/account #{"account"},
:import-batch/source #{}})
;; Literal map from entity name to the set of entity names it refers to.
;; `dump-all` passes it to `order-of-insert` to decide the export order, and
;; `dump-schema` writes it to S3 as entity-dependencies.edn.
(def entity-dependencies
{"transaction-rule"
#{"vendor" "yodlee-merchant" "transaction-rule-account" "bank-account"
"client"},
"square-location" #{},
"expected-deposit" #{"vendor" "charge" "client"},
"journal-entry-line" #{"account" "bank-account"},
"vendor"
#{"vendor-schedule-payment-dom" "address" "account" "client" "contact"
"vendor-account-override"},
"transaction"
#{"transaction-rule" "expected-deposit" "vendor" "yodlee-merchant"
"transaction-account" "forecasted-transaction" "bank-account" "client"
"payment"},
"yodlee-provider-account" #{"yodlee-account" "client"},
"journal-entry"
#{"journal-entry-line" "vendor" "transaction" "invoice" "client"},
"yodlee-merchant" #{},
"invoice" #{"vendor" "invoice-expense-account" "client"},
"vendor-terms-override" #{},
"integration-status" #{},
"conformity" #{},
"user" #{"client"},
"sales-refund" #{"vendor" "client"},
"plaid-account" #{},
"charge" #{"client"},
"location-match" #{},
"vendor-schedule-payment-dom" #{"client"},
"account-client-override" #{"client"},
"plaid-item" #{"plaid-account" "client"},
"transaction-account" #{"account"},
"address" #{},
"order-line-item" #{},
"ezcater-location" #{"ezcater-caterer"},
"account" #{"account-client-override"},
"intuit-bank-account" #{},
"saved-query" #{"client"},
"ezcater-caterer" #{},
"forecasted-transaction" #{},
"audit" #{},
"yodlee-account" #{},
"transaction-rule-account" #{"account"},
"ezcater-integration" #{"ezcater-caterer" "integration-status"},
"report" #{"client"},
"bank-account" #{"integration-status" "intuit-bank-account" "yodlee-account"},
"vendor-usage" #{"vendor" "client"},
"invoice-expense-account" #{"account"},
"sales-order" #{"vendor" "charge" "order-line-item" "client"},
"client"
#{"square-location" "integration-status" "location-match" "address"
"ezcater-location" "forecasted-transaction" "bank-account" "email-contact"},
"email-contact" #{},
"invoice-payment" #{"invoice" "payment"},
"contact" #{},
"import-batch" #{"transaction"},
"payment" #{"vendor" "invoice" "bank-account" "client"},
"vendor-account-override" #{"account" "client"}})
(defn order-of-insert
  "Returns a vector of entity names ordered so that every entity appears
  after all of the entities it depends on.

  `entity-dependencies` maps entity name -> set of entity names it depends
  on. The map is processed in rounds: each round emits every entity whose
  dependency set is empty, then removes those entities from the map and
  from the remaining dependency sets.

  Throws `ex-info` when a round can emit nothing while entities remain,
  which happens for a dependency cycle or a dependency that is not itself a
  key of the map. The ex-data carries `:remaining` and `:order`."
  [entity-dependencies]
  (loop [remaining entity-dependencies
         order     []]
    (let [ready (for [[entity deps] remaining
                      :when (empty? deps)]
                  entity)]
      (cond
        (empty? remaining)
        order

        ;; No progress is possible; without this guard the loop would spin
        ;; forever on the same map.
        (empty? ready)
        (throw (ex-info "Cannot order entities: circular or unsatisfiable dependencies"
                        {:remaining remaining
                         :order     order}))

        :else
        (recur (into {}
                     (map (fn [[entity deps]]
                            [entity (apply disj deps ready)]))
                     (apply dissoc remaining ready))
               (into order ready))))))
(def loaded
  "Atom holding a set of entity names. `dump-all` skips these names when it
  is called without an explicit item list."
  (atom #{}))
(def dumped
  "Atom holding the set of entity names `dump-all` has started exporting."
  (atom #{}))
(defn write-s3
  "Pretty-prints `data` to the scratch file /tmp/temp-edn and uploads that
  file to key `location` in the bucket named by `(:data-bucket env)`.
  Returns the result of `s3/put-object`."
  [data location]
  (let [f (io/file "/tmp/temp-edn")]
    (spit f (with-out-str (clojure.pprint/pprint data)))
    ;; Close the stream once the upload returns, and declare the length up
    ;; front the same way the .ednl upload in `dump-all` does.
    (with-open [in (io/input-stream f)]
      (s3/put-object :bucket-name (:data-bucket env)
                     :key location
                     :input-stream in
                     :metadata {:content-length (.length f)}))))
(defn dump-schema
  "Uploads three EDN files under the `backup` key prefix via `write-s3`:
  schema.edn (each entry of `schema` with `:db/id` renamed to
  `:entity/migration-key`), full-dependencies.edn and
  entity-dependencies.edn. Returns the result of the last upload."
  [schema backup]
  (let [migration-keyed (for [attr schema]
                          (set/rename-keys attr {:db/id :entity/migration-key}))]
    (write-s3 migration-keyed (str backup "/schema.edn"))
    (write-s3 full-dependencies (str backup "/full-dependencies.edn"))
    (write-s3 entity-dependencies (str backup "/entity-dependencies.edn"))))
(defn pull-batch
  "Pulls one batch of `entities` (entity ids) from `remote-db` on
  `request-pool` and returns a deferred yielding a vector of export maps.

  The pull pattern is `:db/id` plus every attribute in `schema` that has a
  `:db/valueType` and whose namespace equals `entity`. Each pulled map is
  then rewritten key by key:
  - `:db/id` becomes `:entity/migration-key`;
  - a key present in `full-dependencies` has its value (or each element of
    a vector value) replaced by `[:entity/migration-key <id>]`;
  - any other key triggers removal of `:payment/pdf-data`, `:payment/memo`
    and `:vendor/invoice-reminder-schedule`.

  Any Throwable is logged as `::pull-error` and rethrown."
  [remote-db schema entity entities]
  (let [->migration-ref (fn [r] [:entity/migration-key (:db/id r)])
        rewrite-entry   (fn [acc [k v]]
                          (cond
                            (= k :db/id)
                            (-> acc
                                (assoc :entity/migration-key v)
                                (dissoc :db/id))

                            (full-dependencies k)
                            (assoc acc k (if (vector? v)
                                           (mapv ->migration-ref v)
                                           (->migration-ref v)))

                            :else
                            (dissoc acc
                                    :payment/pdf-data
                                    :payment/memo
                                    :vendor/invoice-reminder-schedule)))
        ->export-map    (fn [pulled] (reduce rewrite-entry pulled pulled))]
    (de/future-with request-pool
      (mu/with-context {:entity entity}
        (try
          ;; Sampled progress log: roughly one batch in a hundred.
          (when (zero? (rand-int 100))
            (mu/log ::pulling :count (count entities)))
          (let [pattern (into [:db/id]
                              (comp (filter :db/valueType)
                                    (map :db/ident)
                                    (filter #(= entity (namespace %))))
                              schema)]
            (mapv ->export-map (d/pull-many remote-db pattern entities)))
          (catch Throwable e
            (mu/log ::pull-error
                    :exception e)
            (throw e)))))))
(def in-flight
  "Counter atom starting at 0. Nothing in the visible code reads or updates
  it."
  (atom 0))
(def so-far
  "Number of records appended for the entity `dump-all` is currently
  exporting; reset to 0 at the start of each entity."
  (atom 0))
(def total
  "Number of entity ids found for the entity `dump-all` is currently
  exporting."
  (atom 0))
;; Exports the production Datomic database to S3 under a fresh
;; "/datomic-backup/<uuid>" key prefix.
;;
;; With no argument, exports every entity name returned by
;; `(order-of-insert entity-dependencies)` except "audit" and anything in
;; `@loaded`. With `item-list`, exports exactly those entity names.
;;
;; Steps: upload the schema and dependency maps (`dump-schema`), then for
;; each entity name query the matching entity ids, pull them in batches of
;; 100 through `pull-batch`, append every pulled map to /tmp/tmp-ednl, and
;; upload that file as "<backup-id>/<entity>.ednl".
(defn dump-all
([] (dump-all nil))
([item-list]
(let [backup-id (str "/datomic-backup/" (java.util.UUID/randomUUID))
_ (mu/log ::starting-backup :backup backup-id)
;; One database value is taken here and reused for every query and pull.
remote-db (d/db (datomic.api/connect "datomic:ddb://us-east-1/integreat/integreat-prod"))
_ (mu/log ::fetching-schema)
schema (get-schema remote-db)
]
(mu/log ::dumping-schema)
(dump-schema schema backup-id)
(mu/log ::schema-dumped)
;; NOTE(review): the default list filters on `@loaded`, while the loop below
;; records progress in `dumped`; nothing in the visible code adds to
;; `loaded`. Confirm which atom was intended.
(doseq [entity (or item-list (filter (complement (conj @loaded "audit")) (order-of-insert entity-dependencies)))
:let [_ (swap! dumped conj entity)
_ (reset! so-far 0)
_ (mu/log ::querying :entity entity)
;; Every entity id that has at least one of the entity's "best key"
;; attributes; a single keyword is wrapped in a vector first.
entities (d/q '[:find [?e ...]
:in $ [?a ...]
:where [?e ?a]]
remote-db
(cond-> (entity->best-key entity)
(not (vector? (entity->best-key entity))) vector))
_ (reset! total (count entities))
_ (mu/log ::entity-total-found :count (count entities) :entity entity)]]
(mu/trace ::single-entity
[:entity entity]
(mu/with-context {:entity entity :total @total}
(mu/log ::starting)
(mu/log ::deleting)
;; Remove the scratch file left by the previous entity; `true` means a
;; missing file is not an error.
(io/delete-file (io/file "/tmp/tmp-ednl") true)
(mu/log ::pulling)
(ednl/with-append [append "/tmp/tmp-ednl" ]
;; Deref blocks until the consume deferred completes.
@(s/consume (fn [batch]
(mu/with-context {:entity entity :total @total}
(doseq [a batch]
(try
(append a)
(catch Exception e
(mu/log ::error
:exception e)
(throw e)))
)
(swap! so-far #(+ % (count batch)))
;; Sampled progress log: roughly one batch in a hundred.
(when (= 0 (rand-int 100))
(mu/log ::appended :count (count batch)
:so-far @so-far))))
;; Source pipeline: batches of 100 ids -> stream on the `buffered`
;; executor -> `pull-batch` (returns a deferred per batch) -> realized
;; results. The two buffers of 20 bound how many batches sit on either side
;; of the pull stage.
(->> (partition-all 100 entities)
(into [])
(s/->source)
(s/onto buffered)
(s/buffer 20)
(s/map (fn [entities]
(pull-batch remote-db schema entity entities)))
(s/buffer 20)
(s/realize-each)))
)
;; Upload the finished scratch file, declaring its length up front.
(try
(mu/log ::copying)
(let [f (io/file "/tmp/tmp-ednl")]
(s3/put-object :bucket-name (:data-bucket env)
:key (str backup-id "/" entity ".ednl")
:input-stream (io/make-input-stream f {})
:metadata {:content-length (.length f)}))
(mu/log ::copied)
(catch Exception e
(mu/log ::upload-error
:exception e)
(throw e)))))))))
(defn -main
  "Entry point. Runs `dump-all` as the export-backup job through `execute`.
  On any Exception it prints the exception, logs `::quit-error`, sleeps five
  seconds (presumably so pending log events can be published; confirm) and
  rethrows."
  [& _]
  (let [run-backup (fn [] (dump-all))]
    (try
      (execute "export-backup" run-backup)
      (catch Exception failure
        (println failure)
        (mu/log ::quit-error
                :exception failure
                :background-job "export-backup"
                :service "export-backup")
        (Thread/sleep 5000)
        (throw failure)))))
)