Files
integreat/src/clj/auto_ap/graphql/jobs.clj
2022-12-05 09:31:16 -08:00

161 lines
5.7 KiB
Clojure

(ns auto-ap.graphql.jobs
(:require
[amazonica.aws.ecs :as ecs]
[auto-ap.graphql.utils
:refer [assert-admin assert-failure result->page]]
[clj-time.coerce :as coerce]
[clj-time.core :as time]
[clojure.string :as str]
[config.core :refer [env]]
[clojure.tools.logging :as log]
[com.walmartlabs.lacinia.util :refer [attach-resolvers]]
[clojure.edn :as edn])
(:import
(com.amazonaws.services.ecs.model AssignPublicIp)))
(defn get-ecs-tasks
  "Lists up to 50 tasks from the default ListTasks call plus up to 50 tasks whose
  desired status is STOPPED, describes them, and replaces nothing on the task
  except adding the full task definition under :task-definition.

  Returns the described tasks sorted newest-first by :created-at, or an empty
  sequence when ECS lists no tasks at all."
  []
  (let [task-arns (concat (:task-arns (ecs/list-tasks :max-results 50))
                          (:task-arns (ecs/list-tasks :desired-status "STOPPED" :max-results 50)))]
    (if (empty? task-arns)
      ;; DescribeTasks rejects a request with an empty task list, so there is
      ;; nothing to describe and nothing to return.
      ()
      (let [tasks (:tasks (ecs/describe-tasks :include [] :tasks task-arns))
            ;; Several tasks usually share one task definition. Look each
            ;; distinct ARN up once rather than once per task.
            definitions (into {}
                              (map (fn [arn]
                                     [arn (:task-definition
                                           (ecs/describe-task-definition :task-definition arn))]))
                              (distinct (map :task-definition-arn tasks)))]
        (->> tasks
             (map #(assoc % :task-definition (get definitions (:task-definition-arn %))))
             (sort-by :created-at)
             reverse)))))
(defn is-background-job?
  "Returns a seq of the environment entries named \"INTEGREAT_JOB\" found across
  all container definitions of the task's :task-definition, or nil when there
  are none. Callers use the result for its truthiness."
  [task]
  (seq
   (for [container (get-in task [:task-definition :container-definitions])
         env-var   (:environment container)
         :when     (= "INTEGREAT_JOB" (:name env-var))]
     env-var)))
(defn task-definition->job-name
  "Returns the :value of the first environment entry named \"INTEGREAT_JOB\"
  across the task definition's container definitions, or nil if none exists."
  [task-definition]
  (first
   (for [container (:container-definitions task-definition)
         env-var   (:environment container)
         :when     (= "INTEGREAT_JOB" (:name env-var))]
     (:value env-var))))
(defn job-exited-successfully?
  "True only when the first container named \"integreat-app\" in the task's
  :containers has an :exit-code equal to 0; false otherwise, including when the
  container or its exit code is missing."
  [task]
  (let [app-container (->> (:containers task)
                           (filter #(= "integreat-app" (:name %)))
                           first)]
    ;; `=` already yields a boolean, so no explicit true/false branches are needed.
    (= 0 (:exit-code app-container))))
(defn ecs-task->job
  "Converts a described ECS task (with its task definition attached under
  :task-definition) into a job map:

    :status     :pending, :running, :succeeded or :failed; nil when the task's
                :last-status is missing or not a recognised lifecycle state
    :name       value of the INTEGREAT_JOB environment variable
    :start-date :created-at as a UTC date-time, or nil when absent
    :end-date   :stopped-at as a UTC date-time, or nil when absent"
  [task]
  (let [->utc #(some-> %
                       coerce/to-date-time
                       (time/to-time-zone (time/time-zone-for-offset 0)))]
    {:status     (case (:last-status task)
                   ;; States ECS reports before the containers are running.
                   ("PROVISIONING" "PENDING" "ACTIVATING") :pending
                   ;; Running, plus the wind-down states that precede STOPPED.
                   ("RUNNING" "DEACTIVATING" "STOPPING" "DEPROVISIONING") :running
                   ;; Terminal states: the container exit code decides the outcome.
                   ("STOPPED" "DELETED") (if (job-exited-successfully? task)
                                           :succeeded
                                           :failed)
                   ;; Without a default, one task in an unlisted state would throw
                   ;; and take down every caller that maps over the task list.
                   nil)
     :name       (task-definition->job-name (:task-definition task))
     :end-date   (->utc (:stopped-at task))
     :start-date (->utc (:created-at task))}))
(defn get-jobs-page
  "Resolver for the jobs page. Asserts that the identity in the context is an
  admin, converts every background ECS task into a job, and hands the jobs,
  their count, the :data key and the page arguments to result->page.

  The page arguments are the request's :filters with the context :id added
  under :id."
  [context args _]
  (assert-admin (:id context))
  (let [page-args (-> (:filters args)
                      (assoc :id (:id context)))
        jobs      (for [task (get-ecs-tasks)
                        :when (is-background-job? task)]
                    (ecs-task->job task))
        total     (count jobs)]
    (result->page jobs total :data page-args)))
