Communicate with tasks by sending messages |
You need the client to communicate with the task while the task is running.
Use messaging capabilities.
On the client side:
Cast an IJobScheduler interface to IMessageEndpoint. Then call postMessage(java.lang.Object, java.util.UUID) to send a message to a task. For the id parameter, pass in the Id of the task.
IMessageEndpoint messageEndpoint = (IMessageEndpoint) scheduler;
messageEndpoint.postMessage("Hello?", task.getId());
On the task side:
Get the reference to the IJobSchedulerContext interface by calling getProperty(java.lang.String) and passing in TaskProperties.JOB_SCHEDULER_CONTEXT. Both IJobScheduler and IJobSchedulerContext implement the IMessageEndpoint interface.
IJobSchedulerContext schedulerContext = (IJobSchedulerContext) this.getProperty(TaskProperties.JOB_SCHEDULER_CONTEXT);
The task can then consume the messages the client sent by calling receiveMessage().
Object message = schedulerContext.receiveMessage();
The task can also send messages back to the client. The task first needs to find out the Id of the submitter. One way to do this is to retrieve the SUBMITTER_ID property to get the id.
UUID submitterId = (UUID) this.getProperty(TaskProperties.SUBMITTER_ID); schedulerContext.postMessage("Howdy!", submitterId);
Here is a complete example of message passing that shows the client and the task communicating with each other.
package stkparallelcomputingserversdk.howto; import java.text.SimpleDateFormat; import java.util.Date; import java.util.UUID; import agi.parallel.client.ClusterJobScheduler; import agi.parallel.client.IJobScheduler; import agi.parallel.client.Job; import agi.parallel.infrastructure.IJobSchedulerContext; import agi.parallel.infrastructure.IMessageEndpoint; import agi.parallel.infrastructure.Task; import agi.parallel.infrastructure.TaskProperties; public class MessageSending { public static void main(String[] args) { try (IJobScheduler scheduler = new ClusterJobScheduler("localhost")) { scheduler.connect(); Job job = scheduler.createJob(); Task task = new EchoTask(); job.addTask(task); job.submit(); System.out.println("Client\n========"); IMessageEndpoint messageEndpoint = (IMessageEndpoint) scheduler; messageEndpoint.postMessage("Hello?", task.getId()); SimpleDateFormat dateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss aaa"); System.out.println("Sent Hello? at " + dateFormat.format(new Date())); Object message = messageEndpoint.receiveMessage(); System.out.println("Received " + message + " at " + dateFormat.format(new Date())); job.waitUntilDone(); System.out.println("\nTask\n========"); System.out.println(task.getStandardOutput()); } /* * The output of the application should resemble: * Client * ======== * Sent Hello? at 7/11/2013 1:24:24 PM * Received Howdy! at 7/11/2013 1:24:28 PM * * Task * ======== * Received Hello? at 7/11/2013 1:24:28 PM * Sent Howdy! at 7/11/2013 1:24:28 PM */ } private static class EchoTask extends Task { @Override public void execute() { IJobSchedulerContext schedulerContext = (IJobSchedulerContext) this.getProperty(TaskProperties.JOB_SCHEDULER_CONTEXT); Object message = schedulerContext.receiveMessage(); SimpleDateFormat dateFormat = new SimpleDateFormat("MM/dd/yyyy HH:mm:ss aaa"); System.out.println("Received " + message + " at " + dateFormat.format(new Date())); UUID submitterId = (UUID) this.getProperty(TaskProperties.SUBMITTER_ID); schedulerContext.postMessage("Howdy!", submitterId); System.out.println("Sent Howdy! at " + dateFormat.format(new Date())); } } }
Message passing can be a powerful tool for passing data between tasks and clients dynamically (while the task is running). Along with the added flexibility, message passing is much more lightweight compared to submitting jobs. There is a drastic latency difference between the two options because message passing does not require starting a host or sending dependencies. Thus, message passing can be used as a complement to tasks for the low latency portions of an application.
In this section we will discuss further key concepts of message passing:
When a message is sent to an endpoint the message gets inserted into a queue. Messages in the queue are consumed in a first-in first-out manner. By default, the queue is unbounded.
To see queuing in action, send messages to a task before it runs. When that task starts running, all messages sent to its queue are available and the task can process the queue starting from the first message received. If the task never consumes messages from the queue, the messages in the queue are discarded after the task completes.
Who can send messages and how is the target of a message specified? All of the key components have a unique Id property: IJobScheduler.Id, Task.Id, and Job.Id. These id properties are used as the address where messages can be sent; a mailbox in other words. A message sent to a mailbox can only be consumed by the owner of the mailbox. The only exception to this rule is that a message sent to a job is replicated and sent to each of the job's tasks.
Obtaining the address of a mailbox is straightforward. A job client will know the ids of all the tasks it creates because they are available immediately when the tasks are created. In more advanced cases, send the id dynamically as a parameter to the task or even in a message itself. There are no restrictions in who can send a message to whom. Tasks can send messages to other tasks and job clients can send messages to other job clients.
Tip |
---|
There are task properties for common addresses. The TaskProperties.SUBMITTER_ID property is the address of the client that submits the task. The TaskProperties.TASK_ANCESTRY property gives an array of the task's entire ancestry. It is ordered from the most recent to the oldest. Along with task properties, the address of the sender of a message can be retrieved with the sender overload for IMessageEndpoint.receiveMessage() |
Messages can either be primitive types (string, int, byte[], etc.) or user defined objects. The only restriction with a user defined object is that it must not be null and the object must be serializable. This is consistent with other user-defined objects such as Task or TaskEnvironment. If an object is not serializable, a NonSerializableObject object will be received instead of the message.
There are two options for receiving messages. The first option is to receive messages in a blocking manner via IMessageEndpoint.receiveMessage(). This option is useful when it is desirable to block the current thread until a message is received.
The other option is to subscribe to new messages and process them when they come. To do this, subscribe to the IMessageEndpoint.NewMessage event by using IMessageEndpoint.addNewMessageListener(). After subscribing to the new messages, calling IMessageEndpoint.receiveMessage() will result in an exception.
A robust message passing protocol must handle the case when a message does not arrive. This can happen either because of an error in the application logic or an environmental error such as a process running out of memory. The millisecondsTimeout parameter allows unblocking from the IMessageEndpoint.receiveMessage(int) method. If the method times out, it will return null to indicate no message was processed.
An exception will be thrown if a message is sent to an address that does not exist. Set the quietlyFail parameter of postMessage(java.lang.Object, java.util.UUID, boolean) to true to ignore such exceptions.
Because messages are queued and typically involve some sort of protocol they are inherently stateful. Because they are stateful, it is strongly recommended the retry mechanism be turned off. If using messaging in tasks and the tasks depend on the state of the messages that are queued, set Job.MaxTaskInterruptedRetryAttempts to 0. Also, messages typically rely on the client to participate in the message protocol. If the client disconnects, the protocol may need to be aborted. Setting the Job.CancelOnClientDisconnection option will allow the tasks to unblock if the client happens to disconnect.
Tip |
---|
When using message passing, it is recommended to almost always set the Job.MaxTaskInterruptedRetryAttempts option to 0 and the Job.CancelOnClientDisconnection option to false. |
STK Parallel Computing Server 2.9 API for Java