Monitoring Jobs
The API raises events after key state changes occur in the job scheduler: when a task changes state, when a task completes, and when a task reports new progress. Progress events are useful when data is returned to the client progressively during long-running jobs.
Overview
Responding to Task State Changes
Every time a task changes state, the job scheduler notifies the client with an event. A task’s result can be processed immediately upon task completion.
Task Completed
The job.task_completed and task.on_completed events are raised when a task completes.
Once a task completes, its results can be processed immediately.
from agiparallel.client import ClusterJobScheduler
from agiparallel.constants import TaskStatus


task_statuses = {TaskStatus.FAILED: "Failed",
                 TaskStatus.TIMED_OUT: "Timed out",
                 TaskStatus.COMPLETED: "Completed",
                 TaskStatus.ENVIRONMENT_ERROR: "Environment Error",
                 TaskStatus.CANCELED: "Cancelled",
                 TaskStatus.INTERRUPTED: "Interrupted",
                 TaskStatus.CANCELING: "Cancelling",
                 TaskStatus.RUNNING: "Running",
                 TaskStatus.ASSIGNED: "Assigned",
                 TaskStatus.SUBMITTED: "Submitted",
                 TaskStatus.NOT_SUBMITTED: "Not Submitted"}


class Method1Task:
    def __init__(self):
        self.name = "Task 1"

    def execute(self):
        pass


class Method2Task:
    def __init__(self):
        self.name = "Task 2"

    def execute(self):
        pass

    def on_completed(self):
        print("\tTaskEvent:", self.name, "completed.")
        print("\tTask Result:", self.result)


def on_task_completed(task):
    print("\tJobEvent:", task.name, "completed.")
    print("\tTask status: ", task_statuses[task.task_status])
    print("\tTask Result: ", task.result)


def task_completed_example():
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()

        print("- Method 1: use the task_completed property on the job")
        job1 = scheduler.create_job()
        job1.add_task(Method1Task())
        job1.task_completed = on_task_completed  # set event handler
        job1.submit()
        job1.wait_until_done()

        print("\n- Method 2: use the on_completed method on the task")
        job2 = scheduler.create_job()
        job2.add_task(Method2Task())
        job2.submit()
        job2.wait_until_done()


if __name__ == "__main__":
    task_completed_example()
Job Completed
job.job_completed is raised when all tasks in the job have completed.
from agiparallel.client import ClusterJobScheduler


class OurTask(object):
    def execute(self):
        pass


def on_job_completed(job):
    print('Job "{0}" is finished'.format(job.name))


def job_completed_example():
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()
        job = scheduler.create_job()

        job.name = "our job"
        job.add_task(OurTask())

        job.job_completed = on_job_completed  # set event handler

        job.submit()
        job.wait_until_done()


if __name__ == "__main__":
    job_completed_example()
Task State Changed
job.task_state_changed is raised when a task changes state, for example when a task changes its state from ASSIGNED to RUNNING.
from agiparallel.client import ClusterJobScheduler
from agiparallel.constants import TaskStatus

task_statuses = {TaskStatus.FAILED: "Failed",
                 TaskStatus.TIMED_OUT: "Timed out",
                 TaskStatus.COMPLETED: "Completed",
                 TaskStatus.ENVIRONMENT_ERROR: "Environment Error",
                 TaskStatus.CANCELED: "Cancelled",
                 TaskStatus.INTERRUPTED: "Interrupted",
                 TaskStatus.CANCELING: "Cancelling",
                 TaskStatus.RUNNING: "Running",
                 TaskStatus.ASSIGNED: "Assigned",
                 TaskStatus.SUBMITTED: "Submitted",
                 TaskStatus.NOT_SUBMITTED: "Not Submitted"}


class OurTask():
    def __init__(self):
        self.name = "our task"

    def execute(self):
        self.result = "hello from task"


def on_task_state_changed(task_state):
    print("Task name: ", task_state.task.name)
    print("Task result: ", task_state.task.result)
    print("Status changed from {0} to {1}\n".format(
        task_statuses[task_state.previous_status],
        task_statuses[task_state.new_status]))


def task_state_changed_example():
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()
        job = scheduler.create_job()

        job.add_task(OurTask())
        job.task_state_changed = on_task_state_changed  # add event handler

        job.submit()
        job.wait_until_done()


if __name__ == "__main__":
    task_state_changed_example()
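A handler does not have to print; it can also record transitions for later inspection. The sketch below is a minimal illustration that relies only on the attributes the handler above already reads (task, previous_status, new_status). TransitionLog is a hypothetical helper, not part of the API. The stand-in event objects and the plain strings used in place of TaskStatus values exist only so the snippet runs without a scheduler, and it is an assumption that a bound method is accepted as a handler in the same way as a plain function.

```python
from collections import defaultdict
from types import SimpleNamespace


class TransitionLog:
    """Hypothetical helper: collects (previous, new) status pairs per task name."""

    def __init__(self):
        self.history = defaultdict(list)

    def on_task_state_changed(self, task_state):
        # Reads only the attributes used by the handler in the example above.
        self.history[task_state.task.name].append(
            (task_state.previous_status, task_state.new_status))


# Stand-in event objects so the sketch runs without a scheduler. With a real
# job, the handler would presumably be attached like this (assumption):
#     job.task_state_changed = log.on_task_state_changed
log = TransitionLog()
task = SimpleNamespace(name="our task")
for previous, new in [("SUBMITTED", "ASSIGNED"), ("ASSIGNED", "RUNNING")]:
    log.on_task_state_changed(
        SimpleNamespace(task=task, previous_status=previous, new_status=new))

print(log.history["our task"])
```

Keeping the history in a dictionary keyed by task name makes it easy to check afterwards which states each task passed through.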
Reporting the Progress of Tasks
Task Progress Updated
A task can report a user-defined progress value by calling set_progress() on the task.
The scheduler does not interpret the reported progress values in any way, so any range can be used. For example, the progress parameter can range from 20 to 2500.
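Because the range is up to the task, a client that wants a percentage has to know the bounds the task uses. The helper below is a minimal sketch of that conversion; to_percent and its parameter names are hypothetical and not part of the API.

```python
def to_percent(value, lowest, highest):
    """Map a progress value from [lowest, highest] onto 0-100, clamping out-of-range input."""
    if highest <= lowest:
        raise ValueError("highest must be greater than lowest")
    fraction = (value - lowest) / (highest - lowest)
    return max(0.0, min(100.0, fraction * 100.0))


# Using the 20 to 2500 range mentioned above.
print(to_percent(20, 20, 2500))    # lower bound
print(to_percent(1260, 20, 2500))  # midpoint of the range
print(to_percent(2500, 20, 2500))  # upper bound
```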
The job’s task_progress_updated event is raised at 200 millisecond intervals if the progress of the task has been updated. If set_progress() is called multiple times during a single 200 millisecond interval, only a single progress event is sent.
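The coalescing behavior described above can be pictured with a small stand-alone model. This is only an illustration, not the scheduler's implementation: ProgressCoalescer is a hypothetical class, tick() stands in for whatever timer fires every 200 milliseconds, and it is an assumption that the single event carries the most recent value.

```python
class ProgressCoalescer:
    """Hypothetical model: forwards at most one progress value per interval."""

    def __init__(self, emit):
        self.emit = emit      # callback that receives the latest progress value
        self.latest = None
        self.dirty = False    # True if progress changed since the last emit

    def set_progress(self, value):
        # May be called any number of times; only the latest value is kept.
        self.latest = value
        self.dirty = True

    def tick(self):
        # Stands in for the periodic timer; emits only if something changed.
        if self.dirty:
            self.emit(self.latest)
            self.dirty = False


events = []
coalescer = ProgressCoalescer(events.append)

# First interval: three updates collapse into one event.
for value in (10, 20, 30):
    coalescer.set_progress(value)
coalescer.tick()

# Second interval: no updates, so no event.
coalescer.tick()

# Third interval: one update, one event.
coalescer.set_progress(40)
coalescer.tick()

print(events)
```

The practical consequence for a task is that calling set_progress() in a tight loop is cheap on the client side: handlers see at most one event per interval.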
The reported progress values are also shown live in the monitoring applications, which can be very valuable when tracking long-running jobs.
Below is a common pattern to print the complete progress of the job given the progress of each task:
Note
In this code snippet, update_progress() is passed to wait_until_done() as the heartbeat callback. This overload of wait_until_done() can be used to unblock briefly from the wait to do something quick, such as printing progress. Checking a canceling flag is another very common use.
from agiparallel.client import ClusterJobScheduler
from agiparallel.infrastructure.TaskState import TaskState
import random
from time import sleep

task_id_progress = {}


class ReportProgressExampleTask:
    def __init__(self):
        self.name = "progress example task"
        self.result = None
        self.state = TaskState()

    def execute(self):
        # simulate sporadic progress
        for i in range(95):
            sleep(random.random() * 0.2)
            self.set_progress(i)


def task_completed(task):
    task_id_progress[task.unique_id] = 100


def job_task_progress_update(task_id, task_progress_info):
    task_id_progress[task_id] = task_progress_info.progress


def task_progress_example():
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()
        job = scheduler.create_job()

        total_tasks = 12
        for i in range(total_tasks):
            job.add_task(ReportProgressExampleTask())

        job.task_completed = task_completed
        job.task_progress_updated = job_task_progress_update

        job.submit()

        def update_progress():
            max_progress = total_tasks * 100
            total_progress = 0
            for progress in task_id_progress.values():
                total_progress += progress

            percent = total_progress * 100 / max_progress

            if total_progress > 0:
                print("Total job progress: {0:2.2f}%".format(percent))

        job.wait_until_done(seconds_timeout=1000,
                            heartbeat_callback=update_progress,
                            seconds_heartbeat=.1)
        update_progress()  # one last call so it will show the 100%


if __name__ == "__main__":
    task_progress_example()
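The aggregation inside update_progress() can also be pulled out into a pure function, which makes it easy to check without a running scheduler. The following is a minimal sketch under that idea; job_percent and per_task_max are hypothetical names, and the task ids in the usage lines are made up for illustration.

```python
def job_percent(task_progress, total_tasks, per_task_max=100):
    """Return overall job progress in percent.

    task_progress maps task id -> last reported progress. Tasks that have
    not reported yet are simply missing from the mapping and count as 0.
    """
    if total_tasks <= 0:
        raise ValueError("total_tasks must be positive")
    return sum(task_progress.values()) * 100 / (total_tasks * per_task_max)


# Two of four tasks have reported so far: one finished, one halfway.
progress = {"task-a": 100, "task-b": 50}
print("Total job progress: {0:2.2f}%".format(job_percent(progress, total_tasks=4)))
```

Dividing by the total number of tasks rather than the number of entries in the dictionary keeps the percentage from jumping to a high value while only a few tasks have reported.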