Skip to content

Infection model

An example of individuals moving and interacting on a geospatial network.

Examples Source Code

The code for all the examples can be obtained by either:

  • downloading and extracting the examples archive from the neworder releases page, or
  • pulling the docker image virgesmith/neworder:latest (more here

Optional dependencies

This example requires optional dependencies, use:

pip install neworder[geospatial]

Implementation

The model is built on the graph implementations and algorithms provided by the networkx and osmnx packages, as well as geoapandas, which are encapsulated in the GeospatialGraph class specific to this example (to avoid the main package needing the extra dependencies). Each entity travels at a fixed random speed on a street network, travelling repeatedly to randomly selected destinations following a shortest path.

A number of entities are initially "infected" (shown in red) and will pass on the infection to nearby susceptible entities (white). While infected there is a possibility of dying (black) in which case the individual stops moving and is no longer infectious. If they survive the infection they become immune (green). The simulation runs until there are no more infected agents.

Run like so

python examples/infection/run.py

which uses this model implementation:

examples/infection/infection.py
from enum import IntEnum
from time import sleep
from typing import Any

import geopandas as gpd
import matplotlib.pyplot as plt
import numpy as np
import osmnx as ox
import pandas as pd
from shapely import line_interpolate_point

import neworder as no
from neworder.geospatial import GeospatialGraph


class Status(IntEnum):
    SUSCEPTIBLE = 0
    INFECTED = 1
    IMMUNE = 2
    DEAD = 3

    @property
    def rgba(self) -> tuple[float, float, float, float]:
        match self:
            case Status.SUSCEPTIBLE:
                return (1.0, 1.0, 1.0, 1.0)
            case Status.INFECTED:
                return (1.0, 0.0, 0.0, 1.0)
            case Status.IMMUNE:
                return (0.0, 1.0, 0.0, 1.0)
            case Status.DEAD:
                return (0.0, 0.0, 0.0, 1.0)


class Infection(no.Model):
    def __init__(
        self,
        point: tuple[float, float],
        dist: float,
        n_agents: int,
        n_infected: int,
        speed: float,
        infection_radius: float,
        recovery_time: int,
        mortality: float,
    ) -> None:
        super().__init__(no.LinearTimeline(0.0, 1.0), no.MonteCarlo.deterministic_independent_stream)
        # expose the model's MC engine to numpy
        self.nprand = no.as_np(self.mc)
        # create the spatial domain
        self.domain = GeospatialGraph.from_point(point, dist, network_type="drive", crs="epsg:27700")

        # set the parameters
        self.infection_radius = infection_radius
        self.recovery_time = recovery_time
        self.marginal_mortality = 1.0 - (1.0 - mortality) ** (1.0 / recovery_time)

        # create the agent data, which is stored in a geopandas geodataframe
        start_positions = self.domain.all_nodes.sample(n=n_agents, random_state=self.nprand, replace=True).index.values
        speeds = self.nprand.lognormal(np.log(speed), 0.2, n_agents)
        agents = pd.DataFrame(
            data={
                "node": start_positions,
                "speed": speeds,
                "status": Status.SUSCEPTIBLE,
                "t_infect": no.time.NEVER,
            }
        )
        agents["dest"] = agents["node"].apply(self.__random_next_dest)
        agents["path"] = agents[["node", "dest"]].apply(
            lambda r: self.domain.shortest_path(r["node"], r["dest"], weight="length"),
            axis=1,
        )
        agents["dist"] = agents.path.apply(lambda p: p.length).astype(float)
        agents["offset"] = 0.0
        infected = self.nprand.choice(agents.index, n_infected, replace=False)
        agents.loc[infected, "status"] = Status.INFECTED
        agents.loc[infected, "t_infect"] = self.timeline.index

        self.agents = gpd.GeoDataFrame(
            agents,
            geometry=agents["path"].apply(lambda linestr: line_interpolate_point(linestr, 0)),
        )  # ty:ignore[no-matching-overload]
        self.fig, self.g = self.__init_visualisation()

    def step(self) -> None:
        self.__update_position()
        self.__infect_nearby()
        self.__recover()
        self.__succumb()
        num_infected = (self.agents.status == Status.INFECTED).sum()
        num_immune = (self.agents.status == Status.IMMUNE).sum()
        num_dead = (self.agents.status == Status.DEAD).sum()
        self.__update_visualisation(num_infected, num_immune, num_dead)
        if num_infected == 0:
            sleep(5)
            self.halt()
            self.finalise()

    def finalise(self) -> None:
        no.log(f"total steps: {self.timeline.index}")
        no.log(f"infections: {len(self.agents.t_infect.dropna())}")
        no.log(f"recoveries: {(self.agents.status == Status.IMMUNE).sum()}")
        no.log(f"deaths: {(self.agents.status == Status.DEAD).sum()}")
        no.log(f"unaffected: {(self.agents.status == Status.SUSCEPTIBLE).sum()}")

    def __random_next_dest(self, node: int) -> int:
        # ensure dest is different from origin
        dest = node
        while dest == node:
            dest = self.domain.all_nodes.sample(n=1, random_state=self.nprand).index.values[0]
        return dest

    def __update_position(self) -> None:
        self.agents.offset += self.agents.speed
        # move agent along its route
        self.agents["geometry"] = self.agents[["path", "offset"]].apply(
            lambda r: line_interpolate_point(r["path"], r["offset"]), axis=1
        )
        # check if arrived at destination and set a new destination if necessary
        overshoots = self.agents.offset >= self.agents.dist
        if not overshoots.empty:
            # offset <- offset - dist
            self.agents.loc[overshoots, "offset"] -= self.agents.loc[overshoots, "dist"]
            # node <- dest
            self.agents.loc[overshoots, "node"] = self.agents.loc[overshoots, "dest"]
            # dest <- random
            self.agents.loc[overshoots, "dest"] = self.agents.loc[overshoots, "node"].apply(self.__random_next_dest)
            # path <- (node, dest), dist <- new_dist
            self.agents.loc[overshoots, "path"] = self.agents.loc[overshoots, ["node", "dest"]].apply(
                lambda r: self.domain.shortest_path(r["node"], r["dest"], weight="length"),
                axis=1,
            )
            self.agents.loc[overshoots, "dist"] = (
                self.agents.loc[overshoots, "path"].apply(lambda p: p.length).astype(float)
            )
            # finally update position
            self.agents.loc[overshoots, "geometry"] = self.agents.loc[overshoots, "path"].apply(
                lambda linestr: line_interpolate_point(linestr, 0)
            )

    def __infect_nearby(self) -> None:
        infected = self.agents[self.agents.status == Status.INFECTED].geometry
        susceptible = self.agents[self.agents.status == Status.SUSCEPTIBLE].geometry
        new_infections = []
        # loop over smallest group for efficiency
        if len(infected) < len(susceptible):
            for i in infected:
                new = susceptible.geometry.distance(i) < self.infection_radius
                # new[new].index gives us only the index values corresponding to True
                new_infections.extend(new[new].index)
        else:
            for i, p in susceptible.items():
                new = infected.geometry.distance(p) < self.infection_radius
                if new.any():
                    new_infections.append(i)
        self.agents.loc[new_infections, "status"] = Status.INFECTED
        self.agents.loc[new_infections, "t_infect"] = self.timeline.index

    def __recover(self) -> None:
        t = self.timeline.index
        self.agents.loc[
            (t - self.agents.t_infect >= self.recovery_time) & (self.agents.status == Status.INFECTED),
            "status",
        ] = Status.IMMUNE

    def __succumb(self) -> None:
        infected = self.agents[self.agents.status == Status.INFECTED]
        death = self.mc.hazard(self.marginal_mortality, len(infected)).astype(bool)
        self.agents.loc[infected[death].index.values, "status"] = Status.DEAD
        self.agents.loc[infected[death].index.values, "speed"] = 0.0

    def __init_visualisation(self) -> tuple[Any, Any]:
        plt.ion()
        fig, ax = ox.plot_graph(
            self.domain.graph,  # ty:ignore[invalid-argument-type]
            bgcolor="w",
            node_size=5,
            edge_linewidth=2,
            edge_color="#777777",
            figsize=(12, 9),
        )
        plt.tight_layout()
        # optionally add a basemap:
        # import contextily as ctx
        # ctx.add_basemap(ax, crs=self.domain.crs, url=ctx.providers.OpenTopoMap)
        g = ax.scatter(
            self.agents.geometry.x,
            self.agents.geometry.y,
            color=self.agents.status.apply(lambda c: c.rgba),
            edgecolor="k",
        )
        fig.suptitle("[q to quit]")
        fig.canvas.mpl_connect("key_press_event", lambda event: self.halt() if event.key == "q" else None)  # ty:ignore[unresolved-attribute]
        fig.canvas.flush_events()
        return fig, g

    def __update_visualisation(self, num_infected, num_immune, num_dead) -> None:
        offsets = np.array(list(zip(self.agents.geometry.x, self.agents.geometry.y, strict=False)))
        colours = self.agents.status.apply(lambda c: c.rgba)
        self.g.set_offsets(offsets)
        self.g.set_facecolors(colours)
        self.fig.suptitle(
            f"step {self.timeline.index}: inf={num_infected} imm={num_immune} dead={num_dead} / {len(self.agents)} [q to quit]"
        )

        self.fig.canvas.flush_events()

and the neworder.geospatial.GeospatialGraph spatial domain.

Outputs

The output is an animated map of the agents, as illustrated above. A basemap can easily be added by installing contextily and uncommenting the lines in __init_visualisation.