Makes new square loading much faster, parallel.

This commit is contained in:
2023-01-08 08:27:48 -08:00
parent ff48e1fab4
commit 553330297a
4 changed files with 58 additions and 55 deletions

View File

@@ -42,7 +42,9 @@
{:container (:DockerId container-data) {:container (:DockerId container-data)
:ip (-> container-data :Networks first :IPv4Addresses first) :ip (-> container-data :Networks first :IPv4Addresses first)
:env (:dd-env env) :env (:dd-env env)
:service (:dd-service env)}))) :service (or
(System/getenv "INTEGREAT_JOB")
(:dd-service env))})))
(defn stop-logging-context [] (defn stop-logging-context []
(when (seq container-data) (when (seq container-data)

View File

@@ -10,7 +10,8 @@
(defn execute [name f] (defn execute [name f]
(try (try
(lc/with-context {:background-job name} (lc/with-context {:background-job name}
(mu/with-context {:background-job name} (mu/with-context {:background-job name
:service name}
(mount/start (mount/only #{#'conn #'metrics-setup #'container-tags #'logging-context #'container-data})) (mount/start (mount/only #{#'conn #'metrics-setup #'container-tags #'logging-context #'container-data}))
((heartbeat f name)) ((heartbeat f name))
(log/info "Stopping " name) (log/info "Stopping " name)

View File

@@ -2,8 +2,8 @@
(:gen-class) (:gen-class)
(:require (:require
[auto-ap.jobs.core :refer [execute]] [auto-ap.jobs.core :refer [execute]]
[auto-ap.square.core2 :as square2])) [auto-ap.square.core3 :as square3]))
(defn -main [& _] (defn -main [& _]
(execute "square2-loading" square2/upsert-all)) (execute "square3-loading" square3/do-upsert-all))

View File

@@ -637,59 +637,59 @@
(defn upsert-all [ & clients] (defn upsert-all [ & clients]
(capture-context->lc (capture-context->lc
(->> (apply get-square-clients clients) (log/info ::starting-upsert)
(s/->source) (->> (apply get-square-clients clients)
(s/filter (fn [client] (s/->source)
(seq (filter :square-location/client-location (:client/square-locations client))))) (s/filter (fn [client]
(s/map (fn [client] (seq (filter :square-location/client-location (:client/square-locations client)))))
(with-context-as (merge lc {:client (:client/code client)}) lc (s/map (fn [client]
(log/info ::import-started) (with-context-as (merge lc {:client (:client/code client)}) lc
(mark-integration-status client {:integration-status/last-attempt (coerce/to-date (time/now))}) (log/info ::import-started)
(mark-integration-status client {:integration-status/last-attempt (coerce/to-date (time/now))})
(-> (->
(de/chain (upsert-locations client) (de/chain (upsert-locations client)
(fn [_] (fn [_]
(mu/with-context lc (mu/with-context lc
(log/info ::upsert-orders-started) (log/info ::upsert-orders-started)
(upsert client))) (upsert client)))
(fn [_] (fn [_]
(mu/with-context lc (mu/with-context lc
(log/info ::upsert-settlements-started) (log/info ::upsert-settlements-started)
(upsert-settlements client))) (upsert-settlements client)))
(fn [_] (fn [_]
(mu/with-context lc (mu/with-context lc
(log/info ::upsert-refunds-started) (log/info ::upsert-refunds-started)
(upsert-refunds client))) (upsert-refunds client)))
(fn [_] (fn [_]
(mu/with-context lc (mu/with-context lc
(log/info ::upsert-done)) (log/info ::upsert-done))
(mark-integration-status client {:integration-status/state :integration-state/success (mark-integration-status client {:integration-status/state :integration-state/success
:integration-status/last-updated (coerce/to-date (time/now))}))) :integration-status/last-updated (coerce/to-date (time/now))})))
(de/catch (fn [e] (de/catch (fn [e]
(mu/with-context lc (mu/with-context lc
(let [data (ex-data e)] (let [data (ex-data e)]
(log/info ::upsert-all-failed (log/info ::upsert-all-failed
:severity :warn :severity :warn
:exception e) :exception e)
(cond (= (:status data) 401) (cond (= (:status data) 401)
(mark-integration-status client {:integration-status/state :integration-state/unauthorized (mark-integration-status client {:integration-status/state :integration-state/unauthorized
:integration-status/message (-> data :body str)}) :integration-status/message (-> data :body str)})
(= (:status data) 503) (= (:status data) 503)
(mark-integration-status client {:integration-status/state :integration-state/failed (mark-integration-status client {:integration-status/state :integration-state/failed
:integration-status/message (-> data :body str)}) :integration-status/message (-> data :body str)})
:else :else
(mark-integration-status client {:integration-status/state :integration-state/failed (mark-integration-status client {:integration-status/state :integration-state/failed
:integration-status/message (or (ex-message e) :integration-status/message (or (ex-message e)
(str e))})))))))))) (str e))}))))))))))
(s/buffer 5) (s/buffer 5)
(s/realize-each) (s/realize-each)
(s/reduce conj [])))) (s/reduce conj []))))
(defn do-upsert-all [& clients] (defn do-upsert-all [& clients]
(mu/with-context {:background-job "Square 3"} (mu/trace
(mu/trace ::upsert-all
::upsert-all [:clients clients]
[:clients clients] @(apply upsert-all clients)))
@(apply upsert-all clients))))