MPI V: Collective operations and other loose ends#

We’ll start with catch-up from Thursday’s class, and you can keep working in that assignment repo (which also means it’ll be due next Tuesday, 4/28). You should find a small pull request in your assignment repo. Merging that will get you an updated mpi_message.cxx to be used later in today’s class.

Some ways to avoid deadlocks#

As mentioned last class, the most obvious way to avoid deadlocks is to come up with a message flow where sends and receives are ordered so that they won’t cross each other, e.g., even ranks send to odd ranks first, then odd ranks send to even ranks. This, however, can be quite cumbersome and also not very efficient, since it partially serializes the exchange of messages. In my opinion, a more elegant way to avoid deadlocks is to use non-blocking communication, which we’ll talk about in a bit.

However, it’s worth knowing that there are other alternatives, too, e.g., MPI_Sendrecv(), a single call that performs both a send and a receive at the same time, so it won’t deadlock. It is in fact often used for exchanging ghost points in a domain decomposition: the same two procs need to exchange messages with each other, so they can just call MPI_Sendrecv() to do that.

In addition, and to add to the zoo of alternatives to MPI_Send(), one can also use MPI_Bsend(), a “buffered send”: the message is copied into a buffer, so the send call can return immediately, and the message will go out in the background. This is a simple way to avoid deadlocks, but it has some disadvantages, e.g., you have to make sure to provide enough buffer space (via MPI_Buffer_attach()) for all your outstanding messages, and the extra copy means it may not be the most efficient way to do things.

Non-blocking communication#

Non-blocking communication is often considered a more advanced feature of MPI that many people don’t want to touch. However, it actually often makes life easier, in particular when it comes to avoiding deadlocks. Non-blocking communication can also improve your code’s performance, since it allows you to overlap communication and computation. What that means is the following: Normally (using blocking communication), the code will block until the message you’re sending has made it to the other end (possibly) or until the message you want to receive has actually arrived (definitely). Because networks aren’t that fast, and because the processor whose message you’re waiting for may be busy doing something else and may not even have sent that message yet, your processor will just sit there twiddling its thumbs (well, if it had thumbs…). It’d be nice if you could spend that idle time doing something useful while the messages are in flight, and that’s what non-blocking MPI calls enable you to do (though you’ll still have to write the code for that “something useful”).

For sending a message, things are quite straightforward: Instead of your usual

    MPI_Send(buf_send, N, MPI_DOUBLE, dest, tag, MPI_COMM_WORLD);

you now use

    MPI_Request req;
    MPI_Isend(buf_send, N, MPI_DOUBLE, dest, tag, MPI_COMM_WORLD, &req);

The ‘I’ (for “immediate”) versions are the non-blocking counterparts of the usual MPI_Send() and MPI_Recv(), and they return right away. MPI_Isend() does not wait until the message is received on the other end; in fact, it might not even have sent out the message yet. After MPI_Isend() returns and your code continues executing, the data in your buffer may not have gone out yet (or it may have gone out, but might need to be retransmitted). Because of that, you are NOT allowed to change the data in buf_send right after MPI_Isend() returns. But you probably do want to use that buffer again eventually, so you need to know when it is safe to do so – when is the sending of the message actually complete? This is where the new MPI_Request argument comes in: MPI_Isend() returns a handle that you can use to check whether the send is complete – or to wait for its completion. There is a whole family of functions for this: MPI_Test(), MPI_Testsome(), MPI_Testany(), MPI_Testall(), and the corresponding MPI_Wait(), MPI_Waitsome(), MPI_Waitany(), MPI_Waitall(). The “Test” versions just check for completion and return immediately (so they are non-blocking, too), while the “Wait” versions actually wait for completion, so they are blocking calls. We will only consider one of the bunch in depth, MPI_Waitall(), though if you have only a single request to wait for, MPI_Wait() is a simpler way of doing that.

MPI_Waitall - Waits for all given communications
to complete.

C Syntax
       #include <mpi.h>
       int MPI_Waitall(int count, MPI_Request *array_of_requests,
            MPI_Status *array_of_statuses)

As the name would have you guess, this call takes a bunch of requests (an array of requests, to be specific), and waits until all of them complete.

