Tutorial

This tutorial walks step by step through building an example application: a pi calculator that uses the dartboard algorithm, a Monte Carlo method that estimates π from random samples.
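Here is the idea behind the dartboard algorithm. Throw darts uniformly at random into the square [-1, 1] × [-1, 1], which encloses a circle of radius 1. The chance that a dart lands inside the circle equals the ratio of the two areas, so the fraction of hits estimates π/4:

```latex
P\left(x^2 + y^2 \le 1\right)
  = \frac{\text{area of circle}}{\text{area of square}}
  = \frac{\pi \cdot 1^2}{2 \cdot 2}
  = \frac{\pi}{4}
\quad\Longrightarrow\quad
\pi \approx 4 \cdot \frac{N_{\text{circle}}}{N_{\text{total}}}
```

Here N_circle is the number of darts that land inside the circle, and N_total is the number of darts thrown across all tasks.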

Overview

Creating a Task Class

The first step is to define a task. A task represents a unit of code that will run on the cluster. The most important part of writing a task class is implementing the execute() method, which is the entry point into the task.

For this pi calculator, the execute() method of the task contains the logic of the dartboard algorithm. Here is the code:

import random


class PiDartboardAlgorithmTask():
    def __init__(self, darts_per_task):
        self.darts_per_task = darts_per_task

    def execute(self):
        num_darts_in_circle = 0

        for i in range(self.darts_per_task):
            # Throw a dart: a uniformly random point in the square [-1, 1] x [-1, 1]
            x = (random.random() - 0.5) * 2
            y = (random.random() - 0.5) * 2
            # Count the dart if it lands inside the unit circle
            if (x * x + y * y) <= 1.0:
                num_darts_in_circle += 1

        # Store the hit count; it is sent back to the client when the job completes
        self.result = num_darts_in_circle
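The task is an ordinary Python class, so one quick way to sanity-check execute() is to call it directly on your own machine before involving the coordinator. The sketch below repeats the class so that it is self-contained. Seeding random only makes this local run reproducible; it is not something the cluster requires.

```python
import random


class PiDartboardAlgorithmTask:
    def __init__(self, darts_per_task):
        self.darts_per_task = darts_per_task

    def execute(self):
        num_darts_in_circle = 0
        for _ in range(self.darts_per_task):
            # Uniform point in the square [-1, 1] x [-1, 1]
            x = (random.random() - 0.5) * 2
            y = (random.random() - 0.5) * 2
            if x * x + y * y <= 1.0:
                num_darts_in_circle += 1
        self.result = num_darts_in_circle


random.seed(42)  # reproducible local run only
task = PiDartboardAlgorithmTask(100_000)
task.execute()  # runs in this process; no coordinator involved
local_estimate = 4 * task.result / task.darts_per_task
print("Local estimate: {0:1.6f}".format(local_estimate))
```

If the local estimate is far from 3.14, the bug is in the task logic rather than in the cluster setup.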

Connect to the Job Scheduler

Connecting to the coordinator takes only a couple of lines. First, create a ClusterJobScheduler with the hostname of the coordinator, then call connect(). The hostname is the DNS name or IP address of the coordinator. If the coordinator is not running on the default port (9090), you must also specify the port number. You must call connect() before you can perform any other operation on the coordinator.

Once connect() succeeds, call disconnect() on the client when you no longer need it. Otherwise, the client keeps consuming coordinator resources and the application may not exit properly. Alternatively, create the job scheduler in a with statement, which disconnects it automatically when the block exits.

from agiparallel.client import ClusterJobScheduler

with ClusterJobScheduler("localhost") as scheduler:
    scheduler.connect()
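The with form works because the scheduler acts as a context manager: Python calls its exit hook when the block ends, even if an exception escapes. The sketch below shows both patterns using a hypothetical StandInScheduler class. This class is not part of agiparallel; it only records connect() and disconnect() calls. The sketch assumes, as the text above states, that leaving the with block disconnects the client.

```python
class StandInScheduler:
    """Hypothetical stand-in that mimics the connect/disconnect lifecycle."""

    def __init__(self, hostname):
        self.hostname = hostname
        self.connected = False

    def connect(self):
        self.connected = True

    def disconnect(self):
        self.connected = False

    # Context-manager protocol: __exit__ runs however the with block ends.
    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.disconnect()
        return False  # do not swallow exceptions


# Pattern 1: explicit disconnect() guarded by try/finally.
explicit = StandInScheduler("localhost")
explicit.connect()
try:
    pass  # create and submit jobs here
finally:
    explicit.disconnect()

# Pattern 2: the with statement disconnects even when an error escapes.
scoped = StandInScheduler("localhost")
try:
    with scoped as scheduler:
        scheduler.connect()
        raise RuntimeError("simulated failure while submitting")
except RuntimeError:
    pass

print(explicit.connected, scoped.connected)  # False False
```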

Note

If connect() raises an exception with a message such as “Cannot connect to Coordinator”, the most likely cause is that the coordinator is not running on the specified machine and port. Make sure the coordinator is running on that machine, and confirm that a firewall is not blocking the port.

See also: ClusterConnectException
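To narrow down a connection failure, you can try a plain TCP connection from the client machine using only Python's standard socket module. port_is_reachable is a hypothetical troubleshooting helper, not part of agiparallel; 9090 is the default coordinator port mentioned above.

```python
import socket


def port_is_reachable(host, port=9090, timeout=3.0):
    """Return True if a TCP connection to host:port succeeds within timeout."""
    try:
        # create_connection resolves the name and tries each address in turn
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and name-resolution failures
        return False
```

For example, port_is_reachable("localhost") checks the default port on the local machine. A False result means that nothing is listening there or that a firewall is rejecting the connection. A True result only shows that some process is listening on the port, not necessarily the coordinator.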

Submit Job

After connecting to the coordinator, define the work to submit. First, create a job with create_job(). Next, instantiate the task class written above and add each instance to the job with add_task(). For the pi calculator, add multiple tasks so that more darts are thrown in total. When you have added all the tasks, call submit() to send the job to the coordinator.

Note

There are also many other options you can set on a job. This tutorial uses just two of them: name and description.

The pi calculator submit code should look like this:

# num_tasks and darts_per_task are chosen by the caller (see the complete application below)
job = scheduler.create_job()
job.name = "PiJob"
job.description = "Computes digits of Pi using dartboard algorithm"

for i in range(num_tasks):
    job.add_task(PiDartboardAlgorithmTask(darts_per_task))

job.submit()
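How you split the work is up to you. One possible approach divides a total dart budget across a chosen number of tasks so that the counts add up exactly. The sketch below uses a hypothetical helper, split_darts, which is not part of agiparallel. If the counts are uneven, the final π estimate must divide by the actual total number of darts rather than by darts_per_task × num_tasks.

```python
def split_darts(total_darts, num_tasks):
    """Divide total_darts into num_tasks counts that differ by at most one."""
    if num_tasks <= 0:
        raise ValueError("num_tasks must be positive")
    base, remainder = divmod(total_darts, num_tasks)
    # The first `remainder` tasks each take one extra dart.
    return [base + 1 if i < remainder else base for i in range(num_tasks)]


counts = split_darts(10_000_003, 8)
print(counts)
print(sum(counts))  # 10000003
```

Each count would then go to its own PiDartboardAlgorithmTask(count) when building the job.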

Waiting for Task Results

Calling wait_until_done() blocks the current thread until all of the job's tasks have completed. Once the job completes, the result of each task is sent back to the client, and each task's value is available through its result property. For the pi calculator, sum the results of all tasks and convert the total into an estimate of pi. The code looks like this:

job.wait_until_done()

sum_of_darts = 0
for i in range(num_tasks):
    sum_of_darts += job.tasks[i].result

pi_approximation = 4 * sum_of_darts / (darts_per_task * num_tasks)

print("PI is approximately {0:1.10f}".format(pi_approximation))
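You can also write the aggregation step as a small pure function, which is easy to test apart from the cluster. The sketch below uses a hypothetical function, estimate_pi. It computes the same estimate as the code above from a list of per-task hit counts. It also adds the usual binomial standard error, 4·sqrt(p(1−p)/N), where p is the fraction of hits and N is the total number of darts. The standard error indicates how many digits of the result to trust.

```python
import math


def estimate_pi(hits_per_task, darts_per_task):
    """Return (pi_estimate, standard_error) from per-task hit counts.

    Assumes every task threw the same number of darts, as in this tutorial.
    """
    total_darts = darts_per_task * len(hits_per_task)
    p = sum(hits_per_task) / total_darts  # fraction of darts inside the circle
    pi_estimate = 4 * p
    # Each dart is a Bernoulli trial, so Var(p) = p(1 - p) / N
    standard_error = 4 * math.sqrt(p * (1 - p) / total_darts)
    return pi_estimate, standard_error


pi_hat, se = estimate_pi([785, 785], 1000)
print("{0:.4f} +/- {1:.4f}".format(pi_hat, se))  # 3.1400 +/- 0.0367
```

With the complete application's 8 tasks of 1,000,000 darts each, the standard error is roughly 4·sqrt(0.785·0.215/8,000,000) ≈ 0.0006. Only the first two or three decimal places are therefore meaningful, even though the program prints ten.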

The Complete Application

Here is the complete application after all the steps are applied:

from agiparallel.client import ClusterJobScheduler
import random


class PiDartboardAlgorithmTask():
    def __init__(self, darts_per_task):
        self.darts_per_task = darts_per_task

    def execute(self):
        num_darts_in_circle = 0

        for i in range(self.darts_per_task):
            x = (random.random() - 0.5) * 2
            y = (random.random() - 0.5) * 2
            if (x * x + y * y) <= 1.0:
                num_darts_in_circle += 1

        self.result = num_darts_in_circle


def dartboard_example(num_tasks, darts_per_task):
    with ClusterJobScheduler("localhost") as scheduler:
        scheduler.connect()

        job = scheduler.create_job()
        job.name = "PiJob"
        job.description = "Computes digits of Pi using dartboard algorithm"

        for i in range(num_tasks):
            job.add_task(PiDartboardAlgorithmTask(darts_per_task))

        job.submit()
        job.wait_until_done()

        sum_of_darts = 0
        for i in range(num_tasks):
            sum_of_darts += job.tasks[i].result

        pi_approximation = 4 * sum_of_darts / (darts_per_task * num_tasks)

        print("PI is approximately {0:1.10f}".format(pi_approximation))


if __name__ == "__main__":
    dartboard_example(8, 1000000)
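If no coordinator is available, you can try the same fan-out/fan-in structure on one machine with Python's concurrent.futures module. This is a local stand-in for experimenting with the task logic, not how agiparallel distributes work. Because threads share the GIL, expect no real speedup for this CPU-bound loop. The function name run_locally is hypothetical.

```python
import random
from concurrent.futures import ThreadPoolExecutor


class PiDartboardAlgorithmTask:
    def __init__(self, darts_per_task):
        self.darts_per_task = darts_per_task

    def execute(self):
        num_darts_in_circle = 0
        for _ in range(self.darts_per_task):
            x = (random.random() - 0.5) * 2
            y = (random.random() - 0.5) * 2
            if x * x + y * y <= 1.0:
                num_darts_in_circle += 1
        self.result = num_darts_in_circle


def run_locally(num_tasks, darts_per_task, max_workers=4):
    tasks = [PiDartboardAlgorithmTask(darts_per_task) for _ in range(num_tasks)]
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Fan out: run every task's execute(); list() waits and re-raises errors
        list(pool.map(PiDartboardAlgorithmTask.execute, tasks))
    # Fan in: same aggregation as the cluster version
    sum_of_darts = sum(task.result for task in tasks)
    return 4 * sum_of_darts / (darts_per_task * num_tasks)


if __name__ == "__main__":
    print("PI is approximately {0:1.10f}".format(run_locally(8, 100_000)))
```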

Next Steps

With the example application complete, explore other concepts in Key Concepts or review the Library Reference.

See Also

Reference