Control Agent that is Selected to Run Task¶
Problem¶
You want your task to run only on a specified list of machines.
Solution¶
Use task preconditions to control how a task is assigned. Many options can be controlled, including agent hostname, number of available cores,
and hard disk space.
Use the task_preconditions
method to add task preconditions to a job.
The CommonResources
enumeration contains the most frequently used
resource types.
1from agiparallel.client import ClusterJobScheduler
2from agiparallel.constants import CommonResources, Operator
3from socket import gethostname
4
5
6class Task:
7 def execute(self):
8 self.result = "Hello from " + gethostname()
9
10
11def control_agent_example():
12 with ClusterJobScheduler("localhost") as scheduler:
13 scheduler.connect()
14
15 agents = scheduler.get_coordinator_snapshot().agents
16 agent_machine_name = agents[0].machine_name
17
18 job = scheduler.create_job()
19 # restrict the task to be only run on the first agent machine
20 job.add_task_preconditions(resource=CommonResources.HOSTNAME,
21 operation=Operator.MEMBER_OF,
22 resource_value=[agent_machine_name])
23
24 # restrict the task to be run on machines that have at least 1 empty host slot
25 job.add_task_preconditions(resource=CommonResources.AVAILABLE_CORES,
26 operation=Operator.GREATER_THAN,
27 resource_value=0)
28
29 job.add_task(Task())
30
31 job.submit()
32 job.wait_until_done()
33
34 print("Result:", job.tasks[0].result)
35
36 # output should resemble:
37 # Result: Hello from MyComputerName
38
39
40if __name__ == "__main__":
41 control_agent_example()
Discussion¶
It is a common pattern to execute tasks only on a restricted set of machines. Examples include:
Forcing the task to be executed only on a predefined set of machines.
Reserving system resources on a machine; for example: only use 2 cores of a 4 core machine.
Executing a task only if it has a specific resource, for example: only use machines that have STK installed.
Executing a task only if it has enough memory or hard disk space.
A helpful method to use in combination with task_preconditions
is the
get_coordinator_snapshot()
.
get_coordinator_snapshot will return a list of agents, which can be used to determine currently available resources before tasks are submitted.