Click or drag to resize

Monitoring Jobs

The API provides events that are raised after key state changes occur in the job scheduler. Examples include when a task changes state, when a task completes, and when a task reports new progress in cases where data is progressively returned to the client during long-running jobs.

In this section the following is explained:

Responding to task state changes

Every time a task changes state, the job scheduler notifies the client with an event. A task's result can be processed immediately upon task completion.

Task Completed

The Job.TaskCompleted and Task.Completed events are raised when a task completes. Once a task completes, its results can be processed immediately. Subscribe to these events by using Job.addTaskCompletedListener and Task.addCompletedListener.

Java
import agi.parallel.client.ClusterJobScheduler;
import agi.parallel.client.IJobScheduler;
import agi.parallel.client.Job;
import agi.parallel.client.TaskCompletedEventArgs;
import agi.parallel.client.TaskCompletedListener;
import agi.parallel.infrastructure.CompletedListener;
import agi.parallel.infrastructure.Task;

/**
 * Shows the two ways of subscribing to task completion: with a listener
 * registered on the job, and with a listener registered on the task itself.
 * Each variant connects its own scheduler, submits a job holding a single
 * {@link OurTask}, and blocks until the job is done.
 */
public class EventsTaskCompleted {
    public static void main(String[] args) {
        runWithJobLevelListener();
        runWithTaskLevelListener();
    }

    /** Method 1: subscribe by calling addTaskCompletedListener on the job. */
    private static void runWithJobLevelListener() {
        try (IJobScheduler jobScheduler = new ClusterJobScheduler("localhost")) {
            jobScheduler.connect();

            Job submittedJob = jobScheduler.createJob();
            submittedJob.addTask(new OurTask());

            // The completed task is reached through the event arguments.
            TaskCompletedListener printOnCompletion = new TaskCompletedListener() {
                @Override
                public void onTaskCompleted(Job sender, TaskCompletedEventArgs eventArgs) {
                    // The task's results could be processed at this point.
                    System.out.println("Task finished...");
                    System.out.println(eventArgs.getTask().getTaskStatus());
                    System.out.println(eventArgs.getTask().getResult());
                }
            };
            submittedJob.addTaskCompletedListener(printOnCompletion);

            submittedJob.submit();
            submittedJob.waitUntilDone();
        }
    }

    /** Method 2: subscribe by calling addCompletedListener on the task. */
    private static void runWithTaskLevelListener() {
        try (IJobScheduler jobScheduler = new ClusterJobScheduler("localhost")) {
            jobScheduler.connect();

            Job submittedJob = jobScheduler.createJob();
            Task watchedTask = new OurTask();

            // Here the completed task is handed to the listener directly.
            CompletedListener printOnCompletion = new CompletedListener() {
                @Override
                public void onCompleted(Task completedTask) {
                    // The task's results could be processed at this point.
                    System.out.println("Task finished...");
                    System.out.println(completedTask.getTaskStatus());
                    System.out.println(completedTask.getResult());
                }
            };
            watchedTask.addCompletedListener(printOnCompletion);

            submittedJob.addTask(watchedTask);
            submittedJob.submit();
            submittedJob.waitUntilDone();
        }
    }

    /** Placeholder task for the example; its execute method does nothing. */
    public static class OurTask extends Task {
        @Override
        public void execute() {
            // Intentionally empty.
        }
    }
}

Job Completed

The Job.JobCompleted event is raised when all tasks from the job are completed. Subscribe to this event by using Job.addJobCompletedListener.

Java
import agi.parallel.client.ClusterJobScheduler;
import agi.parallel.client.IJobScheduler;
import agi.parallel.client.Job;
import agi.parallel.client.JobCompletedEventArgs;
import agi.parallel.client.JobCompletedListener;
import agi.parallel.infrastructure.Task;

/**
 * Shows how to subscribe to job completion: a listener registered with
 * addJobCompletedListener prints the job's name when it is invoked.
 */
