diff --git a/src/clj/auto_ap/background/requests.clj b/src/clj/auto_ap/background/requests.clj index 5094be0f..866d80d5 100644 --- a/src/clj/auto_ap/background/requests.clj +++ b/src/clj/auto_ap/background/requests.clj @@ -6,7 +6,8 @@ [yang.scheduler :as scheduler] [clojure.tools.logging :as log] [auto-ap.intuit.import :as i] - [unilog.context :as lc])) + [unilog.context :as lc] + [auto-ap.yodlee.import :as y])) (def queue-url (:requests-queue-url env)) @@ -18,9 +19,22 @@ :count 1}))] (when message-id (log/infof "processing message %s with body %s" message-id body ) - (if (= ":intuit" body) + (cond + (= ":intuit" body) (try (i/upsert-transactions) + (catch Exception e + (log/error e))) + + (= ":yodlee" body) + (try + (y/do-import) + (catch Exception e + (log/error e))) + + (= ":yodlee2" body) + (try + (y/do-import2) (catch Exception e (log/error e)))) (sqs/delete-message {:queue-url queue-url diff --git a/src/clj/auto_ap/graphql.clj b/src/clj/auto_ap/graphql.clj index b25f605a..86b722a8 100644 --- a/src/clj/auto_ap/graphql.clj +++ b/src/clj/auto_ap/graphql.clj @@ -1000,9 +1000,9 @@ {:enum-value :requires_feedback} {:enum-value :excluded}]}} :mutations - {:request_intuit_import {:type 'String - :args {} - :resolve :mutation/request-intuit-import} + {:request_import {:type 'String + :args {:which {:type 'String}} + :resolve :mutation/request-import} :reject_invoices {:type '(list :id) :args {:invoices {:type '(list :id)}} @@ -1413,7 +1413,7 @@ :mutation/void-payment gq-checks/void-check :mutation/edit-expense-accounts gq-invoices/edit-expense-accounts :mutation/import-ledger gq-ledger/import-ledger - :mutation/request-intuit-import gq-requests/request-intuit-import + :mutation/request-import gq-requests/request-import :get-vendor gq-vendors/get-graphql}) schema/compile)) diff --git a/src/clj/auto_ap/graphql/requests.clj b/src/clj/auto_ap/graphql/requests.clj index 3126acec..4a8db809 100644 --- a/src/clj/auto_ap/graphql/requests.clj +++ b/src/clj/auto_ap/graphql/requests.clj @@ -4,8 +4,8 @@ [auto-ap.graphql.utils :refer [assert-admin]] [config.core :refer [env]])) -(defn request-intuit-import [context value args] +(defn request-import [context value args] (assert-admin (:id context)) (:message-id (sqs/send-message {:queue-url (:requests-queue-url env) - :message-body ":intuit"} ))) + :message-body (:which value)} ))) diff --git a/src/clj/auto_ap/intuit/import.clj b/src/clj/auto_ap/intuit/import.clj index 2a1b51de..cd191215 100644 --- a/src/clj/auto_ap/intuit/import.clj +++ b/src/clj/auto_ap/intuit/import.clj @@ -2,7 +2,7 @@ (:require [amazonica.aws.s3 :as s3] [auto-ap.datomic :refer [conn remove-nils]] [auto-ap.intuit.core :as i] - [auto-ap.utils :refer [by]] + [auto-ap.utils :refer [by allow-once]] [auto-ap.yodlee.import :as y] [clj-time.coerce :as coerce] [clj-time.core :as time] @@ -26,51 +26,29 @@ db #_client-whitelist))) -(defn allow-once [f] - (let [in-progress? (atom false)] - (fn [] - (println in-progress?) - (when (= false @in-progress?) - (try - (reset! in-progress? true) - (f) - (finally - (reset! in-progress? false))))))) - (defn upsert-transactions [] - (let [db (d/db conn) - import-id (y/start-import :import-source/intuit "Automated Intuit User") - stats (atom {:import-batch/imported 0 - :import-batch/extant 0 - :import-batch/suppressed 0})] - - (doseq [[bank-account external-id] (get-intuit-bank-accounts db) - :let [bank-account (d/entity db bank-account) - end (auto-ap.time/local-now) - start (time/plus end (time/days -30))]] - (log/infof "importing from %s to %s for %s" start end external-id) - (let [transactions (->> (i/get-transactions (auto-ap.time/unparse start auto-ap.time/iso-date) - (auto-ap.time/unparse end auto-ap.time/iso-date) - external-id) - (mapv (fn [r] - {:client-id (:db/id (:client/_bank-accounts bank-account)) - :bank-account-id (:db/id bank-account) - :description-original (:Memo/Description r) - :amount (Double/parseDouble (:Amount r)) - :date (auto-ap.time/parse (:Date r) auto-ap.time/iso-date) - :status "posted"})))] - (log/infof "%d transactions found" (count transactions)) - (let [result (y/grouped-import transactions import-id)] - (swap! stats - (fn [s] - (merge-with + s {:import-batch/imported (count (:import result)) - :import-batch/extant (count (:extant result)) - :import-batch/suppressed (count (:suppressed result))})))))) - (y/finish-import (assoc @stats :db/id import-id)))) + (lc/with-context {:source "Importing intuit transactions"} + (let [db (d/db conn)] + (->> + (for [[bank-account external-id] (get-intuit-bank-accounts db) + :let [bank-account (d/entity db bank-account) + end (auto-ap.time/local-now) + start (time/plus end (time/days -30)) + _ (log/infof "importing from %s to %s for %s" start end external-id)] + transaction (i/get-transactions (auto-ap.time/unparse start auto-ap.time/iso-date) + (auto-ap.time/unparse end auto-ap.time/iso-date) + external-id)] + {:client-id (:db/id (:client/_bank-accounts bank-account)) + :bank-account-id (:db/id bank-account) + :description-original (:Memo/Description transaction) + :amount (Double/parseDouble (:Amount transaction)) + :date (auto-ap.time/parse (:Date transaction) auto-ap.time/iso-date) + :status "posted"}) + (y/grouped-import :import-source/intuit "Automated Intuit User")) + (log/info "Intuit transactions imported")))) (def upsert-transactions (allow-once upsert-transactions)) - (defn dry-run-upsert-transactions [] (let [db (d/db conn)] (clojure.data.csv/write-csv diff --git a/src/clj/auto_ap/yodlee/import.clj b/src/clj/auto_ap/yodlee/import.clj index ffc282ab..4d475f4d 100644 --- a/src/clj/auto_ap/yodlee/import.clj +++ b/src/clj/auto_ap/yodlee/import.clj @@ -1,24 +1,25 @@ (ns auto-ap.yodlee.import - (:require [auto-ap.datomic :refer [audit-transact conn remove-nils uri]] - [auto-ap.datomic.accounts :as a] - [auto-ap.datomic.checks :as d-checks] - [auto-ap.datomic.clients :as d-clients] - [auto-ap.datomic.transaction-rules :as tr] - [auto-ap.datomic.transactions :as d-transactions] - [auto-ap.rule-matching :as rm] - [auto-ap.time :as time] - [auto-ap.utils :refer [by dollars=]] - [auto-ap.yodlee.core :as client] - [auto-ap.yodlee.core2 :as client2] - [clj-time.coerce :as coerce] - [clj-time.core :as t] - [clojure.string :as str] - [clojure.tools.logging :as log] - [datomic.api :as d] - [digest :refer [sha-256]] - [mount.core :as mount] - [unilog.context :as lc] - [yang.scheduler :as scheduler])) + (:require + [auto-ap.datomic :refer [audit-transact conn remove-nils uri]] + [auto-ap.datomic.accounts :as a] + [auto-ap.datomic.checks :as d-checks] + [auto-ap.datomic.clients :as d-clients] + [auto-ap.datomic.transaction-rules :as tr] + [auto-ap.datomic.transactions :as d-transactions] + [auto-ap.rule-matching :as rm] + [auto-ap.time :as time] + [auto-ap.utils :refer [allow-once by dollars=]] + [auto-ap.yodlee.core :as client] + [auto-ap.yodlee.core2 :as client2] + [clj-time.coerce :as coerce] + [clj-time.core :as t] + [clojure.string :as str] + [clojure.tools.logging :as log] + [datomic.api :as d] + [digest :refer [sha-256]] + [mount.core :as mount] + [unilog.context :as lc] + [yang.scheduler :as scheduler])) (defn rough-match [client-id bank-account-id amount] (if (and client-id bank-account-id amount) @@ -57,6 +58,19 @@ :else (rough-match client-id bank-account-id amount))) +(defn aggregate-results [results] + (reduce + (fn [acc result] + (merge-with + acc + {:import-batch/imported (count (:import result)) + :import-batch/extant (count (:extant result)) + :import-batch/suppressed (count (:suppressed result))})) + + {:import-batch/imported 0 + :import-batch/extant 0 + :import-batch/suppressed 0} + results)) + (defn match-transaction-to-unfulfilled-autopayments [amount client-id] @@ -172,36 +186,34 @@ first first))) -(defn transactions->txs [transactions import-id transaction->bank-account apply-rules existing] - (let [grouped-transactions (group-by - (fn [{post-date :postDate - account-id :accountId +(defn transactions->txs [transactions bank-account import-id apply-rules existing] + (let [client (:client/_bank-accounts bank-account) + client-id (:db/id client) + bank-account-id (:db/id bank-account) + valid-locations (or (:bank-account/locations bank-account) (:client/locations client)) + grouped-transactions (group-by + (fn [{id :id date :date - id :id status :status :as transaction}] - (let [bank-account (transaction->bank-account transaction) - bank-account-id (:db/id bank-account) - client (:client/_bank-accounts bank-account) - client-id (:db/id client) - valid-locations (or (:bank-account/locations bank-account) (:client/locations client)) - - date (time/parse date "YYYY-MM-dd") + (let [date (time/parse date "YYYY-MM-dd") id (sha-256 (str id))] - (cond (= :transaction-approval-status/suppressed (existing id)) :suppressed (existing id) :extant - (not client-id) + (not client) + :error + + (not bank-account) :error (not= "POSTED" status) :not-posted (and (:bank-account/start-date bank-account) - (not (t/after? date (:bank-account/start-date bank-account)))) + (not (t/after? date (-> bank-account :bank-account/start-date coerce/to-date-time)))) :not-ready :else @@ -230,20 +242,16 @@ (- amount) amount) check-number (extract-check-number transaction) - bank-account (transaction->bank-account transaction) - bank-account-id (:db/id bank-account) - client (:client/_bank-accounts bank-account) - client-id (:db/id client) - valid-locations (or (:bank-account/locations bank-account) (:client/locations client)) date (time/parse date "YYYY-MM-dd")]] - (let [existing-check (transaction->existing-payment transaction check-number client-id bank-account-id amount id) + (let [ + existing-check (transaction->existing-payment transaction check-number client-id bank-account-id amount id) autopay-invoices-matches (when-not existing-check - (match-transaction-to-unfulfilled-autopayments amount client-id )) + (match-transaction-to-unfulfilled-autopayments amount client-id)) unpaid-invoices-matches (when-not existing-check (match-transaction-to-unpaid-invoices amount client-id )) expected-deposit (when (and (> amount 0.0) (not existing-check)) - (find-expected-deposit client-id amount date))] + (find-expected-deposit (:db/id client) amount date))] (cond-> [#:transaction {:post-date (coerce/to-date (time/parse post-date "YYYY-MM-dd")) @@ -296,14 +304,15 @@ -(defn get-existing [] +(defn get-existing [bank-account] (into {} (d/query {:query {:find ['?tid '?as2] - :in ['$] - :where ['[?e :transaction/id ?tid] + :in ['$ '?ba] + :where ['[?e :transaction/bank-account ?ba] + '[?e :transaction/id ?tid] '[?e :transaction/approval-status ?as] '[?as :db/ident ?as2]]} - :args [(d/db (d/connect uri))]}))) + :args [(d/db (d/connect uri)) bank-account]}))) (defn get-all-bank-accounts [] (->> (d-clients/get-all) @@ -325,36 +334,55 @@ (defn finish-import [i] @(d/transact conn [(assoc i :import-batch/status :import-status/completed)])) +(defn import-for-bank-account [transactions bank-account-id import-id] + (let [all-rules (tr/get-all) + bank-account (d/pull (d/db conn) + [:bank-account/code + :db/id + :bank-account/locations + :bank-account/start-date + {:client/_bank-accounts [:client/code :client/locations :db/id]} ] + bank-account-id) + _ (log/infof "Importing %d transactions for bank account %s for client %s" (count transactions) (-> bank-account :bank-account/code) (-> bank-account :client/_bank-accounts :client/code)) + transactions (transactions->txs transactions bank-account import-id (rm/rule-applying-fn all-rules) (get-existing bank-account-id))] + (doseq [tx (:import transactions)] + (audit-transact tx {:user/name "Yodlee import" + :user/role ":admin"})) + (log/infof "Completed import for %s" bank-account) + transactions)) -(defn grouped-import [manual-transactions import-id] - (lc/with-context {:source "grouped import"} - (let [transformed-transactions (->> manual-transactions - (filter #(= "posted" (:status %))) - (group-by #(select-keys % [:date :description-original :amount :client-id :bank-account-id])) - (vals) - (mapcat (fn [transaction-group] - (map - (fn [index {:keys [date description-original high-level-category amount bank-account-id client-id] :as transaction}] - {:id (str date "-" bank-account-id "-" description-original "-" amount "-" index "-" client-id) - :bank-account-id bank-account-id - :date (time/unparse date "YYYY-MM-dd") - :amount {:amount amount} - :description {:original description-original - :simple high-level-category} - :status "POSTED"}) - (range) - transaction-group)))) - all-rules (tr/get-all) - all-bank-accounts (by :db/id (get-all-bank-accounts)) - transaction->bank-account (comp all-bank-accounts :bank-account-id) - transactions (transactions->txs transformed-transactions import-id transaction->bank-account (rm/rule-applying-fn all-rules) (get-existing))] - (log/info "Importing " (count (:import transformed-transactions)) " grouped transactions") - - (doseq [tx (:import transactions)] - (audit-transact tx {:user/name "Yodlee import" - :user/role ":admin"})) - (log/info "Imported grouped transactions") - transactions))) +(defn grouped-import [source user manual-transactions] + (let [import-id (start-import source user)] + (lc/with-context {:import-id import-id} + (try + (let [transactions-by-bank-account (->> manual-transactions + (filter #(= "posted" (:status %))) + (group-by #(select-keys % [:date :description-original :amount :client-id :bank-account-id])) + (vals) + (mapcat (fn [transaction-group] + (map + (fn [index {:keys [date description-original high-level-category amount bank-account-id client-id] :as transaction}] + {:id (str date "-" bank-account-id "-" description-original "-" amount "-" index "-" client-id) + :bank-account-id bank-account-id + :date (time/unparse date "YYYY-MM-dd") + :amount {:amount amount} + :description {:original description-original + :simple high-level-category} + :status "POSTED"}) + (range) + transaction-group))) + (group-by :bank-account-id)) + result (->> transactions-by-bank-account + (filter (fn [[bank-account]] + bank-account)) + (map (fn [[bank-account transactions]] + (import-for-bank-account transactions bank-account import-id))) + aggregate-results)] + (finish-import (assoc result :db/id import-id))) + (catch Exception e + (log/error e) + (finish-import {:db/id import-id}) + ))))) (defn grouped-new [manual-transactions] (let [transformed-transactions (->> manual-transactions @@ -379,48 +407,73 @@ transactions (transactions->txs transformed-transactions nil transaction->bank-account (rm/rule-applying-fn all-rules) (get-existing))] transactions)) + + (defn do-import ([] (do-import (client/get-transactions))) ([transactions] (lc/with-context {:source "Import yodlee transactions"} - (do - (log/info "importing from yodlee") - (let [import-id (start-import :import-source/yodlee "Automated Yodlee User") - all-bank-accounts (get-all-bank-accounts) - transaction->bank-account (comp (by :bank-account/yodlee-account-id all-bank-accounts) :accountId) - all-rules (tr/get-all) - transactions (transactions->txs transactions import-id transaction->bank-account (rm/rule-applying-fn all-rules) (get-existing))] - (doseq [tx (:import transactions)] - (audit-transact tx {:user/name "Yodlee import" - :user/role ":admin"})) - (finish-import {:db/id import-id - :import-batch/imported (count (:import transactions)) - :import-batch/extant (count (:extant transactions)) - :import-batch/suppressed (count (:suppresed transactions))})))))) + (let [import-id (start-import :import-source/yodlee "Automated Yodlee User")] + (lc/with-context {:import-id import-id} + (try + (log/info "importing from yodlee") + (let [account-lookup (->> (d/q '[:find ?ya ?ba + :in $ + :where [?ba :bank-account/yodlee-account-id ?ya]] + (d/db conn)) + (into {})) + transactions-by-bank-account (group-by + #(account-lookup (:accountId %)) + transactions) + result (->> transactions-by-bank-account + (filter (fn [[bank-account]] + bank-account)) + (map (fn [[bank-account transactions]] + (import-for-bank-account transactions bank-account import-id))) + aggregate-results)] + + + (finish-import (assoc result :db/id import-id))) + (catch Exception e + (log/error e) + (finish-import {:db/id import-id})))))))) +(def do-import (allow-once do-import)) + (defn do-import2 ([] + (log/info "starting import") (do-import2 (client2/get-transactions "NGGL"))) ([transactions] - (lc/with-context {:source "Import yodlee transactions"} + (lc/with-context {:source "Import yodlee2 transactions"} + (let [import-id (start-import :import-source/yodlee "Automated Yodlee User")] + (lc/with-context {:import-id import-id} + (try + (let [account-lookup (->> (d/q '[:find ?ya ?ba + :in $ + :where + [?ba :bank-account/yodlee-account ?y] + [?y :yodlee-account/id ?ya]] + (d/db conn)) + (into {})) + transactions-by-bank-account (group-by + #(account-lookup (:accountId %)) + transactions) + result (->> transactions-by-bank-account + (filter (fn [[bank-account]] + bank-account)) + (map (fn [[bank-account transactions]] + (import-for-bank-account transactions bank-account import-id))) + aggregate-results)] + - (do - (log/info "importing from yodlee2") - (let [import-id (start-import :import-source/yodlee2 "Automated Yodlee User") - all-bank-accounts (get-all-bank-accounts) - transaction->bank-account (comp (by (comp :yodlee-account/id :bank-account/yodlee-account) all-bank-accounts) :accountId) - all-rules (tr/get-all) - transactions (transactions->txs transactions import-id transaction->bank-account (rm/rule-applying-fn all-rules) (get-existing))] + (finish-import (assoc result :db/id import-id))) + (catch Exception e + (log/error e) + (finish-import {:db/id import-id})))))))) - (doseq [tx (:import transactions)] - - (audit-transact tx {:user/name "Yodlee import2" - :user/role ":admin"})) - (finish-import {:db/id import-id - :import-batch/imported (count (:import transactions)) - :import-batch/extant (count (:extant transactions)) - :import-batch/suppressed (count (:suppresed transactions))})))))) +(def do-import2 (allow-once do-import2)) diff --git a/src/cljc/auto_ap/utils.cljc b/src/cljc/auto_ap/utils.cljc index b5fa4a92..2cfd0d96 100644 --- a/src/cljc/auto_ap/utils.cljc +++ b/src/cljc/auto_ap/utils.cljc @@ -69,3 +69,13 @@ (last vs)))) (def default-pagination-size 20) + +(defn allow-once [f] + (let [in-progress? (atom false)] + (fn [] + (when (= false @in-progress?) + (try + (reset! in-progress? true) + (f) + (finally + (reset! in-progress? false))))))) diff --git a/src/cljs/auto_ap/views/pages/admin/import_batches.cljs b/src/cljs/auto_ap/views/pages/admin/import_batches.cljs index 6181043f..2eafaa45 100644 --- a/src/cljs/auto_ap/views/pages/admin/import_batches.cljs +++ b/src/cljs/auto_ap/views/pages/admin/import_batches.cljs @@ -40,26 +40,60 @@ (set/rename-keys (:result result) {:import-batches :data})])}})) -(re-frame/reg-event-db - ::success-intuit - (fn [db [_ n]] - (assoc db ::msg (str "Your job " (:request-intuit-import n) " has been scheduled." )))) - (re-frame/reg-sub ::msg (fn [db] (::msg db))) +(re-frame/reg-event-db + ::success-intuit + (fn [db [_ n]] + (assoc db ::msg (str "Your job " (:request-import n) " has been scheduled." )))) + + + (re-frame/reg-event-fx ::request-intuit [with-user ] (fn [{:keys [db user] :as cofx} [_ params]] {:graphql {:token user :owns-state {:single ::intuit} - :query "mutation RequestIntuitImport{request_intuit_import}" + :query "mutation RequestIntuitImport{request_import(which: \":intuit\")}" :on-success [::success-intuit] }})) + +(re-frame/reg-event-db + ::success-yodlee + (fn [db [_ n]] + (assoc db ::msg (str "Your job " (:request-import n) " has been scheduled." )))) + +(re-frame/reg-event-fx + ::request-yodlee + [with-user ] + (fn [{:keys [db user] :as cofx} [_ params]] + {:graphql {:token user + :owns-state {:single ::yodlee} + :query "mutation RequestIntuitImport{request_import(which: \":yodlee\")}" + :on-success [::success-yodlee] + }})) + +(re-frame/reg-event-db + ::success-yodlee2 + (fn [db [_ n]] + (assoc db ::msg (str "Your job " (:request-import n) " has been scheduled." )))) + +(re-frame/reg-event-fx + ::request-yodlee2 + [with-user ] + (fn [{:keys [db user] :as cofx} [_ params]] + {:graphql {:token user + :owns-state {:single ::yodlee2} + :query "mutation RequestIntuitImport{request_import(which: \":yodlee2\")}" + :on-success [::success-yodlee2] + }})) + + (re-frame/reg-event-fx ::mounted (fn [{:keys [db]}] @@ -80,7 +114,9 @@ (with-meta (fn [] (let [user @(re-frame/subscribe [::subs/user]) - request-import-status @(re-frame/subscribe [::status/single ::intuit]) + intuit-request-import-status @(re-frame/subscribe [::status/single ::intuit]) + yodlee-request-import-status @(re-frame/subscribe [::status/single ::yodlee]) + yodlee2-request-import-status @(re-frame/subscribe [::status/single ::yodlee2]) message @(re-frame/subscribe [::msg])] [:div [:h1.title "Import Batches"] @@ -90,11 +126,24 @@ [:div [:div.is-pulled-right [:div.buttons + [:button.button.is-primary-two.is-medium {:aria-haspopup true + :type "button" + :on-click (dispatch-event [::request-yodlee]) + :disabled (status/disabled-for yodlee-request-import-status) + :class (status/class-for yodlee-request-import-status)} + + "Start Yodlee Import"] + [:button.button.is-primary-two.is-medium {:aria-haspopup true + :type "button" + :on-click (dispatch-event [::request-yodlee2]) + :disabled (status/disabled-for yodlee2-request-import-status) + :class (status/class-for yodlee2-request-import-status)} + "Start Yodlee2 Import"] [:button.button.is-primary-two.is-medium {:aria-haspopup true :type "button" :on-click (dispatch-event [::request-intuit]) - :disabled (status/disabled-for request-import-status) - :class (status/class-for request-import-status)} + :disabled (status/disabled-for intuit-request-import-status) + :class (status/class-for intuit-request-import-status)} "Start Intuit Import"]]] [table/table {:id :import-batches :data-page ::page}]])]))