diff --git a/project.clj b/project.clj index b1e8dfd1..0d60b875 100644 --- a/project.clj +++ b/project.clj @@ -92,7 +92,8 @@ ;; needed for java 11 - [javax.xml.bind/jaxb-api "2.4.0-b180830.0359"]] + [javax.xml.bind/jaxb-api "2.4.0-b180830.0359"] + [io.forward/clojure-mail "1.0.8"]] :managed-dependencies [;; explicit dependencies to get to latest versions for above [com.fasterxml.jackson.core/jackson-core "2.12.0"] [com.fasterxml.jackson.core/jackson-databind "2.12.0"] diff --git a/src/clj/auto_ap/background/mail.clj b/src/clj/auto_ap/background/mail.clj new file mode 100644 index 00000000..ce228e02 --- /dev/null +++ b/src/clj/auto_ap/background/mail.clj @@ -0,0 +1,68 @@ +(ns auto-ap.background.mail + (:require [amazonica.aws.s3 :as s3] + [amazonica.aws.sqs :as sqs] + [auto-ap.parse :as parse] + [auto-ap.routes.invoices :as invoices] + [clojure-mail.message :as message] + [clojure.data.json :as json] + [clojure.java.io :as io] + [clojure.string :as str] + [config.core :refer [env]] + [clojure.tools.logging :as log] + [unilog.context :as lc] + [mount.core :as mount] + [yang.scheduler :as scheduler]) + (:import (java.util Properties UUID) + (javax.mail Session) + (javax.mail.internet MimeMessage))) + + +(defn process-sqs [] + (lc/with-context {:source "import-uploaded-invoices"} + (try + (log/info "Fetching messages from sqs...") + (doseq [message (:messages (sqs/receive-message {:queue-url (:invoice-import-queue-url env) + :wait-time-seconds 5 + :max-number-of-messages 10 + #_#_:attribute-names ["All"]}))] + (let [message-body (json/read-str (:body message) + :key-fn keyword)] + (doseq [r (:Records message-body)] + (log/info "Processing record " r) + (let [props (Session/getDefaultInstance (Properties.)) + message-stream (-> (s3/get-object {:key (-> r :s3 :object :key) + :bucket-name (-> r :s3 :bucket :name)}) + :input-stream) + mail (message/read-message (MimeMessage. props message-stream))] + (log/info "reading mail" (->> mail :body (filter :content-type) (map :body) )) + (doseq [pdf-stream (->> (-> mail :body) + (filter :content-type) + #_(filter #(re-find #"application/pdf" (:content-type %)) )) + :let [filename (str "/tmp/" (UUID/randomUUID) ".pdf")]] + (try + (let [ + _ (io/copy (:body pdf-stream) (io/file filename)) + extension (last (str/split (.getName (io/file filename)) #"\.")) + s3-location (str "invoice-files/" (str (UUID/randomUUID)) "." extension) + _ (s3/put-object :bucket-name (:data-bucket env) + :key s3-location + :input-stream (io/input-stream filename) + :metadata {:content-type "application/pdf"}) + imports (->> (parse/parse-file filename filename) + (map #(assoc % + :source-url (str "http://" (:data-bucket env) + ".s3-website-us-east-1.amazonaws.com/" + s3-location))))] + (log/info "Found imports" imports) + (invoices/import-uploaded-invoice {:user/role "admin"} imports )) + (catch Exception e + (log/warn e)) + (finally + (io/delete-file filename))))))) + (sqs/delete-message (assoc message :queue-url (:invoice-import-queue-url env) ))) + (catch Exception e + (log/error e))))) + +(mount/defstate import-invoices + :start (scheduler/every (* 60 5000) process-sqs) + :stop (scheduler/stop import-invoices)) diff --git a/src/clj/auto_ap/server.clj b/src/clj/auto_ap/server.clj index f8ab4d18..a9e7b14f 100644 --- a/src/clj/auto_ap/server.clj +++ b/src/clj/auto_ap/server.clj @@ -14,6 +14,7 @@ [auto-ap.ledger :as ledger] [auto-ap.square.core :as square] [auto-ap.background.metrics :as metrics] + [auto-ap.background.mail :as mail] [clojure.tools.logging :as log] [config.core :refer [env]] [mount.core :as mount] @@ -90,6 +91,7 @@ (not (env :run-background?)) (into [#'square/square-loader #'vendor/refresh-vendor-usages-worker #'ledger/touch-broken-ledger-worker + #'mail/import-invoices #'ledger/process-txes-worker #'ledger/ledger-reconciliation-worker #'requests/request-listener