(cloud) incremental rebuild of search indexes
This commit is contained in:
@@ -16,10 +16,15 @@
|
||||
[auto-ap.utils :refer [heartbeat]]
|
||||
[clojure.set :as set]
|
||||
[clojure.string :as str]
|
||||
[manifold.executor :as ex]
|
||||
[manifold.deferred :as de]
|
||||
[clojure.tools.logging :as log]
|
||||
[datomic.client.api :as dc]
|
||||
[yang.scheduler :as scheduler]
|
||||
[mount.core :as mount]))
|
||||
[mount.core :as mount]
|
||||
[clj-time.core :as time]
|
||||
[clj-time.coerce :as coerce]
|
||||
[com.brunobonacci.mulog :as mu]))
|
||||
|
||||
(defn can-user-edit-vendor? [vendor-id id]
|
||||
(if (is-admin? id)
|
||||
@@ -182,22 +187,60 @@
|
||||
(not (is-admin? (:id context))) (assoc :hidden false))
|
||||
"vendor")]
|
||||
{:name name
|
||||
:id id})
|
||||
))
|
||||
:id id})))
|
||||
|
||||
(def single-thread (ex/fixed-thread-executor 1))
|
||||
|
||||
(defn rebuild-search-index []
|
||||
(search/full-index-query
|
||||
(for [result (map first (dc/qseq '[:find (pull ?v [:vendor/search-terms :db/id :vendor/name :vendor/hidden])
|
||||
:in $
|
||||
:where [?v :vendor/search-terms ]]
|
||||
(dc/db conn)))]
|
||||
{:id (:db/id result)
|
||||
:text (or (first (:vendor/search-terms result))
|
||||
(:vendor/name result))
|
||||
:hidden (boolean (:vendor/hidden result))})
|
||||
"vendor"))
|
||||
(de/future-with
|
||||
single-thread
|
||||
(search/full-index-query
|
||||
(for [[result] (dc/qseq '[:find (pull ?v [:vendor/search-terms :db/id :vendor/name :vendor/hidden])
|
||||
:in $
|
||||
:where [?v :vendor/search-terms ]]
|
||||
(dc/db conn))]
|
||||
{:id (:db/id result)
|
||||
:text (or (first (:vendor/search-terms result))
|
||||
(:vendor/name result))
|
||||
:hidden (boolean (:vendor/hidden result))})
|
||||
"vendor")))
|
||||
|
||||
(def last-run-basis (atom nil))
|
||||
|
||||
(defn add-incremental-changes []
|
||||
(de/future-with
|
||||
single-thread
|
||||
(if-let [last-run-basis-value @last-run-basis]
|
||||
(let [db (dc/db conn)
|
||||
recent (dc/since db last-run-basis-value)
|
||||
_ (mu/log ::indexing
|
||||
:last-run last-run-basis-value
|
||||
:starting-from (:basisT db))
|
||||
results (for [[result] (dc/qseq '[:find (pull ?v [:vendor/search-terms :db/id :vendor/name :vendor/hidden])
|
||||
:in $ $$
|
||||
:where [$ ?v :vendor/name ]
|
||||
[$$ ?v]]
|
||||
db
|
||||
recent)]
|
||||
{:id (:db/id result)
|
||||
:text (or (first (:vendor/search-terms result))
|
||||
(:vendor/name result))
|
||||
:hidden (boolean (:vendor/hidden result))})]
|
||||
(when (seq results)
|
||||
(mu/log ::adding-to-index
|
||||
:sample (first results)
|
||||
:count (count results))
|
||||
(search/full-index-query results "vendor" false))
|
||||
(reset! last-run-basis (:basisT db))
|
||||
(count results))
|
||||
(reset! last-run-basis (:basisT (dc/db conn))))))
|
||||
|
||||
#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
|
||||
(mount/defstate indexer
|
||||
:start (scheduler/every (* 5 60 1000) (heartbeat rebuild-search-index "rebuild-search-index"))
|
||||
:stop (scheduler/stop indexer))
|
||||
|
||||
#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
|
||||
(mount/defstate incremental-indexer
|
||||
:start (scheduler/every (* 5 1000) (heartbeat add-incremental-changes "incremental-indexing"))
|
||||
:stop (scheduler/stop incremental-indexer))
|
||||
|
||||
@@ -9,23 +9,27 @@
|
||||
(org.apache.lucene.search BooleanClause$Occur BooleanQuery$Builder IndexSearcher PhraseQuery$Builder Query TermQuery)
|
||||
(org.apache.lucene.store FSDirectory)))
|
||||
|
||||
(defn full-index-query [results index-name]
|
||||
(let [directory (FSDirectory/open (Paths/get (java.net.URI. (str "file:///tmp/search/" (:dd-env env) "/" index-name))))
|
||||
analyzer (StandardAnalyzer.)
|
||||
index-writer-config (IndexWriterConfig. analyzer)
|
||||
index-writer (IndexWriter. directory index-writer-config)]
|
||||
(.deleteAll index-writer)
|
||||
(try
|
||||
(doseq [{:keys [text id] :as x} results
|
||||
:let [doc (doto
|
||||
(Document.)
|
||||
(.add (TextField. "name" text Field$Store/YES))
|
||||
(.add (StoredField. "id" (long id))))]]
|
||||
(doseq [k (filter (complement #{:text :id}) (keys x))]
|
||||
(.add doc (StringField. (name k) (str (get x k)) Field$Store/YES)))
|
||||
(.addDocument index-writer doc))
|
||||
(finally
|
||||
(.close index-writer)))))
|
||||
(defn full-index-query
|
||||
([results index-name]
|
||||
(full-index-query results index-name true))
|
||||
([results index-name delete?]
|
||||
(let [directory (FSDirectory/open (Paths/get (java.net.URI. (str "file:///tmp/search/" (:dd-env env) "/" index-name))))
|
||||
analyzer (StandardAnalyzer.)
|
||||
index-writer-config (IndexWriterConfig. analyzer)
|
||||
index-writer (IndexWriter. directory index-writer-config)]
|
||||
(when delete?
|
||||
(.deleteAll index-writer))
|
||||
(try
|
||||
(doseq [{:keys [text id] :as x} results
|
||||
:let [doc (doto
|
||||
(Document.)
|
||||
(.add (TextField. "name" text Field$Store/YES))
|
||||
(.add (StoredField. "id" (long id))))]]
|
||||
(doseq [k (filter (complement #{:text :id}) (keys x))]
|
||||
(.add doc (StringField. (name k) (str (get x k)) Field$Store/YES)))
|
||||
(.addDocument index-writer doc))
|
||||
(finally
|
||||
(.close index-writer))))))
|
||||
|
||||
(defn make-query [n]
|
||||
(let [
|
||||
|
||||
Reference in New Issue
Block a user