Send Task File Dependencies

Problem

A task requires file dependencies such as data files or text files for input.

Solution

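Although the current version does not provide a built-in way to handle file dependencies, it is still possible to send files and other dependencies manually. To do so, read the files on the client and store their contents in an object that is serialized and sent with the task (in the example below, the job's task environment). On the host (task) side, write the files back to disk if the task needs them as files.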

import os
import shutil
import tempfile

from agiparallel.client import ClusterJobScheduler
from agiparallel.constants import TaskProperties
 5
 6
 7class TaskThatNeedsFiles:
 8    def __init__(self):
 9        self.result = ""
10
11    def execute(self):
12        taskenv = self.get_property(TaskProperties.ENVIRONMENT)
13        for file in os.listdir(taskenv.data_directory):
14            file = os.path.join(taskenv.data_directory, file)
15            self.result += (file + "\n")
16
17
class StoreFileDependenciesTaskEnvironment:
    """Task environment that carries the files of a directory along with a job.

    The constructor reads every regular file directly inside a directory into
    memory as bytes. When the environment is run on the host, ``setup`` writes
    those files into a fresh temporary directory (exposed as
    ``data_directory``) and ``teardown`` removes that directory again.
    """

    def __init__(self, data_directory):
        """Load the files found in *data_directory* into ``data_payload``.

        Args:
            data_directory: Directory whose files are to be sent.
                Subdirectories are skipped.
        """
        # Maps each file's original full path to its raw contents.
        self.data_payload = {}
        # Directory the files are restored into; assigned by setup().
        self.data_directory = None

        for entry in os.listdir(data_directory):
            full_path = os.path.join(data_directory, entry)
            if os.path.isfile(full_path):  # make sure it's a file and not a directory
                # Binary mode keeps arbitrary data files intact: no text
                # decoding and no newline translation.
                with open(full_path, 'rb') as f:
                    self.data_payload[full_path] = f.read()

    def setup(self):
        """Write the stored files into a new temporary directory on the host."""
        # mkdtemp creates the directory itself, so there is no window between
        # picking a name and creating the directory.
        self.data_directory = tempfile.mkdtemp()

        for source_path, contents in self.data_payload.items():
            target = os.path.join(self.data_directory, os.path.basename(source_path))
            with open(target, 'wb') as f:
                f.write(contents)

    def teardown(self):
        """Delete the temporary directory created by ``setup``, if any."""
        if self.data_directory is None:
            return  # setup() never ran (or teardown already did)

        # The directory still contains the restored files, so os.rmdir would
        # fail; remove the whole tree instead.
        shutil.rmtree(self.data_directory)
        self.data_directory = None
42
43
def send_task_file_dependencies_example(data_directory='C:\\MyData\\', hostname="localhost"):
    """Submit a job whose task lists files shipped through the task environment.

    Connects to the scheduler, creates a job that uses a
    StoreFileDependenciesTaskEnvironment built from *data_directory*, adds a
    single TaskThatNeedsFiles, waits for the job to finish and prints the
    task's result.

    Args:
        data_directory: Local directory whose files are sent with the job.
            Defaults to the path used by the original example.
        hostname: Host name passed to ClusterJobScheduler.
    """
    with ClusterJobScheduler(hostname) as scheduler:
        scheduler.connect()
        job = scheduler.create_job()

        # The environment reads the files here, in its constructor, so their
        # contents are part of the object assigned to the job.
        job.task_environment = StoreFileDependenciesTaskEnvironment(data_directory)
        job.add_task(TaskThatNeedsFiles())

        job.submit()
        job.wait_until_done()

        print("Result: \n", job.tasks[0].result)
56
57
# Run the example only when this file is executed as a script, not on import.
if __name__ == "__main__":
    send_task_file_dependencies_example()