Communicate with tasks by sending messages

Problem

You need the client to communicate with the task while the task is running.

Solution

Use messaging capabilities.

On the client side:

Cast an IJobScheduler interface to IMessageEndpoint. Then call PostMessage to send a message to a task. For the id parameter, pass in the Id of the task.

C#
IMessageEndpoint messageEndpoint = scheduler as IMessageEndpoint;
messageEndpoint.PostMessage("Hello?", task.Id);

On the task side:

Get a reference to the IJobSchedulerContext interface by calling GetProperty and passing in TaskProperties.JobSchedulerContext. Both IJobScheduler and IJobSchedulerContext implement the IMessageEndpoint interface.

C#
IJobSchedulerContext schedulerContext = this.GetProperty<IJobSchedulerContext>(TaskProperties.JobSchedulerContext);

The task can then consume the messages the client sent by calling ReceiveMessage.

C#
object message = schedulerContext.ReceiveMessage();

The task can also send messages back to the client. To do so, the task first needs the Id of the submitter. One way to get it is to read the TaskProperties.SubmitterId property.

C#
Guid submitterId = this.GetProperty<Guid>(TaskProperties.SubmitterId);
schedulerContext.PostMessage("Howdy!", submitterId);

Here is a complete example of message passing that shows the client and the task communicating with each other.

C#
using System;
using AGI.Parallel.Client;
using AGI.Parallel.Infrastructure;
using AGI.Parallel.Infrastructure.Logging;

namespace CodeSamples
{
    class Program
    {
        static void Main(string[] args)
        {
            using (IJobScheduler scheduler = new ClusterJobScheduler("localhost"))
            {
                scheduler.Connect();
                Job job = scheduler.CreateJob();
                Task task = new EchoTask();
                job.AddTask(task);
                job.Submit();

                Console.WriteLine("Client\n========");
                IMessageEndpoint messageEndpoint = scheduler as IMessageEndpoint;
                messageEndpoint.PostMessage("Hello?", task.Id);
                Console.WriteLine("Sent Hello? at " + DateTime.Now);

                object message = messageEndpoint.ReceiveMessage();
                Console.WriteLine("Received " + message + " at " + DateTime.Now);

                job.WaitUntilDone();

                Console.WriteLine("\nTask\n========");
                Console.WriteLine(task.StandardOutput);
            }

            /*
             * The output of the application should resemble:
             * Client
             * ========
             * Sent Hello? at 7/11/2013 1:24:24 PM
             * Received Howdy! at 7/11/2013 1:24:28 PM
             *
             * Task
             * ========
             * Received Hello? at 7/11/2013 1:24:28 PM
             * Sent Howdy! at 7/11/2013 1:24:28 PM
             */
        }

        [Serializable]
        class EchoTask : Task
        {
            public override void Execute()
            {
                IJobSchedulerContext schedulerContext = this.GetProperty<IJobSchedulerContext>(TaskProperties.JobSchedulerContext);

                object message = schedulerContext.ReceiveMessage();
                Console.WriteLine("Received " + message + " at " + DateTime.Now);
                Guid submitterId = this.GetProperty<Guid>(TaskProperties.SubmitterId);
                schedulerContext.PostMessage("Howdy!", submitterId);
                Console.WriteLine("Sent Howdy! at " + DateTime.Now);
            }
        }
    }
}

Discussion

Message passing can be a powerful tool for passing data between tasks and clients dynamically, that is, while the task is running. Along with the added flexibility, message passing is much more lightweight than submitting jobs. The latency difference between the two options is drastic because message passing does not require starting a host or sending dependencies. Thus, message passing can complement tasks in the low-latency portions of an application.

In this section we will discuss further key concepts of message passing:

  • Message queuing
  • Addressing
  • Message types
  • Receiving messages asynchronously
  • Common errors and pitfalls

Message queuing

When a message is sent to an endpoint, it is inserted into that endpoint's queue. Messages in the queue are consumed in a first-in, first-out manner. By default, the queue is unbounded.

To see queuing in action, send messages to a task before it runs. When that task starts running, all messages sent to its queue are available and the task can process the queue starting from the first message received. If the task never consumes messages from the queue, the messages in the queue are discarded after the task completes.
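
The following is a minimal sketch of queuing that reuses only the PostMessage and ReceiveMessage calls shown in the complete example above. The QueueReaderTask class and the message strings are hypothetical. The client posts three messages right after submitting the job, so some or all of them may already be queued when the task starts; the task then reads them back in the order they were sent.

C#
using System;
using AGI.Parallel.Client;
using AGI.Parallel.Infrastructure;
using AGI.Parallel.Infrastructure.Logging;

namespace CodeSamples
{
    class QueuingSample
    {
        static void Main(string[] args)
        {
            using (IJobScheduler scheduler = new ClusterJobScheduler("localhost"))
            {
                scheduler.Connect();
                Job job = scheduler.CreateJob();
                Task task = new QueueReaderTask();
                job.AddTask(task);
                job.Submit();

                // Post three messages back to back. Each one waits in the
                // task's queue until the task consumes it.
                IMessageEndpoint messageEndpoint = scheduler as IMessageEndpoint;
                messageEndpoint.PostMessage("first", task.Id);
                messageEndpoint.PostMessage("second", task.Id);
                messageEndpoint.PostMessage("third", task.Id);

                job.WaitUntilDone();
                Console.WriteLine(task.StandardOutput);
            }
        }

        // Hypothetical task that drains exactly three messages from its queue.
        [Serializable]
        class QueueReaderTask : Task
        {
            public override void Execute()
            {
                IJobSchedulerContext schedulerContext = this.GetProperty<IJobSchedulerContext>(TaskProperties.JobSchedulerContext);

                for (int i = 0; i < 3; i++)
                {
                    // Blocks until the next queued message is available.
                    object message = schedulerContext.ReceiveMessage();
                    Console.WriteLine("Dequeued " + message);
                }
            }
        }
    }
}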

Addressing

Who can send messages and how is the target of a message specified? All of the key components have a unique Id property: IJobScheduler.Id, Task.Id, and Job.Id. These Id properties are used as the address where messages can be sent; a mailbox, in other words. A message sent to a mailbox can only be consumed by the owner of the mailbox. The only exception to this rule is that a message sent to a job is replicated and sent to each of the job's tasks.

Obtaining the address of a mailbox is straightforward. A job client knows the ids of all the tasks it creates because they are available as soon as the tasks are created. In more advanced cases, send the id dynamically as a parameter to the task or even in a message itself. There are no restrictions on who can send a message to whom. Tasks can send messages to other tasks, and job clients can send messages to other job clients.
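
As an illustration of addressing a job rather than a single task, the sketch below posts one message to the job's Id and lets each task reply to the submitter. It uses only the calls shown in the complete example above; the ReportingTask class and the message text are hypothetical, and it assumes the Job.Id property can be passed to PostMessage in the same way as Task.Id.

C#
using System;
using AGI.Parallel.Client;
using AGI.Parallel.Infrastructure;
using AGI.Parallel.Infrastructure.Logging;

namespace CodeSamples
{
    class JobBroadcastSample
    {
        static void Main(string[] args)
        {
            using (IJobScheduler scheduler = new ClusterJobScheduler("localhost"))
            {
                scheduler.Connect();
                Job job = scheduler.CreateJob();
                job.AddTask(new ReportingTask());
                job.AddTask(new ReportingTask());
                job.Submit();

                // One message addressed to the job is replicated to each of its tasks.
                IMessageEndpoint messageEndpoint = scheduler as IMessageEndpoint;
                messageEndpoint.PostMessage("Report in", job.Id);

                // Expect one reply per task; the arrival order is not guaranteed.
                for (int i = 0; i < 2; i++)
                {
                    object reply = messageEndpoint.ReceiveMessage();
                    Console.WriteLine(reply);
                }

                job.WaitUntilDone();
            }
        }

        // Hypothetical task that answers the broadcast with its own Id.
        [Serializable]
        class ReportingTask : Task
        {
            public override void Execute()
            {
                IJobSchedulerContext schedulerContext = this.GetProperty<IJobSchedulerContext>(TaskProperties.JobSchedulerContext);

                object message = schedulerContext.ReceiveMessage();
                Guid submitterId = this.GetProperty<Guid>(TaskProperties.SubmitterId);
                schedulerContext.PostMessage("Task " + this.Id + " received: " + message, submitterId);
            }
        }
    }
}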

Tip

There are task properties for common addresses. The TaskProperties.SubmitterId property is the address of the client that submitted the task. The TaskProperties.TaskAncestry property gives an array of the task's entire ancestry, ordered from the most recent to the oldest.

In addition to task properties, the address of the sender of a message can be retrieved with the sender overload of IMessageEndpoint.ReceiveMessage.

Message types

Messages can be either primitive types (string, int, byte[], etc.) or user-defined objects. The only restrictions on a user-defined object are that it must not be null and that it must be serializable. This is consistent with other user-defined objects such as Task or TaskEnvironment. If an object is not serializable, a NonSerializableObject object will be received instead of the message.
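
A user-defined message might look like the sketch below. The WorkRequest and SummingTask classes are hypothetical and exist only for illustration; the messaging calls are the ones used in the complete example above. The task casts with "as" so that anything other than a WorkRequest (for example, a NonSerializableObject) results in null instead of an invalid cast.

C#
using System;
using AGI.Parallel.Client;
using AGI.Parallel.Infrastructure;
using AGI.Parallel.Infrastructure.Logging;

namespace CodeSamples
{
    class MessageTypesSample
    {
        // Hypothetical message type. It must be serializable to be sent as a message.
        [Serializable]
        class WorkRequest
        {
            public int Start;
            public int Count;
        }

        static void Main(string[] args)
        {
            using (IJobScheduler scheduler = new ClusterJobScheduler("localhost"))
            {
                scheduler.Connect();
                Job job = scheduler.CreateJob();
                Task task = new SummingTask();
                job.AddTask(task);
                job.Submit();

                IMessageEndpoint messageEndpoint = scheduler as IMessageEndpoint;
                messageEndpoint.PostMessage(new WorkRequest { Start = 1, Count = 10 }, task.Id);

                // The task replies with a primitive (int) message.
                object reply = messageEndpoint.ReceiveMessage();
                Console.WriteLine("Sum: " + reply);

                job.WaitUntilDone();
            }
        }

        // Hypothetical task that sums Count consecutive integers beginning at Start.
        [Serializable]
        class SummingTask : Task
        {
            public override void Execute()
            {
                IJobSchedulerContext schedulerContext = this.GetProperty<IJobSchedulerContext>(TaskProperties.JobSchedulerContext);
                Guid submitterId = this.GetProperty<Guid>(TaskProperties.SubmitterId);

                WorkRequest request = schedulerContext.ReceiveMessage() as WorkRequest;
                if (request == null)
                {
                    // The message was not a WorkRequest; tell the client instead of failing.
                    schedulerContext.PostMessage("Unexpected message type", submitterId);
                    return;
                }

                int sum = 0;
                for (int i = 0; i < request.Count; i++)
                {
                    sum += request.Start + i;
                }
                schedulerContext.PostMessage(sum, submitterId);
            }
        }
    }
}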

Receiving messages asynchronously

There are two options for receiving messages. The first option is to receive messages in a blocking manner via IMessageEndpoint.ReceiveMessage. This option is useful when it is desirable to block the current thread until a message is received.

The other option is to subscribe to new messages and process them as they arrive. To do this, subscribe to the IMessageEndpoint.NewMessage event. After subscribing to new messages, calling IMessageEndpoint.ReceiveMessage will result in an exception.

Common errors and pitfalls

A robust message passing protocol must handle the case when a message does not arrive. This can happen either because of an error in the application logic or an environmental error such as a process running out of memory. The millisecondsTimeout parameter allows unblocking from the IMessageEndpoint.ReceiveMessage method. If the method times out, it will return null to indicate no message was processed.
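
A task that guards against a missing message could be sketched as follows. The exact overload is an assumption: the sketch passes millisecondsTimeout by name and assumes no other arguments are required, so check the API reference for the actual signature. The PatientTask class and the five-second value are hypothetical.

C#
using System;
using AGI.Parallel.Client;
using AGI.Parallel.Infrastructure;
using AGI.Parallel.Infrastructure.Logging;

namespace CodeSamples
{
    // Hypothetical task that gives up if no message arrives in time.
    [Serializable]
    class PatientTask : Task
    {
        public override void Execute()
        {
            IJobSchedulerContext schedulerContext = this.GetProperty<IJobSchedulerContext>(TaskProperties.JobSchedulerContext);

            // Assumption: an overload of ReceiveMessage accepts millisecondsTimeout on its own.
            object message = schedulerContext.ReceiveMessage(millisecondsTimeout: 5000);

            if (message == null)
            {
                // A null return means the wait timed out and no message was processed.
                Console.WriteLine("No message within 5 seconds; stopping.");
                return;
            }

            Console.WriteLine("Received " + message);
        }
    }
}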

An exception will be thrown if a message is sent to an address that does not exist. Set the quietlyFail parameter of PostMessage to true to ignore such exceptions.
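
The two behaviors can be compared with a sketch like the one below. It assumes quietlyFail is a Boolean parameter that follows the message and id arguments and can be passed by name, which the text above does not confirm. The address is a freshly generated Guid, chosen here only because nothing should be listening on it.

C#
using System;
using AGI.Parallel.Client;
using AGI.Parallel.Infrastructure;
using AGI.Parallel.Infrastructure.Logging;

namespace CodeSamples
{
    class QuietlyFailSample
    {
        static void Main(string[] args)
        {
            using (IJobScheduler scheduler = new ClusterJobScheduler("localhost"))
            {
                scheduler.Connect();
                IMessageEndpoint messageEndpoint = scheduler as IMessageEndpoint;

                // Hypothetical address that no task, job, or client owns.
                Guid unknownAddress = Guid.NewGuid();

                try
                {
                    // Without quietlyFail, posting to a nonexistent address throws.
                    messageEndpoint.PostMessage("Anyone there?", unknownAddress);
                }
                catch (Exception ex)
                {
                    Console.WriteLine("Post failed: " + ex.Message);
                }

                // Assumption: quietlyFail can be supplied as a named argument.
                messageEndpoint.PostMessage("Anyone there?", unknownAddress, quietlyFail: true);
                Console.WriteLine("Post with quietlyFail returned without throwing.");
            }
        }
    }
}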

Because messages are queued and typically follow some sort of protocol, they are inherently stateful. For that reason, it is strongly recommended that the retry mechanism be turned off. If tasks use messaging and depend on the state of the queued messages, set Job.MaxTaskInterruptedRetryAttempts to 0. Messages also typically rely on the client to participate in the message protocol. If the client disconnects, the protocol may need to be aborted. Setting the Job.CancelOnClientDisconnection option allows the tasks to unblock if the client happens to disconnect.
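
These two job settings could be applied before submission roughly as shown below. The sketch assumes that MaxTaskInterruptedRetryAttempts is an integer property and CancelOnClientDisconnection is a Boolean property on Job; the MessagingJobFactory class and its method are hypothetical helpers.

C#
using System;
using AGI.Parallel.Client;
using AGI.Parallel.Infrastructure;
using AGI.Parallel.Infrastructure.Logging;

namespace CodeSamples
{
    static class MessagingJobFactory
    {
        // Hypothetical helper that creates a job configured for stateful messaging.
        public static Job CreateMessagingJob(IJobScheduler scheduler)
        {
            Job job = scheduler.CreateJob();

            // Do not rerun interrupted tasks; a retried task would start
            // without the messages its previous attempt already consumed.
            job.MaxTaskInterruptedRetryAttempts = 0;

            // Assumption: this option is a Boolean property. Canceling lets
            // tasks blocked on ReceiveMessage unblock if the client disconnects.
            job.CancelOnClientDisconnection = true;

            return job;
        }
    }
}

Add tasks to the returned job and submit it as usual.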

See Also

STK Parallel Computing Server 2.9 API for .NET