(ns auto-ap.import.plaid (:require [auto-ap.datomic :refer [conn random-tempid]] [auto-ap.import.common :refer [wrap-integration]] [auto-ap.import.transactions :as t] [auto-ap.logging :as alog] [auto-ap.plaid.core :as p] [auto-ap.solr] [auto-ap.time :as atime] [auto-ap.utils :refer [allow-once by]] [clj-time.coerce :as coerce] [clj-time.core :as time] [datomic.api :as dc] [digest :as di] [manifold.deferred :as de] [manifold.executor :as ex] [clojure.string :as str])) (defn get-plaid-accounts [db] (-> (dc/q '[:find ?ba ?c ?external-id ?t :in $ :where [?c :client/bank-accounts ?ba] [?ba :bank-account/plaid-account ?pa] [?pa :plaid-account/external-id ?external-id] [?pi :plaid-item/accounts ?pa] [?pi :plaid-item/access-token ?t]] db ))) (defn plaid->transaction [t plaid-merchant->vendor-id] (alog/info ::trying-transaction :transaction t) (cond-> #:transaction {:description-original (:name t) :raw-id (:transaction_id t) :db/id (random-tempid) :id #_{:clj-kondo/ignore [:unresolved-var]} (di/sha-256 (:transaction_id t)) :amount (if (= "credit" (:type (:account t))) (- (double (:amount t))) (- (double (:amount t)))) :date (coerce/to-date (atime/parse (or (:authorized_date t) (:date t)) atime/iso-date)) :status "POSTED"} (:merchant_name t) (assoc :transaction/plaid-merchant {:plaid-merchant/name (:merchant_name t) :db/id (random-tempid)}) (not (str/blank? (:check_number t))) (assoc :transaction/check-number (Integer/parseInt (:check_number t))) (plaid-merchant->vendor-id (:merchant_name t)) (assoc :transaction/default-vendor (plaid-merchant->vendor-id (:merchant_name t))))) (defn build-plaid-merchant->vendor-id [] (into {} (dc/q '[:find ?pmn ?v :in $ :where [?v :vendor/plaid-merchant ?pm] [?pm :plaid-merchant/name ?pmn]] (dc/db conn )))) (def single-thread (ex/fixed-thread-executor 1)) (defn rebuild-search-index [] (de/future-with single-thread (auto-ap.solr/index-documents-raw auto-ap.solr/impl "plaid_merchants" (for [[result] (dc/qseq {:query '[:find (pull ?v [:plaid-merchant/name :db/id]) :in $ :where [?v :plaid-merchant/name]] :args [(dc/db conn)]})] {"id" (:db/id result) "name" (:plaid-merchant/name result)})))) (defn import-plaid-int [] (let [import-batch (t/start-import-batch :import-source/plaid "Automated plaid user") end (atime/local-now) start (time/plus end (time/days -30)) plaid-merchant->vendor-id (build-plaid-merchant->vendor-id)] (try (doseq [[bank-account-id client-id external-id access-token] (get-plaid-accounts (dc/db conn)) :let [transaction-result (wrap-integration #(p/get-transactions access-token external-id start end) bank-account-id) accounts-by-id (by :account_id (:accounts transaction-result))] transaction (:transactions transaction-result)] (when (not (:pending transaction)) (t/import-transaction! import-batch (assoc (plaid->transaction (assoc transaction :account (accounts-by-id (:account_id transaction))) plaid-merchant->vendor-id) :transaction/bank-account bank-account-id :transaction/client client-id)))) (try (rebuild-search-index) (catch Exception e (alog/error ::cant-index-plaid :error e))) (t/finish! import-batch) (catch Exception e (t/fail! import-batch e))))) (def import-plaid (allow-once import-plaid-int))