Key Concepts¶
Job, Task, and Task Environment are the three key components exposed for extension.
Overview¶
Job¶
A Job allows a group of tasks to be managed as a unit. A job consists of one or more tasks and, optionally, a task environment. Operations performed on a job affect all the tasks associated with it, for example setting the priority or requiring certain conditions to be met before a task can be executed.
A job must be submitted to a job scheduler before its work can run. Once a job is submitted, wait until all of its tasks have completed. None of the job options are mandatory.
Job Options¶
There are many options on a job. All of the properties are optional. Below is a table of all the options and their default values.
| Property | Description | Default Value |
|---|---|---|
| name | Name of the job displayed in the monitoring applications. | Job # |
| description | Description of the job displayed in the monitoring applications. | |
| priority | Priority of the job’s tasks. Higher-priority tasks are selected for execution before lower-priority tasks. | |
| | List of conditions that must be met by an agent machine before a task is assigned to it. | |
| | The scheduling algorithm to use when selecting which agent should execute a task. | |
| | Whether to cancel all tasks if the client disconnects. Appropriate for interactive jobs. | |
| | Whether to cancel all tasks if any task in the job fails. Appropriate if the result of the job depends on all the tasks completing successfully. | |
| | The number of milliseconds the job’s tasks are allowed to run before a task is considered to have timed out. | -1 |
| max_interrupted_retry_attempts | Number of times the job’s tasks will be retried if a task is interrupted. | 5 |
Tips and Best Practices¶
There is a performance overhead each time a job is submitted. Where possible, pack more tasks into each job to reduce the number of submissions, as sketched below.
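For example, here is a minimal sketch, reusing the ClusterJobScheduler client and the SimpleTask class shown elsewhere in this section, that submits one job with one hundred tasks rather than one hundred single-task jobs:
with ClusterJobScheduler("localhost") as client:
    client.connect()
    job = client.create_job()

    # the submission overhead is paid once for all one hundred tasks
    for i in range(100):
        job.add_task(SimpleTask(i, i))

    job.submit()
    job.wait_until_done()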
Example¶
Here is an annotated example:
with ClusterJobScheduler("localhost") as client:
    client.connect()
    job = client.create_job()

    # add one or more tasks
    job.add_task(SimpleTask(1, 1))

    # set the task environment
    job.task_environment = MyEnvironment()

    # set various job options
    job.name = "my job name"
    job.description = "my job description"
    job.max_interrupted_retry_attempts = 0
    job.priority = JobPriority.LOW
    job.add_task_preconditions(CommonResources.AVAILABLE_CORES, Operator.GREATER_THAN, 2)

    # submit the job to the job scheduler
    job.submit()

    # wait for the results
    job.wait_until_done()
Task¶
A task is the basic building block: the application executes the program contained within each task. The most important components of a task are the execute() method and the result property.
Every task has a unique_id and a task number assigned to it. The unique id is created when the task is instantiated, and the coordinator assigns the task number. Tasks can also be given names for easier identification.
Serialization¶
Tasks are serialized by the pickle module when they are submitted to a job scheduler. When a task is executed, a copy of the pickled object is deserialized and its execute() method is called. The developer must ensure that the object can be serialized by pickle. For more information on Python serialization, see the pickle documentation.
Note
When the task is deserialized by pickle, the task’s constructor will not be called.
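The following pure-Python sketch (the NoisyTask class is hypothetical and independent of the job scheduler) shows that round-tripping an object through pickle skips its constructor:
import pickle

class NoisyTask(object):
    def __init__(self, first, second):
        print("constructor called")   # runs only on direct instantiation
        self.a = first
        self.b = second

    def execute(self):
        self.result = self.a + self.b

task = NoisyTask(1, 1)                    # prints "constructor called"
clone = pickle.loads(pickle.dumps(task))  # prints nothing: __init__ is not called
clone.execute()
print(clone.result)                       # 2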
Task Properties¶
Properties that can be set either before a task is submitted or when it is running:
| Property | Description | Default Value |
|---|---|---|
| name | Name of the task displayed in the monitoring applications. | Task’s type name |
| result | Result of the task after the task executes. Result objects must be serializable. If the task encounters an error, the task’s result is the error message. | |
| | Dictionary of properties returned after the task executes. The items in this collection can only be serializable types. | { } |
Read-only properties set by the infrastructure:
| Property | Description |
|---|---|
| | The output of the process while it was executing the task. |
| | The error stream of the process while it was executing the task. |
| unique_id | Random GUID used to identify the task. |
| | The current status of the task. This is updated when the task’s state changes in the job scheduler. |
| | If the task is canceled, this property gives a human-readable reason why. |
| | If the task is canceled, a |
Task States¶
A task can transition through several different states during its lifetime. Task states are represented by the TaskStatus enum.
All tasks start in the NOT_SUBMITTED state. After the task is submitted, the state changes to SUBMITTED while the task sits in the coordinator queue. When a task is assigned to an agent for execution, its status changes to ASSIGNED, and then to RUNNING once the agent starts running the task. If no errors occur during execution, the task status changes to COMPLETED. If an uncaught exception is encountered, the task goes into the FAILED state.
Below is a state transition diagram and a full table of task statuses.
Possible task statuses:
| Status | Description | Transition |
|---|---|---|
| NOT_SUBMITTED | Task is not submitted yet. | Always the initial state |
| SUBMITTED | Task is submitted but not assigned yet. | Always the state after the task is submitted |
| ASSIGNED | Task is assigned but not running yet. | Transition state. Next state is RUNNING |
| RUNNING | Task is currently running. | Expected |
| | Task is in the process of canceling. | Transition |
| | Task encountered a system exception, for example the agent disconnecting or the host process exiting unexpectedly. | End state |
| | Task is canceled. | End state |
| | Task failed to run because of an uncaught exception in task environment setup. | End state |
| COMPLETED | Task completed successfully. | End state |
| | Task timed out because it ran longer than the specified timeout. | End state |
| FAILED | Task failed because of an uncaught exception while running the task. | End state |
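As a rough illustration (not the library’s own API), client code could test whether a task has reached an end state; the status attribute name and the set of end-state members used here are assumptions based on the prose and table above:
# Only the end states named explicitly in the prose above are listed; the exact
# enum member names of the other end states are not shown in this section.
FINISHED_STATES = {TaskStatus.COMPLETED, TaskStatus.FAILED}

def task_is_finished(task):
    # assumes the read-only status property is exposed as `task.status`
    return task.status in FINISHED_STATES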
Tips and Best Practices¶
How should tasks be split? In practice, splitting the workload into more, smaller units often yields better performance. A common problem in parallel applications is that the workload is split more coarsely than is optimal; in that case the overall runtime of the job is dominated by the time it takes the slowest task to complete. As with any performance tip: measure, measure, measure.
Design tasks so they are decoupled from the application code as much as possible. Try not to store application logic in the Task class. This will prove helpful when refactoring and testing code.
If certain fields on the task instance are not used, ignore or remove the fields to reduce the size of the object being sent.
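As one way to do this with standard pickle hooks (the TrimmedTask class and its debug_notes field are a sketch, not a scheduler feature), fields that are only useful on the client side can be excluded from the serialized state:
class TrimmedTask(object):
    def __init__(self, first, second):
        self.a = first
        self.b = second
        self.debug_notes = []        # hypothetical client-side-only field

    def __getstate__(self):
        # drop the client-only field so it is not sent to the agent
        state = self.__dict__.copy()
        state.pop("debug_notes", None)
        return state

    def execute(self):
        self.result = self.a + self.b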
Example¶
Here is an example of a simple task:
class SimpleTask(object):
    def __init__(self, first, second):
        self.a = first
        self.b = second

    def execute(self):
        self.result = self.a + self.b
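Because the task is plain Python decoupled from the scheduler, it can be exercised directly, for example in a unit test:
task = SimpleTask(1, 2)
task.execute()
assert task.result == 3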
Task Environment¶
A task environment provides a set of functions that run once on each host before any of the tasks associated with that environment execute, and once again before the host is recycled.
Although not strictly required, a task environment is key to optimizing cases where a common operation is expensive. A typical use is to perform such an operation once before any task executes; examples include loading required data, setting up and running an application, or simply starting the host environment.
Note
When a task environment is not specified, a default task environment, common for all tasks in the job, is used.
Task Environment Lifetime¶
A task environment is set up only once per host process. After a process sets up a task environment, the process remains idle until a task specifying that task environment is assigned to it for execution. In other words, a host runs one task environment and multiple tasks. After each task is executed, the host checks whether any of the task environment’s recycle_settings conditions have been satisfied. If any one of the conditions has been met, the process is recycled. The Agent Tray Application exposes a user interface for configuring an agent’s host recycle settings. The values set in the UI are overridden if a submitted job’s TaskEnvironment recycle_settings have been set programmatically.
A task environment can also be recycled if the agent needs to free up a process to run an incoming task that has a different task environment.
Identification¶
To determine whether two task environment references are equal, a few properties of the task environment are checked. A task environment is uniquely identified by the combination of its unique_id and additional_id properties. Two task environment identifications are equal if and only if both the unique_id and additional_id properties are equal.
Here are a few examples to demonstrate. Below is code for a number of task environments:
class TaskEnvironmentA(TaskEnvironment):
    def __init__(self):
        super().__init__()
        self.unique_id = uuid.UUID("94CFE45C-5EA9-4592-BB64-B83A5E72DB77")


class TaskEnvironmentB(TaskEnvironment):
    def __init__(self):
        super().__init__()
        self.unique_id = uuid.UUID("94CFE45C-5EA9-4592-BB64-B83A5E72DB77")


class TaskEnvironmentC(TaskEnvironment):
    def __init__(self):
        super().__init__()
        self.unique_id = uuid.UUID("94CFE45C-5EA9-4592-BB64-B83A5E72DB77")
        self.additional_id = "ham"


class TaskEnvironmentD(TaskEnvironment):
    def __init__(self):
        super().__init__()
        self.additional_id = "ham"


class TaskEnvironmentE(TaskEnvironment):
    def __init__(self):
        super().__init__()
        self.additional_id = "ham"


class TaskEnvironmentF(TaskEnvironment):
    def __init__(self):
        super().__init__()
        self.unique_id = uuid.UUID("B766B6B0-232A-4878-A7CF-82F354E117D4")
Imagine a hypothetical method called task_environment_is_equal, which compares whether two task environments have the same identification. The results, with explanations, are shown in the comments below.
# Result: True
# Reason: both unique_id and additional_id properties are the same
task_environment_is_equal(TaskEnvironmentA(), TaskEnvironmentB())

# Result: False
# Reason: unique_id properties are not the same
task_environment_is_equal(TaskEnvironmentA(), TaskEnvironmentF())

# Result: False
# Reason: additional_id properties are not the same, even though unique_id properties are the same
task_environment_is_equal(TaskEnvironmentA(), TaskEnvironmentC())

# Result: False
# Reason: additional_id properties are the same, but the unique_id is random by default
task_environment_is_equal(TaskEnvironmentD(), TaskEnvironmentE())
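A minimal sketch of such a comparison helper, following the rule above (an illustration only, not the library’s implementation):
def task_environment_is_equal(env_a, env_b):
    # identifications are equal if and only if both properties match
    return (env_a.unique_id == env_b.unique_id
            and env_a.additional_id == env_b.additional_id)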
Serialization¶
As with tasks, user-defined TaskEnvironments must be serializable by the pickle module.
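A quick way to catch serialization problems early is to round-trip an environment through pickle before submitting it, for example with the TaskEnvironmentA class defined above (assuming, as the requirement implies, that the TaskEnvironment base class itself is picklable):
import pickle

# raises an exception if the environment (or anything it references) cannot be pickled
pickle.loads(pickle.dumps(TaskEnvironmentA()))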
Properties¶
| Property | Description | Default value |
|---|---|---|
| unique_id | One of the ways to determine whether two task environment references are equal. See the Identification section. | Unique guid generated from |
| additional_id | One of the ways to determine whether two task environment references are equal. See the Identification section. | |
| | Whether to host the task in a 32-bit or 64-bit process. | |
| recycle_settings | The strategies to use for determining when host processes get recycled (shut down before another is started). | If not specified, the default |
Example¶
Here is a simple example:
class MyEnvironment(object):
    def setup(self):
        pass

    def teardown(self):
        pass
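As a slightly larger sketch, an environment can perform its expensive work once per host in setup and release it in teardown; the load_reference_data helper and the module-level cache below are hypothetical:
REFERENCE_DATA = None                 # hypothetical per-host cache

def load_reference_data():
    # hypothetical stand-in for an expensive operation, such as reading a
    # large data file or starting an external application
    return {"loaded": True}

class CachingEnvironment(object):
    def setup(self):
        # runs once per host process, before any of this environment's tasks
        global REFERENCE_DATA
        REFERENCE_DATA = load_reference_data()

    def teardown(self):
        # runs once, before the host process is recycled
        global REFERENCE_DATA
        REFERENCE_DATA = None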