Key Concepts

Job, Task, and Task Environment are the three key components exposed for extension.

Overview

Job

A Job allows groups of tasks to be managed as a unit. A job consists of one or more tasks and, optionally, a task environment. Operations performed on a job affect all the tasks associated with the job. Examples include setting the priority and requiring that certain conditions be met before a task can be executed.

A job must be submitted before a job scheduler can run its work. After submitting a job, wait until all of its tasks have completed. None of the job options are mandatory.

Job Options

A job has many options, all of them optional. The table below lists each option and its default value.

| Property | Description | Default Value |
|---|---|---|
| name | Name of the job displayed in the monitoring applications. | Job # |
| description | Description of the job displayed in the monitoring applications. | None |
| priority | Priority of the job’s tasks. Higher priority tasks are selected to be executed before lower priority tasks. | Normal |
| task_preconditions | List of conditions that must be met by an agent machine before a task is assigned to it. | None |
| agent_selection_preference | The scheduling algorithm to use when selecting which agent should execute a task. | Default |
| cancel_on_client_disconnection | Whether to cancel all tasks if the client disconnects. Appropriate for interactive jobs. | False |
| cancel_on_task_failure | Whether to cancel all tasks if any task in the job fails. Appropriate if the result of the job depends on all the tasks completing successfully. | False |
| task_execution_timeout | The number of milliseconds for which the job’s tasks are allowed to run before the task is considered to have timed-out. | -1 |
| max_interrupted_retry_attempts | Number of times the job’s tasks will be retried if the task gets interrupted. | 5 |

Tips and Best Practices

  • There is a performance overhead when submitting jobs. If possible, packing more tasks per job is a way to decrease overhead.
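The tip above can be read as "pay the submission overhead once". The sketch below uses only the calls shown in this section's example (create_job, add_task, submit, wait_until_done). The helper name submit_as_one_job and the FakeClient, FakeJob, and AddTask classes are hypothetical stand-ins, not part of the library; they exist only so the snippet runs without a scheduler.

```python
def submit_as_one_job(client, tasks):
    """Pack every task into a single job so submission happens once."""
    job = client.create_job()
    for task in tasks:
        job.add_task(task)
    job.submit()
    job.wait_until_done()
    return job


# Hypothetical stand-ins so the sketch runs without a real scheduler.
class FakeJob:
    def __init__(self):
        self.tasks = []
        self.submit_count = 0

    def add_task(self, task):
        self.tasks.append(task)

    def submit(self):
        self.submit_count += 1

    def wait_until_done(self):
        # A real scheduler runs tasks on agents; here they run in-process.
        for task in self.tasks:
            task.execute()


class FakeClient:
    def create_job(self):
        return FakeJob()


class AddTask:
    def __init__(self, a, b):
        self.a = a
        self.b = b
        self.result = None

    def execute(self):
        self.result = self.a + self.b


job = submit_as_one_job(FakeClient(), [AddTask(i, i) for i in range(100)])
print(job.submit_count, len(job.tasks))
```

Whether one large job or several smaller ones performs better depends on the workload, so treat this as a starting point for measurement.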

Example

Here is an annotated example:

# ClusterJobScheduler, JobPriority, CommonResources, and Operator are provided
# by the library; SimpleTask and MyEnvironment are defined in the examples
# later in this section.
with ClusterJobScheduler("localhost") as client:
    client.connect()
    job = client.create_job()

    # add one or more tasks
    job.add_task(SimpleTask(1, 1))

    # set the task environment
    job.task_environment = MyEnvironment()

    # set various job options
    job.name = "my job name"
    job.description = "my job description"
    job.max_interrupted_retry_attempts = 0
    job.priority = JobPriority.LOW
    job.add_task_preconditions(CommonResources.AVAILABLE_CORES, Operator.GREATER_THAN, 2)

    # submit the job to the job scheduler
    job.submit()

    # wait for the results
    job.wait_until_done()

Task

A task is a basic building block. The application executes programs contained within tasks. The most important components of a task are the execute() method and the result property.

Every task has a unique_id and a task number assigned to it. The unique id is created when the task is instantiated, and the coordinator assigns the task number. Tasks can also be given names for easier identification.

Serialization

Tasks are serialized by the pickle module when submitted to a job scheduler. When a task is executed, a copy of the pickled object is deserialized and its execute() method is called. The developer must ensure the object can be serialized by pickle. For more information on Python serialization, see the pickle documentation.

Note

When the task is deserialized by pickle, the task’s constructor will not be called.
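The note above is standard pickle behavior and can be checked locally with the standard library alone. In this minimal sketch, CountingTask is a hypothetical task class, and the dumps/loads round trip only approximates what happens between submission and execution; the scheduler's actual transport is not shown here.

```python
import pickle


class CountingTask:
    init_calls = 0  # class-level counter shared by all instances

    def __init__(self, a, b):
        CountingTask.init_calls += 1
        self.a = a
        self.b = b
        self.result = None

    def execute(self):
        self.result = self.a + self.b


task = CountingTask(1, 2)        # __init__ runs once, on the client side
payload = pickle.dumps(task)     # serialize, as on submission
copy = pickle.loads(payload)     # deserialize, as before execution
copy.execute()                   # only the deserialized copy is executed

print(CountingTask.init_calls)
print(copy.result, task.result)
```

Because the constructor is skipped on deserialization, any state the task needs on the agent has to live in instance attributes that pickle can restore, not in side effects of `__init__`.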

Task Properties

Properties that can be set either before a task is submitted or when it is running:

| Property | Description | Default Value |
|---|---|---|
| name | Name of the task displayed in the monitoring applications. | Task’s type name |
| result | Result of the task after the task executes. Result objects must be serializable. If the task encounters an error, the task’s result is the error message. | None |
| properties | Dictionary of properties returned after the task executes. The items in this collection can only be serializable types. | { } |

Read-only properties set by the infrastructure:

| Property | Description |
|---|---|
| standard_output | The output of the process while it was executing the task. |
| standard_error | The error stream of the process while it was executing the task. |
| unique_id | Random GUID used to identify the task. |
| task_status | The current status of the task. This will be updated when its state changes in the job scheduler. |
| task_cancellation_message | If the task is canceled, this property gives a human readable reason why. |
| task_cancellation_reason | If the task is canceled, a TaskCancellationReason that explains why the task was canceled. |

Task States

A task can transition through several different states during its lifetime. Task states are represented by the TaskStatus enum.

All tasks start in the NOT_SUBMITTED state. After a task is submitted, its status changes to SUBMITTED while the task sits in the coordinator queue. When the task is assigned to an agent for execution, its status changes to ASSIGNED and then to RUNNING once the agent starts running it. If no errors occur during execution, the status changes to COMPLETED. If an uncaught exception is encountered, the task goes into the FAILED state. Below is a state transition diagram and a full table of task statuses.

TaskStatus

Possible task statuses:

| Status | Description | Transition |
|---|---|---|
| NOT_SUBMITTED | Task is not submitted yet. | Always the initial state |
| SUBMITTED | Task is submitted but not assigned yet. | Always the state after the task is submitted |
| ASSIGNED | Task is assigned but not run yet. | Transition state. Next state is RUNNING |
| RUNNING | Task is currently running. | Expected |
| CANCELING | Task is in the process of canceling. | Transition state |
| INTERRUPTED | Task encountered a system exception, for example when the agent disconnects or the host process exits unexpectedly. | End state |
| CANCELED | Task is canceled. | End state |
| ENVIRONMENT_ERROR | Task failed to run because of an uncaught exception in Task Environment setup. | End state |
| COMPLETED | Task completed successfully. | End state |
| TIMED_OUT | Task timed out because it ran longer than the specified task_execution_timeout value. | End state |
| FAILED | Task failed because of an uncaught exception while running the task. | End state |
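When inspecting a task, it is often enough to know whether its status is final. The enum below is a hypothetical stand-in that only mirrors the status names listed above; the real TaskStatus enum comes from the library, and its member values may differ. The helper is_end_state is an illustration, not a library function.

```python
from enum import Enum, auto


class TaskStatus(Enum):
    """Hypothetical stand-in mirroring the documented status names."""
    NOT_SUBMITTED = auto()
    SUBMITTED = auto()
    ASSIGNED = auto()
    RUNNING = auto()
    CANCELING = auto()
    INTERRUPTED = auto()
    CANCELED = auto()
    ENVIRONMENT_ERROR = auto()
    COMPLETED = auto()
    TIMED_OUT = auto()
    FAILED = auto()


# Statuses the documentation marks as "End state".
END_STATES = frozenset({
    TaskStatus.INTERRUPTED,
    TaskStatus.CANCELED,
    TaskStatus.ENVIRONMENT_ERROR,
    TaskStatus.COMPLETED,
    TaskStatus.TIMED_OUT,
    TaskStatus.FAILED,
})


def is_end_state(status):
    """Return True if no further transition is expected from this status."""
    return status in END_STATES


print(is_end_state(TaskStatus.RUNNING), is_end_state(TaskStatus.FAILED))
```

Only COMPLETED indicates success; every other end state means the task's result should not be treated as a computed value.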

Tips and Best Practices

  • How should tasks be split? Splitting the workload into more, smaller units often yields better performance. A common problem in parallel applications is that the workload is split more coarsely than is optimal. In some cases, the run time of the whole job becomes equal to the “time it takes for the slowest task to complete”. As with any performance tip, measure, measure, measure.

  • Design tasks so they are decoupled from the application code as much as possible. Try not to store application logic in the Task class. This will prove helpful when refactoring and testing code.

  • If certain fields on the task instance are not used, ignore or remove the fields to reduce the size of the object being sent.
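For the last tip, one option is to exclude unneeded fields from the pickled state using the standard `__getstate__` hook. This is a sketch under assumptions: LeanTask and its debug_log field are hypothetical, and the sizes are measured with pickle directly instead of through a scheduler.

```python
import pickle


class LeanTask:
    def __init__(self, values):
        self.values = values
        # Client-side scratch data that execute() never reads.
        self.debug_log = [str(i) * 200 for i in range(100)]
        self.result = None

    def __getstate__(self):
        # Copy the instance dict and drop what the agent does not need.
        state = self.__dict__.copy()
        del state["debug_log"]
        return state

    def execute(self):
        self.result = sum(self.values)


task = LeanTask([1, 2, 3])
lean_size = len(pickle.dumps(task))            # uses __getstate__
full_size = len(pickle.dumps(task.__dict__))   # everything, for comparison

restored = pickle.loads(pickle.dumps(task))
restored.execute()
print(lean_size < full_size, restored.result)
```

The restored copy has no debug_log attribute at all, so execute() must not depend on any field that `__getstate__` removes.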

Example

Here is an example of a simple task:

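class SimpleTask(object):
    def __init__(self, first, second):
        # Inputs are stored as instance attributes so pickle can carry them.
        self.a = first
        self.b = second

    def execute(self):
        # Called on the agent; the result is read through the result property.
        self.result = self.a + self.b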

Task Environment

A task environment provides a single set of functions that run on each host: once before any of the tasks associated with the task environment execute, and once before the host is recycled.

Although not strictly required, a task environment is key to optimizing cases where a common operation takes a long time. A very common scenario is performing an expensive operation once before tasks execute. Examples include loading required data, setting up and running an application, or simply starting the host environment.

Note

When a task environment is not specified, a default task environment, common for all tasks in the job, is used.

Task Environment Lifetime

A task environment is set up only once per host process. After a process sets up a task environment, the process remains idle until a task that specifies that task environment is assigned to it for execution. In other words, a host runs one task environment and multiple tasks.

After each task is executed, the host checks whether any of the task environment’s recycle_settings conditions have been satisfied. If any one of the conditions has been met, the process is recycled. The Agent Tray Application exposes a user interface for configuring an agent’s host recycle settings. The values set in the UI are overridden if the recycle_settings of a submitted job’s TaskEnvironment have been set programmatically. A task environment can also be recycled if the agent needs to free up a process to run an incoming task that has a different task environment.
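The lifetime described above can be pictured with a toy model of a single host process. Everything here is a hypothetical illustration: run_on_host, LoggingEnvironment, AddTask, and the max_tasks_before_recycle parameter are invented for the sketch and do not reflect the scheduler's real implementation or its recycle settings API.

```python
class LoggingEnvironment:
    """Records how often setup() and teardown() run."""

    def __init__(self):
        self.events = []

    def setup(self):
        self.events.append("setup")

    def teardown(self):
        self.events.append("teardown")


class AddTask:
    def __init__(self, a, b):
        self.a = a
        self.b = b
        self.result = None

    def execute(self):
        self.result = self.a + self.b


def run_on_host(environment, tasks, max_tasks_before_recycle):
    """Toy model of one host process; not the scheduler's real logic."""
    environment.setup()              # once per host process
    executed = 0
    for task in tasks:
        task.execute()
        executed += 1
        # After each task, check the (hypothetical) recycle condition.
        if executed >= max_tasks_before_recycle:
            break
    environment.teardown()           # once, before the host is recycled
    return executed


env = LoggingEnvironment()
tasks = [AddTask(i, 1) for i in range(5)]
executed = run_on_host(env, tasks, max_tasks_before_recycle=3)
print(env.events, executed)
```

In this model the two remaining tasks would be picked up by a fresh host process, which would run setup() again.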

Identification

To determine whether two task environment references are equal, a few properties of the task environment are checked. A task environment is uniquely identified by the combination of its unique_id and additional_id properties. Two task environment identifications are equal if and only if both their unique_id and additional_id properties are equal. The following task environments are used to demonstrate:

import uuid

# TaskEnvironment is the base class provided by the library.


class TaskEnvironmentA(TaskEnvironment):
    def __init__(self):
        super().__init__()
        self.unique_id = uuid.UUID("94CFE45C-5EA9-4592-BB64-B83A5E72DB77")


class TaskEnvironmentB(TaskEnvironment):
    def __init__(self):
        super().__init__()
        self.unique_id = uuid.UUID("94CFE45C-5EA9-4592-BB64-B83A5E72DB77")


class TaskEnvironmentC(TaskEnvironment):
    def __init__(self):
        super().__init__()
        self.unique_id = uuid.UUID("94CFE45C-5EA9-4592-BB64-B83A5E72DB77")
        self.additional_id = "ham"


class TaskEnvironmentD(TaskEnvironment):
    def __init__(self):
        super().__init__()
        self.additional_id = "ham"


class TaskEnvironmentE(TaskEnvironment):
    def __init__(self):
        super().__init__()
        self.additional_id = "ham"


class TaskEnvironmentF(TaskEnvironment):
    def __init__(self):
        super().__init__()
        self.unique_id = uuid.UUID("B766B6B0-232A-4878-A7CF-82F354E117D4")

Imagine there is a hypothetical method called task_environment_is_equal, which checks whether two task environments have the same identification. The results are below, with the explanations in the comments.

# Result: True
# Reason: Both unique_id and additional_id properties are the same
task_environment_is_equal(TaskEnvironmentA(), TaskEnvironmentB())

# Result: False
# Reason: unique_id properties are not the same
task_environment_is_equal(TaskEnvironmentA(), TaskEnvironmentF())

# Result: False
# Reason: additional_id properties are not the same, even though unique_id properties are the same
task_environment_is_equal(TaskEnvironmentA(), TaskEnvironmentC())

# Result: False
# Reason: additional_id properties are the same, but the unique_id is by default random
task_environment_is_equal(TaskEnvironmentD(), TaskEnvironmentE())
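To make the comparison rule concrete, here is one way the hypothetical task_environment_is_equal could be written. The TaskEnvironment class below is a stand-in that only models the two identification properties with the defaults given in the Properties table; it is not the library's base class, and the shortened class names EnvA, EnvB, EnvC, and EnvD are invented for this sketch.

```python
import uuid

SHARED_ID = uuid.UUID("94CFE45C-5EA9-4592-BB64-B83A5E72DB77")


class TaskEnvironment:
    """Hypothetical stand-in modeling only the identification properties."""

    def __init__(self):
        self.unique_id = uuid.uuid4()   # random by default
        self.additional_id = None


class EnvA(TaskEnvironment):
    def __init__(self):
        super().__init__()
        self.unique_id = SHARED_ID


class EnvB(TaskEnvironment):
    def __init__(self):
        super().__init__()
        self.unique_id = SHARED_ID


class EnvC(TaskEnvironment):
    def __init__(self):
        super().__init__()
        self.unique_id = SHARED_ID
        self.additional_id = "ham"


class EnvD(TaskEnvironment):
    def __init__(self):
        super().__init__()
        self.additional_id = "ham"      # unique_id stays random


def task_environment_is_equal(first, second):
    """Equal only if both unique_id and additional_id match."""
    return (first.unique_id, first.additional_id) == (
        second.unique_id,
        second.additional_id,
    )


print(task_environment_is_equal(EnvA(), EnvB()))   # same ids, both None
print(task_environment_is_equal(EnvA(), EnvC()))   # additional_id differs
print(task_environment_is_equal(EnvD(), EnvD()))   # random unique_id differs
```

A practical consequence is that environments meant to be shared across jobs need a fixed unique_id, since two instances with the default random id never compare equal.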

Serialization

As with tasks, user-defined task environments must be serializable by the pickle module.

Properties

| Property | Description | Default Value |
|---|---|---|
| unique_id | One of the ways to determine if two task environment references are equal. See the Identification section. | Unique GUID generated from uuid.uuid4() |
| additional_id | One of the ways to determine if two task environment references are equal. See the Identification section. | None |
| host_architecture | Whether to host the task in a 32 bit or 64 bit process. Any indicates that the task can execute in either a 32 bit or a 64 bit process. | x64 if the client application is 64 bit. Otherwise, x32 if the client application is 32 bit. |
| host_recycle_settings | The strategies to use for determining when host processes get recycled (shutdown before another is started). | If not specified, the default HostRecycleSettings constructor sets an infinite idle timeout and unspecified fixed number of tasks. |

Example

Here is a simple example:

class MyEnvironment(object):
    def setup(self):
        # Runs once per host, before any task that uses this environment.
        pass

    def teardown(self):
        # Runs once, before the host is recycled.
        pass
