typetapper/typetapper/hierarchy_graph.py

365 lines
16 KiB
Python

from typing import Set, Union, TYPE_CHECKING, List, Optional, Dict, Iterable, Tuple, Callable, Any
from itertools import pairwise, chain
from collections import defaultdict
import networkx
from .relative_graph import RelativeAtomGraph, RelativeAtom
from .data import Atom, OpSequence, Prop
if TYPE_CHECKING:
from .knowledge import TypeTapperManager
class RelativeAtomGroup:
def __init__(self, graph: 'HierarchicalGraph', parent: Optional['RelativeAtomGroup']):
self.graph = graph
self.parent = parent
self.children: Set['RelativeAtomOrGroup'] = set()
self.prop: Prop = Prop()
RelativeAtomOrGroup = Union[RelativeAtom, RelativeAtomGroup]
MultiGraphEdge = Tuple[RelativeAtomOrGroup, RelativeAtomOrGroup, int]
# THIS IS NOT THREAD SAFE. YOU HAVE BEEN WARNED
class HierarchicalGraph(RelativeAtomGraph):
def __init__(self, kp: 'TypeTapperManager', baseline: List[Atom]):
self.__graph = networkx.MultiDiGraph()
self._root_group = RelativeAtomGroup(self, None)
self._atom_parents: Dict[RelativeAtom, RelativeAtomGroup] = {}
self._current_group = self._root_group
super().__init__(kp, baseline)
@property
def root_group(self) -> RelativeAtomGroup:
return self._root_group
def local_graph(self, group: RelativeAtomGroup) -> networkx.MultiDiGraph:
return self.__graph.subgraph([group] + list(group.children))
def prop(self, node: RelativeAtomOrGroup) -> Prop:
if isinstance(node, RelativeAtomGroup):
return node.prop
else:
return self.attrs(node).prop
def expand(self, relatom: RelativeAtom, group: Optional[RelativeAtomGroup]=None) -> Set[RelativeAtom]:
if group is not None:
self._current_group = group
return super().expand(relatom)
def expand_while(self, seeds: Iterable[RelativeAtom], group: RelativeAtomGroup, predicate: Callable[[RelativeAtom], bool]):
for seed in seeds:
if not predicate(seed):
raise ValueError("Predicate does not hold for seed %s", seed)
queue = set(seeds)
while queue:
seed = queue.pop()
self.expand(seed, group=group) # no-op if not in frontier
for adj in chain(self.predecessors(seed), self.successors(seed)):
if adj in self.frontier and adj not in queue and predicate(adj):
queue.add(adj)
def expand_by_key(self, seed: RelativeAtomOrGroup, group: RelativeAtomGroup, key: Callable[[RelativeAtom], Any]):
queues = defaultdict(set)
for atom in self.atoms(seed):
if atom in self.frontier:
queues[key(atom)].add(atom)
else:
for adj in chain(self.predecessors(atom), self.successors(atom)):
if adj in self.frontier:
queues[key(adj)].add(adj)
for keyed, queue in queues.items():
subgroup = self.create_group([], group)
self.expand_while(queue, subgroup, lambda atom: key(atom) == keyed)
def atoms(self, node: RelativeAtomOrGroup) -> Iterable[RelativeAtom]:
if isinstance(node, RelativeAtomGroup):
for child in node.children:
yield from self.atoms(child)
else:
yield node
def _prop_propagate(self, node: RelativeAtomOrGroup, add: bool):
prop = self.prop(node)
for parent in self._ancestry(node):
if add:
parent.prop.update(prop)
else:
parent.prop.subtract(prop)
def _add_node(self, relatom: RelativeAtom, path: OpSequence) -> bool:
res = super()._add_node(relatom, path)
if res:
self._atom_parents[relatom] = self._current_group
self._current_group.children.add(relatom)
self.__graph.add_node(relatom)
self._prop_propagate(relatom, True)
return res
def _add_edge(self, relatom1: RelativeAtom, relatom2: RelativeAtom):
super()._add_edge(relatom1, relatom2)
prev_edge = None
for u, v in pairwise(self._hierarchy_path(relatom1, relatom2)):
key = self.__graph.add_edge(u, v)
edge = (u, v, key)
if prev_edge is not None:
self.__graph.edges[prev_edge]['next'] = edge
self.__graph.edges[edge]['prev'] = prev_edge
else:
self.__graph.edges[edge]['prev'] = None
prev_edge = edge
self.__graph.edges[prev_edge]['next'] = None
def _remove_node(self, relatom: RelativeAtom):
super()._remove_node(relatom)
assert len(list(self.__graph.succ[relatom])) == 0
assert len(list(self.__graph.pred[relatom])) == 0
self._prop_propagate(relatom, False)
self._atom_parents[relatom].children.remove(relatom)
del self._atom_parents[relatom]
self.__graph.remove_node(relatom)
def _remove_edge(self, relatom1: RelativeAtom, relatom2: RelativeAtom):
super()._remove_edge(relatom1, relatom2)
self.__graph.remove_edges_from(pairwise(self._hierarchy_path(relatom1, relatom2)))
def _add_group(self, parent: RelativeAtomGroup) -> RelativeAtomGroup:
group = RelativeAtomGroup(self, parent)
parent.children.add(group)
self.__graph.add_node(group)
return group
def _paths_through(self, item: RelativeAtomOrGroup) -> Iterable[Tuple[Optional[MultiGraphEdge], Optional[MultiGraphEdge]]]:
# if item is a group, all edges will have two sides
# if item is an atom, all edges will have one side
if isinstance(item, RelativeAtom):
yield from ((None, (item, succ, key)) for succ, keys in self.__graph.succ[item].items() for key in keys)
yield from (((pred, item, key), None) for pred, keys in self.__graph.pred[item].items() for key in keys)
else:
yield from ((self.__graph.edges[(item, succ, key)]['prev'], (item, succ, key)) for succ in self.__graph.succ[item] for key in succ)
def move_node_out(self, item: RelativeAtomOrGroup):
# item will now be a neighbor of its parent
if item is self._root_group:
raise ValueError("Cannot reparent root group")
parent = self._parent(item)
if parent is self._root_group:
raise ValueError("There is no parent of the root group")
new_parent = parent.parent
assert new_parent is not None
paths = list(self._paths_through(item))
prev_next = ['prev', 'next']
for prev_edge, next_edge in paths:
pred = prev_edge[0] if prev_edge else None
succ = next_edge[1] if next_edge else None
if pred is None or self._parent(pred) is item:
assert succ is not None
inner_edge = prev_edge
outer = succ
outer_edge = next_edge
outward = True
else:
assert succ is None or self._parent(succ) is item
assert pred is not None
inner_edge = next_edge
outer = pred
outer_edge = prev_edge
outward = False
further_edge = self.__graph.edges[outer_edge][prev_next[outward]]
# we only need to break the outer edge
self.__graph.remove_edge(*outer_edge)
# is this a collapse or an expand operation?
is_neighbor = self._parent(outer) is parent
if is_neighbor:
# expand
parent_item_edge: MultiGraphEdge
parent_neighbor_edge: MultiGraphEdge
if outward:
parent_item_edge = (item, parent, self.__graph.add_edge(item, parent))
parent_neighbor_edge = (parent, outer, self.__graph.add_edge(parent, outer))
else:
parent_item_edge = (parent, item, self.__graph.add_edge(parent, item))
parent_neighbor_edge = (outer, parent, self.__graph.add_edge(outer, parent))
if inner_edge: self.__graph.edges[inner_edge][prev_next[outward]] = parent_item_edge
self.__graph.edges[parent_item_edge][prev_next[not outward]] = inner_edge
self.__graph.edges[parent_item_edge][prev_next[outward]] = parent_neighbor_edge
self.__graph.edges[parent_neighbor_edge][prev_next[not outward]] = parent_item_edge
self.__graph.edges[parent_neighbor_edge][prev_next[outward]] = further_edge
if further_edge: self.__graph.edges[further_edge][prev_next[not outward]] = further_edge
else:
# contract
assert outer is parent
assert further_edge
further = further_edge[outward]
even_further_edge = self.__graph.edges[further_edge][prev_next[outward]]
self.__graph.remove_edge(*further_edge)
if outward:
new_edge = (item, further, self.__graph.add_edge(item, further))
else:
new_edge = (further, item, self.__graph.add_edge(further, item))
if inner_edge: self.__graph.edges[inner_edge][prev_next[outward]] = new_edge
self.__graph.edges[new_edge][prev_next[not outward]] = inner_edge
self.__graph.edges[new_edge][prev_next[outward]] = even_further_edge
if even_further_edge: self.__graph.edges[even_further_edge][prev_next[not outward]] = new_edge
self._prop_propagate(item, False)
new_parent.children.add(item)
parent.children.remove(item)
if isinstance(item, RelativeAtomGroup):
item.parent = new_parent
else:
self._atom_parents[item] = new_parent
self._prop_propagate(item, True)
def move_node_in(self, item: RelativeAtomOrGroup, new_parent: RelativeAtomGroup):
if item is self._root_group:
raise ValueError("Cannot reparent root group")
if item is None:
raise ValueError("There can only be one root group")
parent = self._parent(item)
if item is new_parent:
raise ValueError("Bro are you for serious")
if self._parent(new_parent) is not parent:
raise ValueError("Can only move something into a neighbor")
paths = list(self._paths_through(item))
prev_next = ['prev', 'next']
for prev_edge, next_edge in paths:
pred = prev_edge[0] if prev_edge else None
succ = next_edge[1] if next_edge else None
if pred is None or self._parent(pred) is item:
assert succ is not None
inner_edge = prev_edge
outer = succ
outer_edge = next_edge
outward = True
else:
assert succ is None or self._parent(succ) is item
assert pred is not None
inner_edge = next_edge
outer = pred
outer_edge = prev_edge
outward = False
further_edge = self.__graph.edges[outer_edge][prev_next[outward]]
# we only need to break the outer edge
self.__graph.remove_edge(*outer_edge)
# is this a collapse or an expand operation?
going_out = outer is not new_parent
if going_out:
# expand
parent_item_edge: MultiGraphEdge
parent_outer_edge: MultiGraphEdge
if outward:
parent_item_edge = (item, new_parent, self.__graph.add_edge(item, new_parent))
parent_outer_edge = (new_parent, outer, self.__graph.add_edge(new_parent, outer))
else:
parent_item_edge = (new_parent, item, self.__graph.add_edge(new_parent, item))
parent_outer_edge = (outer, new_parent, self.__graph.add_edge(outer, new_parent))
if inner_edge: self.__graph.edges[inner_edge][prev_next[outward]] = parent_item_edge
self.__graph.edges[parent_item_edge][prev_next[not outward]] = inner_edge
self.__graph.edges[parent_item_edge][prev_next[outward]] = parent_outer_edge
self.__graph.edges[parent_outer_edge][prev_next[not outward]] = parent_item_edge
self.__graph.edges[parent_outer_edge][prev_next[outward]] = further_edge
if further_edge: self.__graph.edges[further_edge][prev_next[not outward]] = further_edge
else:
# contract
assert further_edge
further = further_edge[outward]
even_further_edge = self.__graph.edges[further_edge][prev_next[outward]]
self.__graph.remove_edge(*further_edge)
if outward:
new_edge = (item, further, self.__graph.add_edge(item, further))
else:
new_edge = (further, item, self.__graph.add_edge(further, item))
if inner_edge: self.__graph.edges[inner_edge][prev_next[outward]] = new_edge
self.__graph.edges[new_edge][prev_next[not outward]] = inner_edge
self.__graph.edges[new_edge][prev_next[outward]] = even_further_edge
if even_further_edge: self.__graph.edges[even_further_edge][prev_next[not outward]] = new_edge
self._prop_propagate(item, False)
new_parent.children.add(item)
parent.children.remove(item)
if isinstance(item, RelativeAtomGroup):
item.parent = new_parent
else:
self._atom_parents[item] = new_parent
self._prop_propagate(item, True)
def move_node(self, node: RelativeAtomOrGroup, new_parent: RelativeAtomGroup):
new_parent_ancestry = set(self._ancestry(new_parent))
if node in new_parent_ancestry:
raise ValueError("Cannot move node into itself")
for parent in self._ancestry(node):
if parent in new_parent_ancestry:
break
if parent is new_parent:
return
self.move_node_out(node)
else:
raise ValueError("ruh roh")
for moveto in reversed(list(self._ancestry_until(new_parent, parent))):
self.move_node_in(node, moveto)
self.move_node_in(node, new_parent)
def create_group(self, nodes: List[RelativeAtomOrGroup], parent: RelativeAtomGroup) -> RelativeAtomGroup:
if any(self._parent(node) is not parent for node in nodes):
raise ValueError("Not a child of parent")
group = self._add_group(parent)
for node in nodes:
self.move_node_in(node, group)
return group
def destroy_group(self, group: RelativeAtomGroup):
if group is self._root_group:
raise ValueError("Cannot break root group")
for child in list(group.children):
self.move_node_out(child)
group.parent.children.remove(group)
self.__graph.remove_node(group)
def _parent(self, item: RelativeAtomOrGroup) -> RelativeAtomGroup:
result = item.parent if isinstance(item, RelativeAtomGroup) else self._atom_parents[item]
if result is None:
raise ValueError("Has no parent")
return result
def _ancestry(self, atom: RelativeAtomOrGroup) -> Iterable[RelativeAtomGroup]:
parent = atom.parent if isinstance(atom, RelativeAtomGroup) else self._atom_parents[atom]
while parent is not None:
yield parent
parent = parent.parent
def _common_ancestor(self, atom1: RelativeAtomOrGroup, atom2: RelativeAtomOrGroup) -> RelativeAtomGroup:
set1 = set(self._ancestry(atom1))
for parent in self._ancestry(atom2):
if parent in set1:
return parent
raise ValueError("Hierarchy structure is fucked")
def _ancestry_until(self, start: RelativeAtomOrGroup, stop: RelativeAtomGroup, inclusive=False) -> Iterable[RelativeAtomGroup]:
for parent in self._ancestry(start):
if parent is stop:
if inclusive:
yield parent
break
yield parent
def _hierarchy_path(self, start: RelativeAtomOrGroup, end: RelativeAtomOrGroup):
common = self._common_ancestor(start, end)
line1 = self._ancestry_until(start, common)
line2 = list(self._ancestry_until(end, common))
yield start
yield from line1
yield from reversed(line2)
yield end