Source code for cassiopeia.simulator.ecDNABirthDeathSimulator

"""
This file stores a general phylogenetic tree simulator using forward birth-death
process, including differing fitness on lineages on the tree. Allows for a
variety of division and fitness regimes to be specified by the user.
"""

from random import random
from typing import Any, Callable, Dict, Generator, List, Optional, Tuple, Union

import networkx as nx
import numpy as np
import pandas as pd

from queue import PriorityQueue

from cassiopeia.data.CassiopeiaTree import CassiopeiaTree
from cassiopeia.mixins import ecDNABirthDeathSimulatorError, TreeSimulatorError
from cassiopeia.simulator.BirthDeathFitnessSimulator import (
    BirthDeathFitnessSimulator,
)


[docs] class ecDNABirthDeathSimulator(BirthDeathFitnessSimulator): """Simulator class for a forward birth-death process with fitness in a population with ecDNA. "Implements a flexible phylogenetic tree simulator using a forward birth- death process. In this process starting from an initial root lineage, births represent the branching of a new lineage and death represents the cessation of an existing lineage. The process is represented as a tree, with internal nodes representing division events, branch lengths representing the lifetimes of individuals, and leaves representing samples observed at the end of the experiment. Allows any distribution on birth and death waiting times to be specified, including constant, exponential, weibull, etc. If no death waiting time distribution is provided, the process reduces to a Yule birth process. Also robustly simulates differing fitness on lineages within a simulated tree. Fitness in this context represents potential mutations that may be acquired on a lineage that change the rate at which new members are born. Each lineage maintains its own birth scale parameter, altered from an initial specified experiment-wide birth scale parameter by accrued mutations. Different fitness regimes can be specified based on user provided distributions on how often fitness mutations occur and their respective strengths. There are two stopping conditions for the simulation. The first is "number of extant nodes", which specifies the simulation to run until the first moment a number of extant nodes exist. The second is "experiment time", which specifies the time at which lineages are sampled. At least one of these two stopping criteria must be provided. Both can be provided in which case the simulation is run until one of the stopping conditions is reached." Example use snippet: # note that numpy uses a different parameterization of the # exponential distribution with the scale parameter, which is 1/rate birth_waiting_distribution = lambda scale: np.random.exponential(scale) death_waiting_distribution = np.random.exponential(1.5) initial_birth_scale = 0.5 mutation_distribution = lambda: 1 if np.random.uniform() > 0.5 else 0 fitness_distribution = lambda: np.random.uniform(-1,1) fitness_base = 2 bd_sim = BirthDeathFitnessSimulator( birth_waiting_distribution, initial_birth_scale, death_waiting_distribution=death_waiting_distribution, mutation_distribution=mutation_distribution, fitness_distribution=fitness_distribution, fitness_base=fitness_base, num_extant=8 ) tree = bd_sim.simulate_tree() Args: birth_waiting_distribution: A function that samples waiting times from the birth distribution. Determines how often births occur. Must take a scale parameter as the input initial_birth_scale: The initial scale parameter that is used at the start of the experiment death_waiting_distribution: A function that samples waiting times from the death distribution. Determines how often deaths occur. Default is no-death. mutation_distribution: A function that samples the number of mutations that occur at a division event. If None, then no mutations are sampled fitness_distribution: One of the two elements in determining the multiplicative coefficient of a fitness mutation. A function that samples the exponential that the fitness base is raised by. Determines the distribution of fitness mutation strengths. Must not be None if mutation_distribution provided fitness_base: One of the two elements in determining the multiplicative strength of a fitness mutation. The base that is raised by the value given by the fitness distribution. Determines the base strength of fitness mutations. By default is e, Euler's Constant num_extant: Specifies the number of extant lineages existing at the same time as a stopping condition for the experiment experiment_time: Specifies the total time that the experiment runs as a stopping condition for the experiment collapse_unifurcations: Specifies whether to collapse unifurcations in the tree resulting from pruning dead lineages prune_dead_lineages: Whether or not to prune dead (unobserved) lineages. Can be more efficient to not prune, and instead compute statistics on living lineages downstream. random_seed: A seed for reproducibility initial_copy_number: Initial copy number for parental lineage. cosegregation_coefficient: A coefficient describing how likely it is for one species to be co-inherited with one specific species (currently modeled as the first in the array). TODO: how do we make this generalizable to multiple species each with different pairwise covariances? splitting_function: As implemented, the function that describes segregation of each species at cell division. TODO: fix this implementation to allow for non-independent segregation. fitness_array: Fitnesses with respect to copy number of each species in a cell. This should be a matrix in R^e (where e is the number of ecDNA species being modelled). fitness_function: A function that produces a fitness value as a function of copy number and the selection coefficient encoded by the fitness array. capture_efficiency: Probability of observing an ecDNA species. Used as the the probability of a binomial process. Raises: TreeSimulatorError if invalid stopping conditions are provided or if a fitness distribution is not provided when a mutation distribution isn't """ # update for ecDNA-specific initial conditions / simulation parameters. def __init__( self, birth_waiting_distribution: Callable[[float], float], initial_birth_scale: float, death_waiting_distribution: Optional[ Callable[[], float] ] = lambda: np.inf, mutation_distribution: Optional[Callable[[], int]] = None, fitness_distribution: Optional[Callable[[], float]] = None, fitness_base: float = np.e, num_extant: Optional[int] = None, experiment_time: Optional[float] = None, collapse_unifurcations: bool = True, prune_dead_lineages: bool = True, random_seed: int = None, initial_copy_number: np.array = np.array([1]), cosegregation_coefficient: float = 0.0, splitting_function: Callable[[int], int] = lambda c, x: c + np.random.binomial(x, p=0.5), fitness_array: np.array = np.array([0, 1]), fitness_function: Optional[Callable[[int, int, float], float]] = None, capture_efficiency: float = 1.0, initial_tree: Optional[CassiopeiaTree] = None, ): if num_extant is None and experiment_time is None: raise TreeSimulatorError( "Please specify at least one stopping condition" ) if mutation_distribution is not None and fitness_distribution is None: raise TreeSimulatorError( "Please specify a fitness strength distribution" ) if num_extant is not None and num_extant <= 0: raise TreeSimulatorError( "Please specify number of extant lineages greater than 0" ) if num_extant is not None and type(num_extant) is not int: raise TreeSimulatorError( "Please specify an integer number of extant tips" ) if experiment_time is not None and experiment_time <= 0: raise TreeSimulatorError( "Please specify an experiment time greater than 0" ) self.birth_waiting_distribution = birth_waiting_distribution self.initial_birth_scale = initial_birth_scale self.death_waiting_distribution = death_waiting_distribution self.mutation_distribution = mutation_distribution self.fitness_distribution = fitness_distribution self.fitness_base = fitness_base self.num_extant = num_extant self.experiment_time = experiment_time self.collapse_unifurcations = collapse_unifurcations self.prune_dead_lineages = prune_dead_lineages self.random_seed = random_seed self.initial_copy_number = initial_copy_number self.cosegregation_coefficient = cosegregation_coefficient self.splitting_function = splitting_function self.fitness_array = fitness_array self.fitness_function = fitness_function self.capture_efficiency = capture_efficiency self.initial_tree = initial_tree # update to store cn_array in node. (tree.nodes[root]["cn_array"])
[docs] def initialize_tree(self, names) -> nx.DiGraph: """initializes a tree (nx.DiGraph() object with one node)""" if self.initial_tree: tree = self.initial_tree.get_tree_topology() for node in self.initial_tree.nodes: tree.nodes[node]["birth_scale"] = self.update_fitness( self.initial_tree.get_attribute(node, "ecdna_array") ) tree.nodes[node]["time"] = self.initial_tree.get_attribute( node, "time" ) tree.nodes[node]["ecdna_array"] = ( self.initial_tree.get_attribute(node, "ecdna_array") ) return tree tree = nx.DiGraph() root = next(names) tree.add_node(root) tree.nodes[root]["birth_scale"] = self.initial_birth_scale tree.nodes[root]["time"] = 0 tree.nodes[root]["ecdna_array"] = self.initial_copy_number return tree
# update to compute fitness using lineage cn_array
[docs] def update_fitness(self, ecdna_array: np.array) -> float: """Updates a lineage birth scale, representing its (Malthusian) fitness. Fitness is computed as a function of copy number, using the fitness_array (which defines fitness for CN=0 or CN >0 for each species, with epistasis). Args: ecdna_array: The birth_scale to be updated Returns: The updated birth_scale Raises: TreeSimulatorError if a negative number of mutations is sampled """ if self.fitness_function is None: return self.initial_birth_scale * ( 1.0 + self.fitness_array[tuple((ecdna_array > 0).astype(int))] ) else: return self.initial_birth_scale * ( 1.0 + self.fitness_function( ecdna_array[0], ecdna_array[1], self.fitness_array[tuple((ecdna_array > 0).astype(int))], ) )
[docs] def sample_lineage_event( self, lineage: Dict[str, Union[int, float]], current_lineages: PriorityQueue, tree: nx.DiGraph, names: Generator, observed_nodes: List[str], ) -> None: """A helper function that samples an event for a lineage. Takes a lineage and determines the next event in that lineage's future. Simulates the lifespan of a new descendant. Birth and death waiting times are sampled, representing how long the descendant lived. If a death event occurs first, then the lineage with the new descendant is added to the queue of currently alive, but its status is marked as inactive and will be removed at the time the lineage dies. If a birth event occurs first, then the lineage with the new descendant is added to the queue, but with its status marked as active, and further events will be sampled at the time the lineage divides. Additionally, its fitness will be updated by altering its birth rate. The descendant node is added to the tree object, with the edge weight between the current node and the descendant representing the lifespan of the descendant. In the case the descendant would live past the end of the experiment (both birth and death times exceed past the end of the experiment), then the lifespan is cut off at the experiment time and a final observed sample is added to the tree. In this case the lineage is marked as inactive as well. Args: unique_id: The unique ID number to be used to name a new node added to the tree lineage: The current extant lineage to extend. Contains the ID of the internal node to attach the descendant to, the current birth scale parameter of the lineage, the current total lived time of the lineage, and the status of whether the lineage is still dividing current_lineages: The queue containing currently alive lineages tree: The tree object being constructed by the simulator representing the birth death process names: A generator providing unique names for tree nodes observed_nodes: A list of nodes that are observed at the end of the experiment Raises: TreeSimulatorError if a negative waiting time is sampled or a non-active lineage is passed in """ if not lineage["active"]: raise TreeSimulatorError( "Cannot sample event for non-active lineage" ) unique_id = next(names) birth_waiting_time = self.birth_waiting_distribution( lineage["birth_scale"] ) death_waiting_time = self.death_waiting_distribution() if birth_waiting_time <= 0 or death_waiting_time <= 0: raise TreeSimulatorError("0 or negative waiting time detected") # TO DO: this is a really hacky fix b/c it bypasses the length checks of # whether the first birth_waiting_time exceeds self.experiment_time. # Also, it just assumes the first event is a birth. we could also # WOLOG that the first birth_waiting_time of the experiment is 0 # (but that requires shifting times elsewhere in order to permit correct # model comparison to non-ecDNA simulators). if lineage["total_time"] == 0: # Update birth rate updated_birth_scale = self.update_fitness( tree.nodes[lineage["id"]]["ecdna_array"] ) # Annotate parameters for a given node in the tree tree.add_node(unique_id) tree.nodes[unique_id]["birth_scale"] = updated_birth_scale tree.add_edge(lineage["id"], unique_id) tree.nodes[unique_id]["time"] = ( birth_waiting_time + lineage["total_time"] ) tree.nodes[unique_id]["ecdna_array"] = tree.nodes[lineage["id"]][ "ecdna_array" ] # child_ecdna_array current_lineages.put( ( birth_waiting_time + lineage["total_time"], unique_id, { "id": unique_id, "birth_scale": updated_birth_scale, "total_time": birth_waiting_time + lineage["total_time"], "active": True, }, ) ) return # If birth and death would happen after the total experiment time, # just cut off the living branch length at the experiment time if ( self.experiment_time and lineage["total_time"] + birth_waiting_time >= self.experiment_time and lineage["total_time"] + death_waiting_time >= self.experiment_time ): tree.add_node(unique_id) tree.nodes[unique_id]["birth_scale"] = lineage["birth_scale"] tree.add_edge(lineage["id"], unique_id) tree.nodes[unique_id]["time"] = self.experiment_time tree.nodes[unique_id]["ecdna_array"] = tree.nodes[lineage["id"]][ "ecdna_array" ] current_lineages.put( ( self.experiment_time, unique_id, { "id": unique_id, "birth_scale": lineage["birth_scale"], "total_time": self.experiment_time, "active": False, }, ) ) # Indicate this node is observed at the end of experiment observed_nodes.append(unique_id) else: if birth_waiting_time < death_waiting_time: # Update birth rate updated_birth_scale = self.update_fitness( tree.nodes[lineage["id"]]["ecdna_array"] ) child_ecdna_array = self.get_ecdna_array(lineage["id"], tree) # Annotate parameters for a given node in the tree tree.add_node(unique_id) tree.nodes[unique_id]["birth_scale"] = updated_birth_scale tree.add_edge(lineage["id"], unique_id) tree.nodes[unique_id]["time"] = ( birth_waiting_time + lineage["total_time"] ) tree.nodes[unique_id]["ecdna_array"] = child_ecdna_array # Add the newly generated cell to the list of living lineages current_lineages.put( ( birth_waiting_time + lineage["total_time"], unique_id, { "id": unique_id, "birth_scale": updated_birth_scale, "total_time": birth_waiting_time + lineage["total_time"], "active": True, }, ) ) else: tree.add_node(unique_id) tree.nodes[unique_id]["birth_scale"] = lineage["birth_scale"] tree.add_edge(lineage["id"], unique_id) tree.nodes[unique_id]["time"] = ( death_waiting_time + lineage["total_time"] ) tree.nodes[unique_id]["ecdna_array"] = tree.nodes[ lineage["id"] ]["ecdna_array"] current_lineages.put( ( death_waiting_time + lineage["total_time"], unique_id, { "id": unique_id, "birth_scale": lineage["birth_scale"], "total_time": death_waiting_time + lineage["total_time"], "active": False, }, ) )
[docs] def get_ecdna_array(self, parent_id: str, tree: nx.DiGraph) -> np.array: """Generates an ecDNA array for a child given its parent and sisters. Args: parent_id: ID of parent in the generated tree. tree: The in-progress tree. Returns: Numpy array corresponding to the ecDNA copy numbers for the child. """ parental_ecdna_array = 2 * tree.nodes[parent_id]["ecdna_array"] has_child = tree.out_degree(parent_id) > 0 new_ecdna_array = parental_ecdna_array.copy() if has_child: child_id = [n for n in tree.successors(parent_id)][0] child_ecdna_array = tree.nodes[child_id]["ecdna_array"] new_ecdna_array = parental_ecdna_array - child_ecdna_array else: new_ecdna_array = np.array([0] * len(parental_ecdna_array)) new_ecdna_array[0] = self.splitting_function( 0, parental_ecdna_array[0] ) for species in range(1, len(parental_ecdna_array)): cosegregating_compartment = int( self.cosegregation_coefficient * (new_ecdna_array[0] / max(1, parental_ecdna_array[0])) * parental_ecdna_array[species] ) sister_cell_cosegregating = int( self.cosegregation_coefficient * ( (parental_ecdna_array[0] - new_ecdna_array[0]) / max(1, parental_ecdna_array[0]) ) * parental_ecdna_array[species] ) random_compartment = ( parental_ecdna_array[species] - cosegregating_compartment - sister_cell_cosegregating ) inherited_fraction = self.splitting_function( cosegregating_compartment, random_compartment, ) new_ecdna_array[species] = inherited_fraction # check that new ecdnay array entries do not exceed parental entries if np.any(new_ecdna_array > parental_ecdna_array): raise ecDNABirthDeathSimulatorError( "Child ecDNA entries exceed parental entries." ) return new_ecdna_array
[docs] def populate_tree_from_simulation( self, tree: nx.DiGraph, observed_nodes: List[str] ) -> CassiopeiaTree: """Populates tree with appropriate meta data. Args: tree: The tree simulated with ecDNA and fitness values populated as attributes. observed_nodes: The observed leaves of the tree. Returns: A CassiopeiaTree with relevant node attributes filled in. """ cassiopeia_tree = CassiopeiaTree(tree=tree) # transfer over all meta data time_dictionary = {} for node in tree.nodes: time_dictionary[node] = tree.nodes[node]["time"] cassiopeia_tree.set_attribute( node, "ecdna_array", tree.nodes[node]["ecdna_array"] ) cassiopeia_tree.set_attribute( node, "fitness", tree.nodes[node]["birth_scale"] ) cassiopeia_tree.set_times(time_dictionary) # Prune dead lineages and collapse resulting unifurcations to_remove = set(cassiopeia_tree.leaves) - set(observed_nodes) if self.prune_dead_lineages and len(to_remove) > 0: print(f"Removing {len(to_remove)} sublineages.") cassiopeia_tree.remove_leaves_and_prune_lineages(list(to_remove)) if self.collapse_unifurcations and len(cassiopeia_tree.nodes) > 1: cassiopeia_tree.collapse_unifurcations(source="1") leaf_ecdna_arrays = [ tree.nodes[node]["ecdna_array"] for node in cassiopeia_tree.leaves ] cell_metadata = pd.DataFrame( leaf_ecdna_arrays, columns=[ f"ecDNA_{i}" for i in range(len(self.initial_copy_number)) ], index=cassiopeia_tree.leaves, ) # apply noise model for i in range(len(self.initial_copy_number)): cell_metadata[f"Observed_ecDNA_{i}"] = cell_metadata.apply( lambda x: np.random.binomial( x[f"ecDNA_{i}"], self.capture_efficiency ), axis=1, ) cassiopeia_tree.cell_meta = cell_metadata.astype(int).copy() cassiopeia_tree.cell_meta["Observed"] = [ "False" if n in to_remove else "True" for n in cassiopeia_tree.leaves ] # If only implicit root remains after pruning dead lineages, error if len(cassiopeia_tree.nodes) == 1: raise ecDNABirthDeathSimulatorError( "All lineages died before stopping condition" ) return cassiopeia_tree