;; Square integration ("Square 3"): pulls locations, orders, settlements,
;; refunds and cash-drawer shifts from the Square HTTP API and transacts the
;; transformed entities into Datomic. All API calls made through
;; `manifold-api-call` are funnelled through one shared, throttled stream.
(ns auto-ap.square.core3
  (:require
   [auto-ap.datomic :refer [conn remove-nils]]
   [auto-ap.logging :as log :refer [capture-context->lc with-context-as]]
   [auto-ap.time :as atime]
   [cemerick.url :as url]
   [clj-http.client :as client]
   [clj-time.coerce :as coerce]
   [clj-time.core :as time]
   [clj-time.format :as f]
   [clj-time.periodic :as periodic]
   [clojure.data.json :as json]
   [clojure.set :as set]
   [clojure.string :as str]
   [com.brunobonacci.mulog :as mu]
   [datomic.api :as dc]
   [manifold.deferred :as de]
   [manifold.executor :as ex]
   [manifold.stream :as s]
   [manifold.time :as mt]))

(defn client-base-headers
  "Returns the HTTP header map for a Square request made on behalf of
  `client`: the Square-Version header (`v`, default 2021-08-18), a Bearer
  Authorization header built from `:client/square-auth-token`, and a JSON
  Content-Type."
  ([client]
   (client-base-headers client "2021-08-18"))
  ([client v]
   {"Square-Version" v
    "Authorization" (str "Bearer " (:client/square-auth-token client))
    "Content-Type" "application/json"}))

(defn ->square-date
  "Formats the date-time `d` with the pattern YYYY-MM-dd'T'HH:mm:ssZZ."
  [d]
  (f/unparse (f/formatter "YYYY-MM-dd'T'HH:mm:ssZZ") d))

(def manifold-api-stream
  "Shared queue of `[request attempt response-deferred]` tuples. Each tuple is
  executed with clj-http on the manifold execute pool (5s socket/connection
  timeouts, JSON-coerced response) and the outcome is delivered to
  `response-deferred`. A failed request is retried after a 1000ms pause until
  the attempt counter reaches 5, at which point the error is delivered.
  The stream is throttled with `(s/throttle 80)` and buffered."
  (let [stream (s/stream 100)]
    (->> stream
         (s/throttle 80)
         (s/map
          (fn [[request attempt response-deferred]]
            (de/catch
             (de/chain
              ;; Seed the retry counter with the caller-supplied attempt
              ;; (previously it was shadowed by a hard-coded 0 and ignored).
              (de/loop [attempt (or attempt 0)]
                (-> (de/chain
                     (de/future-with (ex/execute-pool)
                       (log/info ::request-started
                                 :url (:url request)
                                 :attempt attempt
                                 :source "Square 3"
                                 :background-job "Square 3")
                       (try
                         (client/request (assoc request
                                                :socket-timeout 5000
                                                :connection-timeout 5000
                                                #_#_:connection-request-timeout 5000
                                                :as :json))
                         (catch Throwable e
                           (log/warn ::raw-request-failed :exception e)
                           (throw e)))))
                    (de/catch
                     (fn [e]
                       (if (>= attempt 5)
                         (throw e)
                         ;; wait one second, then retry with attempt + 1
                         (de/chain
                          (mt/in 1000 (fn [] 1))
                          (fn [_] (de/recur (inc attempt)))))))
                    (de/chain identity)))
              (fn [result]
                (de/success! response-deferred result)))
             (fn [error]
               (de/error! response-deferred error)))))
         (s/buffer 50)
         (s/realize-each)
         (s/consume
          (fn [_]
            (log/info ::request-completed
                      :source "Square 3"
                      :background-job "Square 3"))))
    stream))

(defn manifold-api-call
  "Enqueues the clj-http `request` map on `manifold-api-stream`.

  Returns a deferred that yields `response-deferred` (and therefore,
  when chained, the HTTP response or error) once the put is accepted, or nil
  when the put is rejected. `attempt` seeds the retry counter (default 0)."
  ([request]
   (manifold-api-call request 0))
  ([request attempt]
   (manifold-api-call request attempt (de/deferred)))
  ([request attempt response-deferred]
   (de/chain (s/put! manifold-api-stream [request attempt response-deferred])
             (fn [r]
               (when r
                 response-deferred)))))

