addressing failures.

This commit is contained in:
2022-09-27 09:58:31 -07:00
parent d430fb2f54
commit 5b17340eda
5 changed files with 208 additions and 90 deletions

View File

@@ -1,40 +0,0 @@
(ns auto-ap.background.requests
(:require
[amazonica.aws.sqs :as sqs]
[auto-ap.import.intuit :as i]
[auto-ap.import.plaid :as p]
[auto-ap.import.yodlee2 :as y2]
[auto-ap.utils :refer [heartbeat]]
[clojure.tools.logging :as log]
[config.core :refer [env]]
[mount.core :as mount]
[yang.scheduler :as scheduler]))
(def queue-url (:requests-queue-url env))
(defn process-1 []
(let [[{:keys [message-id receipt-handle body]}] (:messages (sqs/receive-message {:queue-url queue-url
:wait-time-seconds 1
:count 1}))]
(when message-id
(sqs/delete-message {:queue-url queue-url
:receipt-handle receipt-handle} )
(log/infof "processing message %s with body %s" message-id body )
(cond
(= ":intuit" body)
(i/import-intuit)
(= ":yodlee2" body)
(y2/import-yodlee2)
(= ":plaid" body)
(p/import-plaid)))))
#_{:clj-kondo/ignore [:clojure-lsp/unused-public-var]}
(defn fake-message []
(sqs/send-message {:queue-url (:requests-queue-url env)
:message-body ":intuit"} ))
(mount/defstate request-listener
:start (scheduler/every (* 1000 30) (heartbeat process-1 "request-poller"))
:stop (scheduler/stop request-listener))

View File

@@ -1,11 +1,39 @@
(ns auto-ap.graphql.requests
(:require
[amazonica.aws.sqs :as sqs]
[auto-ap.graphql.utils :refer [assert-admin]]
[config.core :refer [env]]))
[amazonica.aws.ecs :as ecs]
[auto-ap.graphql.utils :refer [assert-admin assert-failure]]
[clojure.string :as str]
[config.core :refer [env]])
(:import
(com.amazonaws.services.ecs.model AssignPublicIp)))
(defn currently-running-jobs []
(->> (ecs/list-tasks :cluster "default" )
:task-arns
(ecs/describe-tasks :include [] :tasks)
:tasks
#_(filter #(= "RUNNING" (:last-status %)))
(map (comp (comp last butlast) #(str/split % #"[/:]") :task-definition-arn)
#_(mapcat (comp :container-overrides :overrides)))
(set))
)
(defn run-task [task]
(ecs/run-task :capacity-provider-strategy [{:base 1 :weight 1 :capacity-provider "FARGATE_SPOT"}]
:count 1
:cluster "default"
:enable-ecs-managed-tags true
:task-definition task
:network-configuration {:aws-vpc-configuration {:subnets ["subnet-5e675761" "subnet-8519fde2" "subnet-89bab8d4"]
:security-groups ["sg-004e5855310c453a3" "sg-02d167406b1082698"]
:assign-public-ip AssignPublicIp/ENABLED}}))
(defn request-import [context value _]
(assert-admin (:id context))
(:message-id (sqs/send-message {:queue-url (:requests-queue-url env)
:message-body (:which value)} )))
(let [job (str (str/replace (name (:which value)) #"-" "_") "_" (:dd-env env))]
(if (not (get (currently-running-jobs) job))
(run-task job)
(assert-failure "This job is already running"))))

View File

@@ -1,7 +1,6 @@
(ns auto-ap.server
(:gen-class)
(:require
[auto-ap.background.requests :as requests]
[auto-ap.datomic.migrate :as migrate]
[auto-ap.handler :refer [app]]
[auto-ap.jobs.close-auto-invoices :as job-close-auto-invoices]
@@ -124,7 +123,6 @@
(not (env :run-web? )) (into [#'jetty
#'jetty-stats])
(not (env :run-background?)) (into [#'ledger/process-txes-worker
#'requests/request-listener
#'migrate/migrate-start]))]
(log/info "starting without " without)