Source code for cassiopeia.solver.ILPSolver

"""
Implements the Cassiopeia-ILP solver, as described in Jones et al, Genome Biol
2020. Briefly, this algorithm infers the maximum parsimony tree by solving for
a Steiner Tree over an inferred potential graph of potential intermediate
evolutionary states.
"""
import datetime
import logging
import time
from typing import Dict, List, Optional

import hashlib
import itertools
import networkx as nx
import numpy as np
import pandas as pd

from cassiopeia.data import CassiopeiaTree
from cassiopeia.data import utilities as data_utilities
from cassiopeia.mixins import ILPSolverError, is_ambiguous_state, logger
from cassiopeia.solver import (
    CassiopeiaSolver,
    dissimilarity_functions,
    ilp_solver_utilities,
    solver_utilities,
)


[docs] class ILPSolver(CassiopeiaSolver.CassiopeiaSolver): """ The Cassiopeia ILP-based maximum parsimony solver. ILPSolver is a subclass of CassiopeiaSolver and implements the Cassiopeia-ILP algorithm described in Jones et al, Genome Biol 2020. The solver proceeds by constructing a tree over a network of possible evolutionary states known as the potential graph. The procedure for constructing this tree is done by solving for a Steiner Tree with an integer linear programming (ILP) optimization approach. The ILP optimization is performed using Gurobi. Args: convergence_time_limit: Amount of time allotted to the ILP for convergence. Ignored if set to 0. convergence_iteration_limit: Number of iterations allowed for ILP convergence. Ignored if set to 0. maximum_potential_graph_layer_size: Maximum size allowed for an iteration of the potential graph inference procedure. If this is exceeded, we return the previous iteration's graph or abort altogether. maximum_potential_graph_lca_distance: Maximum height of LCA to add to the potential graph. If this parameter is not provided or the specified value is 0, the maximum distance between any pair of samples is used as the maximum lca height. weighted: Weight edges on the potential graph by the negative log likelihood of the mutations. seed: Random seed to use during ILP optimization. mip_gap: Objective gap for mixed integer linear programming problem. logfile: A file to log output to. This will contain information around the potential graph inference procedure as well as the Steiner Tree optimization. prior_transformation: Function to use when transforming priors into weights. Supports the following transformations: "negative_log": Transforms each probability by the negative log (default) "inverse": Transforms each probability p by taking 1/p "square_root_inverse": Transforms each probability by the the square root of 1/p """ def __init__( self, convergence_time_limit: int = 12600, convergence_iteration_limit: int = 0, maximum_potential_graph_layer_size: int = 10000, maximum_potential_graph_lca_distance: Optional[int] = None, weighted: bool = False, seed: Optional[int] = None, mip_gap: float = 0.01, prior_transformation: str = "negative_log", ): super().__init__(prior_transformation) self.convergence_time_limit = convergence_time_limit self.convergence_iteration_limit = convergence_iteration_limit self.maximum_potential_graph_layer_size = ( maximum_potential_graph_layer_size ) self.maximum_potential_graph_lca_distance = ( maximum_potential_graph_lca_distance ) self.weighted = weighted self.seed = seed self.mip_gap = mip_gap
[docs] @logger.namespaced("ILPSolver") def solve( self, cassiopeia_tree: CassiopeiaTree, layer: Optional[str] = None, collapse_mutationless_edges: bool = False, logfile: str = "stdout.log", ): """Infers a tree with Cassiopeia-ILP. Solves a tree using the Cassiopeia-ILP algorithm and populates a tree in the provided CassiopeiaTree. Specifically, a potential graph of theoretical evolutionary intermediates is estimated from the input data and Gurobi is used to optimize and ILP searching for a maximum parsimony subgraph of the potential graph, resulting in a proposed maximum parsimony phylogeny. Args: cassiopeia_tree: Input CassiopeiaTree layer: Layer storing the character matrix for solving. If None, the default character matrix is used in the CassiopeiaTree. collapse_mutationless_edges: Indicates if the final reconstructed tree should collapse mutationless edges based on internal states inferred by Camin-Sokal parsimony. In scoring accuracy, this removes artifacts caused by arbitrarily resolving polytomies. logfile: Location to log progress. """ if self.weighted and not cassiopeia_tree.priors: raise ILPSolverError( "Specify prior probabilities in the CassiopeiaTree for weighted" " analysis." ) # configure logger if logfile is not None: file_handler = logging.FileHandler(logfile) file_handler.setLevel(logging.INFO) logger.addHandler(file_handler) logger.ch.setLevel(logging.getLogger().level) # add to logger logger.info("Solving tree with the following parameters.") logger.info(f"Convergence time limit: {self.convergence_time_limit}") logger.info( f"Convergence iteration limit: {self.convergence_iteration_limit}" ) logger.info( "Max potential graph layer size: " f"{self.maximum_potential_graph_layer_size}" ) logger.info( "Max potential graph lca distance: " f"{self.maximum_potential_graph_lca_distance}" ) logger.info(f"MIP gap: {self.mip_gap}") if layer: character_matrix = cassiopeia_tree.layers[layer].copy() else: character_matrix = cassiopeia_tree.character_matrix.copy() if any( is_ambiguous_state(state) for state in character_matrix.values.flatten() ): raise ILPSolverError("Solver does not support ambiguous states.") unique_character_matrix = character_matrix.drop_duplicates() weights = None if cassiopeia_tree.priors: weights = solver_utilities.transform_priors( cassiopeia_tree.priors, self.prior_transformation ) # find the root of the tree & generate process ID root = tuple( data_utilities.get_lca_characters( unique_character_matrix.values.tolist(), cassiopeia_tree.missing_state_indicator, ) ) logger.info(f"Phylogenetic root: {root}") pid = hashlib.md5( "|".join([str(r) for r in root]).encode("utf-8") ).hexdigest() targets = [tuple(t) for t in unique_character_matrix.values.tolist()] if unique_character_matrix.shape[0] == 1: optimal_solution = nx.DiGraph() optimal_solution.add_node(root) optimal_solution = ( self.__append_sample_names_and_remove_spurious_leaves( optimal_solution, character_matrix ) ) cassiopeia_tree.populate_tree(optimal_solution, layer=layer) return # determine diameter of the dataset by evaluating maximum distance to # the root from each sample if (self.maximum_potential_graph_lca_distance is not None) and ( self.maximum_potential_graph_lca_distance > 0 ): max_lca_distance = self.maximum_potential_graph_lca_distance else: max_lca_distance = 0 lca_distances = [ dissimilarity_functions.hamming_distance( root, np.array(u), ignore_missing_state=True, missing_state_indicator=cassiopeia_tree.missing_state_indicator, ) for u in targets ] for (i, j) in itertools.combinations(range(len(lca_distances)), 2): max_lca_distance = max( max_lca_distance, lca_distances[i] + lca_distances[j] + 1 ) # infer the potential graph potential_graph = self.infer_potential_graph( unique_character_matrix, pid, max_lca_distance, weights, cassiopeia_tree.missing_state_indicator, ) # generate Steiner Tree ILP model nodes = list(potential_graph.nodes()) encoder = dict(zip(nodes, list(range(len(nodes))))) decoder = dict((v, k) for k, v in encoder.items()) _potential_graph = nx.relabel_nodes(potential_graph, encoder) _targets = list(map(lambda x: encoder[x], targets)) _root = encoder[root] model, edge_variables = self.generate_steiner_model( _potential_graph, _root, _targets ) # solve the ILP problem and return a set of proposed solutions proposed_solutions = self.solve_steiner_instance( model, edge_variables, _potential_graph, pid, logfile ) # select best model and post process the solution optimal_solution = proposed_solutions[0] optimal_solution = nx.relabel_nodes(optimal_solution, decoder) optimal_solution = self.post_process_steiner_solution( optimal_solution, root ) # append sample names to the solution and populate the tree optimal_solution = ( self.__append_sample_names_and_remove_spurious_leaves( optimal_solution, character_matrix ) ) cassiopeia_tree.populate_tree(optimal_solution, layer=layer) # rename internal nodes such that they are not tuples node_name_generator = solver_utilities.node_name_generator() internal_node_rename = {} seen_nodes = [] for i in cassiopeia_tree.internal_nodes: new_node_name = next(node_name_generator) while new_node_name in seen_nodes: new_node_name = next(node_name_generator) internal_node_rename[i] = new_node_name seen_nodes.append(internal_node_rename[i]) cassiopeia_tree.relabel_nodes(internal_node_rename) cassiopeia_tree.collapse_unifurcations() # collapse mutationless edges if collapse_mutationless_edges: cassiopeia_tree.collapse_mutationless_edges( infer_ancestral_characters=True ) if logfile is not None: logger.removeHandler(file_handler) file_handler.close()
[docs] def infer_potential_graph( self, character_matrix: pd.DataFrame, pid: int, lca_height: int, weights: Optional[Dict[int, Dict[int, str]]] = None, missing_state_indicator: int = -1, ) -> nx.DiGraph: """Infers a potential graph for the observed states. Using the set of samples in the character matrix for this solver, this procedure creates a network which contains potential ancestors, or evolutionary intermediates. This procedure invokes `ilp_solver_utilities.infer_potential_graph_cython` which returns the edges of the potential graph in character string format (e.g., "1|2|3|..."). The procedure here decodes these strings, creates a Networkx directed graph, and adds edges to the graph. These weights are added to the edges of the graph using priors, if they are specified in the CassiopeiaTree, or the number of mutations along an edge. Args: character_matrix: Character matrix root: Specified root node, represented as a list of character states pid: Process ID for future reference lca_height: Maximum lca height to consider for connecting nodes to an LCA weights: Weights for character-state pairs, derived from the priors if these are available. missing_state_indicator: Indicator for missing data. Returns: A potential graph represented by a directed graph. """ potential_graph_edges = ( ilp_solver_utilities.infer_potential_graph_cython( character_matrix.astype(str).values, pid, lca_height, self.maximum_potential_graph_layer_size, missing_state_indicator, ) ) if len(potential_graph_edges) == 0: raise ILPSolverError("Potential Graph could not be found with" " solver parameters. Try increasing" " `maximum_potential_graph_layer_size` or" " using another solver.") # the potential graph edges returned are strings in the form # "state1|state2|...", so we "decode" them here decoded_edges = [] for e1, e2 in potential_graph_edges: e1 = np.array( e1.replace("-", str(missing_state_indicator)).split("|") ).astype(int) e2 = np.array( e2.replace("-", str(missing_state_indicator)).split("|") ).astype(int) decoded_edges.append((tuple(e1), tuple(e2))) potential_graph = nx.DiGraph() potential_graph.add_edges_from(decoded_edges) return self.add_edge_weights( potential_graph, weights, missing_state_indicator )
[docs] def add_edge_weights( self, potential_graph: nx.DiGraph(), weights: Optional[Dict[int, Dict[int, str]]] = None, missing_state_indicator: int = -1, ) -> nx.DiGraph: """Annotates edges with the weight. Given a graph where nodes are iterable entities containing character states, annotated edges with respect to the number of mutations. If a prior dictionary is passed into the constructor, the log likelihood of the mutations is added instead. These values are stored in the `weight` attribute of the networkx graph. Args: potential_graph: Potential graph weights: Weights to use when comparing states between characters missing_state_indicator: Variable to indicate missing state information. Returns: The potential graph with edge weights added, stored in the `weight` attribute. """ weighted_graph = potential_graph.copy() for u, v in weighted_graph.edges(): weighted_graph[u][v][ "weight" ] = dissimilarity_functions.weighted_hamming_distance( list(u), list(v), missing_state_indicator, weights ) return weighted_graph
[docs] def generate_steiner_model( self, potential_graph: nx.DiGraph, root: List[int], targets: List[List[int]], ): """Generates a Gurobi instance for Steiner Tree inference. Given a potential graph, a root to treat as the source, and a list of targets, create a Gurobi mixed integer linear programming instance for computing the Steiner Tree. Args: potential_graph: Potential graph representing the evolutionary space on which to solve for the Steiner Tree. root: A node in the graph to treat as the source. targets: A list of nodes in the tree that serve as targets for the Steiner Tree procedure. Returns: A Gurobipy Model instance and the edge variables involved. """ try: import gurobipy except ModuleNotFoundError: raise ILPSolverError( "Gurobi not found. You must install Gurobi & " "gurobipy from source." ) source_flow = {v: 0 for v in potential_graph.nodes()} if root not in potential_graph.nodes: raise ILPSolverError("Root node not in potential graph.") for t in targets: if t not in potential_graph.nodes: raise ILPSolverError("Target node not in potential graph.") # remove source from targets if it exists there targets = [t for t in targets if t != root] source_flow[root] = len(targets) for target in targets: source_flow[target] = -1 model = gurobipy.Model("steiner") ############# add variables ############# # add flow for edges edge_variables = {} for u, v in potential_graph.edges(): edge_variables[u, v] = model.addVar( vtype=gurobipy.GRB.INTEGER, lb=0, ub=len(targets), name=f"edge_{u}_{v}", ) # add edge-usage indicator variable edge_variables_binary = {} for u, v in potential_graph.edges(): edge_variables_binary[u, v] = model.addVar( vtype=gurobipy.GRB.BINARY, name=f"edge_binary_{u}_{v}" ) model.update() ############# add constraints ############# # check if edge is used for u, v in potential_graph.edges(): model.addConstr( edge_variables_binary[u, v] >= (edge_variables[u, v] / len(targets)) ) # flow conservation constraints for v in potential_graph.nodes(): model.addConstr( ( gurobipy.quicksum( edge_variables[u, v] for u in potential_graph.predecessors(v) ) + source_flow[v] ) == ( gurobipy.quicksum( edge_variables[v, w] for w in potential_graph.successors(v) ) ) ) ############ add objective ############# objective_expression = gurobipy.quicksum( edge_variables_binary[u, v] * potential_graph[u][v]["weight"] for u, v in potential_graph.edges() ) model.setObjective(objective_expression, gurobipy.GRB.MINIMIZE) return model, edge_variables
[docs] def solve_steiner_instance( self, model, edge_variables, potential_graph: nx.DiGraph, pid: int, logfile: str, ) -> List[nx.DiGraph]: """Solves for a Steiner Tree from the Gurobi instance. This function works with a model that has been specified via Gurobi, and will solve the model using the stopping criteria that the user has specified in this class instance. Args: model: A Gurobi model corresponding to the Steiner Tree problem. This should be created with `generate_steiner_model`. edge_variables: Edge variables that were created during model generation. These are Gurobi variables that indicate whether two nodes are connected to one another in the Potential Graph; we use these variables to recreate a tree at the end from the Gurobi solution. potential_graph: Potential Graph that was used as input to the Steiner Tree problem. pid: Process ID logfile: Location to store standard out. Returns: A list of solutions """ try: import gurobipy except ModuleNotFoundError: raise ILPSolverError( "Gurobi not found. You must install Gurobi & " "gurobipy from source." ) model.params.LogToConsole = 0 # Adding constant parameters model.params.THREADS = 1 model.params.Presolve = 2 model.params.MIPFocus = 1 model.params.Cuts = 1 model.params.Method = 4 # Add user-defined parameters model.params.MIPGAP = self.mip_gap if logfile is not None: model.params.LogFile = logfile if self.seed is not None: model.params.Seed = self.seed if self.convergence_iteration_limit > 0: model.params.IterationLimit = self.convergence_iteration_limit if self.convergence_time_limit > 0: model.params.TimeLimit = self.convergence_time_limit start_time = time.time() model.optimize() # recover subgraphs solutions = [] for i in range(model.SolCount): model.params.SolutionNumber = i subgraph = nx.DiGraph() value_for_edge = model.getAttr("xn", edge_variables) for u, v in potential_graph.edges(): if value_for_edge[u, v] > 0: subgraph.add_edge( u, v, weight=potential_graph[u][v]["weight"] ) solutions.append(subgraph) end_time = time.time() execution_delta = datetime.timedelta(seconds=(end_time - start_time)) days = execution_delta.days hours = execution_delta.seconds // 3600 minutes = execution_delta.seconds // 60 seconds = execution_delta.seconds % 60 logger.info( f"(Process {pid}) Steiner tree solving tool {days} days, " f"{hours} hours, {minutes} minutes, and {seconds} seconds." ) if model.status != gurobipy.GRB.status.OPTIMAL: logger.info( f"(Process {pid}) Warning: Steiner tree solving did " "not result in an optimal model." ) return solutions
[docs] def post_process_steiner_solution( self, solution: nx.DiGraph, root: List[int], ) -> nx.DiGraph: """Post-processes the returned graph from Gurobi. This procedure post-processes the proposed Steiner Tree from Gurobi by enforcing that no self-loops occur and that every node at most one parent. Args: solution: The Gurobi solution root: The root node targets: A list of targets pid: Process id Returns: A cleaned up networkx solution """ processed_solution = solution.copy() for edge in nx.selfloop_edges(processed_solution): processed_solution.remove_edge(edge[0], edge[1]) # remove spurious roots spurious_roots = [ n for n in processed_solution if processed_solution.in_degree(n) == 0 ] while len(spurious_roots) > 1: for r in spurious_roots: if r != root: processed_solution.remove_node(r) spurious_roots = [ n for n in processed_solution if processed_solution.in_degree(n) == 0 ] # impose that each node only has one parent non_tree_nodes = [ n for n in processed_solution.nodes() if processed_solution.in_degree(n) > 1 ] for node in non_tree_nodes: parents = processed_solution.predecessors(node) parents = sorted( parents, key=lambda k: processed_solution[k][node]["weight"], reverse=True, ) if len(parents) == 2 and ( parents[1] in nx.ancestors(processed_solution, parents[0]) or (parents[0] in nx.ancestors(processed_solution, parents[1])) ): if parents[1] in nx.ancestors(processed_solution, parents[0]): processed_solution.remove_edge(parents[1], node) else: processed_solution.remove_edge(parents[0], node) else: for parent in parents[1:]: processed_solution.remove_edge(parent, node) return processed_solution
def __append_sample_names_and_remove_spurious_leaves( self, solution: nx.DiGraph, character_matrix: pd.DataFrame ) -> nx.DiGraph: """Append samples to character states in tree and prune spurious leaves. Given a tree where every node corresponds to a set of character states, append sample names at the deepest node that has its character state. Sometimes character states can exist in two separate parts of the tree (especially when using the Hybrid algorithm where parts of the tree are built independently), so we make sure we only add a particular sample once to the tree. Additionally, if there exist extant nodes that do not have samples appended to them, these nodes are removed and their lineages pruned as to not create any spurious leaf nodes. Args: solution: A Steiner Tree solution that we wish to add sample names to. character_matrix: Character matrix Returns: A solution with extra leaves corresponding to sample names. """ root = [n for n in solution if solution.in_degree(n) == 0][0] sample_lookup = character_matrix.apply( lambda x: tuple(x.values), axis=1 ) states_added = [] for node in nx.dfs_postorder_nodes(solution, source=root): # append nodes with this character state at the deepest place # possible if node in states_added: continue samples = sample_lookup[sample_lookup == node].index if len(samples) > 0: solution.add_edges_from([(node, sample) for sample in samples]) states_added.append(node) # remove extant lineages that don't correspond to leaves leaves = [n for n in solution if solution.out_degree(n) == 0] for l in leaves: if l not in character_matrix.index: curr_parent = list(solution.predecessors(l))[0] solution.remove_node(l) while ( len(list(solution.successors(curr_parent))) < 1 and curr_parent != root ): next_parent = list(solution.predecessors(curr_parent))[0] solution.remove_node(curr_parent) curr_parent = next_parent return solution