diff --git a/project.clj b/project.clj index 89c5cf4e..a44a20df 100644 --- a/project.clj +++ b/project.clj @@ -92,6 +92,11 @@ [org.clojure/data.json "1.0.0"] [org.clojure/data.csv "1.0.0"] [io.rkn/conformity "0.5.4"] + [com.brunobonacci/mulog "0.9.0"] + [com.brunobonacci/mulog-adv-console "0.9.0"] + [manifold "0.3.0" + :exclusions + [org.clojure/core.async]] [hiccup "1.0.5"] diff --git a/src/clj/auto_ap/background/metrics.clj b/src/clj/auto_ap/background/metrics.clj index 75a33cf2..950a030d 100644 --- a/src/clj/auto_ap/background/metrics.clj +++ b/src/clj/auto_ap/background/metrics.clj @@ -4,7 +4,8 @@ [clj-http.client :as http] [clojure.tools.logging :as log] [config.core :refer [env]] - [unilog.context :as lc])) + [unilog.context :as lc] + [com.brunobonacci.mulog :as mu])) (defn get-container-data [] (try @@ -34,7 +35,10 @@ (defn set-logging-context [] (when (seq container-data) (lc/push-context "container" (:DockerId container-data)) - (lc/push-context "ip" (-> container-data :Networks first :IPv4Addresses first)))) + (lc/push-context "ip" (-> container-data :Networks first :IPv4Addresses first)) + (mu/set-global-context! + {:container (:DockerId container-data) + :ip (-> container-data :Networks first :IPv4Addresses first)}))) (defn stop-logging-context [] (when (seq container-data) diff --git a/src/clj/auto_ap/jobs/core.clj b/src/clj/auto_ap/jobs/core.clj index a0740c13..7807dd08 100644 --- a/src/clj/auto_ap/jobs/core.clj +++ b/src/clj/auto_ap/jobs/core.clj @@ -4,15 +4,17 @@ [auto-ap.datomic :refer [conn]] [clojure.tools.logging :as log] [auto-ap.background.metrics :refer [metrics-setup container-tags container-data logging-context]] - [unilog.context :as lc])) + [unilog.context :as lc] + [com.brunobonacci.mulog :as mu])) (defn execute [name f] (try (lc/with-context {:background-job name} - (mount/start (mount/only #{#'conn #'metrics-setup #'container-tags #'logging-context #'container-data})) - ((heartbeat f name)) - (log/info "Stopping " name) - (Thread/sleep 15000) - (mount/stop)) + (mu/with-context {:background-job name} + (mount/start (mount/only #{#'conn #'metrics-setup #'container-tags #'logging-context #'container-data})) + ((heartbeat f name)) + (log/info "Stopping " name) + (Thread/sleep 15000) + (mount/stop))) (finally (System/exit 0)))) diff --git a/src/clj/auto_ap/square/core2.clj b/src/clj/auto_ap/square/core2.clj index 9dd04325..80b50e67 100644 --- a/src/clj/auto_ap/square/core2.clj +++ b/src/clj/auto_ap/square/core2.clj @@ -204,7 +204,7 @@ (mapv :item_type (:line_items order )))] (if is-order-only-for-charge? (->> (:tenders order) - (map #(tender->charge order client location %))) + (mapv #(tender->charge order client location %))) [(remove-nils #:sales-order {:date (coerce/to-date (time/to-time-zone (coerce/to-date-time (:created_at order)) (time/time-zone-for-id "America/Los_Angeles"))) @@ -239,7 +239,8 @@ (item-id->category-name client (:catalog_object_id li))) :total (amount->money (:total_money li)) :tax (amount->money (:total_tax_money li)) - :discount (amount->money (:total_discount_money li))}))))})]))) + :discount (amount->money (:total_discount_money li))}))) + (into []))})]))) (defn daily-results ([client location] @@ -519,3 +520,13 @@ (some-> (:object &throw-context) str) "Unknown error")})))))) +(defn preview-changes + ([client] + (first (for [client (get-square-clients client) + :when (seq (filter :square-location/client-location (:client/square-locations client))) + square-location (:client/square-locations client) + :when (:square-location/client-location square-location)] + (preview-changes client square-location (time/plus (time/now) (time/days -30)) (time/now))))) + ([client location start end] + (lc/with-context {:source "Square loading"} + (daily-results client location start end)))) diff --git a/src/clj/auto_ap/square/core3.clj b/src/clj/auto_ap/square/core3.clj new file mode 100644 index 00000000..559fbe20 --- /dev/null +++ b/src/clj/auto_ap/square/core3.clj @@ -0,0 +1,707 @@ +(ns auto-ap.square.core3 + (:require + [auto-ap.datomic :refer [conn remove-nils]] + [auto-ap.time :as atime] + [auto-ap.async-error :as aerror] + [clj-http.client :as client] + [clj-time.coerce :as coerce] + [clj-time.core :as time] + [clj-time.format :as f] + [clj-time.periodic :as periodic] + [clojure.core.async :as async] + [clojure.data.json :as json] + [clojure.set :as set] + [clojure.string :as str] + [clojure.tools.logging :as log] + [cemerick.url :as url] + [datomic.api :as d] + [slingshot.slingshot :refer [try+]] + [unilog.context :as lc] + [manifold.executor :as ex] + [manifold.deferred :as de] + [manifold.time :as mt] + [manifold.stream :as s] + [com.brunobonacci.mulog :as mu])) + +(defn client-base-headers [client] + {"Square-Version" "2021-08-18" + "Authorization" (str "Bearer " (:client/square-auth-token client)) + "Content-Type" "application/json"}) + + +(defn retry-4 [ex try-count _] + (mu/log ::aborting-request + :attempt try-count + :exception ex) + (if (> try-count 4) false true)) + +(defmacro with-context-as [ctx s & body] + `(mu/with-context ~ctx + (let [~s (mu/local-context)] + ~@body))) + +(defmacro capture-context->lc [& body] + `(let [~'lc (mu/local-context)] + ~@body)) + +(def manifold-api-stream + (let [stream (s/stream 100)] + (->> stream + (s/throttle 80) + (s/map (fn [[request attempt response-deferred]] + (de/catch + (de/chain + (de/loop [attempt 0] + (-> (de/chain (de/future-with (ex/execute-pool) + (mu/log ::request-started + :url (:url request) + :attempt attempt + :source "Square 3" + :background-job "Square 3") + (client/request (assoc request + :as :json + :retry-handler retry-4)))) + (de/catch + (fn [e] + (if (= attempt 5) + (throw e) + (de/chain + (mt/in 1000 (fn [] 1)) + (fn [x] (de/recur (inc attempt))))))) + (de/chain identity))) + (fn [result] + (de/success! response-deferred result))) + (fn [error] + (de/error! response-deferred error))))) + + (s/buffer 50) + (s/realize-each) + (s/consume (fn [result] + (mu/log ::request-completed + :source "Square 3" + :background-job "Square 3")))) + stream)) + +(defn manifold-api-call + ([request] (manifold-api-call request 0)) + ([request attempt] + (manifold-api-call request 0 (de/deferred))) + ([request attempt response-deferred] + (de/chain (s/put! manifold-api-stream [request attempt response-deferred]) + (fn [r] + (when r + response-deferred))))) + +(defn lookup-dates [] + (->> (periodic/periodic-seq (time/plus (time/now) (time/days -15)) + (time/now) + (time/days 5)) + (map (fn [d] + [(atime/unparse (time/plus d (time/days 1)) atime/iso-date) + + (atime/unparse (time/plus d (time/days 5)) atime/iso-date)])))) + +(defn client-locations [client] + (capture-context->lc + (de/catch + (de/chain (manifold-api-call + {:url "https://connect.squareup.com/v2/locations" + :method :get + :headers (client-base-headers client)}) + :body + :locations) + (fn [error] + (mu/with-context lc + (mu/log ::no-locations-found + :exception error)) + [])))) + + +(def item-cache (atom {})) + +(defn fetch-catalog [client i] + (capture-context->lc + (de/chain + [client i] + (fn [[client i]] + (mu/with-context lc + (manifold-api-call {:url (str "https://connect.squareup.com/v2/catalog/object/" i) + :method :get + :headers (client-base-headers client) + :query-params {"include_related_items" "true"} + :as :json}))) + :body + :object + #(do (swap! item-cache assoc i %) + %)))) + + +(defn fetch-catalog-cache [client i] + (if (get @item-cache i) + (de/success-deferred (get @item-cache i)) + (fetch-catalog client i))) + + +(defn item->category-name-impl [client item ] + (capture-context->lc + (cond (:item_id (:item_variation_data item)) + (de/chain (fetch-catalog-cache client (:item_id (:item_variation_data item))) + (fn [item] + (mu/with-context lc + (item->category-name-impl client item)))) + + (:category_id (:item_data item)) + (de/chain (fetch-catalog-cache client (:category_id (:item_data item))) + :category_data + :name) + + (:item_data item) + "Uncategorized" + + :else + (do + (mu/log ::no-look-up-item + :item item + ) + "Uncategorized")))) + + +(defn item-id->category-name [client i] + (capture-context->lc + (-> [client i] + (de/chain + (fn [[client i]] + (if (str/blank? i) + "Uncategorized" + (de/chain (fetch-catalog-cache client i) + #(mu/with-context lc + (item->category-name-impl client %))))))))) + +(defn pc [start end] + {"query" {"filter" {"date_time_filter" + { + "created_at" { + "start_at" (f/unparse (f/formatter "YYYY-MM-dd'T'HH:mm:ssZZ") start) + "end_at" (f/unparse (f/formatter "YYYY-MM-dd'T'HH:mm:ssZZ") end) + }}} + + "sort" { + "sort_field" "CREATED_AT" + "sort_order" "DESC"}}}) + +#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]} +(defn get-order + ([client location order-id] + (mu/log ::searching-for-order + :location location + :order-id order-id) + (let [result (->> (client/get (str "https://connect.squareup.com/v2/orders/" order-id) + {:headers (client-base-headers client) + :as :json}) + :body + )] + result))) + +(defn continue-search [client location start end cursor] + (mu/log ::continue-order-search + :cursor cursor) + + (capture-context->lc + (de/chain (manifold-api-call + {:url "https://connect.squareup.com/v2/orders/search" + :method :post + :headers (client-base-headers client) + :body (json/write-str (cond-> {"location_ids" [(:square-location/square-id location)] + "limit" 10000 + "cursor" cursor} + start (merge (pc start end)))) + :as :json}) + :body + (fn [result] + (mu/with-context + lc + (mu/log ::orders-found + :count (count (:orders result))) + (if (not-empty (:cursor result)) + (de/chain (continue-search client location start end (:cursor result)) + (fn [continued-results] + (mu/with-context + lc + (concat (:orders result) continued-results)))) + (:orders result))))))) + + +(defn search + ([client location start end] + (capture-context->lc + (mu/log ::searching + :location (:square-location/client-location location)) + (de/chain (manifold-api-call {:url "https://connect.squareup.com/v2/orders/search" + :method :post + :headers (client-base-headers client) + :body (json/write-str (cond-> {"location_ids" [(:square-location/square-id location)] "limit" 10000} + start (merge (pc start end)))) + :as :json}) + :body + (fn [result] + (mu/with-context lc + (mu/log ::orders-found + :count (count (:orders result))) + (if (not-empty (:cursor result)) + (de/chain (continue-search client location start end (:cursor result)) + (fn [continued-results] + (mu/with-context lc + (concat (:orders result) continued-results)))) + (:orders result)))))))) + + +(defn amount->money [amt] + (* 0.01 (or (:amount amt) 0.0))) + + +;; to get totals: +(comment + (reduce + (fn [total i] + (+ total (+ (- (:sales-order/total i) + (:sales-order/tax i) + (:sales-order/tip i) + (:sales-order/service-charge i)) + (:sales-order/returns i) + + (:sales-order/discount i) + ))) + 0.0 + [])) + +(defn tender->charge [order client location t] + (remove-nils + #:charge + {:type-name (:type t) + :date (coerce/to-date (time/to-time-zone (coerce/to-date-time (:created_at order)) (time/time-zone-for-id "America/Los_Angeles"))) + :client (:db/id client) + :note (:note t) + :location (:square-location/client-location location) + :reference-link (str (url/url "https://squareup.com/receipt/preview" (:id t) )) + :external-id (when (:id t) + (str "square/charge/" (:id t))) + :processor (condp = (:type t) + "OTHER" + (condp = (some-> (:note t) str/lower-case) + "doordash" :ccp-processor/doordash + "dd" :ccp-processor/doordash + "ubereats" :ccp-processor/uber-eats + "ue" :ccp-processor/uber-eats + "grubhub" :ccp-processor/grubhub + "grub" :ccp-processor/grubhub + "gh" :ccp-processor/grubhub + (condp = (:name (:source order)) + "GRUBHUB" :ccp-processor/grubhub + "UBEREATS" :ccp-processor/uber-eats + "DOORDASH" :ccp-processor/doordash + "Koala" :ccp-processor/koala + "koala-production" :ccp-processor/koala + :ccp-processor/na)) + "CARD" + :ccp-processor/square + + "SQUARE_GIFT_CARD" + :ccp-processor/square + + "CASH" + :ccp-processor/na + + :ccp-processor/na) + :total (amount->money (:amount_money t)) + :tip (amount->money (:tip_money t))})) + +(defn order->sales-order [client location order] + (capture-context->lc + (let [is-order-only-for-charge? (= ["CUSTOM_AMOUNT"] + (mapv :item_type (:line_items order )))] + (if is-order-only-for-charge? + (de/success-deferred + (->> (:tenders order) + (map #(tender->charge order client location %)))) + (de/let-flow [line-items + (->> + (or (:line_items order) []) + (s/->source) + (s/transform + (map-indexed (fn [i li] + (mu/with-context lc + (de/let-flow [category (item-id->category-name client (:catalog_object_id li))] + (remove-nils + #:order-line-item + {:external-id (str "square/order/" (:client/code client) "-" (:square-location/client-location location) "-" (:id order) "-" i) + :item-name (:name li) + :category (if (= "GIFT_CARD" (:item_type li)) + "Gift Card" + category) + :total (amount->money (:total_money li)) + :tax (amount->money (:total_tax_money li)) + :discount (amount->money (:total_discount_money li))}))))) + ) + (s/buffer 5) + (s/realize-each) + (s/reduce conj []))] + [(remove-nils + #:sales-order + {:date (coerce/to-date (time/to-time-zone (coerce/to-date-time (:created_at order)) (time/time-zone-for-id "America/Los_Angeles"))) + :client (:db/id client) + :location (:square-location/client-location location) + :external-id (str "square/order/" (:client/code client) "-" (:square-location/client-location location) "-" (:id order)) + :source (or (:name (:source order)) + "Square") + :vendor :vendor/ccp-square + + :reference-link (str (url/url "https://squareup.com/dashboard/sales/transactions" (:id order) "by-unit" (:square-location/square-id location))) + :total (-> order :net_amounts :total_money amount->money) + :tax (-> order :net_amounts :tax_money amount->money) + :tip (-> order :net_amounts :tip_money amount->money) + :discount (-> order :net_amounts :discount_money amount->money) + :service-charge (-> order :net_amounts :service_charge_money amount->money) + :returns (+ (- (-> order :return_amounts :total_money amount->money) + (-> order :return_amounts :tax_money amount->money) + (-> order :return_amounts :tip_money amount->money) + (-> order :return_amounts :service_charge_money amount->money)) + (-> order :return_amounts :discount_money amount->money)) + :charges (->> (:tenders order) + (map #(tender->charge order client location %))) + :line-items line-items})]))))) + +(defn daily-results + ([client location] + (daily-results client location (time/plus (time/now) (time/days -7)) (time/now))) + ([client location start end] + (capture-context->lc + (de/chain (search client location start end) + (fn [search-results] + (->> search-results + (s/->source) + (s/filter (fn [order] + ;; sometimes orders stay open in square. At least one payment + ;; is needed to import, in order to avoid importing orders in-progress. + (and + (or (> (count (:tenders order)) 0) + (seq (:returns order))) + (or (= #{} (set (map #(:status (:card_details %)) (:tenders order)))) + (not= #{} (set/difference + (set (map #(:status (:card_details %)) (:tenders order))) + #{"FAILED" "VOIDED"})))))) + (s/map #(mu/with-context lc (order->sales-order client location %))) + (s/buffer 10) + (s/realize-each) + (s/reduce into []))))))) + + +(defn get-payment [client p] + (de/chain (manifold-api-call + {:url (str "https://connect.squareup.com/v2/payments/" p) + :method :get + :headers (client-base-headers client)}) + :body + :payment)) + +(defn settlements + ([client location] (settlements client location (lookup-dates))) + ([client location lookup-dates] + (with-context-as {:location (:square-location/client-location location)} lc + (de/chain + + (->> lookup-dates + (s/->source) + (s/map (fn [[start-date end-date]] + (mu/with-context lc + (mu/log ::searching-settlements + :start-date start-date + :end-date end-date) + (de/chain (manifold-api-call + {:url (str "https://connect.squareup.com/v1/" (:square-location/square-id location) "/settlements") + :method :get + :headers (client-base-headers client) + :query-params {"begin_time" start-date + "end_time" end-date}}) + :body + (fn [settlements] + (map :id settlements)))))) + (s/buffer 3) + (s/realize-each) + (s/map (fn [settlement-set] + settlement-set)) + (s/reduce into #{})) + (fn [settlement-id-set] + (->> settlement-id-set + (s/->source) + (s/map (fn [settlement-id] + (mu/with-context lc + (mu/log ::looking-up-settlement + :settlement-id settlement-id) + (de/chain + (manifold-api-call + {:url (str "https://connect.squareup.com/v1/" (:square-location/square-id location) "/settlements/" settlement-id) + :method :get + :headers (client-base-headers client) + :as :json + :retry-handler retry-4}) + :body)))) + (s/buffer 10) + (s/realize-each) + (s/reduce conj []))))))) + +(defn daily-settlements + ([client location] + (de/chain (settlements client location) + (fn [settlements] + (->> (for [settlement settlements + :let [best-sales-date (->> (d/q '[:find ?s4 (count ?s) + :in $ ?settlement-id + :where + [?settlement :expected-deposit/external-id ?settlement-id] + [?settlement :expected-deposit/charges ?c] + [?s :sales-order/charges ?c] + [?s :sales-order/date ?sales-date] + [(clj-time.coerce/to-date-time ?sales-date) ?s2] + [(auto-ap.time/localize ?s2) ?s3] + [(clj-time.coerce/to-local-date ?s3) ?s4]] + (d/db conn) + (str "square/settlement/" (:id settlement))) + (sort-by last) + last + first + coerce/to-date)]] + #:expected-deposit {:external-id (str "square/settlement/" (:id settlement)) + :vendor :vendor/ccp-square + :status :expected-deposit-status/pending + :total (amount->money (:total_money settlement)) + :client (:db/id client) + :location (:square-location/client-location location) + :fee (- (reduce + 0.0 (map (fn [entry] + (if (= (:type entry) "REFUND") + (- (amount->money (:fee_money entry))) + (amount->money (:fee_money entry)))) + (:entries settlement)))) + :date (-> (:initiated_at settlement) + (coerce/to-date)) + :sales-date (or best-sales-date + (-> (:initiated_at settlement) + (coerce/to-date))) + :charges (->> (:entries settlement) + (filter :payment_id) + (map (fn [p] {:charge/external-id (str "square/charge/" (:payment_id p))})))}) + (filter :expected-deposit/date)))))) + +(defn refunds + ([client l] + (de/chain (manifold-api-call {:url (str "https://connect.squareup.com/v2/refunds?location_id=" (:square-location/square-id l)) + :method :get + + :headers (client-base-headers client) + :as :json + :retry-handler retry-4}) + :body + :refunds + (fn [refunds] + (->> refunds + (filter (fn [r] (= "COMPLETED" (:status r)))) + (s/->source ) + (s/map (fn [r] + (de/chain + (get-payment client (:payment_id r)) + (fn [payment] + #:sales-refund {:external-id (str "square/refund/" (:id r)) + :vendor :vendor/ccp-square + :total (amount->money (:amount_money r)) + :fee (transduce + (comp (filter #(= "ADJUSTMENT" (:type %))) + (map :amount_money) + (map amount->money)) + + + 0.0 + (:processing_fee r)) + :client (:db/id client) + :location (:square-location/client-location l) + :date (coerce/to-date (:created_at r)) + :type (:source_type payment)})))) + (s/buffer 5) + (s/realize-each) + (s/reduce conj [])))))) +(defn upsert + ([client ] + (apply de/zip + (for [square-location (:client/square-locations client) + :when (:square-location/client-location square-location)] + (upsert client square-location (time/plus (time/now) (time/days -7)) (time/now))))) + ([client location start end] + (capture-context->lc + (de/chain (daily-results client location start end) + (fn [results ] + (mu/with-context lc + (doseq [x (partition-all 100 results)] + (mu/log ::loading-orders + :count (count x)) + @(d/transact conn x)))))))) + + +(defn upsert-settlements + ([client] + (apply de/zip + (for [square-location (:client/square-locations client) + :when (:square-location/client-location square-location)] + (upsert-settlements client square-location)))) + ([client location] + (with-context-as {:source "Square settlements loading" + :client (:client/code client)} lc + + (de/chain (daily-settlements client location) + (fn [settlements] + (mu/with-context lc + (doseq [x (partition-all 20 settlements)] + (mu/log ::loading-deposits + :count (count x)) + @(d/transact conn x)) + (mu/log ::done-loading-deposits))))))) + +(defn upsert-refunds + ([client] + (apply de/zip + (for [square-location (:client/square-locations client) + :when (:square-location/client-location square-location)] + (upsert-refunds client square-location)))) + ([client location] + (de/chain (refunds client location) + (fn [refunds] + (mu/with-context {:source "Loading Square Settlements" + :client (:client/code client) + :location (:square-location/client-location client)} + (doseq [x (partition-all 100 refunds)] + (mu/log ::loading-refunds + :count (count x)) + @(d/transact conn x)) + (mu/log ::done-loading-refunds)))))) + +(def square-read [:db/id + :client/code + :client/square-auth-token + {:client/square-locations [:db/id :square-location/name :square-location/square-id :square-location/client-location]}]) + +(defn get-square-clients + ([] + (d/q '[:find [(pull ?c [:db/id + :client/square-integration-status + :client/code + :client/square-auth-token + {:client/square-locations [:db/id :square-location/name :square-location/square-id :square-location/client-location]}]) ...] + :in $ + :where [?c :client/square-auth-token] + [?c :client/feature-flags "new-square"]] + (d/db conn))) + ([ & codes] + (d/q '[:find [(pull ?c [:db/id + :client/code + :client/square-auth-token + {:client/square-locations [:db/id :square-location/name :square-location/square-id :square-location/client-location]}]) ...] + :in $ [?code ...] + :where [?c :client/square-auth-token] + [?c :client/feature-flags "new-square"] + [?c :client/code ?code]] + (d/db conn) + codes))) + +(defn upsert-locations + ([] + (apply de/zip + (for [client (get-square-clients)] + (upsert-locations client)))) + ([client] + (let [square-id->id (into {} + (map + (fn [sl] + [(:square-location/square-id sl) + (:db/id sl)]) + (:client/square-locations client)))] + (de/chain (client-locations client) + (fn [client-locations] + @(d/transact conn + (for [square-location client-locations] + {:db/id (or (square-id->id (:id square-location)) (d/tempid :db.part/user)) + :client/_square-locations (:db/id client) + :square-location/name (:name square-location) + :square-location/square-id (:id square-location)}))))))) + +#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]} +(defn reset [] + (->> + (d/query {:query {:find ['?e] + :in ['$] + :where ['(or [?e :sales-order/date] + [?e :expected-deposit/date])]} + :args [(d/db conn)]}) + (map first) + (map (fn [x] [:db/retractEntity x])))) + +(defn mark-integration-status [client integration-status] + @(d/transact conn + [{:db/id (:db/id client) + :client/square-integration-status (assoc integration-status + :db/id (or (-> client :client/square-integration-status :db/id) + #db/id [:db.part/user]))}])) + +(defn upsert-all [ & clients] + (capture-context->lc + (->> (apply get-square-clients clients) + (s/->source) + (s/filter (fn [client] + (seq (filter :square-location/client-location (:client/square-locations client))))) + (s/map (fn [client] + (with-context-as (merge lc {:client (:client/code client)}) lc + (mu/log ::import-started) + (mark-integration-status client {:integration-status/last-attempt (coerce/to-date (time/now))}) + + (-> + (de/chain (upsert-locations client) + (fn [_] + (mu/with-context lc + (mu/log ::upsert-orders-started) + (upsert client))) + (fn [_] + (mu/with-context lc + (mu/log ::upsert-settlements-started) + (upsert-settlements client))) + (fn [_] + (mu/with-context lc + (mu/log ::upsert-refunds-started) + (upsert-refunds client))) + (fn [_] + (mu/with-context lc + (mu/log ::upsert-done)) + (mark-integration-status client {:integration-status/state :integration-state/success + :integration-status/last-updated (coerce/to-date (time/now))}))) + (de/catch (fn [e] + (mu/with-context lc + (let [data (ex-data e)] + (mu/log ::upsert-all-failed + :severity :warn + :exception e) + (cond (= (:status data) 401) + (mark-integration-status client {:integration-status/state :integration-state/unauthorized + :integration-status/message (-> data :body str)}) + + (= (:status data) 503) + (mark-integration-status client {:integration-status/state :integration-state/failed + :integration-status/message (-> data :body str)}) + :else + (mark-integration-status client {:integration-status/state :integration-state/failed + :integration-status/message (or (ex-message e) + (str e))})))))))))) + (s/buffer 5) + (s/realize-each) + (s/reduce conj [])))) + +(defn do-upsert-all [& clients] + (mu/with-context {:background-job "Square 3"} + (mu/trace + ::upsert-all + [:clients clients] + @(apply upsert-all clients)))) + diff --git a/src/cljc/auto_ap/async_error.cljc b/src/cljc/auto_ap/async_error.cljc new file mode 100644 index 00000000..f749415c --- /dev/null +++ b/src/cljc/auto_ap/async_error.cljc @@ -0,0 +1,54 @@ +(ns auto-ap.async-error + #?(:clj + (:require [clojure.core.async]))) + +;; ---- Helpers Taken from Prismatic Schema ----------------------------------- + +#?(:clj + (defn cljs-env? + "Take the &env from a macro, and tell whether we are expanding into cljs." + [env] + (boolean (:ns env)))) + +#?(:clj + (defmacro if-cljs + "Return then if we are generating cljs code and else for Clojure code. + https://groups.google.com/d/msg/clojurescript/iBY5HaQda4A/w1lAQi9_AwsJ" + [then else] + (if (cljs-env? &env) then else))) + +;; ---- Helpers --------------------------------------------------------------- + +(defn throw-err [e] + (when (instance? #?(:clj Throwable :cljs js/Error) e) (throw e)) + e) + +;; ---- Public API ------------------------------------------------------------ + +#?(:clj + (defmacro