(defn lookup-dates
  "Returns a seq of `[start end]` ISO-date string pairs. Windows begin every
  5 days from 15 days ago up to now; each pair is (window start + 1 day,
  window start + 5 days)."
  []
  (->> (periodic/periodic-seq (time/plus (time/now) (time/days -15))
                              (time/now)
                              (time/days 5))
       (map (fn [d]
              [(atime/unparse (time/plus d (time/days 1)) atime/iso-date)
               (atime/unparse (time/plus d (time/days 5)) atime/iso-date)]))))

(defn client-locations
  "Deferred of the `:locations` returned by Square's /v2/locations endpoint
  for `client`. On error the failure is logged and `[]` is returned."
  [client]
  (capture-context->lc
   (de/catch
    (de/chain (manifold-api-call {:url "https://connect.squareup.com/v2/locations"
                                  :method :get
                                  :headers (client-base-headers client)})
              :body
              :locations)
    (fn [error]
      (mu/with-context lc
        (log/error ::no-locations-found :exception error))
      []))))

;; Catalog objects fetched so far, keyed by catalog object id only
;; (the catalog version is not part of the key).
(def item-cache (atom {}))

(defn fetch-catalog
  "Fetches catalog object `i` at catalog version `v` (with related items),
  stores the returned `:object` in `item-cache` under `i`, and returns a
  deferred of that object."
  [client i v]
  (capture-context->lc
   (de/chain [client i]
             (fn [[client i]]
               (mu/with-context lc
                 (manifold-api-call {:url (str "https://connect.squareup.com/v2/catalog/object/" i)
                                     :method :get
                                     :headers (client-base-headers client)
                                     :query-params {"include_related_items" "true"
                                                    "catalog_version" v}
                                     :as :json})))
             :body
             :object
             #(do (swap! item-cache assoc i %)
                  %))))

(defn fetch-catalog-cache
  "Returns a deferred of catalog object `i`, served from `item-cache` when a
  truthy entry exists and fetched via `fetch-catalog` otherwise."
  [client i version]
  (if-let [cached (get @item-cache i)]
    (de/success-deferred cached)
    (fetch-catalog client i version)))

(defn item->category-name-impl
  "Resolves the category name for a catalog `item`.

  - item variation: fetch the parent item and recurse;
  - item with a `:category_id`: fetch the category and return its name;
  - item without a category, or anything unrecognised: \"Uncategorized\".

  Fetch failures are logged and also resolve to \"Uncategorized\". The result
  is either a plain string or a deferred of one."
  [client item version]
  (capture-context->lc
   (cond
     (:item_id (:item_variation_data item))
     (de/catch
      (de/chain (fetch-catalog-cache client (:item_id (:item_variation_data item)) version)
                (fn [item]
                  (mu/with-context lc
                    (item->category-name-impl client item version))))
      (fn [e]
        (log/warn ::couldnt-fetch-variation :exception e)
        "Uncategorized"))

     (:category_id (:item_data item))
     (de/catch
      (de/chain (fetch-catalog-cache client (:category_id (:item_data item)) version)
                :category_data
                :name)
      (fn [e]
        (log/warn ::couldnt-fetch-category :exception e)
        "Uncategorized"))

     (:item_data item)
     "Uncategorized"

     :else
     (do (log/warn ::no-look-up-item :item item)
         "Uncategorized"))))

(defn item-id->category-name
  "Deferred of the category name for catalog object id `i`. A blank id yields
  \"Uncategorized\". If the lookup fails the error is logged and rethrown, so
  callers must handle it."
  [client i version]
  (capture-context->lc
   (de/chain [client i]
             (fn [[client i]]
               (if (str/blank? i)
                 "Uncategorized"
                 (de/catch
                  (de/chain (fetch-catalog-cache client i version)
                            #(mu/with-context lc
                               (item->category-name-impl client % version)))
                  (fn [error]
                    (log/warn ::couldnt-fetch-item :exception error)
                    (throw error))))))))

(defn pc
  "Builds the `query` portion of an orders-search body: a created_at
  date-time filter from `start` to `end`, sorted by CREATED_AT descending."
  [start end]
  {"query" {"filter" {"date_time_filter" {"created_at" {"start_at" (->square-date start)
                                                         "end_at" (->square-date end)}}}
            "sort" {"sort_field" "CREATED_AT"
                    "sort_order" "DESC"}}})

#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(defn get-order
  "Synchronously fetches a single order by id and returns the response body.
  `location` is only used for logging. Bypasses `manifold-api-stream`."
  ([client location order-id]
   (log/info ::searching-for-order :location location :order-id order-id)
   (:body (client/get (str "https://connect.squareup.com/v2/orders/" order-id)
                      {:headers (client-base-headers client)
                       :as :json}))))

