Alternative requset listener
This commit is contained in:
@@ -7,36 +7,25 @@
|
|||||||
[auto-ap.intuit.import :as i]
|
[auto-ap.intuit.import :as i]
|
||||||
[unilog.context :as lc]))
|
[unilog.context :as lc]))
|
||||||
|
|
||||||
(def stopped? (atom false))
|
|
||||||
|
|
||||||
(defn process-1 []
|
(defn process-1 []
|
||||||
(let [[{:keys [message-id receipt-handle body]}] (:messages (sqs/receive-message {:queue-url (:requests-queue-url env)
|
(lc/with-context {:source "Request poller"}
|
||||||
:count 1}))]
|
(log/info "Checking SQS...")
|
||||||
(when message-id
|
(let [[{:keys [message-id receipt-handle body]}] (:messages (sqs/receive-message {:queue-url (:requests-queue-url env)
|
||||||
(log/infof "processing message %s with body %s" message-id body )
|
:count 1}))]
|
||||||
(if (= ":intuit" body)
|
(when message-id
|
||||||
(try
|
(log/infof "processing message %s with body %s" message-id body )
|
||||||
(i/upsert-transactions)
|
(if (= ":intuit" body)
|
||||||
(catch Exception e
|
(try
|
||||||
(log/error e))))
|
(i/upsert-transactions)
|
||||||
(sqs/delete-message {:queue-url (:requests-queue-url env)
|
(catch Exception e
|
||||||
:receipt-handle receipt-handle} ))))
|
(log/error e))))
|
||||||
|
(sqs/delete-message {:queue-url (:requests-queue-url env)
|
||||||
|
:receipt-handle receipt-handle} )))))
|
||||||
|
|
||||||
(defn fake-message []
|
(defn fake-message []
|
||||||
(sqs/send-message {:queue-url (:requests-queue-url env)
|
(sqs/send-message {:queue-url (:requests-queue-url env)
|
||||||
:message-body ":intuit"} ))
|
:message-body ":intuit"} ))
|
||||||
|
|
||||||
(defn listen-sqs []
|
|
||||||
(lc/with-context {:source "Request poller"}
|
|
||||||
(loop []
|
|
||||||
(process-1)
|
|
||||||
(log/info "Checking SQS....")
|
|
||||||
(Thread/sleep 5000)
|
|
||||||
(when-not @stopped?
|
|
||||||
(log/info "stopping")
|
|
||||||
(recur)))))
|
|
||||||
|
|
||||||
(mount/defstate request-listener
|
(mount/defstate request-listener
|
||||||
:start (future (listen-sqs))
|
:start (scheduler/every (* 1000 5) process-1)
|
||||||
:stop (reset! stopped? true))
|
:stop (scheduler/stop request-listener))
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user