diff --git a/src/clj/auto_ap/background/requests.clj b/src/clj/auto_ap/background/requests.clj index 8101be93..13d5ffdd 100644 --- a/src/clj/auto_ap/background/requests.clj +++ b/src/clj/auto_ap/background/requests.clj @@ -7,36 +7,25 @@ [auto-ap.intuit.import :as i] [unilog.context :as lc])) -(def stopped? (atom false)) - (defn process-1 [] - (let [[{:keys [message-id receipt-handle body]}] (:messages (sqs/receive-message {:queue-url (:requests-queue-url env) - :count 1}))] - (when message-id - (log/infof "processing message %s with body %s" message-id body ) - (if (= ":intuit" body) - (try - (i/upsert-transactions) - (catch Exception e - (log/error e)))) - (sqs/delete-message {:queue-url (:requests-queue-url env) - :receipt-handle receipt-handle} )))) + (lc/with-context {:source "Request poller"} + (log/info "Checking SQS...") + (let [[{:keys [message-id receipt-handle body]}] (:messages (sqs/receive-message {:queue-url (:requests-queue-url env) + :count 1}))] + (when message-id + (log/infof "processing message %s with body %s" message-id body ) + (if (= ":intuit" body) + (try + (i/upsert-transactions) + (catch Exception e + (log/error e)))) + (sqs/delete-message {:queue-url (:requests-queue-url env) + :receipt-handle receipt-handle} ))))) (defn fake-message [] (sqs/send-message {:queue-url (:requests-queue-url env) :message-body ":intuit"} )) -(defn listen-sqs [] - (lc/with-context {:source "Request poller"} - (loop [] - (process-1) - (log/info "Checking SQS....") - (Thread/sleep 5000) - (when-not @stopped? - (log/info "stopping") - (recur))))) - (mount/defstate request-listener - :start (future (listen-sqs)) - :stop (reset! stopped? true)) - + :start (scheduler/every (* 1000 5) process-1) + :stop (scheduler/stop request-listener))