Files
integreat/src/clj/auto_ap/yodlee/core.clj
2021-11-30 18:22:07 -08:00

417 lines
16 KiB
Clojure

(ns auto-ap.yodlee.core
(:require [clj-http.client :as client]
[auto-ap.utils :refer [by]]
[cemerick.url :as u]
[unilog.context :as lc]
[clojure.tools.logging :as log]
[clojure.data.json :as json]
[clojure.core.async :as async]
[config.core :refer [env]]
[mount.core :as mount]
[yang.scheduler :as scheduler]))
(defn auth-header
  "Builds the string used as the `Authorization` header value for Yodlee calls.

  One argument: only the cobrand session is embedded.
  Two arguments: the user session is appended after the cobrand session."
  ([cob-session]
   (str "{cobSession=" cob-session "}"))
  ([cob-session user-session]
   (let [header-value (str "{cobSession=" cob-session ",userSession=" user-session "}")]
     header-value)))
;; Request options merged into every clj-http call in this namespace.
;; The retry handler logs the failure and returns false; both timeouts are
;; set to 60000. When `:yodlee-proxy-host` is present in `env`, the proxy
;; host and port from `env` are added as well.
(def other-config
  (let [log-and-give-up (fn [ex _try-count _http-context]
                          (log/error "yodlee Error." ex)
                          false)
        common-options  {:retry-handler      log-and-give-up
                         :socket-timeout     60000
                         :connection-timeout 60000}
        proxy-host      (:yodlee-proxy-host env)]
    (if proxy-host
      (assoc common-options
             :proxy-host proxy-host
             :proxy-port (:yodlee-proxy-port env))
      common-options)))
(defn retry-thrice
  "Calls the zero-argument function `x` and returns its result.

  If `x` throws an `Exception` it is called again, up to three retries
  after the first attempt (four calls in total when starting from the
  default counter of 0). The exception from the last failed attempt is
  rethrown. `i` is the number of retries already used."
  ([x] (retry-thrice x 0))
  ([x i]
   (loop [retries-used i]
     ;; The successful result is wrapped in a map so that any value `x`
     ;; returns can be told apart from the retry marker.
     (let [outcome (try
                     {:value (x)}
                     (catch Exception e
                       (if (>= retries-used 3)
                         (throw e)
                         ::retry)))]
       (if (= ::retry outcome)
         (recur (inc retries-used))
         (:value outcome))))))
;; Headers included in every request made by this namespace. The cobrand
;; name is read from `env` when this var is initialised; individual calls
;; merge an "Authorization" entry on top of this map.
(def base-headers {"Api-Version" "1.1"
"Cobrand-Name" (:yodlee-cobrand-name env)
"Content-Type" "application/json"})
(defn login-cobrand
  "POSTs the cobrand credentials from `env` to `/cobrand/login` and returns
  the `:cobSession` value found under `:session` in the JSON response body."
  []
  (let [url      (str (:yodlee-base-url env) "/cobrand/login")
        payload  (json/write-str {:cobrand {:cobrandLogin    (:yodlee-cobrand-login env)
                                            :cobrandPassword (:yodlee-cobrand-password env)
                                            :locale          "en_US"}})
        request  (merge {:headers base-headers
                         :body    payload
                         :as      :json}
                        other-config)
        response (client/post url request)]
    (get-in response [:body :session :cobSession])))
(defn login-user
  "POSTs user credentials to `/user/login`, authorised with `cob-session`,
  and returns the `:userSession` value found under `:user` / `:session` in
  the JSON response body.

  The one-argument form uses the login name and password from `env`."
  ([cob-session]
   (login-user cob-session (:yodlee-user-login env) (:yodlee-user-password env)))
  ([cob-session user password]
   (let [url      (str (:yodlee-base-url env) "/user/login")
         headers  (merge base-headers {"Authorization" (auth-header cob-session)})
         payload  (json/write-str {:user {:loginName user
                                          :password  password
                                          :locale    "en_US"}})
         response (client/post url (merge {:headers headers
                                           :body    payload
                                           :as      :json}
                                          other-config))]
     (get-in response [:body :user :session :userSession]))))
(defn get-accounts
  "Logs in (cobrand, then user), GETs `/accounts` and returns the `:account`
  entry of the JSON response body."
  []
  (let [cob-session  (login-cobrand)
        user-session (login-user cob-session)
        url          (str (:yodlee-base-url env) "/accounts")
        headers      (merge base-headers
                            {"Authorization" (auth-header cob-session user-session)})
        response     (client/get url (merge {:headers headers
                                             :as      :json}
                                            other-config))]
    (get-in response [:body :account])))