(defn currently-running-jobs
  "Returns the set of job names whose background ECS task currently maps to a
  :pending or :running status."
  []
  (let [jobs    (map ecs-task->job (filter is-background-job? (get-ecs-tasks)))
        active? (fn [job] (contains? #{:pending :running} (:status job)))]
    (into #{}
          (comp (filter active?)
                (map :name))
          jobs)))
(defn run-task
  "Logs the request and starts one ECS task for the task definition `task` on
  the \"default\" cluster using the FARGATE_SPOT capacity provider.

  When `args` is truthy, its printed (pr-str) form is passed to the
  \"integreat-app\" container as the \"args\" environment variable via a
  container override. Returns whatever ecs/run-task returns."
  [task args]
  (log/info "running job" task)
  (let [network {:aws-vpc-configuration
                 {:subnets          ["subnet-5e675761" "subnet-8519fde2" "subnet-89bab8d4"]
                  :security-groups  ["sg-004e5855310c453a3" "sg-02d167406b1082698"]
                  :assign-public-ip AssignPublicIp/ENABLED}}
        request {:capacity-provider-strategy [{:base 1 :weight 1 :capacity-provider "FARGATE_SPOT"}]
                 :count                      1
                 :cluster                    "default"
                 :enable-ecs-managed-tags    true
                 :task-definition            task
                 :network-configuration      network}]
    (ecs/run-task
     (if args
       ;; Only add overrides when there is something to pass to the container.
       (assoc-in request
                 [:overrides :container-overrides]
                 [{:name        "integreat-app"
                   :environment [{:name "args" :value (pr-str args)}]}])
       request))))
(defn request-job
  "Mutation resolver that starts a background job.

  Asserts that the identity in the context is an admin, then:
    - reports a failure when :which is missing or blank,
    - reports a failure when a job with that name is already pending/running,
    - otherwise starts the task and returns {:message ...}.

  The ECS task definition name is derived from :which by replacing \"-\" with
  \"_\", removing \":\" and appending \"_\" plus the configured :dd-env. The
  optional :args string is parsed as EDN before being handed to run-task."
  [context value _]
  (assert-admin (:id context))
  (let [which (:which value)]
    (cond
      ;; :which is a nullable String in the schema; without this guard a nil
      ;; value reaches str/replace and surfaces as a NullPointerException.
      (str/blank? which)
      (assert-failure "A job name is required")

      (contains? (currently-running-jobs) which)
      (assert-failure "This job is already running")

      :else
      (let [task-definition (-> which
                                (str/replace #"-" "_")
                                (str/replace #":" "")
                                (str "_" (:dd-env env)))
            job-args        (some-> (:args value) edn/read-string)
            new-job         (run-task task-definition job-args)]
        {:message (str "task " (str new-job) " started.")}))))
(def objects
  "GraphQL object types contributed by this namespace: a single :job and the
  paged :jobs_page wrapper around a list of jobs."
  (let [int-field       {:type 'Int}
        date-time-field {:type :iso_date_time}]
    {:jobs_page {:fields {:data  {:type '(list :job)}
                          :count int-field
                          :total int-field
                          :start int-field
                          :end   int-field}}
     :job       {:fields {:name       {:type 'String}
                          :status     {:type :job_status}
                          :start_date date-time-field
                          :end_date   date-time-field}}}))
(def input-objects
  "GraphQL input object types contributed by this namespace: the :jobs_filters
  input with its sort, start and per_page fields."
  (let [int-field {:type 'Int}]
    {:jobs_filters
     {:fields {:per_page int-field
               :start    int-field
               :sort     {:type '(list :sort_item)}}}}))
(def queries
  "GraphQL queries contributed by this namespace: :jobs_page, resolved by the
  :get-jobs-page resolver key."
  (let [int-arg {:type 'Int}]
    {:jobs_page
     {:type    :jobs_page
      :resolve :get-jobs-page
      :args    {:filters  {:type :jobs_filters}
                :sort     {:type '(list :sort_item)}
                :start    int-arg
                :per_page int-arg}}}))
(def enums
  "GraphQL enums contributed by this namespace: :job_status with the values
  pending, running, succeeded and failed, in that order."
  {:job_status
   {:values (mapv (fn [status] {:enum-value status})
                  [:pending :running :succeeded :failed])}})
(def resolvers
  "Maps the resolver keys used in this namespace's schema fragments to the
  functions that implement them."
  {:mutation/request-job request-job
   :get-jobs-page        get-jobs-page})
(def mutations
  "GraphQL mutations contributed by this namespace: :request_job, which takes
  two String arguments (:which and :args) and returns a :message."
  (let [string-arg {:type 'String}]
    {:request_job {:type    :message
                   :resolve :mutation/request-job
                   :args    {:which string-arg
                             :args  string-arg}}}))
(defn attach
  "Merges this namespace's objects, input objects, queries, enums and mutations
  into `schema` (one level deep, so existing entries under each section are
  kept) and then attaches this namespace's resolvers to the result."
  [schema]
  (let [additions {:objects       objects
                   :input-objects input-objects
                   :queries       queries
                   :enums         enums
                   :mutations     mutations}
        merged    (merge-with merge schema additions)]
    (attach-resolvers merged resolvers)))