diff --git a/src/clj/auto_ap/datomic/sales_orders.clj b/src/clj/auto_ap/datomic/sales_orders.clj index 09f9c409..72709d1f 100644 --- a/src/clj/auto_ap/datomic/sales_orders.clj +++ b/src/clj/auto_ap/datomic/sales_orders.clj @@ -1,171 +1,86 @@ (ns auto-ap.datomic.sales-orders (:require - [auto-ap.datomic - :refer [add-sorter-fields-2 - apply-pagination - apply-sort-3 - conn - merge-query - pull-id - pull-many - query2 - visible-clients]] + [auto-ap.datomic :refer [conn]] + [auto-ap.storage.parquet :as pq] [clj-time.coerce :as c] - [clj-time.core :as time] [clojure.set :as set] - [com.brunobonacci.mulog :as mu] - [datomic.api :as dc] - [iol-ion.query])) + [com.brunobonacci.mulog :as mu])) -(defn <-datomic [result] - (-> result - (update :sales-order/date c/from-date) - (update :sales-order/charges (fn [cs] - (map (fn [c] - (-> c - (update :charge/processor :db/ident) - (set/rename-keys {:expected-deposit/_charges :expected-deposit}) - (update :expected-deposit first))) - cs))))) +(defn <-row + "Convert a flat parquet row into the shape consumers expect. + Parquet produces maps of the form: + {\"external-id\" \"square/order/123\", ...} + which we transform to: + {:sales-order/external-id \"square/order/123\", ...}" + [row] + (-> row + (set/rename-keys + {"external-id" :sales-order/external-id + "location" :sales-order/location + "total" :sales-order/total + "tax" :sales-order/tax + "tip" :sales-order/tip + "discount" :sales-order/discount + "service-charge" :sales-order/service-charge + "vendor" :sales-order/vendor + "client-code" :sales-order/client-code + "date" :sales-order/date}) + (update :sales-order/date #(some-> % str)))) -(def default-read '[:db/id - :sales-order/external-id, - :sales-order/location, - :sales-order/date, - :sales-order/total, - :sales-order/tax, - :sales-order/tip, - :sales-order/line-items, - :sales-order/discount, - :sales-order/returns, - :sales-order/service-charge, - :sales-order/vendor, - :sales-order/source, - :sales-order/reference-link, - {:sales-order/client [:client/name :db/id :client/code] - :sales-order/charges [ - :charge/type-name, - :charge/total, - :charge/tax, - :charge/tip, - :charge/external-id, - :charge/note, - :charge/date, - :charge/client, - :charge/location, - :charge/reference-link, - {:charge/processor [:db/ident]} {:expected-deposit/_charges [:db/id]}]}]) +(defn build-where-clause [args] + (let [clauses [(when-let [c (:client-code args)] + ["external_id.client = '" c "'"]) + (when-let [v (:vendor args)] + ["external_id.vendor = '" (name v) "'"]) + (when-let [l (:location args)] + ["location = '" l "'"])] + (when (seq clauses) + (str "WHERE " (str/join " AND " clauses))))) -(defn raw-graphql-ids [db args] - (let [visible-clients (set (map :db/id (:clients args))) - selected-clients (->> (cond - (:client-id args) - (set/intersection #{(:client-id args)} - visible-clients) +(defn build-sort-clause [args] + (let [sort (or (:sort args) "date") + order (or (:order args) "DESC")] + (str "ORDER BY " sort " " order))) +(def page-size 100) - (:client-code args) - (set/intersection #{(pull-id db [:client/code (:client-code args)])} - visible-clients) +(defn raw-graphql-ids [args] + (let [start (some-> (:start (:date-range args)) .toString) + end (some-> (:end (:date-range args)) .substring 0 10) + where (build-where-clause args) + sort (build-sort-clause args) + limit (or (:limit args) page-size) + offset (or (:offset args) 0) + where-str (when where (str " " where))] + (when start + (let [result (pq/get-sales-orders start end + {:client (:client-code args) + :vendor (:vendor args) + :location (:location args) + :sort sort + :order "DESC" + :limit limit + :offset offset})] + {:ids (mapv #(str (:external_id %)) (:rows result)) + :rows (:rows result) + :count (:count result)})))) - :else - visible-clients) - (take 10) - set) - _ (mu/log ::selected-clients - :selected-clients selected-clients) - query (cond-> {:query {:find [] - :in ['$ '[?clients ?start-date ?end-date]] - :where '[[(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]]]} - :args [db [selected-clients - (some-> (:start (:date-range args)) c/to-date) - (some-> (:end (:date-range args)) c/to-date )]]} +(defn graphql-results [rows _ids _args] + (mapv <-row rows)) - (:sort args) (add-sorter-fields-2 {"client" ['[?e :sales-order/client ?c] - '[?c :client/name ?sort-client]] - "location" ['[?e :sales-order/location ?sort-location]] - "source" ['[?e :sales-order/source ?sort-source]] - "date" ['[?e :sales-order/date ?sort-date]] - "total" ['[?e :sales-order/total ?sort-total]] - "tax" ['[?e :sales-order/tax ?sort-tax]] - "tip" ['[?e :sales-order/tip ?sort-tip]]} - args) - (:category args) - (merge-query {:query {:in ['?category] - :where ['[?e :sales-order/line-items ?li] - '[?li :order-line-item/category ?category]]} - :args [(:category args)]}) - - (:processor args) - (merge-query {:query {:in ['?processor] - :where ['[?e :sales-order/charges ?chg] - '[?chg :charge/processor ?processor]]} - :args [(keyword "ccp-processor" - (name (:processor args)))]}) - (:type-name args) - (merge-query {:query {:in ['?type-name] - :where ['[?e :sales-order/charges ?chg] - '[?chg :charge/type-name ?type-name]]} - :args [(:type-name args)]}) - - (:total-gte args) - (merge-query {:query {:in ['?total-gte] - :where ['[?e :sales-order/total ?a] - '[(>= ?a ?total-gte)]]} - :args [(:total-gte args)]}) - - (:total-lte args) - (merge-query {:query {:in ['?total-lte] - :where ['[?e :sales-order/total ?a] - '[(<= ?a ?total-lte)]]} - :args [(:total-lte args)]}) - - (:total args) - (merge-query {:query {:in ['?total] - :where ['[?e :sales-order/total ?sales-order-total] - '[(iol-ion.query/dollars= ?sales-order-total ?total)]]} - :args [(:total args)]}) - - true - (merge-query {:query {:find ['?date '?e] - :where ['[?e :sales-order/date ?date]]}}))] - - (cond->> (query2 query) - true (apply-sort-3 (assoc args :default-asc? false)) - true (apply-pagination args)))) - -(defn graphql-results [ids db _] - (let [results (->> (pull-many db default-read ids) - (group-by :db/id)) - payments (->> ids - (map results) - (map first) - (mapv <-datomic))] - payments)) - -(defn summarize-orders [ids] - - (let [[total tax] (->> - (dc/q {:find ['(sum ?t) '(sum ?tax)] - :with ['?id] - :in ['$ '[?id ...]] - :where ['[?id :sales-order/total ?t] - '[?id :sales-order/tax ?tax]]} - (dc/db conn) - ids) - first)] - {:total total - :tax tax})) +(defn summarize-orders [rows] + (when (seq rows) + (let [total (reduce + 0.0 (map #(or (:total %) 0.0) rows)) + tax (reduce + 0.0 (map #(or (:tax %) 0.0) rows))] + {:total total + :tax tax}))) (defn get-graphql [args] - (let [db (dc/db conn) - {ids-to-retrieve :ids matching-count :count} (mu/trace ::get-sales-order-ids [] (raw-graphql-ids db args))] - [(->> (mu/trace ::get-results [] (graphql-results ids-to-retrieve db args))) - matching-count - (summarize-orders ids-to-retrieve)])) + (let [{:keys [ids rows count]} (mu/trace ::get-sales-order-ids [] (raw-graphql-ids args))] + [(mu/trace ::get-results [] (graphql-results rows ids args)) + count + (summarize-orders rows)])) (defn summarize-graphql [args] - (let [db (dc/db conn) - {ids-to-retrieve :ids matching-count :count} (mu/trace ::get-sales-order-ids [] (raw-graphql-ids db args))] - (summarize-orders ids-to-retrieve))) - + (let [{:keys [rows]} (raw-graphql-ids args)] + (summarize-orders rows))) \ No newline at end of file diff --git a/src/clj/auto_ap/ezcater/core.clj b/src/clj/auto_ap/ezcater/core.clj index 900a214a..5b520a36 100644 --- a/src/clj/auto_ap/ezcater/core.clj +++ b/src/clj/auto_ap/ezcater/core.clj @@ -1,6 +1,7 @@ (ns auto-ap.ezcater.core (:require - [auto-ap.datomic :refer [conn random-tempid]] + [auto-ap.datomic :refer [conn random-tempid]] + [auto-ap.storage.parquet :as parquet] [datomic.api :as dc] [clj-http.client :as client] [venia.core :as v] @@ -135,17 +136,17 @@ 0.15M :else 0.07M)] - (round-carry-cents + (round-carry-cents (* commision% 0.01M (+ (-> order :totals :subTotal :subunits ) (reduce + 0 - (map (comp :subunits :cost) (:feesAndDiscounts (:catererCart order))))))))) + (map (comp :subunits :cost) (:feesAndDiscounts (:catererCart order)))))))) (defn ccp-fee [order] - (round-carry-cents + (round-carry-cents (* 0.000299M (+ (-> order :totals :subTotal :subunits ) @@ -198,7 +199,48 @@ :tax tax :tip tip :returns 0.0 - :vendor :vendor/ccp-ezcater})) + :vendor :vendor/ccp-ezcater}})) + +(defn- flatten-order-to-parquet! [order] + "Flatten a sales-order into entity-type tagged maps and buffer to parquet." + (let [so-ext-id (:sales-order/external-id order) + so-date (some-> (:sales-order/date order) .toString) + client (:sales-order/client order) + client-code (if (map? client) (:client/code client) client)] + (parquet/buffer! "sales-order" + {:entity-type "sales-order" + :external-id so-ext-id + :client-code client-code + :location (:sales-order/location order) + :vendor (:sales-order/vendor order) + :total (:sales-order/total order) + :tax (:sales-order/tax order) + :tip (:sales-order/tip order) + :discount (:sales-order/discount order) + :service-charge (:sales-order/service-charge order) + :date so-date}) + (when-let [charges (:sales-order/charges order)] + (doseq [chg charges] + (parquet/buffer! "charge" + {:entity-type "charge" + :external-id (:charge/external-id chg) + :type-name (:charge/type-name chg) + :total (:charge/total chg) + :tax (:charge/tax chg) + :tip (:charge/tip chg) + :date so-date + :processor (some-> (:charge/processor chg) name) + :sales-order-external-id so-ext-id}))) + (when-let [items (:sales-order/line-items order)] + (doseq [li items] + (parquet/buffer! "line-item" + {:entity-type "line-item" + :item-name (:order-line-item/item-name li) + :category (:order-line-item/category li) + :total (:order-line-item/total li) + :tax (:order-line-item/tax li) + :discount (:order-line-item/discount li) + :sales-order-external-id so-ext-id}))))) (defn get-by-id [integration id] @@ -271,17 +313,22 @@ (alog/warn ::caterer-no-longer-has-location :json json)))) (defn import-order [json] - ;; {"id" "bf3dcf5c-a68f-42d9-9084-049133e03d3d", "parent_type" "Caterer", "parent_id" "91541331-d7ae-4634-9e8b-ccbbcfb2ce70", "entity_type" "Order", "entity_id" "9ab05fee-a9c5-483b-a7f2-14debde4b7a8", "key" "accepted", "occurred_at" "2022-07-21T19:21:07.549Z"} (alog/info ::try-import-order :json json) - @(dc/transact conn (filter identity - [(some-> json - (lookup-order) - (order->sales-order) - (update :sales-order/date coerce/to-date) - (update-in [:sales-order/charges 0 :charge/date] coerce/to-date))]))) - + (when-let [order (some-> json + (lookup-order) + (order->sales-order) + (update :sales-order/date coerce/to-date) + (update-in [:sales-order/charges 0 :charge/date] coerce/to-date))] + (try + (flatten-order-to-parquet! order) + (alog/info ::order-buffered + :external-id (:sales-order/external-id order)) + (catch Exception e + (alog/error ::buffer-failed + :exception e + :order (:sales-order/external-id order)))))) (defn upsert-recent [] (upsert-ezcater) (let [last-sunday (coerce/to-date (time/plus (second (->> (time/today) diff --git a/src/clj/auto_ap/jobs/sales_summaries.clj b/src/clj/auto_ap/jobs/sales_summaries.clj index 60808d33..464d3b59 100644 --- a/src/clj/auto_ap/jobs/sales_summaries.clj +++ b/src/clj/auto_ap/jobs/sales_summaries.clj @@ -3,6 +3,7 @@ [auto-ap.jobs.core :refer [execute]] [auto-ap.logging :as alog] [auto-ap.time :as atime] + [auto-ap.storage.parquet :as pq] [clj-time.coerce :as c] [clj-time.core :as time] [clj-time.periodic :as per] @@ -98,99 +99,94 @@ "card refunds" 41400 "food app refunds" 41400}) -(defn get-payment-items [c date] - (->> - (dc/q '[:find ?processor ?type-name (sum ?total) - :with ?c - :in $ [?clients ?start-date ?end-date] - :where [(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]] - [?e :sales-order/charges ?c] - [?c :charge/type-name ?type-name] - (or-join [?c ?processor] - (and [?c :charge/processor ?p] - [?p :db/ident ?processor]) - (and - (not [?c :charge/processor]) - [(ground :ccp-processor/na) ?processor])) - [?c :charge/total ?total]] - (dc/db conn) - [[c] date date]) - (reduce - (fn [acc [processor type-name total]] - (update - acc - (cond (= type-name "CARD") - "Card Payments" - (= type-name "CASH") - "Cash Payments" - (#{"SQUARE_GIFT_CARD" "WALLET" "GIFT_CARD"} type-name) - "Gift Card Payments" - (#{:ccp-processor/toast - #_:ccp-processor/ezcater - #_:ccp-processor/koala - :ccp-processor/doordash - :ccp-processor/grubhub - :ccp-processor/uber-eats} processor) - "Food App Payments" - :else - "Unknown") - (fnil + 0.0) - total)) - {}) - (map (fn [[k v]] - {:db/id (str (java.util.UUID/randomUUID)) - :sales-summary-item/sort-order 0 - :sales-summary-item/category k - - :ledger-mapped/amount (if (= "Card Payments" k) - (- v (get-fee c date)) - v) - :ledger-mapped/ledger-side :ledger-side/debit})))) +(defn- get-payment-items-parquet [c date] + (let [date-str (.toString date)] + (when-let [rows (seq (pq/query-deduped "charge" date-str date-str))] + (let [client-code (if (map? c) (:client/code c) c) + filtered (filter #(= client-code (:client_code %)) rows)] + (reduce + (fn [acc {:keys [processor type-name total]}] + (update acc + (cond + (= type-name "CARD") "Card Payments" + (= type-name "CASH") "Cash Payments" + (#{"SQUARE_GIFT_CARD" "WALLET" "GIFT_CARD"} type-name) "Gift Card Payments" + (#{"doordash" "grubhub" "uber-eats"} processor) "Food App Payments" + :else "Unknown") + (fnil + 0.0) + (or total 0.0))) + {} + filtered))))) -(defn get-discounts [c date] - (when-let [discount (ffirst (dc/q '[:find (sum ?discount) - :with ?e - :in $ [?clients ?start-date ?end-date] - :where [(iol-ion.query/scan-sales-orders $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]] - [?e :sales-order/discount ?discount]] - (dc/db conn) - [[c] date date]))] +(defn- get-discounts-parquet [c date] + (let [client-code (if (map? c) (:client/code c) c) + date-str (.toString date) + discount (auto-ap.storage.sales-summaries/sum-discounts client-code date-str date-str)] + (when (and discount (pos? discount)) + {:db/id (str (java.util.UUID/randomUUID)) + :sales-summary-item/sort-order 1 + :sales-summary-item/category "Discounts" + :ledger-mapped/amount discount + :ledger-mapped/ledger-side :ledger-side/debit}))) + +(defn- get-refund-items-parquet [c date] + (let [client-code (if (map? c) (:client/code c) c) + date-str (.toString date) + refunds (auto-ap.storage.sales-summaries/sum-refunds-by-type client-code date-str date-str)] + (when (seq refunds) + (map (fn [[type-name total]] + {:db/id (str (java.util.UUID/randomUUID)) + :sales-summary-item/sort-order 3 + :sales-summary-item/category (cond + (= type-name "CARD") "Card Refunds" + (= type-name "CASH") "Cash Refunds" + :else "Food App Refunds") + :ledger-mapped/amount total + :ledger-mapped/ledger-side :ledger-side/credit}) + refunds)))) + +(defn- get-fees [c date] + (when-let [fee (get-fee c date)] {:db/id (str (java.util.UUID/randomUUID)) - :sales-summary-item/sort-order 1 - :sales-summary-item/category "Discounts" - :ledger-mapped/amount discount + :sales-summary-item/sort-order 2 + :sales-summary-item/category "Fees" + :ledger-mapped/amount fee :ledger-mapped/ledger-side :ledger-side/debit})) -(defn get-refund-items [c date] - (->> - (dc/q '[:find ?type-name (sum ?t) - :with ?e - :in $ [?clients ?start-date ?end-date] - :where - :where [(iol-ion.query/scan-sales-refunds $ ?clients ?start-date ?end-date) [[?e _ ?sort-default] ...]] - [?e :sales-refund/type ?type-name] - [?e :sales-refund/total ?t]] - (dc/db conn) - [[c] date date]) - (reduce - (fn [acc [type-name total]] - (update - acc - (cond (= type-name "CARD") - "Card Refunds" - (= type-name "CASH") - "Cash Refunds" - :else - "Food App Refunds") - (fnil + 0.0) - total)) - {}) - (map (fn [[k v]] - {:db/id (str (java.util.UUID/randomUUID)) - :sales-summary-item/sort-order 3 - :sales-summary-item/category k - :ledger-mapped/amount v - :ledger-mapped/ledger-side :ledger-side/credit})))) +(defn- get-tax-parquet [c date] + (let [client-code (if (map? c) (:client/code c) c) + date-str (.toString date) + tax (auto-ap.storage.sales-summaries/sum-taxes client-code date-str date-str)] + {:db/id (str (java.util.UUID/randomUUID)) + :sales-summary-item/category "Tax" + :sales-summary-item/sort-order 1 + :ledger-mapped/ledger-side :ledger-side/credit + :ledger-mapped/amount (or tax 0.0)})) + +(defn- get-tip-parquet [c date] + (let [client-code (if (map? c) (:client/code c) c) + date-str (.toString date) + tip (auto-ap.storage.sales-summaries/sum-tips client-code date-str date-str)] + {:ledger-mapped/ledger-side :ledger-side/credit + :sales-summary-item/sort-order 2 + :db/id (str (java.util.UUID/randomUUID)) + :sales-summary-item/category "Tip" + :ledger-mapped/amount (or tip 0.0)})) + +(defn- get-sales-parquet [c date] + (let [client-code (if (map? c) (:client/code c) c) + date-str (.toString date) + sales (auto-ap.storage.sales-summaries/sum-sales-by-category client-code date-str date-str)] + (for [{:keys [category total tax discount]} sales] + {:db/id (str (java.util.UUID/randomUUID)) + :sales-summary-item/category (or category "Unknown") + :sales-summary-item/sort-order 0 + :sales-summary-item/total total + :sales-summary-item/net (- (+ total discount) tax) + :sales-summary-item/tax tax + :sales-summary-item/discount discount + :ledger-mapped/ledger-side :ledger-side/credit + :ledger-mapped/amount (- (+ total discount) tax)}))) @@ -293,14 +289,13 @@ :sales-summary/items (->> - (get-sales c date) - (concat (get-payment-items c date)) - (concat (get-refund-items c date)) - (cons (get-discounts c date)) + (get-sales-parquet c date) + (concat (get-payment-items-parquet c date)) + (concat (get-refund-items-parquet c date)) + (cons (get-discounts-parquet c date)) (cons (get-fees c date)) - (cons (get-tax c date)) - (cons (get-tip c date)) - (cons (get-returns c date)) + (cons (get-tax-parquet c date)) + (cons (get-tip-parquet c date)) (filter identity) (map (fn [z] (assoc z :ledger-mapped/account (some-> z :sales-summary-item/category str/lower-case name->number lookup-account) @@ -311,7 +306,7 @@ (alog/info ::upserting-summaries :category-count (count (:sales-summary/items result))) @(dc/transact conn [[:upsert-entity result]])) - @(dc/transact conn [{:db/id id :sales-summary/dirty false}])))))) + @(dc/transact conn [{:db/id id :sales-summary/dirty false}])))))) (comment ;; TODO: Move to test file or proper location diff --git a/src/clj/auto_ap/routes/ezcater_xls.clj b/src/clj/auto_ap/routes/ezcater_xls.clj index d9af6e75..26e6e7e9 100644 --- a/src/clj/auto_ap/routes/ezcater_xls.clj +++ b/src/clj/auto_ap/routes/ezcater_xls.clj @@ -1,6 +1,6 @@ (ns auto-ap.routes.ezcater-xls (:require - [auto-ap.datomic :refer [audit-transact conn]] + [auto-ap.datomic :refer [conn]] [auto-ap.logging :as alog] [clojure.data.json :as json] [auto-ap.parse.excel :as excel] @@ -12,6 +12,7 @@ [auto-ap.ssr.ui :refer [base-page]] [auto-ap.ssr.utils :refer [html-response]] [auto-ap.time :as atime] + [auto-ap.storage.parquet :as parquet] [bidi.bidi :as bidi] [clj-time.coerce :as coerce] [clojure.java.io :as io] @@ -103,6 +104,47 @@ :else nil))) +(defn- flatten-order-to-parquet! [order] + "Flatten a sales-order into entity-type tagged maps and buffer to parquet." + (let [so-ext-id (:sales-order/external-id order) + so-date (some-> (:sales-order/date order) .toString) + client (:sales-order/client order) + client-code (if (map? client) (:client/code client) client)] + (parquet/buffer! "sales-order" + {:entity-type "sales-order" + :external-id so-ext-id + :client-code client-code + :location (:sales-order/location order) + :vendor (:sales-order/vendor order) + :total (:sales-order/total order) + :tax (:sales-order/tax order) + :tip (:sales-order/tip order) + :discount (:sales-order/discount order) + :service-charge (:sales-order/service-charge order) + :date so-date}) + (when-let [charges (:sales-order/charges order)] + (doseq [chg charges] + (parquet/buffer! "charge" + {:entity-type "charge" + :external-id (:charge/external-id chg) + :type-name (:charge/type-name chg) + :total (:charge/total chg) + :tax (:charge/tax chg) + :tip (:charge/tip chg) + :date so-date + :processor (some-> (:charge/processor chg) name) + :sales-order-external-id so-ext-id}))) + (when-let [items (:sales-order/line-items order)] + (doseq [li items] + (parquet/buffer! "line-item" + {:entity-type "line-item" + :item-name (:order-line-item/item-name li) + :category (:order-line-item/category li) + :total (:order-line-item/total li) + :tax (:order-line-item/tax li) + :discount (:order-line-item/discount li) + :sales-order-external-id so-ext-id}))))) + (defn stream->sales-orders [s] (let [clients (map first (dc/q '[:find (pull ?c [:client/code :db/id @@ -172,9 +214,20 @@ missing-location (->> parse-results (filter (comp #{:missing} first)) - (map last))] - (audit-transact new-orders identity) - (html-response [:div (format "Successfully imported %d orders." (count new-orders)) + (map last)) + buffered-count (loop [orders new-orders + count 0] + (if-let [o (first orders)] + (do + (try + (flatten-order-to-parquet! o) + (catch Exception e + (alog/error ::buffer-failed + :exception e + :order (:sales-order/external-id o)))) + (recur (rest orders) (inc count))) + count))] + (html-response [:div (format "Successfully imported %d orders." buffered-count) (when (seq missing-location) [:div "Missing the following locations" [:ul.ul diff --git a/src/clj/auto_ap/square/core3.clj b/src/clj/auto_ap/square/core3.clj index a0df0672..5a0f181b 100644 --- a/src/clj/auto_ap/square/core3.clj +++ b/src/clj/auto_ap/square/core3.clj @@ -3,6 +3,7 @@ [auto-ap.datomic :refer [conn remove-nils]] [auto-ap.logging :as log :refer [capture-context->lc with-context-as]] [auto-ap.time :as atime] + [auto-ap.storage.parquet :as parquet] [cemerick.url :as url] [clj-http.client :as client] [clj-time.coerce :as coerce] @@ -594,6 +595,57 @@ (s/buffer 5) (s/realize-each) (s/reduce conj [])))))) +(defn- flatten-order-to-parquet! [order] + "Flatten a sales-order into entity-type tagged maps and buffer to parquet. + Returns the sales-order external-id for logging." + (let [so-ext-id (:sales-order/external-id order) + so-date (some-> (:sales-order/date order) .toString) + client (:sales-order/client order) + client-code (when client (if (map? client) + (:client/code client) + client))] + (parquet/buffer! "sales-order" + {:entity-type "sales-order" + :external-id so-ext-id + :client-code (or client-code (:db/id client)) + :location (:sales-order/location order) + :vendor (:sales-order/vendor order) + :total (:sales-order/total order) + :tax (:sales-order/tax order) + :tip (:sales-order/tip order) + :discount (:sales-order/discount order) + :service-charge (:sales-order/service-charge order) + :date so-date}) + (when-let [charges (:sales-order/charges order)] + (doseq [chg charges] + (parquet/buffer! "charge" + {:entity-type "charge" + :external-id (:charge/external-id chg) + :type-name (:charge/type-name chg) + :total (:charge/total chg) + :tax (:charge/tax chg) + :tip (:charge/tip chg) + :date so-date + :processor (some-> (:charge/processor chg) name) + :sales-order-external-id so-ext-id}) + (when-let [returns (:charge/returns chg)] + (doseq [rt returns] + (parquet/buffer! "sales-refund" + {:entity-type "sales-refund" + :type-name (:type-name rt) + :total (:total rt) + :sales-order-external-id so-ext-id}))))) + (when-let [items (:sales-order/line-items order)] + (doseq [li items] + (parquet/buffer! "line-item" + {:entity-type "line-item" + :item-name (:order-line-item/item-name li) + :category (:order-line-item/category li) + :total (:order-line-item/total li) + :tax (:order-line-item/tax li) + :discount (:order-line-item/discount li) + :sales-order-external-id so-ext-id}))))) + (defn upsert ([client] (apply de/zip @@ -608,7 +660,13 @@ (doseq [x (partition-all 100 results)] (log/info ::loading-orders :count (count x)) - @(dc/transact-async conn x)))))))) + (doseq [order x] + (try + (flatten-order-to-parquet! order) + (catch Exception e + (log/error ::buffer-failed + :exception e + :order (:sales-order/external-id order))))))))))) (defn upsert-payouts diff --git a/src/clj/auto_ap/storage/sales_summaries.clj b/src/clj/auto_ap/storage/sales_summaries.clj index aa6bb2f8..12e8068c 100644 --- a/src/clj/auto_ap/storage/sales_summaries.clj +++ b/src/clj/auto_ap/storage/sales_summaries.clj @@ -39,7 +39,7 @@ "]) " "WHERE " (dq "client-code") - " IS NOT NULL " + " = '" client-id "' " "GROUP BY " (dq "processor") ", " (dq "type-name"))] @@ -59,3 +59,126 @@ (catch Exception e (println "[sales-summaries]" (.getMessage e)) {})))) + +(defn sum-discounts [client-id start-date end-date] + (let [files (pq-files "sales-order" start-date end-date)] + (try + (let [sql (str "SELECT SUM(" + (dq "discount") + ")::DOUBLE AS discount_total " + "FROM read_parquet([" + (str/join ", " files) + "]) " + "WHERE " + (dq "client-code") + " = '" client-id "'")] + (or (some-> (first (p/query-rows sql)) :discount_total sum-dbl) 0.0)) + (catch Exception e + (println "[sales-summaries/discounts]" (.getMessage e)) + 0.0)))) + +(defn sum-refunds-by-type [client-id start-date end-date] + (let [files (pq-files "sales-refund" start-date end-date)] + (try + (let [sql (str "SELECT " + (dq "type-name") + " AS type_name, " + "SUM(" + (dq "total") + ")::DOUBLE AS total_amount " + "FROM read_parquet([" + (str/join ", " files) + "]) " + "WHERE " + (dq "sales-order-external-id") + " IN (SELECT " + (dq "external-id") + " FROM read_parquet([" + (str/join ", " (pq-files "sales-order" start-date end-date)) + "]) WHERE " + (dq "client-code") + " = '" client-id "') " + "GROUP BY " (dq "type-name"))] + (let [rows (p/query-rows sql)] + (reduce (fn [acc row] + (let [tname (str/trim (name (:type_name row))) + total (sum-dbl (:total_amount row))] + (assoc acc tname (+ (get acc tname 0.0) total)))) + {} + rows))) + (catch Exception e + (println "[sales-summaries/refunds]" (.getMessage e)) + {})))) + +(defn sum-taxes [client-id start-date end-date] + (let [files (pq-files "sales-order" start-date end-date)] + (try + (let [sql (str "SELECT SUM(" + (dq "tax") + ")::DOUBLE AS tax_total " + "FROM read_parquet([" + (str/join ", " files) + "]) " + "WHERE " + (dq "client-code") + " = '" client-id "'")] + (or (some-> (first (p/query-rows sql)) :tax_total sum-dbl) 0.0)) + (catch Exception e + (println "[sales-summaries/tax]" (.getMessage e)) + 0.0)))) + +(defn sum-tips [client-id start-date end-date] + (let [files (pq-files "sales-order" start-date end-date)] + (try + (let [sql (str "SELECT SUM(" + (dq "tip") + ")::DOUBLE AS tip_total " + "FROM read_parquet([" + (str/join ", " files) + "]) " + "WHERE " + (dq "client-code") + " = '" client-id "'")] + (or (some-> (first (p/query-rows sql)) :tip_total sum-dbl) 0.0)) + (catch Exception e + (println "[sales-summaries/tip]" (.getMessage e)) + 0.0)))) + +(defn sum-sales-by-category [client-id start-date end-date] + (let [files (pq-files "line-item" start-date end-date)] + (try + (let [sql (str "SELECT " + (dq "category") + " AS category, " + "SUM(" + (dq "total") + ")::DOUBLE AS total_amount, " + "SUM(" + (dq "tax") + ")::DOUBLE AS tax_amount, " + "SUM(" + (dq "discount") + ")::DOUBLE AS discount_amount " + "FROM read_parquet([" + (str/join ", " files) + "]) " + "WHERE " + (dq "sales-order-external-id") + " IN (SELECT " + (dq "external-id") + " FROM read_parquet([" + (str/join ", " (pq-files "sales-order" start-date end-date)) + "]) WHERE " + (dq "client-code") + " = '" client-id "') " + "GROUP BY " (dq "category"))] + (let [rows (p/query-rows sql)] + (mapv (fn [row] + {:category (or (:category row) "Unknown") + :total (sum-dbl (:total_amount row)) + :tax (sum-dbl (:tax_amount row)) + :discount (sum-dbl (:discount_amount row))}) + rows))) + (catch Exception e + (println "[sales-summaries/sales]" (.getMessage e)) + []))))