Click or drag to resize

Monitoring Jobs

The API provides events that are called after key state changes occur in the job scheduler. Examples include when a task changes state, when a task completes, and when a task reports new progress in cases where data is progressively returned back to the client during long running jobs.

In this section the following is explained:

Responding to task state changes

Every time a task changes state, the job scheduler notifies the client with an event. A task's result can be processed immediately upon task completion.

Task Completed

The JobTaskCompleted and TaskCompleted events are raised when a task completes. Once a task completes, its results can be processed immediately.

C#
using System;
using AGI.Parallel.Client;
using AGI.Parallel.Infrastructure;

namespace CodeSamples
{
    class Program
    {
        static void Main(string[] args)
        {
            using (IJobScheduler scheduler = new ClusterJobScheduler("localhost"))
            {
                scheduler.Connect();

                // Method 1. Use the TaskCompleted property on the job.
                Job job = scheduler.CreateJob();
                job.AddTask(new OurTask());
                job.TaskCompleted += new EventHandler<TaskCompletedEventArgs>(OnTaskCompleted);
                job.Submit();
                job.WaitUntilDone();
            }


            using (IJobScheduler scheduler = new ClusterJobScheduler("localhost"))
            {
                scheduler.Connect();

                // Method 2. Use the Completed property on the task.
                Job job = scheduler.CreateJob();
                Task task = new OurTask();
                task.Completed += new EventHandler<EventArgs>(OnCompleted);
                job.AddTask(task);
                job.Submit();
                job.WaitUntilDone();
            }
        }

        private static void OnTaskCompleted(object sender, TaskCompletedEventArgs e)
        {
            // You could process the results here.
            Console.WriteLine("Task finished...");
            Console.WriteLine(e.Task.TaskStatus);
            Console.WriteLine(e.Task.Result);
        }

        private static void OnCompleted(object sender, EventArgs e)
        {
            // You could process the results here.
            Console.WriteLine("Task finished...");
            Task task = (Task)sender;
            Console.WriteLine(task.TaskStatus);
            Console.WriteLine(task.Result);
        }
    }

    [Serializable]
    class OurTask : Task
    {
        public override void Execute()
        {
        }
    }
}

Job Completed

JobJobCompleted is raised when all tasks from the job are completed.

C#
using System;
using AGI.Parallel.Client;
using AGI.Parallel.Infrastructure;

namespace CodeSamples
{
    class Program
    {
        static void Main(string[] args)
        {
            using (IJobScheduler scheduler = new ClusterJobScheduler("localhost"))
            {
                scheduler.Connect();

                Job job = scheduler.CreateJob();
                job.AddTask(new OurTask());
                job.JobCompleted += new EventHandler<JobCompletedEventArgs>(OnJobCompletedHandler);
                job.Submit();
                job.WaitUntilDone();
            }
        }

        private static void OnJobCompletedHandler(object sender, JobCompletedEventArgs e)
        {
            Console.WriteLine("Job " + e.Job.Name + " is finished");
        }
    }

    [Serializable]
    class OurTask : Task
    {
        public override void Execute()
        {
        }
    }
}

Task State Changed

JobTaskStateChanged and TaskStatusChanged are raised when a task changes state. For example, when a task changes its state from Assigned to Running.

C#
using System;
using AGI.Parallel.Client;
using AGI.Parallel.Infrastructure;

namespace CodeSamples
{
    class Program
    {
        static void Main(string[] args)
        {
            using (IJobScheduler scheduler = new ClusterJobScheduler("localhost"))
            {
                scheduler.Connect();

                Job job = scheduler.CreateJob();
                job.AddTask(new OurTask());
                job.TaskStateChanged += new EventHandler<TaskStatusChangedEventArgs>(OnTaskStatusChangedEventHandler);
                job.Submit();
                job.WaitUntilDone();
            }
        }

        private static void OnTaskStatusChangedEventHandler(object sender, TaskStatusChangedEventArgs taskState)
        {
            Console.WriteLine("Task Name: " + taskState.Task.Name);
            Console.WriteLine("Status changed from " + taskState.PreviousStatus + " to " + taskState.NewStatus);
        }
    }

    [Serializable]
    class OurTask : Task
    {
        public override void Execute()
        {
        }
    }
}
Reporting the progress of tasks

Task Progress Updated

