Send Task File Dependencies
Problem
A task needs file dependencies, such as data files or text files, as input.
Solution
Although the current version does not provide a built-in way to handle file dependencies, you can still send files manually. To do so, serialize the contents of the files so that they are sent along with the task; in the example below, they are stored in the job's task environment. On the host (task) side, write the files back to disk if the task needs them as files.
1from agiparallel.client import ClusterJobScheduler
2from agiparallel.constants import TaskProperties
3import os
4import tempfile
5
6
class TaskThatNeedsFiles:
    """Example task that reports which files its environment provides.

    ``execute`` fetches the task environment through ``self.get_property`` and
    appends the full path of every entry in the environment's
    ``data_directory`` to ``self.result``, one path per line.
    """

    def __init__(self):
        # Accumulated newline-terminated paths; read back by the client.
        self.result = ""

    def execute(self):
        # NOTE(review): get_property is not defined on this class; presumably
        # the scheduler framework supplies it at run time — confirm.
        environment = self.get_property(TaskProperties.ENVIRONMENT)
        folder = environment.data_directory
        self.result += "".join(
            os.path.join(folder, entry) + "\n" for entry in os.listdir(folder)
        )
16
17
class StoreFileDependenciesTaskEnvironment:
    """Task environment that ships the files of a directory along with a job.

    On the client, the constructor reads every regular file directly inside
    ``data_directory`` (subdirectories are skipped) into ``data_payload`` as
    raw bytes, keyed by file name. On the host, ``setup`` writes those files
    into a freshly created temporary directory, exposed as
    ``self.data_directory``. ``teardown`` then deletes that directory together
    with its contents.
    """

    def __init__(self, data_directory):
        # file name -> raw file contents (bytes)
        self.data_payload = {}

        for name in os.listdir(data_directory):
            full_path = os.path.join(data_directory, name)
            if os.path.isfile(full_path):  # make sure it's a file and not a directory
                # Binary mode: data files need not be valid text, and text mode
                # would translate newlines and so alter the bytes being shipped.
                with open(full_path, "rb") as f:
                    # Key by bare file name so the host does not have to parse a
                    # client-side path, which may use another OS's separators.
                    self.data_payload[name] = f.read()

    def setup(self):
        """Recreate the shipped files in a new temporary directory on the host."""
        # mkdtemp creates a uniquely named directory atomically, unlike taking
        # the name of a throw-away NamedTemporaryFile and calling mkdir on it.
        self.data_directory = tempfile.mkdtemp()

        for key, contents in self.data_payload.items():
            # Strip any directory part from the key so every file lands
            # directly inside the temporary directory.
            target = os.path.join(self.data_directory, os.path.basename(key))
            with open(target, "wb") as f:
                f.write(contents)

    def teardown(self):
        """Delete the temporary directory created by ``setup`` and its files.

        Does nothing if ``setup`` was never called.
        """
        import shutil

        data_directory = getattr(self, "data_directory", None)
        if data_directory is not None:
            # os.rmdir only removes empty directories, and setup filled this one.
            shutil.rmtree(data_directory)
42
43
def send_task_file_dependencies_example(data_directory="C:\\MyData\\", host="localhost"):
    """Submit a job whose task needs the files in ``data_directory``; print its result.

    Args:
        data_directory: Client-side directory whose regular files are shipped
            with the job through ``StoreFileDependenciesTaskEnvironment``.
            Defaults to the Windows path used by the original example.
        host: Address passed to ``ClusterJobScheduler``.
    """
    with ClusterJobScheduler(host) as scheduler:
        scheduler.connect()
        job = scheduler.create_job()

        # The environment captures the file contents here, on the client; its
        # setup() writes them back to disk on the host side.
        job.task_environment = StoreFileDependenciesTaskEnvironment(data_directory)
        job.add_task(TaskThatNeedsFiles())

        job.submit()
        job.wait_until_done()

        # sep="\n" avoids the stray leading space that
        # print("Result: \n", value) puts before the first result line.
        print("Result:", job.tasks[0].result, sep="\n")
56
57
if __name__ == "__main__":
    # Run the demo only when executed as a script, not when imported.
    send_task_file_dependencies_example()