Constrain the Amount of Resources Required for a Task to be Scheduled

Problem

You need fine-grained control over the amount of available resources a task requires before it can execute.

Solution

On the task, set requested_task_resources to a dictionary whose keys are the resources to configure and whose values are the amounts of the corresponding resources. Three resources can currently be configured for consumption: Cores, Memory, and Estimated Memory Budget. They are specified with the ConsumableResources keys CORES, MEMORY, and ESTIMATED_MEMORY_BUDGET.

task.requested_task_resources = {ConsumableResources.CORES: 2}

You can get the task’s allocated resources by accessing the allocated_task_resources dictionary property of the task_state. The following examples check the allocated resources once the task status reaches the ASSIGNED state.

cores_allocated = task_state.allocated_task_resources[ConsumableResources.CORES]

Below is a code example that demonstrates how to configure a task to consume 2 cores.

from agiparallel.client import ClusterJobScheduler
from agiparallel.constants import ConsumableResources, TaskStatus


class Task:
    def execute(self):
        pass


def on_core_task_state_changed(task_status):
    # print the number of cores allocated
    if task_status.task_state.status == TaskStatus.ASSIGNED:   # so it only prints once
        print("From Task -> Cores Allocated: ",
              task_status.task_state.allocated_task_resources[ConsumableResources.CORES])


def control_cores_example():
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()
        job = scheduler.create_job()
        task = Task()

        # request 2 cores for the task
        print("Requesting 2 cores.")
        task.requested_task_resources = {ConsumableResources.CORES: 2}

        job.task_state_changed = on_core_task_state_changed
        job.add_task(task)

        job.submit()
        job.wait_until_done()


if __name__ == "__main__":
    control_cores_example()

Below is another code example that demonstrates how to configure a task to consume a specific amount of the estimated memory budget.

from agiparallel.client import ClusterJobScheduler
from agiparallel.constants import ConsumableResources, TaskStatus


class EmptyTask:
    def execute(self):
        # Even though the body of the execute() function is empty, the agent's
        # available estimated memory budget must be large enough to schedule a
        # task with a non-zero value of the ESTIMATED_MEMORY_BUDGET
        pass


def on_memory_task_state_changed(task_status):
    # print the amount of memory allocated
    if task_status.task_state.status == TaskStatus.ASSIGNED:  # so it only prints once
        print("From", task_status.task.name, "-> Memory Allocated: ",
              task_status.task_state.allocated_task_resources[ConsumableResources.ESTIMATED_MEMORY_BUDGET])


def control_memory_example():
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()
        job = scheduler.create_job()

        job.task_state_changed = on_memory_task_state_changed

        for i in range(1, 4):
            task = EmptyTask()
            # configure the task to use i * 3 GB (i * 3072 MB) of the estimated memory budget
            print("Requesting", i * 3072, "MB for Task", i)
            task.name = "Task " + str(i)
            task.requested_task_resources = {ConsumableResources.ESTIMATED_MEMORY_BUDGET: i * 3072}
            job.add_task(task)

        job.submit()
        job.wait_until_done()

        # For example, if the machine has 6 GB of memory and two tasks are
        # estimated to require 6 GB and 3 GB of memory, the task estimated
        # to require 6 GB won't run until the task estimated to require
        # 3 GB completes.


if __name__ == "__main__":
    control_memory_example()

Discussion

Internally, a task consumes resources while it executes. The most common of these resources is “Cores”. By default, a task consumes one unit of the “Cores” resource on an agent, so on a machine with eight cores an agent can run up to eight tasks concurrently before its “Cores” resource is exhausted. You can optionally specify how much of a resource a task consumes. For instance, if each task is configured to consume two cores, the eight-core machine can run only four of these tasks concurrently. When a task finishes, it releases the resources it was allocated back into the pool of available resources.
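The acquire-and-release behavior described above can be modeled with a few lines of plain Python. This is only an illustrative sketch and not the library's implementation: the CorePool class and its try_acquire and release methods are hypothetical names introduced here.

```python
class CorePool:
    """Hypothetical model of an agent's pool of "Cores" (not the library's code)."""

    def __init__(self, total_cores):
        self.total_cores = total_cores
        self.available = total_cores

    def try_acquire(self, cores=1):
        # A task starts only if its whole request fits in what is left.
        if cores > self.available:
            return False
        self.available -= cores
        return True

    def release(self, cores=1):
        # A finished task returns its cores to the pool.
        self.available = min(self.total_cores, self.available + cores)


pool = CorePool(8)
started = 0
while pool.try_acquire(2):   # each task requests two cores
    started += 1
print("Concurrent two-core tasks:", started)

pool.release(2)              # one task finishes
print("Another task can start:", pool.try_acquire(2))
```

With eight cores and two cores per task, the loop starts four tasks, and a fifth can start only after one of them releases its cores.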

The “Memory” resource has a limited number of useful applications. It checks the memory currently available on the machine at the moment a task is about to execute. Imagine a machine with 8GB of memory and 8 cores. If you submit four tasks, each with a minimum memory of 4GB, then even while two tasks are already executing, the remaining tasks can begin execution as long as 4GB of memory is still available. In this case, the memory resource does not properly constrain the number of running tasks. A practical use for this resource is a cluster of machines where the is_exclusive flag is True for the tasks in the job, so that only one task runs at a time per machine. After a task finishes executing, a check ensures there is enough free memory on the machine to execute the next task.
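To see why a free-memory check alone does not limit concurrency, consider the following simplified model. It is a sketch under stated assumptions, not the library's logic: count_started is a hypothetical helper, and the 512 MB footprint at start time is an invented number standing in for tasks that have not yet allocated most of their memory.

```python
def count_started(free_mb, minimum_mb, footprint_at_start_mb, task_count):
    """Count tasks that pass a 'free memory >= minimum' check at start time.

    Assumption: each started task has only used footprint_at_start_mb so far,
    so the free memory seen by the next check has barely dropped.
    """
    started = 0
    for _ in range(task_count):
        if free_mb >= minimum_mb:
            started += 1
            free_mb -= footprint_at_start_mb
    return started


# 8 GB machine, four tasks that each declare a 4 GB minimum.
print(count_started(free_mb=8192, minimum_mb=4096,
                    footprint_at_start_mb=512, task_count=4))
```

Under these assumed numbers all four tasks pass the check, even though their combined declared minimum is twice the machine's memory.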

The third type of resource is the “Estimated Memory Usage”, configured through ESTIMATED_MEMORY_BUDGET. This resource simulates the allocation of memory for tasks without checking how much physical memory each task actually consumes as it executes. A total memory budget is set for the agent; its default value is the number of MB of physical memory available on the machine. An estimated memory usage is then set for each task and subtracted from the agent’s available budget while the task executes. If a task’s estimated memory usage fits within the agent’s available budget, the task runs. If it is greater than the available budget, the task waits until enough of the budget is available. The default estimated memory usage for a task is zero. Imagine a task believed to consume 2GB of memory. On an agent with an estimated memory budget of 6GB and eight cores, configuring the task’s estimated memory usage to 2GB means the agent executes only three of these tasks concurrently, which prevents it from swapping massive amounts of memory.
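The budget bookkeeping in the paragraph above can be sketched as a small admission function. This is an illustrative model only: admit is a hypothetical helper, and the simple in-order admission shown here is an assumption, since the source does not describe the scheduler's actual ordering policy.

```python
def admit(budget_mb, estimates_mb):
    """Split tasks into those that start now and those that must wait.

    Each admitted task's estimate is subtracted from the remaining budget;
    a task whose estimate exceeds the remaining budget waits.
    """
    available = budget_mb
    running, waiting = [], []
    for index, estimate in enumerate(estimates_mb):
        if estimate <= available:
            available -= estimate
            running.append(index)
        else:
            waiting.append(index)
    return running, waiting, available


# 6 GB budget, eight tasks each estimated at 2 GB.
running, waiting, left = admit(6144, [2048] * 8)
print("Running:", running)
print("Waiting:", waiting)
print("Budget left (MB):", left)
```

In this model three tasks run and five wait, regardless of how many cores are free, which matches the 6GB example in the text.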

Note that tasks are not restricted to using only the resources allocated to them. That is, a task allocated two units of “Cores” can still spawn as many threads as it wants; nothing stops it from overconsuming its declared resources. Efficient resource use requires that tasks be coded honestly, so that they use only the resources they are allocated.
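One way a task can stay within its declared cores is to size its own worker pool from the same number it requests. The sketch below uses only the Python standard library; the HonestTask class and its cores and items attributes are hypothetical, and it assumes the same cores value would also be placed in the task's requested_task_resources.

```python
from concurrent.futures import ThreadPoolExecutor


class HonestTask:
    """Hypothetical task that never runs more worker threads than it declares."""

    def __init__(self, cores, items):
        self.cores = cores      # assumed to match the requested CORES value
        self.items = list(items)
        self.result = None

    def execute(self):
        # Cap the worker pool at the declared core count.
        with ThreadPoolExecutor(max_workers=self.cores) as pool:
            self.result = sum(pool.map(lambda x: x * x, self.items))


task = HonestTask(cores=2, items=range(10))
task.execute()
print(task.result)  # sum of squares of 0..9, i.e. 285
```

Keeping the declared amount and the pool size in one place makes it harder for the two to drift apart as the task evolves.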

See Also

Reference