Monitoring Jobs

The API provides events that are raised after key state changes occur in the job scheduler: for example, when a task changes state, when a task completes, or when a task reports new progress (useful when data is returned to the client progressively during long-running jobs).

Overview

Responding to Task State Changes

Every time a task changes state, the job scheduler notifies the client with an event. A task’s result can be processed immediately upon task completion.

Task Completed

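The job.task_completed and task.on_completed events are raised when a task completes, so the task's result can be processed as soon as it is available. The example below shows both approaches: assigning a handler to the job's task_completed property, and defining an on_completed method on the task class.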

 1from agiparallel.client import ClusterJobScheduler
 2from agiparallel.constants import TaskStatus
 3
 4
 5task_statuses = {TaskStatus.FAILED: "Failed",
 6                 TaskStatus.TIMED_OUT: "Timed out",
 7                 TaskStatus.COMPLETED: "Completed",
 8                 TaskStatus.ENVIRONMENT_ERROR: "Environment Error",
 9                 TaskStatus.CANCELED: "Cancelled",
10                 TaskStatus.INTERRUPTED: "Interrupted",
11                 TaskStatus.CANCELING: "Cancelling",
12                 TaskStatus.RUNNING: "Running",
13                 TaskStatus.ASSIGNED: "Assigned",
14                 TaskStatus.SUBMITTED: "Submitted",
15                 TaskStatus.NOT_SUBMITTED: "Not Submitted"}
16
17
class Method1Task:
    """Task for Method 1: its completion is reported by a job-level handler.

    The job's ``task_completed`` handler prints ``task.result``, so the task
    always carries a ``result`` attribute.
    """

    def __init__(self):
        self.name = "Task 1"
        # execute() produces nothing; give the completion handler a defined
        # value to print instead of risking AttributeError.
        self.result = None

    def execute(self):
        """Do no work; the task exists only to trigger the completion event."""
24
25
class Method2Task:
    """Task for Method 2: reports its own completion via ``on_completed``."""

    def __init__(self):
        self.name = "Task 2"
        # on_completed() prints self.result; initialize it so the callback
        # cannot raise AttributeError when execute() assigns no result.
        self.result = None

    def execute(self):
        """Do no work; the example only demonstrates the completion callback."""

    def on_completed(self):
        """Task-level completion callback: print the task's name and result."""
        print("\tTaskEvent:", self.name, "completed.")
        print("\tTask Result:", self.result)
36
37
def on_task_completed(task):
    """Handler for ``job.task_completed``: report one finished task.

    Prints the task's name, its status (translated through ``task_statuses``)
    and its result.
    """
    print("\tJobEvent:", task.name, "completed.")
    status_label = task_statuses[task.task_status]
    print("\tTask status: ", status_label)
    print("\tTask Result: ", task.result)
42
43
def task_completed_example():
    """Demonstrate two ways of being notified when a task completes.

    Method 1 assigns a handler to the job's ``task_completed`` property;
    Method 2 relies on the task's own ``on_completed`` method.  Each job is
    waited on before the next one is created.
    """
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()

        print("- Method 1: use the task_completed property on the job")
        job_with_handler = scheduler.create_job()
        job_with_handler.add_task(Method1Task())
        # Job-level handler, invoked when a task in this job completes.
        job_with_handler.task_completed = on_task_completed
        job_with_handler.submit()
        job_with_handler.wait_until_done()

        print("\n- Method 2: use the on_completed method on the task")
        job_with_task_callback = scheduler.create_job()
        job_with_task_callback.add_task(Method2Task())
        # No job-level handler here: Method2Task.on_completed does the reporting.
        job_with_task_callback.submit()
        job_with_task_callback.wait_until_done()


if __name__ == "__main__":
    task_completed_example()

Job Completed

job.job_completed is raised when all tasks in the job have completed.

 1from agiparallel.client import ClusterJobScheduler
 2
 3
 4class OurTask(object):
 5    def execute(self):
 6        pass
 7
 8
 9def on_job_completed(job):
10    print('Job "{0}" is finished'.format(job.name))
11
12
def job_completed_example():
    """Run a one-task job and get notified through ``job.job_completed``."""
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()
        our_job = scheduler.create_job()
        our_job.name = "our job"
        our_job.add_task(OurTask())
        # Raised once all of the job's tasks have completed.
        our_job.job_completed = on_job_completed
        our_job.submit()
        our_job.wait_until_done()


if __name__ == "__main__":
    job_completed_example()

Task State Changed

job.task_state_changed is raised whenever a task changes state, for example, when a task moves from ASSIGNED to RUNNING.

 1from agiparallel.client import ClusterJobScheduler
 2from agiparallel.constants import TaskStatus
 3
 4task_statuses = {TaskStatus.FAILED: "Failed",
 5                 TaskStatus.TIMED_OUT: "Timed out",
 6                 TaskStatus.COMPLETED: "Completed",
 7                 TaskStatus.ENVIRONMENT_ERROR: "Environment Error",
 8                 TaskStatus.CANCELED: "Cancelled",
 9                 TaskStatus.INTERRUPTED: "Interrupted",
10                 TaskStatus.CANCELING: "Cancelling",
11                 TaskStatus.RUNNING: "Running",
12                 TaskStatus.ASSIGNED: "Assigned",
13                 TaskStatus.SUBMITTED: "Submitted",
14                 TaskStatus.NOT_SUBMITTED: "Not Submitted"}
15
16
class OurTask:
    """Task whose execute() stores a greeting as its result."""

    def __init__(self):
        self.name = "our task"
        # on_task_state_changed() prints task.result on every transition,
        # which can fire before execute() has run (e.g. ASSIGNED -> RUNNING).
        # Start with a defined value so that print cannot raise AttributeError.
        self.result = None

    def execute(self):
        self.result = "hello from task"
23
24
def on_task_state_changed(task_state):
    """Handler for ``job.task_state_changed``: print one task transition.

    *task_state* exposes the affected ``task`` together with its
    ``previous_status`` and ``new_status``; both statuses are translated to
    labels via ``task_statuses``.
    """
    changed_task = task_state.task
    print("Task name: ", changed_task.name)
    print("Task result: ", changed_task.result)
    old_label = task_statuses[task_state.previous_status]
    new_label = task_statuses[task_state.new_status]
    print("Status changed from {0} to {1}\n".format(old_label, new_label))
31
32
def task_state_changed_example():
    """Run a one-task job and print every state transition of its task."""
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()
        state_job = scheduler.create_job()
        state_job.add_task(OurTask())
        # Invoked each time a task in this job changes state.
        state_job.task_state_changed = on_task_state_changed
        state_job.submit()
        state_job.wait_until_done()


if __name__ == "__main__":
    task_state_changed_example()

Reporting the Progress of Tasks

Task Progress Updated

A task can report a user-defined progress value by calling set_progress() on the task. The scheduler does not interpret the reported values, so any range can be used; for example, progress could range from 20 to 2500. If a task's progress has been updated, the job's task_progress_updated event is raised at 200-millisecond intervals. If set_progress() is called multiple times within a single 200-millisecond interval, only one progress event is sent.

The reported progress values are also shown dynamically in the monitoring applications, which makes it easy to follow long-running tasks.

The following example shows a common pattern for printing the overall progress of a job based on the progress reported by each of its tasks:

Note

Notice that this example passes update_progress as the heartbeat_callback argument of wait_until_done(). This overload of wait_until_done() periodically unblocks to invoke the callback, so you can do something quick while waiting, such as printing progress. Checking a cancellation flag is another common use.

 1from agiparallel.client import ClusterJobScheduler
 2from agiparallel.infrastructure.TaskState import TaskState
 3import random
 4from time import sleep
 5
 6task_id_progress = {}
 7
 8
 9class ReportProgressExampleTask:
10    def __init__(self):
11        self.name = "progress example task"
12        self.result = None
13        self.state = TaskState()
14
15    def execute(self):
16        # simulate sporadic progress
17        for i in range(95):
18            sleep(random.random() * 0.2)
19            self.set_progress(i)
20
21
def task_completed(task):
    """Handler for ``job.task_completed``: record *task* as fully done (100)."""
    finished_id = task.unique_id
    task_id_progress[finished_id] = 100
24
25
def job_task_progress_update(task_id, task_progress_info):
    """Handler for ``job.task_progress_updated``: store the latest progress."""
    latest = task_progress_info.progress
    task_id_progress[task_id] = latest
28
29
def task_progress_example():
    """Submit several progress-reporting tasks and print overall job progress.

    The ``task_progress_updated`` and ``task_completed`` handlers record each
    task's progress in ``task_id_progress``.  ``update_progress`` is passed to
    ``wait_until_done`` as its heartbeat callback, so the aggregate percentage
    is printed while the job runs, and it is called once more at the end.
    """
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()
        job = scheduler.create_job()

        total_tasks = 12
        for _ in range(total_tasks):
            job.add_task(ReportProgressExampleTask())

        job.task_completed = task_completed
        job.task_progress_updated = job_task_progress_update

        job.submit()

        def update_progress():
            """Print job progress, taking 100 per task as the maximum."""
            max_progress = total_tasks * 100
            total_progress = sum(task_id_progress.values())

            # Stay quiet until at least one task has reported progress.
            if total_progress > 0:
                percent = total_progress * 100 / max_progress
                print("Total job progress: {0:2.2f}%".format(percent))

        job.wait_until_done(seconds_timeout=1000,
                            heartbeat_callback=update_progress,
                            seconds_heartbeat=.1)
        update_progress()   # one last call so it will show the 100%


if __name__ == "__main__":
    task_progress_example()

See Also

Reference