Working With Task Results
Working with the data sent back from a task is an important element of any application.
Overview
Task Status
The task status describes whether the task completed successfully or encountered errors. If a task completes without throwing an uncaught exception, its status is COMPLETED. For details on what each status code means, see the TaskStatus reference.
```python
from agiparallel.constants import TaskStatus

# task is one of job.tasks after the job has finished
if task.task_status == TaskStatus.COMPLETED:
    # Task ran without throwing an uncaught exception.
    # This is the only state where the result is guaranteed to be your user-defined result.
    print(task.properties)
    print("Result: " + str(task.result))
```
Result Property
The task result property is the main mechanism for passing data back after a task completes. To use it, set the result property (self.result inside the task's execute method) at some point during the task's execution. For example:
```python
class OurTask:
    def execute(self):
        self.result = "Hello from task"
```
Task results can also be user-defined objects. The only requirement is that the result object can be serialized with pickle. Here is an example of a task that returns a user-defined result:
```python
from datetime import datetime
from socket import gethostname


# The result object must be serializable with pickle.
class MyTaskResult:
    def __init__(self, machine_name):
        self.time = datetime.now()
        self.machine_name = machine_name


class SimpleTask:
    def execute(self):
        name = gethostname()
        self.result = MyTaskResult(name)
```
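Because pickle serializability is the only requirement, a quick way to catch problems before submitting a job is to run execute() locally and round-trip the result through pickle. The sketch below assumes the task class can be instantiated and executed outside the cluster (true for SimpleTask, which has no infrastructure dependencies); check_result_picklable is a hypothetical helper, not part of the agiparallel API.

```python
import pickle
import threading
from datetime import datetime
from socket import gethostname


class MyTaskResult:
    def __init__(self, machine_name):
        self.time = datetime.now()
        self.machine_name = machine_name


class SimpleTask:
    def execute(self):
        self.result = MyTaskResult(gethostname())


def check_result_picklable(task):
    """Hypothetical helper: run execute() locally and round-trip the result.

    Returns the unpickled copy, roughly what the client would receive.
    Lets any pickling error propagate so the problem is visible early.
    """
    task.execute()
    return pickle.loads(pickle.dumps(task.result))


copy = check_result_picklable(SimpleTask())
print(type(copy).__name__, copy.machine_name)


class BadResultTask:
    def execute(self):
        # A lock cannot be pickled, so this result would not survive the trip.
        self.result = threading.Lock()


try:
    check_result_picklable(BadResultTask())
except TypeError as exc:
    print("Not serializable:", exc)
```

Running a check like this in a unit test is cheaper than discovering a serialization error after the job has been scheduled.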
After a task has completed, the client gets a copy of the result object back. To use it, access the result property of the task:
```python
result = job.tasks[0].result
print(type(result))  # expected: the MyTaskResult class
print("Time =", result.time)
print("Machine Name =", result.machine_name)
```
Warning
The result property may hold an Exception object instead of your result if an uncaught exception occurred while executing the task. For more information, see the Exceptions and Error Handling section below.
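On the client, one way to respect this warning is to check the result's type before using it. The sketch below uses a hypothetical helper, get_result_or_raise, and types.SimpleNamespace stand-ins for finished tasks so it runs without a cluster; with the real API you would pass an entry from job.tasks instead.

```python
from types import SimpleNamespace


def get_result_or_raise(task):
    """Hypothetical helper: return task.result, or re-raise it if it is an exception."""
    result = task.result
    if isinstance(result, BaseException):
        # The task failed; surface the exception instead of treating it as data.
        raise result
    return result


# Stand-ins for finished tasks (assumption: only the result attribute is needed).
ok_task = SimpleNamespace(result="Hello from task")
failed_task = SimpleNamespace(result=ValueError("bad input"))

print(get_result_or_raise(ok_task))
try:
    get_result_or_raise(failed_task)
except ValueError as exc:
    print("Task failed:", exc)
```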
StandardOutput and StandardError
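At times, the easiest way to pass data back is through the standard_output and standard_error properties. The only thing the task code needs to do is call print() to write to standard output; an exception that escapes the task (for example, one raised with raise) is written to standard error. For example: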
```python
from datetime import datetime
from socket import gethostname


class WritesToStandardOutput:
    def execute(self):
        print("Current time in task is: ", datetime.now())
        print("Agent machine name: ", gethostname())
```
The standard output and standard error are sent back to the client as strings. Access them like so:
```python
print("Standard output: \n" + str(job.tasks[0].standard_output))
print("Standard error: \n" + str(job.tasks[0].standard_error))
```
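Because standard output is just the text the task prints, you can preview it locally before submitting a job. This sketch assumes the task's execute() can run outside the cluster and that the infrastructure captures what print() writes; it uses contextlib.redirect_stdout and contextlib.redirect_stderr from the standard library to collect the same text into strings. preview_output is a hypothetical helper.

```python
import contextlib
import io
from datetime import datetime
from socket import gethostname


class WritesToStandardOutput:
    def execute(self):
        print("Current time in task is: ", datetime.now())
        print("Agent machine name: ", gethostname())


def preview_output(task):
    """Hypothetical helper: run execute() locally and capture stdout/stderr as strings.

    An exception raised by execute() propagates to the caller; it is not
    converted into standard error text the way the cluster does it.
    """
    out, err = io.StringIO(), io.StringIO()
    with contextlib.redirect_stdout(out), contextlib.redirect_stderr(err):
        task.execute()
    return out.getvalue(), err.getvalue()


stdout_text, stderr_text = preview_output(WritesToStandardOutput())
print("Standard output: \n" + stdout_text)
```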
Note
Standard output is also displayed in monitoring applications such as the Task Monitor. Viewing it from the GUI is a convenient way to debug your application.
Note
In the event of an unhandled exception, the exception is written as a string to standard error on the task's behalf.
Task Properties
Task properties are yet another way to return data from a task to the client. They are stored internally as a hash table and hold simple values such as strings and integers; see the warning below for the supported types.
A task's properties object is returned to the client after the task completes. Depending on an application's needs, task properties may be more convenient to update than the task's result property.
One of the main advantages of properties is that they are guaranteed to be returned to the client even if the task fails. This is not true of the task result, which is replaced by the exception when the task fails.
For instance, this example sets both the result and properties and then simulates an exception:
```python
class PropertiesExampleTask:
    def execute(self):
        self.set_property("Property1", 100)
        self.set_property("Property2", "one hundred")
        self.result = "This won't get back to the scheduler"
        raise Exception("Task failure")
```
On the client side, the result property holds the exception, while the properties object is returned regardless.
```python
# after the task failed
task = job.tasks[0]
print("Task status = ", task.task_status)
# our result property is an exception, so convert it to text before concatenating
print("Exception is " + str(task.result))
# we can still get the properties (Property1 is an int, so convert it too)
print("Property1: " + str(task.properties["Property1"]))
print("Property2: " + task.properties["Property2"])
```
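To make this contract concrete, the sketch below simulates it locally: it runs execute(), keeps whatever properties were set, and replaces the result with the exception if one escapes. LocalTaskBase, its set_property, run_locally, and the status strings are hypothetical stand-ins written for this illustration; on the cluster, the agiparallel infrastructure provides set_property and does this bookkeeping for you.

```python
class LocalTaskBase:
    """Hypothetical stand-in providing set_property for local runs."""

    def __init__(self):
        self.properties = {}
        self.result = None

    def set_property(self, name, value):
        self.properties[name] = value


class PropertiesExampleTask(LocalTaskBase):
    def execute(self):
        self.set_property("Property1", 100)
        self.set_property("Property2", "one hundred")
        self.result = "This won't get back to the scheduler"
        raise Exception("Task failure")


def run_locally(task):
    """Mimic the documented behavior: on failure the result becomes the
    exception, but properties set before the failure are kept."""
    try:
        task.execute()
        return "COMPLETED"
    except Exception as exc:
        task.result = exc
        return "FAILED"


task = PropertiesExampleTask()
status = run_locally(task)
print(status, repr(task.result), task.properties)
```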
Apart from being returned even after an exception, task properties should only be used when they are more convenient than the result object.
Warning
Unlike task.result, task properties support only a small set of data types. Use only string, int, float, and byte[].
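A simple guard is to validate values before storing them as properties. The sketch below is an illustration under assumptions: check_property_value is a hypothetical helper, and it maps the documented string, int, float, and byte[] types to Python's str, int, float, and bytes, a mapping this page does not state explicitly. It also conservatively rejects bool, which Python treats as a subclass of int but which is not in the documented list.

```python
# Assumed Python equivalents of the supported types: string, int, float, byte[].
SUPPORTED_PROPERTY_TYPES = (str, int, float, bytes)


def check_property_value(value):
    """Hypothetical helper: raise TypeError for values outside the supported set."""
    # bool is a subclass of int; reject it explicitly rather than let it slip through.
    if isinstance(value, bool) or not isinstance(value, SUPPORTED_PROPERTY_TYPES):
        raise TypeError(f"unsupported property type: {type(value).__name__}")
    return value


for candidate in (100, "one hundred", 1.5, b"\x00\x01", [1, 2, 3]):
    try:
        check_property_value(candidate)
        print(repr(candidate), "ok")
    except TypeError as exc:
        print(repr(candidate), "rejected:", exc)
```

Inside a task, wrapping the value as self.set_property(name, check_property_value(value)) makes an unsupported type fail immediately with a clear message.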
Exceptions and Error Handling
Tasks can fail or not complete successfully for a number of reasons. The most common cause of failure is an uncaught exception in a task's code. Fortunately, the task's exception is handled, and information about it is returned to the client for error handling.
Task Failure
If a task fails, the task's result property is the Exception object raised in the task, and its status is set to TaskStatus.FAILED. Also, standard_error includes the stack trace of the exception. Here is an example that checks for task failure and inspects the exception:
```python
from agiparallel.constants import TaskStatus

task = job.tasks[0]
if task.task_status in (TaskStatus.FAILED, TaskStatus.ENVIRONMENT_ERROR):
    print("Exception message is: ", task.result)
    print("StandardError also has the stack trace: ", task.standard_error)
```
Note
As with the result object, the exception object must be serializable. If an exception object is not serializable, a generic exception object is created that includes the error message of the original exception.
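The sketch below mirrors that fallback so you can check your own exception types locally: it round-trips an exception through pickle and, if that fails, substitutes a plain Exception carrying the original message. to_transportable and LockHoldingError are hypothetical names for this illustration, and the exact generic type the infrastructure creates is not specified here, so using plain Exception is an assumption.

```python
import pickle
import threading


class LockHoldingError(Exception):
    """Example of an exception that cannot be pickled: it carries a lock."""

    def __init__(self, message):
        super().__init__(message)
        self.lock = threading.Lock()


def to_transportable(exc):
    """Hypothetical helper: return a pickle round-tripped copy of exc, or a
    generic Exception with the same message if exc cannot be serialized."""
    try:
        return pickle.loads(pickle.dumps(exc))
    except Exception:
        return Exception(str(exc))


ok = to_transportable(ValueError("bad input"))
fallback = to_transportable(LockHoldingError("resource busy"))
print(type(ok).__name__, ok)
print(type(fallback).__name__, fallback)
```

Keeping custom exception classes free of unpicklable attributes (locks, open files, sockets) lets the client receive the original type instead of the generic fallback.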
Task Environment Failure
Similar to a task failure, when a task environment fails, the task's result is the uncaught exception that was thrown, and the task's status is set to TaskStatus.ENVIRONMENT_ERROR. A task can only run after its task environment has been set up successfully, so if the task environment fails to set up, the task does not run.
Task Cancellation
If a task is canceled, information on why it was canceled is available in the task's task_cancellation_message and task_cancellation_reason properties. The task_cancellation_message usually provides useful information, normally including an indication of the user who canceled the job.
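```python
from agiparallel.constants import TaskStatus

task = job.tasks[0]
if task.task_status == TaskStatus.CANCELED:
    print(task.task_cancellation_reason)
    print(task.task_cancellation_message)
```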
Putting it all together
The code below combines the different ways, described in this section, that a task can send data back to the client. It shows a common pattern for handling each task status after a task finishes. During development, when code is less thoroughly tested and errors are more likely, always handle the cases where a task can fail.
```python
from agiparallel.client import *
from agiparallel.infrastructure import *
from agiparallel.constants import TaskStatus, TaskCancellationReason


class SimpleTask(object):
    def execute(self):
        # Raise on purpose so the FAILED branch below is exercised.
        raise Exception("Simulated task failure")


if __name__ == "__main__":
    with ClusterJobScheduler("localhost") as client:
        client.connect()
        job = client.create_job()
        # add one or more tasks
        job.add_task(SimpleTask())
        # optionally set the task environment here
        job.submit()
        job.wait_until_done()
        for task in job.tasks:
            # After the task completes...
            if task.task_status == TaskStatus.COMPLETED:
                # Task ran without throwing an uncaught exception.
                # This is the only state where the result is guaranteed to be your user-defined result.
                print(task.properties)
                print("Result: " + str(task.result))

            elif task.task_status == TaskStatus.FAILED:
                # execute() threw an uncaught exception.
                # The result object is the exception that was not caught.
                print("Task {0} threw an exception during execute".format(getattr(task, "name", "")))
                print("Exception: " + str(task.result))
                print("StandardError with callstack: " + str(task.standard_error))

            elif task.task_status == TaskStatus.ENVIRONMENT_ERROR:
                # The task environment's setup threw an uncaught exception.
                # The result object is the exception that was not caught.
                # Guard the lookup in case no task environment was set.
                env = getattr(job, "task_environment", None)
                print("TaskEnvironment {0} threw an exception during setup".format(getattr(env, "name", "")))
                print("Exception: " + str(task.result))
                print("StandardError with callstack: " + str(task.standard_error))

            elif task.task_status == TaskStatus.TIMED_OUT:
                # Task execution exceeded the value specified in job.task_execution_timeout.
                # This only needs handling if you specify a value for job.task_execution_timeout.
                print("Task execution exceeded the specified execution timeout")
                print("TaskExecutionTimeout was " + str(job.task_execution_timeout))

            elif task.task_status == TaskStatus.CANCELED:
                # Task was canceled through the job or by a user (for instance, from the UI).
                # Handle this whenever users may be able to cancel your task.
                print("Task was canceled")
                if task.task_cancellation_reason == TaskCancellationReason.DIRECT:
                    print("User who canceled the task was: " + str(task.task_cancellation_message))
            else:
                # Any other state is not final; the task is still transitioning.
                continue
```
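For jobs with many tasks, the if/elif chain above can also be written as a lookup table that maps each final status to a handler. The sketch below is one way to structure that. To keep it runnable without a cluster, it uses a local Status enum whose member names copy the TaskStatus values used above (plus a hypothetical RUNNING member for a non-final state) and SimpleNamespace stand-ins for tasks; both are assumptions for illustration, so substitute agiparallel.constants.TaskStatus and job.tasks in real code.

```python
from enum import Enum, auto
from types import SimpleNamespace


class Status(Enum):
    """Local stand-in mirroring the TaskStatus names used on this page."""
    COMPLETED = auto()
    FAILED = auto()
    ENVIRONMENT_ERROR = auto()
    TIMED_OUT = auto()
    CANCELED = auto()
    RUNNING = auto()  # hypothetical non-final state


def on_completed(task):
    return "Result: " + str(task.result)


def on_error(task):
    # FAILED and ENVIRONMENT_ERROR both put the uncaught exception in result.
    return "Exception: " + str(task.result)


def on_timed_out(task):
    return "Task execution exceeded the specified execution timeout"


def on_canceled(task):
    return "Task was canceled: " + str(task.task_cancellation_message)


HANDLERS = {
    Status.COMPLETED: on_completed,
    Status.FAILED: on_error,
    Status.ENVIRONMENT_ERROR: on_error,
    Status.TIMED_OUT: on_timed_out,
    Status.CANCELED: on_canceled,
}


def describe(task):
    handler = HANDLERS.get(task.task_status)
    # Statuses without a handler are treated as not final, as in the loop above.
    return handler(task) if handler else None


tasks = [
    SimpleNamespace(task_status=Status.COMPLETED, result=42),
    SimpleNamespace(task_status=Status.FAILED, result=Exception("boom")),
    SimpleNamespace(task_status=Status.RUNNING, result=None),
]
for t in tasks:
    print(describe(t))
```

A table keeps the mapping from status to handling in one place and makes shared handling explicit, as with FAILED and ENVIRONMENT_ERROR here.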