Control Task Concurrency On Agent
Problem
You are integrating code that is already parallelized, and you need to control how many instances of a task run on an agent at the same time.
Solution
Set the is_exclusive property on a task to specify that at most one instance of that task should run on an agent at a time. Submit the job, then call wait_until_done() to block until every task has finished.
from agiparallel.client import ClusterJobScheduler
from agiparallel.constants import TaskProperties
from time import sleep
from socket import gethostname


class ExclusiveTask:
    def execute(self):
        sleep(5)  # stand-in for work that saturates the machine on its own
        self.result = gethostname()


def task_concurrency_example():
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()
        job = scheduler.create_job()

        for i in range(3):
            task = ExclusiveTask()
            task.name = "Task #" + str(i + 1)
            task.is_exclusive = True  # at most one instance per agent at a time
            job.add_task(task)

        job.submit()
        job.wait_until_done()

        for task in job.tasks:
            start = task.properties[TaskProperties.HOST_START_TIME]
            end = task.properties[TaskProperties.HOST_END_TIME]
            print(task.name + " ran on", task.result, "from", start, "to", end)

        # Example output:
        # Task #1 ran on MyComputerName from 2019-10-08 09:50:31.782120 to 2019-10-08 09:50:36.782179
        # Task #2 ran on MyComputerName from 2019-10-08 09:50:36.810140 to 2019-10-08 09:50:41.811087
        # Task #3 ran on MyComputerName from 2019-10-08 09:50:41.835080 to 2019-10-08 09:50:46.835749


if __name__ == "__main__":
    task_concurrency_example()
Discussion
When code has already been parallelized and will automatically use all the cores on a single machine, running several instances of it on the same machine hurts performance, because the instances compete for the same cores. In these situations, execute only one instance of the task on each machine at a time. That is, if two machines each have 8 cores, run only one task at a time on each of the two machines. To do this, set the is_exclusive property to True.
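For a concrete picture, a task body that already fans out across all local cores might look like the sketch below. The MultiCoreTask class and expensive_step function are hypothetical stand-ins for your parallelized code; only the is_exclusive flag comes from the scheduler API shown in the recipe above.

from multiprocessing import Pool
from os import cpu_count


def expensive_step(n):
    # Hypothetical stand-in for a CPU-bound unit of work.
    return n * n


class MultiCoreTask:
    def execute(self):
        # This body saturates every local core by itself, so two
        # instances on one machine would fight over the same cores.
        with Pool(cpu_count()) as pool:
            self.result = sum(pool.map(expensive_step, range(10000)))


task = MultiCoreTask()
task.is_exclusive = True  # let the agent run only one instance at a time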
Once the exclusive task completes, the agent returns to its normal state and its cores become available to the cluster again.
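This means exclusive and regular tasks can coexist in one job. A minimal sketch, reusing ClusterJobScheduler and ExclusiveTask from the recipe above (the class is reused here only for brevity); the scheduling behavior, with exclusive tasks serialized per agent while regular ones are free to overlap, follows from the property's description rather than from any extra API call:

def mixed_job_example():
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()
        job = scheduler.create_job()

        # One task that needs a whole machine to itself.
        heavy = ExclusiveTask()
        heavy.name = "Heavy"
        heavy.is_exclusive = True
        job.add_task(heavy)

        # Several tasks that can share an agent's cores.
        for i in range(4):
            light = ExclusiveTask()
            light.name = "Light #" + str(i + 1)
            light.is_exclusive = False
            job.add_task(light)

        job.submit()
        job.wait_until_done()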