diff --git a/typetapper/analysis.py b/typetapper/analysis.py
index 2479609..72f398e 100644
--- a/typetapper/analysis.py
+++ b/typetapper/analysis.py
@@ -1,3 +1,4 @@
+import logging
 from collections import OrderedDict
 
 import angr
@@ -8,17 +9,23 @@ from angr.knowledge_plugins.cfg import CFGNode
 from .engine import TypeTapperEngine
 from .knowledge import TypeTapperManager
 
+l = logging.getLogger(__name__)
+
 class TypeTapper(angr.Analysis):
     def __init__(self, cfg: CFGBase):
         self._cfg = cfg
-        self._manager = self.kb.request_knowledge(TypeTapperManager)
-        self._engine = TypeTapperEngine(self.project, self._manager)
+        self.manager = self.kb.request_knowledge(TypeTapperManager)
+        self.manager.cfg = cfg.model
+        self._engine = TypeTapperEngine(self.project, self.manager)
 
         if not self._cfg.normalized:
             raise ValueError("CFG must be normalized")
 
+        l.debug('Starting active flow analysis')
         self._analyze_active_flow()
+        l.debug('Starting passive flow analysis')
         self._analyze_passive_flow()
+        l.debug('Done')
 
     def _analyze_active_flow(self):
         node: CFGNode
@@ -30,13 +37,13 @@ class TypeTapper(angr.Analysis):
 
     def _analyze_passive_flow(self):
         queue = OrderedDict()
-        for block_addr in self._manager.block_info.keys():
+        for block_addr in self.manager.block_info.keys():
             queue[block_addr] = None
 
         while queue:
             block_addr = next(iter(reversed(queue.keys())))
             queue.pop(block_addr)
-            node_blockinfo = self._manager.block_info[block_addr]
+            node_blockinfo = self.manager.block_info[block_addr]
             node = self._cfg.model.get_any_node(block_addr)
             fakeret_addr = next((pred.addr for pred, attrs in self._cfg.graph.pred[node].items() if attrs['jumpkind'] == 'Ijk_FakeRet'), None)
             for pred, attrs in self._cfg.graph.pred[node].items():
@@ -45,7 +52,7 @@ class TypeTapper(angr.Analysis):
                 if pred.block is None:
                     continue
                 pred_addr = pred.addr
-                pred_blockinfo = self._manager.block_info[block_addr]
+                pred_blockinfo = self.manager.block_info[pred_addr]
 
                 # TAKE IT BACK NOW Y'ALL
                 for input_atom, input_info in node_blockinfo.inputs.items():
@@ -55,7 +62,7 @@ class TypeTapper(angr.Analysis):
                     output_atom = pred_blockinfo.outputs.get(input_atom.slot_name, None)
                     if output_atom is not None:
                         if output_atom.name == input_atom.name:
-                            input_info_new.commit(self._manager.graph, output_atom, input_atom)
+                            input_info_new.commit(self.manager.graph, output_atom, input_atom)
                         else:
                             pass  # alias mismatch
                     elif input_atom not in pred_blockinfo.inputs:  # sketchy... this means that we can't account for multiple paths to the same atom
diff --git a/typetapper/data.py b/typetapper/data.py
index 9e2a82f..ec7e345 100644
--- a/typetapper/data.py
+++ b/typetapper/data.py
@@ -1,5 +1,5 @@
 from typing import Tuple, Any, List, Set, Optional, Dict
-from collections import defaultdict
+from collections import defaultdict, Counter
 from enum import Enum, auto
 from dataclasses import dataclass, field
 import copy
@@ -11,6 +11,7 @@ import networkx
 class CodeLoc:
     bbl_addr: int
     stmt_idx: int
+    ins_addr: int
 
 @dataclass(frozen=True)
 class Atom:
@@ -92,7 +93,7 @@ class OpSequence:
 
     @staticmethod
     def concat(*sequences: 'OpSequence') -> 'OpSequence':
-        seq = []
+        seq: List[Op] = []
         for s in sequences:
             seq.extend(s.ops)
         simplify_op_sequence(seq)
@@ -124,6 +125,8 @@ def simplify_op_sequence(seq: List[Op]):
 
         i += 1
 
+
+# noinspection PyArgumentList
 class DataKind(Enum):
     Int = auto()
     Float = auto()
@@ -131,19 +134,24 @@ class DataKind(Enum):
 
 @dataclass
 class Prop:
