Click or drag to resize

Send task file dependencies

Problem

A task depends on input files, such as data files or text files, that must be available on the host where the task runs.

Solution

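Although the current version does not provide built-in support for file dependencies, you can still send them manually. To do so, read each file into a byte array on the client so that the arrays are serialized and sent along with the task. Then, on the host (task) side, write the byte arrays back to disk if the task needs them as files. The example below does this in a custom task environment: its constructor reads the files, and its Setup and Teardown methods create and remove them on the host.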

C#
using System;
using System.Collections.Generic;
using System.IO;
using AGI.Parallel.Client;
using AGI.Parallel.Infrastructure;

namespace CodeSamples
{
    class Program
    {
        // Entry point: submits a single task whose environment carries the files of
        // C:\MyData\ to the host, waits for the job to finish, and prints the task's
        // standard output (the paths the task sees on the host).
        static void Main(string[] args)
        {
            using (IJobScheduler scheduler = new ClusterJobScheduler("localhost"))
            {
                scheduler.Connect();
                Job job = scheduler.CreateJob();

                // The environment reads the directory's files here, on the client side;
                // the resulting byte arrays are sent along with the job and written back
                // to disk on the host in StoreFileDependencies.Setup().
                job.TaskEnvironment = new StoreFileDependencies("C:\\MyData\\");

                job.AddTask(new TaskThatNeedsFiles());
                job.Submit();
                job.WaitUntilDone();

                Console.WriteLine(job.Tasks[0].StandardOutput);
            }

            /*
             * The output of the application should resemble:
             * C:\Users\jdoe\AppData\Local\Temp\ld13izr3.nym\CovDefTest.TMP
             * C:\Users\jdoe\AppData\Local\Temp\ld13izr3.nym\CovDefTestAcc.CUR
             * C:\Users\jdoe\AppData\Local\Temp\ld13izr3.nym\TestCovDef.cv
             * C:\Users\jdoe\AppData\Local\Temp\ld13izr3.nym\TestCovDef.cv3
             */
        }

        // This is a simple task environment that carries file dependencies to the host.
        // On the client, the constructor stores every file of a directory (top level only)
        // as a byte array. On the host, Setup() writes those files into a new, randomly
        // named subdirectory of the temp directory, and Teardown() deletes it again.
        [Serializable]
        class StoreFileDependencies : TaskEnvironment
        {
            // File name (without its directory) -> file contents. Populated on the client
            // and serialized together with the environment.
            private readonly Dictionary<string, byte[]> dataPayload;

            /// <summary>
            /// Reads all files directly inside <paramref name="dataDirectory"/> into memory.
            /// Subdirectories are not included.
            /// </summary>
            /// <param name="dataDirectory">Client-side directory whose files should be shipped.</param>
            /// <exception cref="ArgumentNullException"><paramref name="dataDirectory"/> is null.</exception>
            public StoreFileDependencies(string dataDirectory)
            {
                if (dataDirectory == null)
                {
                    throw new ArgumentNullException("dataDirectory");
                }

                dataPayload = new Dictionary<string, byte[]>();

                foreach (string file in Directory.GetFiles(dataDirectory, "*", SearchOption.TopDirectoryOnly))
                {
                    // Key by bare file name so the files can be recreated under any directory.
                    string name = Path.GetFileName(file);
                    dataPayload[name] = File.ReadAllBytes(file);
                }
            }

            /// <summary>
            /// Host-side directory the files were written to by <see cref="Setup"/>;
            /// null until Setup has run.
            /// </summary>
            public string DataDirectory { get; set; }

            /// <summary>
            /// Runs on the host: recreates the stored files in a fresh temporary directory
            /// and records its location in <see cref="DataDirectory"/>.
            /// </summary>
            public override void Setup()
            {
                DataDirectory = Path.Combine(Path.GetTempPath(), Path.GetRandomFileName());
                Directory.CreateDirectory(DataDirectory);

                foreach (KeyValuePair<string, byte[]> fileItem in dataPayload)
                {
                    string fileName = Path.Combine(DataDirectory, fileItem.Key);
                    File.WriteAllBytes(fileName, fileItem.Value);
                }
            }

            /// <summary>
            /// Runs on the host: deletes the temporary directory created by <see cref="Setup"/>,
            /// including its contents.
            /// </summary>
            public override void Teardown()
            {
                // If Setup() never ran or failed before creating the directory, there is
                // nothing to clean up; do not let cleanup throw in that case.
                if (!string.IsNullOrEmpty(DataDirectory) && Directory.Exists(DataDirectory))
                {
                    Directory.Delete(DataDirectory, true);
                }
            }
        }

        // Sample task that lists the dependency files made available on the host
        // by its StoreFileDependencies environment.
        [Serializable]
        class TaskThatNeedsFiles : Task
        {
            /// <summary>
            /// Prints the full path of every file in the environment's data directory.
            /// </summary>
            /// <exception cref="InvalidOperationException">
            /// The task is not running with a set-up <see cref="StoreFileDependencies"/> environment.
            /// </exception>
            public override void Execute()
            {
                StoreFileDependencies environment = this.GetProperty<StoreFileDependencies>(TaskProperties.Environment);
                if (environment == null || environment.DataDirectory == null)
                {
                    throw new InvalidOperationException("This task requires a StoreFileDependencies task environment that has been set up.");
                }

                foreach (string file in Directory.GetFiles(environment.DataDirectory, "*", SearchOption.TopDirectoryOnly))
                {
                    Console.WriteLine(file);
                }
            }
        }
    }
}

STK Parallel Computing Server 2.9 API for .NET