"""
This file stores a subclass of DistanceSolver, UPGMA. The inference procedure is
a hierarchical clustering algorithm proposed by Sokal and Michener (1958) that
iteratively joins together samples with the minimum dissimilarity.
"""
from typing import Callable, Dict, List, Optional, Tuple, Union
import abc
from collections import defaultdict
import networkx as nx
import numba
import numpy as np
import pandas as pd
from cassiopeia.data import CassiopeiaTree
from cassiopeia.mixins import DistanceSolverError
from cassiopeia.solver import DistanceSolver, dissimilarity_functions
[docs]
class UPGMASolver(DistanceSolver.DistanceSolver):
"""
UPGMA CassiopeiaSolver.
Implements the UPGMA algorithm described as a derived class of
DistanceSolver. This class inherits the generic `solve` method, but
implements its own procedure for finding cherries by minimizing the
dissimilarity between samples. After joining nodes, the dissimilarities
are updated by averaging the distances of elements in the new cluster
with each existing node. Produces a rooted tree that is assumed to be
ultrametric. If fast is set to True, a fast UPGMA implementation of is used.
Args:
dissimilarity_function: A function by which to compute the dissimilarity
map. Optional if a dissimilarity map is already provided.
prior_transformation: Function to use when transforming priors into
weights. Supports the following transformations:
"negative_log": Transforms each probability by the negative
log (default)
"inverse": Transforms each probability p by taking 1/p
"square_root_inverse": Transforms each probability by the
the square root of 1/p
fast: Whether to use a fast implementation of UPGMA.
implementation: Which fast implementation to use. Options are:
"ccphylo_upgma": Uses the fast UPGMA implementation from CCPhylo.
Attributes:
dissimilarity_function: Function used to compute dissimilarity between
samples.
add_root: Whether or not to add an implicit root the tree.
prior_transformation: Function to use when transforming priors into
weights.
"""
def __init__(
self,
dissimilarity_function: Optional[
Callable[
[np.array, np.array, int, Dict[int, Dict[int, float]]], float
]
] = dissimilarity_functions.weighted_hamming_distance,
prior_transformation: str = "negative_log",
fast: bool = False,
implementation: str = "ccphylo_upgma",
):
if fast:
if implementation in ["ccphylo_upgma"]:
self._implementation = implementation
else:
raise DistanceSolverError(
"Invalid fast implementation of UPGMA. Options are: "
"'ccphylo_upgma'"
)
else:
self._implementation = "generic_upgma"
super().__init__(
dissimilarity_function=dissimilarity_function,
add_root=True,
prior_transformation=prior_transformation,
)
self.__cluster_to_cluster_size = defaultdict(int)
[docs]
def root_tree(
self, tree: nx.Graph, root_sample: str, remaining_samples: List[str]
):
"""Roots a tree produced by UPGMA.
Adds the root at the top of the UPGMA reconstructed tree. By the
ultrametric assumption, the root is placed as the parent to the last
two unjoined nodes.
Args:
tree: Networkx object representing the tree topology
root_sample: Ignored in this case, the root is known in this case
remaining_samples: The last two unjoined nodes in the tree
Returns:
A rooted tree.
"""
tree.add_node("root")
tree.add_edges_from(
[("root", remaining_samples[0]), ("root", remaining_samples[1])]
)
rooted_tree = nx.DiGraph()
for e in nx.dfs_edges(tree, source="root"):
rooted_tree.add_edge(e[0], e[1])
return rooted_tree
[docs]
def find_cherry(self, dissimilarity_matrix: np.array) -> Tuple[int, int]:
"""Finds a pair of samples to join into a cherry.
Finds the pair of samples with the minimum dissimilarity by finding the
minimum value in the provided dissimilarity matrix
Args:
dissimilarity_matrix: A sample x sample dissimilarity matrix
Returns:
A tuple of integers representing rows in the dissimilarity matrix
to join.
"""
dissimilarity_matrix = dissimilarity_matrix.astype(float)
np.fill_diagonal(dissimilarity_matrix, np.inf)
return np.unravel_index(
np.argmin(dissimilarity_matrix, axis=None),
dissimilarity_matrix.shape,
)
[docs]
def update_dissimilarity_map(
self,
dissimilarity_map: pd.DataFrame,
cherry: Tuple[str, str],
new_node: str,
) -> pd.DataFrame:
"""Update dissimilarity map after finding a cherry.
Updates the dissimilarity map after joining together two nodes (m1, m2)
at a cherry m. For all other nodes v, the new dissimilarity map d' is:
d'(m, v) = (<m1> * d(m1, v) + <m2> * d(m2, v))/(<m1> + <m2>)
where <m1> is the size of cluster m1, i.e. the number of sample leaves
under node m1.
Args:
dissimilarity_map: A dissimilarity map to update
cherry: A tuple of indices in the dissimilarity map that are joining
new_node: New node name, to be added to the new dissimilarity map
Returns:
A new dissimilarity map, updated with the new node
"""
i_size, j_size = (
max(1, self.__cluster_to_cluster_size[cherry[0]]),
max(1, self.__cluster_to_cluster_size[cherry[1]]),
)
self.__cluster_to_cluster_size[new_node] = i_size + j_size
i, j = (
np.where(dissimilarity_map.index == cherry[0])[0][0],
np.where(dissimilarity_map.index == cherry[1])[0][0],
)
dissimilarity_array = self.__update_dissimilarity_map_numba(
dissimilarity_map.to_numpy(), i, j, i_size, j_size
)
sample_names = list(dissimilarity_map.index) + [new_node]
dissimilarity_map = pd.DataFrame(
dissimilarity_array, index=sample_names, columns=sample_names
)
# drop out cherry from dissimilarity map
dissimilarity_map.drop(
columns=[cherry[0], cherry[1]],
index=[cherry[0], cherry[1]],
inplace=True,
)
return dissimilarity_map
@staticmethod
@numba.jit(nopython=True)
def __update_dissimilarity_map_numba(
dissimilarity_map: np.array,
cherry_i: int,
cherry_j: int,
size_i: int,
size_j: int,
) -> np.array:
"""A private, optimized function for updating dissimilarities.
A faster implementation of updating the dissimilarity map for UPGMA,
invoked by `self.update_dissimilarity_map`.
Args:
dissimilarity_map: A matrix of dissimilarities to update
cherry_i: Index of the first item in the cherry
cherry_j: Index of the second item in the cherry
Returns:
An updated dissimilarity map
"""
# add new row & column for incoming sample
N = dissimilarity_map.shape[1]
new_row = np.array([0.0] * N)
updated_map = np.vstack((dissimilarity_map, np.atleast_2d(new_row)))
new_col = np.array([0.0] * (N + 1))
updated_map = np.hstack((updated_map, np.atleast_2d(new_col).T))
new_node_index = updated_map.shape[0] - 1
for v in range(dissimilarity_map.shape[0]):
if v == cherry_i or v == cherry_j:
continue
updated_map[v, new_node_index] = updated_map[new_node_index, v] = (
size_i * dissimilarity_map[v, cherry_i]
+ size_j * dissimilarity_map[v, cherry_j]
) / (size_i + size_j)
updated_map[new_node_index, new_node_index] = 0
return updated_map
[docs]
def setup_root_finder(self, cassiopeia_tree: CassiopeiaTree) -> None:
"""Defines the implicit rooting strategy for the UPGMASolver.
By default, the UPGMA algorithm returns an rooted tree. Therefore,
the implicit root will be placed and specified at the end of the
solving procedure as the parent of the last two unjoined nodes.
Args:
cassiopeia_tree: Input CassiopeiaTree to `solve`
"""
cassiopeia_tree.root_sample_name = "root"