-    self_data: defaultdict[DataKind, int] = field(default_factory=lambda: defaultdict(int))
-    struct_data: defaultdict[int, defaultdict[int, defaultdict[DataKind, int]]] = field(default_factory=lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(int))))
-    unifications: Set[Tuple[int, int]] = field(default_factory=set)
+    self_data: Counter[DataKind] = field(default_factory=Counter)
+    struct_data: defaultdict[int, defaultdict[int, Counter[DataKind]]] = field(default_factory=lambda: defaultdict(lambda: defaultdict(Counter)))
+    unifications: Counter[Tuple[int, int]] = field(default_factory=Counter)
 
     def update(self, other: 'Prop'):
-        for kind, v in other.self_data.items():
-            self.self_data[kind] += v
+        self.self_data.update(other.self_data)
         for offset, v1 in other.struct_data.items():
             for size, v2 in v1.items():
-                for kind, v3 in v2.items():
-                    self.struct_data[offset][size][kind] += v3
+                self.struct_data[offset][size].update(v2)
         self.unifications.update(other.unifications)
 
+    def subtract(self, other: 'Prop'):
+        self.self_data.subtract(other.self_data)
+        for offset, v1 in other.struct_data.items():
+            for size, v2 in v1.items():
+                self.struct_data[offset][size].subtract(v2)
+        self.unifications.subtract(other.unifications)
+
     def __or__(self, other: 'Prop'):
         result = Prop()
         result.update(self)
@@ -156,7 +164,7 @@ class Prop:
             if isinstance(op, RefOp):
                 result.struct_data.clear()
                 result.struct_data[0][op.size] = result.self_data
-                result.self_data = defaultdict(int)
-                self.unifications.clear()
+                result.self_data = Counter()
+                result.unifications.clear()  # operate on the result being built, never on the receiver
             elif isinstance(op, DerefOp):
                 result.self_data = result.struct_data[0][op.size]
@@ -167,8 +175,16 @@ class Prop:
                 result.struct_data.clear()
                 for k, v in items:
                     result.struct_data[k + op.const] = v
-                result.self_data.clear()  # TODO ???
-                result.unifications = {(x + op.const, y + op.const) for x, y in result.unifications}
+                saved = result.self_data.get(DataKind.Pointer, None)
+                result.self_data.clear()
+                if saved:
+                    result.self_data[DataKind.Pointer] = saved
+                result.unifications = Counter({(x + op.const, y + op.const): n for (x, y), n in result.unifications.items()})  # shift keys but keep their counts so update()/subtract() stay symmetric
+            elif isinstance(op, VarOffsetOp):
+                saved = result.self_data.get(DataKind.Pointer, None)
+                result = Prop()
+                if saved:
+                    result.self_data[DataKind.Pointer] = saved
             else:
                 result = Prop()
         return result
@@ -199,7 +215,20 @@ class LiveData:
 
     def commit(self, target: Atom, graph: networkx.DiGraph):
         for src, seq in self.sources:
-            graph.add_edge(src, target, ops=seq)
+            graph.add_edge(src, target, ops=seq, cf=[])
+
+    def prop(self, prop: Prop, graph: networkx.DiGraph):
+        for atom, ops in self.sources:
+            tprop = prop.transform(ops.invert())
+            try:
+                eprop: Prop = graph.nodes[atom].get('prop')
+            except KeyError:
+                graph.add_node(atom, prop=tprop)
+            else:
+                if eprop:
+                    eprop.update(tprop)
+                else:
+                    graph.nodes[atom]['prop'] = tprop
 
 @dataclass(frozen=True)
 class RegisterInputInfo:
@@ -212,6 +241,8 @@ class RegisterInputInfo:
 
     def step(self, pred: int, succ: int, jumpkind: str, callsite: Optional[int]) -> 'Optional[RegisterInputInfo]':
         if jumpkind == 'Ijk_Ret':
+            if callsite is None:
+                raise TypeError("Must specify callsite if jumpkind is Ret")
             return RegisterInputInfo(callsites=self.callsites + (callsite,), reverse_callsites=self.reverse_callsites)
         elif jumpkind == 'Ijk_Call':
             if not self.callsites:
@@ -244,5 +275,6 @@ class ControlFlowActionPop(ControlFlowAction):
 
 @dataclass
 class BlockInfo:
-    outputs: Dict[str, RegisterAtom] = field(default_factory=lambda: {})  # slot names
+    outputs: Dict[str, RegisterAtom] = field(default_factory=dict)  # slot names
     inputs: Dict[RegisterAtom, RegisterInputInfo] = field(default_factory=dict)
+    atoms: List[Atom] = field(default_factory=list)
diff --git a/typetapper/engine.py b/typetapper/engine.py
index e9b4355..cb8e4ac 100644
--- a/typetapper/engine.py
+++ b/typetapper/engine.py
@@ -15,12 +15,13 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
     def __init__(self, project: angr.Project, kp: TypeTapperManager, **kwargs):
         super().__init__(project, **kwargs)
         self.kp = kp