(defn get-accounts-for-provider-account
  "Returns the `:account` entry of the `/accounts` response filtered by
  `providerAccountId`.

  The GET itself goes through `retry-thrice`. Any exception, from login or
  from the final failed attempt, is logged and an empty vector is returned
  instead of propagating."
  [provider-account-id]
  (try
    (let [cob-session    (login-cobrand)
          user-session   (login-user cob-session)
          fetch-accounts (fn []
                           (let [url     (str (:yodlee-base-url env) "/accounts?providerAccountId=" provider-account-id)
                                 request (merge {:headers (merge base-headers {"Authorization" (auth-header cob-session user-session)})
                                                 :as      :json}
                                                other-config)]
                             (get-in (client/get url request) [:body :account])))]
      (retry-thrice fetch-accounts))
    (catch Exception e
      (log/error (str "Couldn't get accounts for provider account '" provider-account-id "'")
                 e)
      [])))
(defn get-account
  "Logs in, GETs `/accounts/<i>` and returns the `:account` entry of the JSON
  response body."
  [i]
  (let [cob-session  (login-cobrand)
        user-session (login-user cob-session)
        path         (str "/accounts/" i)
        url          (str (:yodlee-base-url env) path)
        headers      (merge base-headers
                            {"Authorization" (auth-header cob-session user-session)})
        response     (client/get url (merge {:headers headers
                                             :as      :json}
                                            other-config))]
    (get-in response [:body :account])))
;; NOTE(review): a second `get-provider-accounts` is defined further down in
;; this file; being evaluated later, that definition is the one callers see.
;; It omits the "include" query parameter used here — confirm which variant
;; is intended.
(defn get-provider-accounts
  "Logs in, GETs `/providerAccounts` with
  `include=credentials,questions,preferences` and returns the
  `:providerAccount` entry of the JSON response body."
  []
  (let [cob-session  (login-cobrand)
        user-session (login-user cob-session)
        url          (str (:yodlee-base-url env) "/providerAccounts")
        request      (merge {:headers      (merge base-headers {"Authorization" (auth-header cob-session user-session)})
                             :query-params {"include" "credentials,questions,preferences"}
                             :as           :json}
                            other-config)
        response     (client/get url request)]
    (get-in response [:body :providerAccount])))
(defn get-transactions
  "Fetches all transactions by paging through `/transactions`.

  Logs in once, then requests pages of 100 (`top`) at increasing `skip`
  offsets until a page has no `:transaction` entries. Returns a vector of
  all entries in the order the pages were received."
  []
  (let [cob-session  (login-cobrand)
        user-session (login-user cob-session)
        batch-size   100
        get-transaction-batch
        (fn [skip]
          (-> (str (:yodlee-base-url env) "/transactions?top=" batch-size "&skip=" skip)
              (client/get (merge {:headers (merge base-headers {"Authorization" (auth-header cob-session user-session)})
                                  :as :json}
                                 other-config))
              :body
              :transaction))]
    (loop [transactions []
           skip         0]
      (let [transaction-batch (get-transaction-batch skip)]
        (if (seq transaction-batch)
          ;; `into` keeps the accumulator an eager vector. Accumulating with
          ;; `concat` here nests one lazy seq per page, which can overflow
          ;; the stack when the result is finally realized.
          (recur (into transactions transaction-batch) (+ batch-size skip))
          transactions)))))
(defn get-provider-accounts
  "Logs in, GETs `/providerAccounts` and returns the `:providerAccount`
  entry of the JSON response body."
  []
  ;; The unused `batch-size` local was removed; this endpoint is not paged
  ;; by this function.
  (let [cob-session  (login-cobrand)
        user-session (login-user cob-session)]
    (-> (str (:yodlee-base-url env) "/providerAccounts")
        (client/get (merge {:headers (merge base-headers {"Authorization" (auth-header cob-session user-session)})
                            :as :json}
                           other-config))
        :body
        :providerAccount)))
(defn get-provider-account
  "Logs in, GETs `/providerAccounts/<id>` and returns the `:providerAccount`
  entry of the JSON response body."
  [id]
  ;; The unused `batch-size` local was removed.
  (let [cob-session  (login-cobrand)
        user-session (login-user cob-session)]
    (-> (str (:yodlee-base-url env) "/providerAccounts/" id)
        (client/get (merge {:headers (merge base-headers {"Authorization" (auth-header cob-session user-session)})
                            :as :json}
                           other-config))
        :body
        :providerAccount)))
