"""
A library that contains dissimilarity functions for the purpose of comparing
phylogenetic samples.
"""
import itertools
from typing import Callable, Dict, List, Optional, Tuple, Union
import numba
import numpy as np
def weighted_hamming_distance(
s1: List[int],
s2: List[int],
    missing_state_indicator: int = -1,
weights: Optional[Dict[int, Dict[int, float]]] = None,
) -> float:
"""Computes the weighted hamming distance between samples.
Evaluates the dissimilarity of two phylogenetic samples on the basis of
their shared indel states and the probability of these indel states
occurring. Specifically, for a given character, if two states are identical
we decrement the dissimilarity by the probability of these two occurring
independently; if the two states disagree, we increment the dissimilarity by
the probability of these states occurring. We normalize the dissimilarity by
the number of non-missing characters shared by the two samples.
If weights are not given, then we increment dissimilarity by +2 if the
states are different, +1 if one state is uncut and the other is an indel,
and +0 if the two states are identical.
Args:
s1: Character states of the first sample
s2: Character states of the second sample
missing_state_indicator: The character representing missing values
        weights: A dictionary storing the state weights for each character,
            derived from the state priors. This should be a nested dictionary
            where each key corresponds to a character that then indexes
            another dictionary storing the weight of each observed state.
            (Character -> State -> Weight)
Returns:
A dissimilarity score.
"""
d = 0
num_present = 0
for i in range(len(s1)):
if s1[i] == missing_state_indicator or s2[i] == missing_state_indicator:
continue
num_present += 1
if s1[i] != s2[i]:
if s1[i] == 0 or s2[i] == 0:
if weights:
if s1[i] != 0:
d += weights[i][s1[i]]
else:
d += weights[i][s2[i]]
else:
d += 1
else:
if weights:
d += weights[i][s1[i]] + weights[i][s2[i]]
else:
d += 2
if num_present == 0:
return 0
return d / num_present
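# Illustrative check of the unweighted scheme above: (1, 1) is identical (+0),
# (0, 1) pairs an uncut state with an indel (+1), (2, 3) are two distinct
# indels (+2), and the last site is skipped as missing, so d = 3 over 3 shared
# sites:
#
#     >>> weighted_hamming_distance([1, 0, 2, -1], [1, 1, 3, 2])
#     1.0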
def hamming_similarity_without_missing(
s1: List[int],
s2: List[int],
missing_state_indicator: int,
weights: Optional[Dict[int, Dict[int, float]]] = None,
) -> float:
"""A function to return the number of (non-missing) character/state
mutations shared by two samples.
Args:
s1: Character states of the first sample
s2: Character states of the second sample
missing_state_indicator: The character representing missing values
weights: A set of optional weights to weight the similarity of a
mutation
Returns:
The number of shared mutations between two samples, weighted or
unweighted
"""
# TODO Optimize this using masks
similarity = 0
for i in range(len(s1)):
if (
s1[i] == missing_state_indicator
or s2[i] == missing_state_indicator
or s1[i] == 0
or s2[i] == 0
):
continue
if s1[i] == s2[i]:
if weights:
similarity += weights[i][s1[i]]
else:
similarity += 1
return similarity
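# Illustrative check: only site 0 (shared state 1) and site 2 (shared state 2)
# count as shared mutations; the uncut site and the missing site do not:
#
#     >>> hamming_similarity_without_missing([1, 0, 2, -1], [1, 1, 2, 2], -1)
#     2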
def hamming_similarity_normalized_over_missing(
s1: List[int],
s2: List[int],
missing_state_indicator: int,
weights: Optional[Dict[int, Dict[int, float]]] = None,
) -> float:
"""
A function to return the number of (non-missing) character/state mutations
shared by two samples, normalized over the amount of missing data.
Args:
s1: Character states of the first sample
s2: Character states of the second sample
missing_state_indicator: The character representing missing values
weights: A set of optional weights to weight the similarity of a
mutation
    Returns:
        The number of shared mutations between two samples, normalized by the
        number of sites that are non-missing in both, weighted or unweighted
"""
# TODO Optimize this using masks
similarity = 0
num_present = 0
for i in range(len(s1)):
if s1[i] == missing_state_indicator or s2[i] == missing_state_indicator:
continue
num_present += 1
if s1[i] == 0 or s2[i] == 0:
continue
if s1[i] == s2[i]:
if weights:
similarity += weights[i][s1[i]]
else:
similarity += 1
if num_present == 0:
return 0
return similarity / num_present
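# Illustrative check: the same two shared mutations as above, now divided by
# the three sites that are non-missing in both samples:
#
#     >>> hamming_similarity_normalized_over_missing([1, 0, 2, -1], [1, 1, 2, 2], -1)
#     0.6666666666666666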
@numba.jit(nopython=True)
def hamming_distance(
    s1: np.ndarray,
    s2: np.ndarray,
ignore_missing_state: bool = False,
missing_state_indicator: int = -1,
) -> int:
"""Computes the vanilla hamming distance between two samples.
Counts the number of positions that two samples disagree at. A user can
optionally specify to ignore missing data.
Args:
s1: The first sample
s2: The second sample
        ignore_missing_state: Ignore comparisons where either sample has the
            missing state indicator
missing_state_indicator: Indicator for missing data.
Returns:
The number of positions two nodes disagree at.
"""
    dist = 0
    for i in range(len(s1)):
        if s1[i] != s2[i]:
            # Optionally skip comparisons involving missing data.
            if ignore_missing_state and (
                s1[i] == missing_state_indicator
                or s2[i] == missing_state_indicator
            ):
                continue
            dist += 1
    return dist
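# Illustrative check: sites 1 and 2 disagree, but site 2 involves missing data
# and is skipped when ignore_missing_state is set:
#
#     >>> hamming_distance(np.array([1, 0, -1]), np.array([1, 2, 2]))
#     2
#     >>> hamming_distance(np.array([1, 0, -1]), np.array([1, 2, 2]), True)
#     1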
def weighted_hamming_similarity(
s1: List[int],
s2: List[int],
missing_state_indicator: int,
weights: Optional[Dict[int, Dict[int, float]]] = None,
) -> float:
"""A function to return the weighted number of (non-missing) character/state
mutations shared by two samples.
Args:
s1: Character states of the first sample
s2: Character states of the second sample
missing_state_indicator: The character representing missing values
weights: A set of optional weights to weight the similarity of a
mutation
    Returns:
        The weighted number of shared mutations between two samples,
        normalized by the number of sites present in both. In the unweighted
        case, a shared mutation contributes +2 and a shared uncut state +1.
"""
d = 0
num_present = 0
for i in range(len(s1)):
if s1[i] == missing_state_indicator or s2[i] == missing_state_indicator:
continue
num_present += 1
if s1[i] == s2[i]:
if s1[i] != 0:
if weights:
d += 2 * weights[i][s1[i]]
else:
d += 2
else:
if not weights:
d += 1
if num_present == 0:
return 0
return d / num_present
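# Illustrative check of the unweighted case: the shared mutations at sites 0
# and 2 contribute +2 each, the shared uncut site contributes +1, and the
# total is divided by the 3 sites present in both samples:
#
#     >>> weighted_hamming_similarity([1, 0, 2, -1], [1, 0, 2, 2], -1)
#     1.6666666666666667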
def exponential_negative_hamming_distance(
s1: List[int],
s2: List[int],
    missing_state_indicator: int = -1,
weights: Optional[Dict[int, Dict[int, float]]] = None,
) -> float:
"""
Gives a similarity function from the inverse of weighted hamming distance.
This simply returns exp(-d(i,j)) where d is the weighted_hamming_distance
function as above where no weights are passed in. In other words, we
increment d by +2 if the states are different, +1 if one state is uncut and
the other is an indel, and +0 if the two states are identical. Then, we take
this total d and return exp(-d). Note that since d is a metric,
'exponential_negative_hamming_distance' is a multiplicative similarity
score, i.e. s(i, j) = s(i, k) * s(k, j) for k on the path between i and j.
Args:
s1: Character states of the first sample
s2: Character states of the second sample
missing_state_indicator: The character representing missing values
        weights: A dictionary storing the state weights for each character,
            derived from the state priors. This should be a nested dictionary
            where each key corresponds to a character that then indexes
            another dictionary storing the weight of each observed state.
            (Character -> State -> Weight)
Returns:
A similarity score.
"""
d = 0
num_present = 0
for i in range(len(s1)):
if s1[i] == missing_state_indicator or s2[i] == missing_state_indicator:
continue
num_present += 1
if s1[i] != s2[i]:
if s1[i] == 0 or s2[i] == 0:
if weights:
if s1[i] != 0:
d += weights[i][s1[i]]
else:
d += weights[i][s2[i]]
else:
d += 1
else:
if weights:
d += weights[i][s1[i]] + weights[i][s2[i]]
else:
d += 2
if num_present == 0:
return 0
weighted_hamm_dist = d / num_present
return np.exp(-weighted_hamm_dist)
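# Illustrative check: the same strings as in the weighted_hamming_distance
# example have distance 1.0, so the similarity is exp(-1.0). float() is used
# only to print a plain Python float:
#
#     >>> float(exponential_negative_hamming_distance([1, 0, 2, -1], [1, 1, 3, 2]))
#     0.36787944117144233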
def cluster_dissimilarity(
dissimilarity_function: Callable[
[List[int], List[int], int, Dict[int, Dict[int, float]]], float
],
s1: Union[List[int], List[Tuple[int, ...]]],
s2: Union[List[int], List[Tuple[int, ...]]],
missing_state_indicator: int,
weights: Optional[Dict[int, Dict[int, float]]] = None,
    linkage_function: Callable[[Union[np.ndarray, List[float]]], float] = np.mean,
normalize: bool = True,
) -> float:
"""Compute the dissimilarity between (possibly) ambiguous character strings.
An ambiguous character string is a character string in
which each character contains an tuple of possible states, and such a
character string is represented as a list of tuples of integers.
A naive implementation is to first disambiguate each of the two
ambiguous character strings by generating all possible strings, then
computing the dissimilarity between all pairwise combinations, and finally
applying the linkage function on the calculated dissimilarities. However,
doing so has complexity O(\prod_{i=1}^N |a_i| x |b_i|) where N is the number
of target sites, |a_i| is the number of ambiguous characters at target site
i of string a, and |b_i| is the number of amiguous characters at target site
i of string b. As an example, if we have N=10 and all a_i=b_i=2, then we
have to construct 1,038,576 * 2 strings and compute over 4 trillion
dissimilarities.
    By assuming each target site is independent, simply calculating the sum of
    the linkages of each target site separately is equivalent to the naive
    implementation (this can be proven by induction). This procedure is
    implemented in this function. One caveat is that we usually normalize the
    distance by the number of shared non-missing positions. We approximate
    this by dividing the absolute distance by the sum over sites of the
    probability that the site is non-missing in both strings.

    The idea of linkage is analogous to that in hierarchical clustering, where
    ``np.min`` can be used for single linkage, ``np.max`` for complete
    linkage, and ``np.mean`` for average linkage (the default).

    The reason the ``dissimilarity_function`` argument is defined as the first
    argument is so that this function may be used as input to
    :func:`cassiopeia.data.CassiopeiaTree.compute_dissimilarity_map`. This can
    be done by partial application of this function with the desired
    dissimilarity function.

    Note:
        If neither character string is ambiguous, then calling this function
        is equivalent to calling ``dissimilarity_function`` directly.

    Args:
        dissimilarity_function: The dissimilarity function to use to calculate
            pairwise dissimilarities.
        s1: The first (possibly) ambiguous sample
        s2: The second (possibly) ambiguous sample
        missing_state_indicator: The character representing missing values
        weights: A set of optional weights to weight the similarity of a
            mutation
        linkage_function: The linkage function to use to aggregate
            dissimilarities into a single number. Defaults to ``np.mean`` for
            average linkage.
        normalize: Whether to normalize to the proportion of sites present in
            both strings.

Returns:
The dissimilarity between the two ambiguous samples
"""
# Make any unambiguous character strings into pseudo-ambiguous so that we
# can easily use itertools.product
s1 = [s if isinstance(s, tuple) else (s,) for s in s1]
s2 = [s if isinstance(s, tuple) else (s,) for s in s2]
result = 0
num_present = 0
for i, (c1, c2) in enumerate(zip(s1, s2)):
dissim = []
present = []
for _c1, _c2 in itertools.product(c1, c2):
present.append(
_c1 != missing_state_indicator
and _c2 != missing_state_indicator
)
dissim.append(
dissimilarity_function(
[_c1],
[_c2],
missing_state_indicator,
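                    # The dissimilarity function sees single-character
                    # strings, so this character's weights are remapped to
                    # character index 0.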
{0: weights[i]} if weights else None,
)
)
result += linkage_function(dissim)
num_present += np.mean(present)
if num_present == 0:
return 0
return result / num_present if normalize else result
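# Illustrative check with the default average linkage; a version with the
# dissimilarity function baked in can be built via
# functools.partial(cluster_dissimilarity, weighted_hamming_distance). At
# site 0 the pairs (1, 1) and (2, 1) have distances 0.0 and 2.0 (mean 1.0),
# site 1 contributes 0.0, and both sites are fully present, so the normalized
# result is 1.0 / 2:
#
#     >>> float(cluster_dissimilarity(
#     ...     weighted_hamming_distance, [(1, 2), 0], [1, 0], -1
#     ... ))
#     0.5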
def cluster_dissimilarity_weighted_hamming_distance_min_linkage(
s1: Union[List[int], List[Tuple[int, ...]]],
s2: Union[List[int], List[Tuple[int, ...]]],
missing_state_indicator: int,
weights: Optional[Dict[int, Dict[int, float]]] = None,
) -> float:
"""Compute the dissimilarity between (possibly) ambiguous character strings.
An ambiguous character string is a character string in
which each character contains an tuple of possible states, and such a
character string is represented as a list of tuples of integers.
A naive implementation is to first disambiguate each of the two
ambiguous character strings by generating all possible strings, then
computing the dissimilarity between all pairwise combinations, and finally
applying the linkage function on the calculated dissimilarities. However,
doing so has complexity O(\prod_{i=1}^N |a_i| x |b_i|) where N is the number
of target sites, |a_i| is the number of ambiguous characters at target site
i of string a, and |b_i| is the number of amiguous characters at target site
i of string b. As an example, if we have N=10 and all a_i=b_i=2, then we
have to construct 1,038,576 * 2 strings and compute over 4 trillion
dissimilarities.
    By assuming each target site is independent, simply calculating the sum of
    the linkages of each target site separately is equivalent to the naive
    implementation (this can be proven by induction). This procedure is
    implemented in this function. One caveat is that we usually normalize the
    distance by the number of shared non-missing positions. We approximate
    this by dividing the absolute distance by the sum over sites of the
    probability that the site is non-missing in both strings.

    Note:
        If neither character string is ambiguous, then calling this function
        is equivalent to calling ``weighted_hamming_distance`` directly.

Args:
s1: The first (possibly) ambiguous sample
s2: The second (possibly) ambiguous sample
missing_state_indicator: The character representing missing values
weights: A set of optional weights to weight the similarity of a
mutation
Returns:
The dissimilarity between the two ambiguous samples
"""
# Make any unambiguous character strings into pseudo-ambiguous so that we
# can easily iterate through combinations
s1 = [list(s) if isinstance(s, tuple) else [s] for s in s1]
s2 = [list(s) if isinstance(s, tuple) else [s] for s in s2]
result = 0
num_present = 0
for i in range(len(s1)):
c1, c2 = s1[i], s2[i]
dissim = []
present = 0
total = 0
for _c1 in c1:
for _c2 in c2:
d = 0
total += 1
if (
_c1 != missing_state_indicator
and _c2 != missing_state_indicator
):
present += 1
if (_c1 != _c2) and (
_c1 != missing_state_indicator
and _c2 != missing_state_indicator
):
if _c1 == 0 or _c2 == 0:
if weights:
if _c1 != 0:
d += weights[i][_c1]
else:
d += weights[i][_c2]
else:
d += 1
else:
if weights:
d += weights[i][_c1] + weights[i][_c2]
else:
d += 2
dissim.append(d)
result += np.min(np.array(dissim))
num_present += present / total
if num_present == 0:
return 0
return result / num_present
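# Illustrative check: the same inputs as the cluster_dissimilarity example
# above, but with min linkage the (1, 1) pair at site 0 gives distance 0, so
# the overall dissimilarity is 0.0 rather than 0.5:
#
#     >>> float(cluster_dissimilarity_weighted_hamming_distance_min_linkage(
#     ...     [(1, 2), 0], [1, 0], -1
#     ... ))
#     0.0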