From 1f57ed2d1cd138e13b7d58cc73cbe4a662f9b462 Mon Sep 17 00:00:00 2001 From: Bryce Covert Date: Thu, 29 Sep 2022 11:05:04 -0700 Subject: [PATCH] fulltext better. --- scratch-sessions/dump-edn.clj | 71 +++++++++++++++++++++------- src/clj/auto_ap/datomic/invoices.clj | 2 +- src/clj/auto_ap/graphql/ezcater.clj | 29 +++++++----- 3 files changed, 72 insertions(+), 30 deletions(-) diff --git a/scratch-sessions/dump-edn.clj b/scratch-sessions/dump-edn.clj index 4143dea0..c44c0310 100644 --- a/scratch-sessions/dump-edn.clj +++ b/scratch-sessions/dump-edn.clj @@ -10,7 +10,9 @@ [clojure.java.io :as io] [amazonica.aws.s3 :as s3] [config.core :refer [env]] + [clojure.core.async :as a] [datomic.client.api :as dc] + [datomic.client.api.async :as dca] [datomic.dev-local :as dl] [clojure.set :as set])) @@ -182,7 +184,9 @@ :vendor-schedule-payment-dom/client #{"client"}}) (def full-dependencies - (merge-with into reference->entity manual-dependencies)) + (update (merge-with into reference->entity manual-dependencies) + :journal-entry/original-entity + #(disj % "journal-entry"))) (def entity-dependencies (let [base-dependencies @@ -203,8 +207,7 @@ (def order-of-insert (loop [entity-dependencies entity-dependencies - order [] - ] + order []] (let [next-order (for [[entity deps] entity-dependencies :when (not (seq deps))] entity) @@ -217,6 +220,7 @@ entity-dependencies))) (apply dissoc entity-dependencies next-order) next-order)] + (println order next-deps) (if (seq next-deps) (recur next-deps (into order next-order)) (into order next-order))))) @@ -245,9 +249,44 @@ [k (first v)])))) +(defn tx-pipeline + "Transacts data from from-ch. Returns a map with: + :result, a return channel getting {:error t} or {:completed n} + :stop, a fn you can use to terminate early." + [conn conc from-ch] + (let [to-ch (a/chan 100) + done-ch (a/chan) + transact-data (fn [data] + (try + (dc/transact conn {:tx-data data}) + ; if exception in a transaction + ; will close channels and put error + ; on done channel. + (catch Throwable t + (.printStackTrace t) + (a/close! from-ch) + (a/close! to-ch) + (a/>!! done-ch {:error t}))))] + ; go block prints a '.' after every 1000 transactions, puts completed + ; report on done channel when no value left to be taken. + (a/go-loop [total 0] + (print ".") + (flush) + (if-let [c (a/! done-ch {:completed total}))) -(def loaded (atom #{"charge" "order-line-item" "journal-entry-line"})) + ; pipeline that uses transducer form of map to transact data taken from + ; from-ch and puts results on to-ch + (a/pipeline-blocking conc to-ch (map transact-data) from-ch) + + ; returns done channel and a function that you can use + ; for early termination. + {:result done-ch + :stop (fn [] (a/close! to-ch))})) + +(def loaded (atom #{})) (defn migrate [] (dc/transact (dc/connect local-client {:db-name "prod-migration"}) {:tx-data [{:db/ident :entity/migration-key @@ -292,7 +331,7 @@ :db/doc "A transaction account that was deleted"})))}) - (doseq [entity (drop-while #(not= % "journal-entry") (filter (complement #{"audit"}) order-of-insert)) + (doseq [entity (filter (complement #{"audit"}) order-of-insert) :let [_ (swap! loaded conj entity) _ (println "querying for " entity) entities (d/q '[:find [?e ...] @@ -300,10 +339,14 @@ :where [?e ?a]] remote-db (cond-> (entity->best-key entity) - (not (vector? (entity->best-key entity))) vector))]] + (not (vector? (entity->best-key entity))) vector)) + + tx-chan (a/chan) + pipeline (tx-pipeline (dc/connect local-client {:db-name "prod-migration"}) 30 + tx-chan)]] (println "Inserting " entity ": " (count entities)) + (flush) (doseq [batch (partition-all 2000 entities)] - (print ".") (let [transaction {:tx-data (->> (d/pull-many remote-db (->> schema (filter :db/valueType) @@ -328,15 +371,11 @@ :payment/memo))) m m))))}] - (try - (dc/transact (dc/connect local-client {:db-name "prod-migration"}) transaction) - (Thread/sleep 50) - (catch Exception e - (clojure.pprint/pprint transaction) - (println e) - (throw e) - ))) - (flush)) + (a/>!! tx-chan (:tx-data transaction)))) + (a/close tx-chan) + (println "waiting for done from" pipeline) + (flush) + (println (a/> data - (sort-by (comp - last)) - (partial-match-first (:query args)) - (map (fn [[n i]] - {:name n - :id i}))))) + (let [search-query (cleanse-query (:query args))] + (for [[id name] (search/search {:q search-query} + "ezcater-caterer")] + {:name name + :id (Long/parseLong id)}))) +(defn rebuild-search-index [] + (search/full-index-query + (for [result (map first (dc/qseq '[:find (pull ?a [:ezcater-caterer/search-terms :db/id :ezcater-caterer/name]) + :in $ + :where [?a :ezcater-caterer/search-terms ]] + (dc/db conn)))] + {:id (:db/id result) + :name (:ezcater-caterer/name result) + :text (:ezcater-caterer/search-terms result)}) + "ezcater-caterer")) (def objects {:ezcater_caterer {:fields {:name {:type 'String}