(ns auto-ap.graphql.jobs
  (:require
   [amazonica.aws.ecs :as ecs]
   [auto-ap.graphql.utils :refer [assert-admin
                                  assert-failure
                                  attach-tracing-resolvers
                                  result->page]]
   [clj-time.coerce :as coerce]
   [clj-time.core :as time]
   [clojure.edn :as edn]
   [clojure.string :as str]
   [clojure.tools.logging :as log]
   [com.walmartlabs.lacinia.util :refer [attach-resolvers]]
   [config.core :refer [env]])
  (:import
   (com.amazonaws.services.ecs.model AssignPublicIp)))

(defn- describe-task-definitions
  "Returns a map of task-definition ARN -> task definition. Each distinct ARN
  is described exactly once, so tasks sharing a definition cost one API call."
  [arns]
  (into {}
        (map (fn [arn]
               [arn (:task-definition
                     (ecs/describe-task-definition :task-definition arn))]))
        (distinct arns)))

(defn get-ecs-tasks
  "Lists up to 50 tasks with the default desired status plus up to 50 tasks
  whose desired status is STOPPED, describes them, and attaches each task's
  task definition under :task-definition. Returns the tasks sorted by
  :created-at, newest first. Returns an empty list when no tasks are listed."
  []
  (let [task-arns (concat (:task-arns (ecs/list-tasks :max-results 50))
                          (:task-arns (ecs/list-tasks :desired-status "STOPPED"
                                                      :max-results 50)))]
    (if (empty? task-arns)
      ;; DescribeTasks rejects an empty :tasks list, so skip the call entirely.
      ()
      (let [tasks       (:tasks (ecs/describe-tasks :include [] :tasks task-arns))
            definitions (describe-task-definitions (map :task-definition-arn tasks))]
        (->> tasks
             (map #(assoc % :task-definition
                          (get definitions (:task-definition-arn %))))
             (sort-by :created-at)
             reverse)))))

