Constrain the Amount of Resources Required for a Task to be Scheduled
Problem
You need fine-grained control over the amount of available resources necessary for a task to execute.
Solution
On the task, assign requested_task_resources to a dictionary where the keys are the resources to be configured and the values are the amounts of the corresponding resources. There are currently three resources that can be configured for consumption: Cores, Memory, and Estimated Memory Budget. These are set by specifying CORES, MEMORY, or ESTIMATED_MEMORY_BUDGET, respectively.
task.requested_task_resources = {ConsumableResources.CORES: 2}
You can get the task’s allocated resources by accessing the allocated_task_resources dictionary property of the task_state. The following examples check the allocated resources once the task status reaches the ASSIGNED state.
cores_allocated = task_state.allocated_task_resources[ConsumableResources.CORES]
Below is a code example that demonstrates how to configure a task to consume 2 cores.
from agiparallel.client import ClusterJobScheduler
from agiparallel.constants import ConsumableResources, TaskStatus


class Task:
    def execute(self):
        pass


def on_core_task_state_changed(task_status):
    # print the number of cores allocated
    if task_status.task_state.status == TaskStatus.ASSIGNED:  # so it only prints once
        print("From Task -> Cores Allocated: ",
              task_status.task_state.allocated_task_resources[ConsumableResources.CORES])


def control_cores_example():
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()
        job = scheduler.create_job()
        task = Task()

        # request 2 cores for the task
        print("Requesting 2 cores.")
        task.requested_task_resources = {ConsumableResources.CORES: 2}

        job.task_state_changed = on_core_task_state_changed
        job.add_task(task)

        job.submit()
        job.wait_until_done()


if __name__ == "__main__":
    control_cores_example()
Below is another code example that demonstrates how to configure a task to consume a specific amount of the estimated memory budget.
from agiparallel.client import ClusterJobScheduler
from agiparallel.constants import ConsumableResources, TaskStatus


class EmptyTask:
    def execute(self):
        # Even though the body of the execute() function is empty, the agent's
        # available estimated memory budget must be large enough to schedule a
        # task with a non-zero value of the ESTIMATED_MEMORY_BUDGET
        pass


def on_memory_task_state_changed(task_status):
    # print the amount of memory allocated
    if task_status.task_state.status == TaskStatus.ASSIGNED:  # so it only prints once
        print("From", task_status.task.name, "-> Memory Allocated: ",
              task_status.task_state.allocated_task_resources[ConsumableResources.ESTIMATED_MEMORY_BUDGET])


def control_memory_example():
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()
        job = scheduler.create_job()

        job.task_state_changed = on_memory_task_state_changed

        for i in range(1, 4):
            task = EmptyTask()
            # configure task to use i * 3 GB (i * 3072 MB) of the estimated memory budget
            print("Requesting", i * 3072, "MB for Task", i)
            task.name = "Task " + str(i)
            task.requested_task_resources = {ConsumableResources.ESTIMATED_MEMORY_BUDGET: i * 3072}
            job.add_task(task)

        job.submit()
        job.wait_until_done()

        # If the machine has 6GB of memory and two of the tasks are estimated
        # to require 6GB and 3GB of memory, the task estimated to require 6GB
        # won't run until the task estimated to require 3GB completes.


if __name__ == "__main__":
    control_memory_example()
Discussion
When a task executes, what is happening internally is that the task consumes a resource. The most common of these resources is “Cores”. By default, a task consumes one resource of type “Cores” on an agent. On a machine with eight cores, an agent can run eight tasks concurrently before its “Cores” resource is exhausted. The amount of resources a task consumes can optionally be specified. For instance, if each task is configured to consume at least two cores, the eight-core machine can only run four of these tasks concurrently. When a task finishes, it releases the resources it was allocated back into the pool of available resources.
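To make that arithmetic concrete, here is a minimal sketch, reusing the Task class and scheduler setup from the first example; the task count and core counts are illustrative assumptions, not part of the original recipe.
from agiparallel.client import ClusterJobScheduler
from agiparallel.constants import ConsumableResources


class Task:
    def execute(self):
        pass


def cores_concurrency_sketch():
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()
        job = scheduler.create_job()

        # Eight tasks, each consuming two of the agent's "Cores" resource.
        # On an eight-core agent only four can hold their cores at once,
        # so the scheduler is expected to run them in two waves of four.
        for _ in range(8):
            task = Task()
            task.requested_task_resources = {ConsumableResources.CORES: 2}
            job.add_task(task)

        job.submit()
        job.wait_until_done()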
The resource “Memory” has a limited number of useful applications. It checks the machine’s currently available memory at the moment a task is about to be executed. Imagine a machine with 8GB of memory and eight cores. If you submit four tasks, each with a minimum memory requirement of 4GB, the remaining tasks can still begin execution while two tasks are already running, as long as 4GB of memory is reported as available. In this case, the number of running tasks is not properly constrained by the memory resource. A practical use for this resource is a cluster of machines where the tasks’ is_exclusive flag is True for the job, so that only one task may run at a time per machine. After a task finishes executing, a check is made to ensure there is enough free memory on the machine before the next task executes.
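Below is a minimal sketch of that pattern. It assumes is_exclusive can be set per task, as the description above suggests, and that the “Memory” resource is expressed in MB like the memory budget; both are assumptions rather than confirmed API details.
from agiparallel.client import ClusterJobScheduler
from agiparallel.constants import ConsumableResources


class EmptyTask:
    def execute(self):
        pass


def exclusive_memory_sketch():
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()
        job = scheduler.create_job()

        for _ in range(4):
            task = EmptyTask()
            # Only start this task on a machine reporting at least 4 GB free
            # (MB units assumed for the MEMORY resource).
            task.requested_task_resources = {ConsumableResources.MEMORY: 4096}
            # One task at a time per machine, so the free-memory check is
            # meaningful (per-task flag assumed, see lead-in above).
            task.is_exclusive = True
            job.add_task(task)

        job.submit()
        job.wait_until_done()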
The third type of resource is the “Estimated Memory Budget”. This resource simulates the allocation of memory for tasks without actually checking how much physical memory each task consumes as it executes. A total memory budget is set for the agent, with a default value equal to the number of MB of physical memory on the machine. An estimated memory usage is then set for each task and subtracted from the agent’s budget while the task executes. If a task’s estimated memory usage is less than the agent’s available budget, it runs; if it is greater, the task waits until enough budget is available. The default estimated memory usage for a task is zero. Imagine a task believed to consume 2GB of memory: on an agent with an estimated memory budget of 6GB and eight cores, configuring the task’s estimated memory usage to 2GB means the agent will execute only three of these tasks concurrently, preventing the agent from swapping massive amounts of memory.
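As a sketch of that scenario, the snippet below reuses the API from the second full example; the task count and budget figures are illustrative assumptions. Each task declares an estimated memory usage of 2048 MB, so an agent with a 6144 MB budget would be expected to run at most three of them at once.
from agiparallel.client import ClusterJobScheduler
from agiparallel.constants import ConsumableResources


class EmptyTask:
    def execute(self):
        pass


def estimated_memory_sketch():
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()
        job = scheduler.create_job()

        for _ in range(6):
            task = EmptyTask()
            # Each task claims 2048 MB of the agent's estimated memory budget,
            # so an agent with a 6144 MB budget runs at most three at a time.
            task.requested_task_resources = {ConsumableResources.ESTIMATED_MEMORY_BUDGET: 2048}
            job.add_task(task)

        job.submit()
        job.wait_until_done()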
Note that tasks are not restricted to using only the resources allocated to them. That is, a task that requests two resources of type “Cores” can still spawn as many threads as it wants; the task will not be stopped from over-consuming its declared resources. Efficient use of resource consumption requires that tasks are coded honestly and use only the resources they are allocated.