import collections from .providers import AbstractResolver from .structs import DirectedGraph, build_iter_view RequirementInformation = collections.namedtuple( "RequirementInformation", ["requirement", "parent"] ) class ResolverException(Exception): """A base class for all exceptions raised by this module. Exceptions derived by this class should all be handled in this module. Any bubbling pass the resolver should be treated as a bug. """ class RequirementsConflicted(ResolverException): def __init__(self, criterion): super(RequirementsConflicted, self).__init__(criterion) self.criterion = criterion def __str__(self): return "Requirements conflict: {}".format( ", ".join(repr(r) for r in self.criterion.iter_requirement()), ) class InconsistentCandidate(ResolverException): def __init__(self, candidate, criterion): super(InconsistentCandidate, self).__init__(candidate, criterion) self.candidate = candidate self.criterion = criterion def __str__(self): return "Provided candidate {!r} does not satisfy {}".format( self.candidate, ", ".join(repr(r) for r in self.criterion.iter_requirement()), ) class Criterion(object): """Representation of possible resolution results of a package. This holds three attributes: * `information` is a collection of `RequirementInformation` pairs. Each pair is a requirement contributing to this criterion, and the candidate that provides the requirement. * `incompatibilities` is a collection of all known not-to-work candidates to exclude from consideration. * `candidates` is a collection containing all possible candidates deducted from the union of contributing requirements and known incompatibilities. It should never be empty, except when the criterion is an attribute of a raised `RequirementsConflicted` (in which case it is always empty). .. note:: This class is intended to be externally immutable. **Do not** mutate any of its attribute containers. """ def __init__(self, candidates, information, incompatibilities): self.candidates = candidates self.information = information self.incompatibilities = incompatibilities def __repr__(self): requirements = ", ".join( "({!r}, via={!r})".format(req, parent) for req, parent in self.information ) return "Criterion({})".format(requirements) @classmethod def from_requirement(cls, provider, requirement, parent): """Build an instance from a requirement.""" cands = build_iter_view(provider.find_matches([requirement])) infos = [RequirementInformation(requirement, parent)] criterion = cls(cands, infos, incompatibilities=[]) if not cands: raise RequirementsConflicted(criterion) return criterion def iter_requirement(self): return (i.requirement for i in self.information) def iter_parent(self): return (i.parent for i in self.information) def merged_with(self, provider, requirement, parent): """Build a new instance from this and a new requirement.""" infos = list(self.information) infos.append(RequirementInformation(requirement, parent)) cands = build_iter_view(provider.find_matches([r for r, _ in infos])) criterion = type(self)(cands, infos, list(self.incompatibilities)) if not cands: raise RequirementsConflicted(criterion) return criterion def excluded_of(self, candidate): """Build a new instance from this, but excluding specified candidate. Returns the new instance, or None if we still have no valid candidates. """ cands = self.candidates.excluding(candidate) if not cands: return None incompats = list(self.incompatibilities) incompats.append(candidate) return type(self)(cands, list(self.information), incompats) class ResolutionError(ResolverException): pass class ResolutionImpossible(ResolutionError): def __init__(self, causes): super(ResolutionImpossible, self).__init__(causes) # causes is a list of RequirementInformation objects self.causes = causes class ResolutionTooDeep(ResolutionError): def __init__(self, round_count): super(ResolutionTooDeep, self).__init__(round_count) self.round_count = round_count # Resolution state in a round. State = collections.namedtuple("State", "mapping criteria") class Resolution(object): """Stateful resolution object. This is designed as a one-off object that holds information to kick start the resolution process, and holds the results afterwards. """ def __init__(self, provider, reporter): self._p = provider self._r = reporter self._states = [] @property def state(self): try: return self._states[-1] except IndexError: raise AttributeError("state") def _push_new_state(self): """Push a new state into history. This new state will be used to hold resolution results of the next coming round. """ try: base = self._states[-1] except IndexError: state = State(mapping=collections.OrderedDict(), criteria={}) else: state = State( mapping=base.mapping.copy(), criteria=base.criteria.copy(), ) self._states.append(state) def _merge_into_criterion(self, requirement, parent): self._r.adding_requirement(requirement, parent) name = self._p.identify(requirement) try: crit = self.state.criteria[name] except KeyError: crit = Criterion.from_requirement(self._p, requirement, parent) else: crit = crit.merged_with(self._p, requirement, parent) return name, crit def _get_criterion_item_preference(self, item): name, criterion = item return self._p.get_preference( self.state.mapping.get(name), criterion.candidates.for_preference(), criterion.information, ) def _is_current_pin_satisfying(self, name, criterion): try: current_pin = self.state.mapping[name] except KeyError: return False return all( self._p.is_satisfied_by(r, current_pin) for r in criterion.iter_requirement() ) def _get_criteria_to_update(self, candidate): criteria = {} for r in self._p.get_dependencies(candidate): name, crit = self._merge_into_criterion(r, parent=candidate) criteria[name] = crit return criteria def _attempt_to_pin_criterion(self, name, criterion): causes = [] for candidate in criterion.candidates: try: criteria = self._get_criteria_to_update(candidate) except RequirementsConflicted as e: causes.append(e.criterion) continue # Check the newly-pinned candidate actually works. This should # always pass under normal circumstances, but in the case of a # faulty provider, we will raise an error to notify the implementer # to fix find_matches() and/or is_satisfied_by(). satisfied = all( self._p.is_satisfied_by(r, candidate) for r in criterion.iter_requirement() ) if not satisfied: raise InconsistentCandidate(candidate, criterion) # Put newly-pinned candidate at the end. This is essential because # backtracking looks at this mapping to get the last pin. self._r.pinning(candidate) self.state.mapping.pop(name, None) self.state.mapping[name] = candidate self.state.criteria.update(criteria) return [] # All candidates tried, nothing works. This criterion is a dead # end, signal for backtracking. return causes def _backtrack(self): # Drop the current state, it's known not to work. del self._states[-1] # We need at least 2 states here: # (a) One to backtrack to. # (b) One to restore state (a) to its state prior to candidate-pinning, # so we can pin another one instead. while len(self._states) >= 2: # Retract the last candidate pin. prev_state = self._states.pop() try: name, candidate = prev_state.mapping.popitem() except KeyError: continue self._r.backtracking(candidate) # Create a new state to work on, with the newly known not-working # candidate excluded. self._push_new_state() # Mark the retracted candidate as incompatible. criterion = self.state.criteria[name].excluded_of(candidate) if criterion is None: # This state still does not work. Try the still previous state. del self._states[-1] continue self.state.criteria[name] = criterion return True return False def resolve(self, requirements, max_rounds): if self._states: raise RuntimeError("already resolved") self._push_new_state() for r in requirements: try: name, crit = self._merge_into_criterion(r, parent=None) except RequirementsConflicted as e: raise ResolutionImpossible(e.criterion.information) self.state.criteria[name] = crit self._r.starting() for round_index in range(max_rounds): self._r.starting_round(round_index) self._push_new_state() curr = self.state unsatisfied_criterion_items = [ item for item in self.state.criteria.items() if not self._is_current_pin_satisfying(*item) ] # All criteria are accounted for. Nothing more to pin, we are done! if not unsatisfied_criterion_items: del self._states[-1] self._r.ending(curr) return self.state # Choose the most preferred unpinned criterion to try. name, criterion = min( unsatisfied_criterion_items, key=self._get_criterion_item_preference, ) failure_causes = self._attempt_to_pin_criterion(name, criterion) # Backtrack if pinning fails. if failure_causes: result = self._backtrack() if not result: causes = [ i for crit in failure_causes for i in crit.information ] raise ResolutionImpossible(causes) self._r.ending_round(round_index, curr) raise ResolutionTooDeep(max_rounds) def _has_route_to_root(criteria, key, all_keys, connected): if key in connected: return True if key not in criteria: return False for p in criteria[key].iter_parent(): try: pkey = all_keys[id(p)] except KeyError: continue if pkey in connected: connected.add(key) return True if _has_route_to_root(criteria, pkey, all_keys, connected): connected.add(key) return True return False Result = collections.namedtuple("Result", "mapping graph criteria") def _build_result(state): mapping = state.mapping all_keys = {id(v): k for k, v in mapping.items()} all_keys[id(None)] = None graph = DirectedGraph() graph.add(None) # Sentinel as root dependencies' parent. connected = {None} for key, criterion in state.criteria.items(): if not _has_route_to_root(state.criteria, key, all_keys, connected): continue if key not in graph: graph.add(key) for p in criterion.iter_parent(): try: pkey = all_keys[id(p)] except KeyError: continue if pkey not in graph: graph.add(pkey) graph.connect(pkey, key) return Result( mapping={k: v for k, v in mapping.items() if k in connected}, graph=graph, criteria=state.criteria, ) class Resolver(AbstractResolver): """The thing that performs the actual resolution work.""" base_exception = ResolverException def resolve(self, requirements, max_rounds=100): """Take a collection of constraints, spit out the resolution result. The return value is a representation to the final resolution result. It is a tuple subclass with three public members: * `mapping`: A dict of resolved candidates. Each key is an identifier of a requirement (as returned by the provider's `identify` method), and the value is the resolved candidate. * `graph`: A `DirectedGraph` instance representing the dependency tree. The vertices are keys of `mapping`, and each edge represents *why* a particular package is included. A special vertex `None` is included to represent parents of user-supplied requirements. * `criteria`: A dict of "criteria" that hold detailed information on how edges in the graph are derived. Each key is an identifier of a requirement, and the value is a `Criterion` instance. The following exceptions may be raised if a resolution cannot be found: * `ResolutionImpossible`: A resolution cannot be found for the given combination of requirements. The `causes` attribute of the exception is a list of (requirement, parent), giving the requirements that could not be satisfied. * `ResolutionTooDeep`: The dependency tree is too deeply nested and the resolver gave up. This is usually caused by a circular dependency, but you can try to resolve this by increasing the `max_rounds` argument. """ resolution = Resolution(self.provider, self.reporter) state = resolution.resolve(requirements, max_rounds=max_rounds) return _build_result(state)