(defn- integreat-job-env
  "Returns the environment entries named \"INTEGREAT_JOB\" across all container
  definitions of `task-definition`."
  [task-definition]
  (->> (:container-definitions task-definition)
       (mapcat :environment)
       (filter (comp #{"INTEGREAT_JOB"} :name))))

(defn is-background-job?
  "Truthy when any container definition of the task's :task-definition
  declares an \"INTEGREAT_JOB\" environment variable; nil otherwise."
  [task]
  (seq (integreat-job-env (:task-definition task))))

(defn task-definition->job-name
  "Returns the value of the first \"INTEGREAT_JOB\" environment variable found
  in the task definition's container definitions, or nil when there is none."
  [task-definition]
  (:value (first (integreat-job-env task-definition))))

(defn job-exited-successfully?
  "True when the task's \"integreat-app\" container reports exit code 0.
  False for any other exit code, a missing container, or a missing exit code."
  [task]
  (= 0 (->> (:containers task)
            (filter (comp #{"integreat-app"} :name))
            first
            :exit-code)))

(defn- task->job-status
  "Maps the task's :last-status onto a :job_status enum value.

  Covers every ECS task lifecycle state rather than a subset, so a task that is
  mid-transition (e.g. STOPPING) no longer raises a no-matching-clause error.
  Returns nil, after logging a warning, for a status that is not recognised;
  the enum has no value to represent it."
  [task]
  (let [last-status (:last-status task)]
    (case last-status
      ("PROVISIONING" "PENDING" "ACTIVATING")                :pending
      ("RUNNING" "DEACTIVATING" "STOPPING" "DEPROVISIONING") :running
      ("STOPPED" "DELETED") (if (job-exited-successfully? task)
                              :succeeded
                              :failed)
      (do (log/warn "unrecognised ECS task status" last-status)
          nil))))

(defn- ->utc-date-time
  "Coerces `instant` to a date-time in the zero-offset time zone; nil stays nil."
  [instant]
  (some-> instant
          coerce/to-date-time
          (time/to-time-zone (time/time-zone-for-offset 0))))

(defn ecs-task->job
  "Converts a described ECS task (with :task-definition attached) into a job
  map with :status, :name, :end-date and :start-date."
  [task]
  {:status     (task->job-status task)
   :name       (task-definition->job-name (:task-definition task))
   :end-date   (->utc-date-time (:stopped-at task))
   :start-date (->utc-date-time (:created-at task))})

(defn get-jobs-page
  "Resolver for the jobs_page query. Asserts the caller is an admin, converts
  every background-job ECS task into a job, and hands the jobs, their count and
  the filters (with the caller's :id added) to result->page."
  [context args _]
  (assert-admin (:id context))
  (let [args (assoc (:filters args) :id (:id context))
        jobs (->> (get-ecs-tasks)
                  (filter is-background-job?)
                  (map ecs-task->job))]
    (result->page jobs (count jobs) :data args)))

(defn currently-running-jobs
  "Returns the set of job names whose status is :pending or :running."
  []
  (->> (get-ecs-tasks)
       (filter is-background-job?)
       (map ecs-task->job)
       (filter (comp #{:pending :running} :status))
       (map :name)
       set))

(defn run-task
  "Starts one ECS task from task definition `task` on the \"default\" cluster
  using the FARGATE_SPOT capacity provider. When `args` is truthy, its printed
  form is passed to the \"integreat-app\" container as the environment variable
  \"args\". Returns the result of ecs/run-task."
  [task args]
  (log/info "running job" task)
  (ecs/run-task
   (cond-> {:capacity-provider-strategy [{:base              1
                                          :weight            1
                                          :capacity-provider "FARGATE_SPOT"}]
            :count                      1
            :cluster                    "default"
            :enable-ecs-managed-tags    true
            :task-definition            task
            :network-configuration
            {:aws-vpc-configuration
             {:subnets          ["subnet-5e675761"
                                 "subnet-8519fde2"
                                 "subnet-89bab8d4"]
              :security-groups  ["sg-004e5855310c453a3"
                                 "sg-02d167406b1082698"]
              :assign-public-ip AssignPublicIp/ENABLED}}}
     args (assoc-in [:overrides :container-overrides]
                    [{:name        "integreat-app"
                      :environment [{:name  "args"
                                     :value (pr-str args)}]}]))))

(defn- job-name->task-definition
  "Derives the task definition name for a job: dashes become underscores,
  colons are dropped, and \"_<dd-env>\" from the config is appended."
  [job-name]
  (-> job-name
      (str/replace #"-" "_")
      (str/replace #":" "")
      (str "_" (:dd-env env))))

(defn request-job
  "Resolver for the request_job mutation. Asserts the caller is an admin, then
  starts the job named by (:which value) unless it is already pending or
  running. (:args value), when present, is read as EDN and forwarded to the
  task. A missing or blank job name is reported through assert-failure instead
  of failing inside str/replace."
  [context value _]
  (assert-admin (:id context))
  (let [which (:which value)]
    (cond
      (str/blank? which)
      (assert-failure "A job name is required")

      (contains? (currently-running-jobs) which)
      (assert-failure "This job is already running")

      :else
      (let [new-job (run-task (job-name->task-definition which)
                              ;; edn/read-string only reads data; it does not evaluate code.
                              (some-> (:args value) edn/read-string))]
        {:message (str "task " new-job " started.")}))))

(def objects
  "GraphQL object types contributed by this namespace."
  {:job       {:fields {:name       {:type 'String}
                        :start_date {:type :iso_date_time}
                        :end_date   {:type :iso_date_time}
                        :status     {:type :job_status}}}
   :jobs_page {:fields {:data  {:type '(list :job)}
                        :count {:type 'Int}
                        :total {:type 'Int}
                        :start {:type 'Int}
                        :end   {:type 'Int}}}})

(def input-objects
  "GraphQL input object types contributed by this namespace."
  {:jobs_filters {:fields {:sort     {:type '(list :sort_item)}
                           :start    {:type 'Int}
                           :per_page {:type 'Int}}}})

(def queries
  "GraphQL queries contributed by this namespace."
  {:jobs_page {:type    :jobs_page
               :args    {:filters  {:type :jobs_filters}
                         :sort     {:type '(list :sort_item)}
                         :start    {:type 'Int}
                         :per_page {:type 'Int}}
               :resolve :get-jobs-page}})

(def enums
  "GraphQL enums contributed by this namespace."
  {:job_status {:values [{:enum-value :pending}
                         {:enum-value :running}
                         {:enum-value :succeeded}
                         {:enum-value :failed}]}})

(def resolvers
  "Resolver keyword -> resolver function, as referenced by the schema above."
  {:get-jobs-page        get-jobs-page
   :mutation/request-job request-job})

(def mutations
  "GraphQL mutations contributed by this namespace."
  {:request_job {:type    :message
                 :args    {:which {:type 'String}
                           :args  {:type 'String}}
                 :resolve :mutation/request-job}})

(defn attach
  "Merges this namespace's objects, input objects, queries, enums and mutations
  into `schema`, then attaches the resolvers via attach-tracing-resolvers."
  [schema]
  (-> (merge-with merge schema {:objects       objects
                                :input-objects input-objects
                                :queries       queries
                                :enums         enums
                                :mutations     mutations})
      (attach-tracing-resolvers resolvers)))