Skip to content

Parallel Execution

This example illustrates how data can be exchanged and synchronised between processes. It uses the mpi4py package for interprocess communication. If you're unfamiliar with this package, or with MPI, check out the documentation here.

The basic idea is that we have a population with a single arbitrary state property which can take one of N values, where N is the number of processes, and each process initially holds the part of the population in the corresponding state. As time evolves, indvidual's states change at random, and the processes exchange individuals to keep their own population homegeneous.

Each population is stored in a pandas DataFrame. At the start these is an equal population in each process (and thus in each state).

The states transition randomly with a fixed probability \(p\) at each timestep, and those that change are redistributed amongst the processes.

Finally, one process acquires the entire population and prints a summary of the state counts.

Examples Source Code

The code for all the examples can be obtained by either:

  • pulling the docker image virgesmith/neworder:latest (more here), or
  • installing neworder, downloading the source code archive from the neworder releases page and extracting the examples subdirectory.

Optional dependencies

This example requires optional dependencies, see system requirements and use:

pip install neworder[parallel]

Setup

Firstly, we import the necessary modules and check we are running in parallel mode:

import neworder
from parallel import Parallel # import our model definition

#neworder.verbose()
#neworder.checked(False)

# must be MPI enabled
assert neworder.mpi.SIZE > 1, "This configuration requires MPI with >1 process"
[file: examples/parallel/model.py]

MPI

neworder uses the mpi4py package to provide MPI functionality, which in turn requires an MPI installation on the host (see system requirements). The attributes neworder.mpi.COMM (the MPI communicator), neworder.mpi.RANK and neworder.mpi.SIZE are provided for convenience.

As always, the neworder framework expects an instance of a model class, subclassed from neworder.Model, which in turn requires a timeline, in this case a neworder.LinearTimeline object:

population_size = 100
p = 0.01
timeline = neworder.LinearTimeline(0, 10, 10)
model = Parallel(timeline, p, population_size)
neworder.run(model)
[file: examples/parallel/model.py]

So each process has an initial population of 100 individuals, each of which has a 1% probability of changing to another given state at each of the ten (unit) timesteps.

The Model

Here's the model constructor:

import numpy as np
import pandas as pd  # type: ignore

import neworder

class Parallel(neworder.Model):
  def __init__(self, timeline: neworder.Timeline, p: float, n: int):
    # initialise base model (essential!)
    super().__init__(timeline, neworder.MonteCarlo.nondeterministic_stream)

    # enumerate possible states
    self.s = np.arange(neworder.mpi.SIZE)

    # create transition matrix with all off-diagonal probabilities equal to p
    self.p = np.identity(neworder.mpi.SIZE) * (1 - neworder.mpi.SIZE * p) + p

    # record initial population size
    self.n = n

    # individuals get a unique id and their initial state is the MPI rank
    self.pop = pd.DataFrame({"id": neworder.df.unique_index(n),
                             "state": np.full(n, neworder.mpi.RANK) }).set_index("id")
[file: examples/parallel/parallel.py]

The step method, which is called at every timestep performs the state transitions. Note that neworder.df.transition modifies the dataframe in-place. Then, sends individuals with changed state to the appropriate process and receives appropriate individuals from the other processes:

  def step(self) -> None:
    # generate some movement
    neworder.df.transition(self, self.s, self.p, self.pop, "state")

    # send emigrants to other processes
    for s in range(neworder.mpi.SIZE):
      if s != neworder.mpi.RANK:
        emigrants = self.pop[self.pop.state == s]
        neworder.log("sending %d emigrants to %d" % (len(emigrants), s))
        neworder.mpi.COMM.send(emigrants, dest=s)

    # remove the emigrants
    self.pop = self.pop[self.pop.state == neworder.mpi.RANK]

    # receive immigrants
    for s in range(neworder.mpi.SIZE):
      if s != neworder.mpi.RANK:
        immigrants = neworder.mpi.COMM.recv(source=s)
        if len(immigrants):
          neworder.log("received %d immigrants from %d" % (len(immigrants), s))
          self.pop = pd.concat((self.pop, immigrants))
[file: examples/parallel/parallel.py]

Blocking communication

The above implementation uses blocking communication, which means that all processes send and receive from each other, even if they send an empty dataframe: a given process cannot know in advance if it's not going to receive data from another process, and will deadlock if it tries to receive data from a process that hasn't sent any. MPI does have non-blocking communication protocols, which are more complex to implement. For more info see the mpi4py documentation.

The check method accounts for everyone being present and in the right place (i.e. process):

  def check(self) -> bool:
    # Ensure we haven't lost (or gained) anybody
    totals = neworder.mpi.COMM.gather(len(self.pop), root=0)
    if totals:
      if sum(totals) != self.n * neworder.mpi.SIZE:
        return False
    # And check each process only has individuals that it should have
    out_of_place = neworder.mpi.COMM.gather(len(self.pop[self.pop.state != neworder.mpi.RANK]))
    if out_of_place and any(out_of_place):
      return False
    return True
[file: examples/parallel/parallel.py]

For an explanation of why it's implemented like this, see here. The finalise method aggregates the populations and prints a summary of the populations in each state.

  def finalise(self) -> None:
    # process 0 assembles all the data and prints a summary
    pops = neworder.mpi.COMM.gather(self.pop, root=0)
    if pops:
      pop = pd.concat(pops)
      neworder.log("State counts (total %d):\n%s" % (len(pop), pop["state"].value_counts().to_string()))
[file: examples/parallel/parallel.py]

Execution

As usual, to run the model we just execute the model script, but via MPI, e.g. from the command line, something like

mpiexec -n 8 python examples/parallel/model.py

adjusting the path as necessary, and optionally changing the number of processes.

Output

Results will vary as the random streams are not deterministic in this example, but you should see something like:

...
[py 0/8]  sending 2 emigrants to 7
[py 0/8]  received 2 immigrants from 1
[py 0/8]  received 1 immigrants from 4
[py 0/8]  received 1 immigrants from 5
[py 0/8]  received 1 immigrants from 6
[py 0/8]  received 2 immigrants from 7
[py 0/8]  State counts (total 800):
2    109
4    106
6    105
7     99
1     99
0     99
5     96
3     87