;; This buffer is for Clojure experiments and evaluation.
;; Press C-j to evaluate the last expression.
;; You can also press C-u C-j to evaluate the expression and pretty-print its result.

;; Everything below lives inside a `comment` form, so nothing is defined when
;; this buffer is loaded; the forms are meant to be evaluated one at a time.
(comment
  (ns auto-ap.backup
    (:require [datomic.api :as d]
              [manifold.deferred :as de]
              [manifold.executor :as ex]
              [manifold.stream :as s]
              [manifold.time :as mt]
              [auto-ap.jobs.core :refer [execute]]
              [clojure.java.io :as io]
              [clojure.pprint :as pprint]
              [amazonica.aws.s3 :as s3]
              [config.core :refer [env]]
              [clojure.core.async :as a]
              [lambdaisland.edn-lines :as ednl]
              [clojure.set :as set]
              [com.brunobonacci.mulog :as mu]))

  ;; Executor on which `pull-batch` runs its `d/pull-many` calls.
  (def request-pool (ex/fixed-thread-executor 30))

  ;; Executor the batch stream in `dump-all` is moved onto.
  (def buffered (ex/fixed-thread-executor 30))

  (defn get-schema
    "Pulls every entity that has a :db/ident from `remote-db` and returns the
  ones whose ident either has no namespace or has a namespace that does not
  start with \"cartographer\", \"db\" or \"fressian\"."
    [remote-db]
    (let [everything (d/q '[:find [(pull ?e [:db/ident
                                             {:db/valueType [:db/ident]}
                                             {:db/cardinality [:db/ident]}
                                             :db.attr/preds
                                             {:db/unique [:db/ident]}
                                             :db/isComponent
                                             :db/id
                                             :db/noHistory
                                             :db/tupleAttrs
                                             :db.entity/attrs
                                             :db.entity/preds
                                             :db/doc]) ...]
                            :where [?e :db/ident]]
                          remote-db)]
      (->> everything
           (filter :db/ident)
           (filter (fn [{:db/keys [ident]}]
                     (if (namespace ident)
                       ;; Negative look-aheads reject the excluded namespace prefixes.
                       (re-matches #"^(?!cartographer)(?!db)(?!fressian).+" (namespace ident))
                       true))))))

  ;; Entity namespace -> attribute (or vector of attributes) that `dump-all`
  ;; queries to enumerate the entities of that kind: an entity is selected when
  ;; it has a value for any of the listed attributes.
  (def entity->best-key
    {"transaction-rule"            [:transaction-rule/description :transaction-rule/note :transaction-rule/vendor]
     "square-location"             :square-location/square-id
     "expected-deposit"            :expected-deposit/date
     "journal-entry-line"          [:journal-entry-line/account :journal-entry-line/debit :journal-entry-line/credit]
     "vendor"                      [:vendor/name :vendor/default-account :vendor/hidden]
     "transaction"                 :transaction/amount
     "yodlee-provider-account"     :yodlee-provider-account/id
     "journal-entry"               :journal-entry/source
     "yodlee-merchant"             :yodlee-merchant/yodlee-id
     "invoice"                     :invoice/invoice-number
     "vendor-terms-override"       :vendor-terms-override/client
     "integration-status"          :integration-status/state
     "conformity"                  :conformity/conformed-norms-index
     "user"                        :user/provider-id
     "sales-refund"                :sales-refund/total
     "plaid-account"               :plaid-account/name
     "charge"                      [:charge/total :charge/external-id]
     "location-match"              :location-match/location
     "vendor-schedule-payment-dom" :vendor-schedule-payment-dom/dom
     "account-client-override"     :account-client-override/client
     "plaid-item"                  :plaid-item/client
     "transaction-account"         :transaction-account/account
     "address"                     [:address/street1 :address/city :address/state :address/zip]
     "order-line-item"             :order-line-item/total
     "ezcater-location"            [:ezcater-location/location :ezcater-location/caterer]
     "account"                     [:account/numeric-code :account/code :account/name :account/type]
     "intuit-bank-account"         :intuit-bank-account/name
     "saved-query"                 :saved-query/guid
     "ezcater-caterer"             :ezcater-caterer/uuid
     "forecasted-transaction"      :forecasted-transaction/day-of-month
     "audit"                       :audit/user
     "yodlee-account"              :yodlee-account/id
     "transaction-rule-account"    [:transaction-rule-account/account :transaction-rule-account/location]
     "ezcater-integration"         :ezcater-integration/subscriber-uuid
     "report"                      :report/created
     "bank-account"                :bank-account/code
     "vendor-usage"                :vendor-usage/key
     "invoice-expense-account"     [:invoice-expense-account/expense-account-id
                                    :invoice-expense-account/account
                                    :invoice-expense-account/location
                                    :invoice-expense-account/amount]
     "sales-order"                 :sales-order/date
     "client"                      :client/code
     "email-contact"               :email-contact/email
     "invoice-payment"             :invoice-payment/amount
     "contact"                     [:contact/name :contact/phone :contact/email]
     "import-batch"                :import-batch/date
     "payment"                     [:payment/date :payment/bank-account]
     "vendor-account-override"     :vendor-account-override/client})

  ;; The `#_` forms below are discarded by the reader. They appear to be the
  ;; code that originally derived the literal `full-dependencies` and
  ;; `entity-dependencies` maps further down -- TODO confirm before reviving.
  #_(defn references [schema]
      (filter (comp #{:db.type/ref} :db/ident :db/valueType) schema))

  #_(defn reference->entity [remote-db]
      (->> (d/q '[:find ?a ?v3
                  :in $ $$ [?a ...]
                  :where [$$ _ ?a ?e]
                  [$ ?e ?v _ _]
                  [$ ?v :db/ident ?v2 _ _]
                  [(namespace ?v2) ?v3]
                  [(namespace ?v2) ?v3]]
                remote-db
                (d/since remote-db #inst "2022-06-01")
                (map :db/ident references))
           (group-by first)
           (map (fn [[k v]]
                  [k (disj (set (map second v)) "db")]))
           (into {})))

  #_(def manual-dependencies
      {:client/location-matches            #{"location-match"}
       :transaction/yodlee-merchant        #{"yodlee-merchant"}
       :vendor-account-override/account    #{"account"}
       :vendor-account-override/client     #{"client"}
       :vendor/secondary-contact           #{"contact"}
       :vendor/account-overrides           #{"vendor-account-override"}
       :client/bank-accounts               #{"bank-account"}
       :transaction-rule/yodlee-merchant   #{"yodlee-merchant"}
       :client/forecasted-transactions     #{"forecasted-transaction"}
       :transaction/forecast-match         #{"forecasted-transaction"}
       :vendor/automatically-paid-when-due #{"client"}
       :vendor/schedule-payment-dom        #{"vendor-schedule-payment-dom"}
       :vendor/terms-overrides             #{"vendor-terms-override"}
       :vendor-schedule-payment-dom/client #{"client"}})

  #_(defn full-dependencies [remote-db]
      (update (merge-with into (reference->entity remote-db) manual-dependencies)
              :journal-entry/original-entity
              #(disj % "journal-entry")))

  #_(defn entity-dependencies [schema]
      (let [base-dependencies (into {}
                                    (map (fn [i] [i #{}])
                                         (set (map (comp namespace :db/ident)
                                                   (filter :db/valueType schema)))))]
        (into base-dependencies
              (reduce (fn [acc [ref deps]]
                        (update acc (namespace ref) (fnil #(into % deps) #{})))
                      {}
                      (full-dependencies remote-db)))))

  ;; Attribute -> set of entity namespaces that attribute refers to.
  ;; `pull-batch` uses this map as a predicate: every attribute that is a key
  ;; here (including those mapped to an empty set, since `#{}` is truthy) has
  ;; its value rewritten to `[:entity/migration-key <db/id>]` lookup refs.
  (def full-dependencies
    {:invoice/client                               #{"client"}
     :sales-order/client                           #{"client"}
     :transaction-rule/transaction-approval-status #{}
     :transaction/forecast-match                   #{"forecasted-transaction"}
     :user/role                                    #{}
     :vendor-schedule-payment-dom/client           #{"client"}
     :invoice-payment/payment                      #{"payment"}
     :transaction-rule/client                      #{"client"}
     :invoice/status                               #{}
     :payment/type                                 #{}
     :expected-deposit/client                      #{"client"}
     :transaction/bank-account                     #{"bank-account"}
     :transaction-rule-account/account             #{"account"}
     :import-batch/status                          #{}
     :user/clients                                 #{"client"}
     :payment/client                               #{"client"}
     :expected-deposit/charges                     #{"charge"}
     :vendor/automatically-paid-when-due           #{"client"}
     :payment/invoices                             #{"invoice"}
     :client/forecasted-transactions               #{"forecasted-transaction"}
     :transaction/matched-rule                     #{"transaction-rule"}
     :invoice/import-status                        #{}
     :charge/processor                             #{}
     :expected-deposit/vendor                      #{"vendor"}
     :client/square-locations                      #{"square-location"}
     :payment/status                               #{}
     :client/location-matches                      #{"location-match"}
     :saved-query/client                           #{"client"}
     :transaction/payment                          #{"payment"}
     :transaction-rule/vendor                      #{"vendor"}
     :plaid-item/client                            #{"client"}
     :account/applicability                        #{}
     :journal-entry-line/account                   #{"account" "bank-account"}
     :client/bank-accounts                         #{"bank-account"}
     :yodlee-provider-account/client               #{"client"}
     :account/vendor-allowance                     #{}
     :payment/bank-account                         #{"bank-account"}
     :account/default-allowance                    #{}
     :transaction-rule/yodlee-merchant             #{"yodlee-merchant"}
     :vendor/account-overrides                     #{"vendor-account-override"}
     :transaction/client                           #{"client"}
     :invoice/vendor                               #{"vendor"}
     :sales-order/vendor                           #{"vendor"}
     :expected-deposit/status                      #{}
     :journal-entry/original-entity                #{"transaction" "invoice"}
     :vendor-usage/client                          #{"client"}
     :transaction/expected-deposit                 #{"expected-deposit"}
     :client/ezcater-locations                     #{"ezcater-location"}
     :journal-entry/client                         #{"client"}
     :vendor/secondary-contact                     #{"contact"}
     :journal-entry/line-items                     #{"journal-entry-line"}
     :vendor/legal-entity-1099-type                #{}
     :transaction-rule/bank-account                #{"bank-account"}
     :transaction-account/account                  #{"account"}
     :vendor/terms-overrides                       #{"vendor-terms-override"}
     :vendor/default-account                       #{"account"}
     :transaction/yodlee-merchant                  #{"yodlee-merchant"}
     :sales-refund/client                          #{"client"}
     :client/emails                                #{"email-contact"}
     :payment/vendor                               #{"vendor"}
     :invoice-payment/invoice                      #{"invoice"}
     :report/client                                #{"client"}
     :transaction-rule/accounts                    #{"transaction-rule-account"}
     :charge/client                                #{"client"}
     :bank-account/type                            #{}
     :invoice-expense-account/account              #{"account"}
     :vendor/legal-entity-tin-type                 #{}
     :transaction/approval-status                  #{}
     :import-batch/entry                           #{"transaction"}
     :bank-account/intuit-bank-account             #{"intuit-bank-account"}
     :account/type                                 #{}
     :sales-refund/vendor                          #{"vendor"}
     :bank-account/yodlee-account                  #{"yodlee-account"}
     :vendor/address                               #{"address"}
     :integration-status/state                     #{}
     :transaction/accounts                         #{"transaction-account"}
     :sales-order/charges                          #{"charge"}
     :client/address                               #{"address"}
     :ezcater-location/caterer                     #{"ezcater-caterer"}
     :vendor-account-override/client               #{"client"}
     :bank-account/integration-status              #{"integration-status"}
     :yodlee-provider-account/accounts             #{"yodlee-account"}
     :account/invoice-allowance                    #{}
     :journal-entry/vendor                         #{"vendor"}
     :plaid-item/accounts                          #{"plaid-account"}
     :vendor-usage/vendor                          #{"vendor"}
     :sales-order/line-items                       #{"order-line-item"}
     :invoice/expense-accounts                     #{"invoice-expense-account"}
     :account-client-override/client               #{"client"}
     :vendor/primary-contact                       #{"contact"}
     :vendor/schedule-payment-dom                  #{"vendor-schedule-payment-dom"}
     :account/client-overrides                     #{"account-client-override"}
     :transaction/vendor                           #{"vendor"}
     :client/square-integration-status             #{"integration-status"}
     :ezcater-integration/caterers                 #{"ezcater-caterer"}
     :ezcater-integration/integration-status       #{"integration-status"}
     :vendor-account-override/account              #{"account"}
     :import-batch/source                          #{}})

  ;; Entity namespace -> set of entity namespaces it depends on. This is the
  ;; input `dump-all` hands to `order-of-insert`.
  (def entity-dependencies
    {"transaction-rule"            #{"vendor" "yodlee-merchant" "transaction-rule-account" "bank-account" "client"}
     "square-location"             #{}
     "expected-deposit"            #{"vendor" "charge" "client"}
     "journal-entry-line"          #{"account" "bank-account"}
     "vendor"                      #{"vendor-schedule-payment-dom" "address" "account" "client" "contact" "vendor-account-override"}
     "transaction"                 #{"transaction-rule" "expected-deposit" "vendor" "yodlee-merchant" "transaction-account"
                                     "forecasted-transaction" "bank-account" "client" "payment"}
     "yodlee-provider-account"     #{"yodlee-account" "client"}
     "journal-entry"               #{"journal-entry-line" "vendor" "transaction" "invoice" "client"}
     "yodlee-merchant"             #{}
     "invoice"                     #{"vendor" "invoice-expense-account" "client"}
     "vendor-terms-override"       #{}
     "integration-status"          #{}
     "conformity"                  #{}
     "user"                        #{"client"}
     "sales-refund"                #{"vendor" "client"}
     "plaid-account"               #{}
     "charge"                      #{"client"}
     "location-match"              #{}
     "vendor-schedule-payment-dom" #{"client"}
     "account-client-override"     #{"client"}
     "plaid-item"                  #{"plaid-account" "client"}
     "transaction-account"         #{"account"}
     "address"                     #{}
     "order-line-item"             #{}
     "ezcater-location"            #{"ezcater-caterer"}
     "account"                     #{"account-client-override"}
     "intuit-bank-account"         #{}
     "saved-query"                 #{"client"}
     "ezcater-caterer"             #{}
     "forecasted-transaction"      #{}
     "audit"                       #{}
     "yodlee-account"              #{}
     "transaction-rule-account"    #{"account"}
     "ezcater-integration"         #{"ezcater-caterer" "integration-status"}
     "report"                      #{"client"}
     "bank-account"                #{"integration-status" "intuit-bank-account" "yodlee-account"}
     "vendor-usage"                #{"vendor" "client"}
     "invoice-expense-account"     #{"account"}
     "sales-order"                 #{"vendor" "charge" "order-line-item" "client"}
     "client"                      #{"square-location" "integration-status" "location-match" "address" "ezcater-location"
                                     "forecasted-transaction" "bank-account" "email-contact"}
     "email-contact"               #{}
     "invoice-payment"             #{"invoice" "payment"}
     "contact"                     #{}
     "import-batch"                #{"transaction"}
     "payment"                     #{"vendor" "invoice" "bank-account" "client"}
     "vendor-account-override"     #{"account" "client"}})

  (defn order-of-insert
    "Returns a vector of the keys of `entity-dependencies` (a map of
  entity -> set of entities it depends on) ordered so that every entity comes
  after all of its dependencies.

  Works in rounds: each round emits every entity whose dependency set is empty
  and removes those entities from the remaining dependency sets.

  Throws `ex-info` when a round can emit nothing although entities remain
  (a dependency cycle, or a dependency that is not a key of the map); the
  previous implementation looped forever in that situation."
    [entity-dependencies]
    (loop [remaining entity-dependencies
           order     []]
      (let [ready     (for [[entity deps] remaining
                            :when (not (seq deps))]
                        entity)
            next-deps (into {}
                            (map (fn [[entity deps]]
                                   [entity (apply disj deps ready)]))
                            (apply dissoc remaining ready))]
        (cond
          (empty? next-deps)
          (into order ready)

          ;; Nothing became ready but work remains: recurring would never end.
          (empty? ready)
          (throw (ex-info "Cannot order entities: cyclic or unknown dependencies"
                          {:remaining next-deps
                           :ordered   order}))

          :else
          (recur next-deps (into order ready))))))

  ;; Entities excluded from the default `dump-all` run; nothing in this file
  ;; adds to it, so it is presumably populated by hand from the REPL.
  (def loaded (atom #{}))

  ;; Entities `dump-all` has started dumping.
  (def dumped (atom #{}))

  (defn write-s3
    "Pretty-prints `data` to the scratch file /tmp/temp-edn and uploads that
  file to the bucket named by `(:data-bucket env)` under the key `location`.
  Returns the result of `s3/put-object`."
    [data location]
    (let [f (io/file "/tmp/temp-edn")]
      (spit f (with-out-str (pprint/pprint data)))
      ;; `with-open` guarantees the stream is closed even when the upload throws.
      (with-open [in (io/input-stream f)]
        (s3/put-object :bucket-name (:data-bucket env)
                       :key location
                       :input-stream in
                       ;; Same metadata the entity upload in `dump-all` sends.
                       :metadata {:content-length (.length f)}))))

  (defn dump-schema
    "Uploads three EDN files under the `backup` prefix: the schema (with
  :db/id renamed to :entity/migration-key), `full-dependencies` and
  `entity-dependencies`."
    [schema backup]
    (write-s3 (map (fn [s]
                     (set/rename-keys s {:db/id :entity/migration-key}))
                   schema)
              (str backup "/schema.edn"))
    (write-s3 full-dependencies (str backup "/full-dependencies.edn"))
    (write-s3 entity-dependencies (str backup "/entity-dependencies.edn")))

  (defn pull-batch
    "Returns a deferred, run on `request-pool`, that pulls `entities` (entity
  ids) from `remote-db` and yields a vector of maps.

  The pull pattern is :db/id plus every attribute in `schema` that has a
  :db/valueType and whose namespace equals `entity`. In each result map:
    - :db/id is replaced by :entity/migration-key;
    - values of attributes that are keys of `full-dependencies` become
      `[:entity/migration-key <db/id>]` lookup refs (a vector of them when the
      value is a vector);
    - :payment/pdf-data, :payment/memo and :vendor/invoice-reminder-schedule
      are removed.

  Any Throwable is logged as ::pull-error and rethrown."
    [remote-db schema entity entities]
    (de/future-with request-pool
      (mu/with-context {:entity entity}
        (try
          ;; Sampled progress logging: roughly one batch in a hundred.
          (when (= 0 (rand-int 100))
            (mu/log ::pulling :count (count entities)))
          (->> (d/pull-many remote-db
                            (->> schema
                                 (filter :db/valueType)
                                 (mapv :db/ident)
                                 (filter #(= entity (namespace %)))
                                 (into [:db/id]))
                            entities)
               (mapv (fn [m]
                       (reduce
                        (fn [m [k v]]
                          (cond
                            (= k :db/id)
                            (-> m
                                (assoc :entity/migration-key v)
                                (dissoc :db/id))

                            (full-dependencies k)
                            (if (vector? v)
                              (assoc m k (mapv (fn [r] [:entity/migration-key (:db/id r)]) v))
                              (assoc m k [:entity/migration-key (:db/id v)]))

                            :else
                            (dissoc m :payment/pdf-data :payment/memo :vendor/invoice-reminder-schedule)))
                        m
                        m))))
          (catch Throwable e
            (mu/log ::pull-error :exception e)
            (throw e))))))

  ;; Not referenced by any form in this buffer.
  (def in-flight (atom 0))

  ;; Number of records appended for the entity currently being dumped.
  (def so-far (atom 0))

  ;; Number of entity ids found for the entity currently being dumped.
  (def total (atom 0))

  (defn dump-all
    "Dumps the production database to S3 under a fresh
  \"/datomic-backup/<uuid>\" prefix.

  First uploads the schema and dependency maps (see `dump-schema`). Then, for
  each entity in `item-list` -- or, when it is nil, every entity returned by
  `order-of-insert` except \"audit\" and those in `@loaded` -- finds the
  entity ids via `entity->best-key`, pulls them in batches of 100 through
  `pull-batch`, appends the results to the scratch file /tmp/tmp-ednl and
  uploads that file as \"<prefix>/<entity>.ednl\".

  Upload failures are logged as ::upload-error and rethrown."
    ([] (dump-all nil))
    ([item-list]
     (let [backup-id (str "/datomic-backup/" (java.util.UUID/randomUUID))
           _         (mu/log ::starting-backup :backup backup-id)
           remote-db (d/db (d/connect "datomic:ddb://us-east-1/integreat/integreat-prod"))
           _         (mu/log ::fetching-schema)
           schema    (get-schema remote-db)]
       (mu/log ::dumping-schema)
       (dump-schema schema backup-id)
       (mu/log ::schema-dumped)
       (doseq [entity (or item-list
                          (remove (conj @loaded "audit")
                                  (order-of-insert entity-dependencies)))
               :let [_        (swap! dumped conj entity)
                     _        (reset! so-far 0)
                     _        (mu/log ::querying :entity entity)
                     best-key (entity->best-key entity)
                     ;; The query takes a collection of attributes, so a single
                     ;; keyword is wrapped in a vector.
                     entities (d/q '[:find [?e ...]
                                     :in $ [?a ...]
                                     :where [?e ?a]]
                                   remote-db
                                   (cond-> best-key
                                     (not (vector? best-key)) vector))
                     _        (reset! total (count entities))
                     _        (mu/log ::entity-total-found :count (count entities) :entity entity)]]
         (mu/trace ::single-entity
           [:entity entity]
           (mu/with-context {:entity entity :total @total}
             (mu/log ::starting)
             (mu/log ::deleting)
             ;; Remove the previous entity's scratch file; `true` = ignore a missing file.
             (io/delete-file (io/file "/tmp/tmp-ednl") true)
             (mu/log ::pulling)
             (ednl/with-append [append "/tmp/tmp-ednl"]
               ;; Deref blocks until the stream of pulled batches is fully consumed.
               @(s/consume
                 (fn [batch]
                   (mu/with-context {:entity entity :total @total}
                     (doseq [a batch]
                       (try
                         (append a)
                         (catch Exception e
                           (mu/log ::error :exception e)
                           (throw e))))
                     (swap! so-far + (count batch))
                     (when (= 0 (rand-int 100))
                       (mu/log ::appended :count (count batch) :so-far @so-far))))
                 (->> (partition-all 100 entities)
                      (into [])
                      (s/->source)
                      (s/onto buffered)
                      (s/buffer 20)
                      (s/map (fn [entities]
                               (pull-batch remote-db schema entity entities)))
                      (s/buffer 20)
                      (s/realize-each))))
             (try
               (mu/log ::copying)
               (let [f (io/file "/tmp/tmp-ednl")]
                 ;; `with-open` guarantees the stream is closed even when the upload throws.
                 (with-open [in (io/input-stream f)]
                   (s3/put-object :bucket-name (:data-bucket env)
                                  :key (str backup-id "/" entity ".ednl")
                                  :input-stream in
                                  :metadata {:content-length (.length f)})))
               (mu/log ::copied)
               (catch Exception e
                 (mu/log ::upload-error :exception e)
                 (throw e)))))))))

  (defn -main
    "Job entry point: runs `dump-all` through `execute` under the name
  \"export-backup\". On failure prints and logs the exception, sleeps five
  seconds and rethrows."
    [& _]
    (try
      (execute "export-backup" #(dump-all))
      (catch Exception e
        (println e)
        (mu/log ::quit-error :exception e :background-job "export-backup" :service "export-backup")
        ;; NOTE(review): the pause presumably gives the log publisher time to flush -- confirm.
        (Thread/sleep 5000)
        (throw e))))
  )