typetapper/typetapper/relative_graph.py

190 lines
6.9 KiB
Python

from typing import TYPE_CHECKING, List, Optional, Tuple, Set
from dataclasses import dataclass
import logging
import networkx
from .data import Atom, Prop, OpSequence, ControlFlowActionPop, ControlFlowActionPush, ControlFlowAction, RefOp, \
DerefOp, ConstOffsetOp
if TYPE_CHECKING:
from .knowledge import TypeTapperManager
l = logging.getLogger(__name__)
@dataclass(frozen=True)
class RelativeAtom:
atom: Atom
callstack: Tuple[int, ...]
rcallstack: Tuple[int, ...]
@dataclass
class RelativeAtomAttrs:
path: OpSequence
prop: Prop
has_gone_down: bool = False
def merge(self, other: 'RelativeAtomAttrs'):
self.prop.maximize(other.prop)
# TODO has_gone_down
if self.path != other.path:
# TODO unifications
# compute a stupidity score
score_0 = sum(0 if isinstance(op, (ConstOffsetOp, RefOp, DerefOp)) else 1 for op in self.path.ops)
score_1 = sum(0 if isinstance(op, (ConstOffsetOp, RefOp, DerefOp)) else 1 for op in other.path.ops)
if score_1 < score_0:
self.path = other.path
return True
return False
class RelativeAtomGraph:
def __init__(self, kp: 'TypeTapperManager', baseline: List[Atom]):
self.kp = kp
self.__graph = networkx.DiGraph()
self.frontier = set() # nodes present in self.graph but haven't had all their edges analyzed
self.has_edge = self.__graph.has_edge
self.successors = self.__graph.successors
self.predecessors = self.__graph.predecessors
for atom in baseline:
relative = RelativeAtom(atom=atom, callstack=(), rcallstack=())
self._add_node(relative, OpSequence(), False)
self.frontier.add(relative) # TODO ???
def _add_node(self, relatom: RelativeAtom, path: OpSequence, has_gone_down: bool) -> bool:
"""
If relatom is not present in the graph, add it.
If it is present in the graph, merge the new information into its attrs
"""
from .data import RegisterAtom
newattrs = RelativeAtomAttrs(
prop=self.kp.graph.nodes[relatom.atom].get('prop', Prop()).transform(path.invert()),
path=path,
has_gone_down=has_gone_down,
)
if relatom in self.__graph.nodes:
return self.__graph.nodes[relatom]['attr'].merge(newattrs)
else:
self.__graph.add_node(relatom, attr=newattrs)
return True
def _remove_node(self, relatom: RelativeAtom):
for pred in list(self.__graph.pred[relatom]):
self._remove_edge(pred, relatom)
for succ in list(self.__graph.succ[relatom]):
self._remove_edge(relatom, succ)
self.__graph.remove_node(relatom)
def _add_edge(self, relatom1: RelativeAtom, relatom2: RelativeAtom):
assert relatom1 in self.__graph.nodes
assert relatom2 in self.__graph.nodes
self.__graph.add_edge(relatom1, relatom2)
def _remove_edge(self, relatom1: RelativeAtom, relatom2: RelativeAtom):
assert relatom1 in self.__graph.nodes
assert relatom2 in self.__graph.nodes
self.__graph.remove_edge(relatom1, relatom2)
def attrs(self, relatom: RelativeAtom) -> RelativeAtomAttrs:
return self.__graph.nodes[relatom]['attr']
def expand(self, relatom: RelativeAtom) -> Set[RelativeAtom]:
if relatom not in self.frontier:
return set()
attrs = self.attrs(relatom)
result = set()
for succ, edge_attrs in self.kp.graph.succ[relatom.atom].items():
res = self._expand_single(relatom, attrs, succ, edge_attrs['cf'], edge_attrs['ops'], False)
if res is not None:
result.add(res)
for pred, edge_attrs in self.kp.graph.pred[relatom.atom].items():
res = self._expand_single(relatom, attrs, pred, edge_attrs['cf'], edge_attrs['ops'], True)
if res is not None:
result.add(res)
self.frontier.update(result)
self.frontier.remove(relatom)
return result
def _expand_single(
self,
relatom: RelativeAtom,
attrs: RelativeAtomAttrs,
succ: Atom,
edge_cf: List[ControlFlowAction],
edge_ops: OpSequence,
is_pred: bool,
) -> Optional[RelativeAtom]:
#if str(relatom.atom) == 'rsp @ 0x4144a8' and str(succ) == 'MEM @ 0x4144d9':
# import ipdb; ipdb.set_trace()
weh = self._update_callstack(relatom.callstack, relatom.rcallstack, edge_cf, is_pred)
if weh is None:
return None
callstack, rcallstack = weh
path = attrs.path
path += edge_ops.invert() if is_pred else edge_ops
derefs = sum(isinstance(op, DerefOp) for op in path.ops)
refs = sum(isinstance(op, RefOp) for op in path.ops)
goes_down = any(isinstance(op, DerefOp) for op in edge_ops.ops)
goes_up = any(isinstance(op, RefOp) for op in edge_ops.ops)
if is_pred:
goes_down, goes_up = goes_up, goes_down
reset_down = len(path.ops) == 0
if derefs > 0 and refs > 0:
return None
if derefs > 1:
return None
if goes_up and attrs.has_gone_down:
return None
relsucc = RelativeAtom(atom=succ, callstack=callstack, rcallstack=rcallstack)
res = self._add_node(relsucc, path, (attrs.has_gone_down or goes_down) and not reset_down)
if is_pred:
if not self.__graph.has_edge(relsucc, relatom):
self._add_edge(relsucc, relatom)
else:
if not self.__graph.has_edge(relatom, relsucc):
self._add_edge(relatom, relsucc)
return relsucc if res else None
@staticmethod
def _update_callstack(
callstack: Tuple[int, ...],
rcallstack: Tuple[int, ...],
cf: List[ControlFlowAction],
reverse: bool
) -> Optional[Tuple[Tuple[int, ...], Tuple[int, ...]]]:
for directive in reversed(cf) if reverse else cf:
if isinstance(directive, ControlFlowActionPop):
pop = True
callsite = directive.callsite
elif isinstance(directive, ControlFlowActionPush):
pop = False
callsite = directive.callsite
else:
raise TypeError(type(directive))
pop ^= reverse
if pop:
if callstack:
if callstack[-1] != callsite:
return None
callstack = callstack[:-1]
else:
rcallstack = rcallstack + (callsite,)
else:
if rcallstack:
if rcallstack[-1] != callsite:
return None
rcallstack = rcallstack[:-1]
else:
callstack = callstack + (callsite,)
return callstack, rcallstack