(ns auto-ap.plaid.import
  "Scheduled import of Plaid transactions into the transaction store."
  (:require
   [auto-ap.datomic :refer [conn]]
   [auto-ap.plaid.core :as p]
   ;; auto-ap.time and digest were previously referenced by fully-qualified
   ;; name without being required, which only works if some other namespace
   ;; happens to load them first. Require them explicitly.
   [auto-ap.time :as atime]
   [auto-ap.utils :refer [allow-once]]
   [auto-ap.yodlee.import :as y]
   [clj-time.coerce :as coerce]
   [clj-time.core :as time]
   [clojure.tools.logging :as log]
   [datomic.api :as d]
   [digest]
   [mount.core :as mount]
   [unilog.context :as lc]
   [yang.scheduler :as scheduler]))

(defn get-plaid-accounts
  "Queries `db` for every bank account linked to a Plaid account.

  Returns the raw query result: a collection of
  `[bank-account-eid client-eid plaid-external-id access-token]` tuples, one
  per (client, bank account, plaid account, plaid item) combination that
  satisfies the joins below."
  [db]
  (d/q '[:find ?ba ?c ?external-id ?t
         :in $
         :where
         [?c :client/bank-accounts ?ba]
         [?ba :bank-account/plaid-account ?pa]
         [?pa :plaid-account/external-id ?external-id]
         [?pi :plaid-item/accounts ?pa]
         [?pi :plaid-item/access-token ?t]]
       db))

(defn plaid->transaction
  "Converts a Plaid transaction map `t` into a `:transaction/*` namespaced map.

  Reads `:name`, `:transaction_id`, `:amount` and `:date` from `t`.
  The `:transaction/id` is the SHA-256 digest of the Plaid transaction id, and
  `:transaction/status` is always \"POSTED\".

  NOTE(review): `:amount` is passed straight to `double`, so a missing amount
  will throw — presumably Plaid always supplies it; confirm."
  [t]
  (let [plaid-id (:transaction_id t)]
    #:transaction{:description-original (:name t)
                  :raw-id               plaid-id
                  :id                   (digest/sha-256 plaid-id)
                  :amount               (double (:amount t))
                  :date                 (coerce/to-date
                                         (atime/parse (:date t) atime/iso-date))
                  :status               "POSTED"}))

(defn- import-plaid*
  "Runs one Plaid import pass.

  Starts an import batch, then for every linked account returned by
  `get-plaid-accounts` fetches the transactions between 30 days ago and now
  and imports each one that is not flagged `:pending`. The batch is finished
  on success; if anything throws, the batch is marked failed with the
  exception and the remaining accounts are not processed."
  []
  (lc/with-context {:source "Import plaid transactions"}
    (let [import-batch (y/start-import-batch :import-source/plaid "Automated plaid user")
          end          (atime/local-now)
          ;; Look back 30 days from `end`.
          start        (time/plus end (time/days -30))]
      (try
        (doseq [[bank-account-id client-id external-id access-token] (get-plaid-accounts (d/db conn))
                transaction (:transactions (p/get-transactions access-token external-id start end))
                ;; Pending transactions are skipped.
                :when (not (:pending transaction))]
          (y/import-transaction! import-batch
                                 (assoc (plaid->transaction transaction)
                                        :transaction/bank-account bank-account-id
                                        :transaction/client client-id)))
        (y/finish! import-batch)
        (catch Exception e
          (y/fail! import-batch e))))))

;; Public entry point. Defined once (rather than redefining a `defn` of the
;; same name) so the var never briefly holds the unwrapped function.
;; NOTE(review): `allow-once` presumably prevents overlapping runs — confirm
;; against auto-ap.utils.
(def import-plaid
  "Zero-argument function that runs a Plaid import pass, wrapped by `allow-once`."
  (allow-once import-plaid*))

;; Schedules `import-plaid` when the mount state starts and stops the
;; scheduled job when it stops.
;; NOTE(review): the interval looks like milliseconds (i.e. every 3 hours) —
;; confirm against yang.scheduler/every.
(mount/defstate import-worker
  :start (scheduler/every (* 1000 60 60 3) import-plaid)
  :stop (scheduler/stop import-worker))