Source code for cassiopeia.solver.SpectralSolver

"""
This file stores a subclass of GreedySolver, the SpectralSolver. 
This subclass implements an inference procedure that uses Fiedler's algorithm to
minimize for a version of the normalized cut on a similarity graph generated from 
the observed mutations on a group of samples. The goal is to find a partition 
on a graph that minimizes seperation in samples that share mutations, normalizing 
for the sizes of each of the sides of the partition.
"""
from typing import Callable, Dict, List, Optional, Tuple, Union

import networkx as nx
import numpy as np
import pandas as pd
import scipy as sp

from cassiopeia.solver import (
    GreedySolver,
    dissimilarity_functions,
    graph_utilities,
)


[docs]class SpectralSolver(GreedySolver.GreedySolver): """ A spectral-based CassiopeiaSolver. The SpectralSolver implements a top-down algorithm that recursively partitions the sample set based on similarity. At each recursive step, a similarity graph is generated for the sample set, where edges represent the number of shared mutations between nodes. Then a partition is generated by finding the minimum weight cut over the graph, normalized over the sum of edges within each side of the partition. The cut is minimized in order to preserve similarities. The final partition is then improved upon by a greedy hill-climbing procedure that further optimizes the cut. TODO: Experiment to find the best default similarity function Args: similarity_function: A function that calculates a similarity score between two given samples and their observed mutations. The default is "hamming_distance_without_missing" threshold: A minimum similarity threshold to include an edge in the similarity graph prior_function: A function defining a transformation on the priors in forming weights to scale the contribution of each mutation in the similarity graph. One of the following: "negative_log": Transforms each probability by the negative log (default) "inverse": Transforms each probability p by taking 1/p "square_root_inverse": Transforms each probability by the the square root of 1/p Attributes: similarity_function: A function that calculates a similarity score between two given samples and their observed mutations threshold: A minimum similarity threshold prior_transformation: Function to use when transforming priors into weights. """ def __init__( self, similarity_function: Optional[ Callable[ [ int, int, pd.DataFrame, int, Optional[Dict[int, Dict[str, float]]], ], float, ] ] = dissimilarity_functions.hamming_similarity_without_missing, threshold: Optional[int] = 0, prior_transformation: str = "negative_log", ): super().__init__(prior_transformation) self.threshold = threshold self.similarity_function = similarity_function
[docs] def perform_split( self, character_matrix: pd.DataFrame, samples: List[int], weights: Optional[Dict[int, Dict[int, float]]] = None, missing_state_indicator: int = -1, ) -> Tuple[List[str], List[str]]: """Partitions the samples using the spectral algorithm. First, a similarity graph is generated with samples as nodes such that edges between a pair of nodes is some provided function on the number of character/state mutations shared. Then, Fiedler's algorithm is used to generate a partition on this graph that minimizes a modified normalized cut: weight of edges across cut/ min(weight of edges within each side of cut). It does this efficiently by first calculating the 2nd eigenvector of the normalized Laplacian of the similarity matrix. Then, it orders the nodes in a graph by the eigenvector values and finds an index such that partitioning the ordered nodes on that index minimizes the normalized cut ratio. As the optimal partition can be determined using the 2nd eigenvector, this greatly reduces the space of cuts needed to be explored. Args: character_matrix: Character matrix samples: A list of samples to partition weights: Weighting of each (character, state) pair. Typically a transformation of the priors. missing_state_indicator: Character representing missing data. Returns: A tuple of lists, representing the left and right partition groups """ G = graph_utilities.construct_similarity_graph( character_matrix, missing_state_indicator, samples, similarity_function=self.similarity_function, threshold=self.threshold, weights=weights, ) L = nx.normalized_laplacian_matrix(G).todense() diag = sp.linalg.eig(L) second_eigenvector = diag[1][:, 1] nodes_to_eigenvector = {} vertices = list(G.nodes()) for i in range(len(vertices)): nodes_to_eigenvector[vertices[i]] = second_eigenvector[i] vertices.sort(key=lambda v: nodes_to_eigenvector[v]) total_weight = 2 * sum([G[e[0]][e[1]]["weight"] for e in G.edges()]) # If the similarity graph is empty and there are no meaningful splits, # return a polytomy over the remaining samples if total_weight == 0: return samples, [] cut = set() numerator = 0 denominator = 0 prev_numerator = -1 best_score = np.inf best_index = 0 for i in range(len(vertices) - 1): v = vertices[i] cut.add(v) cut_edges = 0 neighbor_weight = 0 for w in G.neighbors(v): neighbor_weight += G[v][w]["weight"] if w in cut: cut_edges += G[v][w]["weight"] denominator += neighbor_weight if i > 0: prev_numerator = numerator numerator += neighbor_weight - 2 * cut_edges # Avoids naively taking the first zero-weight cut. If more samples # can be added without changing the cut weight, those samples do not # share any similarity with the other side of the partition. if numerator != 0 and prev_numerator == 0: best_index = i - 1 break if min(denominator, total_weight - denominator) != 0: if ( numerator / min(denominator, total_weight - denominator) < best_score ): best_score = numerator / min( denominator, total_weight - denominator ) best_index = i else: best_score = 0 best_index = i improved_left_set = graph_utilities.spectral_improve_cut( G, vertices[: best_index + 1] ) improved_right_set = [] for i in samples: if i not in improved_left_set: improved_right_set.append(i) return improved_left_set, improved_right_set