fixing a number of bugs

This commit is contained in:
Bryce Covert
2018-04-08 14:46:23 -07:00
parent c5344a27eb
commit 9dabb633a7
15 changed files with 462 additions and 81 deletions

View File

@@ -23,29 +23,32 @@
(def queue-url "https://sqs.us-east-1.amazonaws.com/679918342773/integreat-mail-prod")
(defn process-sqs []
(println "Fetching messages from sqs...")
(let [companies (companies/get-all)]
(doseq [message (:messages (sqs/receive-message {:queue-url queue-url
:wait-time-seconds 5
:max-number-of-messages 10
#_#_:attribute-names ["All"]}))]
(let [message-body (json/read-str (:body message)
:key-fn keyword)]
(doseq [r (:Records message-body)]
(println "Processing record " r)
(let [props (Session/getDefaultInstance (Properties.))
message-stream (-> (s3/get-object {:key (-> r :s3 :object :key)
:bucket-name (-> r :s3 :bucket :name)})
:input-stream)
mail (message/read-message (MimeMessage. props message-stream))]
(doseq [pdf-stream (->> (-> mail :body)
(filter :content-type)
(filter #(re-find #"application/pdf" (:content-type %)) ))]
(let [filename (str "/tmp/" (UUID/randomUUID) ".pdf")]
(io/copy (:body pdf-stream) (io/file filename))
(invoices/import (parse/parse-file filename filename) companies)
(io/delete-file filename))))))
(sqs/delete-message (assoc message :queue-url queue-url )))))
(try
(println "Fetching messages from sqs...")
(let [companies (companies/get-all)]
(doseq [message (:messages (sqs/receive-message {:queue-url queue-url
:wait-time-seconds 5
:max-number-of-messages 10
#_#_:attribute-names ["All"]}))]
(let [message-body (json/read-str (:body message)
:key-fn keyword)]
(doseq [r (:Records message-body)]
(println "Processing record " r)
(let [props (Session/getDefaultInstance (Properties.))
message-stream (-> (s3/get-object {:key (-> r :s3 :object :key)
:bucket-name (-> r :s3 :bucket :name)})
:input-stream)
mail (message/read-message (MimeMessage. props message-stream))]
(doseq [pdf-stream (->> (-> mail :body)
(filter :content-type)
(filter #(re-find #"application/pdf" (:content-type %)) ))]
(let [filename (str "/tmp/" (UUID/randomUUID) ".pdf")]
(io/copy (:body pdf-stream) (io/file filename))
(invoices/import (parse/parse-file filename filename) companies)
(io/delete-file filename))))))
(sqs/delete-message (assoc message :queue-url queue-url ))))
(catch Exception e
(println e))))
(defn always-process-sqs []
(while (not (Thread/interrupted))