+        self.last_imark: int = 0
 
     tmps: List[TmpAtom]
 
     @property
     def codeloc(self):
-        return CodeLoc(bbl_addr=self.irsb.addr, stmt_idx=self.stmt_idx)
+        return CodeLoc(bbl_addr=self.irsb.addr, stmt_idx=self.stmt_idx, ins_addr=self.last_imark)
 
     @property
     def graph(self):
@@ -31,7 +32,9 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
         return self.kp.block_info[self.irsb.addr]
 
     def _handle_vex_const(self, const):
-        return LiveData.new_const(const.value, get_type_size_bytes(const.type), self.codeloc)
+        atom = LiveData.new_const(const.value, get_type_size_bytes(const.type), self.codeloc)
+        self.blockinfo.atoms.append(atom.sources[0][0])
+        return atom
 
     def _perform_vex_expr_RdTmp(self, tmp):
         return LiveData.new_atom(self.tmps[tmp])
@@ -47,6 +50,7 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
             return LiveData.new_null(size)
         slot_name = self.project.arch.register_size_names[slot_info]
         reg_atom = RegisterAtom(self.codeloc, size, name, slot_name)
+        self.blockinfo.atoms.append(reg_atom)
 
         source = self.blockinfo.outputs.get(slot_name, None)
         if source is not None:
@@ -60,8 +64,13 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
         return LiveData.new_atom(reg_atom)
 
     def _perform_vex_expr_Load(self, addr: LiveData, ty, endness, **kwargs):
+        prop = Prop()
+        prop.self_data[DataKind.Pointer] += 1
+        addr.prop(prop, self.graph)
+
         size = get_type_size_bytes(ty)
         mem_atom = MemoryAtom(self.codeloc, size, endness)
+        self.blockinfo.atoms.append(mem_atom)
         addr.appended(DerefOp(size), size).commit(mem_atom, self.graph)
         return LiveData.new_atom(mem_atom)
 
@@ -73,7 +82,18 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
         return ifTrue.unioned(ifFalse, ifTrue.size)
 
     def _perform_vex_expr_Op(self, op, args: List[LiveData]):
-        size = get_type_size_bytes(pyvex.get_op_retty(op))
+        ret_ty, arg_tys = pyvex.expr.op_arg_types(op)
+        for arg, ty in zip(args, arg_tys):
+            if ty.startswith('Ity_F'):
+                prop = Prop()
+                prop.self_data[DataKind.Float] += 1
+                arg.prop(prop, self.graph)
+            elif ty.startswith('Ity_I'):
+                prop = Prop()
+                prop.self_data[DataKind.Int] += 1
+                arg.prop(prop, self.graph)
+
+        size = get_type_size_bytes(ret_ty)
         if op in ('Add8', 'Add16', 'Add32', 'Add64'):
             sign = 1
         elif op in ('Sub8', 'Sub16', 'Sub32', 'Sub64'):
@@ -106,6 +126,9 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
 
         return LiveData.new_null(get_type_size_bytes(expr.result_type(self.irsb.tyenv)))
 
+    def _handle_vex_stmt_IMark(self, stmt: pyvex.stmt.IMark):
+        self.last_imark = stmt.addr + stmt.delta
+
     def _perform_vex_stmt_Put(self, offset: LiveData, data: LiveData, **kwargs):
         if type(offset.const) is not int:
             return LiveData.new_null(data.size)
@@ -116,16 +139,23 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
             return LiveData.new_null(data.size)
         slot_name = self.project.arch.register_size_names[slot_info]
         reg_atom = RegisterAtom(self.codeloc, data.size, name, slot_name)
+        self.blockinfo.atoms.append(reg_atom)
         data.commit(reg_atom, self.graph)
         self.blockinfo.outputs[slot_name] = reg_atom
 
     def _perform_vex_stmt_WrTmp(self, tmp, data: LiveData):
         tmp_atom = TmpAtom(self.codeloc, get_type_size_bytes(self.irsb.tyenv.lookup(tmp)), tmp)
+        self.blockinfo.atoms.append(tmp_atom)
         self.tmps[tmp] = tmp_atom
         data.commit(tmp_atom, self.graph)
 
     def _perform_vex_stmt_Store(self, addr: LiveData, data: LiveData, endness, **kwargs):
+        prop = Prop()
+        prop.self_data[DataKind.Pointer] += 1
+        addr.prop(prop, self.graph)
+
         mem_atom = MemoryAtom(self.codeloc, data.size, endness)
+        self.blockinfo.atoms.append(mem_atom)
         addr.appended(DerefOp(data.size), data.size).commit(mem_atom, self.graph)
         data.commit(mem_atom, self.graph)
 
