Jul 11, 2011
 

Additional Libraries Used:

// OpenMPI
#include <mpi.h>
#include <Magick++.h>
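
(For reference: with OpenMPI and ImageMagick installed, something along these lines usually builds and runs it. The source file name here is just a placeholder and the exact flags depend on your installation.)

mpic++ task_scheduler.cpp -o task_scheduler `Magick++-config --cppflags --cxxflags --ldflags --libs`
mpirun -np 8 ./task_scheduler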

Task parallelism would have to be one of the easiest ways to get results from parallelism quickly. I’ve talked previously about using pThreads and OpenMP for this (check out the other blog entries should you feel inclined), but the beauty of MPI is that, should you have a cluster at your disposal, it is easy to max it out with work. I sometimes prefer this approach over using the batch system on the cluster and submitting thousands or millions of jobs, for a few main reasons:

  • I don’t have to worry about writing out files (on a shared file system, just getting an open file handle can take between 100 and 300 milliseconds depending on how busy the system is; multiply even 100 ms by 1,000,000 jobs and that is roughly 28 hours spent doing nothing but opening files!)
  • Once my job is running, I get dedicated resources and can plan around a delivery time for the solution
  • Less work collecting the results back together at the end (a minor issue, really)

First I define a struct that contains all the necessary detail about a particular job. For this example I’m going for the stock-standard Mandelbrot set (given that I already have blog entries for the GPU, OpenMP and pThread versions). You will also see that I’ve really over-subscribed the parameters: every job carries the same view and image dimensions. I’ve only done this for demonstration purposes.

// Create a list of jobs that need processing
// Used for the big parallel jobs
typedef struct 
{
    unsigned int result;    // filled in by the worker
    int row;                // which pixel this job computes
    int column;
    double x;               // view parameters (identical for every job here)
    double y;
    double size;
    int width;              // overall image dimensions
    int height;
} Job;
 
// How many jobs?
int numberOfJobs;
// The array of jobs to be allocated and initialised later.
Job *jobs;

Next, a function that defines all the jobs that need to be run.

/*
 *  This is where I create all the work to be done
 *
 */
void CreateJobList()
{
    // Each job will create one pixel of mandelbrot
    numberOfJobs = WIDTH*HEIGHT;
 
    // Create the job list
    jobs = new Job[numberOfJobs];
    int jobCounter = 0;
    // Define the parameters for each job
    for (int j = 0; j<HEIGHT; j++)
    {
        for (int i = 0; i<WIDTH; i++)
        {
            jobs[jobCounter].row = j;
            jobs[jobCounter].column = i;
            jobs[jobCounter].x = 0.0;
            jobs[jobCounter].y = -0.5;
            jobs[jobCounter].size = 2.0;
            jobs[jobCounter].width = WIDTH;
            jobs[jobCounter].height = HEIGHT;
            jobCounter++;
        }
    }
}

The master farms out the work to the worker processors. Each processor has a full copy of the job list (a crude but straightforward implementation), so the only communication that needs to occur is the master telling a worker which job index to work on. The worker processes the job and sends back the result.

The Master work scheduling looks like:

// Send initial work to all workers
for (w = 1; w<numberOfProcessors; w++)
    MPI_Send(params, ...,  w, ...);
 
for (sim=0; sim < NUM_SIMS; sim++)
{
    // worker is done
    MPI_Recv(..., &status); 
    // which worker?
    w = status.MPI_SOURCE;
    // more work
    MPI_Send(params, ..., w, ...);
}
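
The matching worker loop is just the mirror image, in the same sketch form (the full version appears in the complete listing further down):

// Get the first job from the master
MPI_Recv(&index, ..., &status);
 
while (status.MPI_TAG != EXIT)
{
    // do the work
    result = RunJob(index);
    // send the result back
    MPI_Send(&result, ..., DONE, ...);
    // wait for more work (or the EXIT tag)
    MPI_Recv(&index, ..., &status);
}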

Putting it all together, the actual code looks like this:

/*
 *  The Main program kicker
 *
 *  MPI Task Scheduling
 */
int main(int argc, char **argv)
{
 
    // Initialise the MPI stuff
    MPI_Init(&argc, &argv);
    // How many processors are available?
    MPI_Comm_size(MPI_COMM_WORLD, &numberOfProcessors);
    // Since every processor is running this, which one am I?
    MPI_Comm_rank(MPI_COMM_WORLD, &myRank);
 
    // Will want to know what is going on later
    MPI_Status status;
 
    // I want every processor to have the full list of jobs.
    // I realise that this is memory overhead and is not entirely
    // necessary, but it keeps the bookkeeping simple for this demonstration.
    CreateJobList();
 
    if (myRank == 0) 
    {
        /*
         *  MASTER SECTION
         */
 
        // Maintain a list of which processor is working on what job
        int joblist[numberOfProcessors];
 
        // worker index
        int w;
 
        // What is the next jobID that needs to be done?
        int workIndex = -1;
 
        // How much work has been successfully completed?
        int workCompleted = 0;
 
        // Since the result for a job is an unsigned 4-byte integer
        unsigned int result;
 
        // Send initial work to all workers
        for (w = 1; w<numberOfProcessors; w++)
        {
            // What jobID will this worker get
            workIndex++;
            // I need to know which jobID this worker was given
            joblist[w] = workIndex;
            // Send the jobid for this processor to do
            MPI_Send(&workIndex, 1, MPI_INT, w, WORK, MPI_COMM_WORLD);  
        } 
 
        while ( workCompleted < numberOfJobs )
        {
            // Receive the result from the worker
            MPI_Recv(&result, 1, MPI_UNSIGNED, MPI_ANY_SOURCE, DONE, MPI_COMM_WORLD, &status);
            // Which processor just sent that work?
            w = status.MPI_SOURCE;
            // Store the result
            jobs[joblist[w]].result = result;
 
            //fprintf(stdout, "%d: %u\n", workCompleted, result);
 
            if (workIndex < numberOfJobs - 1)   // is there an unsent job left?
            {
                // Get the next jobID
                workIndex++;
                // Pair the jobID and worker
                joblist[w] = workIndex;
                 // Send the jobid for this processor to do
                MPI_Send(&workIndex, 1, MPI_INT, w, WORK, MPI_COMM_WORLD);
 
            }
            // One less job to process
            workCompleted++;
        }
 
        // Tell all workers to shutdown
        for ( w = 1; w < numberOfProcessors; w++)
        {
            MPI_Send(&w, 1, MPI_INT, w, EXIT, MPI_COMM_WORLD);
        }
    }
    else
    {
        /*
         *  WORKER SECTION
         */
        int myWorkIndex;
 
        // Receive the work from the master
        MPI_Recv(&myWorkIndex, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);
 
        // While master has not sent the EXIT notification
        while (status.MPI_TAG != EXIT)
        {
            /* 
             * Perform the work.  "RunJob" is defined very simply (as I would submit mass 
             * Matlab jobs to a batch system ... one variable in, one variable back. 
             * You get the idea.  It also keeps the MPI calls simple - although they are
             * not that hard to begin with.
             */
            unsigned int value = RunJob(myWorkIndex);
 
            // Send the result back to the master
            MPI_Send(&value, 1, MPI_UNSIGNED, 0, DONE, MPI_COMM_WORLD);
 
            // Wait for further notification from the master
            MPI_Recv(&myWorkIndex, 1, MPI_INT, 0, MPI_ANY_TAG, MPI_COMM_WORLD, &status);        
        }
    }
 
    // Wait for all processors to catch up.
    MPI_Barrier(MPI_COMM_WORLD);
 
    // Let the Master take all the glory and write out the results
    if (myRank == 0)
    {
        CollectResults();
    }   
 
    // Finish up cleanly
    MPI_Finalize();
 
    return 0;
}

Note that I used these #defines as tags for communicating with the worker processes.

// MPI Work Tags
#define WORK 1
#define DONE 2
#define EXIT 3
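
RunJob itself isn’t shown above (it’s in the download), but a minimal sketch of what a per-pixel Mandelbrot job might look like is below. MAX_ITERATIONS and the exact mapping from pixel to complex plane are assumptions here, not the real thing.

/*
 *  A minimal sketch only: compute one Mandelbrot pixel for a given job.
 *  (MAX_ITERATIONS and the coordinate mapping are assumptions.)
 */
unsigned int RunJob(int jobID)
{
    Job &job = jobs[jobID];
 
    // Map the pixel (column, row) into the complex plane using the
    // view parameters carried by the job
    double scale = job.size / job.width;
    double cr = job.x + (job.column - job.width  / 2.0) * scale;
    double ci = job.y + (job.row    - job.height / 2.0) * scale;
 
    // Standard escape-time iteration
    double zr = 0.0, zi = 0.0;
    unsigned int iteration = 0;
    while (zr*zr + zi*zi < 4.0 && iteration < MAX_ITERATIONS)
    {
        double temp = zr*zr - zi*zi + cr;
        zi = 2.0*zr*zi + ci;
        zr = temp;
        iteration++;
    }
 
    // This is the value sent back to the master as the "result"
    return iteration;
}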

Now, Mandelbrot is probably not a very good example: there is simply not enough work in each job as defined here. If I were running this as a real compute-and-deliver program, I’d make each job a whole row or column of pixels rather than a single pixel (a rough sketch of that change follows). If you use an MPI implementation that is NUMA-aware and you are only on a single motherboard this may not be a big issue, but in a cluster environment the network overhead will hurt your efficiency; I’m not sending a lot of data, but the transaction rate is high. Load balancing becomes the next major issue. I won’t talk about it here (my OpenMP example covers it in a little more detail), but the theory is sound and works well when a single job takes a few seconds or more.
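
For what it’s worth, here is a rough sketch of the row-per-job variant of CreateJobList. Note that the single result field in Job would also need to become per-row storage, and the MPI_Send/MPI_Recv counts would grow to WIDTH values per message; I’m glossing over that here.

// A rough sketch only: one job per image row instead of one per pixel
void CreateJobList()
{
    numberOfJobs = HEIGHT;
 
    jobs = new Job[numberOfJobs];
    for (int j = 0; j < HEIGHT; j++)
    {
        jobs[j].row = j;
        jobs[j].column = 0;      // the worker loops over the columns itself
        jobs[j].x = 0.0;
        jobs[j].y = -0.5;
        jobs[j].size = 2.0;
        jobs[j].width = WIDTH;
        jobs[j].height = HEIGHT;
    }
}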

You will encounter a “bug” if you define the number of jobs above to be fewer than the number of processors … duh. Given that I just used this code to run 1.6 million jobs on 512 processors, I’m not too concerned about it.
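
If it does bother you, one way to guard against it is to cap the initial hand-out loop in the master so it never sends more jobs than exist; the surplus workers then simply sit in their first MPI_Recv until the EXIT loop at the end reaches them. A sketch of that change:

// Send initial work to the workers, but never more jobs than we have
for (w = 1; w < numberOfProcessors && workIndex + 1 < numberOfJobs; w++)
{
    workIndex++;
    joblist[w] = workIndex;
    MPI_Send(&workIndex, 1, MPI_INT, w, WORK, MPI_COMM_WORLD);
}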

It should also be noted that this is no substitute for profiling your code. After setting off this job, I popped the hood again with VTune. I had already profiled the crap out of it, but this particular program was still going to take longer than forty (40) days on 512 X5650 processors, so seven hours of my time dedicated to finding improvements was a good investment. VTune’s “General Exploration” revealed a problem with one line in my code that was causing a very high “Retire Stalls” ratio (that code is secret and cannot be revealed, unfortunately; I’ll try to create a similar issue so I can talk you through finding and fixing that type of problem). Once I fixed it, by examining the values in the arrays causing the issue, I got an additional 300x speedup on a single CPU. As you can see, it is worth doing. I killed the existing job on the cluster after 17 hours and reran it; a couple of hours later I had my solution, which beats waiting (and clogging up a valuable resource) for around forty (40) days. It also looks good as a graph in case higher management needs to decide whether my contract is worth extending 🙂

Download Source and Makefile

A picture for completeness. Note that I used the ImageMagick libraries for this.
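
CollectResults isn’t shown above either. A minimal sketch of what it might do with Magick++, assuming MAX_ITERATIONS again and a simple greyscale mapping (the real colouring and output filename are up to you):

/*
 *  A minimal sketch only: map each job's result to a grey level and
 *  write the image out with Magick++.
 *  (Magick::InitializeMagick(*argv) should be called once in main first.)
 */
void CollectResults()
{
    Magick::Image image(Magick::Geometry(WIDTH, HEIGHT), Magick::Color("black"));
 
    for (int n = 0; n < numberOfJobs; n++)
    {
        double shade = (double)jobs[n].result / MAX_ITERATIONS;
        image.pixelColor(jobs[n].column, jobs[n].row,
                         Magick::ColorRGB(shade, shade, shade));
    }
 
    image.write("mandelbrot.png");
}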
