(ns auto-ap.background.mail (:require [amazonica.aws.s3 :as s3] [amazonica.aws.simpleemail :as ses] [amazonica.aws.sqs :as sqs] [auto-ap.parse :as parse] [auto-ap.routes.invoices :as invoices] [auto-ap.utils :refer [heartbeat]] [clojure-mail.message :as message] [clojure.data.json :as json] [clojure.java.io :as io] [clojure.string :as str] [clojure.tools.logging :as log] [config.core :refer [env]] [mount.core :as mount] [unilog.context :as lc] [yang.scheduler :as scheduler]) (:import (java.util Properties UUID) (javax.mail Session) (javax.mail.internet MimeMessage))) (defn send-email-about-failed-message [mail-bucket mail-key] (let [target-key (str "failed-emails/" mail-key ".eml") target-url (str "http://" (:data-bucket env) ".s3-website-us-east-1.amazonaws.com/" target-key)] (log/info "sending email to " (:import-failure-destination-emails env)) (s3/copy-object mail-bucket mail-key (:data-bucket env) target-key) (ses/send-email {:destination {:to-addresses (:import-failure-destination-emails env)} :source (:invoice-email env) :message {:subject "An email invoice import failed" :body {:html (str "
You can download the original email here.
") :text (str "
You can download the original email here: " target-url)}}}))) (defn process-sqs [] (log/info "Fetching messages from sqs...") (doseq [message (:messages (sqs/receive-message {:queue-url (:invoice-import-queue-url env) :wait-time-seconds 5 :max-number-of-messages 10 #_#_:attribute-names ["All"]}))] (let [message-body (json/read-str (:body message) :key-fn keyword)] (doseq [r (:Records message-body)] (log/info "Processing record " r) (let [props (Session/getDefaultInstance (Properties.)) message-stream (-> (s3/get-object {:key (-> r :s3 :object :key) :bucket-name (-> r :s3 :bucket :name)}) :input-stream) mail (message/read-message (MimeMessage. props message-stream))] (log/info "reading mail" (->> mail :body (filter :content-type) (map :body) )) (doseq [pdf-stream (->> (-> mail :body) (filter :content-type) #_(filter #(re-find #"application/pdf" (:content-type %)) )) :let [filename (str "/tmp/" (UUID/randomUUID) ".pdf")]] (try (let [ _ (io/copy (:body pdf-stream) (io/file filename)) extension (last (str/split (.getName (io/file filename)) #"\.")) s3-location (str "invoice-files/" (str (UUID/randomUUID)) "." extension) _ (s3/put-object :bucket-name (:data-bucket env) :key s3-location :input-stream (io/input-stream filename) :metadata {:content-type "application/pdf"}) imports (->> (parse/parse-file filename filename) (map #(assoc % :source-url (str "http://" (:data-bucket env) ".s3-website-us-east-1.amazonaws.com/" s3-location) :import-status :import-status/approved)))] (log/info "Found imports" imports) (invoices/import-uploaded-invoice {:user/role "admin"} imports )) (catch Exception e (log/warn e) (send-email-about-failed-message (-> r :s3 :bucket :name) (-> r :s3 :object :key))) (finally (io/delete-file filename))))))) (sqs/delete-message (assoc message :queue-url (:invoice-import-queue-url env) )))) (mount/defstate import-invoices :start (scheduler/every (* 60 5000) (heartbeat process-sqs "import-uploaded-invoices")) :stop (scheduler/stop import-invoices))