(defn get-provider-account-detail
  "Logs in, GETs `/providerAccounts/<id>` with
  `include=credentials,preferences` and returns the first element of the
  `:providerAccount` entry in the JSON response body."
  [id]
  ;; The unused `batch-size` local was removed.
  (let [cob-session  (login-cobrand)
        user-session (login-user cob-session)]
    (-> (str (:yodlee-base-url env) "/providerAccounts/" id)
        (client/get (merge {:headers (merge base-headers {"Authorization" (auth-header cob-session user-session)})
                            :query-params {"include" "credentials,preferences"}
                            :as :json}
                           other-config))
        :body
        :providerAccount
        first)))
(defn update-provider-account
  "Logs in and PUTs to `/providerAccounts?providerAccountIds=<pa>` with a
  fixed body naming the `BASIC_AGG_DATA` data set. Returns the full
  clj-http response map."
  [pa]
  ;; The unused `batch-size` local was removed.
  (let [cob-session  (login-cobrand)
        user-session (login-user cob-session)]
    (-> (str (:yodlee-base-url env) "/providerAccounts?providerAccountIds=" pa)
        (client/put (merge {:headers (merge base-headers {"Authorization" (auth-header cob-session user-session)})
                            :body "{\"dataSetName\": [\"BASIC_AGG_DATA\"]}"
                            :as :json}
                           other-config)))))
(defn get-specific-transactions
  "Fetches all transactions for `account` by paging through `/transactions`.

  Logs in once, then requests pages of 100 (`top`) at increasing `skip`
  offsets with `accountId=<account>` until a page has no `:transaction`
  entries. Returns a vector of all entries in the order received."
  [account]
  (let [cob-session  (login-cobrand)
        user-session (login-user cob-session)
        batch-size   100
        get-transaction-batch
        (fn [skip]
          (-> (str (:yodlee-base-url env) "/transactions?top=" batch-size "&skip=" skip "&accountId=" account)
              (client/get (merge {:headers (merge base-headers {"Authorization" (auth-header cob-session user-session)})
                                  :as :json}
                                 other-config))
              :body
              :transaction))]
    (loop [transactions []
           skip         0]
      (let [transaction-batch (get-transaction-batch skip)]
        (if (seq transaction-batch)
          ;; `into` keeps the accumulator an eager vector. Accumulating with
          ;; `concat` here nests one lazy seq per page, which can overflow
          ;; the stack when the result is finally realized.
          (recur (into transactions transaction-batch) (+ batch-size skip))
          transactions)))))
(defn get-specific-transactions-with-date
  "Fetches all transactions for `account` between `start` and `end`.

  Logs in once, then pages through `/transactions` 100 at a time, passing
  `start` as `fromDate`, `end` as `toDate` and `account` as `accountId`,
  until a page has no `:transaction` entries. The three values are spliced
  into the query string as-is. Returns a vector of all entries in the order
  received."
  [account start end]
  (let [cob-session  (login-cobrand)
        user-session (login-user cob-session)
        batch-size   100
        get-transaction-batch
        (fn [skip]
          (-> (str (:yodlee-base-url env) "/transactions?top=" batch-size "&fromDate=" start "&toDate=" end "&skip=" skip "&accountId=" account)
              (client/get (merge {:headers (merge base-headers {"Authorization" (auth-header cob-session user-session)})
                                  :as :json}
                                 other-config))
              :body
              :transaction))]
    (loop [transactions []
           skip         0]
      (let [transaction-batch (get-transaction-batch skip)]
        (if (seq transaction-batch)
          ;; `into` keeps the accumulator an eager vector. Accumulating with
          ;; `concat` here nests one lazy seq per page, which can overflow
          ;; the stack when the result is finally realized.
          (recur (into transactions transaction-batch) (+ batch-size skip))
          transactions)))))
(defn count-specific-transactions
  "Logs in, GETs `/transactions/count?accountId=<account>` and returns the
  `:transaction` entry of the JSON response body."
  [account]
  (let [cob-session  (login-cobrand)
        user-session (login-user cob-session)
        url          (str (:yodlee-base-url env) "/transactions/count?accountId=" account)
        headers      (merge base-headers
                            {"Authorization" (auth-header cob-session user-session)})
        response     (client/get url (merge {:headers headers
                                             :as      :json}
                                            other-config))]
    (get-in response [:body :transaction])))
