Files
integreat/src/clj/auto_ap/yodlee/import.clj
2021-12-20 20:20:40 -08:00

486 lines
25 KiB
Clojure

(ns auto-ap.yodlee.import
(:require
[auto-ap.datomic :refer [audit-transact conn remove-nils uri]]
[auto-ap.datomic.accounts :as a]
[auto-ap.datomic.checks :as d-checks]
[auto-ap.datomic.clients :as d-clients]
[auto-ap.datomic.transaction-rules :as tr]
[auto-ap.datomic.transactions :as d-transactions]
[auto-ap.rule-matching :as rm]
[auto-ap.time :as time]
[auto-ap.utils :refer [allow-once by dollars=]]
[auto-ap.yodlee.core :as client]
[auto-ap.yodlee.core2 :as client2]
[clj-time.coerce :as coerce]
[clj-time.core :as t]
[clojure.string :as str]
[clojure.tools.logging :as log]
[datomic.api :as d]
[digest :refer [sha-256]]
[mount.core :as mount]
[unilog.context :as lc]
[yang.scheduler :as scheduler]))
(defn rough-match [client-id bank-account-id amount]
(if (and client-id bank-account-id amount)
(let [[matching-checks] (d-checks/get-graphql {:client-id client-id
:bank-account-id bank-account-id
:amount (- amount)
:status :payment-status/pending})]
(if (= 1 (count matching-checks))
(first matching-checks)
nil))
nil))
(defn transaction->existing-payment [_ check-number client-id bank-account-id amount id]
(log/info "Searching for a matching check for "
{:client-id client-id
:check-number check-number
:bank-account-id bank-account-id
:amount amount})
(cond (not (and client-id bank-account-id))
nil
(:transaction/payment (d-transactions/get-by-id [:transaction/id (sha-256 (str id))]))
nil
check-number
(or (-> (d-checks/get-graphql {:client-id client-id
:bank-account-id bank-account-id
:check-number check-number
:amount (- amount)
:status :payment-status/pending})
first
first)
(rough-match client-id bank-account-id amount))
:else
(rough-match client-id bank-account-id amount)))
(defn aggregate-results [results]
(reduce
(fn [acc result]
(merge-with + acc
{:import-batch/imported (count (:import result))
:import-batch/extant (count (:extant result))
:import-batch/suppressed (count (:suppressed result))}))
{:import-batch/imported 0
:import-batch/extant 0
:import-batch/suppressed 0}
results))
(defn match-transaction-to-unfulfilled-autopayments [amount client-id]
(log/info "trying to find uncleared autopay invoices")
(let [candidate-invoices-vendor-groups (->> (d/query {:query {:find ['?vendor-id '?e '?total '?sd]
:in ['$ '?client-id]
:where ['[?e :invoice/client ?client-id]
'[?e :invoice/scheduled-payment ?sd]
'[?e :invoice/status :invoice-status/paid]
'(not [_ :invoice-payment/invoice ?e])
'[?e :invoice/vendor ?vendor-id]
'[?e :invoice/total ?total]]}
:args [(d/db conn) client-id]})
(sort-by last) ;; sort by scheduled payment date
(group-by first) ;; group by vendors
vals)
considerations (for [candidate-invoices candidate-invoices-vendor-groups
invoice-count (range 1 32)
consideration (partition invoice-count 1 candidate-invoices)
:when (dollars= (reduce (fn [acc [_ _ amount]]
(+ acc amount)) 0.0 consideration)
(- amount))]
consideration)]
(log/info "Found " (count considerations) "considerations for transaction of" amount)
considerations
))
(defn match-transaction-to-unpaid-invoices [amount client-id]
(log/info "trying to find unpaid invoices for " client-id amount)
(let [candidate-invoices-vendor-groups (->> (d/query {:query {:find ['?vendor-id '?e '?outstanding-balance '?d]
:in ['$ '?client-id]
:where ['[?e :invoice/client ?client-id]
'[?e :invoice/status :invoice-status/unpaid]
'(not [_ :invoice-payment/invoice ?e])
'[?e :invoice/vendor ?vendor-id]
'[?e :invoice/outstanding-balance ?outstanding-balance]
'[?e :invoice/date ?d]]}
:args [(d/db conn) client-id]})
(sort-by last) ;; sort by scheduled payment date
(group-by first) ;; group by vendors
vals)
considerations (for [candidate-invoices candidate-invoices-vendor-groups
invoice-count (range 1 32)
consideration (partition invoice-count 1 candidate-invoices)
:when (dollars= (reduce (fn [acc [_ _ amount]]
(+ acc amount)) 0.0 consideration)
(- amount))]
consideration)]
(log/info "Found " (count considerations) "unpaid invoice considerations for transaction of" amount)
considerations))
(defn match-transaction-to-single-unfulfilled-autopayments [amount client-id]
(let [considerations (match-transaction-to-unfulfilled-autopayments amount client-id)]
(if (= 1 (count considerations))
(first considerations)
[])))
(defn add-new-payment [[transaction :as tx] [[vendor] :as invoice-payments] bank-account-id client-id]
(log/info "Adding a new payment for transaction " (:transaction/id transaction) " and invoices " invoice-payments)
(let [payment-id (d/tempid :db.part/user)]
(-> tx
(conj {:payment/bank-account bank-account-id
:payment/client client-id
:payment/amount (- (:transaction/amount transaction))
:payment/vendor vendor
:payment/date (:transaction/date transaction)
:payment/type :payment-type/debit
:payment/status :payment-status/cleared
:db/id payment-id})
(into (mapcat (fn [[vendor invoice-id invoice-amount]]
[{:invoice-payment/invoice invoice-id
:invoice-payment/payment payment-id
:invoice-payment/amount invoice-amount}
{:db/id invoice-id
:invoice/outstanding-balance 0.0
:invoice/status :invoice-status/paid}])
invoice-payments))
(update 0 assoc
:transaction/payment payment-id
:transaction/approval-status :transaction-approval-status/approved
:transaction/vendor vendor
:transaction/location "A")
(conj [:reset (:db/id transaction) :transaction/accounts
[#:transaction-account
{:account (:db/id (a/get-account-by-numeric-code-and-sets 21000 ["default"]))
:location "A"
:amount (Math/abs (:transaction/amount transaction))}]]))))
(defn extract-check-number [{{description-original :original} :description}]
(if-let [[_ _ check-number] (re-find #"(?i)check(card|[^0-9]+([0-9]*))" description-original)]
(try
(Integer/parseInt check-number)
(catch NumberFormatException e
nil))
nil))
(defn find-expected-deposit [client-id amount date]
(when date
(-> (d/q
'[:find ?ed
:in $ ?c ?a ?d-start
:where
[?ed :expected-deposit/client ?c]
(not [?ed :expected-deposit/status :expected-deposit-status/cleared])
[?ed :expected-deposit/date ?d]
[(>= ?d ?d-start)]
[?ed :expected-deposit/total ?a2]
[(auto-ap.utils/dollars= ?a2 ?a)]
]
(d/db conn) client-id amount (coerce/to-date (t/plus date (t/days -10))))
first
first)))
(defn transactions->txs [transactions bank-account import-id apply-rules existing]
(let [client (:client/_bank-accounts bank-account)
client-id (:db/id client)
bank-account-id (:db/id bank-account)
valid-locations (or (:bank-account/locations bank-account) (:client/locations client))
grouped-transactions (group-by
(fn [{id :id
date :date
status :status :as transaction}]
(let [date (time/parse date "YYYY-MM-dd")
id (sha-256 (str id))]
(cond (= :transaction-approval-status/suppressed (existing id))
:suppressed
(existing id)
:extant
(not client)
:error
(not bank-account)
:error
(not= "POSTED" status)
:not-posted
(and (:bank-account/start-date bank-account)
(not (t/after? date (-> bank-account :bank-account/start-date coerce/to-date-time))))
:not-ready
:else
:import)))
transactions)]
(update grouped-transactions :import
(fn [transactions]
(into []
(for [transaction transactions
:let [{post-date :postDate
account-id :accountId
date :date
id :id
{amount :amount} :amount
{description-original :original
description-simple :simple} :description
{merchant-id :id
merchant-name :name} :merchant
base-type :baseType
type :type
status :status}
transaction
amount (if (= "DEBIT" base-type)
(- amount)
amount)
check-number (extract-check-number transaction)
date (time/parse date "YYYY-MM-dd")]]
(let [
existing-check (transaction->existing-payment transaction check-number client-id bank-account-id amount id)
autopay-invoices-matches (when-not existing-check
(match-transaction-to-unfulfilled-autopayments amount client-id))
unpaid-invoices-matches (when-not existing-check
(match-transaction-to-unpaid-invoices amount client-id ))
expected-deposit (when (and (> amount 0.0)
(not existing-check))
(find-expected-deposit (:db/id client) amount date))]
(cond->
[#:transaction
{:post-date (coerce/to-date (time/parse post-date "YYYY-MM-dd"))
:id (sha-256 (str id))
:raw-id (str id)
:account-id account-id
:date (coerce/to-date date)
:amount (double amount)
:import-batch/_entry import-id
:description-original (some-> description-original (str/replace #"\s+" " "))
:description-simple (some-> description-simple (str/replace #"\s+" " "))
:approval-status :transaction-approval-status/unapproved
:type type
:status status
:client client-id
:check-number check-number
:bank-account bank-account-id}]
existing-check (update 0 #(assoc % :transaction/approval-status :transaction-approval-status/approved
:transaction/payment {:db/id (:db/id existing-check)
:payment/status :payment-status/cleared}
:transaction/vendor (:db/id (:payment/vendor existing-check))
:transaction/location "A"
:transaction/accounts [#:transaction-account
{:account (:db/id (a/get-account-by-numeric-code-and-sets 21000 ["default"]))
:location "A"
:amount (Math/abs (double amount))}]))
;; temporarily removed to automatically match autopaid invoices
#_(and (not existing-check)
(seq autopay-invoices-matches)) #_(add-new-payment autopay-invoices-matches bank-account-id client-id)
expected-deposit (update 0 #(assoc % :transaction/expected-deposit {:db/id expected-deposit
:expected-deposit/status :expected-deposit-status/cleared}))
(and (not (seq autopay-invoices-matches))
(not (seq unpaid-invoices-matches))
(not expected-deposit)) (update 0 #(apply-rules % valid-locations))
true (update 0 remove-nils)))))))))
(defn batch-transact [transactions]
(reduce (fn [acc batch]
(into acc (->> batch
(d/transact (d/connect uri) )
(deref)
:tempids
vals)))
[]
(partition-all 100 transactions)))
(defn get-existing [bank-account]
(into {}
(d/query {:query {:find ['?tid '?as2]
:in ['$ '?ba]
:where ['[?e :transaction/bank-account ?ba]
'[?e :transaction/id ?tid]
'[?e :transaction/approval-status ?as]
'[?as :db/ident ?as2]]}
:args [(d/db (d/connect uri)) bank-account]})))
(defn get-all-bank-accounts []
(->> (d-clients/get-all)
(mapcat (fn [client]
(->> client
:client/bank-accounts
#_(filter :bank-account/yodlee-account-id)
(filter :db/id)
(map (fn [{:keys [:db/id :bank-account/yodlee-account-id] :as bank-account}]
(assoc bank-account
:client/_bank-accounts client))))))))
(defn start-import [source user]
(get (:tempids @(d/transact conn [{:db/id "import-batch"
:import-batch/date (coerce/to-date (t/now))
:import-batch/source source
:import-batch/status :import-status/started
:import-batch/user-name user}])) "import-batch"))
(defn finish-import [i]
@(d/transact conn [(assoc i :import-batch/status :import-status/completed)]))
(defn import-for-bank-account [transactions bank-account-id import-id]
(let [all-rules (tr/get-all)
bank-account (d/pull (d/db conn)
[:bank-account/code
:db/id
:bank-account/locations
:bank-account/start-date
{:client/_bank-accounts [:client/code :client/locations :db/id]} ]
bank-account-id)
_ (log/infof "Importing %d transactions for bank account %s for client %s" (count transactions) (-> bank-account :bank-account/code) (-> bank-account :client/_bank-accounts :client/code))
transactions (transactions->txs transactions bank-account import-id (rm/rule-applying-fn all-rules) (get-existing bank-account-id))]
(doseq [tx (:import transactions)]
(audit-transact tx {:user/name "Yodlee import"
:user/role ":admin"}))
(log/infof "Completed import for %s" bank-account)
transactions))
(defn grouped-import [source user manual-transactions]
(let [import-id (start-import source user)]
(lc/with-context {:import-id import-id}
(try
(let [transactions-by-bank-account (->> manual-transactions
(filter #(= "posted" (:status %)))
(group-by #(select-keys % [:date :description-original :amount :client-id :bank-account-id]))
(vals)
(mapcat (fn [transaction-group]
(map
(fn [index {:keys [date description-original high-level-category amount bank-account-id client-id] :as transaction}]
{:id (str date "-" bank-account-id "-" description-original "-" amount "-" index "-" client-id)
:bank-account-id bank-account-id
:date (time/unparse date "YYYY-MM-dd")
:amount {:amount amount}
:description {:original description-original
:simple high-level-category}
:status "POSTED"})
(range)
transaction-group)))
(group-by :bank-account-id))
result (->> transactions-by-bank-account
(filter (fn [[bank-account]]
bank-account))
(map (fn [[bank-account transactions]]
(import-for-bank-account transactions bank-account import-id)))
aggregate-results)]
(finish-import (assoc result :db/id import-id)))
(catch Exception e
(log/error e)
(finish-import {:db/id import-id})
)))))
(defn grouped-new [manual-transactions]
(let [transformed-transactions (->> manual-transactions
(filter #(= "posted" (:status %)))
(group-by #(select-keys % [:date :description-original :amount :client-id :bank-account-id]))
(vals)
(mapcat (fn [transaction-group]
(map
(fn [index {:keys [date description-original high-level-category amount bank-account-id client-id] :as transaction}]
{:id (str date "-" bank-account-id "-" description-original "-" amount "-" index "-" client-id)
:bank-account-id bank-account-id
:date (time/unparse date "YYYY-MM-dd")
:amount {:amount amount}
:description {:original description-original
:simple high-level-category}
:status "POSTED"})
(range)
transaction-group))))
all-rules (tr/get-all)
all-bank-accounts (by :db/id (get-all-bank-accounts))
transaction->bank-account (comp all-bank-accounts :bank-account-id)
transactions (transactions->txs transformed-transactions nil transaction->bank-account (rm/rule-applying-fn all-rules) (get-existing))]
transactions))
(defn import-yodlee [transactions]
(lc/with-context {:source "Import yodlee transactions"}
(let [import-id (start-import :import-source/yodlee "Automated Yodlee User")]
(lc/with-context {:import-id import-id}
(try
(log/info "importing from yodlee")
(let [account-lookup (->> (d/q '[:find ?ya ?ba
:in $
:where [?ba :bank-account/yodlee-account-id ?ya]]
(d/db conn))
(into {}))
transactions-by-bank-account (group-by
#(account-lookup (:accountId %))
transactions)
result (->> transactions-by-bank-account
(filter (fn [[bank-account]]
bank-account))
(map (fn [[bank-account transactions]]
(import-for-bank-account transactions bank-account import-id)))
aggregate-results)]
(finish-import (assoc result :db/id import-id)))
(catch Exception e
(log/error e)
(finish-import {:db/id import-id})))))))
(defn do-import []
(import-yodlee (client/get-transactions)))
(def do-import (allow-once do-import))
(defn import-yodlee2
[transactions]
(lc/with-context {:source "Import yodlee2 transactions"}
(let [import-id (start-import :import-source/yodlee2 "Automated Yodlee User")]
(lc/with-context {:import-id import-id}
(try
(let [account-lookup (->> (d/q '[:find ?ya ?ba
:in $
:where
[?ba :bank-account/yodlee-account ?y]
[?y :yodlee-account/id ?ya]]
(d/db conn))
(into {}))
transactions-by-bank-account (group-by
#(account-lookup (:accountId %))
transactions)
result (->> transactions-by-bank-account
(filter (fn [[bank-account]]
bank-account))
(map (fn [[bank-account transactions]]
(import-for-bank-account transactions bank-account import-id)))
aggregate-results)]
(finish-import (assoc result :db/id import-id)))
(catch Exception e
(log/error e)
(finish-import {:db/id import-id})))))))
(defn do-import2 []
(log/info "starting import")
(import-yodlee2 (client2/get-transactions "NGGL")))
(def do-import2 (allow-once do-import2))
(mount/defstate import-transaction-worker
:start (scheduler/every (* 1000 60 60 4) do-import)
:stop (scheduler/stop import-transaction-worker))