(declare continue-search)

(defn- run-order-search
  "Posts one orders-search request for `location` (date filter added when
  `start` is truthy, `extra-body` merged into the JSON body) and follows the
  response cursor through `continue-search`. Deferred of the orders."
  [client location start end extra-body]
  (capture-context->lc
   (de/chain
    (manifold-api-call
     {:url "https://connect.squareup.com/v2/orders/search"
      :method :post
      :headers (client-base-headers client)
      :body (json/write-str
             (cond-> (merge {"location_ids" [(:square-location/square-id location)]
                             "limit" 10000}
                            extra-body)
               start (merge (pc start end))))
      :as :json})
    :body
    (fn [result]
      (mu/with-context lc
        (log/info ::orders-found :count (count (:orders result)))
        (if (not-empty (:cursor result))
          (de/chain (continue-search client location start end (:cursor result))
                    (fn [continued-results]
                      (mu/with-context lc
                        (concat (:orders result) continued-results))))
          (:orders result)))))))

(defn continue-search
  "Fetches the page of orders identified by `cursor`, plus every page after
  it. Deferred of the concatenated orders."
  [client location start end cursor]
  (log/info ::continue-order-search :cursor cursor)
  (run-order-search client location start end {"cursor" cursor}))

(defn search
  "Searches orders for `location`, restricted to `start`..`end` when `start`
  is truthy, following pagination cursors. Deferred of all orders found."
  ([client location start end]
   (capture-context->lc
    (log/info ::searching :location (:square-location/client-location location))
    (run-order-search client location start end {}))))

(defn amount->money
  "Multiplies the `:amount` of a money map by 0.01; a missing amount is
  treated as 0.0."
  [amt]
  (* 0.01 (or (:amount amt) 0.0)))

;; to get totals:
(comment
  (reduce
   (fn [total i]
     (+ total
        (+ (- (:sales-order/total i)
              (:sales-order/tax i)
              (:sales-order/tip i)
              (:sales-order/service-charge i))
           (:sales-order/returns i)
           (:sales-order/discount i))))
   0.0
   []))

(defn- order-created-date
  "java.util.Date for the order's `:created_at`, routed through the
  America/Los_Angeles time zone exactly as the original inline code did."
  [order]
  (coerce/to-date
   (time/to-time-zone (coerce/to-date-time (:created_at order))
                      (time/time-zone-for-id "America/Los_Angeles"))))

;; Lower-cased tender note -> processor, for tenders of type OTHER.
(def ^:private note->processor
  {"doordash" :ccp-processor/doordash
   "dd" :ccp-processor/doordash
   "ubereats" :ccp-processor/uber-eats
   "ue" :ccp-processor/uber-eats
   "grubhub" :ccp-processor/grubhub
   "grub" :ccp-processor/grubhub
   "gh" :ccp-processor/grubhub})

;; Order source name -> processor, used when the tender note does not match.
(def ^:private source-name->processor
  {"GRUBHUB" :ccp-processor/grubhub
   "UBEREATS" :ccp-processor/uber-eats
   "DOORDASH" :ccp-processor/doordash
   "Koala" :ccp-processor/koala
   "koala-production" :ccp-processor/koala})

(defn- tender->processor
  "Picks the :ccp-processor/* keyword for tender `t` of `order`: OTHER tenders
  are classified by note, then by order source name; CARD and
  SQUARE_GIFT_CARD map to square; everything else (including CASH) to na."
  [order t]
  (case (:type t)
    "OTHER" (or (note->processor (some-> (:note t) str/lower-case))
                (source-name->processor (:name (:source order)))
                :ccp-processor/na)
    ("CARD" "SQUARE_GIFT_CARD") :ccp-processor/square
    :ccp-processor/na))

(defn tender->charge
  "Transforms tender `t` of `order` into a `:charge/*` entity map
  (nil-valued keys removed)."
  [order client location t]
  (remove-nils
   #:charge {:type-name (:type t)
             :date (order-created-date order)
             :client (:db/id client)
             :note (:note t)
             :location (:square-location/client-location location)
             :reference-link (str (url/url "https://squareup.com/receipt/preview" (:id t)))
             :external-id (when (:id t)
                            (str "square/charge/" (:id t)))
             :processor (tender->processor order t)
             :total (amount->money (:amount_money t))
             :tip (amount->money (:tip_money t))}))