Here’s an example of how the MPI deadlock from the beginning of class can be fixed using non-blocking communication:

   [...]

   if (rank == 0) {
     MPI_Request req;
     MPI_Isend(buf_send, N, MPI_DOUBLE, 1, 1234, MPI_COMM_WORLD, &req);
     MPI_Recv(buf_recv, N, MPI_DOUBLE, 1, 1234, MPI_COMM_WORLD,
       MPI_STATUS_IGNORE);
     MPI_Waitall(1, &req, MPI_STATUSES_IGNORE);
     // or simpler: MPI_Wait(&req, MPI_STATUS_IGNORE);
   } else { // rank == 1
   [...]

Just making the send non-blocking is good enough to fix the deadlock in this code: since MPI_Isend() returns immediately, MPI_Recv() is definitely called, so the in-progress send has a receiver that accepts the message, and both send and receive will complete. However, for symmetry I like to make both calls non-blocking, which also allows you to start with the receive (a blocking MPI_Recv() first would deadlock for sure). The reason for posting the receive first is that sending a message will hopefully go quicker if there is already a matching receive waiting for it.

It is important to be aware that MPI_Irecv() also returns right away, before the message has actually been received. That means that you cannot use the data in the receive buffer yet; you have to wait for completion of the receive, which again happens via the aforementioned test/wait functions. However, if you have something useful to do in the meantime, you can do that right away and check back later whether the message has actually arrived; then (and only then) you can work with the data in the receive buffer.

Your turn#

  • Change the mpi_dl_test example to use non-blocking communication for both send and receive.

Exchanging ghost points using non-blocking MPI calls#

You can use your own version of the wave_equation ghost point exchange, or you can use the one in the assignment repo, where I added the ghost point exchange in the function fill_ghosts() in mpi_domain.h.

Your turn#

Your (or my) ghost point filling in the wave_equation code (hopefully) works fine, though strictly speaking it is likely not 100% correct because it relies on MPI buffering the short messages to avoid a deadlock.

  • You can actually provoke that deadlock by using MPI_Ssend() instead of MPI_Send(), which doesn’t buffer messages at all. Try that out to see the deadlock in action.

  • Fix the ghost point filling routine to use non-blocking communication, which makes it deadlock-safe, and also has the potential to get a bit better performance.

  • Optional: Once you have the non-blocking version working, you could also try to do some useful work while the messages are in flight, e.g., calculating the derivative at the inner points while waiting for the ghost point values to arrive. This is somewhat painful coding-wise, and it may not even pay off, but that doesn’t mean it’s not worth trying.

Some collective operations#

So far, we’ve pretty much stuck with our “MPI in 6 functions”, though non-blocking communication added some variants (MPI_Isend / MPI_Irecv) and the associated waiting (MPI_Wait / MPI_Waitall).

The classic send / receive performs what is called “point-to-point” (p2p) communication, i.e., only two procs, sender and receiver, are involved. MPI, however, also provides a bunch of “collective” communications, where every proc in the communicator participates.

MPI_Bcast#

You previously wrote an mpi_message example that distributes a given value from proc 0 to all other procs. At the time, you wrote a loop to do so, using MPI_Send() and MPI_Recv(). We will now call this a “broadcast” operation:

  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  int test = 0;

  if (rank == 0) {
    test = 99;
    for (int r = 1; r < size; r++) { // start at 1: don't send to ourselves
      MPI_Send(&test, 1, MPI_INT, r, 123, MPI_COMM_WORLD);
    }
  } else {
    MPI_Recv(&test, 1, MPI_INT, 0, 123, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  }

  printf("Hello %d/%d test = %d\n", rank, size, test);

Since broadcasting is something that commonly happens, and since the above isn’t necessarily the most efficient implementation, MPI provides an MPI_Bcast() function which does the entire thing in a single call:

NAME
       MPI_Bcast -  Broadcasts a message from the process with rank "root" to all other processes of the communicator

SYNOPSIS
       int MPI_Bcast( void *buffer, int count, MPI_Datatype datatype, int root,
                      MPI_Comm comm )

INPUT/OUTPUT PARAMETERS
       buffer - starting address of buffer (choice)

INPUT PARAMETERS
       count  - number of entries in buffer (integer)
       datatype
              - data type of buffer (handle)
       root   - rank of broadcast root (integer)
       comm   - communicator (handle)

Your turn#

  • Change mpi_message.cxx to use MPI_Bcast().

MPI_Reduce#

Previously, you adapted the test_integration example to be MPI-parallel – ideally by subdividing the trapezoids amongst the MPI processes, having each process calculate a partial sum over its assigned trapezoids, and then adding up all those partial sums into the final result. The first part, calculating the partial sums, doesn’t involve any communication at all. You then used, I suppose, point-to-point communication to calculate the total sum (the final result).

When we parallelized the same program using OpenMP, we had to add a “reduction(+:sum)” clause, because that’s exactly what happens: the partial results are reduced to one global result. MPI supports that, too – thankfully, you don’t have to go about sending all those point-to-point messages as you did before.

  MPI_Reduce(sendbuf, recvbuf, count, datatype, op, root, communicator)

does what you need. As usual, the man page will give you all the details (man MPI_Reduce).

NAME
       MPI_Reduce -  Reduces values on all processes to a single value

SYNOPSIS
       int MPI_Reduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype,
                      MPI_Op op, int root, MPI_Comm comm)

INPUT PARAMETERS
       sendbuf
              - address of send buffer (choice)
       count  - number of elements in send buffer (integer)
       datatype
              - data type of elements of send buffer (handle)
       op     - reduce operation (handle)
       root   - rank of root process (integer)
       comm   - communicator (handle)

OUTPUT PARAMETERS
       recvbuf
              - address of receive buffer (choice, significant only at root )

It takes the partial results in sendbuf (as before, if you don’t have an array but just a single value, you need to take its address with & in C/C++; in Fortran it just works) and reduces them to a global result in recvbuf. The global result will be available only on the rank given as root; if you want it to be available everywhere, look into MPI_Allreduce(). The rest of the arguments you’ve seen before, except for op. In our case, I suppose you want MPI_SUM. There are a bunch of others, e.g., MPI_PROD, MPI_MIN, MPI_MAX, or you can even make your own if you need to.

Your turn#

  • Change test_integration.cxx to use MPI_Reduce.

MPI_Scatter and MPI_Gather#

These functions aren’t used all that commonly in codes that are designed from scratch for MPI, but they can be helpful in particular in the process of porting a serial code to MPI, and also are an option to handle I/O.

NAME
       MPI_Scatter -  Sends data from one process to all other processes in a communicator

SYNOPSIS
       int MPI_Scatter(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
       void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)

INPUT PARAMETERS
       sendbuf
              - address of send buffer (choice, significant only at root )
       sendcount
              - number of elements sent to each process (integer, significant only at root )
       sendtype
              - data type of send buffer elements (significant only at root ) (handle)
       recvcount
              - number of elements in receive buffer (integer)
       recvtype
              - data type of receive buffer elements (handle)
       root   - rank of sending process (integer)
       comm   - communicator (handle)


OUTPUT PARAMETERS
       recvbuf
              - address of receive buffer (choice)

NAME
       MPI_Gather -  Gathers together values from a group of processes

SYNOPSIS
       int MPI_Gather(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
       void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)

INPUT PARAMETERS
       sendbuf
              - starting address of send buffer (choice)
       sendcount
              - number of elements in send buffer (integer)
       sendtype
              - data type of send buffer elements (handle)
       recvcount
              - number of elements for any single receive (integer, significant only at root)
       recvtype
              - data type of recv buffer elements (significant only at root) (handle)
       root   - rank of receiving process (integer)
       comm   - communicator (handle)


OUTPUT PARAMETERS
       recvbuf
              - address of receive buffer (choice, significant only at root )

MPI_Scatter takes a large array that exists on a single rank and distributes it across processes in the communicator. E.g., with 3 processes:

         rank #0                                                 rank #1               rank #2
sendbuf: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 |     N/A                   N/A
    |
    v
recvbuf: | 0 | 1 | 2 | 3 |                                       | 4 | 5 | 6 | 7 |    | 8 | 9 | 10 | 11 |

MPI_Gather performs the inverse operation, i.e., it gathers pieces of an array that are distributed across processes onto a single rank into one large array.

         rank #0                                                 rank #1               rank #2
sendbuf: | 0 | 1 | 2 | 3 |                                       | 4 | 5 | 6 | 7 |    | 8 | 9 | 10 | 11 |
    |
    v
recvbuf: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 |     N/A                   N/A

MPI_Allgather#

Just like MPI_Allreduce makes the result of a reduction available on all ranks, rather than just a single one, MPI_Allgather can provide the gathered array on all ranks.

Your turn#

  • Take test_derivative.cxx or your parallelized wave equation, and add an alternate output function that will create just one file per output with the complete data in there – not a separate file per proc, as we have used so far.

    Note: So far, we didn’t really abstract the I/O in the wave equation code and test_derivative. That wasn’t really a problem, since it’s basically just two lines, thanks to xtensor’s dump_csv() doing most of the actual work. However, now that we want to have two different output functions, one for the parallelized version and one for the non-parallelized version, it would be a good idea to abstract the I/O a bit more – and since we’re outputting fields living on the mpi_domain, this could be a good opportunity to enhance the mpi_domain class with an output function. This is not strictly necessary, but it might make the code cleaner and more maintainable.

Homework#

  • As usual, follow the steps above, make commits and add comments in the feedback pull request.