(ns auto-ap.jobs.import-uploaded-invoices (:require [amazonica.aws.s3 :as s3] [amazonica.aws.simpleemail :as ses] [amazonica.aws.sqs :as sqs] [auto-ap.jobs.core :refer [execute]] [auto-ap.parse :as parse] [auto-ap.routes.invoices :as invoices] [clojure-mail.message :as message] [clojure.data.json :as json] [clojure.java.io :as io] [clojure.string :as str] [config.core :refer [env]] [auto-ap.logging :as alog] [com.brunobonacci.mulog :as mu]) (:import (java.util Properties UUID) (javax.mail Session) (javax.mail.internet MimeMessage))) (defn send-email-about-failed-message [mail-bucket mail-key message] (let [target-key (str "failed-emails/" mail-key ".eml") target-url (str "https://" (:data-bucket env) "/" target-key)] (alog/info ::sending-failure-email :who (:import-failure-destination-emails env)) (s3/copy-object mail-bucket mail-key (:data-bucket env) target-key) (ses/send-email {:destination {:to-addresses (:import-failure-destination-emails env)} :source (:invoice-email env) :message {:subject "An email invoice import failed" :body {:html (str "
You can download the original email here.

" message "

") :text (str "
You can download the original email here: " target-url)}}}))) (defn process-sqs [] (alog/info ::fetching-sqs) (doseq [message (:messages (sqs/receive-message {:queue-url (:invoice-import-queue-url env) :wait-time-seconds 5 :max-number-of-messages 10 #_#_:attribute-names ["All"]}))] (let [message-body (json/read-str (:body message) :key-fn keyword)] (doseq [r (:Records message-body)] (alog/info ::processing-record :record r) (let [props (Session/getDefaultInstance (Properties.)) message-stream (-> (s3/get-object {:key (-> r :s3 :object :key) :bucket-name (-> r :s3 :bucket :name)}) :input-stream) mail (message/read-message (MimeMessage. props message-stream))] (alog/info ::reading-mail :body (->> mail :body (filter :content-type) (map :body))) (mu/with-context {:email-record {:key (-> r :s3 :object :key) :bucket-name (-> r :s3 :bucket :name)}} (doseq [pdf-stream (->> (-> mail :body) (filter :content-type) (filter (complement #(re-find #"html" (:content-type % ""))))) :let [filename (str "/tmp/" (UUID/randomUUID) ".pdf")]] (try (let [_ (io/copy (:body pdf-stream) (io/file filename)) extension (last (str/split (.getName (io/file filename)) #"\.")) s3-location (str "invoice-files/" (str (UUID/randomUUID)) "." extension)] (mu/with-context {:s3-location s3-location} (s3/put-object :bucket-name (:data-bucket env) :key s3-location :input-stream (io/input-stream filename) :metadata {:content-type "application/pdf" :content-length (.length (io/file filename))}) (let [imports (->> (parse/parse-file filename filename :allow-glimpse? false) (map #(assoc % :source-url (str "https://" (:data-bucket env) "/" s3-location) :import-status :import-status/imported)))] (alog/info ::found-imports :imports imports) (invoices/import-uploaded-invoice {:user/role "admin" :user/name "Email Invoice Importer"} imports)))) (catch Exception e (alog/warn ::cant-import :error e) (send-email-about-failed-message (-> r :s3 :bucket :name) (-> r :s3 :object :key) (str e))) (finally (io/delete-file filename)))))))) (sqs/delete-message (assoc message :queue-url (:invoice-import-queue-url env))))) (defn -main [& _] (execute "import-uploaded-invoices" process-sqs)) (comment (with-open [i (io/output-stream "/tmp/bryce.pdf")] (clojure.java.io/copy (-> (s3/get-object :bucket-name (:data-bucket env) :key "invoice-files/f0e73dcb-e5e5-4c81-b82b-319b7caedacf.pdf" ) :input-stream) i)) (parse/parse-file "/tmp/bryce.pdf" "/tmp/bryce.pdf") (-> (clojure.java.shell/sh "pdftotext" "-layout" "/tmp/bryce.pdf" "-") :out ) 1 (user/init-repl) )