(defn order->sales-order
  "Deferred of the entities to transact for `order`.

  An order whose only line item is a CUSTOM_AMOUNT yields just its charges.
  Any other order yields a one-element vector holding the `:sales-order/*`
  map with its charges and line items (each line item's category is looked
  up in the catalog). A line item that fails to transform is logged; a
  failure of the whole order is logged as well."
  [client location order]
  (capture-context->lc
   (let [is-order-only-for-charge? (= ["CUSTOM_AMOUNT"]
                                      (mapv :item_type (:line_items order)))]
     (if is-order-only-for-charge?
       (de/success-deferred
        (->> (:tenders order)
             (map #(tender->charge order client location %))))
       (de/catch
        (de/let-flow [line-items
                      (->> (or (:line_items order) [])
                           (s/->source)
                           (s/transform
                            (map-indexed
                             (fn [i li]
                               (mu/with-context lc
                                 (-> (de/let-flow [category (item-id->category-name
                                                             client
                                                             (:catalog_object_id li)
                                                             (:catalog_version li))]
                                       (remove-nils
                                        #:order-line-item {:external-id (str "square/order/"
                                                                             (:client/code client)
                                                                             "-"
                                                                             (:square-location/client-location location)
                                                                             "-"
                                                                             (:id order)
                                                                             "-"
                                                                             i)
                                                           :item-name (:name li)
                                                           :category (if (= "GIFT_CARD" (:item_type li))
                                                                       "Gift Card"
                                                                       category)
                                                           :total (amount->money (:total_money li))
                                                           :tax (amount->money (:total_tax_money li))
                                                           :discount (amount->money (:total_discount_money li))}))
                                     (de/catch
                                      (fn [e]
                                        (log/error ::cant-transform :exception e :line-item li))))))))
                           (s/buffer 5)
                           (s/realize-each)
                           (s/reduce conj []))]
          [(remove-nils
            #:sales-order {:date (order-created-date order)
                           :client (:db/id client)
                           :location (:square-location/client-location location)
                           :external-id (str "square/order/"
                                             (:client/code client)
                                             "-"
                                             (:square-location/client-location location)
                                             "-"
                                             (:id order))
                           :source (or (:name (:source order)) "Square")
                           :vendor :vendor/ccp-square
                           :reference-link (str (url/url "https://squareup.com/dashboard/sales/transactions"
                                                         (:id order)
                                                         "by-unit"
                                                         (:square-location/square-id location)))
                           :total (-> order :net_amounts :total_money amount->money)
                           :tax (-> order :net_amounts :tax_money amount->money)
                           :tip (-> order :net_amounts :tip_money amount->money)
                           :discount (-> order :net_amounts :discount_money amount->money)
                           :service-charge (-> order :net_amounts :service_charge_money amount->money)
                           :returns (+ (- (-> order :return_amounts :total_money amount->money)
                                          (-> order :return_amounts :tax_money amount->money)
                                          (-> order :return_amounts :tip_money amount->money)
                                          (-> order :return_amounts :service_charge_money amount->money))
                                       (-> order :return_amounts :discount_money amount->money))
                           :charges (->> (:tenders order)
                                         (map #(tender->charge order client location %)))
                           :line-items line-items})])
        (fn [e]
          (log/error ::failed-to-transform-order :exception e)))))))

(defn- importable-order?
  "True when `order` should be imported."
  [order]
  ;; sometimes orders stay open in square. At least one payment
  ;; is needed to import, in order to avoid importing orders in-progress.
  (let [tender-statuses (set (map #(:status (:card_details %)) (:tenders order)))]
    (and (or (> (count (:tenders order)) 0)
             (seq (:returns order)))
         ;; skip orders whose tenders are all FAILED or VOIDED
         (or (= #{} tender-statuses)
             (not= #{} (set/difference tender-statuses #{"FAILED" "VOIDED"}))))))

(defn daily-results
  "Deferred of the entities (sales orders / charges) for every importable
  order of `location` between `start` and `end` (default: the last 7 days).
  A failure is logged rather than propagated."
  ([client location]
   (daily-results client location (time/plus (time/now) (time/days -7)) (time/now)))
  ([client location start end]
   (capture-context->lc
    (-> (de/chain (search client location start end)
                  (fn [search-results]
                    (->> (or search-results [])
                         (s/->source)
                         (s/filter importable-order?)
                         (s/map #(mu/with-context lc
                                   (order->sales-order client location %)))
                         (s/buffer 10)
                         (s/realize-each)
                         (s/reduce into []))))
        (de/catch
         (fn [e]
           (log/error ::cant-create-results :exception e)))))))

(defn get-payment
  "Deferred of the `:payment` for payment id `p`."
  [client p]
  (de/chain (manifold-api-call {:url (str "https://connect.squareup.com/v2/payments/" p)
                                :method :get
                                :headers (client-base-headers client)})
            :body
            :payment))

(defn settlements
  "Deferred of the full settlement records of `location`.

  First lists settlement ids for each `[start-date end-date]` pair in
  `lookup-dates` (default: `(lookup-dates)`), de-duplicates them, then
  fetches each settlement individually."
  ([client location]
   (settlements client location (lookup-dates)))
  ([client location lookup-dates]
   (with-context-as {:location (:square-location/client-location location)} lc
     (de/chain
      (->> lookup-dates
           (s/->source)
           (s/map
            (fn [[start-date end-date]]
              (mu/with-context lc
                (log/info ::searching-settlements :start-date start-date :end-date end-date)
                (de/chain (manifold-api-call {:url (str "https://connect.squareup.com/v1/"
                                                        (:square-location/square-id location)
                                                        "/settlements")
                                              :method :get
                                              :headers (client-base-headers client)
                                              :query-params {"begin_time" start-date
                                                             "end_time" end-date}})
                          :body
                          (fn [settlements]
                            (map :id settlements))))))
           (s/buffer 3)
           (s/realize-each)
           (s/reduce into #{}))
      (fn [settlement-id-set]
        (->> settlement-id-set
             (s/->source)
             (s/map
              (fn [settlement-id]
                (mu/with-context lc
                  (log/info ::looking-up-settlement :settlement-id settlement-id)
                  (de/chain (manifold-api-call {:url (str "https://connect.squareup.com/v1/"
                                                          (:square-location/square-id location)
                                                          "/settlements/"
                                                          settlement-id)
                                                :method :get
                                                :headers (client-base-headers client)
                                                :as :json})
                            :body))))
             (s/buffer 10)
             (s/realize-each)
             (s/reduce conj [])))))))

(defn daily-settlements
  "Deferred of `:expected-deposit/*` entity maps built from `settlements`
  (a deferred; default: `(settlements client location)`).

  `:sales-date` is the local date with the most sales orders already linked
  to the deposit's charges in Datomic, falling back to the settlement's
  `:initiated_at`. Settlements without a date are dropped. If the
  transformation throws, the error is logged and nil is returned."
  ([client location]
   (daily-settlements client location (settlements client location)))
  ([client location settlements]
   (de/chain
    settlements
    (fn [settlements]
      (log/info ::transforming-settlements)
      (try
        ;; one database value for the whole batch instead of one per settlement
        (let [db (dc/db conn)]
          (->> (for [settlement settlements
                     :let [best-sales-date
                           (some->> (dc/q '[:find ?s4 (count ?s)
                                            :in $ ?settlement-id
                                            :where
                                            [?settlement :expected-deposit/external-id ?settlement-id]
                                            [?settlement :expected-deposit/charges ?c]
                                            [?s :sales-order/charges ?c]
                                            [?s :sales-order/date ?sales-date]
                                            [(clj-time.coerce/to-date-time ?sales-date) ?s2]
                                            [(auto-ap.time/localize ?s2) ?s3]
                                            [(clj-time.coerce/to-local-date ?s3) ?s4]]
                                          db
                                          (str "square/settlement/" (:id settlement)))
                                    (sort-by last)
                                    last
                                    first
                                    coerce/to-date-time
                                    atime/as-local-time
                                    coerce/to-date)]]
                 #:expected-deposit {:external-id (str "square/settlement/" (:id settlement))
                                     :vendor :vendor/ccp-square
                                     :status :expected-deposit-status/pending
                                     :total (amount->money (:total_money settlement))
                                     :client (:db/id client)
                                     :location (:square-location/client-location location)
                                     ;; REFUND entries count with the opposite sign
                                     :fee (- (reduce + 0.0
                                                     (map (fn [entry]
                                                            (if (= (:type entry) "REFUND")
                                                              (- (amount->money (:fee_money entry)))
                                                              (amount->money (:fee_money entry))))
                                                          (:entries settlement))))
                                     :date (-> (:initiated_at settlement)
                                               (coerce/to-date))
                                     :sales-date (or best-sales-date
                                                     (-> (:initiated_at settlement)
                                                         (coerce/to-date)))
                                     :charges (->> (:entries settlement)
                                                   (filter :payment_id)
                                                   (map (fn [p]
                                                          {:charge/external-id (str "square/charge/" (:payment_id p))})))})
               (filter :expected-deposit/date)
               (into [])))
        (catch Throwable e
          (log/error ::transform-settlement-failed :exception e)))))))

(defn refunds
  "Deferred of `:sales-refund/*` entity maps for the COMPLETED refunds of
  location `l`. Each refund's payment is fetched to obtain its source type."
  ([client l]
   (de/chain
    (manifold-api-call {:url (str "https://connect.squareup.com/v2/refunds?location_id="
                                  (:square-location/square-id l))
                        :method :get
                        :headers (client-base-headers client)
                        :as :json})
    :body
    :refunds
    (fn [refunds]
      (->> refunds
           (filter (fn [r] (= "COMPLETED" (:status r))))
           (s/->source)
           (s/map
            (fn [r]
              (de/chain (get-payment client (:payment_id r))
                        (fn [payment]
                          #:sales-refund {:external-id (str "square/refund/" (:id r))
                                          :vendor :vendor/ccp-square
                                          :total (amount->money (:amount_money r))
                                          ;; sum of the ADJUSTMENT processing fees
                                          :fee (transduce (comp (filter #(= "ADJUSTMENT" (:type %)))
                                                                (map :amount_money)
                                                                (map amount->money))
                                                          +
                                                          0.0
                                                          (:processing_fee r))
                                          :client (:db/id client)
                                          :location (:square-location/client-location l)
                                          :date (coerce/to-date (:created_at r))
                                          :type (:source_type payment)}))))
           (s/buffer 5)
           (s/realize-each)
           (s/reduce conj []))))))

(defn- zip-over-mapped-locations
  "Calls `(f square-location)` for every Square location of `client` that has
  a `:square-location/client-location`, and zips the returned deferreds."
  [client f]
  (apply de/zip
         (for [square-location (:client/square-locations client)
               :when (:square-location/client-location square-location)]
           (f square-location))))

(defn upsert
  "Loads sales orders into Datomic in batches of 100.

  One-arg form: every mapped location of `client`, last 14 days.
  Four-arg form: a single `location` between `start` and `end`."
  ([client]
   (zip-over-mapped-locations
    client
    #(upsert client % (time/plus (time/now) (time/days -14)) (time/now))))
  ([client location start end]
   (capture-context->lc
    (de/chain (daily-results client location start end)
              (fn [results]
                (mu/with-context lc
                  (doseq [x (partition-all 100 results)]
                    (log/info ::loading-orders :count (count x))
                    @(dc/transact-async conn x))))))))

(defn upsert-settlements
  "Loads expected deposits into Datomic in batches of 20, for one `location`
  or for every mapped location of `client`."
  ([client]
   (zip-over-mapped-locations client #(upsert-settlements client %)))
  ([client location]
   (with-context-as {:source "Square settlements loading"
                     :client (:client/code client)} lc
     (de/chain (daily-settlements client location)
               (fn [settlements]
                 (mu/with-context lc
                   (log/info ::started-loading-deposits :settlement-count (count settlements))
                   (doseq [x (partition-all 20 settlements)]
                     (log/info ::loading-deposits :count (count x))
                     @(dc/transact-async conn x))
                   (log/info ::done-loading-deposits)))))))

(defn upsert-refunds
  "Loads refunds into Datomic in batches of 100, for one `location` or for
  every mapped location of `client`. Transaction failures are logged, not
  propagated."
  ([client]
   (zip-over-mapped-locations client #(upsert-refunds client %)))
  ([client location]
   (with-context-as {:source "Square refunds loading"
                     :client (:client/code client)} lc
     (de/chain (refunds client location)
               (fn [refunds]
                 (mu/with-context lc
                   (try
                     (doseq [x (partition-all 100 refunds)]
                       (log/info ::loading-refunds :count (count x) :sample (first x))
                       @(dc/transact-async conn x))
                     (catch Throwable e
                       (log/error ::upsert-refunds-failed :exception e)))
                   (log/info ::done-loading-refunds)))))))

(defn get-cash-shift
  "Deferred of the `:cash_drawer_shift` with the given `id`."
  [client id]
  (de/chain (manifold-api-call {:url (str (url/url "https://connect.squareup.com/v2/cash-drawers/shifts" id))
                                :method :get
                                :headers (client-base-headers client "2023-04-19")
                                :as :json})
            :body
            :cash_drawer_shift))

(defn cash-drawer-shifts
  "Deferred of `:cash-drawer-shift/*` entity maps for the ENDED shifts of
  location `l` between `start` and `end` (default: the last 75 days). Each
  shift is re-fetched individually for its details."
  ([client l]
   (cash-drawer-shifts client l (time/plus (time/now) (time/days -75)) (time/now)))
  ([client l start end]
   (de/chain
    (manifold-api-call {:url (str "https://connect.squareup.com/v2/cash-drawers/shifts"
                                  "?"
                                  (url/map->query {:location_id (:square-location/square-id l)
                                                   :begin_time (->square-date start)
                                                   :end_time (->square-date end)
                                                   :limit 1000}))
                        :method :get
                        :headers (client-base-headers client "2023-04-19")
                        :as :json})
    :body
    :cash_drawer_shifts
    (fn [shifts]
      (->> shifts
           (filter (fn [r] (= "ENDED" (:state r))))
           (s/->source)
           (s/map
            (fn [s]
              (de/chain (get-cash-shift client (:id s))
                        (fn [cash-drawer-shift]
                          #:cash-drawer-shift {:external-id (str "square/cash-drawer-shift/" (:id cash-drawer-shift))
                                               :vendor :vendor/ccp-square
                                               :paid-in (amount->money (:cash_paid_in_money cash-drawer-shift))
                                               :paid-out (amount->money (:cash_paid_out_money cash-drawer-shift))
                                               :expected-cash (amount->money (:expected_cash_money cash-drawer-shift))
                                               :opened-cash (amount->money (:opened_cash_money cash-drawer-shift))
                                               :date (coerce/to-date (:opened_at cash-drawer-shift))
                                               :client (:db/id client)
                                               :location (:square-location/client-location l)}))))
           (s/buffer 5)
           (s/realize-each)
           (s/reduce conj []))))))

(defn upsert-cash-shifts
  "Loads cash-drawer shifts into Datomic in batches of 100, for one
  `location` or for every mapped location of `client`. Transaction failures
  are logged, not propagated."
  ([client]
   (zip-over-mapped-locations client #(upsert-cash-shifts client %)))
  ([client location]
   (with-context-as {:source "Square cash shift loading"
                     :client (:client/code client)} lc
     (de/chain (cash-drawer-shifts client location)
               (fn [cash-shifts]
                 (mu/with-context lc
                   (try
                     (doseq [x (partition-all 100 cash-shifts)]
                       (log/info ::loading-cash-shifts :count (count x) :sample (first x))
                       @(dc/transact-async conn x))
                     (catch Throwable e
                       (log/error ::upsert-cash-shifts-failed :exception e)))
                   (log/info ::done-loading-cash-shifts)))))))

;; Pull pattern for a client and its Square locations.
(def square-read
  [:db/id
   :client/code
   :client/square-auth-token
   {:client/square-locations [:db/id
                              :square-location/name
                              :square-location/square-id
                              :square-location/client-location]}])

(defn get-square-clients
  "Returns the clients that have a Square auth token and the `new-square`
  feature flag, optionally restricted to the given client `codes`.

  Both arities pull the same attributes, including
  `:client/square-integration-status`, which `mark-integration-status` reads
  to update the existing status entity instead of creating a new one."
  ([]
   (map first
        (dc/q '[:find (pull ?c [:db/id
                                :client/square-integration-status
                                :client/code
                                :client/square-auth-token
                                {:client/square-locations [:db/id
                                                           :square-location/name
                                                           :square-location/square-id
                                                           :square-location/client-location]}])
                :in $
                :where
                [?c :client/square-auth-token]
                [?c :client/feature-flags "new-square"]]
              (dc/db conn))))
  ([& codes]
   (map first
        (dc/q '[:find (pull ?c [:db/id
                                :client/square-integration-status
                                :client/code
                                :client/square-auth-token
                                {:client/square-locations [:db/id
                                                           :square-location/name
                                                           :square-location/square-id
                                                           :square-location/client-location]}])
                :in $ [?code ...]
                :where
                [?c :client/square-auth-token]
                [?c :client/feature-flags "new-square"]
                [?c :client/code ?code]]
              (dc/db conn)
              codes))))

(defn upsert-locations
  "Synchronises Square locations into Datomic for `client` (or for every
  Square client). Locations already known by Square id are updated in place;
  unknown ones are created under the client."
  ([]
   (apply de/zip
          (for [client (get-square-clients)]
            (upsert-locations client))))
  ([client]
   (let [square-id->id (into {}
                             (map (fn [sl]
                                    [(:square-location/square-id sl) (:db/id sl)])
                                  (:client/square-locations client)))]
     (de/chain (client-locations client)
               (fn [client-locations]
                 @(dc/transact-async
                   conn
                   (for [square-location client-locations]
                     {:db/id (or (square-id->id (:id square-location))
                                 (str (java.util.UUID/randomUUID)))
                      :client/_square-locations (:db/id client)
                      :square-location/name (:name square-location)
                      :square-location/square-id (:id square-location)})))))))

#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(defn reset
  "Returns (does not transact) `:db/retractEntity` statements for every
  entity that has a `:sales-order/date` or an `:expected-deposit/date`."
  []
  (->> (dc/q {:find ['?e]
              :in ['$]
              :where ['(or [?e :sales-order/date]
                           [?e :expected-deposit/date])]}
             (dc/db conn))
       (map first)
       (map (fn [x] [:db/retractEntity x]))))

(defn mark-integration-status
  "Synchronously transacts `integration-status` as the client's
  `:client/square-integration-status`, reusing the status entity id found on
  the `client` map when there is one and a fresh tempid otherwise."
  [client integration-status]
  @(dc/transact-async
    conn
    [{:db/id (:db/id client)
      :client/square-integration-status
      (assoc integration-status
             :db/id (or (-> client :client/square-integration-status :db/id)
                        (str (java.util.UUID/randomUUID))))}]))

(defn upsert-all
  "Runs the full import (locations, orders, settlements, refunds, cash
  shifts) for every Square client with at least one mapped location,
  optionally restricted to the given client codes. The integration status of
  each client is recorded before the run and after success or failure.
  Returns a deferred of the per-client results."
  [& clients]
  (capture-context->lc
   (log/info ::starting-upsert)
   (->> (apply get-square-clients clients)
        (s/->source)
        (s/filter
         (fn [client]
           (seq (filter :square-location/client-location
                        (:client/square-locations client)))))
        (s/map
         (fn [client]
           (with-context-as (merge lc {:client (:client/code client)}) lc
             (log/info ::import-started)
             (mark-integration-status client
                                      {:integration-status/last-attempt (coerce/to-date (time/now))})
             (-> (de/chain
                  (upsert-locations client)
                  (fn [_]
                    (mu/with-context lc
                      (log/info ::upsert-orders-started)
                      (upsert client)))
                  (fn [_]
                    (mu/with-context lc
                      (log/info ::upsert-settlements-started)
                      (upsert-settlements client)))
                  (fn [_]
                    (mu/with-context lc
                      (log/info ::upsert-refunds-started)
                      (upsert-refunds client)))
                  (fn [_]
                    (mu/with-context lc
                      (log/info ::upsert-cash-shifts)
                      (upsert-cash-shifts client)))
                  (fn [_]
                    (mu/with-context lc
                      (log/info ::upsert-done))
                    (mark-integration-status client
                                             {:integration-status/state :integration-state/success
                                              :integration-status/last-updated (coerce/to-date (time/now))})))
                 (de/catch
                  (fn [e]
                    (mu/with-context lc
                      (let [data (ex-data e)]
                        (log/info ::upsert-all-failed :severity :error :exception e)
                        (cond
                          (= (:status data) 401)
                          (mark-integration-status client
                                                   {:integration-status/state :integration-state/unauthorized
                                                    :integration-status/message (-> data :body str)})

                          (= (:status data) 503)
                          (mark-integration-status client
                                                   {:integration-status/state :integration-state/failed
                                                    :integration-status/message (-> data :body str)})

                          :else
                          (mark-integration-status client
                                                   {:integration-status/state :integration-state/failed
                                                    :integration-status/message (or (ex-message e) (str e))}))))))))))
        (s/buffer 5)
        (s/realize-each)
        (s/reduce conj []))))

(defn do-upsert-all
  "Blocking entry point: runs `upsert-all` for `clients` inside a mulog trace
  and waits for it to finish."
  [& clients]
  (mu/trace ::upsert-all
    [:clients clients]
    @(apply upsert-all clients)))