Click or drag to resize

Control how events are dispatched

Problem

You want to control how events are dispatched.

Solution

Events can be configured to be dispatched in two different ways. By default, events are raised in the simplest manner possible: on the thread that calls WaitUntilDone. But if an application needs to process events asynchronously, relying on WaitUntilDone has limitations. This situation arises frequently in UI applications and when working with the .NET Task Parallel Library.

There is another option to control how events are dispatched. To use this option, set Job.RaiseEventsUsingSynchronizationContext. If RaiseEventsUsingSynchronizationContext is set to true, events are raised by posting them to the SynchronizationContext that is current on the thread that calls Job.Submit, regardless of whether WaitUntilDone is called. If the current synchronization context is null when Submit is called, a default synchronization context is created. Note that not all SynchronizationContext implementations guarantee the order or synchronization of the events (see Monitoring Jobs for more details).

C#
using System;
using System.Threading;
using AGI.Parallel.Client;
using AGI.Parallel.Infrastructure;

namespace CodeSamples
{
    class Program
    {
        /// <summary>
        /// Creates a job containing one <see cref="SimpleTask"/>, subscribes to the job's and the
        /// task's events, submits the job, and blocks until the JobCompleted handler has run.
        /// Each handler prints the event name along with information about the thread it runs on.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            // Signaled by the JobCompleted handler so that Main knows when to stop waiting.
            // The using statement releases the wait handle once Main is finished with it.
            using (ManualResetEvent allTasksDone = new ManualResetEvent(false))
            {
                IJobScheduler scheduler = new ClusterJobScheduler("localhost");
                scheduler.Connect();
                Job job = scheduler.CreateJob();

                // To see how events are dispatched, let's subscribe to a bunch of events.
                job.JobCompleted += new EventHandler<JobCompletedEventArgs>(
                    delegate(object sender, JobCompletedEventArgs e)
                    {
                        Console.WriteLine("JobEvent: JobCompleted, " + GetThreadInfo());

                        try
                        {
                            // Make sure you eventually dispose the scheduler.
                            scheduler.Dispose();
                        }
                        finally
                        {
                            // Always release Main, even if disposing the scheduler throws;
                            // otherwise Main would wait on allTasksDone forever.
                            allTasksDone.Set();
                        }
                    });
                job.TaskStateChanged += new EventHandler<TaskStatusChangedEventArgs>(
                    delegate(object sender, TaskStatusChangedEventArgs e)
                    {
                        Console.WriteLine("JobEvent: TaskStateChanged, Status=" + e.NewStatus + ", " + GetThreadInfo());
                    });
                job.TaskCompleted += new EventHandler<TaskCompletedEventArgs>(
                    delegate(object sender, TaskCompletedEventArgs e)
                    {
                        Console.WriteLine("JobEvent: TaskCompleted, " + GetThreadInfo());
                    });
                job.TaskProgressUpdated += new EventHandler<TaskProgressEventArgs>(
                    delegate(object sender, TaskProgressEventArgs e)
                    {
                        Console.WriteLine("JobEvent: TaskProgressUpdated, Progress=" + e.Description.AdditionalInformation + ", " + GetThreadInfo());
                    });

                // The same kinds of notifications are also available on the task itself.
                Task task = new SimpleTask();
                task.Completed += new EventHandler<EventArgs>(
                    delegate(object sender, EventArgs e)
                    {
                        Console.WriteLine("TaskEvent: Completed, " + GetThreadInfo());
                    });
                task.StatusChanged += new EventHandler<StatusChangedEventArgs>(
                    delegate(object sender, StatusChangedEventArgs e)
                    {
                        Console.WriteLine("TaskEvent: StatusChanged, Status=" + e.NewStatus + ", " + GetThreadInfo());
                    });
                task.ProgressUpdated += new EventHandler<ProgressEventArgs>(
                    delegate(object sender, ProgressEventArgs e)
                    {
                        Console.WriteLine("TaskEvent: ProgressUpdated, Progress=" + e.Description.AdditionalInformation + ", " + GetThreadInfo());
                    });

                job.AddTask(task);

#if !USE_DEFAULT_OPTION
                // Set RaiseEventsUsingSynchronizationContext to true. If the job's RaiseEventsUsingSynchronizationContext
                // is set to true, events will be posted to System.Threading.SynchronizationContext.Current.
                // Note, if SynchronizationContext.Current is not set, the default SynchronizationContext is used (which uses a threadpool).
                job.RaiseEventsUsingSynchronizationContext = true;
                job.Submit();
#else
                // The default behavior is to execute events in the same thread as WaitUntilDone.
                job.RaiseEventsUsingSynchronizationContext = false;
                job.Submit();
                job.WaitUntilDone();
#endif

                // We are waiting, for demonstration purposes. Normally, you would just return to your application here.
                allTasksDone.WaitOne();

                /*
                 * With RaiseEventsUsingSynchronizationContext set to true (USE_DEFAULT_OPTION not defined),
                 * the output of the application should resemble:
                 * TaskEvent: StatusChanged, Status=Submitted, IsThreadPoolThread=True, ManagedThreadId=4
                 * JobEvent: TaskStateChanged, Status=Submitted, IsThreadPoolThread=True, ManagedThreadId=4
                 * TaskEvent: StatusChanged, Status=Assigned, IsThreadPoolThread=True, ManagedThreadId=4
                 * JobEvent: TaskStateChanged, Status=Assigned, IsThreadPoolThread=True, ManagedThreadId=4
                 * TaskEvent: StatusChanged, Status=Running, IsThreadPoolThread=True, ManagedThreadId=4
                 * JobEvent: TaskStateChanged, Status=Running, IsThreadPoolThread=True, ManagedThreadId=4
                 * TaskEvent: ProgressUpdated, Progress=[Progress 1], IsThreadPoolThread=True, ManagedThreadId=4
                 * JobEvent: TaskProgressUpdated, Progress=[Progress 1], IsThreadPoolThread=True, ManagedThreadId=4
                 * TaskEvent: ProgressUpdated, Progress=[Progress 2], IsThreadPoolThread=True, ManagedThreadId=4
                 * JobEvent: TaskProgressUpdated, Progress=[Progress 2], IsThreadPoolThread=True, ManagedThreadId=4
                 * TaskEvent: StatusChanged, Status=Completed, IsThreadPoolThread=True, ManagedThreadId=4
                 * JobEvent: TaskStateChanged, Status=Completed, IsThreadPoolThread=True, ManagedThreadId=4
                 * TaskEvent: Completed, IsThreadPoolThread=True, ManagedThreadId=5
                 * JobEvent: JobCompleted, IsThreadPoolThread=True, ManagedThreadId=6
                 * JobEvent: TaskCompleted, IsThreadPoolThread=True, ManagedThreadId=5
                 */
            }
        }

        // A minimal task used by the sample: it reports two progress updates,
        // pausing briefly after each one.
        [Serializable]
        class SimpleTask : Task
        {
            // Length, in milliseconds, of the pause that follows each progress report.
            private const int PauseMilliseconds = 250;

            // Reports progress 10 ("[Progress 1]") and then 50 ("[Progress 2]"),
            // sleeping after each report to simulate sporadic progress.
            public override void Execute()
            {
                SetProgress(10, "[Progress 1]");
                Thread.Sleep(PauseMilliseconds);

                SetProgress(50, "[Progress 2]");
                Thread.Sleep(PauseMilliseconds);
            }
        }

        // Builds a short description of the calling thread: whether it is a
        // thread pool thread, and its managed thread id.
        private static string GetThreadInfo()
        {
            Thread caller = Thread.CurrentThread;
            bool isPoolThread = caller.IsThreadPoolThread;
            int managedId = caller.ManagedThreadId;

            return string.Format("IsThreadPoolThread={0}, ManagedThreadId={1}", isPoolThread, managedId);
        }
    }
}
See Also

STK Parallel Computing Server 2.9 API for .NET