From 553330297a9809e9e72c646dc13d16838d441a51 Mon Sep 17 00:00:00 2001 From: Bryce Covert Date: Sun, 8 Jan 2023 08:27:48 -0800 Subject: [PATCH] Makes new square loading much faster, parallel. --- src/clj/auto_ap/background/metrics.clj | 4 +- src/clj/auto_ap/jobs/core.clj | 3 +- src/clj/auto_ap/jobs/square2.clj | 4 +- src/clj/auto_ap/square/core3.clj | 102 ++++++++++++------------- 4 files changed, 58 insertions(+), 55 deletions(-) diff --git a/src/clj/auto_ap/background/metrics.clj b/src/clj/auto_ap/background/metrics.clj index 8f74498e..1b3f387c 100644 --- a/src/clj/auto_ap/background/metrics.clj +++ b/src/clj/auto_ap/background/metrics.clj @@ -42,7 +42,9 @@ {:container (:DockerId container-data) :ip (-> container-data :Networks first :IPv4Addresses first) :env (:dd-env env) - :service (:dd-service env)}))) + :service (or + (System/getenv "INTEGREAT_JOB") + (:dd-service env))}))) (defn stop-logging-context [] (when (seq container-data) diff --git a/src/clj/auto_ap/jobs/core.clj b/src/clj/auto_ap/jobs/core.clj index 7807dd08..ff74fc02 100644 --- a/src/clj/auto_ap/jobs/core.clj +++ b/src/clj/auto_ap/jobs/core.clj @@ -10,7 +10,8 @@ (defn execute [name f] (try (lc/with-context {:background-job name} - (mu/with-context {:background-job name} + (mu/with-context {:background-job name + :service name} (mount/start (mount/only #{#'conn #'metrics-setup #'container-tags #'logging-context #'container-data})) ((heartbeat f name)) (log/info "Stopping " name) diff --git a/src/clj/auto_ap/jobs/square2.clj b/src/clj/auto_ap/jobs/square2.clj index a6510360..0552dd08 100644 --- a/src/clj/auto_ap/jobs/square2.clj +++ b/src/clj/auto_ap/jobs/square2.clj @@ -2,8 +2,8 @@ (:gen-class) (:require [auto-ap.jobs.core :refer [execute]] - [auto-ap.square.core2 :as square2])) + [auto-ap.square.core3 :as square3])) (defn -main [& _] - (execute "square2-loading" square2/upsert-all)) + (execute "square3-loading" square3/do-upsert-all)) diff --git a/src/clj/auto_ap/square/core3.clj b/src/clj/auto_ap/square/core3.clj index 30fa4f86..4e14d6d3 100644 --- a/src/clj/auto_ap/square/core3.clj +++ b/src/clj/auto_ap/square/core3.clj @@ -637,59 +637,59 @@ (defn upsert-all [ & clients] (capture-context->lc - (->> (apply get-square-clients clients) - (s/->source) - (s/filter (fn [client] - (seq (filter :square-location/client-location (:client/square-locations client))))) - (s/map (fn [client] - (with-context-as (merge lc {:client (:client/code client)}) lc - (log/info ::import-started) - (mark-integration-status client {:integration-status/last-attempt (coerce/to-date (time/now))}) + (log/info ::starting-upsert) + (->> (apply get-square-clients clients) + (s/->source) + (s/filter (fn [client] + (seq (filter :square-location/client-location (:client/square-locations client))))) + (s/map (fn [client] + (with-context-as (merge lc {:client (:client/code client)}) lc + (log/info ::import-started) + (mark-integration-status client {:integration-status/last-attempt (coerce/to-date (time/now))}) - (-> - (de/chain (upsert-locations client) - (fn [_] - (mu/with-context lc - (log/info ::upsert-orders-started) - (upsert client))) - (fn [_] - (mu/with-context lc - (log/info ::upsert-settlements-started) - (upsert-settlements client))) - (fn [_] - (mu/with-context lc - (log/info ::upsert-refunds-started) - (upsert-refunds client))) - (fn [_] - (mu/with-context lc - (log/info ::upsert-done)) - (mark-integration-status client {:integration-status/state :integration-state/success - :integration-status/last-updated (coerce/to-date (time/now))}))) - (de/catch (fn [e] - (mu/with-context lc - (let [data (ex-data e)] - (log/info ::upsert-all-failed - :severity :warn - :exception e) - (cond (= (:status data) 401) - (mark-integration-status client {:integration-status/state :integration-state/unauthorized - :integration-status/message (-> data :body str)}) + (-> + (de/chain (upsert-locations client) + (fn [_] + (mu/with-context lc + (log/info ::upsert-orders-started) + (upsert client))) + (fn [_] + (mu/with-context lc + (log/info ::upsert-settlements-started) + (upsert-settlements client))) + (fn [_] + (mu/with-context lc + (log/info ::upsert-refunds-started) + (upsert-refunds client))) + (fn [_] + (mu/with-context lc + (log/info ::upsert-done)) + (mark-integration-status client {:integration-status/state :integration-state/success + :integration-status/last-updated (coerce/to-date (time/now))}))) + (de/catch (fn [e] + (mu/with-context lc + (let [data (ex-data e)] + (log/info ::upsert-all-failed + :severity :warn + :exception e) + (cond (= (:status data) 401) + (mark-integration-status client {:integration-status/state :integration-state/unauthorized + :integration-status/message (-> data :body str)}) - (= (:status data) 503) - (mark-integration-status client {:integration-status/state :integration-state/failed - :integration-status/message (-> data :body str)}) - :else - (mark-integration-status client {:integration-status/state :integration-state/failed - :integration-status/message (or (ex-message e) - (str e))})))))))))) - (s/buffer 5) - (s/realize-each) - (s/reduce conj [])))) + (= (:status data) 503) + (mark-integration-status client {:integration-status/state :integration-state/failed + :integration-status/message (-> data :body str)}) + :else + (mark-integration-status client {:integration-status/state :integration-state/failed + :integration-status/message (or (ex-message e) + (str e))})))))))))) + (s/buffer 5) + (s/realize-each) + (s/reduce conj [])))) (defn do-upsert-all [& clients] - (mu/with-context {:background-job "Square 3"} - (mu/trace - ::upsert-all - [:clients clients] - @(apply upsert-all clients)))) + (mu/trace + ::upsert-all + [:clients clients] + @(apply upsert-all clients)))