fulltext better.

This commit is contained in:
2022-09-29 11:05:04 -07:00
parent 65ecee7014
commit 1f57ed2d1c
3 changed files with 72 additions and 30 deletions

View File

@@ -10,7 +10,9 @@
[clojure.java.io :as io]
[amazonica.aws.s3 :as s3]
[config.core :refer [env]]
[clojure.core.async :as a]
[datomic.client.api :as dc]
[datomic.client.api.async :as dca]
[datomic.dev-local :as dl]
[clojure.set :as set]))
@@ -182,7 +184,9 @@
:vendor-schedule-payment-dom/client #{"client"}})
(def full-dependencies
(merge-with into reference->entity manual-dependencies))
(update (merge-with into reference->entity manual-dependencies)
:journal-entry/original-entity
#(disj % "journal-entry")))
(def entity-dependencies
(let [base-dependencies
@@ -203,8 +207,7 @@
(def order-of-insert
(loop [entity-dependencies entity-dependencies
order []
]
order []]
(let [next-order (for [[entity deps] entity-dependencies
:when (not (seq deps))]
entity)
@@ -217,6 +220,7 @@
entity-dependencies)))
(apply dissoc entity-dependencies next-order)
next-order)]
(println order next-deps)
(if (seq next-deps)
(recur next-deps (into order next-order))
(into order next-order)))))
@@ -245,9 +249,44 @@
[k (first v)]))))
(defn tx-pipeline
"Transacts data from from-ch. Returns a map with:
:result, a return channel getting {:error t} or {:completed n}
:stop, a fn you can use to terminate early."
; conn    - connection handed to dc/transact for every value taken from from-ch
; conc    - parallelism argument passed to a/pipeline-blocking
; from-ch - input channel; each value is used as the :tx-data of one transaction
[conn conc from-ch]
; to-ch receives one transaction result per input value (buffer of 100);
; done-ch is unbuffered, so every put on it waits for a taker.
(let [to-ch (a/chan 100)
done-ch (a/chan)
transact-data (fn [data]
(try
(dc/transact conn {:tx-data data})
; On any Throwable: print the stack trace, close both the input
; channel and the output channel, then do a blocking put of
; {:error t} on done-ch.
(catch Throwable t
(.printStackTrace t)
(a/close! from-ch)
(a/close! to-ch)
(a/>!! done-ch {:error t}))))]
; Progress/counter loop: prints a '.' on EVERY iteration (including the
; first one, before any result has been taken) -- not once per 1000
; transactions. Each non-nil value taken from to-ch increments total; when
; the take yields nil (to-ch closed and drained) it puts {:completed total}
; on done-ch. The binding c is only used as the truthiness test.
(a/go-loop [total 0]
(print ".")
(flush)
(if-let [c (a/<! to-ch)]
(recur (inc total))
(a/>! done-ch {:completed total})))
; NOTE(review): this def sits inside the function body, so it would re-define
; the var `loaded` on every call to tx-pipeline. It looks like a line left
; over from the surrounding diff rather than part of this function -- confirm.
(def loaded (atom #{"charge" "order-line-item" "journal-entry-line"}))
; Runs transact-data over values from from-ch with parallelism conc (via the
; transducer form of map) and puts each result on to-ch.
(a/pipeline-blocking conc to-ch (map transact-data) from-ch)
; Returns the done channel plus a :stop fn for early termination; :stop
; closes to-ch only (from-ch is left for the caller to close).
{:result done-ch
:stop (fn [] (a/close! to-ch))}))
(def loaded (atom #{}))
(defn migrate []
(dc/transact (dc/connect local-client {:db-name "prod-migration"}) {:tx-data [{:db/ident :entity/migration-key
@@ -292,7 +331,7 @@
:db/doc "A transaction account that was deleted"})))})
(doseq [entity (drop-while #(not= % "journal-entry") (filter (complement #{"audit"}) order-of-insert))
(doseq [entity (filter (complement #{"audit"}) order-of-insert)
:let [_ (swap! loaded conj entity)
_ (println "querying for " entity)
entities (d/q '[:find [?e ...]
@@ -300,10 +339,14 @@
:where [?e ?a]]
remote-db
(cond-> (entity->best-key entity)
(not (vector? (entity->best-key entity))) vector))]]
(not (vector? (entity->best-key entity))) vector))
tx-chan (a/chan)
pipeline (tx-pipeline (dc/connect local-client {:db-name "prod-migration"}) 30
tx-chan)]]
(println "Inserting " entity ": " (count entities))
(flush)
(doseq [batch (partition-all 2000 entities)]
(print ".")
(let [transaction {:tx-data (->> (d/pull-many remote-db
(->> schema
(filter :db/valueType)
@@ -328,15 +371,11 @@
:payment/memo)))
m
m))))}]
(try
(dc/transact (dc/connect local-client {:db-name "prod-migration"}) transaction)
(Thread/sleep 50)
(catch Exception e
(clojure.pprint/pprint transaction)
(println e)
(throw e)
)))
(flush))
(a/>!! tx-chan (:tx-data transaction))))
; Close the input channel so the pipeline can drain and report on :result.
; core.async names this function close! -- there is no a/close var.
(a/close! tx-chan)
(println "waiting for done from" pipeline)
(flush)
(println (a/<!! (:result pipeline)))
(println)
(println "Done")))