(defn get-access-token
  "Logs in, GETs `/user/accessTokens` for app id 10003600 and returns
  `[user-session token]`, where `token` is the `:value` of the first entry
  under `:user` / `:accessTokens` in the JSON response body.

  Any exception is logged and rethrown."
  []
  (try
    (let [cob-session  (login-cobrand)
          user-session (login-user cob-session)
          url          (str (:yodlee-base-url env) "/user/accessTokens?appIds=" 10003600)
          response     (client/get url
                                   (merge {:headers (merge base-headers {"Authorization" (auth-header cob-session user-session)})
                                           :as      :json}
                                          other-config))
          ;; NOTE(review): the whole response, including the access token
          ;; extracted below, is written to the log at info level. Confirm
          ;; that this is intended.
          _            (log/info response)
          token        (-> response :body :user :accessTokens first :value)]
      [user-session token])
    (catch Exception e
      (log/error e)
      (throw e))))
(defn create-user
  "Registers a Yodlee user via POST `/user/register`, authorised with a fresh
  cobrand session, and returns the JSON response body.

  The three-argument form registers the given `login-name`, `password` and
  `email`. The zero-argument form keeps the previous behaviour and registers
  the fixed account below."
  ([]
   ;; NOTE(review): these credentials are committed in source; consider
   ;; moving them to configuration.
   (create-user "brycesPersoonal2"
                "kV@mdv3TU11"
                "yodleepersonal2@brycecovertoperations.com"))
  ([login-name password email]
   (let [cob-session (login-cobrand)]
     (-> (str (:yodlee-base-url env) "/user/register")
         (client/post (merge {:headers (merge base-headers {"Authorization" (auth-header cob-session)})
                              :body (json/write-str {:user {:loginName login-name
                                                            :password password
                                                            :email email}})
                              :as :json}
                             other-config))
         :body))))
(defn get-provider-accounts-with-details
  "Fetches the provider-account list, then fetches the detail for each entry
  through a `pipeline-blocking` stage with parallelism 20. Blocks until the
  pipeline finishes and returns the details as a vector.

  An exception raised while fetching one detail is logged by the pipeline's
  error handler and contributes nothing to the result."
  []
  (let [provider-accounts (get-provider-accounts)
        concurrent        20
        output-chan       (async/chan)
        fetch-detail      (fn [provider-account]
                            (lc/with-context {:provider-account-id (:id provider-account)}
                              (get-provider-account-detail (:id provider-account))))
        on-error          (fn [e]
                            (lc/with-context {:source "Yodlee"}
                              (log/error "Yodlee pipeline error" e)))]
    (async/pipeline-blocking concurrent
                             output-chan
                             (map fetch-detail)
                             (async/to-chan provider-accounts)
                             true
                             on-error)
    (async/<!! (async/into [] output-chan))))
(defn concurrent-get-accounts-for-providers
  "Fetches the accounts for each id in `provider-account-ids` through a
  `pipeline-blocking` stage with parallelism 20. Blocks until the pipeline
  finishes and returns a map of provider-account id to the accounts fetched
  for it."
  [provider-account-ids]
  (let [concurrent     20
        output-chan    (async/chan)
        fetch-accounts (fn [provider-account-id]
                         (lc/with-context {:provider-account-id provider-account-id}
                           [provider-account-id
                            (get-accounts-for-provider-account provider-account-id)]))
        on-error       (fn [e]
                         (lc/with-context {:source "Yodlee"}
                           (log/error "Yodlee pipeline error" e)))]
    (async/pipeline-blocking concurrent
                             output-chan
                             (map fetch-accounts)
                             (async/to-chan provider-account-ids)
                             true
                             on-error)
    ;; Each pipeline item is an [id accounts] pair, so `into {}` yields a map.
    (async/<!! (async/into {} output-chan))))
(defn get-provider-accounts-with-accounts
  "Returns the detailed provider accounts, each with the accounts fetched for
  it attached under `:accounts`."
  []
  ;; `by` comes from auto-ap.utils; presumably it indexes the collection by
  ;; `:id` into a map. Confirm against that namespace.
  (let [provider-accounts (by :id (get-provider-accounts-with-details))
        accounts          (concurrent-get-accounts-for-providers (keys provider-accounts))
        attach-accounts   (fn [indexed which fetched]
                            (assoc-in indexed [which :accounts] fetched))]
    (vals (reduce-kv attach-accounts provider-accounts accounts))))
