Parallel Execution¶
Multithreading
With the release of free-threaded python interpreters from 3.13 onwards, neworder v1.5.0 now includes support for multithreaded models. However, in most cases MPI is still the preferred methodology, especially when data is being exchanged by processes. For an example of a model using multithreading and a brief discussion, see here
This example illustrates how data can be exchanged and synchronised between processes. It uses the mpi4py package for interprocess communication. If you're unfamiliar with this package, or with MPI, check out the documentation here.
The basic idea is that we have a population with a single arbitrary state property which can take one of N values, where N is the number of processes, and each process initially holds the part of the population in the corresponding state. As time evolves, indvidual's states change at random, and the processes exchange individuals to keep their own population homegeneous.
Each population is stored in a pandas DataFrame. At the start these is an equal population in each process (and thus in each state).
The states transition randomly with a fixed probability \(p\) at each timestep, and those that change are redistributed amongst the processes.
Finally, one process acquires the entire population and prints a summary of the state counts.
Examples Source Code
The code for all the examples can be obtained by either:
Optional dependencies
This example requires optional dependencies, see system requirements and use:
pip install neworder[parallel]
Setup¶
Firstly, we import the necessary modules and check we are running in parallel mode:
from parallel_mpi import ParallelMPI # import our model definition # ty:ignore[unresolved-import]
import neworder
# neworder.verbose()
# neworder.checked(False)
# must be MPI enabled
assert neworder.mpi.SIZE > 1, "This configuration requires MPI with >1 process"
MPI
neworder uses the mpi4py package to provide MPI functionality, which in turn requires an MPI installation on the host (see system requirements). The attributes neworder.mpi.COMM (the MPI communicator), neworder.mpi.RANK and neworder.mpi.SIZE are provided for convenience.
As always, the neworder framework expects an instance of a model class, subclassed from neworder.Model, which in turn requires a timeline, in this case a neworder.LinearTimeline object:
population_size = 100
p = 0.01
timeline = neworder.LinearTimeline(0, 10, 10)
model = ParallelMPI(timeline, p, population_size)
neworder.run(model)
So each process has an initial population of 100 individuals, each of which has a 1% probability of changing to another given state at each of the ten (unit) timesteps.
The Model¶
Here's the model constructor:
import numpy as np
import pandas as pd
import neworder
class ParallelMPI(neworder.Model):
def __init__(self, timeline: neworder.Timeline, p: float, n: int):
"""This model uses MPI to run parallel process that synchronise timesteps and exchange data with each other"""
# initialise base model (essential!)
super().__init__(timeline, neworder.MonteCarlo.nondeterministic_stream)
# enumerate possible states
self.s = np.arange(neworder.mpi.SIZE)
# create transition matrix with all off-diagonal probabilities equal to p
self.p = np.identity(neworder.mpi.SIZE) * (1 - neworder.mpi.SIZE * p) + p
# record initial population size
self.n = n
# individuals get a unique id and their initial state is the MPI rank
self.pop = pd.DataFrame({"id": neworder.df.unique_index(n), "state": np.full(n, neworder.mpi.RANK)}).set_index(
"id"
)
The step method, which is called at every timestep performs the state transitions. Note that neworder.df.transition modifies the dataframe in-place. Then, sends individuals with changed state to the appropriate process and receives appropriate individuals from the other processes:
def step(self) -> None:
# generate some movement
neworder.df.transition(self, self.s, self.p, self.pop, "state")
# send emigrants to other processes
for s in range(neworder.mpi.SIZE):
if s != neworder.mpi.RANK:
emigrants = self.pop[self.pop.state == s]
neworder.log(f"sending {len(emigrants)} emigrants to {s}")
neworder.mpi.COMM.send(emigrants, dest=s)
# remove the emigrants
self.pop = self.pop[self.pop.state == neworder.mpi.RANK]
# receive immigrants
for s in range(neworder.mpi.SIZE):
if s != neworder.mpi.RANK:
immigrants = neworder.mpi.COMM.recv(source=s)
if len(immigrants):
neworder.log(f"received {len(immigrants)} immigrants from {s}")
self.pop = pd.concat((self.pop, immigrants))
Blocking communication
The above implementation uses blocking communication, which means that all processes send and receive from each other, even if they send an empty dataframe: a given process cannot know in advance if it's not going to receive data from another process, and will deadlock if it tries to receive data from a process that hasn't sent any. MPI does have non-blocking communication protocols, which are more complex to implement. For more info see the mpi4py documentation.
The check method accounts for everyone being present and in the right place (i.e. process):
def check(self) -> bool:
# Ensure we haven't lost (or gained) anybody
totals = neworder.mpi.COMM.gather(len(self.pop), root=0)
if totals and sum(totals) != self.n * neworder.mpi.SIZE:
return False
# And check each process only has individuals that it should have
out_of_place = neworder.mpi.COMM.gather(len(self.pop[self.pop.state != neworder.mpi.RANK]))
return not (out_of_place and any(out_of_place))
For an explanation of why it's implemented like this, see here. The finalise method aggregates the populations and prints a summary of the populations in each state.
def finalise(self) -> None:
# process 0 assembles all the data and prints a summary
pops = neworder.mpi.COMM.gather(self.pop, root=0)
if pops:
pop = pd.concat(pops)
neworder.log(f"State counts (total {len(pop)}):\n{pop['state'].value_counts().to_string()}")
Execution¶
As usual, to run the model we just execute the model script, but via MPI, e.g. from the command line, something like
adjusting the path as necessary, and optionally changing the number of processes.
Output¶
Results will vary as the random streams are not deterministic in this example, but you should see something like:
...
[py 0/8] sending 2 emigrants to 7
[py 0/8] received 2 immigrants from 1
[py 0/8] received 1 immigrants from 4
[py 0/8] received 1 immigrants from 5
[py 0/8] received 1 immigrants from 6
[py 0/8] received 2 immigrants from 7
[py 0/8] State counts (total 800):
2 109
4 106
6 105
7 99
1 99
0 99
5 96
3 87
Multithreaded Execution¶
Dividing model execution amongst threads rather than processes can be more tricky than splitting by processes as it is easier to trigger data races or non-deterministic behaviour. But since the python interpreter can now operate without a Global Interpreter Lock (GIL), significantly improving multihreaded performance, it is worth providing support and some guidance.
Warning
Dependencies containing extension modules must declare they can safely run without the GIL. If they don't the python runtime will automatically re-enable the GIL (and emit a warning). This can be seen when e.g. using pandas versions prior to 3.0.0.
neworder from 1.5 declares it can run safely without the GIL, and contains two new functions: freethreaded(), which indicates whether neworder has been compiled against a free-threaded interpreter, and thread_id() (which returns the same value as python's threading.get_native_id()). The sys._is_gil_enabled() function can be used to check for certain at runtime.
Guidance for multithreaded model implementation
- Seeding RNGs with
thread_id()is discouraged under any circumstance. Values are nondeterministic, and... ThreadPoolExecutorwill potentially reuse threads, so you may see the same thread_id in supposedly different threads, making their random streams identical.neworder.MonteCarlo.deterministic_independent_streamuses (only) the process rank to seed the RNG. For a single process with multiple threads, this will not result in independent streams.- Note that results will not necessarily be deterministic even with deterministic seeds as the order of thread execution is potentially variable.
- Writing to shared data requires the use of
threading.Lock. This should be used as a context manager for exception safety. - To synchronise threads (e.g. require all threads complete a timestep before starting the next one),
threading.Barriercan be used. - It is probably better to use MPI if significant inter-thread communication is necessary.
Example¶
The examples source code now includes a simple model (examples/parallel/parallel_mt.py) that simulates a CPU-intensive process (O(n) complexity on a dataframe), using a divide-and-conquer approach.
import time
from threading import Barrier, Lock
import pandas as pd
import neworder
class ParallelThreaded(neworder.Model):
"""
A simple model demonstrating multithreaded rather than multiprocess execution, sharing a dataset and synchronising
at each timestep. An expensive computation is simulated that is O(n), showing how partitioning the dataset can
achieve significant performance gains.
"""
def __init__(self, pop: pd.DataFrame, indices: list[int], lock: Lock, barrier: Barrier) -> None:
"""
pop: the overall population
indices: the chunk of the population that this thread deals with
"""
super().__init__(neworder.LinearTimeline(0, 10, 10), lambda: indices[0]) # deterministic, independent
self.lock = lock
self.barrier = barrier
self.pop = pop # ref not copy
self.indices = indices
def step(self) -> None:
# simulate some complex CPU-bound task that scales linearly
time.sleep(len(self.indices) / len(self.pop))
value = self.mc.ustream(1)
# update the dataset
with self.lock: # throwing in a lock is bad
self.pop.loc[self.indices] += value
# (optional) synchronise threads at the end of each timestep
self.barrier.wait()
Thus, the more threads used, the faster the overall execution (up to some limit). The model takes a lock - to isolate writing to shared data, and a barrier to enable thread synchronisation at each timestep. The model can be run using the following script which use Python's concurrent.futures module
"""
Run script for multithreaded parallel example
"""
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from threading import Barrier, Lock
import pandas as pd
from parallel_mt import ParallelThreaded # ty:ignore[unresolved-import]
import neworder
# neworder.verbose()
def sync() -> None:
"""Optionally add code here to run before the barrier is crossed"""
pass
def run_thread(index: int, lock: Lock, barrier: Barrier) -> None:
indices = list(range(index * N_PEOPLE // N_THREADS, (index + 1) * N_PEOPLE // N_THREADS))
m = ParallelThreaded(pop, indices, lock, barrier)
neworder.run(m)
N_THREADS = 8
N_PEOPLE = 100
pop = pd.DataFrame(index=range(N_PEOPLE), data={"state": 0.0})
# threading primitives
lock = Lock()
barrier = Barrier(N_THREADS, action=sync)
def _is_ft() -> bool:
if sys.version_info.minor > 12:
return not sys._is_gil_enabled()
return False
neworder.log(
f"neworder FT: {neworder.freethreaded()}, python FT:{_is_ft()}",
sys.version,
)
t = time.perf_counter()
with ThreadPoolExecutor() as executor:
futures = [executor.submit(run_thread, i, lock, barrier) for i in range(N_THREADS)]
results = [f.result() for f in futures]
neworder.log(f"Execution time: {time.perf_counter() - t:.3f}s, (total CPU time ~ 10s)")
# This should output N_THREADS unique values, with (approximately) even counts
print(pop.value_counts())