321 lines
14 KiB
Python
321 lines
14 KiB
Python
from typing import Set, Union, TYPE_CHECKING, List, Optional, Dict, Iterable, Tuple
|
|
from itertools import pairwise
|
|
|
|
import networkx
|
|
|
|
from .relative_graph import RelativeAtomGraph, RelativeAtom
|
|
from .data import Atom, OpSequence, Prop
|
|
|
|
if TYPE_CHECKING:
|
|
from .knowledge import TypeTapperManager
|
|
|
|
class RelativeAtomGroup:
|
|
def __init__(self, graph: 'HierarchicalGraph', parent: Optional['RelativeAtomGroup']):
|
|
self.graph = graph
|
|
self.parent = parent
|
|
self.children: Set['RelativeAtomOrGroup'] = set()
|
|
self.prop: Prop = Prop()
|
|
|
|
RelativeAtomOrGroup = Union[RelativeAtom, RelativeAtomGroup]
|
|
MultiGraphEdge = Tuple[RelativeAtomOrGroup, RelativeAtomOrGroup, int]
|
|
|
|
# THIS IS NOT THREAD SAFE. YOU HAVE BEEN WARNED
|
|
|
|
class HierarchicalGraph(RelativeAtomGraph):
|
|
def __init__(self, kp: 'TypeTapperManager', baseline: List[Atom]):
|
|
self.__graph = networkx.MultiDiGraph()
|
|
self._root_group = RelativeAtomGroup(self, None)
|
|
self._atom_parents: Dict[RelativeAtom, RelativeAtomGroup] = {}
|
|
self._current_group = self._root_group
|
|
|
|
super().__init__(kp, baseline)
|
|
|
|
@property
|
|
def root_group(self) -> RelativeAtomGroup:
|
|
return self._root_group
|
|
|
|
def local_graph(self, group: RelativeAtomGroup) -> networkx.MultiDiGraph:
|
|
return self.__graph.subgraph([group] + list(group.children))
|
|
|
|
def prop(self, node: RelativeAtomOrGroup) -> Prop:
|
|
if isinstance(node, RelativeAtomGroup):
|
|
return node.prop
|
|
else:
|
|
return self.attrs(node).prop
|
|
|
|
def expand(self, relatom: RelativeAtom, group: Optional[RelativeAtomGroup]=None) -> Set[RelativeAtom]:
|
|
if group is not None:
|
|
self._current_group = group
|
|
return super().expand(relatom)
|
|
|
|
def _prop_propagate(self, node: RelativeAtomOrGroup, add: bool):
|
|
prop = self.prop(node)
|
|
for parent in self._ancestry(node):
|
|
if add:
|
|
parent.prop.update(prop)
|
|
else:
|
|
parent.prop.subtract(prop)
|
|
|
|
def _add_node(self, relatom: RelativeAtom, path: OpSequence) -> bool:
|
|
res = super()._add_node(relatom, path)
|
|
if res:
|
|
self._atom_parents[relatom] = self._current_group
|
|
self._current_group.children.add(relatom)
|
|
self.__graph.add_node(relatom)
|
|
self._prop_propagate(relatom, True)
|
|
return res
|
|
|
|
def _add_edge(self, relatom1: RelativeAtom, relatom2: RelativeAtom):
|
|
super()._add_edge(relatom1, relatom2)
|
|
prev_edge = None
|
|
for u, v in pairwise(self._hierarchy_path(relatom1, relatom2)):
|
|
key = self.__graph.add_edge(u, v)
|
|
edge = (u, v, key)
|
|
if prev_edge is not None:
|
|
self.__graph.edges[prev_edge]['next'] = edge
|
|
self.__graph.edges[edge]['prev'] = prev_edge
|
|
else:
|
|
self.__graph.edges[edge]['prev'] = None
|
|
prev_edge = edge
|
|
self.__graph.edges[prev_edge]['next'] = None
|
|
|
|
def _remove_node(self, relatom: RelativeAtom):
|
|
super()._remove_node(relatom)
|
|
assert len(list(self.__graph.succ[relatom])) == 0
|
|
assert len(list(self.__graph.pred[relatom])) == 0
|
|
self._prop_propagate(relatom, False)
|
|
self._atom_parents[relatom].children.remove(relatom)
|
|
del self._atom_parents[relatom]
|
|
self.__graph.remove_node(relatom)
|
|
|
|
def _remove_edge(self, relatom1: RelativeAtom, relatom2: RelativeAtom):
|
|
super()._remove_edge(relatom1, relatom2)
|
|
self.__graph.remove_edges_from(pairwise(self._hierarchy_path(relatom1, relatom2)))
|
|
|
|
def _add_group(self, parent: RelativeAtomGroup) -> RelativeAtomGroup:
|
|
group = RelativeAtomGroup(self, parent)
|
|
parent.children.add(group)
|
|
return group
|
|
|
|
def _paths_through(self, item: RelativeAtomOrGroup) -> Iterable[Tuple[Optional[MultiGraphEdge], Optional[MultiGraphEdge]]]:
|
|
# if item is a group, all edges will have two sides
|
|
# if item is an atom, all edges will have one side
|
|
if isinstance(item, RelativeAtom):
|
|
yield from ((None, (item, succ, key)) for succ, keys in self.__graph.succ[item].items() for key in keys)
|
|
yield from (((pred, item, key), None) for pred, keys in self.__graph.pred[item].items() for key in keys)
|
|
else:
|
|
yield from ((self.__graph.edges[(item, succ, key)]['prev'], (item, succ, key)) for succ in self.__graph.succ[item] for key in succ)
|
|
|
|
def move_node_out(self, item: RelativeAtomOrGroup):
|
|
# item will now be a neighbor of its parent
|
|
if item is self._root_group:
|
|
raise ValueError("Cannot reparent root group")
|
|
parent = self._parent(item)
|
|
if parent is self._root_group:
|
|
raise ValueError("There is no parent of the root group")
|
|
new_parent = parent.parent
|
|
assert new_parent is not None
|
|
|
|
paths = list(self._paths_through(item))
|
|
prev_next = ['prev', 'next']
|
|
|
|
for prev_edge, next_edge in paths:
|
|
pred = prev_edge[0] if prev_edge else None
|
|
succ = next_edge[1] if next_edge else None
|
|
if pred is None or self._parent(pred) is item:
|
|
assert succ is not None
|
|
inner_edge = prev_edge
|
|
outer = succ
|
|
outer_edge = next_edge
|
|
outward = True
|
|
else:
|
|
assert succ is None or self._parent(succ) is item
|
|
assert pred is not None
|
|
inner_edge = next_edge
|
|
outer = pred
|
|
outer_edge = prev_edge
|
|
outward = False
|
|
further_edge = self.__graph.edges[outer_edge][prev_next[outward]]
|
|
|
|
# we only need to break the outer edge
|
|
self.__graph.remove_edge(*outer_edge)
|
|
# is this a collapse or an expand operation?
|
|
is_neighbor = self._parent(outer) is parent
|
|
if is_neighbor:
|
|
# expand
|
|
parent_item_edge: MultiGraphEdge
|
|
parent_neighbor_edge: MultiGraphEdge
|
|
if outward:
|
|
parent_item_edge = (item, parent, self.__graph.add_edge(item, parent))
|
|
parent_neighbor_edge = (parent, outer, self.__graph.add_edge(parent, outer))
|
|
else:
|
|
parent_item_edge = (parent, item, self.__graph.add_edge(parent, item))
|
|
parent_neighbor_edge = (outer, parent, self.__graph.add_edge(outer, parent))
|
|
if inner_edge: self.__graph.edges[inner_edge][prev_next[outward]] = parent_item_edge
|
|
self.__graph.edges[parent_item_edge][prev_next[not outward]] = inner_edge
|
|
self.__graph.edges[parent_item_edge][prev_next[outward]] = parent_neighbor_edge
|
|
self.__graph.edges[parent_neighbor_edge][prev_next[not outward]] = parent_item_edge
|
|
self.__graph.edges[parent_neighbor_edge][prev_next[outward]] = further_edge
|
|
if further_edge: self.__graph.edges[further_edge][prev_next[not outward]] = further_edge
|
|
else:
|
|
# contract
|
|
assert outer is parent
|
|
assert further_edge
|
|
further = further_edge[outward]
|
|
even_further_edge = self.__graph.edges[further_edge][prev_next[outward]]
|
|
self.__graph.remove_edge(*further_edge)
|
|
if outward:
|
|
new_edge = (item, further, self.__graph.add_edge(item, further))
|
|
else:
|
|
new_edge = (further, item, self.__graph.add_edge(further, item))
|
|
if inner_edge: self.__graph.edges[inner_edge][prev_next[outward]] = new_edge
|
|
self.__graph.edges[new_edge][prev_next[not outward]] = inner_edge
|
|
self.__graph.edges[new_edge][prev_next[outward]] = even_further_edge
|
|
if even_further_edge: self.__graph.edges[even_further_edge][prev_next[not outward]] = new_edge
|
|
|
|
self._prop_propagate(item, False)
|
|
new_parent.children.add(item)
|
|
parent.children.remove(item)
|
|
if isinstance(item, RelativeAtomGroup):
|
|
item.parent = new_parent
|
|
else:
|
|
self._atom_parents[item] = new_parent
|
|
self._prop_propagate(item, True)
|
|
|
|
def move_node_in(self, item: RelativeAtomOrGroup, new_parent: RelativeAtomGroup):
|
|
if item is self._root_group:
|
|
raise ValueError("Cannot reparent root group")
|
|
if item is None:
|
|
raise ValueError("There can only be one root group")
|
|
parent = self._parent(item)
|
|
if item is new_parent:
|
|
raise ValueError("Bro are you for serious")
|
|
if self._parent(new_parent) is not parent:
|
|
raise ValueError("Can only move something into a neighbor")
|
|
|
|
paths = list(self._paths_through(item))
|
|
prev_next = ['prev', 'next']
|
|
|
|
for prev_edge, next_edge in paths:
|
|
pred = prev_edge[0] if prev_edge else None
|
|
succ = next_edge[1] if next_edge else None
|
|
if pred is None or self._parent(pred) is item:
|
|
assert succ is not None
|
|
inner_edge = prev_edge
|
|
outer = succ
|
|
outer_edge = next_edge
|
|
outward = True
|
|
else:
|
|
assert succ is None or self._parent(succ) is item
|
|
assert pred is not None
|
|
inner_edge = next_edge
|
|
outer = pred
|
|
outer_edge = prev_edge
|
|
outward = False
|
|
further_edge = self.__graph.edges[outer_edge][prev_next[outward]]
|
|
|
|
# we only need to break the outer edge
|
|
self.__graph.remove_edge(*outer_edge)
|
|
# is this a collapse or an expand operation?
|
|
going_out = outer is not new_parent
|
|
if going_out:
|
|
# expand
|
|
parent_item_edge: MultiGraphEdge
|
|
parent_outer_edge: MultiGraphEdge
|
|
if outward:
|
|
parent_item_edge = (item, new_parent, self.__graph.add_edge(item, new_parent))
|
|
parent_outer_edge = (new_parent, outer, self.__graph.add_edge(new_parent, outer))
|
|
else:
|
|
parent_item_edge = (new_parent, item, self.__graph.add_edge(new_parent, item))
|
|
parent_outer_edge = (outer, new_parent, self.__graph.add_edge(outer, new_parent))
|
|
if inner_edge: self.__graph.edges[inner_edge][prev_next[outward]] = parent_item_edge
|
|
self.__graph.edges[parent_item_edge][prev_next[not outward]] = inner_edge
|
|
self.__graph.edges[parent_item_edge][prev_next[outward]] = parent_outer_edge
|
|
self.__graph.edges[parent_outer_edge][prev_next[not outward]] = parent_item_edge
|
|
self.__graph.edges[parent_outer_edge][prev_next[outward]] = further_edge
|
|
if further_edge: self.__graph.edges[further_edge][prev_next[not outward]] = further_edge
|
|
else:
|
|
# contract
|
|
assert further_edge
|
|
further = further_edge[outward]
|
|
even_further_edge = self.__graph.edges[further_edge][prev_next[outward]]
|
|
self.__graph.remove_edge(*further_edge)
|
|
if outward:
|
|
new_edge = (item, further, self.__graph.add_edge(item, further))
|
|
else:
|
|
new_edge = (further, item, self.__graph.add_edge(further, item))
|
|
if inner_edge: self.__graph.edges[inner_edge][prev_next[outward]] = new_edge
|
|
self.__graph.edges[new_edge][prev_next[not outward]] = inner_edge
|
|
self.__graph.edges[new_edge][prev_next[outward]] = even_further_edge
|
|
if even_further_edge: self.__graph.edges[even_further_edge][prev_next[not outward]] = new_edge
|
|
|
|
self._prop_propagate(item, False)
|
|
new_parent.children.add(item)
|
|
parent.children.remove(item)
|
|
if isinstance(item, RelativeAtomGroup):
|
|
item.parent = new_parent
|
|
else:
|
|
self._atom_parents[item] = new_parent
|
|
self._prop_propagate(item, True)
|
|
|
|
def move_node(self, node: RelativeAtomOrGroup, new_parent: RelativeAtomGroup):
|
|
new_parent_ancestry = set(self._ancestry(new_parent))
|
|
if node in new_parent_ancestry:
|
|
raise ValueError("Cannot move node into itself")
|
|
for parent in self._ancestry(node):
|
|
if parent in new_parent_ancestry:
|
|
break
|
|
if parent is new_parent:
|
|
return
|
|
self.move_node_out(node)
|
|
else:
|
|
raise ValueError("ruh roh")
|
|
for moveto in reversed(list(self._ancestry_until(new_parent, parent))):
|
|
self.move_node_in(node, moveto)
|
|
self.move_node_in(node, new_parent)
|
|
|
|
def create_group(self, nodes: List[RelativeAtomOrGroup], parent: RelativeAtomGroup) -> RelativeAtomGroup:
|
|
if any(self._parent(node) is not parent for node in nodes):
|
|
raise ValueError("Not a child of parent")
|
|
group = self._add_group(parent)
|
|
for node in nodes:
|
|
self.move_node_in(node, group)
|
|
return group
|
|
|
|
def _parent(self, item: RelativeAtomOrGroup) -> RelativeAtomGroup:
|
|
result = item.parent if isinstance(item, RelativeAtomGroup) else self._atom_parents[item]
|
|
if result is None:
|
|
raise ValueError("Has no parent")
|
|
return result
|
|
|
|
def _ancestry(self, atom: RelativeAtomOrGroup) -> Iterable[RelativeAtomGroup]:
|
|
parent = atom.parent if isinstance(atom, RelativeAtomGroup) else self._atom_parents[atom]
|
|
while parent is not None:
|
|
yield parent
|
|
parent = parent.parent
|
|
|
|
def _common_ancestor(self, atom1: RelativeAtomOrGroup, atom2: RelativeAtomOrGroup) -> RelativeAtomGroup:
|
|
set1 = set(self._ancestry(atom1))
|
|
for parent in self._ancestry(atom2):
|
|
if parent in set1:
|
|
return parent
|
|
raise ValueError("Hierarchy structure is fucked")
|
|
|
|
def _ancestry_until(self, start: RelativeAtomOrGroup, stop: RelativeAtomGroup, inclusive=False) -> Iterable[RelativeAtomGroup]:
|
|
for parent in self._ancestry(start):
|
|
if parent is stop:
|
|
if inclusive:
|
|
yield parent
|
|
break
|
|
yield parent
|
|
|
|
def _hierarchy_path(self, start: RelativeAtomOrGroup, end: RelativeAtomOrGroup):
|
|
common = self._common_ancestor(start, end)
|
|
line1 = self._ancestry_until(start, common)
|
|
line2 = list(self._ancestry_until(end, common))
|
|
|
|
yield start
|
|
yield from line1
|
|
yield from reversed(line2)
|
|
yield end
|