enormous refactor but simplified much!

This commit is contained in:
Bryce Covert
2021-12-22 18:14:49 -08:00
parent a7c9d376bc
commit 7489426ccb
25 changed files with 1188 additions and 1258 deletions

View File

@@ -58,6 +58,7 @@
:body))
(defn get-transactions [access-token account-id start end]
(log/infof "looking up transactions from %s to %s for %s" start end account-id)
(-> (client/post (str base-url "/transactions/get")
{:as :json
:headers {"Content-Type" "application/json"}

View File

@@ -9,10 +9,11 @@
[datomic.api :as d]
[mount.core :as mount]
[unilog.context :as lc]
[yang.scheduler :as scheduler]))
[yang.scheduler :as scheduler]
[clj-time.coerce :as coerce]))
(defn get-plaid-accounts [db]
(-> (d/q '[:find ?ba ?external-id ?t
(-> (d/q '[:find ?ba ?c ?external-id ?t
:in $
:where
[?c :client/bank-accounts ?ba]
@@ -22,47 +23,36 @@
[?pi :plaid-item/access-token ?t]]
db )))
(defn import-plaid
[]
(lc/with-context {:source "Importing plaid transactions"}
(let [db (d/db conn)
import-id (y/start-import :import-source/plaid "Automated Plaid User")]
(defn plaid->transaction [t]
#:transaction {:description-original (:name t)
:raw-id (:transaction_id t)
:id (digest/sha-256 (:transaction_id t))
:amount (double (:amount t))
:date (coerce/to-date (auto-ap.time/parse (:date t) auto-ap.time/iso-date))
:status "POSTED"})
(defn import-plaid []
(lc/with-context {:source "Import plaid transactions"}
(let [import-batch (y/start-import-batch :import-source/plaid "Automated plaid user")
end (auto-ap.time/local-now)
start (time/plus end (time/days -30))]
(try
(let [result (->>
(for [[bank-account external-id access-token] (get-plaid-accounts db)
:let [end (auto-ap.time/local-now)
start (time/plus end (time/days -30))
_ (log/infof "importing from %s to %s for %s" start end external-id)
transactions (p/get-transactions access-token external-id start end)
transactions (->> transactions
:transactions
(filter (fn [t]
(not (:pending t))))
(mapv (fn [t]
{:description {:original (:name t)
:simple (:name t)}
:id (:transaction_id t)
:amount {:amount (:amount t)}
:date (:date t)
:status "POSTED"}))
)]]
(y/import-for-bank-account transactions bank-account import-id))
y/aggregate-results)]
(log/info "Plaid transactions imported" result)
(y/finish-import (assoc result :db/id import-id)))
(doseq [[bank-account-id client-id external-id access-token] (get-plaid-accounts (d/db conn))
transaction (:transactions (p/get-transactions access-token external-id start end))]
(when (not (:pending transaction))
(y/import-transaction! import-batch (assoc (plaid->transaction transaction)
:transaction/bank-account bank-account-id
:transaction/client client-id))))
(y/finish! import-batch)
(catch Exception e
(log/error e)
(y/finish-import {:db/id import-id}))))))
(y/fail! import-batch e))))))
(defn do-import []
(import-plaid))
(def do-import (allow-once do-import))
(def import-plaid (allow-once import-plaid))
(mount/defstate import-worker
:start (scheduler/every (* 1000 60 60 3) do-import)
:start (scheduler/every (* 1000 60 60 3) import-plaid)
:stop (scheduler/stop import-worker))