;; Mount state holding an atom that starts as an empty vector.
;; `refresh-in-memory-cache`, `refresh-provider-account` and
;; `delete-provider-account` in this namespace replace or update its contents
;; with provider accounts.
(mount/defstate in-memory-cache
:start (atom []))
(defn refresh-in-memory-cache
  "Replaces the contents of `in-memory-cache` with a fresh result from
  `get-provider-accounts-with-accounts`.

  Any exception is logged and swallowed, leaving the cache as it was."
  []
  (lc/with-context {:source "refreshing-in-memory-cache"}
    (try
      (log/info "Refreshing Yodlee in memory cache")
      (let [fresh-provider-accounts (get-provider-accounts-with-accounts)]
        (reset! in-memory-cache fresh-provider-accounts))
      (catch Exception e
        (log/error e)))))
;; Mount state that schedules `refresh-in-memory-cache` through
;; `scheduler/every` on start and passes the resulting handle to
;; `scheduler/stop` on stop.
;; NOTE(review): the interval (* 5 60 1000) is presumably in milliseconds,
;; i.e. five minutes — confirm against yang.scheduler.
(mount/defstate in-memory-cache-worker
:start (scheduler/every (* 5 60 1000) refresh-in-memory-cache)
:stop (scheduler/stop in-memory-cache-worker))
(defn refresh-provider-account
  "Re-fetches the detail and accounts for provider account `id` and stores
  the result in `in-memory-cache`, replacing any existing entry with that
  id. Returns the new cache contents."
  [id]
  ;; Fetch before calling `swap!`. The function given to `swap!` may run
  ;; more than once when the atom is contended, so it must not make HTTP
  ;; calls itself.
  (let [detail   (get-provider-account-detail id)
        accounts (get-accounts-for-provider-account id)
        entry    (assoc detail :accounts accounts)]
    (swap! in-memory-cache
           (fn [cached]
             (-> (by :id cached)
                 (assoc id entry)
                 vals)))))
(defn delete-provider-account
  "Logs in, sends DELETE `/providerAccounts/<id>`, then removes the entry
  with that id from `in-memory-cache`. Returns the new cache contents."
  [id]
  ;; The unused `batch-size` local was removed, along with the extraction of
  ;; `:body` / `:providerAccount` from the response, whose result was thrown
  ;; away.
  (let [cob-session  (login-cobrand)
        user-session (login-user cob-session)]
    (client/delete (str (:yodlee-base-url env) "/providerAccounts/" id)
                   (merge {:headers (merge base-headers {"Authorization" (auth-header cob-session user-session)})
                           :as :json}
                          other-config)))
  (swap! in-memory-cache
         (fn [cached]
           (-> (by :id cached)
               (dissoc id)
               vals))))
(defn update-yodlee
  "Calls `update-provider-account` for `id`, then `refresh-provider-account`
  for the same id. Returns the result of the refresh."
  [id]
  (let [_update-response (update-provider-account id)]
    (refresh-provider-account id)))
(defn reauthenticate
  "Logs in, PUTs `data` (serialised as JSON) to
  `/providerAccounts?providerAccountIds=<pa>`, logs the response, then
  refreshes the cached entry for `pa`.

  Returns the result of `refresh-provider-account`. An exception from the
  PUT or the refresh is logged and swallowed, in which case the result is
  whatever `log/error` returns. The logins happen outside the `try`, so
  their exceptions propagate."
  [pa data]
  ;; The unused `batch-size` local was removed.
  (let [cob-session  (login-cobrand)
        user-session (login-user cob-session)]
    (try
      (doto (-> (str (:yodlee-base-url env) "/providerAccounts?providerAccountIds=" pa)
                (client/put (merge {:headers (merge base-headers {"Authorization" (auth-header cob-session user-session)})
                                    :body (json/write-str data)
                                    :as :json}
                                   other-config)))
        log/info)
      (refresh-provider-account pa)
      (catch Exception e
        (log/error e)))))
;; Disabled: the `#_` reader macro discards this whole form, so `get-users`
;; is never defined. As written it would GET a hard-coded developer API URL
;; with only a cobrand session and without `other-config`.
#_(defn get-users []
(let [cob-session (login-cobrand)]
(-> "https://developer.api.yodlee.com/ysl/user"
(client/get {:headers (merge base-headers {"Authorization" (auth-header cob-session)})
:as :json})
:body)))