# MPI V: Collective operations and other loose ends

We'll start with catch-up from Thursday's class, and you can keep working in that
assignment repo (which also means it'll be due next Tuesday, 4/28). You should find a small pull request in your assignment repo. Merging that will get you an updated `mpi_message.cxx` to be used later in today's class.

## Some ways to avoid deadlocks

As mentioned last class, the most obvious way to avoid deadlocks is to come up with a message flow where sends and receives are ordered so that they can't cross each other, e.g., even ranks send to odd, then odd ranks send to even. This, however, can be quite cumbersome and also not very efficient, since it more or less serializes the exchange of messages. In my opinion, a more elegant way to avoid deadlocks is to use non-blocking communication, which we'll talk about in a bit.

However, it's worth knowing that there are other alternatives that can be considered as well, e.g., `MPI_Sendrecv()`, which is a single call that performs both a send and a receive at the same time, so it won't deadlock. It is in fact often used for exchanging ghost points in a domain decomposition, since the same two procs need to exchange messages with each other, and they can do that with a single `MPI_Sendrecv()` call each.

In addition, and to add to the zoo of alternatives to `MPI_Send()`, one can also use `MPI_Bsend()`, which is a "buffered send" -- that means that the message is copied into an internal MPI buffer, so the send call can return immediately, and the message will go out in the background. This is a simple way to avoid deadlocks, but it has some disadvantages: you have to make sure to allocate enough buffer space for all your messages, and the extra copy means it may not be the most efficient way to do things.

## Non-blocking communication

Non-blocking communication is often considered a more advanced feature of MPI
that many people don't want to touch. However, it actually often makes life
easier, in particular when it comes to avoiding deadlocks. Non-blocking
communication can also get you better performance, since it allows you to
overlap communication and computation. What that means is the following:
Normally (using blocking communication) the code will block until the message
you're sending has made it to the other end (possibly) or until the message you
want to receive has actually arrived (definitely). Because networks aren't that
fast, and because the processor whose message you're waiting for may be busy
doing something else and may not even have sent out that message yet, your
processor will just sit there twiddling its thumbs (well, if it had thumbs...).
It'd be nice if you could spend that idle time doing something useful while the
messages are in flight, and that's what non-blocking MPI calls enable you to do
(but you'll still have to write the code to do that "something useful").

For sending a message, things are quite straightforward: Instead of your usual

```c
    MPI_Send(buf_send, N, MPI_DOUBLE, dest, tag, MPI_COMM_WORLD);
```

you now use

```c
    MPI_Request req;
    MPI_Isend(buf_send, N, MPI_DOUBLE, dest, tag, MPI_COMM_WORLD, &req);
```

The '`I`' versions are the non-blocking versions of the usual `MPI_Send()` and
`MPI_Recv()`, and they return right away. `MPI_Isend()` does not wait until
the message is received on the other end, in fact, it might not even send out
the message just yet. After `MPI_Isend()` returns and your code continues
executing, the data in your buffer may not have gone out yet (or it may have
gone out, but might need to be retransmitted). Because of that, you are NOT
allowed to change the data in `buf_send` right away after `MPI_Isend()` returns.
Well, but you probably do want to use that buffer again eventually, so you need
to know when it is safe to do so -- when is sending of the message actually
complete? This is where the new `MPI_Request` type argument comes in --
`MPI_Isend()` will return a handle that you can use to check on whether the send
is complete -- or you can wait for completion. There is a whole bunch of
functions to do this, `MPI_Test()`, `MPI_Testsome()`, `MPI_Testany()`,
`MPI_Testall()` and the corresponding `MPI_Wait()`, `MPI_Waitsome()`,
`MPI_Waitany()`, `MPI_Waitall()`. The "Test" versions will just check for
completion, returning immediately (so those are non-blocking, too), while the
"Wait" ones actually wait for completion, so those are blocking calls. We will
only consider one of the bunch in depth, that is `MPI_Waitall()`, though if you
have only a single request to wait for, `MPI_Wait()` is a simpler way of doing
that.

```
MPI_Waitall - Waits for all given communications
to complete.

C Syntax
       #include <mpi.h>
       int MPI_Waitall(int count, MPI_Request *array_of_requests,
            MPI_Status *array_of_statuses)
```

As the name would have you guess, this call takes a bunch of requests (an array
of requests, to be specific), and waits until all of them complete.

Here's an example of how the MPI deadlock from the beginning of the class
can be fixed using non-blocking communication:

```c
   [...]

   if (rank == 0) {
     MPI_Request req;
     MPI_Isend(buf_send, N, MPI_DOUBLE, 1, 1234, MPI_COMM_WORLD, &req);
     MPI_Recv(buf_recv, N, MPI_DOUBLE, 1, 1234, MPI_COMM_WORLD,
       MPI_STATUS_IGNORE);
     MPI_Waitall(1, &req, MPI_STATUSES_IGNORE);
     // or simpler: MPI_Wait(&req, MPI_STATUS_IGNORE);
   } else { // rank == 1
   [...]
```

Just making the send non-blocking is good enough to fix the deadlock problem
in this code -- since `MPI_Isend()` returns immediately, `MPI_Recv()` is
called for sure, so the in-progress send has a receiver that accepts the
message, and both send and receive will succeed. However, for symmetry I
like to make both calls non-blocking, which also allows you to start with the
receive (starting with a blocking `MPI_Recv()` would deadlock for sure).
Posting the receive first is worthwhile because sending a message will
hopefully go quicker if there's already a matching receive waiting for it.

It is important to be aware that `MPI_Irecv()` also returns right away, before
the message actually has been received. That means that you cannot use the data
from the receive buffer yet, you have to wait for completion of the receive,
which again happens with the aforementioned test/wait functions. However, if you
have something useful to do in the meantime, you can do that right now, and
check back later whether the message has actually arrived, and then (and only
then) work with the data you got, which will be in the receive buffer now.

#### Your turn

- Change the `mpi_dl_test` example to use non-blocking communication for both
  send and receive.

## Exchanging ghost points using non-blocking MPI calls

You can use your own version of the wave_equation ghost point exchange, or you
can use the one in the assignment repo, where I added the ghost point
exchange in the function `fill_ghosts()` in `mpi_domain.h`.

#### Your turn

Your (or my) ghost point filling in the wave_equation code (hopefully) works
fine, though strictly speaking it is likely not 100% correct because it relies
on MPI buffering the short messages to avoid a deadlock.

- You can actually provoke that deadlock by using `MPI_Ssend()` instead of
  `MPI_Send()`, which doesn't buffer messages at all. Try that out to see the
  deadlock in action.

- Fix the ghost point filling routine to use non-blocking communication, which
  makes it deadlock-safe, and also has the potential to get a bit better
  performance.

<!-- - Perform a scaling analysis of the code, generally. It's a good idea to dive into some amount of detail, ie., time the different parts of the code separately, e.g., the time spent in communication vs the time spent doing the actual computation, vs I/O. Does the communication time go down when you use non-blocking communication, in particular when you try to get as many messages in flight at the same time as possible? The default size of the domain is very small, so you may want to increase that to see how that changes the results. Also, as you do this, the timestep will decrease (automatically, since the code takes into account the CFL condition), so you may want to increase the number of time steps, but not write output as frequently.
 -->
- Optional: Once you have the non-blocking version working, you could also try to do some
  useful work while the messages are in flight, e.g., by calculating the
  derivative at the inner points while waiting for the ghost point values to
  arrive. This is slightly painful coding-wise, and it may not even pay off, but that doesn't mean it's not worth trying.

## Some collective operations

So far, we've pretty much stuck with our "MPI in 6 functions", though
non-blocking communication added some variants (`MPI_Isend` / `MPI_Irecv`) and
the associated waiting (`MPI_Wait` / `MPI_Waitall`).

The classic send / receive performs what is called "point-to-point" (p2p)
communication, i.e., only two procs, sender and receiver, are involved. MPI,
however, also provides a bunch of "collective" communications, where every proc
participates in the communication.

### `MPI_Bcast`

You previously wrote an `mpi_message` example that sends a given value from proc
0 to all other procs. At the time, you had written a loop to do so, using
`MPI_Send` and `MPI_Recv`. We will now call this a "broadcast" operation:

```cxx
  int rank, size;
  MPI_Comm_rank(MPI_COMM_WORLD, &rank);
  MPI_Comm_size(MPI_COMM_WORLD, &size);

  int test = 0;

  if (rank == 0) {
    test = 99;
    for (int r = 1; r < size; r++) {
      MPI_Send(&test, 1, MPI_INT, r, 123, MPI_COMM_WORLD);
    }
  } else {
    MPI_Recv(&test, 1, MPI_INT, 0, 123, MPI_COMM_WORLD, MPI_STATUS_IGNORE);
  }

  printf("Hello %d/%d test = %d\n", rank, size, test);
```

Since that is something that commonly happens, and also because the above isn't
necessarily the most efficient implementation, MPI provides a `MPI_Bcast`
function which will do the entire thing in a single call:

```
NAME
       MPI_Bcast -  Broadcasts a message from the process with rank "root" to all other processes of the communicator

SYNOPSIS
       int MPI_Bcast( void *buffer, int count, MPI_Datatype datatype, int root,
                      MPI_Comm comm )

INPUT/OUTPUT PARAMETERS
       buffer - starting address of buffer (choice)

INPUT PARAMETERS
       count  - number of entries in buffer (integer)
       datatype
              - data type of buffer (handle)
       root   - rank of broadcast root (integer)
       comm   - communicator (handle)
```

#### Your turn

- Change `mpi_message.cxx` to use `MPI_Bcast()`.

### `MPI_Reduce`

Previously, you adapted the `test_integration` example to be MPI parallel --
ideally by subdividing the trapezoids amongst MPI processes, having each process
calculate a partial sum of its assigned trapezoids, and then adding up all those
partial sums into the final result. The first part, calculating partial sums,
doesn't involve any communication at all. You then used, I suppose,
point-to-point communication to calculate the total sum (final result).

When we parallelized the same program using OpenMP, we had to add a
`reduction(+:sum)` clause, because that's exactly what's happening: the partial
results are reduced to one global result. MPI supports reductions, too, so
thankfully you don't have to go about sending all those point-to-point messages
as you did.

```c
  MPI_Reduce(sendbuf, recvbuf, count, datatype, op, root, communicator)
```

does what you need. As usual, the man page will give you all the details
(`man MPI_Reduce`).

```
NAME
       MPI_Reduce -  Reduces values on all processes to a single value

SYNOPSIS
       int MPI_Reduce(const void *sendbuf, void *recvbuf, int count, MPI_Datatype datatype,
                      MPI_Op op, int root, MPI_Comm comm)

INPUT PARAMETERS
       sendbuf
              - address of send buffer (choice)
       count  - number of elements in send buffer (integer)
       datatype
              - data type of elements of send buffer (handle)
       op     - reduce operation (handle)
       root   - rank of root process (integer)
       comm   - communicator (handle)

OUTPUT PARAMETERS
       recvbuf
              - address of receive buffer (choice, significant only at root )
```

It takes partial results in `sendbuf` (as before, if you don't have an array but
just a single value, you need to use a `&` in C/C++, in Fortran it just works),
and reduces them to a global result in `recvbuf`. The global result will be
available only on the rank given as `root`; if you want it to be
available everywhere, look into `MPI_Allreduce()`. The rest of the arguments
you've seen before, except for `op`. In our case, I suppose you want `MPI_SUM`.
There are a bunch of others, e.g., `MPI_PROD`, `MPI_MIN`, `MPI_MAX`, or you can
even make your own if you need them.

#### Your turn

- Change `test_integration.cxx` to use `MPI_Reduce`.

### `MPI_Scatter` and `MPI_Gather`

These functions aren't used all that commonly in codes that are designed from
scratch for MPI, but they can be particularly helpful when porting a serial
code to MPI, and they are also an option for handling I/O.

```
NAME
       MPI_Scatter -  Sends data from one process to all other processes in a communicator

SYNOPSIS
       int MPI_Scatter(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
       void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)

INPUT PARAMETERS
       sendbuf
              - address of send buffer (choice, significant only at root )
       sendcount
              - number of elements sent to each process (integer, significant only at root )
       sendtype
              - data type of send buffer elements (significant only at root ) (handle)
       recvcount
              - number of elements in receive buffer (integer)
       recvtype
              - data type of receive buffer elements (handle)
       root   - rank of sending process (integer)
       comm   - communicator (handle)


OUTPUT PARAMETERS
       recvbuf
       - address of receive buffer (choice)
```

```
NAME
       MPI_Gather -  Gathers together values from a group of processes

SYNOPSIS
       int MPI_Gather(const void *sendbuf, int sendcount, MPI_Datatype sendtype,
       void *recvbuf, int recvcount, MPI_Datatype recvtype, int root, MPI_Comm comm)

INPUT PARAMETERS
       sendbuf
              - starting address of send buffer (choice)
       sendcount
              - number of elements in send buffer (integer)
       sendtype
              - data type of send buffer elements (handle)
       recvcount
              - number of elements for any single receive (integer, significant only at root)
       recvtype
              - data type of recv buffer elements (significant only at root) (handle)
       root   - rank of receiving process (integer)
       comm   - communicator (handle)


OUTPUT PARAMETERS
       recvbuf
       - address of receive buffer (choice, significant only at root )
```

`MPI_Scatter` takes a large array that exists on a single rank and distributes
it across processes in the communicator. E.g., with 3 processes:

```
         rank #0                                                 rank #1               rank #2
sendbuf: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 |     N/A                   N/A
    |
    v
recvbuf: | 0 | 1 | 2 | 3 |                                       | 4 | 5 | 6 | 7 |    | 8 | 9 | 10 | 11 |
```

`MPI_Gather` performs the inverse operation, i.e., it gathers pieces of an array
that are distributed across processes onto a single rank into one large array.

```
         rank #0                                                 rank #1               rank #2
sendbuf: | 0 | 1 | 2 | 3 |                                       | 4 | 5 | 6 | 7 |    | 8 | 9 | 10 | 11 |
    |
    v
recvbuf: | 0 | 1 | 2 | 3 | 4 | 5 | 6 | 7 | 8 | 9 | 10 | 11 |     N/A                   N/A
```

### `MPI_Allgather`

Just like `MPI_Allreduce` makes the result of a reduction available on all
ranks, rather than just a single one, `MPI_Allgather` can provide the gathered
array on all ranks.

#### Your turn

- Take `test_derivative.cxx` or your parallelized wave equation, and add an
  alternate output function that will create just one file per output with the
  complete data in there -- not a separate file per proc, as we have used so
  far.

  Note: So far, we didn't really abstract the I/O in the wave equation code and `test_derivative`. That wasn't really too important, since it's basically just two lines, thanks to xtensor's `dump_csv()` doing most of the actual work. However, now that we want to have two different output functions, one for the parallelized version and one for the non-parallelized version, it would be a good idea to abstract the I/O a bit more -- and since it's outputting fields living on the `mpi_domain`, this could be a good opportunity to enhance the `mpi_domain` class to include an output function. This is not strictly necessary, but it might make the code cleaner and more maintainable.

<!-- ## Some of the bits we skipped over so far

In the following, I will mention a couple of the features we've glanced over so
far -- and we'll continue to do so for the most part, but it's important to know
their existence, so that you can learn about it if the need arises later.
`man MPI_whatever_call` / google / your favorite LLM will tell you more of the specifics.

### MPI communicators

MPI communicators are objects that specify a "communication domain", basically a
set of processors that can communicate with each other. We've already
encountered the two most important pieces of information for a communicator: The
number of total processors in a given communicator (`MPI_Comm_size()`), as well
as the local rank of the current processor in the communicator
(`MPI_Comm_rank()`), where processors have rank 0,1,2,...,(size-1).

We've always used `MPI_COMM_WORLD`, which is one of the two predefined
communicators. It represents the world as far as MPI is concerned, that is all
of the processors that were started by (typically) `mpirun`, for example if you
used `mpirun -n 8`, the world consists of 8 processors.

There is also `MPI_COMM_SELF`, which can sometimes be useful. This is a
communicator that only includes a single processor, that is the current one, so
its size is always equal to one and the rank of the current processor in
`MPI_COMM_SELF` is always 0.

If you need other communicators, you can make your own, basically by choosing
subsets of `MPI_COMM_WORLD` or other communicators. This useful for more complex
codes, for example in the global magnetosphere code OpenGGCM, there's one
process for I/O, one that handles the ionosphere, and N that handle the MHD
computation. Those N processes sometimes need to talk just among themselves
(e.g. to figure out the next timestep), and so they create their own MHD
communicator for that.

### MPI_ANY_SOURCE and MPI_ANY_TAG

So far, our MPI calls have always used specific processor ranks as source /
destination, as well as a given tag. For sending messages, there's no way around
specifying the destination, as well as specifying a tag.

However, when it comes to receiving messages, it may be desirable to receive the
next message from whichever process sends it, without knowing the source rank in
advance. In order to do so, specify `MPI_ANY_SOURCE` in `MPI_Recv()`. Similarly,
we may want receive different kind of messages, e.g. "do this work" vs "do that
work", which may be specified by using different tags. Under normal
circumstances, a `MPI_Recv()` will only receive a message from a `MPI_Send()`
that used a matching tag, but specifying `MPI_ANY_TAG` instead does what its
name promises...

### MPI_Status

We've always used `MPI_STATUS_IGNORE` in `MPI_Recv()`, and as one might have
suspected, ignored anything status-related. However, in a number of situations,
as you receive a message, it would be nice to get additional information about
it, other than the actual data. You can get that by passing a pointer to a
variable of type MPI_Status instead of `MPI_STATUS_IGNORE`.

Particular examples are if you used `MPI_ANY_SOURCE` or `MPI_ANY_TAG` in your
`MPI_Recv()` call, because in this case you don't know where your message came
from / what tag it had. However, you can get this information from the status
parameter. You can also get the number of actual data received from it -- it is
possible to receive a short messages, that is messages that contain less data
than provided in the receive buffer.

### MPI datatypes

We've seen that there are predefined data types like `MPI_INT`, `MPI_DOUBLE`
etc. that correspond to basic C (and Fortran) data types. You can make your own,
more complex data types.

### Lots more

MPI (now at version 4.0) has a lot more functionality, including file
input/output (MPI-IO), Cartesian communicators, one-sided communication...

If you want to learn more details about domain decomposition, this is a useful
tutorial: <https://edoras.sdsu.edu/~mthomas/docs/mpi/nasa.tutorial/mpi2.pdf>
 -->
## Homework

<!-- - We'll pick things up again at (briefly) `MPI_Scatter`/`MPI_Gather` and other
  remaining MPI bits next class, so the Class 25/26 homework is about
  `mpi_dl_test`, making `fill_ghosts()` use non-blocking communication,
  `MPI_Bcast` for `mpi_message.cxx` and `MPI_Reduce` for `test_integration.cxx`.
  The class repo mistakenly contains only the non-parallelized
  `test_integration.cxx`, but you can just copy your own from last week, or the
  one from my solutions (https://github.com/unh-hpc-2025/iam851/pull/24).
 -->
- As usual, follow the steps above, make commits and add comments in the
  feedback pull request.