A task can specify a user defined progress value by calling SetProgress. The reported progress values are not used in any way. For example, the progress parameter can range from 20 to 2500. TaskProgressUpdated is called at 200 millisecond intervals if the progress of the task is updated. If SetProgress is called multiple times during a single 200 millisecond interval, only a single progress event will be sent.

The reported progress values are also dynamically shown in the monitoring applications. This can be very valuable when monitoring applications.

Below is a common pattern to print the complete progress of the job given the progress of each task:

Important note Important

Notice in this code snippet WaitUntilDone(Int32, Action, Int32) is used to call the update function. This overload to WaitUntilDone can be used to unblock from WaitUntilDone to do something quickly. For example, checking a canceling flag is also very common.

C#
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using AGI.Parallel.Client;
using AGI.Parallel.Infrastructure;

namespace CodeSamples
{
    class Program
    {
        private static Dictionary<Guid, int> taskid2progress = new Dictionary<Guid, int>();

        static void Main(string[] args)
        {
            using (IJobScheduler scheduler = new ClusterJobScheduler("localhost"))
            {
                scheduler.Connect();

                int totalTasks = 12;
                Job job = scheduler.CreateJob();
                for (int i = 0; i < totalTasks; ++i)
                {
                    job.AddTask(new ReportProgressExampleTask());
                }

                job.TaskCompleted += new EventHandler<TaskCompletedEventArgs>(Job_TaskCompleted);
                job.TaskProgressUpdated += new EventHandler<TaskProgressEventArgs>(Job_TaskProgressUpdated);

                job.Submit();
                job.WaitUntilDone(
                  Timeout.Infinite,
                  () =>
                  {
                      int maxProgress = totalTasks * 100;
                      int totalProgress = 0;
                      lock (taskid2progress)
                      {
                          foreach (int taskProgress in taskid2progress.Values)
                          {
                              totalProgress += taskProgress;
                          }
                      }

                      if (totalProgress > 0)
                      {
                          Console.WriteLine("Total Job Progress: {0:F4}%", (totalProgress * 100) / (double)maxProgress);
                      }
                  },
                  100);
            }
        }

        private static void Job_TaskProgressUpdated(object sender, TaskProgressEventArgs e)
        {
            lock (taskid2progress)
            {
                taskid2progress[e.Task.Id] = e.Description.Progress;
            }
        }

        private static void Job_TaskCompleted(object sender, TaskCompletedEventArgs e)
        {
            lock (taskid2progress)
            {
                taskid2progress[e.Task.Id] = 100;
            }
        }

        [Serializable]
        public class ReportProgressExampleTask : Task
        {
            public override void Execute()
            {
                // Simulate sporadic progress
                Random random = new Random(Environment.TickCount);
                for (int i = 0; i < 95; i++)
                {
                    Thread.Sleep(random.Next(200));

                    // Set progress by calling SetProgress on the task
                    this.SetProgress(i);
                }
            }
        }
    }
}
Events and Threading

Events require a thread on which to execute. There are two options available.

The first option is to invoke the events on the thread on which JobWaitUntilDone is called. This is the option used in all the examples in the help system. With this approach, it is guaranteed that the state changes are ordered. That is, a state change of Submitted will always occur before the state transition to Assigned.

The second option is to invoke the events using the SynchronizationContext of the thread that submits the job. This is useful to not block the current thread. In order to raise events using the SynchronizationContext, set the JobRaiseEventsUsingSynchronizationContext property to true before submitting the job. The current synchronization context at the time JobSubmit is called will be captured. If the current synchronization context is null at that point, a default synchronization context will be created. Events are then posted to the synchronization context. Note that not all SynchronizationContext implementations guarantee the order or synchronization of the events. The following table summarizes the guarantees provided by a few SynchronizationContext implementations. For more information on .NET synchronization contexts, see the following MSDN article.

Specific Thread

(All Events Executed On Same Thread)

Exclusive

(Events Execute One at a Time)

Ordered

(Events Execute in Order)

Windows Forms

Yes

Yes

Yes

WPF

Yes

Yes

Yes

Default

No

No

No

ASP.Net

No

Yes

No

Important note Important

If JobRaiseEventsUsingSynchronizationContext is not set, WaitUntilDone will need to be called to receive events. If the current thread cannot be blocked, call WaitUntilDone in a separate thread.

C#
job.Submit();

ThreadPool.QueueUserWorkItem((notUsed) =>
{
    job.WaitUntilDone();
});
See Also

STK Parallel Computing Server 2.9 API for .NET