Makes integreat run on datomic cloud

This commit is contained in:
2022-08-23 12:13:12 -07:00
parent 58b9dcf272
commit d02fba2b44
58 changed files with 2163 additions and 1257 deletions

View File

@@ -1,28 +1,29 @@
(ns auto-ap.import.common
(:require
[auto-ap.datomic :refer [conn]]
[auto-ap.datomic :refer [conn pull-ref random-tempid]]
[clojure.tools.logging :as log]
[datomic.api :as d]))
[datomic.client.api :as dc]))
(defn bank-account->integration-id [bank-account]
(or (->> bank-account
(d/pull (d/db conn) [:bank-account/integration-status])
:bank-account/integration-status
:db/id)
#db/id[:db.part/user]))
(or (pull-ref (dc/db conn) :bank-account/integration-status bank-account)
(random-tempid)))
(defn wrap-integration [f bank-account]
(try
(let [result (f)]
@(d/transact conn [{:db/id bank-account :bank-account/integration-status {:db/id (bank-account->integration-id bank-account)
:integration-status/state :integration-state/success
:integration-status/last-attempt (java.util.Date.)
:integration-status/last-updated (java.util.Date.)}}])
(dc/transact conn {:tx-data [{:db/id bank-account
:bank-account/integration-status
{:db/id (bank-account->integration-id bank-account)
:integration-status/state :integration-state/success
:integration-status/last-attempt (java.util.Date.)
:integration-status/last-updated (java.util.Date.)}}]})
result)
(catch Exception e
@(d/transact conn [{:db/id bank-account :bank-account/integration-status {:db/id (bank-account->integration-id bank-account)
:integration-status/state :integration-state/failed
:integration-status/last-attempt (java.util.Date.)
:integration-status/message (.getMessage e)}}])
(dc/transact conn {:tx-data [{:db/id bank-account
:bank-account/integration-status
{:db/id (bank-account->integration-id bank-account)
:integration-status/state :integration-state/failed
:integration-status/last-attempt (java.util.Date.)
:integration-status/message (.getMessage e)}}]})
(log/warn e)
nil)))

View File

@@ -10,10 +10,12 @@
[clojure.string :as str]
[clojure.tools.logging :as log]
[com.unbounce.dogstatsd.core :as statsd]
[datomic.api :as d]))
[datomic.client.api :as dc]
[mount.core :as mount]
[yang.scheduler :as scheduler]))
(defn get-intuit-bank-accounts [db]
(d/q '[:find ?external-id ?ba ?c
(dc/q '[:find ?external-id ?ba ?c
:in $
:where
[?c :client/bank-accounts ?ba]
@@ -49,7 +51,7 @@
:priority :low}
nil)
(let [import-batch (t/start-import-batch :import-source/intuit "Automated intuit user")
db (d/db conn)
db (dc/db conn)
end (auto-ap.time/local-now)
start (time/plus end (time/days -30))]
(try
@@ -76,8 +78,8 @@
(defn upsert-accounts []
(let [token (i/get-fresh-access-token)
bank-accounts (i/get-bank-accounts token)]
@(d/transact conn (mapv
(fn [ba]
{:intuit-bank-account/external-id (:name ba)
:intuit-bank-account/name (:name ba)})
bank-accounts))))
(dc/transact conn {:tx-data (mapv
(fn [ba]
{:intuit-bank-account/external-id (:name ba)
:intuit-bank-account/name (:name ba)})
bank-accounts)})))

View File

@@ -5,7 +5,7 @@
[auto-ap.import.transactions :as t]
[clj-time.coerce :as coerce]
[clojure.data.csv :as csv]
[datomic.api :as d]
[datomic.client.api :as dc]
[unilog.context :as lc]))
@@ -35,17 +35,17 @@
(defn import-batch [transactions user]
(lc/with-context {:source "Manual import transactions"}
(let [bank-account-code->client (into {}
(d/q '[:find ?bac ?c
(dc/q '[:find ?bac ?c
:in $
:where
[?c :client/bank-accounts ?ba]
[?ba :bank-account/code ?bac]]
(d/db conn)))
(dc/db conn)))
bank-account-code->bank-account (into {}
(d/q '[:find ?bac ?ba
(dc/q '[:find ?bac ?ba
:in $
:where [?ba :bank-account/code ?bac]]
(d/db conn)))
(dc/db conn)))
import-batch (t/start-import-batch :import-source/manual user)
transactions (->> transactions
(map (fn [t]

View File

@@ -8,12 +8,12 @@
[auto-ap.utils :refer [allow-once by]]
[clj-time.coerce :as coerce]
[clj-time.core :as time]
[datomic.api :as d]
[datomic.client.api :as dc]
[digest :as di]
[unilog.context :as lc]))
(defn get-plaid-accounts [db]
(-> (d/q '[:find ?ba ?c ?external-id ?t
(-> (dc/q '[:find ?ba ?c ?external-id ?t
:in $
:where
[?c :client/bank-accounts ?ba]
@@ -42,7 +42,7 @@
end (atime/local-now)
start (time/plus end (time/days -30))]
(try
(doseq [[bank-account-id client-id external-id access-token] (get-plaid-accounts (d/db conn))
(doseq [[bank-account-id client-id external-id access-token] (get-plaid-accounts (dc/db conn))
:let [transaction-result (wrap-integration #(p/get-transactions access-token external-id start end)
bank-account-id)
accounts-by-id (by :account_id (:accounts transaction-result))]

View File

@@ -1,6 +1,6 @@
(ns auto-ap.import.transactions
(:require
[auto-ap.datomic :refer [conn remove-nils uri]]
[auto-ap.datomic :refer [audit-transact conn random-tempid remove-nils]]
[auto-ap.datomic.accounts :as a]
[auto-ap.datomic.checks :as d-checks]
[auto-ap.datomic.transaction-rules :as tr]
@@ -13,7 +13,7 @@
[clj-time.core :as t]
[clojure.core.cache :as cache]
[clojure.tools.logging :as log]
[datomic.api :as d]
[datomic.client.api :as dc]
[digest :as di]))
(defn rough-match [client-id bank-account-id amount]
@@ -56,19 +56,19 @@
(defn match-transaction-to-unfulfilled-autopayments [amount client-id]
(log/info "trying to find uncleared autopay invoices")
(let [candidate-invoices-vendor-groups (->> (d/query {:query {:find ['?vendor-id '?e '?total '?sd]
:in ['$ '?client-id]
:where ['[?e :invoice/client ?client-id]
'[?e :invoice/scheduled-payment ?sd]
'[?e :invoice/status :invoice-status/paid]
'(not [_ :invoice-payment/invoice ?e])
'[?e :invoice/vendor ?vendor-id]
'[?e :invoice/total ?total]]}
:args [(d/db conn) client-id]})
(let [candidate-invoices-vendor-groups (->> (dc/q {:query {:find ['?vendor-id '?e '?total '?sd]
:in ['$ '?client-id]
:where ['[?e :invoice/client ?client-id]
'[?e :invoice/scheduled-payment ?sd]
'[?e :invoice/status :invoice-status/paid]
'(not [_ :invoice-payment/invoice ?e])
'[?e :invoice/vendor ?vendor-id]
'[?e :invoice/total ?total]]}
:args [(dc/db conn) client-id]})
(sort-by last) ;; sort by scheduled payment date
(group-by first) ;; group by vendors
vals)
considerations (for [candidate-invoices candidate-invoices-vendor-groups
considerations (for [candidate-invoices candidate-invoices-vendor-groups
invoice-count (range 1 32)
consideration (partition invoice-count 1 candidate-invoices)
:when (dollars= (reduce (fn [acc [_ _ amount]]
@@ -81,19 +81,19 @@
(defn match-transaction-to-unpaid-invoices [amount client-id]
(log/info "trying to find unpaid invoices for " client-id amount)
(let [candidate-invoices-vendor-groups (->> (d/query {:query {:find ['?vendor-id '?e '?outstanding-balance '?d]
:in ['$ '?client-id]
:where ['[?e :invoice/client ?client-id]
'[?e :invoice/status :invoice-status/unpaid]
'(not [_ :invoice-payment/invoice ?e])
'[?e :invoice/vendor ?vendor-id]
'[?e :invoice/outstanding-balance ?outstanding-balance]
'[?e :invoice/date ?d]]}
:args [(d/db conn) client-id]})
(let [candidate-invoices-vendor-groups (->> (dc/q {:query {:find ['?vendor-id '?e '?outstanding-balance '?d]
:in ['$ '?client-id]
:where ['[?e :invoice/client ?client-id]
'[?e :invoice/status :invoice-status/unpaid]
'(not [_ :invoice-payment/invoice ?e])
'[?e :invoice/vendor ?vendor-id]
'[?e :invoice/outstanding-balance ?outstanding-balance]
'[?e :invoice/date ?d]]}
:args [(dc/db conn) client-id]})
(sort-by last) ;; sort by scheduled payment date
(group-by first) ;; group by vendors
vals)
considerations (for [candidate-invoices candidate-invoices-vendor-groups
considerations (for [candidate-invoices candidate-invoices-vendor-groups
invoice-count (range 1 32)
consideration (partition invoice-count 1 candidate-invoices)
:when (dollars= (reduce (fn [acc [_ _ amount]]
@@ -111,7 +111,7 @@
(defn add-new-payment [[transaction :as tx] [[vendor] :as invoice-payments] bank-account-id client-id]
(log/info "Adding a new payment for transaction " (:transaction/id transaction) " and invoices " invoice-payments)
(let [payment-id (d/tempid :db.part/user)]
(let [payment-id (random-tempid)]
(-> tx
(conj {:payment/bank-account bank-account-id
@@ -142,7 +142,6 @@
:amount (Math/abs (:transaction/amount transaction))}]]))))
(defn extract-check-number [{:transaction/keys [description-original]}]
(if-let [[_ _ check-number] (re-find #"(?i)check(card|[^0-9]+([0-9]*))" description-original)]
(try
(Integer/parseInt check-number)
@@ -152,8 +151,8 @@
(defn find-expected-deposit [client-id amount date]
(when date
(-> (d/q
'[:find [(pull ?ed [:db/id {:expected-deposit/vendor [:db/id]}]) ...]
(-> (dc/q
'[:find (pull ?ed [:db/id {:expected-deposit/vendor [:db/id]}])
:in $ ?c ?a ?d-start
:where
[?ed :expected-deposit/client ?c]
@@ -163,8 +162,8 @@
[?ed :expected-deposit/total ?a2]
[(auto-ap.utils/dollars= ?a2 ?a)]
]
(d/db conn) client-id amount (coerce/to-date (t/plus date (t/days -10))))
first)))
(dc/db conn) client-id amount (coerce/to-date (t/plus date (t/days -10))))
ffirst)))
(defn categorize-transaction [transaction bank-account existing]
@@ -242,7 +241,7 @@
))))
(defn maybe-code [{:transaction/keys [client amount] :as transaction} apply-rules valid-locations]
(when-not (seq (match-transaction-to-unpaid-invoices amount client))
(when (seq (match-transaction-to-unpaid-invoices amount client))
(apply-rules transaction valid-locations)))
(defn transaction->txs [transaction bank-account apply-rules]
@@ -267,13 +266,13 @@
(defn get-existing [bank-account]
(log/info "looking up bank account data for" bank-account)
(into {}
(d/query {:query {:find ['?tid '?as2]
(dc/q {:query {:find ['?tid '?as2]
:in ['$ '?ba]
:where ['[?e :transaction/bank-account ?ba]
'[?e :transaction/id ?tid]
'[?e :transaction/approval-status ?as]
'[?as :db/ident ?as2]]}
:args [(d/db (d/connect uri)) bank-account]})))
:args [(dc/db conn) bank-account]})))
(defprotocol ImportBatch
(import-transaction! [this transaction])
@@ -288,17 +287,17 @@
:import-batch/not-ready 0
:import-batch/extant 0})
extant-cache (atom (cache/ttl-cache-factory {} :ttl 60000 ))
import-id (get (:tempids @(d/transact conn [{:db/id "import-batch"
:import-batch/date (coerce/to-date (t/now))
:import-batch/source source
:import-batch/status :import-status/started
:import-batch/user-name user}])) "import-batch")
import-id (get (:tempids (dc/transact conn {:tx-data [{:db/id "import-batch"
:import-batch/date (coerce/to-date (t/now))
:import-batch/source source
:import-batch/status :import-status/started
:import-batch/user-name user}]})) "import-batch")
rule-applying-function (rm/rule-applying-fn (tr/get-all))]
(log/info "Importing transactions from " source)
(reify ImportBatch
(import-transaction! [_ transaction]
(let [bank-account (d/pull (d/db conn)
(let [bank-account (dc/pull (dc/db conn)
[:bank-account/code
:db/id
:bank-account/locations
@@ -327,14 +326,14 @@
(fail! [_ error]
(log/errorf "Couldn't complete import %d with error." import-id)
(log/error error)
@(d/transact conn [(merge {:db/id import-id
:import-batch/status :import-status/completed
:import-batch/error-message (str error)}
@stats)]))
(dc/transact conn {:tx-data [(merge {:db/id import-id
:import-batch/status :import-status/completed
:import-batch/error-message (str error)}
@stats)]}))
(finish! [_]
(log/infof "Finishing import batch %d for %s with stats %s " import-id (name source) (pr-str @stats))
@(d/transact conn [(merge {:db/id import-id
(dc/transact conn [(merge {:db/id import-id
:import-batch/status :import-status/completed}
@stats)])))))

View File

@@ -9,9 +9,10 @@
[clj-time.coerce :as coerce]
[clojure.string :as str]
[com.unbounce.dogstatsd.core :as statsd]
[datomic.api :as d]
[digest :as di]
[unilog.context :as lc]))
[datomic.client.api :as dc]
[mount.core :as mount]
[unilog.context :as lc]
[yang.scheduler :as scheduler]))
#_{:clj-kondo/ignore [:unresolved-var]}
(defn yodlee->transaction [transaction use-date-instead-of-post-date?]
@@ -51,7 +52,7 @@
nil)
(let [import-batch (t/start-import-batch :import-source/yodlee2 "Automated yodlee2 user")]
(try
(let [account-lookup (d/q '[:find ?ya ?ba ?cd ?ud
(let [account-lookup (dc/q '[:find ?ya ?ba ?cd ?ud
:in $
:where
[?ba :bank-account/yodlee-account ?y]
@@ -60,7 +61,7 @@
[?c :client/code ?cd]
[?y :yodlee-account/id ?ya]
]
(d/db conn))]
(dc/db conn))]
(doseq [[yodlee-account bank-account client-code use-date-instead-of-post-date?] account-lookup
transaction (wrap-integration #(client2/get-specific-transactions client-code yodlee-account)
bank-account)]