diff --git a/typetapper/hierarchy_graph.py b/typetapper/hierarchy_graph.py
new file mode 100644
index 0000000..efcc5d2
--- /dev/null
+++ b/typetapper/hierarchy_graph.py
@@ -0,0 +1,312 @@
+from typing import Set, Union, TYPE_CHECKING, List, Optional, Dict, Iterable, Tuple
+from itertools import pairwise
+
+import networkx
+
+from .relative_graph import RelativeAtomGraph, RelativeAtom
+from .data import Atom, OpSequence, Prop
+
+if TYPE_CHECKING:
+    from .knowledge import TypeTapperManager
+
+class RelativeAtomGroup:
+    def __init__(self, graph: 'HierarchicalGraph', parent: Optional['RelativeAtomGroup']):
+        self.graph = graph
+        self.parent = parent
+        self.children: Set['RelativeAtomOrGroup'] = set()
+        self.prop: Prop = Prop()
+
+RelativeAtomOrGroup = Union[RelativeAtom, RelativeAtomGroup]
+MultiGraphEdge = Tuple[RelativeAtomOrGroup, RelativeAtomOrGroup, int]
+
+# THIS IS NOT THREAD SAFE. YOU HAVE BEEN WARNED
+
+class HierarchicalGraph(RelativeAtomGraph):
+    def __init__(self, kp: 'TypeTapperManager', baseline: List[Atom]):
+        self.__graph = networkx.MultiDiGraph()
+        self._root_group = RelativeAtomGroup(self, None)
+        self._atom_parents: Dict[RelativeAtom, RelativeAtomGroup] = {}
+        self._current_group = self._root_group
+
+        super().__init__(kp, baseline)
+
+    @property
+    def root_group(self) -> RelativeAtomGroup:
+        return self._root_group
+
+    def local_graph(self, group: RelativeAtomGroup) -> networkx.MultiDiGraph:
+        return self.__graph.subgraph([group] + list(group.children))
+
+    def prop(self, node: RelativeAtomOrGroup) -> Prop:
+        if isinstance(node, RelativeAtomGroup):
+            return node.prop
+        else:
+            return self.attrs(node).prop
+
+    def expand(self, relatom: RelativeAtom, group: Optional[RelativeAtomGroup]=None) -> Set[RelativeAtom]:
+        if group is not None:
+            self._current_group = group
+        return super().expand(relatom)
+
+    def _prop_propagate(self, node: RelativeAtomOrGroup, add: bool):
+        prop = self.prop(node)
+        for parent in self._ancestry(node):
+            if add:
+                parent.prop.update(prop)
+            else:
+                parent.prop.subtract(prop)
+
+    def _add_node(self, relatom: RelativeAtom, path: OpSequence) -> bool:
+        res = super()._add_node(relatom, path)
+        if res:
+            self._atom_parents[relatom] = self._current_group
+            self._current_group.children.add(relatom)
+            self.__graph.add_node(relatom)
+            self._prop_propagate(relatom, True)
+        return res
+
+    def _add_edge(self, relatom1: RelativeAtom, relatom2: RelativeAtom):
+        super()._add_edge(relatom1, relatom2)
+        prev_edge = None
+        for u, v in pairwise(self._hierarchy_path(relatom1, relatom2)):
+            key = self.__graph.add_edge(u, v)
+            edge = (u, v, key)
+            if prev_edge is not None:
+                self.__graph.edges[prev_edge]['next'] = edge
+                self.__graph.edges[edge]['prev'] = prev_edge
+            else:
+                self.__graph.edges[edge]['prev'] = None
+            prev_edge = edge
+        self.__graph.edges[prev_edge]['next'] = None
+
+    def _remove_node(self, relatom: RelativeAtom):
+        super()._remove_node(relatom)
+        assert len(list(self.__graph.succ[relatom])) == 0
+        assert len(list(self.__graph.pred[relatom])) == 0
+        self._prop_propagate(relatom, False)
+        self._atom_parents[relatom].children.remove(relatom)
+        del self._atom_parents[relatom]
+        self.__graph.remove_node(relatom)
+
+    def _remove_edge(self, relatom1: RelativeAtom, relatom2: RelativeAtom):
+        super()._remove_edge(relatom1, relatom2)
+        self.__graph.remove_edges_from(pairwise(self._hierarchy_path(relatom1, relatom2)))
+
+    def _add_group(self, parent: RelativeAtomGroup) -> RelativeAtomGroup:
+        group = RelativeAtomGroup(self, parent)
+        parent.children.add(group)
+        return group
+
+    def _paths_through(self, item: RelativeAtomOrGroup) -> Iterable[Tuple[Optional[MultiGraphEdge], Optional[MultiGraphEdge]]]:
+        # if item is a group, all edges will have two sides
+        # if item is an atom, all edges will have one side
+        if isinstance(item, RelativeAtom):
+            yield from ((None, (item, succ, key)) for succ, keys in self.__graph.succ[item].items() for key in keys)  # edge keys live in the per-neighbour key dict, not on the neighbour node
+            yield from (((pred, item, key), None) for pred, keys in self.__graph.pred[item].items() for key in keys)
+        else:
+            yield from ((self.__graph.edges[(item, succ, key)]['prev'], (item, succ, key)) for succ, keys in self.__graph.succ[item].items() for key in keys)
+
+    def move_node_out(self, item: RelativeAtomOrGroup):
+        # item will now be a neighbor of its parent
+        if item is self._root_group:
+            raise ValueError("Cannot reparent root group")
+        parent = self._parent(item)
+        if parent is self._root_group:
+            raise ValueError("There is no parent of the root group")
+        new_parent = parent.parent
+        assert new_parent is not None
+
+        paths = list(self._paths_through(item))
+        prev_next = ['prev', 'next']
+
+        for prev_edge, next_edge in paths:
+            pred = prev_edge[0] if prev_edge else None
+            succ = next_edge[1] if next_edge else None
+            if pred is None or self._parent(pred) is item:
+                assert succ is not None
+                inner_edge = prev_edge
+                outer = succ
+                outer_edge = next_edge
+                outward = True
+            else:
+                assert succ is None or self._parent(succ) is item
+                assert pred is not None
+                inner_edge = next_edge
+                outer = pred
+                outer_edge = prev_edge
+                outward = False
+            further_edge = self.__graph.edges[outer_edge][prev_next[outward]]
+
+            # we only need to break the outer edge
+            self.__graph.remove_edge(*outer_edge)
+            # is this a collapse or an expand operation?
+            is_neighbor = self._parent(outer) is parent
+            if is_neighbor:
+                # expand
+                parent_item_edge: MultiGraphEdge
+                parent_neighbor_edge: MultiGraphEdge
+                if outward:
+                    parent_item_edge = (item, parent, self.__graph.add_edge(item, parent))
+                    parent_neighbor_edge = (parent, outer, self.__graph.add_edge(parent, outer))
+                else:
+                    parent_item_edge = (parent, item, self.__graph.add_edge(parent, item))
+                    parent_neighbor_edge = (outer, parent, self.__graph.add_edge(outer, parent))
+                if inner_edge: self.__graph.edges[inner_edge][prev_next[outward]] = parent_item_edge
+                self.__graph.edges[parent_item_edge][prev_next[not outward]] = inner_edge
+                self.__graph.edges[parent_item_edge][prev_next[outward]] = parent_neighbor_edge
+                self.__graph.edges[parent_neighbor_edge][prev_next[not outward]] = parent_item_edge
+                self.__graph.edges[parent_neighbor_edge][prev_next[outward]] = further_edge
+                if further_edge: self.__graph.edges[further_edge][prev_next[not outward]] = parent_neighbor_edge  # back-link to the replacement edge, not to itself
+            else:
+                # contract
+                assert outer is parent
+                assert further_edge
+                further = further_edge[outward]
+                even_further_edge = self.__graph.edges[further_edge][prev_next[outward]]
+                self.__graph.remove_edge(*further_edge)
+                if outward:
+                    new_edge = (item, further, self.__graph.add_edge(item, further))
+                else:
+                    new_edge = (further, item, self.__graph.add_edge(further, item))
+                if inner_edge: self.__graph.edges[inner_edge][prev_next[outward]] = new_edge
+                self.__graph.edges[new_edge][prev_next[not outward]] = inner_edge
+                self.__graph.edges[new_edge][prev_next[outward]] = even_further_edge
+                if even_further_edge: self.__graph.edges[even_further_edge][prev_next[not outward]] = new_edge
+
+        self._prop_propagate(item, False)
+        new_parent.children.add(item)
+        parent.children.remove(item)
+        if isinstance(item, RelativeAtomGroup):
+            item.parent = new_parent
+        else:
+            self._atom_parents[item] = new_parent
+        self._prop_propagate(item, True)
+
+    def move_node_in(self, item: RelativeAtomOrGroup, new_parent: RelativeAtomGroup):
+        if item is self._root_group:
+            raise ValueError("Cannot reparent root group")
+        if item is None:
+            raise ValueError("There can only be one root group")
+        parent = self._parent(item)
+        if item is new_parent:
+            raise ValueError("Bro are you for serious")
+        if self._parent(new_parent) is not parent:
+            raise ValueError("Can only move something into a neighbor")
+
+        paths = list(self._paths_through(item))
+        prev_next = ['prev', 'next']
+
+        for prev_edge, next_edge in paths:
+            pred = prev_edge[0] if prev_edge else None
+            succ = next_edge[1] if next_edge else None
+            if pred is None or self._parent(pred) is item:
+                assert succ is not None
+                inner_edge = prev_edge
+                outer = succ
+                outer_edge = next_edge
+                outward = True
+            else:
+                assert succ is None or self._parent(succ) is item
+                assert pred is not None
+                inner_edge = next_edge
+                outer = pred
+                outer_edge = prev_edge
+                outward = False
+            further_edge = self.__graph.edges[outer_edge][prev_next[outward]]
+
+            # we only need to break the outer edge
+            self.__graph.remove_edge(*outer_edge)
+            # is this a collapse or an expand operation?
+            going_out = outer is not new_parent
+            if going_out:
+                # expand
+                parent_item_edge: MultiGraphEdge
+                parent_outer_edge: MultiGraphEdge
+                if outward:
+                    parent_item_edge = (item, new_parent, self.__graph.add_edge(item, new_parent))
+                    parent_outer_edge = (new_parent, outer, self.__graph.add_edge(new_parent, outer))
+                else:
+                    parent_item_edge = (new_parent, item, self.__graph.add_edge(new_parent, item))
+                    parent_outer_edge = (outer, new_parent, self.__graph.add_edge(outer, new_parent))
+                if inner_edge: self.__graph.edges[inner_edge][prev_next[outward]] = parent_item_edge
+                self.__graph.edges[parent_item_edge][prev_next[not outward]] = inner_edge
+                self.__graph.edges[parent_item_edge][prev_next[outward]] = parent_outer_edge
+                self.__graph.edges[parent_outer_edge][prev_next[not outward]] = parent_item_edge
+                self.__graph.edges[parent_outer_edge][prev_next[outward]] = further_edge
+                if further_edge: self.__graph.edges[further_edge][prev_next[not outward]] = parent_outer_edge  # back-link to the replacement edge, not to itself
+            else:
+                # contract
+                assert further_edge
+                further = further_edge[outward]
+                even_further_edge = self.__graph.edges[further_edge][prev_next[outward]]
+                self.__graph.remove_edge(*further_edge)
+                if outward:
+                    new_edge = (item, further, self.__graph.add_edge(item, further))
+                else:
+                    new_edge = (further, item, self.__graph.add_edge(further, item))
+                if inner_edge: self.__graph.edges[inner_edge][prev_next[outward]] = new_edge
+                self.__graph.edges[new_edge][prev_next[not outward]] = inner_edge
+                self.__graph.edges[new_edge][prev_next[outward]] = even_further_edge
+                if even_further_edge: self.__graph.edges[even_further_edge][prev_next[not outward]] = new_edge
+
+        self._prop_propagate(item, False)
+        new_parent.children.add(item)
+        parent.children.remove(item)
+        if isinstance(item, RelativeAtomGroup):
+            item.parent = new_parent
+        else:
+            self._atom_parents[item] = new_parent
+        self._prop_propagate(item, True)
+
+    def move_node(self, node: RelativeAtomOrGroup, new_parent: RelativeAtomGroup):
+        new_parent_ancestry = set(self._ancestry(new_parent))
+        if node in new_parent_ancestry:
+            raise ValueError("Cannot move node into itself")
+        for parent in self._ancestry(node):
+            if parent in new_parent_ancestry:
+                break
+            if parent is new_parent:
+                return
+            self.move_node_out(node)
+        else:
+            raise ValueError("ruh roh")
+        for moveto in reversed(list(self._ancestry_until(new_parent, parent))):
+            self.move_node_in(node, moveto)
+        self.move_node_in(node, new_parent)
+
+    def _parent(self, item: RelativeAtomOrGroup) -> RelativeAtomGroup:
+        result = item.parent if isinstance(item, RelativeAtomGroup) else self._atom_parents[item]
+        if result is None:
+            raise ValueError("Has no parent")
+        return result
+
+    def _ancestry(self, atom: RelativeAtomOrGroup) -> Iterable[RelativeAtomGroup]:
+        parent = atom.parent if isinstance(atom, RelativeAtomGroup) else self._atom_parents[atom]
+        while parent is not None:
+            yield parent
+            parent = parent.parent
+
+    def _common_ancestor(self, atom1: RelativeAtomOrGroup, atom2: RelativeAtomOrGroup) -> RelativeAtomGroup:
+        set1 = set(self._ancestry(atom1))
+        for parent in self._ancestry(atom2):
+            if parent in set1:
+                return parent
+        raise ValueError("Hierarchy structure is fucked")
+
+    def _ancestry_until(self, start: RelativeAtomOrGroup, stop: RelativeAtomGroup, inclusive=False) -> Iterable[RelativeAtomGroup]:
+        for parent in self._ancestry(start):
+            if parent is stop:
+                if inclusive:
+                    yield parent
+                break
+            yield parent
+
+    def _hierarchy_path(self, start: RelativeAtomOrGroup, end: RelativeAtomOrGroup):
+        common = self._common_ancestor(start, end)
+        line1 = self._ancestry_until(start, common)
+        line2 = list(self._ancestry_until(end, common))
+
+        yield start
+        yield from line1
+        yield from reversed(line2)
+        yield end
diff --git a/typetapper/knowledge.py b/typetapper/knowledge.py
index 5d4ed45..07c74e6 100644
--- a/typetapper/knowledge.py
+++ b/typetapper/knowledge.py
@@ -3,10 +3,64 @@ from collections import defaultdict
 import angr
 import networkx
 
