Yodlee exists as datomic entities!
This commit is contained in:
@@ -8,11 +8,22 @@
|
||||
[clojure.core.async :as async]
|
||||
[config.core :refer [env]]
|
||||
[mount.core :as mount]
|
||||
[yang.scheduler :as scheduler]))
|
||||
[yang.scheduler :as scheduler]
|
||||
[clj-time.coerce :as coerce]
|
||||
[auto-ap.time :as time2]
|
||||
[datomic.api :as d]
|
||||
[auto-ap.datomic :refer [conn]]
|
||||
[auto-ap.datomic.clients :as d-clients]))
|
||||
(defn client-code->login [client-code]
|
||||
(if (< (count client-code) 3)
|
||||
(str client-code "_" client-code)
|
||||
client-code))
|
||||
|
||||
(defn auth-header
|
||||
([cob-session] (str "Bearer " cob-session)))
|
||||
|
||||
|
||||
|
||||
(def other-config
|
||||
(if (:yodlee2-proxy-host env)
|
||||
{:proxy-host (:yodlee2-proxy-host env)
|
||||
@@ -40,10 +51,10 @@
|
||||
:token
|
||||
:accessToken))
|
||||
|
||||
(defn login-user []
|
||||
(defn login-user [client-code]
|
||||
(-> (str (:yodlee2-base-url env) "/auth/token")
|
||||
(client/post (merge {:headers (assoc base-headers
|
||||
"loginName" (:yodlee2-integreat-user env)
|
||||
"loginName" client-code
|
||||
"Content-Type" "application/x-www-form-urlencoded")
|
||||
:body (str "clientId=" (:yodlee2-client-id env) " &secret=" (:yodlee2-client-secret env))
|
||||
:as :json}
|
||||
@@ -53,9 +64,8 @@
|
||||
:token
|
||||
:accessToken))
|
||||
|
||||
|
||||
(defn get-accounts []
|
||||
(let [cob-session (login-user)]
|
||||
(defn get-accounts [client-code ]
|
||||
(let [cob-session (login-user client-code)]
|
||||
(-> (str (:yodlee2-base-url env) "/accounts")
|
||||
(client/get (merge {:headers (merge base-headers {"Authorization" (str "Bearer " cob-session)})
|
||||
:as :json}
|
||||
@@ -63,9 +73,9 @@
|
||||
:body
|
||||
:account)))
|
||||
|
||||
(defn get-accounts-for-provider-account [provider-account-id]
|
||||
(defn get-accounts-for-provider-account [client-code provider-account-id]
|
||||
(try
|
||||
(let [cob-session (login-user)]
|
||||
(let [cob-session (login-user client-code)]
|
||||
(-> (str (:yodlee2-base-url env) "/accounts?providerAccountId=" provider-account-id)
|
||||
(client/get (merge {:headers (merge base-headers {"Authorization" (auth-header cob-session)})
|
||||
:as :json}
|
||||
@@ -77,8 +87,8 @@
|
||||
e)
|
||||
[])))
|
||||
|
||||
(defn get-provider-accounts []
|
||||
(let [cob-session (login-user)]
|
||||
(defn get-provider-accounts [client-code ]
|
||||
(let [cob-session (login-user client-code)]
|
||||
(-> (str (:yodlee2-base-url env) "/providerAccounts")
|
||||
(-> (client/get (merge {:headers (merge base-headers {"Authorization" (auth-header cob-session )})
|
||||
:as :json}
|
||||
@@ -88,8 +98,8 @@
|
||||
|
||||
|
||||
|
||||
(defn get-transactions []
|
||||
(let [cob-session (login-user)
|
||||
(defn get-transactions [client-code]
|
||||
(let [cob-session (login-user client-code)
|
||||
batch-size 100
|
||||
get-transaction-batch (fn [skip]
|
||||
(-> (str (:yodlee2-base-url env) "/transactions?top=" batch-size "&skip=" skip)
|
||||
@@ -111,8 +121,8 @@
|
||||
|
||||
|
||||
|
||||
(defn get-provider-account [id]
|
||||
(let [cob-session (login-user)
|
||||
(defn get-provider-account [client-code id]
|
||||
(let [cob-session (login-user client-code)
|
||||
batch-size 100]
|
||||
|
||||
(-> (str (:yodlee2-base-url env) "/providerAccounts/" id)
|
||||
@@ -123,8 +133,8 @@
|
||||
:body
|
||||
:providerAccount)))
|
||||
|
||||
(defn get-provider-account-detail [id]
|
||||
(let [cob-session (login-user)]
|
||||
(defn get-provider-account-detail [client-code id]
|
||||
(let [cob-session (login-user client-code)]
|
||||
|
||||
(-> (str (:yodlee2-base-url env) "/providerAccounts/" id )
|
||||
|
||||
@@ -136,8 +146,8 @@
|
||||
:providerAccount
|
||||
first)))
|
||||
|
||||
(defn update-provider-account [pa]
|
||||
(let [cob-session (login-user)]
|
||||
(defn update-provider-account [client-code pa]
|
||||
(let [cob-session (login-user client-code)]
|
||||
|
||||
(-> (str (:yodlee2-base-url env) "/providerAccounts?providerAccountIds=" pa)
|
||||
|
||||
@@ -151,8 +161,8 @@
|
||||
|
||||
|
||||
|
||||
(defn get-specific-transactions [account]
|
||||
(let [cob-session (login-user)
|
||||
(defn get-specific-transactions [client-code account]
|
||||
(let [cob-session (login-user client-code)
|
||||
batch-size 100
|
||||
get-transaction-batch (fn [skip]
|
||||
(-> (str (:yodlee2-base-url env) "/transactions?top=" batch-size "&skip=" skip "&accountId=" account)
|
||||
@@ -172,23 +182,23 @@
|
||||
transactions)))))
|
||||
|
||||
|
||||
(defn get-access-token []
|
||||
(defn get-access-token [client-code]
|
||||
(try
|
||||
(let [cob-session (login-user)]
|
||||
(let [cob-session (login-user client-code)]
|
||||
cob-session)
|
||||
(catch Exception e
|
||||
(log/error e)
|
||||
(throw e))))
|
||||
|
||||
(defn create-user []
|
||||
(defn create-user [client-code]
|
||||
(let [cob-session (login-cobrand)]
|
||||
(-> (str (:yodlee2-base-url env) "/user/register")
|
||||
(client/post (merge {:headers (merge base-headers {"Authorization" (auth-header cob-session)})
|
||||
:body (json/write-str {"user" {
|
||||
"loginName" "integreat-main"
|
||||
"loginName" client-code
|
||||
"email" "bryce@integreatconsult.com"
|
||||
"name" {"first" "Bryce"
|
||||
"last" "Covert"}
|
||||
"name" {"first" client-code
|
||||
"last" client-code}
|
||||
"address" {"address1" "200 Lincoln Ave"
|
||||
"state" "CA"
|
||||
"city" "Salinas"
|
||||
@@ -203,34 +213,38 @@
|
||||
:body)))
|
||||
|
||||
|
||||
(defn assert-user [client-code]
|
||||
(let [cob-session (login-cobrand client-code)]
|
||||
cob-session))
|
||||
|
||||
(defn get-provider-accounts-with-details []
|
||||
(let [provider-accounts (get-provider-accounts)]
|
||||
|
||||
|
||||
(defn get-provider-accounts-with-details [client-code ]
|
||||
(let [provider-accounts (get-provider-accounts client-code)]
|
||||
(let [concurrent 20
|
||||
output-chan (async/chan)]
|
||||
(async/pipeline-blocking concurrent
|
||||
output-chan
|
||||
(map (fn [provider-account]
|
||||
(lc/with-context {:provider-account-id (:id provider-account)}
|
||||
(get-provider-account-detail (:id provider-account)))))
|
||||
(get-provider-account-detail client-code (:id provider-account)))))
|
||||
(async/to-chan provider-accounts))
|
||||
(async/<!! (async/into [] output-chan)))))
|
||||
|
||||
(defn concurrent-get-accounts-for-providers [provider-account-ids]
|
||||
(let [concurrent 20
|
||||
output-chan (async/chan)]
|
||||
(async/pipeline-blocking concurrent
|
||||
output-chan
|
||||
(map (fn [provider-account-id]
|
||||
(lc/with-context {:provider-account-id provider-account-id}
|
||||
[provider-account-id
|
||||
(get-accounts-for-provider-account provider-account-id)])))
|
||||
(async/to-chan provider-account-ids))
|
||||
(async/<!! (async/into {} output-chan))))
|
||||
(defn get-accounts-for-providers [client-code provider-account-ids]
|
||||
(log/info "looking up " (count provider-account-ids) " provider accounts for client " client-code ".")
|
||||
(into {}
|
||||
(mapv (fn [provider-account-id]
|
||||
(lc/with-context {:provider-account-id provider-account-id}
|
||||
[provider-account-id
|
||||
(get-accounts-for-provider-account client-code provider-account-id)]))
|
||||
provider-account-ids)))
|
||||
|
||||
(defn get-provider-accounts-with-accounts []
|
||||
(let [provider-accounts (by :id (get-provider-accounts-with-details))
|
||||
accounts (concurrent-get-accounts-for-providers (keys provider-accounts))]
|
||||
|
||||
|
||||
(defn get-provider-accounts-with-accounts [client-code]
|
||||
(let [provider-accounts (by :id (get-provider-accounts-with-details client-code))
|
||||
accounts (get-accounts-for-providers client-code (keys provider-accounts))]
|
||||
(->> accounts
|
||||
(reduce
|
||||
(fn [provider-accounts [which accounts]]
|
||||
@@ -238,33 +252,12 @@
|
||||
provider-accounts)
|
||||
vals)))
|
||||
|
||||
(mount/defstate in-memory-cache
|
||||
:start (atom []))
|
||||
|
||||
(defn refresh-in-memory-cache []
|
||||
(lc/with-context {:source "refreshing-in-memory-cache"}
|
||||
(try
|
||||
(log/info "Refreshing Yodlee in memory cache")
|
||||
(reset! in-memory-cache (get-provider-accounts-with-accounts))
|
||||
|
||||
(catch Exception e
|
||||
(log/error e)))))
|
||||
|
||||
(mount/defstate in-memory-cache-worker
|
||||
:start (scheduler/every (* 5 60 1000) refresh-in-memory-cache)
|
||||
:stop (scheduler/stop in-memory-cache-worker))
|
||||
|
||||
|
||||
(defn refresh-provider-account [id]
|
||||
(swap! in-memory-cache
|
||||
(fn [i]
|
||||
(-> (by :id i)
|
||||
(assoc id (assoc (get-provider-account-detail id)
|
||||
:accounts (get-accounts-for-provider-account id)))
|
||||
vals))))
|
||||
|
||||
(defn delete-provider-account [id]
|
||||
(let [cob-session (login-user)]
|
||||
|
||||
(defn delete-provider-account [client-code id]
|
||||
(let [cob-session (login-user client-code)]
|
||||
|
||||
(-> (str (:yodlee2-base-url env) "/providerAccounts/" id )
|
||||
|
||||
@@ -274,11 +267,49 @@
|
||||
:body
|
||||
:providerAccount
|
||||
first))
|
||||
(swap! in-memory-cache
|
||||
(fn [i]
|
||||
(-> (by :id i)
|
||||
(dissoc id)
|
||||
vals))))
|
||||
@(d/transact conn [:db/retractEntity [:yodlee-provider-account/id id]]))
|
||||
|
||||
(defn upsert-accounts-tx [client-code]
|
||||
(let [provider-accounts (get-provider-accounts client-code)
|
||||
accounts (get-accounts-for-providers client-code (map :id provider-accounts))]
|
||||
(map (fn [pa]
|
||||
{:yodlee-provider-account/id (:id pa)
|
||||
:yodlee-provider-account/status (:status pa)
|
||||
:yodlee-provider-account/detailed-status (-> pa :dataset first :additionalStatus)
|
||||
:yodlee-provider-account/client [:client/code client-code]
|
||||
:yodlee-provider-account/last-updated (-> pa :dataset first :lastUpdated coerce/to-date)
|
||||
:yodlee-provider-account/accounts (mapv
|
||||
(fn [a]
|
||||
{:yodlee-account/id (:id a)
|
||||
:yodlee-account/name (str (:providerName a) " (" (:accountName a) ")")
|
||||
:yodlee-account/number (:accountNumber a)
|
||||
:yodlee-account/status (-> a :dataset first :additionalStatus)
|
||||
:yodlee-account/available-balance (or (-> a :currentBalance :amount)
|
||||
0.0)})
|
||||
(get accounts (:id pa)))})
|
||||
provider-accounts)))
|
||||
|
||||
(defn refresh-provider-account [client-code id]
|
||||
@(d/transact conn (upsert-accounts-tx (client-code->login (client-code->login client-code)
|
||||
id))))
|
||||
|
||||
(defn upsert-accounts []
|
||||
(let [concurrent 20
|
||||
output-chan (async/chan)]
|
||||
(async/pipeline-blocking concurrent
|
||||
output-chan
|
||||
(mapcat (fn [client]
|
||||
(log/info "Upserting Yodlee Accounts for " (:client/code client))
|
||||
(lc/with-context {:client-code (:client/code client)}
|
||||
(upsert-accounts-tx (client-code->login (:client/code client))))))
|
||||
(async/to-chan (d-clients/get-all)))
|
||||
(let [result (async/<!! (async/into [] output-chan))]
|
||||
(log/info "Current yodlee state is " result)
|
||||
@(d/transact conn result))))
|
||||
|
||||
(mount/defstate yodlee-sync-worker
|
||||
:start (scheduler/every (* 5 60 1000) upsert-accounts)
|
||||
:stop (scheduler/stop upsert-accounts))
|
||||
|
||||
(defn update-yodlee [id]
|
||||
(update-provider-account id)
|
||||
@@ -286,7 +317,6 @@
|
||||
|
||||
(defn reauthenticate [pa data]
|
||||
(let [cob-session (login-cobrand)]
|
||||
|
||||
(try
|
||||
|
||||
(doto (-> (str (:yodlee2-base-url env) "/providerAccounts?providerAccountIds=" pa)
|
||||
|
||||
Reference in New Issue
Block a user