Source code for cassiopeia.simulator.Cas9LineageTracingDataSimulator

"""
A Cas9-based lineage tracing data simulator. This is a sublcass of the
LineageTracingDataSimulator that simulates the data produced from Cas9-based
technologies (e.g, as described in Chan et al, Nature 2019 or McKenna et al,
Science 2016). This simulator implements the method `overlay_data` which takes
in a CassiopeiaTree with edge lengths and overlays states onto cut-sites.
"""
import copy
import math
from typing import Callable, Dict, List, Optional, Tuple, Union

import numpy as np

from cassiopeia.data import CassiopeiaTree
from cassiopeia.simulator.DataSimulator import DataSimulatorError
from cassiopeia.simulator.LineageTracingDataSimulator import (
    LineageTracingDataSimulator,
)


[docs]class Cas9LineageTracingDataSimulator(LineageTracingDataSimulator): """Simulates Cas9-based lineage tracing data. This subclass of `LineageTracingDataSimulator` implements the `overlay_data` function to simulate Cas9-based mutations onto a defined set of "cassette". These cassettes emulate the TargetSites described in Chan et al, Nature 2019 and the GESTALT arrays described in McKenna et al, Science 2020 in which several Cas9 cut-sites are arrayed together. In the Chan et al technology, these cassettes are of length 3; in McKenna et al, these cassettes are of length 10. The class accepts several parameters that govern the Cas9-based tracer. First and foremost is the cutting rate, describing how fast Cas9 is able to cut. We model Cas9 cutting as an exponential process, parameterized by the specified mutation rate - specifically, for a lifetime `t` and parameter `lambda`, the expected probability of Cas9 mutation, per site, is `exp(-lambda * t)`. Second, the class accepts the architecture of the recorder - described by the size of the cassette (by default 3) and the number of cassettes. The resulting lineage will have (size_of_cassette * number_of_cassettes) characters. The architecture is important in the sense that it encodes the correlation between characters. This is critical in two ways: the first is with respect to silencing -- entire cassettes are lost from silencing, either transcriptional or stochastic events. The second is with respect to Cas9 resections, in which more than one Cas9 molecule introduces a cut to the same cassette. In this event, all cut sites intermediate will be observed as missing. In this simulation, we allow this event to occur if Cas9 molecules perform cuts on the same cassette at any point in a cell's lifetime. Importantly, setting the cassette length to 1 will remove any resection events due to Cas9 cutting, and will reduce the amount of transcriptionally silencing observed. This behavior can be manually turned off by setting `collapse_sites_on_cassette=False`, which will keep cuts that occur simultaneously on the same cassette as separate events, instead of causing a resection event. Third, the class accepts a state distribution describing the relative likelihoods of various indels. This is very useful, as it is typical that a handful of mutations are far likelier than the bulk of the possible mutations. Finally, the class accepts two types of silencing rates. The first is the heritable silencing rate which is a rare event in which an entire cassette is transcriptionally silenced and therefore not observed. The second type of silencing is a stochastic dropout rate which simulates the loss of cassettes due to the low sensitivity of the RNA-sequencing assay. The function `overlay_data` will operate on the tree in place and will specifically modify the data stored in the character attributes. Args: number_of_cassettes: Number of cassettes (i.e., arrays of target sites) size_of_cassette: Number of editable target sites per cassette mutation_rate: Exponential parameter for the Cas9 cutting rate. The user can either pass in a float, in which every site will mutate at the same rate, or a list with an entry for each cut site indicating potentially a different mutation rate for every site. state_generating_distribution: Distribution from which to simulate state likelihoods. This is only used if mutation priors are not specified to the simulator. number_of_states: Number of states to simulate mutation_priors: A mapping from state to probability that a user can specify. If this argument is not None, states will not be pulled from the state distribution. heritable_silencing_rate: Silencing rate for the cassettes, per node, simulating heritable missing data events. stochastic_silencing_rate: Rate at which to randomly drop out cassettes, to simulate dropout due to low sensitivity of assays. heritable_missing_data_state: Integer representing data that has gone missing due to a heritable event (i.e. Cas9 resection or heritable silencing). stochastic_missing_data_state: Integer representing data that has gone missing due to the stochastic dropout from single-cell assay sensitivity. random_seed: Numpy random seed to use for deterministic simulations. Note that the numpy random seed gets set during every call to `overlay_data`, thereby producing deterministic simulations every time this function is called. collapse_sites_on_cassette: Whether or not to collapse cuts that occur in the same cassette in a single iteration. This option only takes effect when `size_of_cassette` is greater than 1. Defaults to True. Raises: DataSimulatorError if assumptions about the system are broken. """ def __init__( self, number_of_cassettes: int = 10, size_of_cassette: int = 3, mutation_rate: Union[float, List[float]] = 0.01, state_generating_distribution: Callable[ [], float ] = lambda: np.random.exponential(1e-5), number_of_states: int = 100, state_priors: Optional[Dict[int, float]] = None, heritable_silencing_rate: float = 1e-4, stochastic_silencing_rate: float = 1e-2, heritable_missing_data_state: int = -1, stochastic_missing_data_state: int = -1, random_seed: Optional[int] = None, collapse_sites_on_cassette: bool = True, ): if number_of_cassettes <= 0 or not isinstance(number_of_cassettes, int): raise DataSimulatorError("Specify a positive number of cassettes.") if size_of_cassette <= 0 or not isinstance(size_of_cassette, int): raise DataSimulatorError( "Specify a positive number of cut-sites" " per cassette." ) self.size_of_cassette = size_of_cassette self.number_of_cassettes = number_of_cassettes self.collapse_sites_on_cassette = collapse_sites_on_cassette if isinstance(mutation_rate, float): if mutation_rate < 0: raise DataSimulatorError( "Mutation rate needs to be" " non-negative." ) number_of_characters = size_of_cassette * number_of_cassettes self.mutation_rate_per_character = [ mutation_rate ] * number_of_characters else: if len(mutation_rate) != ( self.number_of_cassettes * self.size_of_cassette ): raise DataSimulatorError( "Length of mutation rate array is not" " the same as the number of characters." ) if np.any(np.array(mutation_rate) < 0): raise DataSimulatorError( "Mutation rate needs to be" " non-negative." ) self.mutation_rate_per_character = mutation_rate[:] if state_priors is None: self.number_of_states = number_of_states self.state_generating_distribution = state_generating_distribution else: Z = np.sum([v for v in state_priors.values()]) if not math.isclose(Z, 1.0): raise DataSimulatorError("Mutation priors do not sum to 1.") self.number_of_states = len(state_priors) self.state_generating_distribution = None self.mutation_priors = state_priors self.heritable_silencing_rate = heritable_silencing_rate self.stochastic_silencing_rate = stochastic_silencing_rate self.heritable_missing_data_state = heritable_missing_data_state self.stochastic_missing_data_state = stochastic_missing_data_state self.random_seed = random_seed
[docs] def overlay_data(self, tree: CassiopeiaTree): """Overlays Cas9-based lineage tracing data onto the CassiopeiaTree. Args: tree: Input CassiopeiaTree """ if self.random_seed is not None: np.random.seed(self.random_seed) # create state priors if they don't exist. # This will set the instance's variable for mutation priors and will # use this for all future simulations. if self.mutation_priors is None: self.mutation_priors = {} probabilites = [ self.state_generating_distribution() for _ in range(self.number_of_states) ] Z = np.sum(probabilites) for i in range(self.number_of_states): self.mutation_priors[i + 1] = probabilites[i] / Z number_of_characters = self.number_of_cassettes * self.size_of_cassette # initialize character states character_matrix = {} for node in tree.nodes: character_matrix[node] = [-1] * number_of_characters for node in tree.depth_first_traverse_nodes(tree.root, postorder=False): if tree.is_root(node): character_matrix[node] = [0] * number_of_characters continue parent = tree.parent(node) life_time = tree.get_time(node) - tree.get_time(parent) character_array = character_matrix[parent] open_sites = [ c for c in range(len(character_array)) if character_array[c] == 0 ] new_cuts = [] for site in open_sites: mutation_rate = self.mutation_rate_per_character[site] mutation_probability = 1 - (np.exp(-life_time * mutation_rate)) if np.random.uniform() < mutation_probability: new_cuts.append(site) # collapse cuts that are on the same cassette cuts_remaining = new_cuts if self.collapse_sites_on_cassette and self.size_of_cassette > 1: character_array, cuts_remaining = self.collapse_sites( character_array, new_cuts ) # introduce new states at cut sites character_array = self.introduce_states( character_array, cuts_remaining ) # silence cassettes silencing_probability = 1 - ( np.exp(-life_time * self.heritable_silencing_rate) ) character_array = self.silence_cassettes( character_array, silencing_probability, self.heritable_missing_data_state, ) character_matrix[node] = character_array # apply stochastic silencing for leaf in tree.leaves: character_matrix[leaf] = self.silence_cassettes( character_matrix[leaf], self.stochastic_silencing_rate, self.stochastic_missing_data_state, ) tree.set_all_character_states(character_matrix)
[docs] def collapse_sites( self, character_array: List[int], cuts: List[int] ) -> Tuple[List[int], List[int]]: """Collapses cassettes. Given a character array and a new set of cuts that Cas9 is inducing, this function will infer which cuts occur within a given cassette and collapse the sites between the two cuts. Args: character_array: Character array in progress cuts: Sites in the character array that are being cut. Returns: The updated character array and sites that are not part of a cassette collapse. """ updated_character_array = character_array.copy() cassettes = self.get_cassettes() cut_to_cassette = np.digitize(cuts, cassettes) cuts_remaining = [] for cassette in np.unique(cut_to_cassette): cut_indices = np.where(cut_to_cassette == cassette)[0] if len(cut_indices) > 1: sites_to_collapse = np.array(cuts)[cut_indices] left, right = ( np.min(sites_to_collapse), np.max(sites_to_collapse), ) for site in range(left, right + 1): updated_character_array[ site ] = self.heritable_missing_data_state else: cuts_remaining.append(np.array(cuts)[cut_indices[0]]) return updated_character_array, cuts_remaining
[docs] def introduce_states( self, character_array: List[int], cuts: List[int] ) -> List[int]: """Adds states to character array. New states are added to the character array at the predefined cut locations. Args: character_array: Character array cuts: Loci being cut Returns: An updated character array. """ updated_character_array = character_array.copy() states = list(self.mutation_priors.keys()) probabilities = list(self.mutation_priors.values()) for i in cuts: state = np.random.choice(states, 1, p=probabilities)[0] updated_character_array[i] = state return updated_character_array
[docs] def silence_cassettes( self, character_array: List[int], silencing_rate: float, missing_state: int = -1, ) -> List[int]: """Silences cassettes. Using the specified silencing rate, this function will randomly select cassettes to silence. Args: character_array: Character array silencing_rate: Silencing rate. missing_state: State to use for encoding missing data. Returns: An updated character array. """ updated_character_array = character_array.copy() cassettes = self.get_cassettes() cut_site_by_cassette = np.digitize( range(len(character_array)), cassettes ) for cassette in range(1, self.number_of_cassettes + 1): if np.random.uniform() < silencing_rate: indices = np.where(cut_site_by_cassette == cassette) left, right = np.min(indices), np.max(indices) for site in range(left, right + 1): updated_character_array[site] = missing_state return updated_character_array
[docs] def get_cassettes(self) -> List[int]: """Obtain indices of individual cassettes. A helper function that returns the indices that correpspond to the independent cassettes in the experiment. Returns: An array of indices corresponding to the start positions of the cassettes. """ cassettes = [ (self.size_of_cassette * j) for j in range(0, self.number_of_cassettes) ] return cassettes