Cancel Job And All Its Child Tasks

Problem

You want to cancel a job and all of its child tasks.

Solution

There are two ways to cancel tasks on a job.

Method 1: Set the cancel_on_task_failure property to True to cancel a job’s tasks automatically if one of the tasks fails. Use this option if the result of a job depends on all its child tasks completing successfully.

from agiparallel.client import *
from agiparallel.infrastructure import *
from agiparallel.constants import TaskStatus


class WaitTask(object):
    # Runs for about one second (100 sleeps of 10 ms).
    def execute(self):
        from time import sleep
        for i in range(0, 100):
            sleep(.01)


class FailureTask(object):
    # Fails after a short delay to trigger cancellation of the job.
    def execute(self):
        from time import sleep
        sleep(.1)
        raise Exception("Failure")


if __name__ == "__main__":
    with ClusterJobScheduler("localhost") as client:
        client.connect()

        job = client.create_job()
        # Cancel the job's remaining tasks as soon as any task fails.
        job.cancel_on_task_failure = True

        for i in range(0, 100):
            job.add_task(WaitTask())

        # This task will fail. Watch as the other tasks are canceled.
        job.add_task(FailureTask())

        job.submit()
        job.wait_until_done()

        # Report the final status of every task in the job.
        for task in job.tasks:
            print(f'Task with ID: {task.unique_id} has status {TaskStatus(task.task_status).name}')

Method 2: The other option is to explicitly cancel the job using the cancel() method.

Note

By default, cancel() returns without waiting for the job to be fully canceled. To wait until cancellation has finished, pass True for the listen_for_cancellation_event parameter of cancel().

from agiparallel.client import *
from agiparallel.infrastructure import *


class WaitTask(object):
    # Runs for about ten seconds (100 sleeps of 100 ms).
    def execute(self):
        from time import sleep
        for i in range(0, 100):
            sleep(.1)


if __name__ == "__main__":
    with ClusterJobScheduler("localhost") as client:
        client.connect()
        job = client.create_job()

        for i in range(0, 100):
            job.add_task(WaitTask())

        job.submit()

        # Let the job run for a while before canceling it.
        from time import sleep
        sleep(10)

        # Request cancellation, then block until the job is done.
        job.cancel()
        job.wait_until_done()

Discussion

Within a task, check the is_cancelling flag. If is_cancelling is True, exit the task as soon as possible. Each task is given 30 seconds to finish gracefully before its process is terminated. This value can be configured by setting GRACEFUL_CANCELLATION_TIMEOUT.
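
The polling pattern described above can be sketched without a cluster. The example below is a minimal illustration, not the agiparallel API: CooperativeTask, request_cancel, and run_with_cancel are hypothetical names, and the is_cancelling property is simulated with a threading.Event. How a real task exposes is_cancelling may differ, so treat this only as a picture of the control flow: check the flag on every iteration and return early when it is set.

```python
import threading
import time


class CooperativeTask:
    """Illustrative stand-in for a task; not an agiparallel class."""

    def __init__(self, steps=100, step_seconds=0.01):
        self.steps = steps
        self.step_seconds = step_seconds
        self.steps_done = 0
        self._cancel_requested = threading.Event()

    @property
    def is_cancelling(self):
        # Mirrors the flag named in the text; here it is backed by an Event.
        return self._cancel_requested.is_set()

    def request_cancel(self):
        # Hypothetical helper that plays the role of the scheduler.
        self._cancel_requested.set()

    def execute(self):
        for _ in range(self.steps):
            if self.is_cancelling:
                # Exit early instead of waiting to be terminated.
                return "cancelled"
            time.sleep(self.step_seconds)
            self.steps_done += 1
        return "completed"


def run_with_cancel(task, cancel_after_seconds):
    """Run task.execute() and request cancellation after a delay."""
    timer = threading.Timer(cancel_after_seconds, task.request_cancel)
    timer.start()
    try:
        return task.execute()
    finally:
        # Stop the timer if the task finished before it fired.
        timer.cancel()


if __name__ == "__main__":
    task = CooperativeTask()
    result = run_with_cancel(task, cancel_after_seconds=0.05)
    print(result, task.steps_done)
```

Keeping each unit of work between checks short is the design point: the shorter the interval between checks of is_cancelling, the sooner the task can stop once cancellation is requested, and the less likely it is to run into the graceful cancellation timeout.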
