Intermediate MPI
From csi702
1 Parallel Programming in a Nutshell
Load balancing versus communication: spreading the computational load evenly across nodes while keeping the communication between them cheap is the eternal problem in parallel computing. The basic approaches to this problem include:
- data partitioning - distributing different parts of the data set across several nodes
- task partitioning - assigning separate tasks to different nodes
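As a minimal sketch of data partitioning, assuming n data items are split into contiguous blocks over p nodes, the hypothetical helper block_range below (not an MPI routine) computes the half-open index range [lo, hi) owned by rank r. The first n % p ranks get one extra item, so block sizes never differ by more than one, which keeps the load balanced.

```c
#include <assert.h>

/* Hypothetical helper (not part of MPI): the contiguous block [*lo, *hi)
   of n items owned by rank r out of p ranks. The first n % p ranks get
   one extra item, so block sizes differ by at most one. */
void block_range(int n, int p, int r, int *lo, int *hi)
{
    assert(n >= 0 && p > 0 && r >= 0 && r < p);
    int base  = n / p;   /* items every rank receives        */
    int extra = n % p;   /* leftover items, one per low rank */
    *lo = r * base + (r < extra ? r : extra);
    *hi = *lo + base + (r < extra ? 1 : 0);
}
```

In an MPI code, r and p would come from MPI_Comm_rank and MPI_Comm_size; for example, 10 items on 3 ranks split as [0, 4), [4, 7) and [7, 10).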
1.1 Very Basic MPI
1.1.1 Fortran
All Fortran and Fortran90 MPI codes have four calls in common.
    program skeleton
      implicit none
      include "mpif.h"
      integer :: ierr, my_id, nproc
      call MPI_INIT(ierr)
      call MPI_COMM_RANK(MPI_COMM_WORLD, my_id, ierr)
      call MPI_COMM_SIZE(MPI_COMM_WORLD, nproc, ierr)
      ! do something useful
      call MPI_FINALIZE(ierr)
    end program skeleton
- The include statement defines MPI_COMM_WORLD and the other MPI constants and declarations.
- MPI_INIT and MPI_FINALIZE start and stop the MPI communication library.
- ierr is an integer variable that is set to MPI_SUCCESS (zero) when the call completes successfully.
1.1.2 C
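    #include "mpi.h"

    int main(int argc, char *argv[])
    {
        int numprocs, myid;
        MPI_Init(&argc, &argv);
        MPI_Comm_size(MPI_COMM_WORLD, &numprocs);
        MPI_Comm_rank(MPI_COMM_WORLD, &myid);
        /* do useful stuff */
        MPI_Finalize();
        return 0;
    }
All C MPI functions return an error code, which is MPI_SUCCESS (zero) if the call succeeded.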
1.1.3 C++
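    #include "mpi.h"

    MPI::Init(argc, argv);
    int size = MPI::COMM_WORLD.Get_size();
    int rank = MPI::COMM_WORLD.Get_rank();
    // do useful stuff
    MPI::Finalize();
Note that the C++ bindings were deprecated in MPI-2.2 and removed in MPI-3.0; modern C++ codes call the C functions instead.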
1.2 Nodal Operations
Basic nodal operations include finding out which node (rank) the program is running on and how many nodes there are in total (their ranks run from 0 to size-1).
    int MPI_Comm_rank(MPI_Comm comm, int *my_node)
    int MPI_Comm_size(MPI_Comm comm, int *num_nodes)
A full description of the MPI_Comm_rank syntax is available at [1].
A full description of the MPI_Comm_size syntax is available at [2].
1.3 Ranks and Sizes in MPI
- The size is the number of processes in a communicator (MPI_Comm)
- ranks range from 0 to size-1 in all languages, including Fortran
- sizes and ranks are defined only relative to a particular communicator (processor group); the same process can have different ranks in different communicators
- MPI_COMM_WORLD is the predefined communicator containing the full set of processes. Subgroups of processes can also be used once they have been properly defined (for example with MPI_Cart_create).
1.4 Global Operations
Global operations include synchronization, broadcasts, and reductions.
- Synchronization makes every processor stop at a particular point in the program until all of them have reached it.
- Broadcasts transmit data from a single node to the rest of the computational nodes.
- Reductions combine values distributed across the nodes into a single result, such as the sum, maximum or minimum of a distributed array.
1.4.1 Synchronizing Nodes
All nodes in the given communicator must call MPI_Barrier before any of them can continue.
    int MPI_Barrier(MPI_Comm comm);
In Fortran the call is call MPI_BARRIER(comm, ierr).
A full description of the MPI_Barrier syntax is available at [3].
1.4.2 Global Broadcasts
Global broadcasts are often used to communicate input data to the rest of the nodes.
    int MPI_Bcast(void *buf, int count, MPI_Datatype datatype, int root, MPI_Comm comm);
A full description of the MPI_Bcast syntax is available at [4].
1.5 MPI Broadcasts
    MPI_Bcast(&n, 1, MPI_INT, 0, MPI_COMM_WORLD);               /* C */
    MPI::COMM_WORLD.Bcast(&n, 1, MPI::INT, 0);                  // C++
    call MPI_BCAST(n, 1, MPI_INTEGER, 0, MPI_COMM_WORLD, ierr)  ! Fortran
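- n: the value or array to be broadcast; on the root it holds the data to send, and on every other node it receives the data
- 1: the number of elements to be broadcast
- MPI_INT (MPI::INT, MPI_INTEGER): the MPI datatype of the elements
- 0: the rank of the root (origin) node of the broadcast
- MPI_COMM_WORLD: the communicator of the broadcast
- ierr (Fortran only): the error code of the call
Every node in the communicator must make the same broadcast call.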
1.6 Parallel Reductions
A reduction combines data spread across multiple nodes into a single result. Summation is perhaps the easiest and most obvious example.
int MPI_Reduce( void *sendbuf, void *recbuf, int cnt, MPI_Datatype datatype, MPI_Op op, int root, MPI_Comm comm);
A full description of the MPI_Reduce syntax is available at [5].
1.6.1 Reduction Operations
MPI provides a fixed set of predefined reduction operations (additional user-defined operations can be created with MPI_Op_create). The predefined operations are summarized below:[1]
| MPI operation | Meaning |
| MPI_MAX | maximum, max |
| MPI_MIN | minimum, min |
| MPI_MAXLOC | maximum and location of maximum |
| MPI_MINLOC | minimum and location of minimum |
| MPI_SUM | sum |
| MPI_PROD | product |
| MPI_LAND | logical and |
| MPI_LOR | logical or |
| MPI_LXOR | logical exclusive or |
| MPI_BAND | bitwise and |
| MPI_BOR | bitwise or |
| MPI_BXOR | bitwise exclusive or |
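MPI_MAXLOC and MPI_MINLOC operate on (value, index) pairs rather than plain values. The serial sketch below, with hypothetical names maxloc_combine and maxloc_reduce, emulates what an MPI_MAXLOC reduction computes when each rank contributes its local maximum paired with its own rank; in real MPI code the pair would be described with the predefined MPI_DOUBLE_INT datatype.

```c
#include <assert.h>
#include <stddef.h>

/* Same layout idea as MPI's predefined MPI_DOUBLE_INT pair type:
   a value plus an integer index (here, the rank that owns the value). */
typedef struct {
    double value;
    int    index;
} double_int;

/* Combine two pairs as MPI_MAXLOC does: keep the larger value and,
   on a tie, the smaller index. */
static double_int maxloc_combine(double_int a, double_int b)
{
    if (a.value > b.value) return a;
    if (b.value > a.value) return b;
    return (a.index < b.index) ? a : b;
}

/* Serial emulation of an MPI_MAXLOC reduction over one pair per rank. */
double_int maxloc_reduce(const double_int *pairs, size_t n)
{
    assert(n > 0);
    double_int result = pairs[0];
    for (size_t i = 1; i < n; ++i)
        result = maxloc_combine(result, pairs[i]);
    return result;
}
```

The tie-breaking rule matters: if two ranks hold the same maximum, the lower rank is reported.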
1.7 MPI Reductions
    MPI_Reduce(&mypi, &pi, 1, MPI_DOUBLE, MPI_SUM, 0, MPI_COMM_WORLD);                    /* C */
    MPI::COMM_WORLD.Reduce(&mypi, &pi, 1, MPI::DOUBLE, MPI::SUM, 0);                      // C++
    call MPI_REDUCE(mypi, pi, 1, MPI_DOUBLE_PRECISION, MPI_SUM, 0, MPI_COMM_WORLD, ierr)  ! Fortran
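- mypi: the local value(s) each node contributes to the reduction
- pi: the variable that receives the result (meaningful only on the root)
- 1: the number of elements to be reduced
- MPI_DOUBLE (MPI::DOUBLE, MPI_DOUBLE_PRECISION): the MPI datatype
- MPI_SUM: the reduction operation
- 0: the rank of the root process, where the reduction is completed
- MPI_COMM_WORLD: the communicator
- ierr (Fortran only): the error code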
1.7.1 Numerical Integration
Consider integrating a function f(x) over an interval [a, b].
We can approximate this integral with the rectangle (midpoint) rule:
- input the number of partitions, n, to be used
- divide the domain into n partitions of equal width
- evaluate the function at the midpoint of each partition
- multiply each function evaluation by the width of the partition to find a differential area
- add the differential areas together
- output the result
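A minimal serial sketch of these steps, assuming the integrand 4/(1+x^2) on [0, 1]. This is an illustrative choice whose exact integral is pi, in keeping with the mypi/pi variables of the reduction example; integrate_serial and integrand are hypothetical names.

```c
#include <assert.h>

/* Example integrand (an assumption): its integral over [0, 1] is pi. */
static double integrand(double x) { return 4.0 / (1.0 + x * x); }

/* Rectangle (midpoint) rule with n partitions on [a, b]. */
double integrate_serial(double a, double b, int n)
{
    assert(n > 0);
    double width = (b - a) / n;              /* width of each partition */
    double sum = 0.0;
    for (int i = 0; i < n; ++i) {
        double x = a + (i + 0.5) * width;    /* midpoint of partition i */
        sum += integrand(x) * width;         /* differential area       */
    }
    return sum;
}
```

For this integrand the midpoint-rule error is roughly h^2/12 with h = 1/n, so integrate_serial(0.0, 1.0, 1000) should agree with pi to within about 1e-7.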
1.7.2 Parallel Integration
In parallel, the problem is nearly the same.
- on processor zero, input the number of partitions
- broadcast the user input information to all processors
- determine the number of processors, m
- give each processor n/m of the partitions
- on each processor, evaluate the function at the midpoint of each of its partitions
- multiply each function evaluation by the width of the partition to find a differential area
- add the differential areas together on each processor, then sum the partial results across all the processors (for example with MPI_Reduce and MPI_SUM)
- on processor zero, output the result
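The sketch below emulates the parallel version serially under stated assumptions: the integrand is again 4/(1+x^2) on [0, 1], and each processor takes every m-th partition (a cyclic distribution, one common way to give each processor about n/m partitions even when m does not divide n). The function names are hypothetical. In a real MPI code each processor would compute only its own partial_area, and the loop in reduce_sum would be replaced by a single MPI_Reduce call with MPI_SUM.

```c
#include <assert.h>

/* Example integrand (an assumption): its integral over [0, 1] is pi. */
static double integrand(double x) { return 4.0 / (1.0 + x * x); }

/* Partial area computed by one processor: partitions rank, rank + nprocs,
   rank + 2*nprocs, ... of the n partitions of [0, 1] (cyclic distribution). */
double partial_area(int n, int rank, int nprocs)
{
    assert(n > 0 && nprocs > 0 && rank >= 0 && rank < nprocs);
    double width = 1.0 / n;
    double sum = 0.0;
    for (int i = rank; i < n; i += nprocs)
        sum += integrand((i + 0.5) * width) * width;   /* midpoint rule */
    return sum;
}

/* Serial stand-in for MPI_Reduce(..., MPI_SUM, 0, ...): the root's result is
   the sum of every processor's partial area. */
double reduce_sum(int n, int nprocs)
{
    double total = 0.0;
    for (int rank = 0; rank < nprocs; ++rank)
        total += partial_area(n, rank, nprocs);
    return total;
}
```

Apart from floating-point rounding, the answer does not depend on the number of processors, which is a useful sanity check for the parallel code.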
1.8 Point-to-Point Communication
There are three general types of point-to-point communication that may be available on parallel machines.
- blocking - the calling node halts until its part of the communication is complete (for a receive, until the message has arrived; for a send, until the send buffer can safely be reused)
- non-blocking (asynchronous) - the call returns immediately; the node keeps working and checks or waits for completion whenever desired
- interrupt-driven communication - not available in MPI
1.8.1 Blocking Sends and Receives
Blocking a routine until it has received a new data set can be used to synchronize the computational results between nodes, and in many cases blocking communication is desirable. A “ring toss” program, in which a message is passed from node to node, illustrates this communication fairly well.
[Figure: ring toss, with the message passed from Node 1 to Node 2 to Node 3 to Node 4]
The message is passed from the first processor to the final processor via blocking sends and receives: each node waits in a receive until its predecessor's message arrives and then sends the message on to its successor.
The syntax for blocking sends and receives is
    int MPI_Send(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm);
    int MPI_Recv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Status *status);
A full description of the MPI_Send syntax is available at [6].
A full description of the MPI_Recv syntax is available at [7].
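For the ring toss, each rank only needs the ranks of its predecessor and successor. A minimal sketch with hypothetical helper names, assuming ranks 0 to size-1 arranged in a ring. In an MPI version, rank 0 would start with MPI_Send to ring_next(0, size), and every other rank would first call MPI_Recv with source ring_prev(rank, size) and then MPI_Send to ring_next(rank, size); the blocking receive is what enforces the order. If the message should stop at the final processor, the last rank simply skips its send.

```c
#include <assert.h>

/* Hypothetical helpers for a ring of `size` ranks numbered 0 .. size-1:
   each rank receives from its predecessor and sends to its successor. */
int ring_next(int rank, int size)
{
    assert(size > 0 && rank >= 0 && rank < size);
    return (rank + 1) % size;
}

int ring_prev(int rank, int size)
{
    assert(size > 0 && rank >= 0 && rank < size);
    return (rank + size - 1) % size;
}
```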
1.8.1.1 MPI Datatypes
To perform sends and receives in a platform-independent way, MPI uses its own set of well-defined datatypes to describe the data transmitted between processes. The predefined basic datatypes are:[1]
1.8.1.1.1 MPI Fortran Datatypes
| MPI datatype | Fortran type |
| MPI_INTEGER | INTEGER |
| MPI_REAL | REAL |
| MPI_DOUBLE_PRECISION | DOUBLE PRECISION |
| MPI_COMPLEX | COMPLEX |
| MPI_LOGICAL | LOGICAL |
| MPI_CHARACTER | CHARACTER(1) |
| MPI_BYTE | (no corresponding type) |
| MPI_PACKED | (no corresponding type) |
1.8.1.1.2 MPI C Datatypes
| C MPI DataType | C Meaning |
| MPI_INT | (signed) int |
| MPI_FLOAT | float |
| MPI_DOUBLE | double |
| MPI_LONG_DOUBLE | long double |
| MPI_SIGNED_CHAR | signed char |
| MPI_UNSIGNED_CHAR | unsigned char |
| MPI_SHORT | signed short int |
| MPI_LONG | signed long int |
| MPI_UNSIGNED | unsigned int |
| MPI_UNSIGNED_SHORT | unsigned short int |
| MPI_UNSIGNED_LONG | unsigned long int |
| MPI_BYTE | (no corresponding type) |
| MPI_PACKED | (no corresponding type) |
1.8.2 MPI Sends-Fortran
    integer :: stats(MPI_STATUS_SIZE)

    if (my_id == source) then
       call MPI_SEND(msg, lngth, MPI_INTEGER, dest, tag, MPI_COMM_WORLD, ierr)
    endif
    if (my_id == dest) then
       call MPI_RECV(msg, lngth, MPI_INTEGER, source, tag, MPI_COMM_WORLD, stats, ierr)
    endif
- msg - message array
- lngth - length of the array
- MPI_INTEGER - MPI datatype
- dest/source - destination or source of the message
- tag - message tag; must be the same on the send and the receive
- MPI_COMM_WORLD - MPI communicator
- stats - information about the receive
- ierr - error flag
1.8.3 Elliptic PDE Example
Consider the Poisson problem, an elliptic PDE.
Solve the following equation:
$$\nabla^2 u = f$$
On the domain:
x = [0,1]
y = [0,1]
subject to the Dirichlet boundary conditions:
u(x,0) = ubottom
u(x,1) = utop
u(0,y) = uleft
u(1,y) = uright
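In two dimensions, this becomes:
$$\frac{\partial^2 u}{\partial x^2} + \frac{\partial^2 u}{\partial y^2} = f(x,y)$$
Using a finite difference approximation we can write:
$$\frac{U_{i+1,j} + U_{i-1,j} + U_{i,j+1} + U_{i,j-1} - 4U_{i,j}}{h^2} = f_{i,j}$$
where $U_{i,j} = u(x_i, y_j)$, $f_{i,j} = f(x_i, y_j)$, and $h = x_{i+1} - x_i = y_{j+1} - y_j$ is the grid spacing. We will assume we have a grid of nsize × nsize points.
From this we arrive at the iteration that updates the entries of the array at each step:
$$U^{new}_{i,j} = \frac{1}{4}\left(U^{old}_{i-1,j} + U^{old}_{i+1,j} + U^{old}_{i,j-1} + U^{old}_{i,j+1}\right) - \frac{h^2}{4} f_{i,j}$$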
The boundary conditions, shown graphically below, represent areas of the grid which do not change and therefore can be stored statically on all processors which need them.
The following is a simple code to perform the Jacobi Iteration described above:
    ! initialize the array
    u(:,:) = 0.0d0
    ! set up the boundary conditions
    u(:,1)     = left_bc
    u(:,nsize) = right_bc
    u(1,:)     = bottom_bc
    u(nsize,:) = top_bc
    ! copy only after the boundary values are in place
    uold = u
    do k = 1, max_iterations
       call sweep_grid(u, uold, f, nsize, h)
       uold = u
    enddo

    subroutine sweep_grid(u, uold, f, nsize, h)
      integer, intent(in) :: nsize
      double precision, intent(in) :: h
      double precision, dimension(nsize, nsize), intent(inout) :: u
      double precision, dimension(nsize, nsize), intent(in) :: uold, f
      integer :: i, j
      ! j in the outer loop so the inner loop runs down contiguous columns
      do j = 2, nsize - 1
         do i = 2, nsize - 1
            u(i,j) = 0.25d0 * (uold(i-1,j) + uold(i+1,j) + uold(i,j-1) + uold(i,j+1)) &
                     - 0.25d0 * f(i,j) * h**2
         enddo
      enddo
    end subroutine sweep_grid
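For comparison, a C sketch of the same Jacobi iteration, assuming the nsize × nsize grid (n below) is stored row-major in a flat array; the IDX macro and the jacobi driver are hypothetical names. The driver copies uold from u after the boundary values are already in u, so the first sweep sees them, and the sweep never writes the boundary points.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* Row-major index into an n x n grid. */
#define IDX(i, j, n) ((i) * (n) + (j))

/* One Jacobi sweep over the interior points:
   u(i,j) = (uold(i-1,j) + uold(i+1,j) + uold(i,j-1) + uold(i,j+1)) / 4 - h*h*f(i,j) / 4.
   Boundary points of u are left untouched. */
void sweep_grid(double *u, const double *uold, const double *f, int n, double h)
{
    for (int i = 1; i < n - 1; ++i)
        for (int j = 1; j < n - 1; ++j)
            u[IDX(i, j, n)] = 0.25 * (uold[IDX(i - 1, j, n)] + uold[IDX(i + 1, j, n)]
                                    + uold[IDX(i, j - 1, n)] + uold[IDX(i, j + 1, n)])
                            - 0.25 * f[IDX(i, j, n)] * h * h;
}

/* Run max_iterations sweeps; u must already hold the boundary values. */
void jacobi(double *u, const double *f, int n, double h, int max_iterations)
{
    assert(n >= 3);
    size_t bytes = (size_t)n * n * sizeof *u;
    double *uold = malloc(bytes);
    assert(uold != NULL);
    memcpy(uold, u, bytes);               /* uold = u, boundaries included */
    for (int k = 0; k < max_iterations; ++k) {
        sweep_grid(u, uold, f, n, h);
        memcpy(uold, u, bytes);           /* uold = u */
    }
    free(uold);
}
```

With f = 0 and every boundary value equal to 1, the interior converges to 1, which makes a quick check of the indexing.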
The following diagram demonstrates a possible strategy for dividing the cells in the grid among multiple processors. The following sections describe the MPI commands required to update the cells on the boundaries between processors.
1.8.3.1 MPI Domain Decomposition
When communications are occurring between nodes arranged in an orderly pattern (like a Cartesian coordinate grid), MPI provides tools for automatically decomposing the problem to make communications simpler.
1.8.3.1.1 Create new Comm Groups
The following code creates a new communicator from the nodes in the MPI_COMM_WORLD group. The nodes are numbered as if they were arranged on a 1D Cartesian grid with no wrap-around (non-periodic), and MPI is allowed to renumber (reorder) the nodes. The handle of the new communicator is stored in comm1d.
    int dims[1]    = { numprocs };  /* all processes along one dimension */
    int periods[1] = { 0 };         /* no wrap-around */
    MPI_Comm comm1d;
    MPI_Cart_create(MPI_COMM_WORLD, 1, dims, periods, 1, &comm1d);
A full description of the MPI_Cart_create syntax is available at [8].
1.8.3.1.2 Shift within Cartesian Comm Group
Once a Cartesian communicator has been created, the source and destination ranks for sends and receives can be determined with MPI_Cart_shift. The following call calculates the source and destination ranks for a shift of 1 unit along dimension 0 of the comm1d communicator. At the ends of a non-periodic dimension the missing neighbor is returned as MPI_PROC_NULL, and sends to or receives from MPI_PROC_NULL complete immediately without transferring any data.
    int source, dest;
    MPI_Cart_shift(comm1d, 0, 1, &source, &dest);
A full description of the MPI_Cart_shift syntax is available at [9].
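A serial sketch of the values MPI_Cart_shift returns on a 1D grid, assuming the ranks in comm1d are numbered 0 to size-1 along the dimension. The helper cart_shift_1d and the NO_NEIGHBOR constant are hypothetical stand-ins; real MPI returns MPI_PROC_NULL at the ends of a non-periodic dimension.

```c
#include <assert.h>

#define NO_NEIGHBOR (-1)   /* hypothetical stand-in for MPI_PROC_NULL */

/* What MPI_Cart_shift(comm1d, 0, disp, &source, &dest) yields on a 1D grid
   of `size` ranks: dest = rank + disp and source = rank - disp, wrapped
   around when periodic, otherwise NO_NEIGHBOR past either end. */
void cart_shift_1d(int rank, int size, int disp, int periodic,
                   int *source, int *dest)
{
    assert(size > 0 && rank >= 0 && rank < size);
    int d = rank + disp;
    int s = rank - disp;
    if (periodic) {
        *dest   = ((d % size) + size) % size;
        *source = ((s % size) + size) % size;
    } else {
        *dest   = (d >= 0 && d < size) ? d : NO_NEIGHBOR;
        *source = (s >= 0 && s < size) ? s : NO_NEIGHBOR;
    }
}
```

In the exchange code that follows, nbrbottom and nbrtop presumably play the roles of source and dest from such a shift, which is why the first and last nodes need no special-case code.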
1.8.3.2 Non-Blocking Sends and Receives
- Non-blocking sends and receives are very much like email. You send and forget the message. The receiving processor checks for “mail” when it feels like it. Neither process halts for message checking if no message is present.
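- Probe commands are used to check whether a message has arrived.
- The syntax for non-blocking sends and receives is usually IDENTICAL to that of blocking sends and receives, except that the command name gains an I prefix (MPI_Isend, MPI_Irecv) and a request handle is added; MPI_Irecv takes the request in place of the status argument.
- DO NOT modify the buffer of a pending send, or read the buffer of a pending receive, until the operation has completed (for example after MPI_Wait or MPI_Waitall).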
The command for non-blocking send in MPI is MPI_Isend:
    int MPI_Isend(void *buf, int count, MPI_Datatype datatype, int dest, int tag, MPI_Comm comm, MPI_Request *request);
A full description of the MPI_Isend syntax is available at [10].
The command for non-blocking receive in MPI is MPI_Irecv:
    int MPI_Irecv(void *buf, int count, MPI_Datatype datatype, int source, int tag, MPI_Comm comm, MPI_Request *request);
A full description of the MPI_Irecv syntax is available at [11].
To check whether a message has arrived yet, the MPI_Iprobe command can be used; it sets flag to true if a matching message is available:
    int MPI_Iprobe(int source, int tag, MPI_Comm comm, int *flag, MPI_Status *status);
A full description of the MPI_Iprobe syntax is available at [12].
Once a series of non-blocking sends and receives have been initiated, they can be waited on using MPI_Waitall. MPI_Waitall is a blocking command which will wait until all the sends and receives specified in the request array have completed.
int MPI_Waitall(int count, MPI_Request array_of_requests[], MPI_Status array_of_statuses[] );
A full description of the MPI_Waitall syntax is available at [13].
1.8.4 MPI Exchanges
1.8.4.1 MPI Exchange Approaches
1.8.4.1.1 A simple (and poor) approach
    call MPI_SEND(a(1,e),   nx, MPI_DOUBLE_PRECISION, nbrtop,    0, comm1d, ierr)
    call MPI_RECV(a(1,s-1), nx, MPI_DOUBLE_PRECISION, nbrbottom, 0, comm1d, status, ierr)
    call MPI_SEND(a(1,s),   nx, MPI_DOUBLE_PRECISION, nbrbottom, 1, comm1d, ierr)
    call MPI_RECV(a(1,e+1), nx, MPI_DOUBLE_PRECISION, nbrtop,    1, comm1d, status, ierr)
    return
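Why won't this work?
(Note: s = start of computing cells, e = end of computing cells)
Simple SEND/RECV:
- Send to the CPU above and wait until it is received
Every node starts with a blocking send, and no node posts a receive until its own send has completed, yet MPI_SEND may wait until the matching receive is posted. With a non-periodic decomposition the top node's send goes to MPI_PROC_NULL, so at best the exchange completes one node at a time, starting from the top. With periodic boundaries, if the MPI library does not buffer the messages, every node waits forever (deadlock). Code whose correctness depends on message buffering in this way is called unsafe.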
How do we avoid deadlocks in our communication when we use blocking SEND and RECV commands?
    call MPI_CART_COORDS(comm1d, rank, 1, coord, ierr)
    !
    ! Even nodes:
    !
    if (mod(coord, 2) .eq. 0) then
       call MPI_SEND(a(1,e),   nx, MPI_DOUBLE_PRECISION, nbrtop,    0, comm1d, ierr)
       call MPI_RECV(a(1,s-1), nx, MPI_DOUBLE_PRECISION, nbrbottom, 0, comm1d, status, ierr)
       call MPI_SEND(a(1,s),   nx, MPI_DOUBLE_PRECISION, nbrbottom, 1, comm1d, ierr)
       call MPI_RECV(a(1,e+1), nx, MPI_DOUBLE_PRECISION, nbrtop,    1, comm1d, status, ierr)
    else
    !
    ! Odd nodes:
    !
       call MPI_RECV(a(1,s-1), nx, MPI_DOUBLE_PRECISION, nbrbottom, 0, comm1d, status, ierr)
       call MPI_SEND(a(1,e),   nx, MPI_DOUBLE_PRECISION, nbrtop,    0, comm1d, ierr)
       call MPI_RECV(a(1,e+1), nx, MPI_DOUBLE_PRECISION, nbrtop,    1, comm1d, status, ierr)
       call MPI_SEND(a(1,s),   nx, MPI_DOUBLE_PRECISION, nbrbottom, 1, comm1d, ierr)
    endif
Even/Odd Pattern:
- If my coordinate is even
  - SEND to the node above
  - RECV from the node below
  - SEND to the node below
  - RECV from the node above
- If my coordinate is odd
  - RECV from the node below
  - SEND to the node above
  - RECV from the node above
  - SEND to the node below
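The deadlock argument can be checked with a small serial simulation. The sketch below is hypothetical (not an MPI API): it models every blocking send as unbuffered, so a send completes only when the neighbor is currently blocked in the matching receive, which is the worst case MPI permits for MPI_SEND. Operations aimed at a missing neighbor complete immediately, as they do for MPI_PROC_NULL.

```c
#include <assert.h>
#include <stdbool.h>

/* The four blocking operations of one exchange. SEND_UP (tag 0) pairs with
   the upper neighbor's RECV_DOWN; SEND_DOWN (tag 1) pairs with the lower
   neighbor's RECV_UP. */
typedef enum { SEND_UP, RECV_DOWN, SEND_DOWN, RECV_UP } op_t;

#define MAX_NODES 64
#define NOPS 4

/* The simple ordering, also used by even nodes in the even/odd pattern. */
static const op_t send_first[NOPS] = { SEND_UP, RECV_DOWN, SEND_DOWN, RECV_UP };
/* The ordering used by odd nodes in the even/odd pattern. */
static const op_t recv_first[NOPS] = { RECV_DOWN, SEND_UP, RECV_UP, SEND_DOWN };

/* Neighbor of node r in direction dir (+1 up, -1 down); -1 means none. */
static int neighbor(int r, int dir, int p, bool periodic)
{
    int n = r + dir;
    if (n >= 0 && n < p) return n;
    return periodic ? (n + p) % p : -1;
}

/* Simulate p nodes with unbuffered sends. Returns true if every node
   finishes its four operations, false if the nodes deadlock. */
bool exchange_completes(int p, bool periodic, bool even_odd)
{
    assert(p > 0 && p <= MAX_NODES);
    const op_t *prog[MAX_NODES];
    int pc[MAX_NODES];                        /* next operation of each node */
    for (int r = 0; r < p; ++r) {
        prog[r] = (even_odd && r % 2 == 1) ? recv_first : send_first;
        pc[r] = 0;
    }
    for (;;) {
        bool all_done = true, progress = false;
        for (int r = 0; r < p; ++r) {
            if (pc[r] == NOPS) continue;
            all_done = false;
            op_t op = prog[r][pc[r]];
            int dir = (op == SEND_UP || op == RECV_UP) ? +1 : -1;
            int nb = neighbor(r, dir, p, periodic);
            if (nb < 0) {                     /* like MPI_PROC_NULL: done at once */
                pc[r]++;
                progress = true;
            } else if (op == SEND_UP || op == SEND_DOWN) {
                op_t want = (op == SEND_UP) ? RECV_DOWN : RECV_UP;
                if (pc[nb] < NOPS && prog[nb][pc[nb]] == want) {
                    pc[r]++;                  /* the matched pair completes */
                    pc[nb]++;
                    progress = true;
                }
            }
            /* a receive from a real neighbor completes only via the send above */
        }
        if (all_done) return true;
        if (!progress) return false;
    }
}
```

With periodic boundaries the simple ordering deadlocks while the even/odd ordering completes; on a non-periodic chain both complete, but the simple ordering only gets there one node at a time.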
1.8.4.1.2 Paired Exchanges
    call MPI_SENDRECV(a(1,e),   nx, MPI_DOUBLE_PRECISION, nbrtop,    0, &
                      a(1,s-1), nx, MPI_DOUBLE_PRECISION, nbrbottom, 0, &
                      comm1d, status, ierr)
    call MPI_SENDRECV(a(1,s),   nx, MPI_DOUBLE_PRECISION, nbrbottom, 1, &
                      a(1,e+1), nx, MPI_DOUBLE_PRECISION, nbrtop,    1, &
                      comm1d, status, ierr)
In the first call, each node sends nx elements starting at a(1,e), its last row of computing cells, to the node nbrtop while receiving nx elements into a(1,s-1), its lower ghost row, from the node nbrbottom, all on the communicator comm1d. This single call shifts every node's top boundary row up to the neighbor above; the second call shifts every node's bottom boundary row down to the neighbor below.
SENDRECV Communication Pattern:
- Transfer all the boundary node values from bottom to top (first call)
- Transfer all the boundary node values from top to bottom (second call)
This works very well for this 1D case.
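The data movement of the two MPI_SENDRECV calls can be emulated serially. This sketch assumes nx = 1 (one value per row), NLOC computing cells per rank, and a hypothetical array local[r] standing in for rank r's array a, with index 0 as the lower ghost cell (s-1) and NLOC+1 as the upper ghost cell (e+1). The first and last ranks keep their outer ghost cells unchanged, just as a transfer to or from MPI_PROC_NULL does nothing.

```c
#include <assert.h>

#define NLOC 3   /* computing cells per rank (an assumption for the sketch) */

/* local[r] stands in for rank r's array a (with nx = 1): index 0 is the lower
   ghost cell (s-1), 1..NLOC are the computing cells (s..e), and NLOC+1 is the
   upper ghost cell (e+1). */
void halo_exchange(double local[][NLOC + 2], int p)
{
    assert(p > 0);
    /* First SENDRECV: each rank's a(e) goes up into the neighbor's a(s-1). */
    for (int r = 0; r + 1 < p; ++r)
        local[r + 1][0] = local[r][NLOC];
    /* Second SENDRECV: each rank's a(s) goes down into the neighbor's a(e+1). */
    for (int r = 1; r < p; ++r)
        local[r - 1][NLOC + 1] = local[r][1];
}
```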
1.8.4.1.3 Non-blocking ISEND/IRECV
    call MPI_IRECV(a(1,s-1), nx, MPI_DOUBLE_PRECISION, nbrbottom, 0, comm1d, req(1), ierr)
    call MPI_IRECV(a(1,e+1), nx, MPI_DOUBLE_PRECISION, nbrtop,    1, comm1d, req(2), ierr)
    call MPI_ISEND(a(1,e),   nx, MPI_DOUBLE_PRECISION, nbrtop,    0, comm1d, req(3), ierr)
    call MPI_ISEND(a(1,s),   nx, MPI_DOUBLE_PRECISION, nbrbottom, 1, comm1d, req(4), ierr)
    call MPI_WAITALL(4, req, status_array, ierr)
About call MPI_WAITALL(4, req, status_array, ierr):
- this IS a blocking command
- it waits on the four requests from the ISEND and IRECV calls
- the req array is filled in by the ISEND and IRECV calls, with one element for each call
- status_array holds the final status of each ISEND and IRECV and can be used to determine the message size; in Fortran it is declared as status_array(MPI_STATUS_SIZE, 4)
Non-blocking ISEND/IRECV Pattern:
- Set up all the IRECV
- Assign a request id for each IRECV
- Set up all the ISENDs
- Assign a request id for each ISEND
- Set up a wait until the ISENDs and IRECVs are completed
- use the request information to monitor the requests
- return the status variables of the ISENDs and IRECVs
1.8.4.2 Exchanges: A Better Approach
Re-examining the Problem:
- Although the setup is simpler than with synchronized SEND/RECV pairs, the overall code is not much more efficient, because each node still sits idle in MPI_WAITALL before it starts computing.
- How do we make this problem work with minimal wait states?
For each iteration, we need to
- set $U_{old} = U_{new}$
- START exchanging the boundary information by setting up ISENDs and IRECVs between the edges of the grids, using ghost cells
- loop over the truly interior cells (those that do not depend on ghost cells) to update $U_{new}$
- wait until the ISENDs and IRECVs are completed
- finish the computation near the boundaries
Communication then overlaps with computation, leaving few or no wait states.
    call MPI_IRECV(a(1,s-1), nx, MPI_DOUBLE_PRECISION, nbrbottom, 0, comm1d, req(1), ierr)
    call MPI_IRECV(a(1,e+1), nx, MPI_DOUBLE_PRECISION, nbrtop,    1, comm1d, req(2), ierr)
    call MPI_ISEND(a(1,e),   nx, MPI_DOUBLE_PRECISION, nbrtop,    0, comm1d, req(3), ierr)
    call MPI_ISEND(a(1,s),   nx, MPI_DOUBLE_PRECISION, nbrbottom, 1, comm1d, req(4), ierr)
    ! do interior update
    call MPI_WAITALL(4, req, status_array, ierr)
    ! do boundary updates
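The split between interior and boundary updates only pays off if it gives the same answer as a full sweep. The serial sketch below checks that idea on one rank's block, assuming f = 0, NROWS computing rows plus ghost rows 0 and NROWS+1, and NX points per row whose first and last columns are fixed boundary values; the names and sizes are hypothetical. Rows 2 to NROWS-1 never touch a ghost row, so they can be updated before MPI_WAITALL returns.

```c
#include <assert.h>

#define NX    4   /* points per row, including the two fixed boundary columns */
#define NROWS 5   /* computing rows on this rank; rows 0 and NROWS+1 are ghosts */

/* Jacobi update (with f = 0) of the interior points of row i. */
void update_row(double unew[][NX], double uold[][NX], int i)
{
    assert(i >= 1 && i <= NROWS);
    for (int j = 1; j < NX - 1; ++j)
        unew[i][j] = 0.25 * (uold[i - 1][j] + uold[i + 1][j]
                           + uold[i][j - 1] + uold[i][j + 1]);
}

/* Rows 2..NROWS-1 read only this rank's rows, so they can be updated while
   the ghost-row exchange is still in flight. */
void update_interior(double unew[][NX], double uold[][NX])
{
    for (int i = 2; i <= NROWS - 1; ++i)
        update_row(unew, uold, i);
}

/* Rows 1 and NROWS read ghost rows 0 and NROWS+1, so they must wait until
   the exchange has completed (after MPI_WAITALL in the Fortran code). */
void update_boundary_rows(double unew[][NX], double uold[][NX])
{
    update_row(unew, uold, 1);
    update_row(unew, uold, NROWS);
}
```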
1.8.5 Timing MPI Codes
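MPI has its own timing function.
    double precision MPI_Wtime()
It returns the elapsed wall-clock time in seconds, as a double precision value, measured from some fixed point in the past; only the difference between two calls is meaningful.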
It is possible to time sections of a code using the following idea
    double precision :: t0, t1
    call MPI_BARRIER(comm_group, ierr)
    t0 = MPI_WTIME()
    ! do something interesting here
    t1 = MPI_WTIME()
    print *, 'total time = ', t1 - t0
1.8.6 Parallel Sorts
1. Sample the data. Here is a sample sort:
- distribute an equal number of elements to each processor
- on each processor, sort the local array
- do a global reduction (MPI_MIN and MPI_MAX) to find xmin and xmax
- on each processor, put the data into a set of evenly spaced bins - the number of bins should be larger than the number of processors
- use a global summation to find the number of elements in each bin
- broadcast the results of the summation to all processors
2. Shuffle the Data
- using the summation data, the processor id, and the number of processors, determine what data should be shipped to which processor
- on each processor, create a set of coordinated sends and receives to ship the data to the correct processor
- re-sort the data locally
3. Transfer the data
At times, we want to transfer data instead of copying it. A data transfer is the equivalent of a move command in a file system rather than a copy command. Transfers are done:
- When we repartition our data
- When elements (particles, for example) cross the geometric boundary of the region associated with a computational node
- To recover memory
- To keep local arrays associated only with local data
Processor Sending - P1:
Transfers are mostly a set of bookkeeping exercises. Assume we are shipping some part of an array on processor P1 to processor P2. On P1, we need to:
- determine the data that needs to be sent from P1
- copy the data into a temporary array on P1
- send the data from P1
- delete the data from the original P1 array
- clean up (compact) the P1 array
Processor Receiving - P2:
On P2, we need to:
- prepare a temporary array to receive data from P1
- set up a receive to accept the data
- integrate the data from the temporary array into P2's array:
  - make sure there is enough space to receive the new data
  - reindex the new data, if needed
  - copy the new data into P2's array
- delete the temporary array
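Two pieces of the bookkeeping above can be sketched in C. The helpers below are hypothetical, not library routines. bin_of and assign_bins cover the sample sort: bin each element, then give each processor a contiguous run of bins holding about total/nprocs elements, so that after the local re-sort the processors hold the data in global order. pack_and_compact covers the P1 side of a transfer: copy the outgoing elements into a temporary send buffer and close the gaps they leave. The counts passed to assign_bins are assumed to be the globally summed bin counts.

```c
#include <assert.h>

/* Bin index of x among nbins evenly spaced bins covering [xmin, xmax]. */
int bin_of(double x, double xmin, double xmax, int nbins)
{
    assert(nbins > 0 && xmax > xmin && x >= xmin && x <= xmax);
    int b = (int)((x - xmin) / (xmax - xmin) * nbins);
    return b < nbins ? b : nbins - 1;   /* x == xmax falls in the last bin */
}

/* Given the globally summed bin counts, give each of nprocs processors a
   contiguous run of bins holding about total/nprocs elements.
   owner[b] is the processor that receives every element of bin b. */
void assign_bins(const long *counts, int nbins, int nprocs, int *owner)
{
    assert(nbins > 0 && nprocs > 0);
    long total = 0;
    for (int b = 0; b < nbins; ++b) total += counts[b];
    long before = 0;                    /* elements in the bins below b */
    for (int b = 0; b < nbins; ++b) {
        int p = total > 0 ? (int)(before * nprocs / total) : 0;
        owner[b] = p < nprocs ? p : nprocs - 1;
        before += counts[b];
    }
}

/* P1 side of a transfer: copy elements with leaving[i] != 0 into the
   temporary send buffer out, and compact the remaining elements to the
   front of a. Returns the number of elements packed; *n becomes the
   number kept. */
int pack_and_compact(double *a, int *n, const int *leaving, double *out)
{
    int kept = 0, packed = 0;
    for (int i = 0; i < *n; ++i) {
        if (leaving[i])
            out[packed++] = a[i];
        else
            a[kept++] = a[i];
    }
    *n = kept;
    return packed;
}
```

Because owner[] never decreases with the bin index, every element on processor k is no larger than any element on processor k+1, which is what makes the final local re-sort sufficient.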

