Parallel Execution¶
This example illustrates how data can be exchanged and synchronised between processes. It uses the mpi4py package for interprocess communication. If you're unfamiliar with this package, or with MPI, check out the documentation here.
The basic idea is that we have a population with a single arbitrary state property which can take one of \(N\) values, where \(N\) is the number of processes, and each process initially holds the part of the population in the corresponding state. As time evolves, individuals' states change at random, and the processes exchange individuals to keep their own populations homogeneous.
Each population is stored in a pandas DataFrame. At the start there is an equal population in each process (and thus in each state).
The states transition randomly with a fixed probability \(p\) at each timestep, and those that change are redistributed amongst the processes.
Finally, one process acquires the entire population and prints a summary of the state counts.
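To make the transition structure concrete, here is a sketch of the kind of matrix the model builds, assuming a hypothetical run with \(N=4\) processes and \(p=0.01\): every off-diagonal entry is \(p\), and each row is a probability distribution over destination states.

```python
import numpy as np

# hypothetical values: 4 processes, per-state transition probability of 1%
size = 4
p = 0.01

# diagonal entries are 1 - (size - 1) * p, off-diagonal entries are p
P = np.identity(size) * (1 - size * p) + p

print(P)
# each row sums to 1, i.e. every individual ends up in exactly one state
assert np.allclose(P.sum(axis=1), 1.0)
```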
Examples Source Code
The code for all the examples can be obtained from the neworder repository.
Optional dependencies
This example requires optional dependencies, see system requirements and use:
pip install neworder[parallel]
Setup¶
Firstly, we import the necessary modules and check we are running in parallel mode:
import neworder
from parallel import Parallel # import our model definition
#neworder.verbose()
#neworder.checked(False)
# must be MPI enabled
assert neworder.mpi.SIZE > 1, "This configuration requires MPI with >1 process"
MPI
neworder uses the mpi4py package to provide MPI functionality, which in turn requires an MPI installation on the host (see system requirements). The attributes neworder.mpi.COMM (the MPI communicator), neworder.mpi.RANK and neworder.mpi.SIZE are provided for convenience.
As always, the neworder framework expects an instance of a model class, subclassed from neworder.Model, which in turn requires a timeline, in this case a neworder.LinearTimeline object:
population_size = 100
p = 0.01
timeline = neworder.LinearTimeline(0, 10, 10)
model = Parallel(timeline, p, population_size)
neworder.run(model)
So each process has an initial population of 100 individuals, each of which has a 1% probability of changing to any other given state at each of the ten (unit) timesteps.
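To get a feel for these parameters: assuming 8 processes (as in the example output below), an individual leaves its current state with probability \(7 \times 0.01 = 0.07\) at each step, so over the ten steps it moves at least once with probability \(1 - 0.93^{10}\), roughly 52%.

```python
# assuming an 8-process run, as in the example output below
size = 8
p = 0.01
steps = 10

p_leave = (size - 1) * p            # probability of leaving the current state per step
p_move = 1 - (1 - p_leave) ** steps # probability of at least one move over the run
print(round(p_move, 3))             # about 0.516
```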
The Model¶
Here's the model constructor:
import numpy as np
import pandas as pd # type: ignore
import neworder
class Parallel(neworder.Model):
    def __init__(self, timeline: neworder.Timeline, p: float, n: int):
        # initialise base model (essential!)
        super().__init__(timeline, neworder.MonteCarlo.nondeterministic_stream)
        # enumerate possible states
        self.s = np.arange(neworder.mpi.SIZE)
        # create transition matrix with all off-diagonal probabilities equal to p
        self.p = np.identity(neworder.mpi.SIZE) * (1 - neworder.mpi.SIZE * p) + p
        # record initial population size
        self.n = n
        # individuals get a unique id and their initial state is the MPI rank
        self.pop = pd.DataFrame({"id": neworder.df.unique_index(n),
                                 "state": np.full(n, neworder.mpi.RANK)}).set_index("id")
The step method, which is called at every timestep, performs the state transitions. Note that neworder.df.transition modifies the dataframe in-place. It then sends individuals with changed states to the appropriate processes, and receives the corresponding individuals from the other processes:
    def step(self) -> None:
        # generate some movement
        neworder.df.transition(self, self.s, self.p, self.pop, "state")
        # send emigrants to other processes
        for s in range(neworder.mpi.SIZE):
            if s != neworder.mpi.RANK:
                emigrants = self.pop[self.pop.state == s]
                neworder.log("sending %d emigrants to %d" % (len(emigrants), s))
                neworder.mpi.COMM.send(emigrants, dest=s)
        # remove the emigrants
        self.pop = self.pop[self.pop.state == neworder.mpi.RANK]
        # receive immigrants
        for s in range(neworder.mpi.SIZE):
            if s != neworder.mpi.RANK:
                immigrants = neworder.mpi.COMM.recv(source=s)
                if len(immigrants):
                    neworder.log("received %d immigrants from %d" % (len(immigrants), s))
                    self.pop = pd.concat((self.pop, immigrants))
Blocking communication
The above implementation uses blocking communication: every process sends to and receives from every other process, even if the payload is an empty dataframe. A given process cannot know in advance that it will receive no data from another process, and will deadlock if it tries to receive from a process that hasn't sent anything. MPI also provides non-blocking communication protocols, which avoid this all-to-all exchange but are more complex to implement. For more info see the mpi4py documentation.
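The emigrant selection in step() above is just a partition of the population by destination state, and can be sketched without MPI using a plain pandas DataFrame (the rank, process count and states below are hypothetical):

```python
import numpy as np
import pandas as pd

rank, size = 0, 4  # pretend we are process 0 of 4

# a toy population: ids with states assigned after a transition step
pop = pd.DataFrame({"id": np.arange(10),
                    "state": [0, 2, 1, 0, 3, 0, 2, 1, 0, 3]}).set_index("id")

# one outgoing frame per destination rank, as in the step() send loop
emigrants = {s: pop[pop.state == s] for s in range(size) if s != rank}

# what remains locally after the emigrants leave
pop = pop[pop.state == rank]

# the partition is exhaustive: locals plus emigrants account for everyone
assert len(pop) + sum(len(e) for e in emigrants.values()) == 10
```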
The check method verifies that everyone is present and in the right place (i.e. the right process):
    def check(self) -> bool:
        # Ensure we haven't lost (or gained) anybody
        totals = neworder.mpi.COMM.gather(len(self.pop), root=0)
        if totals:
            if sum(totals) != self.n * neworder.mpi.SIZE:
                return False
        # And check each process only has individuals that it should have
        out_of_place = neworder.mpi.COMM.gather(len(self.pop[self.pop.state != neworder.mpi.RANK]))
        if out_of_place and any(out_of_place):
            return False
        return True
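The `if totals:` guard works because `gather` returns the assembled list only on the root process and `None` everywhere else, so only the root actually evaluates the totals. A minimal stand-in for that behaviour, without MPI (the function name and arguments are purely illustrative — a real `gather` collects the values over the communicator rather than taking them as a parameter):

```python
def fake_gather(value, values_from_all, rank, root=0):
    # mimics mpi4py's comm.gather: the root gets the full list, others get None
    return values_from_all if rank == root else None

# on the root, the result is a truthy list, so the totals check runs
assert fake_gather(100, [100, 99, 101], rank=0) == [100, 99, 101]
# on any other rank, gather yields None, so the check body is skipped
assert fake_gather(99, [100, 99, 101], rank=1) is None
```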
For an explanation of why it's implemented like this, see here. The finalise method gathers the per-process populations and prints a summary of the state counts:
    def finalise(self) -> None:
        # process 0 assembles all the data and prints a summary
        pops = neworder.mpi.COMM.gather(self.pop, root=0)
        if pops:
            pop = pd.concat(pops)
            neworder.log("State counts (total %d):\n%s" % (len(pop), pop["state"].value_counts().to_string()))
Execution¶
As usual, to run the model we just execute the model script, but via MPI, e.g. from the command line, something like:
mpiexec -n 8 python examples/parallel/model.py
adjusting the path as necessary, and optionally changing the number of processes.
Output¶
Results will vary as the random streams are not deterministic in this example, but you should see something like:
...
[py 0/8] sending 2 emigrants to 7
[py 0/8] received 2 immigrants from 1
[py 0/8] received 1 immigrants from 4
[py 0/8] received 1 immigrants from 5
[py 0/8] received 1 immigrants from 6
[py 0/8] received 2 immigrants from 7
[py 0/8] State counts (total 800):
2 109
4 106
6 105
7 99
1 99
0 99
5 96
3 87