public class EventsJobCompleted {
    public static void main(String[] args) {
        try (IJobScheduler jobScheduler = new ClusterJobScheduler("localhost")) {
            jobScheduler.connect();

            Job namedJob = jobScheduler.createJob();
            namedJob.setName("AwesomeJob");
            namedJob.addTask(new OurTask());

            // The job is read back from the event arguments rather than
            // from the enclosing local variable.
            JobCompletedListener announceCompletion = new JobCompletedListener() {
                @Override
                public void onJobCompleted(Job sender, JobCompletedEventArgs eventArgs) {
                    String finishedJobName = eventArgs.getJob().getName();
                    System.out.println("Job " + finishedJobName + " is finished");
                }
            };
            namedJob.addJobCompletedListener(announceCompletion);

            namedJob.submit();
            namedJob.waitUntilDone();
        }
    }

    /** Placeholder task for the example; its execute method does nothing. */
    public static class OurTask extends Task {
        @Override
        public void execute() {
            // Intentionally empty.
        }
    }
}

Task State Changed

The Job.TaskStateChanged and Task.StatusChanged events are raised when a task changes state, for example when a task changes its state from ASSIGNED to RUNNING. Subscribe to these events by using Job.addTaskStateChangeListener and Task.addStatusChangedListener.

Java
import agi.parallel.client.ClusterJobScheduler;
import agi.parallel.client.IJobScheduler;
import agi.parallel.client.Job;
import agi.parallel.client.TaskStatusChangedEventArgs;
import agi.parallel.client.TaskStateChangedListener;
import agi.parallel.infrastructure.Task;

/**
 * Shows how to subscribe to task state changes on a job: a listener
 * registered with addTaskStateChangeListener prints the task's name along
 * with its previous and new status each time it is invoked.
 */
public class EventsTaskStateChanged {
    public static void main(String[] args) {
        try (IJobScheduler jobScheduler = new ClusterJobScheduler("localhost")) {
            jobScheduler.connect();

            Job watchedJob = jobScheduler.createJob();
            watchedJob.addTask(new OurTask());

            TaskStateChangedListener logTransition = new TaskStateChangedListener() {
                @Override
                public void onTaskStateChanged(Job sender, TaskStatusChangedEventArgs eventArgs) {
                    System.out.println("Task Name: " + eventArgs.getTask().getName());

                    // Both ends of the transition come from the event arguments.
                    System.out.println(
                        "Status changed from " + eventArgs.getPreviousStatus()
                            + " to " + eventArgs.getNewStatus());
                }
            };
            watchedJob.addTaskStateChangeListener(logTransition);

            watchedJob.submit();
            watchedJob.waitUntilDone();
        }
    }

    /** Placeholder task for the example; its execute method does nothing. */
    public static class OurTask extends Task {
        @Override
        public void execute() {
            // Intentionally empty.
        }
    }
}
Reporting the progress of tasks

Task Progress Updated

A task can report a user-defined progress value by calling setProgress(int, java.lang.Object). The API does not interpret the reported progress values in any way, so any range can be used; for example, the progress parameter can range from 20 to 2500. Job.TaskProgressUpdated is raised at 200 millisecond intervals if the progress of the task has been updated. If setProgress is called multiple times during a single 200 millisecond interval, only a single progress event is sent. Subscribe to the task progress event by using Job.addTaskProgressListener.

The reported progress values are also dynamically shown in the monitoring applications, which can be very valuable when keeping track of long-running jobs.

Below is a common pattern to print the complete progress of the job given the progress of each task:

Important

Notice that this code snippet uses waitUntilDone(int, java.lang.Runnable, int) to call the update function. This overload of waitUntilDone can be used to unblock from the wait in order to do something quickly; checking a cancellation flag, for example, is also very common.

Java
import java.util.HashMap;
import java.util.Map;
import java.util.Random;
import java.util.UUID;

