Monitoring Jobs
The API provides events that are raised after key state changes occur in the job scheduler. Examples include when a task changes state, when a task completes, and when a task reports new progress. Progress events are useful for long-running jobs that return data to the client progressively.
This section explains how to subscribe to task completion, job completion, task state change, and task progress events.
Every time a task changes state, the job scheduler notifies the client with an event.
The Job.TaskCompleted and Task.Completed events are raised when a task completes, so its results can be processed immediately. Subscribe to these events by using Job.addTaskCompletedListener (on the job) or Task.addCompletedListener (on an individual task).
```java
import agi.parallel.client.ClusterJobScheduler;
import agi.parallel.client.IJobScheduler;
import agi.parallel.client.Job;
import agi.parallel.client.TaskCompletedEventArgs;
import agi.parallel.client.TaskCompletedListener;
import agi.parallel.infrastructure.CompletedListener;
import agi.parallel.infrastructure.Task;

public class EventsTaskCompleted {
    public static void main(String[] args) {
        try (IJobScheduler scheduler = new ClusterJobScheduler("localhost")) {
            scheduler.connect();

            // Method 1. Subscribe to task completion by calling addTaskCompletedListener on the job.
            Job job = scheduler.createJob();
            job.addTask(new OurTask());
            job.addTaskCompletedListener(new TaskCompletedListener() {
                @Override
                public void onTaskCompleted(Job job, TaskCompletedEventArgs e) {
                    // You could process the results here.
                    System.out.println("Task finished...");
                    System.out.println(e.getTask().getTaskStatus());
                    System.out.println(e.getTask().getResult());
                }
            });

            job.submit();
            job.waitUntilDone();
        }

        try (IJobScheduler scheduler = new ClusterJobScheduler("localhost")) {
            scheduler.connect();

            // Method 2. Subscribe to task completion by calling addCompletedListener on the task.
            Job job = scheduler.createJob();
            Task task = new OurTask();
            task.addCompletedListener(new CompletedListener() {
                @Override
                public void onCompleted(Task task) {
                    // You could process the results here.
                    System.out.println("Task finished...");
                    System.out.println(task.getTaskStatus());
                    System.out.println(task.getResult());
                }
            });

            job.addTask(task);
            job.submit();
            job.waitUntilDone();
        }
    }

    public static class OurTask extends Task {
        @Override
        public void execute() {
        }
    }
}
```
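The practical benefit of a task-completed event is that results are handled in the order tasks finish, not the order they were submitted. The following sketch illustrates that idea with the standard java.util.concurrent ExecutorCompletionService instead of the job scheduler. It is an analogy only: CompletionOrderSketch and runAndCollect are hypothetical names, and nothing here uses the agi.parallel API.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * Plain-JDK analogy (hypothetical, not the agi.parallel API): each result is
 * handled as soon as its task finishes, similar to a task-completed listener.
 */
public class CompletionOrderSketch {

    /** Runs tasks that sleep for the given delays and returns their results in completion order. */
    public static List<Integer> runAndCollect(int[] delaysMillis) throws Exception {
        ExecutorService pool = Executors.newFixedThreadPool(delaysMillis.length);
        try {
            CompletionService<Integer> completion = new ExecutorCompletionService<>(pool);
            for (final int delay : delaysMillis) {
                completion.submit(new Callable<Integer>() {
                    @Override
                    public Integer call() throws Exception {
                        Thread.sleep(delay); // simulate work of varying length
                        return delay;        // the "result" is simply the delay
                    }
                });
            }
            List<Integer> processed = new ArrayList<>();
            for (int i = 0; i < delaysMillis.length; i++) {
                // take() blocks until the next task finishes, much like waiting for a completion event.
                Integer result = completion.take().get();
                processed.add(result); // process the result immediately
            }
            return processed;
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runAndCollect(new int[] {300, 50, 150}));
    }
}
```

With well-separated delays, the shortest task's result is processed first even though it was submitted second.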
Job.JobCompleted is raised when all tasks in the job have completed. Subscribe to this event by using Job.addJobCompletedListener.
```java
import agi.parallel.client.ClusterJobScheduler;
import agi.parallel.client.IJobScheduler;
import agi.parallel.client.Job;
import agi.parallel.client.JobCompletedEventArgs;
import agi.parallel.client.JobCompletedListener;
import agi.parallel.infrastructure.Task;

public class EventsJobCompleted {
    public static void main(String[] args) {
        try (IJobScheduler scheduler = new ClusterJobScheduler("localhost")) {
            scheduler.connect();

            Job job = scheduler.createJob();
            job.setName("AwesomeJob");
            job.addTask(new OurTask());
            job.addJobCompletedListener(new JobCompletedListener() {
                @Override
                public void onJobCompleted(Job job, JobCompletedEventArgs e) {
                    System.out.println("Job " + e.getJob().getName() + " is finished");
                }
            });

            job.submit();
            job.waitUntilDone();
        }
    }

    public static class OurTask extends Task {
        @Override
        public void execute() {
        }
    }
}
```
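Conceptually, a job-completed notification can be derived from per-task completion by counting outstanding tasks and firing once when the count reaches zero. The sketch below is a hypothetical illustration of that bookkeeping, not a description of how the job scheduler implements Job.JobCompleted; JobCompletionTracker and JobDoneCallback are made-up names.

```java
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Hypothetical sketch (not the agi.parallel implementation): derive a single
 * "job completed" callback from one "task completed" call per task.
 */
public class JobCompletionTrackerSketch {

    /** Hypothetical callback standing in for a job-completed listener. */
    public interface JobDoneCallback {
        void onJobCompleted(String jobName);
    }

    public static class JobCompletionTracker {
        private final String jobName;
        private final AtomicInteger remaining;
        private final JobDoneCallback callback;

        public JobCompletionTracker(String jobName, int taskCount, JobDoneCallback callback) {
            this.jobName = jobName;
            this.remaining = new AtomicInteger(taskCount);
            this.callback = callback;
        }

        /** Call once per completed task; safe to call from multiple threads. */
        public void taskCompleted() {
            // decrementAndGet is atomic, so exactly one caller observes zero and fires the callback.
            if (remaining.decrementAndGet() == 0) {
                callback.onJobCompleted(jobName);
            }
        }
    }

    public static void main(String[] args) {
        JobCompletionTracker tracker = new JobCompletionTracker("AwesomeJob", 3, new JobDoneCallback() {
            @Override
            public void onJobCompleted(String jobName) {
                System.out.println("Job " + jobName + " is finished");
            }
        });
        for (int i = 0; i < 3; i++) {
            tracker.taskCompleted(); // the message is printed only after the third call
        }
    }
}
```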
The Job.TaskStateChanged and Task.StatusChanged events are raised when a task changes state, for example, from ASSIGNED to RUNNING. Subscribe to these events by using Job.addTaskStateChangeListener and Task.addStatusChangedListener.
```java
import agi.parallel.client.ClusterJobScheduler;
import agi.parallel.client.IJobScheduler;
import agi.parallel.client.Job;
import agi.parallel.client.TaskStatusChangedEventArgs;
import agi.parallel.client.TaskStateChangedListener;
import agi.parallel.infrastructure.Task;

public class EventsTaskStateChanged {
    public static void main(String[] args) {
        try (IJobScheduler scheduler = new ClusterJobScheduler("localhost")) {
            scheduler.connect();

            Job job = scheduler.createJob();
            job.addTask(new OurTask());
            job.addTaskStateChangeListener(new TaskStateChangedListener() {
                @Override
                public void onTaskStateChanged(Job job, TaskStatusChangedEventArgs e) {
                    System.out.println("Task Name: " + e.getTask().getName());
                    System.out.println("Status changed from " + e.getPreviousStatus() + " to " + e.getNewStatus());
                }
            });

            job.submit();
            job.waitUntilDone();
        }
    }

    public static class OurTask extends Task {
        @Override
        public void execute() {
        }
    }
}
```
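The state-change listener receives both the previous and the new status of the task. The self-contained model below shows that notification pattern in plain Java. It is a hypothetical sketch: ObservableTask, StatusListener, and the Status enum are invented for illustration, and the values SUBMITTED and COMPLETED are assumptions, not the scheduler's actual status list (only ASSIGNED and RUNNING appear in the text above).

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical model (not the agi.parallel types) of a task that notifies
 * listeners with the previous and new status on every state change.
 */
public class StateChangeSketch {

    /** Illustrative statuses only; SUBMITTED and COMPLETED are assumed names. */
    public enum Status { SUBMITTED, ASSIGNED, RUNNING, COMPLETED }

    public interface StatusListener {
        void onStatusChanged(String taskName, Status previous, Status current);
    }

    public static class ObservableTask {
        private final String name;
        private final List<StatusListener> listeners = new ArrayList<>();
        private Status status = Status.SUBMITTED;

        public ObservableTask(String name) {
            this.name = name;
        }

        public void addStatusListener(StatusListener listener) {
            listeners.add(listener);
        }

        public void setStatus(Status next) {
            if (next == status) {
                return; // no transition, so no event
            }
            Status previous = status;
            status = next;
            for (StatusListener listener : listeners) {
                listener.onStatusChanged(name, previous, next);
            }
        }
    }

    public static void main(String[] args) {
        ObservableTask task = new ObservableTask("OurTask");
        task.addStatusListener(new StatusListener() {
            @Override
            public void onStatusChanged(String taskName, Status previous, Status current) {
                System.out.println("Task Name: " + taskName);
                System.out.println("Status changed from " + previous + " to " + current);
            }
        });
        task.setStatus(Status.ASSIGNED);
        task.setStatus(Status.RUNNING);
        task.setStatus(Status.COMPLETED);
    }
}
```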
A task can report a user-defined progress value by calling setProgress(int, java.lang.Object). The job scheduler does not interpret the reported values, so any range works; for example, progress could run from 20 to 2500. If a task's progress has been updated, Job.TaskProgressUpdated is raised at 200 millisecond intervals. If setProgress is called multiple times within a single 200 millisecond interval, only one progress event is sent. Subscribe to the task progress event by using Job.addTaskProgressListener.
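The 200 millisecond behavior described above can be modeled as a latest-value buffer that a timer drains once per interval. The sketch below assumes a timer calls tick() every 200 ms; ProgressCoalescer is a hypothetical name, and this is an illustration of the coalescing idea rather than the scheduler's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical sketch (not the agi.parallel implementation) of progress
 * coalescing: any number of reports per interval produce at most one event,
 * carrying the most recent value.
 */
public class ProgressCoalescerSketch {

    public static class ProgressCoalescer {
        private Integer pending; // latest unreported value, or null if nothing new
        private final List<Integer> emitted = new ArrayList<>();

        /** Called by the task; cheap, and may be called many times per interval. */
        public synchronized void report(int progress) {
            pending = progress; // overwrite: only the latest value matters
        }

        /** Assumed to be called once per interval (for example, every 200 ms) by a timer. */
        public synchronized void tick() {
            if (pending != null) {
                emitted.add(pending); // stands in for raising one progress event
                pending = null;
            }
        }

        /** Values that would have been sent as progress events, in order. */
        public synchronized List<Integer> emitted() {
            return new ArrayList<>(emitted);
        }
    }

    public static void main(String[] args) {
        ProgressCoalescer coalescer = new ProgressCoalescer();
        coalescer.report(10);
        coalescer.report(20);
        coalescer.report(30);
        coalescer.tick(); // one event carrying 30
        coalescer.tick(); // nothing new since the last tick, so no event
        coalescer.report(40);
        coalescer.tick(); // one event carrying 40
        System.out.println(coalescer.emitted()); // prints [30, 40]
    }
}
```

Overwriting the pending value keeps the cost of each report constant, which is why frequent progress updates from a task stay cheap.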
The reported progress values are also displayed live in the monitoring applications, which is valuable when following long-running jobs.
Below is a common pattern for printing the overall progress of a job from the progress of each task:
Important: Notice that this code snippet uses waitUntilDone(int, java.lang.Runnable, int) to call the update function. This overload of waitUntilDone temporarily unblocks the caller so that it can do something quick while waiting. Checking a cancellation flag is another common use.
```java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import agi.parallel.client.ClusterJobScheduler;
import agi.parallel.client.IJobScheduler;
import agi.parallel.client.Job;
import agi.parallel.client.TaskCompletedEventArgs;
import agi.parallel.client.TaskCompletedListener;
import agi.parallel.client.TaskProgressEventArgs;
import agi.parallel.client.TaskProgressUpdatedListener;
import agi.parallel.infrastructure.Task;

public class EventsReportingProgress {
    // Latest known progress per task, keyed by task ID; access is synchronized on the map.
    private static final Map<UUID, Integer> taskid2progress = new HashMap<>();

    public static void main(String[] args) {
        try (IJobScheduler scheduler = new ClusterJobScheduler("localhost")) {
            scheduler.connect();

            final int totalTasks = 12;
            Job job = scheduler.createJob();
            for (int i = 0; i < totalTasks; ++i) {
                job.addTask(new ReportProgressExampleTask());
            }

            // A completed task counts as 100, regardless of the last progress it reported.
            job.addTaskCompletedListener(new TaskCompletedListener() {
                @Override
                public void onTaskCompleted(Job job, TaskCompletedEventArgs e) {
                    synchronized (taskid2progress) {
                        taskid2progress.put(e.getTask().getId(), 100);
                    }
                }
            });

            // Record the most recent progress value reported by each task.
            job.addTaskProgressListener(new TaskProgressUpdatedListener() {
                @Override
                public void onTaskProgressUpdated(Job job, TaskProgressEventArgs e) {
                    synchronized (taskid2progress) {
                        taskid2progress.put(e.getTask().getId(), e.getDescription().getProgress());
                    }
                }
            });

            job.submit();

            // While waiting for the job, the Runnable is called to print the overall progress.
            job.waitUntilDone(-1, new Runnable() {
                @Override
                public void run() {
                    int maxProgress = totalTasks * 100;
                    int totalProgress = 0;
                    synchronized (taskid2progress) {
                        for (Integer taskProgress : taskid2progress.values()) {
                            totalProgress += taskProgress;
                        }
                    }
                    if (totalProgress > 0) {
                        System.out.println(String.format("Total Job Progress: %.4f", (totalProgress * 100.0) / maxProgress));
                    }
                }
            }, 100);
        }
    }

    public static class ReportProgressExampleTask extends Task {
        @Override
        public void execute() {
            // Simulate sporadic progress.
            Random random = new Random();
            for (int i = 0; i < 95; i++) {
                try {
                    Thread.sleep(random.nextInt(200));
                } catch (InterruptedException e) {
                    // Restore the interrupt flag and stop instead of silently continuing.
                    Thread.currentThread().interrupt();
                    return;
                }
                // Set progress by calling setProgress on the task.
                this.setProgress(i);
            }
        }
    }
}
```
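The same wait-and-poll pattern can be expressed with standard JDK primitives, which may help clarify what the waitUntilDone overload does for the caller. In this sketch, a CountDownLatch stands in for job completion, the Runnable prints aggregate progress after each interval, and an AtomicBoolean acts as the cancellation flag. PollingWaitSketch, waitWithCallback, and totalPercent are hypothetical names, and the sketch makes no claim about the exact meaning of the real overload's parameters.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicIntegerArray;

/** Plain-JDK sketch of waiting while periodically doing something quick (hypothetical names). */
public class PollingWaitSketch {

    /** Overall job progress in percent, assuming each task reports on a 0-100 scale. */
    public static double totalPercent(int[] taskProgress, int totalTasks) {
        int sum = 0;
        for (int p : taskProgress) {
            sum += p;
        }
        return (sum * 100.0) / (totalTasks * 100);
    }

    /**
     * Blocks until the latch reaches zero, running onTick after every interval
     * that elapses without completion. Returns false if cancel is set first.
     */
    public static boolean waitWithCallback(CountDownLatch done, Runnable onTick,
                                           long intervalMillis, AtomicBoolean cancel)
            throws InterruptedException {
        while (!done.await(intervalMillis, TimeUnit.MILLISECONDS)) {
            onTick.run();
            if (cancel.get()) {
                return false; // a real client might cancel the job here
            }
        }
        return true;
    }

    public static void main(String[] args) throws InterruptedException {
        final int totalTasks = 3;
        final AtomicIntegerArray progress = new AtomicIntegerArray(totalTasks);
        final CountDownLatch done = new CountDownLatch(totalTasks);

        // Simulated tasks: each advances its own progress slot to 100, then signals completion.
        for (int t = 0; t < totalTasks; t++) {
            final int index = t;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    for (int p = 1; p <= 100; p++) {
                        progress.set(index, p);
                        try {
                            Thread.sleep(5);
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                            break;
                        }
                    }
                    done.countDown();
                }
            }).start();
        }

        boolean finished = waitWithCallback(done, new Runnable() {
            @Override
            public void run() {
                int[] snapshot = new int[totalTasks];
                for (int i = 0; i < totalTasks; i++) {
                    snapshot[i] = progress.get(i);
                }
                System.out.println(String.format("Total Job Progress: %.4f", totalPercent(snapshot, totalTasks)));
            }
        }, 100, new AtomicBoolean(false));
        System.out.println(finished ? "Job finished" : "Cancelled");
    }
}
```

Checking the cancellation flag only between intervals keeps the waiting thread simple; the interval length bounds how quickly a cancellation request is noticed.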
STK Parallel Computing Server 2.9 API for Java