Source code for cassiopeia.solver.SpectralSolver
"""
This file stores a subclass of GreedySolver, the SpectralSolver.
This subclass implements an inference procedure that uses Fiedler's algorithm to
minimize for a version of the normalized cut on a similarity graph generated from
the observed mutations on a group of samples. The goal is to find a partition
on a graph that minimizes seperation in samples that share mutations, normalizing
for the sizes of each of the sides of the partition.
"""
from typing import Callable, Dict, List, Optional, Tuple, Union
import networkx as nx
import numpy as np
import pandas as pd
import scipy as sp
from cassiopeia.solver import (
GreedySolver,
dissimilarity_functions,
graph_utilities,
)
[docs]class SpectralSolver(GreedySolver.GreedySolver):
"""
A spectral-based CassiopeiaSolver.
The SpectralSolver implements a top-down algorithm that recursively
partitions the sample set based on similarity. At each recursive step,
a similarity graph is generated for the sample set, where edges
represent the number of shared mutations between nodes. Then a partition
is generated by finding the minimum weight cut over the graph,
normalized over the sum of edges within each side of the partition. The
cut is minimized in order to preserve similarities. The final partition
is then improved upon by a greedy hill-climbing procedure that further
optimizes the cut.
TODO: Experiment to find the best default similarity function
Args:
similarity_function: A function that calculates a similarity score
between two given samples and their observed mutations. The default
is "hamming_distance_without_missing"
threshold: A minimum similarity threshold to include an edge in the
similarity graph
prior_function: A function defining a transformation on the priors
in forming weights to scale the contribution of each mutation in
the similarity graph. One of the following:
"negative_log": Transforms each probability by the negative
log (default)
"inverse": Transforms each probability p by taking 1/p
"square_root_inverse": Transforms each probability by the
the square root of 1/p
Attributes:
similarity_function: A function that calculates a similarity score
between two given samples and their observed mutations
threshold: A minimum similarity threshold
prior_transformation: Function to use when transforming priors into
weights.
"""
def __init__(
self,
similarity_function: Optional[
Callable[
[
int,
int,
pd.DataFrame,
int,
Optional[Dict[int, Dict[str, float]]],
],
float,
]
] = dissimilarity_functions.hamming_similarity_without_missing,
threshold: Optional[int] = 0,
prior_transformation: str = "negative_log",
):
super().__init__(prior_transformation)
self.threshold = threshold
self.similarity_function = similarity_function
[docs] def perform_split(
self,
character_matrix: pd.DataFrame,
samples: List[int],
weights: Optional[Dict[int, Dict[int, float]]] = None,
missing_state_indicator: int = -1,
) -> Tuple[List[str], List[str]]:
"""Partitions the samples using the spectral algorithm.
First, a similarity graph is generated with samples as nodes such that
edges between a pair of nodes is some provided function on the number
of character/state mutations shared. Then, Fiedler's algorithm is used
to generate a partition on this graph that minimizes a modified
normalized cut: weight of edges across cut/ min(weight of edges within
each side of cut). It does this efficiently by first calculating the
2nd eigenvector of the normalized Laplacian of the similarity matrix.
Then, it orders the nodes in a graph by the eigenvector values and finds
an index such that partitioning the ordered nodes on that index
minimizes the normalized cut ratio. As the optimal partition can be
determined using the 2nd eigenvector, this greatly reduces the space of
cuts needed to be explored.
Args:
character_matrix: Character matrix
samples: A list of samples to partition
weights: Weighting of each (character, state) pair. Typically a
transformation of the priors.
missing_state_indicator: Character representing missing data.
Returns:
A tuple of lists, representing the left and right partition groups
"""
G = graph_utilities.construct_similarity_graph(
character_matrix,
missing_state_indicator,
samples,
similarity_function=self.similarity_function,
threshold=self.threshold,
weights=weights,
)
L = nx.normalized_laplacian_matrix(G).todense()
diag = sp.linalg.eig(L)
second_eigenvector = diag[1][:, 1]
nodes_to_eigenvector = {}
vertices = list(G.nodes())
for i in range(len(vertices)):
nodes_to_eigenvector[vertices[i]] = second_eigenvector[i]
vertices.sort(key=lambda v: nodes_to_eigenvector[v])
total_weight = 2 * sum([G[e[0]][e[1]]["weight"] for e in G.edges()])
# If the similarity graph is empty and there are no meaningful splits,
# return a polytomy over the remaining samples
if total_weight == 0:
return samples, []
cut = set()
numerator = 0
denominator = 0
prev_numerator = -1
best_score = np.inf
best_index = 0
for i in range(len(vertices) - 1):
v = vertices[i]
cut.add(v)
cut_edges = 0
neighbor_weight = 0
for w in G.neighbors(v):
neighbor_weight += G[v][w]["weight"]
if w in cut:
cut_edges += G[v][w]["weight"]
denominator += neighbor_weight
if i > 0:
prev_numerator = numerator
numerator += neighbor_weight - 2 * cut_edges
# Avoids naively taking the first zero-weight cut. If more samples
# can be added without changing the cut weight, those samples do not
# share any similarity with the other side of the partition.
if numerator != 0 and prev_numerator == 0:
best_index = i - 1
break
if min(denominator, total_weight - denominator) != 0:
if (
numerator / min(denominator, total_weight - denominator)
< best_score
):
best_score = numerator / min(
denominator, total_weight - denominator
)
best_index = i
else:
best_score = 0
best_index = i
improved_left_set = graph_utilities.spectral_improve_cut(
G, vertices[: best_index + 1]
)
improved_right_set = []
for i in samples:
if i not in improved_left_set:
improved_right_set.append(i)
return improved_left_set, improved_right_set