-from .data import BlockInfo
+from .data import BlockInfo, RegisterAtom, MemoryAtom, TmpAtom, Atom
+from .hierarchy_graph import HierarchicalGraph
 
 class TypeTapperManager(angr.knowledge_plugins.plugin.KnowledgeBasePlugin):
     def __init__(self, kb: angr.KnowledgeBase):
         self.kb = kb
+        self.cfg: angr.knowledge_plugins.cfg.CFGModel = None
         self.graph = networkx.DiGraph()
         self.block_info: Dict[int, BlockInfo] = defaultdict(BlockInfo)
+
+    def normalized_block(self, addr: int) -> angr.Block:
+        block = self.cfg.get_any_node(addr, anyaddr=True)
+        if block is None:
+            raise LookupError("No such block %#x" % addr)
+
+        return block.block
+
+    def _block_info(self, addr: int):
+        node = self.cfg.get_any_node(addr, anyaddr=True)  # addr may be any instruction inside the block
+        if node is None:
+            raise LookupError("No such block %#x" % addr)
+
+        return self.block_info[node.addr]  # index by block start; block_info is a defaultdict, so a raw instruction address would silently create an empty entry
+
+    def lookup_reg(self, addr: int, register: str) -> RegisterAtom:
+        blockinfo = self._block_info(addr)
+        # this algorithm could use some tweaking once we know how it's used
+        for atom in blockinfo.atoms:
+            if not isinstance(atom, RegisterAtom):
+                continue
+            if atom.name == register and atom.loc.ins_addr == addr:
+                return atom
+        for atom in blockinfo.atoms:
+            if not isinstance(atom, RegisterAtom):
+                continue
+            if atom.slot_name == register and atom.loc.ins_addr == addr:
+                return atom
+        for atom in blockinfo.inputs.keys():
+            if atom.name == register:
+                return atom
+        for atom in blockinfo.inputs.keys():
+            if atom.slot_name == register:
+                return atom
+        raise LookupError("Cannot find register %s in instruction %#x" % (register, addr))
+
+    def lookup_mem(self, addr: int) -> MemoryAtom:
+        blockinfo = self._block_info(addr)
+        for atom in blockinfo.atoms:
+            if isinstance(atom, MemoryAtom) and atom.loc.ins_addr == addr:
+                return atom
+        raise LookupError("Cannot find memory access in instruction %#x" % addr)
+
+    def lookup_tmp(self, addr: int, tmp: int):
+        blockinfo = self._block_info(addr)
+        for atom in blockinfo.atoms:
+            if isinstance(atom, TmpAtom) and atom.tmp == tmp:
+                return atom
+        raise LookupError("Cannot find tmp %d in instruction %#x. Are your temp numbers based on the .normalized_block?" % (tmp, addr))
+
+    def session(self, atom: Atom) -> HierarchicalGraph:
+        return HierarchicalGraph(self, [atom])
diff --git a/typetapper/relative_graph.py b/typetapper/relative_graph.py
new file mode 100644
index 0000000..0045f55
--- /dev/null
+++ b/typetapper/relative_graph.py
@@ -0,0 +1,146 @@
+from typing import TYPE_CHECKING, List, Optional, Tuple, Set
+from dataclasses import dataclass
+import logging
+
+import networkx
+
+from .data import Atom, Prop, OpSequence, ControlFlowActionPop, ControlFlowActionPush, ControlFlowAction
+
+if TYPE_CHECKING:
+    from .knowledge import TypeTapperManager
+
+l = logging.getLogger(__name__)
+
+@dataclass(frozen=True)
+class RelativeAtom:
+    atom: Atom
+    callstack: Tuple[int, ...]
+
+@dataclass
+class RelativeAtomAttrs:
+    path: OpSequence
+    prop: Prop
+
+    def merge(self, other: 'RelativeAtomAttrs'):
+        self.prop.update(other.prop)
+        if self.path != other.path:
+            # TODO unifications
+            pass
+        return False
+
+class RelativeAtomGraph:
+    def __init__(self, kp: 'TypeTapperManager', baseline: List[Atom]):
+        self.kp = kp
+        self.__graph = networkx.DiGraph()
+        self.frontier = set()  # nodes present in self.graph but haven't had all their edges analyzed
+
+        for atom in baseline:
+            relative = RelativeAtom(atom=atom, callstack=())
+            self._add_node(relative, OpSequence())
+            self.frontier.add(relative)  # TODO ???
+
+    def _add_node(self, relatom: RelativeAtom, path: OpSequence) -> bool:
+        """
+        If relatom is not present in the graph, add it.
+        If it is present in the graph, merge the new information into its attrs
+        """
+        newattrs = RelativeAtomAttrs(
+            prop=self.kp.graph.nodes[relatom.atom].get('prop', Prop()).transform(path.invert()),
+            path=path,
+        )
+        if relatom in self.__graph.nodes:
+            return self.__graph.nodes[relatom]['attr'].merge(newattrs)
+        else:
+            self.__graph.add_node(relatom, attr=newattrs)
+            return True
+
+    def _remove_node(self, relatom: RelativeAtom):
+        for pred in list(self.__graph.pred[relatom]):
+            self._remove_edge(pred, relatom)
+        for succ in list(self.__graph.succ[relatom]):
+            self._remove_edge(relatom, succ)
+        self.__graph.remove_node(relatom)
+
+    def _add_edge(self, relatom1: RelativeAtom, relatom2: RelativeAtom):
+        assert relatom1 in self.__graph.nodes
+        assert relatom2 in self.__graph.nodes
+        self.__graph.add_edge(relatom1, relatom2)
+
+    def _remove_edge(self, relatom1: RelativeAtom, relatom2: RelativeAtom):
+        assert relatom1 in self.__graph.nodes
+        assert relatom2 in self.__graph.nodes
+        self.__graph.remove_edge(relatom1, relatom2)
+
+    def attrs(self, relatom: RelativeAtom) -> RelativeAtomAttrs:
+        return self.__graph.nodes[relatom]['attr']
+
+    def expand(self, relatom: RelativeAtom) -> Set[RelativeAtom]:
+        if relatom not in self.frontier:
+            return set()
+
+        attrs = self.attrs(relatom)
+        result = set()
+        for succ, edge_attrs in self.kp.graph.succ[relatom.atom].items():
+            res = self._expand_single(relatom, attrs, succ, edge_attrs['cf'], edge_attrs['ops'], False)
+            if res is not None:
+                result.add(res)
+        for pred, edge_attrs in self.kp.graph.pred[relatom.atom].items():
+            res = self._expand_single(relatom, attrs, pred, edge_attrs['cf'], edge_attrs['ops'], True)
+            if res is not None:
+                result.add(res)
+        self.frontier.update(result)
+        return result
+
+    def _expand_single(
+        self,
+        relatom: RelativeAtom,
+        attrs: RelativeAtomAttrs,
+        succ: Atom,
+        edge_cf: List[ControlFlowAction],
+        edge_ops: OpSequence,
+        is_pred: bool,
+    ) -> Optional[RelativeAtom]:
+        callstack = self._update_callstack(relatom.callstack, edge_cf, is_pred)
+        if callstack is None:
+            return None
+
+        if is_pred:
+            path = edge_ops.invert() + attrs.path
+        else:
+            path = attrs.path + edge_ops
+
+        relsucc = RelativeAtom(atom=succ, callstack=callstack)
+        res = self._add_node(relsucc, path)
+        if is_pred:
+            self._add_edge(relsucc, relatom)
+        else:
+            self._add_edge(relatom, relsucc)
+        return relsucc if res else None
+
+    @staticmethod
+    def _update_callstack(
+        callstack: Tuple[int, ...],
+        cf: List[ControlFlowAction],
+        reverse: bool
+    ) -> Optional[Tuple[int, ...]]:
+        for directive in reversed(cf) if reverse else cf:
+            if isinstance(directive, ControlFlowActionPop):
+                pop = True
+                callsite = directive.callsite
+            elif isinstance(directive, ControlFlowActionPush):
+                pop = False
+                callsite = directive.callsite
+            else:
+                raise TypeError(type(directive))
+
+            pop ^= reverse
+
+            if pop:
+                if callstack:
+                    if callstack[-1] != callsite:
+                        return None
+                    callstack = callstack[:-1]
+            else:
+                callstack = callstack + (callsite,)
+
+        return callstack