import agi.parallel.client.ClusterJobScheduler;
import agi.parallel.client.IJobScheduler;
import agi.parallel.client.Job;
import agi.parallel.client.TaskCompletedEventArgs;
import agi.parallel.client.TaskCompletedListener;
import agi.parallel.client.TaskProgressEventArgs;
import agi.parallel.client.TaskProgressUpdatedListener;
import agi.parallel.infrastructure.Task;

/**
 * Prints the overall progress of a job by combining the progress reported
 * by each of its tasks.
 *
 * <p>The latest progress value of every task is kept in a map keyed by task
 * id. A progress listener updates the entry whenever a task reports
 * progress, and a completion listener sets the entry to 100 once the task
 * completes. While waiting for the job, a callback sums the entries and
 * prints the total as a percentage.
 */
public class EventsReportingProgress {
    /**
     * Latest known progress per task id. Every access is guarded by
     * synchronizing on the map itself, since it is touched both by the
     * listeners and by the waitUntilDone callback.
     */
    private static final Map<UUID, Integer> taskid2progress = new HashMap<>();

    public static void main(String[] args) {
        try (IJobScheduler scheduler = new ClusterJobScheduler("localhost")) {
            scheduler.connect();

            final int totalTasks = 12;
            Job job = scheduler.createJob();
            for (int i = 0; i < totalTasks; ++i) {
                job.addTask(new ReportProgressExampleTask());
            }

            // The example task's last reported value is 94, so completion
            // is what moves a task's entry to a full 100.
            job.addTaskCompletedListener(new TaskCompletedListener() {
                @Override
                public void onTaskCompleted(Job job, TaskCompletedEventArgs e) {
                    synchronized (taskid2progress) {
                        taskid2progress.put(e.getTask().getId(), 100);
                    }
                }
            });

            // Record the most recent progress value reported by each task.
            job.addTaskProgressListener(new TaskProgressUpdatedListener() {
                @Override
                public void onTaskProgressUpdated(Job job, TaskProgressEventArgs e) {
                    synchronized (taskid2progress) {
                        taskid2progress.put(e.getTask().getId(), e.getDescription().getProgress());
                    }
                }
            });

            job.submit();

            // NOTE(review): -1 presumably means "no timeout" and 100 is
            // presumably the callback interval in milliseconds - confirm
            // against the waitUntilDone(int, Runnable, int) documentation.
            job.waitUntilDone(
                -1,
                new Runnable() {
                    @Override
                    public void run() {
                        printTotalProgress(totalTasks);
                    }
                },
                100);
        }
    }

    /**
     * Sums the recorded per-task progress and prints it as a percentage of
     * the maximum ({@code totalTasks * 100}). Nothing is printed while the
     * sum is still zero.
     *
     * @param totalTasks number of tasks in the job
     */
    private static void printTotalProgress(int totalTasks) {
        int maxProgress = totalTasks * 100;
        int totalProgress = 0;
        synchronized (taskid2progress) {
            for (Integer taskProgress : taskid2progress.values()) {
                totalProgress += taskProgress;
            }
        }

        if (totalProgress > 0) {
            System.out.println(String.format("Total Job Progress: %.4f", (totalProgress * 100.0) / maxProgress));
        }
    }

    /**
     * Example task that simulates sporadic progress: it sleeps for a random
     * time of up to 200 milliseconds between progress reports, reporting the
     * values 0 through 94.
     */
    public static class ReportProgressExampleTask extends Task {
        @Override
        public void execute() {
            // Simulate sporadic progress
            Random random = new Random();
            for (int i = 0; i < 95; i++) {
                try {
                    Thread.sleep(random.nextInt(200));
                } catch (InterruptedException e) {
                    // Do not swallow the interrupt: restore the flag so the
                    // caller can see it, and stop doing work.
                    Thread.currentThread().interrupt();
                    return;
                }

                // Set progress by calling setProgress on the task
                this.setProgress(i);
            }
        }
    }
}
See Also

STK Parallel Computing Server 2.9 API for Java