Communicate with tasks by sending messages

Problem

You need the client to communicate with the task while the task is running.

Solution

Use messaging capabilities.

On the client side:

Cast the IJobScheduler instance to IMessageEndpoint. Then call postMessage(java.lang.Object, java.util.UUID) to send a message to a task. For the id parameter, pass the Id of the task.

Java
IMessageEndpoint messageEndpoint = (IMessageEndpoint) scheduler;
messageEndpoint.postMessage("Hello?", task.getId());

On the task side:

Get the reference to the IJobSchedulerContext interface by calling getProperty(java.lang.String) and passing in TaskProperties.JOB_SCHEDULER_CONTEXT. Both IJobScheduler and IJobSchedulerContext implement the IMessageEndpoint interface.

Java
IJobSchedulerContext schedulerContext = (IJobSchedulerContext) this.getProperty(TaskProperties.JOB_SCHEDULER_CONTEXT);

The task can then consume the messages the client sent by calling receiveMessage().

Java
Object message = schedulerContext.receiveMessage();

The task can also send messages back to the client. The task first needs the Id of the submitter. One way to get it is to read the TaskProperties.SUBMITTER_ID property.

Java
UUID submitterId = (UUID) this.getProperty(TaskProperties.SUBMITTER_ID);
schedulerContext.postMessage("Howdy!", submitterId);

Here is a complete example of message passing that shows the client and the task communicating with each other.

Java
package stkparallelcomputingserversdk.howto;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.UUID;

import agi.parallel.client.ClusterJobScheduler;
import agi.parallel.client.IJobScheduler;
import agi.parallel.client.Job;
import agi.parallel.infrastructure.IJobSchedulerContext;
import agi.parallel.infrastructure.IMessageEndpoint;
import agi.parallel.infrastructure.Task;
import agi.parallel.infrastructure.TaskProperties;

public class MessageSending {
    public static void main(String[] args) {
        try (IJobScheduler scheduler = new ClusterJobScheduler("localhost")) {
            scheduler.connect();

            Job job = scheduler.createJob();
            Task task = new EchoTask();
            job.addTask(task);
            job.submit();

            System.out.println("Client\n========");
            IMessageEndpoint messageEndpoint = (IMessageEndpoint) scheduler;
            messageEndpoint.postMessage("Hello?", task.getId());
            SimpleDateFormat dateFormat = new SimpleDateFormat("M/d/yyyy h:mm:ss a");
            System.out.println("Sent Hello? at " + dateFormat.format(new Date()));
            Object message = messageEndpoint.receiveMessage();
            System.out.println("Received " + message + " at " + dateFormat.format(new Date()));
            job.waitUntilDone();

            System.out.println("\nTask\n========");
            System.out.println(task.getStandardOutput());
        }

        /*
         * The output of the application should resemble:
         * Client
         * ========
         * Sent Hello? at 7/11/2013 1:24:24 PM
         * Received Howdy! at 7/11/2013 1:24:28 PM
         *
         * Task
         * ========
         * Received Hello? at 7/11/2013 1:24:28 PM
         * Sent Howdy! at 7/11/2013 1:24:28 PM
         */
    }

    private static class EchoTask extends Task {
        @Override
        public void execute() {
            IJobSchedulerContext schedulerContext = (IJobSchedulerContext) this.getProperty(TaskProperties.JOB_SCHEDULER_CONTEXT);

            Object message = schedulerContext.receiveMessage();
            SimpleDateFormat dateFormat = new SimpleDateFormat("M/d/yyyy h:mm:ss a");
            System.out.println("Received " + message + " at " + dateFormat.format(new Date()));
            UUID submitterId = (UUID) this.getProperty(TaskProperties.SUBMITTER_ID);
            schedulerContext.postMessage("Howdy!", submitterId);
            System.out.println("Sent Howdy! at " + dateFormat.format(new Date()));
        }
    }
}

Discussion

Message passing can be a powerful tool for passing data between tasks and clients dynamically (while the task is running). Along with the added flexibility, message passing is much more lightweight than submitting jobs. Its latency is far lower because it does not require starting a host or sending dependencies. Thus, message passing can complement tasks in the low-latency portions of an application.

In this section we will discuss further key concepts of message passing:

  • Message queuing
  • Addressing
  • Message types
  • Receiving messages asynchronously
  • Common errors and pitfalls

Message queuing

When a message is sent to an endpoint, the message is inserted into a queue. Messages in the queue are consumed in a first-in, first-out manner. By default, the queue is unbounded.

To see queuing in action, send messages to a task before it runs. When that task starts running, all messages sent to its queue are available and the task can process the queue starting from the first message received. If the task never consumes messages from the queue, the messages in the queue are discarded after the task completes.
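
The queuing behavior described above can be modelled without a cluster. The following is a minimal sketch that uses a plain java.util.concurrent.LinkedBlockingQueue as a stand-in for a task's mailbox. The MailboxSketch class and its post and drain methods are hypothetical names for illustration and are not part of the SDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical stand-in for a task mailbox; not an SDK class.
final class MailboxSketch {
    // Unbounded FIFO queue, mirroring the default described in the text.
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();

    // Models a sender posting a message; never blocks because the queue is unbounded.
    void post(Object message) {
        queue.add(message);
    }

    // Models a task that starts late and consumes everything queued so far, oldest first.
    List<Object> drain() {
        List<Object> received = new ArrayList<>();
        queue.drainTo(received);
        return received;
    }

    public static void main(String[] args) {
        MailboxSketch mailbox = new MailboxSketch();
        // Messages are posted before the "task" starts consuming.
        mailbox.post("first");
        mailbox.post("second");
        mailbox.post("third");
        System.out.println(mailbox.drain()); // prints [first, second, third]
    }
}
```

In a real job, the equivalent experiment is to call postMessage several times right after job.submit() and have the task call receiveMessage() in a loop.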

Addressing

Who can send messages, and how is the target of a message specified? All of the key components have a unique Id property: IJobScheduler.Id, Task.Id, and Job.Id. These ids are the addresses to which messages can be sent; in other words, each id identifies a mailbox. A message sent to a mailbox can only be consumed by the owner of the mailbox. The only exception to this rule is that a message sent to a job is replicated and sent to each of the job's tasks.

Obtaining the address of a mailbox is straightforward. A job client knows the ids of all the tasks it creates because they are available as soon as the tasks are created. In more advanced cases, pass the id to the task as a parameter or include it in a message. There are no restrictions on who can send a message to whom: tasks can send messages to other tasks, and job clients can send messages to other job clients.
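
The addressing rules can be illustrated with a toy router. This is only a sketch under stated assumptions: AddressingSketch and all of its methods are hypothetical, the mailboxes are in-memory queues, and the exception type for an unknown address is chosen for illustration. It shows one mailbox per id and the replication of a job-addressed message to each of the job's tasks.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

// Toy model of id-based addressing; none of these names come from the SDK.
final class AddressingSketch {
    private final Map<UUID, Queue<Object>> mailboxes = new HashMap<>();
    private final Map<UUID, List<UUID>> tasksByJob = new HashMap<>();

    // Creates a mailbox (for a client or a task) and returns its address.
    UUID register() {
        UUID id = UUID.randomUUID();
        mailboxes.put(id, new ArrayDeque<>());
        return id;
    }

    // Creates a job address; in this model a job has no mailbox of its own.
    UUID registerJob() {
        UUID jobId = UUID.randomUUID();
        tasksByJob.put(jobId, new ArrayList<>());
        return jobId;
    }

    // Creates a task mailbox and records the job it belongs to.
    UUID registerTask(UUID jobId) {
        UUID taskId = register();
        tasksByJob.get(jobId).add(taskId);
        return taskId;
    }

    // Delivers to one mailbox, or to every task mailbox when the address is a job.
    void post(Object message, UUID address) {
        List<UUID> tasks = tasksByJob.get(address);
        if (tasks != null) {
            for (UUID taskId : tasks) {
                mailboxes.get(taskId).add(message);
            }
            return;
        }
        Queue<Object> mailbox = mailboxes.get(address);
        if (mailbox == null) {
            throw new IllegalArgumentException("No mailbox at " + address);
        }
        mailbox.add(message);
    }

    // Returns the owner's next message, or null when its mailbox is empty.
    Object poll(UUID ownerId) {
        return mailboxes.get(ownerId).poll();
    }

    public static void main(String[] args) {
        AddressingSketch router = new AddressingSketch();
        UUID job = router.registerJob();
        UUID taskA = router.registerTask(job);
        UUID taskB = router.registerTask(job);
        router.post("to A only", taskA);
        router.post("to every task", job);
        System.out.println(router.poll(taskA)); // to A only
        System.out.println(router.poll(taskA)); // to every task
        System.out.println(router.poll(taskB)); // to every task
    }
}
```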

Tip

There are task properties for common addresses. The TaskProperties.SUBMITTER_ID property is the address of the client that submits the task. The TaskProperties.TASK_ANCESTRY property gives an array of the task's entire ancestry. It is ordered from the most recent to the oldest.

Along with task properties, the address of the sender of a message can be retrieved with the sender overload of IMessageEndpoint.receiveMessage().

Message types

Messages can be either simple built-in types (String, int, byte[], etc.) or user-defined objects. A user-defined object has two restrictions: it must not be null, and it must be serializable. This is consistent with other user-defined objects such as Task or TaskEnvironment. If an object is not serializable, a NonSerializableObject object will be received instead of the message.
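
A message can be checked before it is posted. The sketch below assumes that "serializable" means standard Java serialization (java.io.Serializable), which the text does not state explicitly. MessageChecks, ProgressUpdate, and isSendable are hypothetical names and are not part of the SDK.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical pre-flight check for messages; not an SDK class.
final class MessageChecks {
    // Example user-defined message: implements Serializable and holds only serializable fields.
    static final class ProgressUpdate implements Serializable {
        private static final long serialVersionUID = 1L;
        final int percentComplete;
        final String note;

        ProgressUpdate(int percentComplete, String note) {
            this.percentComplete = percentComplete;
            this.note = note;
        }
    }

    // Returns true when the object is non-null and Java serialization accepts it.
    static boolean isSendable(Object message) {
        if (message == null) {
            return false;
        }
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(message);
            return true;
        } catch (NotSerializableException e) {
            return false; // The object, or something it references, is not serializable.
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(isSendable(new ProgressUpdate(40, "running"))); // true
        System.out.println(isSendable(new Object()));                      // false
        System.out.println(isSendable(null));                              // false
    }
}
```

Running such a check on the sending side surfaces the problem early, instead of the receiver discovering a NonSerializableObject in place of the expected message.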

Receiving messages asynchronously

There are two options for receiving messages. The first option is to receive messages in a blocking manner via IMessageEndpoint.receiveMessage(). This option is useful when it is desirable to block the current thread until a message is received.

The other option is to subscribe to new messages and process them as they arrive. To do this, subscribe to the IMessageEndpoint.NewMessage event by using IMessageEndpoint.addNewMessageListener(). Once a listener is subscribed, calling IMessageEndpoint.receiveMessage() will result in an exception.
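
The two receive styles are mutually exclusive, which the following toy endpoint makes concrete. It is a sketch only: ReceiveStylesSketch is hypothetical, its listener is a plain java.util.function.Consumer rather than the SDK's listener type, and the IllegalStateException is an assumed stand-in for whatever exception the SDK throws. The model also says nothing about messages that were queued before the first listener subscribed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Toy endpoint contrasting blocking and listener-based receiving; not the SDK's IMessageEndpoint.
final class ReceiveStylesSketch {
    private final BlockingQueue<Object> queue = new LinkedBlockingQueue<>();
    private final List<Consumer<Object>> listeners = new ArrayList<>();

    // Subscribing switches this endpoint to push-style delivery.
    synchronized void addListener(Consumer<Object> listener) {
        listeners.add(listener);
    }

    // Pushes to listeners when any are subscribed; otherwise queues for a blocking receive.
    synchronized void post(Object message) {
        if (listeners.isEmpty()) {
            queue.add(message);
        } else {
            for (Consumer<Object> listener : listeners) {
                listener.accept(message);
            }
        }
    }

    // Blocks until a message is available; rejected once a listener has subscribed.
    Object receive() {
        synchronized (this) {
            if (!listeners.isEmpty()) {
                throw new IllegalStateException("receive() is not allowed after subscribing");
            }
        }
        try {
            return queue.take();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting", e);
        }
    }

    public static void main(String[] args) {
        ReceiveStylesSketch endpoint = new ReceiveStylesSketch();
        endpoint.post("polled");
        System.out.println("Blocking receive got: " + endpoint.receive());

        endpoint.addListener(m -> System.out.println("Listener got: " + m));
        endpoint.post("pushed");
    }
}
```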

Common errors and pitfalls

A robust message passing protocol must handle the case when a message does not arrive. This can happen either because of an error in the application logic or an environmental error such as a process running out of memory. The millisecondsTimeout parameter of IMessageEndpoint.receiveMessage(int) limits how long the call blocks. If the method times out, it returns null to indicate that no message was received.
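
One way to act on a timeout is to wait a bounded number of times and then abort the protocol. The sketch below is written against a hypothetical TimedReceiver interface that only mirrors the shape of receiveMessage(int) as described above (null on timeout). TimeoutSketch, NoReplyException, and receiveOrFail are illustrative names, not SDK members.

```java
// Sketch of a bounded wait around a timed receive; all names here are hypothetical.
final class TimeoutSketch {
    // Mirrors the described contract of receiveMessage(int): null means the wait timed out.
    interface TimedReceiver {
        Object receiveMessage(int millisecondsTimeout);
    }

    // Raised by this sketch when no message arrives within the allowed attempts.
    static final class NoReplyException extends RuntimeException {
        NoReplyException(String text) {
            super(text);
        }
    }

    // Waits up to maxAttempts times, each bounded by millisecondsTimeout, then gives up.
    static Object receiveOrFail(TimedReceiver endpoint, int millisecondsTimeout, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Object message = endpoint.receiveMessage(millisecondsTimeout);
            if (message != null) {
                return message;
            }
        }
        throw new NoReplyException("No message after " + maxAttempts + " attempts");
    }

    public static void main(String[] args) {
        // Fake endpoint that times out twice and then delivers a reply.
        int[] calls = {0};
        TimedReceiver fake = timeout -> ++calls[0] < 3 ? null : "Howdy!";
        System.out.println(receiveOrFail(fake, 1000, 5)); // prints Howdy!
    }
}
```

Because both IJobScheduler and IJobSchedulerContext implement IMessageEndpoint, a real endpoint could be adapted to this interface with a lambda such as `t -> endpoint.receiveMessage(t)`, assuming the int overload named in the text.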

An exception will be thrown if a message is sent to an address that does not exist. Set the quietlyFail parameter of postMessage(java.lang.Object, java.util.UUID, boolean) to true to ignore such exceptions.

Because messages are queued and typically follow some sort of protocol, they are inherently stateful. For this reason, it is strongly recommended that the retry mechanism be turned off: if tasks depend on the state of the queued messages, set Job.MaxTaskInterruptedRetryAttempts to 0. Messages also typically rely on the client to participate in the protocol. If the client disconnects, the protocol may need to be aborted. Setting the Job.CancelOnClientDisconnection option allows the tasks to unblock if the client disconnects.

Tip

When using message passing, it is recommended to almost always set the Job.MaxTaskInterruptedRetryAttempts option to 0 and the Job.CancelOnClientDisconnection option to true.

See Also

STK Parallel Computing Server 2.9 API for Java