Makes a whole separate page for recent jobs.

This commit is contained in:
2022-09-30 16:30:11 -07:00
parent d58e963c92
commit ae1500be9a
12 changed files with 444 additions and 150 deletions

View File

@@ -10,10 +10,10 @@
[auto-ap.graphql.import-batch :as gq-import-batches]
[auto-ap.graphql.intuit-bank-accounts :as gq-intuit-bank-accounts]
[auto-ap.graphql.invoices :as gq-invoices]
[auto-ap.graphql.jobs :as gq-jobs]
[auto-ap.graphql.ledger :as gq-ledger]
[auto-ap.graphql.plaid :as gq-plaid]
[auto-ap.graphql.reports :as gq-reports]
[auto-ap.graphql.requests :as gq-requests]
[auto-ap.graphql.sales-orders :as gq-sales-orders]
[auto-ap.graphql.transaction-rules :as gq-transaction-rules]
[auto-ap.graphql.transactions :as gq-transactions]
@@ -534,11 +534,7 @@
{:enum-value :equity}
{:enum-value :revenue}]}}
:mutations
{:request_import
{:type 'String
:args {:which {:type 'String}
:args {:type 'String}}
:resolve :mutation/request-import}
{
:delete_transaction_rule
{:type :id
@@ -784,7 +780,6 @@
:mutation/upsert-vendor gq-vendors/upsert-vendor
:mutation/upsert-account gq-accounts/upsert-account
:mutation/merge-vendors gq-vendors/merge-vendors
:mutation/request-import gq-requests/request-import
:get-vendor gq-vendors/get-graphql
:search-vendor gq-vendors/search
:search-account gq-accounts/search})
@@ -799,6 +794,7 @@
gq-invoices/attach
gq-clients/attach
gq-sales-orders/attach
gq-jobs/attach
schema/compile))

View File

@@ -0,0 +1,153 @@
(ns auto-ap.graphql.jobs
(:require
[amazonica.aws.ecs :as ecs]
[auto-ap.graphql.utils
:refer [assert-admin assert-failure result->page]]
[clj-time.coerce :as coerce]
[clj-time.core :as time]
[clojure.string :as str]
[config.core :refer [env]]
[clojure.tools.logging :as log]
[com.walmartlabs.lacinia.util :refer [attach-resolvers]])
(:import
(com.amazonaws.services.ecs.model AssignPublicIp)))
(defn get-ecs-tasks []
(->>
(concat (:task-arns (ecs/list-tasks :max-results 50)) (:task-arns (ecs/list-tasks :desired-status "STOPPED" :max-results 50)))
(ecs/describe-tasks :include [] :tasks)
:tasks
(map #(assoc % :task-definition (:task-definition (ecs/describe-task-definition :task-definition (:task-definition-arn %)))))
(sort-by :created-at)
reverse))
(defn is-background-job? [task]
(->> task
:task-definition
:container-definitions
(mapcat :environment)
(filter (comp #{"INTEGREAT_JOB"} :name))
seq))
(defn task-definition->job-name [task-definition]
(->> (:container-definitions task-definition)
(mapcat :environment)
(filter (comp #{"INTEGREAT_JOB"} :name))
(map :value)
first))
(defn job-exited-successfully? [task]
(if (= 0 (->> task
:containers
(filter (comp #{"integreat-app" } :name))
(first)
:exit-code))
true
false))
(defn ecs-task->job [task]
{:status (condp = (:last-status task)
"RUNNING" :running
"PENDING" :pending
"PROVISIONING" :pending
"DEPROVISIONING" :running
"STOPPED" (if (job-exited-successfully? task)
:succeeded
:failed))
:name (task-definition->job-name (:task-definition task))
:end-date (some-> (:stopped-at task) coerce/to-date-time (time/to-time-zone (time/time-zone-for-offset 0)))
:start-date (some-> (:created-at task) coerce/to-date-time (time/to-time-zone (time/time-zone-for-offset 0)))})
(defn get-jobs-page [context args _]
(assert-admin (:id context))
(let [args (assoc (:filters args) :id (:id context))
jobs (->> (get-ecs-tasks)
(filter is-background-job?)
(map ecs-task->job))]
(result->page
jobs
(count jobs)
:data
args)))
(defn currently-running-jobs []
(->> (get-ecs-tasks)
(filter is-background-job?)
(map ecs-task->job)
(filter (comp #{:pending :running} :status))
(map :name)
set))
(defn run-task [task]
(log/info "running job" task)
(ecs/run-task :capacity-provider-strategy [{:base 1 :weight 1 :capacity-provider "FARGATE_SPOT"}]
:count 1
:cluster "default"
:enable-ecs-managed-tags true
:task-definition task
:network-configuration {:aws-vpc-configuration {:subnets ["subnet-5e675761" "subnet-8519fde2" "subnet-89bab8d4"]
:security-groups ["sg-004e5855310c453a3" "sg-02d167406b1082698"]
:assign-public-ip AssignPublicIp/ENABLED}}))
(defn request-job [context value _]
(assert-admin (:id context))
(if (not (get (currently-running-jobs) (:which value)))
(let [new-job (run-task (str (str/replace (:which value) #"-" "_") "_" (:dd-env env)))]
{:message (str "task " (str new-job) " started.")})
(assert-failure "This job is already running")))
(def objects
{:job {:fields {:name {:type 'String}
:start_date {:type :iso_date_time}
:end_date {:type :iso_date_time}
:status {:type :job_status}}}
:jobs_page {:fields {:data {:type '(list :job)}
:count {:type 'Int}
:total {:type 'Int}
:start {:type 'Int}
:end {:type 'Int}}}})
(def input-objects
{:jobs_filters {:fields {:sort {:type '(list :sort_item)}
:start {:type 'Int}
:per_page {:type 'Int}}}})
(def queries
{:jobs_page {:type :jobs_page
:args {:filters {:type :jobs_filters}
:sort {:type '(list :sort_item)}
:start {:type 'Int}
:per_page {:type 'Int}}
:resolve :get-jobs-page}})
(def enums
{:job_status {:values [{:enum-value :pending}
{:enum-value :running}
{:enum-value :succeeded}
{:enum-value :failed}]} })
(def resolvers
{:get-jobs-page get-jobs-page
:mutation/request-job request-job})
(def mutations
{:request_job
{:type :message
:args {:which {:type 'String}
:args {:type 'String}}
:resolve :mutation/request-job}})
(defn attach [schema]
(->
(merge-with merge schema
{:objects objects
:input-objects input-objects
:queries queries
:enums enums
:mutations mutations})
(attach-resolvers resolvers)))

View File

@@ -1,41 +0,0 @@
(ns auto-ap.graphql.requests
(:require
[amazonica.aws.ecs :as ecs]
[auto-ap.graphql.utils :refer [assert-admin assert-failure]]
[clojure.string :as str]
[config.core :refer [env]]
[clojure.tools.logging :as log])
(:import
(com.amazonaws.services.ecs.model AssignPublicIp)))
(defn currently-running-jobs []
(->> (ecs/list-tasks :cluster "default" )
:task-arns
(ecs/describe-tasks :include [] :tasks)
:tasks
#_(filter #(= "RUNNING" (:last-status %)))
(map (comp (comp last butlast) #(str/split % #"[/:]") :task-definition-arn)
#_(mapcat (comp :container-overrides :overrides)))
(set))
)
(defn run-task [task]
(log/info "running job" task)
(ecs/run-task :capacity-provider-strategy [{:base 1 :weight 1 :capacity-provider "FARGATE_SPOT"}]
:count 1
:cluster "default"
:enable-ecs-managed-tags true
:task-definition task
:network-configuration {:aws-vpc-configuration {:subnets ["subnet-5e675761" "subnet-8519fde2" "subnet-89bab8d4"]
:security-groups ["sg-004e5855310c453a3" "sg-02d167406b1082698"]
:assign-public-ip AssignPublicIp/ENABLED}}))
(defn request-import [context value _]
(assert-admin (:id context))
(let [job (str (str/replace (str/replace (name (:which value)) #":" "") #"-" "_") "_" (:dd-env env))]
(if (not (get (currently-running-jobs) job))
(run-task job)
(assert-failure "This job is already running"))))