Working With Task Results

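Almost every application needs to use the data that its tasks send back to the client. A task can return data through its result property, through its standard output and standard error, and through its properties.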

Overview

Task Status

The task status describes whether the task completed successfully or encountered errors. If a task completed without throwing an uncaught exception, its status is COMPLETED. For details on what each status code means, see the TaskStatus reference.

from agiparallel.constants import TaskStatus

task = job.tasks[0]  # a task from a job that has finished running
if task.task_status == TaskStatus.COMPLETED:
    # The task ran without throwing an uncaught exception.
    # This is the only state in which task.result is guaranteed to be your user-defined result.
    print(task.properties)
    print("Result: " + str(task.result))

Result Property

The result property is the main mechanism for passing data back after a task completes. To use it, assign a value to the task’s result property (self.result) at some point while the task executes. For example:

class OurTask():
    def execute(self):
        self.result = "Hello from task"

Task results can also be user-defined. The only requirement is that the result object must be serializable with the pickle serializer. Here is an example of a task that returns a user-defined result:

from datetime import datetime
from socket import gethostname


class SimpleTask:
    def execute(self):
        # Runs on the agent: record which machine executed the task.
        name = gethostname()
        self.result = MyTaskResult(name)


# The result object must be serializable with pickle.
class MyTaskResult:
    def __init__(self, machine_name):
        self.time = datetime.now()
        self.machine_name = machine_name
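Because the only hard requirement is that the result be picklable, it can help to check a result class locally before submitting a job. This is a minimal sketch using only the standard library; `is_picklable` and `ResultWithLock` are hypothetical names, not part of agiparallel, and the check assumes the scheduler uses Python’s standard pickle module as stated above.

```python
import pickle
import threading
from datetime import datetime


class MyTaskResult:
    """Same shape as the result class above: plain, picklable attributes."""
    def __init__(self, machine_name):
        self.time = datetime.now()
        self.machine_name = machine_name


class ResultWithLock:
    """A result holding a lock, which pickle cannot serialize."""
    def __init__(self):
        self.lock = threading.Lock()


def is_picklable(obj):
    """Hypothetical helper: return True if obj survives a pickle round trip."""
    try:
        pickle.loads(pickle.dumps(obj))
    except Exception:
        return False
    return True


print(is_picklable(MyTaskResult("agent-01")))  # True
print(is_picklable(ResultWithLock()))          # False
```

Running this check in a unit test catches unpicklable attributes (locks, open files, sockets) before they surface as a failure on the cluster.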

After a task has completed, the client gets a copy of the result object back. To use it, access the result property of the task.

result = job.tasks[0].result
print(type(result).__name__)  # should be MyTaskResult
print("Time =", result.time)
print("Machine Name =", result.machine_name)
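Because the result travels back as serialized data, the client works with a copy, not with the object the task created. A local pickle round trip approximates this. The sketch assumes pickle is the transport, as described above; it is not the scheduler’s actual code path.

```python
import pickle
from datetime import datetime


class MyTaskResult:
    def __init__(self, machine_name):
        self.time = datetime.now()
        self.machine_name = machine_name


# The object as the task created it on the agent.
original = MyTaskResult("agent-01")

# Approximate the trip back to the client with a pickle round trip.
client_copy = pickle.loads(pickle.dumps(original))

print(type(client_copy).__name__)  # MyTaskResult
print(client_copy is original)     # False: a separate object
print(client_copy.machine_name)    # agent-01

# Changing the copy does not affect the object the task created.
client_copy.machine_name = "changed"
print(original.machine_name)       # agent-01
```

One practical consequence, if pickle is indeed the transport: unpickling looks up the result class by module and name, so the client process must be able to import that class.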

Warning

The result property may hold an Exception object if an uncaught exception occurred while the task was executing. For more information, see the Exceptions and Error Handling section below.
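Code that reads result directly can guard against this case. The helper below is a hypothetical sketch, not an agiparallel function; it relies only on the documented behavior that a failed task’s result holds the Exception object, and re-raises it on the client.

```python
def unwrap_result(result):
    """Return result unchanged, or re-raise it if it is an exception.

    Hypothetical client-side helper: a failed task's result holds the
    Exception object that was thrown in the task.
    """
    if isinstance(result, BaseException):
        raise result
    return result


print(unwrap_result("Hello from task"))  # Hello from task

try:
    unwrap_result(ValueError("bad input"))
except ValueError as err:
    print("Task failed:", err)  # Task failed: bad input
```

Checking task_status first, as in the Task Status section, remains the primary test. This helper would also misfire if a task deliberately returned an exception object as its normal result.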

StandardOutput and StandardError

At times, the easiest way to pass data back is through the standard_output and standard_error properties. The task code only needs to call print() to write to standard output; an uncaught exception is reported in standard error. For example:

from datetime import datetime
from socket import gethostname


class WritesToStandardOutput:
    def execute(self):
        print("Current time in task is:", datetime.now())
        print("Agent machine name:", gethostname())

Standard output and standard error are sent back to the client as strings. Access them like so:

print("Standard output: \n" + job.tasks[0].standard_output)
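While developing a task, you can preview what it prints without a cluster by running execute() in-process and capturing standard output with the standard library. This is a local testing aid under that assumption; `capture_standard_output` is a hypothetical name, and on the cluster the scheduler collects the output for you.

```python
import io
from contextlib import redirect_stdout
from datetime import datetime
from socket import gethostname


class WritesToStandardOutput:
    def execute(self):
        print("Current time in task is:", datetime.now())
        print("Agent machine name:", gethostname())


def capture_standard_output(task):
    """Run task.execute() in this process and return everything it printed."""
    buffer = io.StringIO()
    with redirect_stdout(buffer):
        task.execute()
    return buffer.getvalue()


output = capture_standard_output(WritesToStandardOutput())
print("Standard output: \n" + output)
```

On the cluster, comparable text would arrive in job.tasks[0].standard_output instead.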

Note

Standard output is also displayed in monitoring applications such as the Task Monitor. Viewing the standard output in the GUI is a convenient way to debug your application.

Figure: Console output appears in the monitoring applications.

Note

If the task raises an unhandled exception, the exception, including its stack trace, is written as a string to standard error on the task’s behalf.
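To see roughly what kind of text ends up in standard error, a task failure can be reproduced locally with the standard traceback module. This approximates the scheduler’s behavior and is not its implementation; the exact text it writes is not specified in this section, and `run_and_format_error` is a hypothetical helper.

```python
import traceback


class FailingTask:
    def execute(self):
        raise ValueError("input file is empty")


def run_and_format_error(task):
    """Run task.execute() locally; return the formatted traceback, or None on success."""
    try:
        task.execute()
    except Exception:
        # format_exc() returns the same text Python prints for an uncaught exception.
        return traceback.format_exc()
    return None


error_text = run_and_format_error(FailingTask())
print(error_text)
```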

Task Properties

Task properties are yet another way to return data from a task to the client. Task properties are stored internally as a hash table and can hold simple values such as strings and integers. A task’s properties object is returned to the client after the task completes. Depending on an application’s needs, task properties may be more convenient to update than the task’s result property.

One of the main advantages of properties is that they are guaranteed to be returned to the client even if the task fails, which is not true of the task result. For instance, the following example sets both the result and the properties and then raises an exception:

class PropertiesExampleTask():
    def execute(self):
        self.set_property("Property1", 100)
        self.set_property("Property2", "one hundred")
        self.result = "This won't get back to the scheduler"

        raise Exception("Task failure")

On the client side, the result property holds the exception, while the properties are returned regardless.

# after the task failed
print("Task status =", job.tasks[0].task_status)

# our result property is the exception
print("Exception is " + str(job.tasks[0].result))

# we can still get the properties (Property1 is an int, so convert it)
print("Property1: " + str(job.tasks[0].properties["Property1"]))
print("Property2: " + job.tasks[0].properties["Property2"])
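The contract described above can be modeled in a few lines, which is handy for unit-testing task classes without a cluster. Everything here is a hypothetical stand-in: `LocalTask`, its dictionary-backed `set_property`, and `run_locally` are not agiparallel classes, and the strings "COMPLETED" and "FAILED" only mimic the TaskStatus values.

```python
class LocalTask:
    """Hypothetical stand-in for a task base class, for local tests only."""
    def __init__(self):
        self.result = None
        self.properties = {}

    def set_property(self, name, value):
        self.properties[name] = value


class PropertiesExampleTask(LocalTask):
    def execute(self):
        self.set_property("Property1", 100)
        self.set_property("Property2", "one hundred")
        self.result = "This won't get back to the client"
        raise Exception("Task failure")


def run_locally(task):
    """Mimic the documented behavior: on failure the result becomes the
    exception, but properties set before the failure are still returned."""
    try:
        task.execute()
        status = "COMPLETED"
    except Exception as err:
        task.result = err  # the result assigned before the failure is lost
        status = "FAILED"
    return status, task.result, dict(task.properties)


status, result, properties = run_locally(PropertiesExampleTask())
print(status)                   # FAILED
print(repr(result))             # Exception('Task failure')
print(properties["Property1"])  # 100
```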

Apart from this guarantee after an exception, use task properties only when they are more convenient than the result object.

Warning

Unlike task.result, task properties support only a small set of data types. Use only strings, integers, floats, and byte arrays (byte[]).
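A small guard can catch unsupported values before they reach set_property. This is a sketch: `check_property_value` is hypothetical, and it assumes Python’s str, int, float, and bytes are the counterparts of the supported types listed above. It uses an exact type check, so bool (a subclass of int) and other subclasses are rejected rather than assumed to be supported.

```python
SUPPORTED_PROPERTY_TYPES = (str, int, float, bytes)


def check_property_value(name, value):
    """Return value, or raise TypeError if its type is not supported."""
    # Exact type match: subclasses such as bool are deliberately rejected.
    if type(value) not in SUPPORTED_PROPERTY_TYPES:
        raise TypeError(
            "property {0!r} has unsupported type {1}".format(name, type(value).__name__)
        )
    return value


check_property_value("Property1", 100)          # accepted
check_property_value("Payload", b"\x00\x01")    # accepted
try:
    check_property_value("Settings", {"a": 1})  # rejected: dict
except TypeError as err:
    print(err)  # property 'Settings' has unsupported type dict
```

Inside a task, you would call it just before each self.set_property(...) call.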

Exceptions and Error Handling

Tasks can fail or not complete successfully for a number of reasons. The most common reason for a failure is an uncaught exception in a task’s code. Fortunately, the exception is caught on the task’s behalf and information about it is returned to the client for error handling.

Task Failure

If a task fails, its result property holds the Exception object thrown in the task, and its task_status is set to TaskStatus.FAILED. The standard_error property also includes the stack trace of the exception. The following example checks for task failure and inspects the exception:

task = job.tasks[0]
if task.task_status in (TaskStatus.FAILED, TaskStatus.ENVIRONMENT_ERROR):
    print("Exception message is:", task.result)
    print("StandardError also has the stack trace:", task.standard_error)

Note

Like the result object, the exception object must be serializable. If it is not, a generic exception object that includes the error message of the original exception is created instead.
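The fallback described in this note can be approximated locally to check which of your exception types survive serialization. The sketch assumes pickle is the serializer, as for results; `transportable_exception` and `ErrorWithLock` are hypothetical names, and the scheduler’s actual generic exception type may differ from the plain Exception used here.

```python
import pickle
import threading


class ErrorWithLock(Exception):
    """An exception carrying an attribute that pickle cannot serialize."""
    def __init__(self, message):
        super().__init__(message)
        self.lock = threading.Lock()


def transportable_exception(exc):
    """Return a round-tripped copy of exc, or a generic Exception that keeps
    the original message if exc cannot be pickled and unpickled."""
    try:
        return pickle.loads(pickle.dumps(exc))
    except Exception:
        return Exception(str(exc))


kept = transportable_exception(ValueError("bad input"))
fallback = transportable_exception(ErrorWithLock("disk is full"))
print(type(kept).__name__, kept)          # ValueError bad input
print(type(fallback).__name__, fallback)  # Exception disk is full
```

Including pickle.loads in the check matters: an exception whose __init__ signature does not match its args can pickle successfully and still fail to unpickle.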

Task Environment Failure

Similar to a task failure, when a task environment fails, the task’s result is the uncaught exception that was thrown, and its task_status is set to TaskStatus.ENVIRONMENT_ERROR. A task can run only after its task environment is successfully set up, so if the task environment fails to set up, the task does not run.
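The ordering rule (set up first, execute only if setup succeeded) can be modeled for local tests. `FailingEnvironment`, its `setup()` method, `RecordingTask`, and `run_with_environment` are hypothetical stand-ins: this section does not show the Python name of the real task environment’s setup method, and the returned strings only mimic the TaskStatus values.

```python
class FailingEnvironment:
    """Hypothetical environment whose setup always fails."""
    def setup(self):
        raise RuntimeError("license server unreachable")


class RecordingTask:
    def __init__(self):
        self.executed = False
        self.result = None

    def execute(self):
        self.executed = True
        self.result = "done"


def run_with_environment(environment, task):
    """Mimic the documented ordering: the task only runs after setup succeeds."""
    try:
        environment.setup()
    except Exception as err:
        task.result = err  # the result is the setup exception
        return "ENVIRONMENT_ERROR"
    try:
        task.execute()
    except Exception as err:
        task.result = err
        return "FAILED"
    return "COMPLETED"


task = RecordingTask()
print(run_with_environment(FailingEnvironment(), task))  # ENVIRONMENT_ERROR
print(task.executed)                                     # False
print(task.result)                                       # license server unreachable
```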

Task Cancellation

If a task is canceled, information about why is available in the task’s task_cancellation_message and task_cancellation_reason properties. The task_cancellation_message usually carries the most useful information; it normally indicates the user who canceled the job.

if job.tasks[0].task_status == TaskStatus.CANCELED:
    print(job.tasks[0].task_cancellation_message)

Putting it all together

Below is a code snippet that combines the different ways, described in this section, that data can be sent back to the client from a task. It shows a common pattern for handling each task status after a task completes. During development, when code is less well tested and errors are more likely, always handle the cases in which a task can fail.

from agiparallel.client import *
from agiparallel.infrastructure import *
from agiparallel.constants import TaskStatus, TaskCancellationReason


class SimpleTask(object):
    def execute(self):
        # Deliberately fail so that the FAILED branch below runs.
        raise Exception("Simulated task failure")


if __name__ == "__main__":
    with ClusterJobScheduler("localhost") as client:
        client.connect()
        job = client.create_job()
        # add one or more tasks
        job.add_task(SimpleTask())
        # optionally set a task environment here, before submitting
        job.submit()
        job.wait_until_done()
        for task in job.tasks:
            # After the task completes...
            if task.task_status == TaskStatus.COMPLETED:
                # The task ran without throwing an uncaught exception.
                # This is the only state in which task.result is guaranteed to be your user-defined result.
                print(task.properties)
                print("Result: " + str(task.result))

            elif task.task_status == TaskStatus.FAILED:
                # execute() threw an uncaught exception.
                # The result object is the exception that was not caught.
                print("Task {0} threw an exception during execute()".format(getattr(task, "name", "")))
                print("Exception: " + str(task.result))
                print("StandardError with callstack: " + str(task.standard_error))

            elif task.task_status == TaskStatus.ENVIRONMENT_ERROR:
                # The task environment's setup threw an uncaught exception.
                # The result object is the exception that was not caught.
                print("TaskEnvironment {0} threw an exception during setup".format(job.task_environment.name))
                print("Exception: " + str(task.result))
                print("StandardError with callstack: " + str(task.standard_error))

            elif task.task_status == TaskStatus.TIMED_OUT:
                # Task execution exceeded job.task_execution_timeout.
                # This only needs handling if you set a value for job.task_execution_timeout.
                print("Task execution exceeded the specified execution timeout")
                print("TaskExecutionTimeout was " + str(job.task_execution_timeout))

            elif task.task_status == TaskStatus.CANCELED:
                # The task was canceled through the job or by a user (for instance, from the UI).
                # Handle this whenever users may be able to cancel your task.
                print("Task was canceled")
                if task.task_cancellation_reason == TaskCancellationReason.DIRECT:
                    print("User who canceled the task was: " + str(task.task_cancellation_message))
            else:
                # Any other status is not final; the task is still transitioning.
                continue
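Because the branching above runs only after wait_until_done(), the handling logic can be unit-tested without a cluster by feeding it stand-in task objects. In this sketch, `Status` is a local enum that mirrors only the TaskStatus members named in this section (it is not the library’s class), `describe_outcome` is a hypothetical refactoring of the loop body into a function that returns a message instead of printing, and SimpleNamespace objects stand in for real tasks.

```python
from enum import Enum
from types import SimpleNamespace


class Status(Enum):
    """Local stand-in mirroring the TaskStatus members used in this section."""
    COMPLETED = "completed"
    FAILED = "failed"
    ENVIRONMENT_ERROR = "environment_error"
    TIMED_OUT = "timed_out"
    CANCELED = "canceled"


def describe_outcome(task):
    """Return a one-line summary of a finished task, or None if it is not final."""
    if task.task_status == Status.COMPLETED:
        return "Result: " + str(task.result)
    if task.task_status in (Status.FAILED, Status.ENVIRONMENT_ERROR):
        # In both failure cases the result holds the uncaught exception.
        return "Exception: " + str(task.result)
    if task.task_status == Status.TIMED_OUT:
        return "Task execution exceeded the specified execution timeout"
    if task.task_status == Status.CANCELED:
        return "Task was canceled: " + str(task.task_cancellation_message)
    return None


tasks = [
    SimpleNamespace(task_status=Status.COMPLETED, result=42),
    SimpleNamespace(task_status=Status.FAILED, result=ValueError("bad input")),
    SimpleNamespace(task_status=Status.CANCELED, task_cancellation_message="canceled by alice"),
]
for task in tasks:
    print(describe_outcome(task))
```

Returning strings instead of printing keeps the function easy to assert on; in real client code the same function would receive tasks from job.tasks and compare against TaskStatus.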
