commit f4b68d1ab0f3ae106edfb09d9f5c304fb70b995e Author: Audrey Dutcher Date: Mon Oct 17 20:09:38 2022 -0700 data can now be generated. I think. diff --git a/typetapper/__init__.py b/typetapper/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/typetapper/__pycache__/__init__.cpython-310.pyc b/typetapper/__pycache__/__init__.cpython-310.pyc new file mode 100644 index 0000000..7a5001f Binary files /dev/null and b/typetapper/__pycache__/__init__.cpython-310.pyc differ diff --git a/typetapper/__pycache__/analysis.cpython-310.pyc b/typetapper/__pycache__/analysis.cpython-310.pyc new file mode 100644 index 0000000..5fe36f2 Binary files /dev/null and b/typetapper/__pycache__/analysis.cpython-310.pyc differ diff --git a/typetapper/__pycache__/data.cpython-310.pyc b/typetapper/__pycache__/data.cpython-310.pyc new file mode 100644 index 0000000..ffcd826 Binary files /dev/null and b/typetapper/__pycache__/data.cpython-310.pyc differ diff --git a/typetapper/__pycache__/engine.cpython-310.pyc b/typetapper/__pycache__/engine.cpython-310.pyc new file mode 100644 index 0000000..0431f59 Binary files /dev/null and b/typetapper/__pycache__/engine.cpython-310.pyc differ diff --git a/typetapper/__pycache__/knowledge.cpython-310.pyc b/typetapper/__pycache__/knowledge.cpython-310.pyc new file mode 100644 index 0000000..625623a Binary files /dev/null and b/typetapper/__pycache__/knowledge.cpython-310.pyc differ diff --git a/typetapper/analysis.py b/typetapper/analysis.py new file mode 100644 index 0000000..2479609 --- /dev/null +++ b/typetapper/analysis.py @@ -0,0 +1,64 @@ +from collections import OrderedDict + +import angr +from angr import Block +from angr.analyses.cfg import CFGBase +from angr.knowledge_plugins.cfg import CFGNode + +from .engine import TypeTapperEngine +from .knowledge import TypeTapperManager + +class TypeTapper(angr.Analysis): + def __init__(self, cfg: CFGBase): + self._cfg = cfg + self._manager = self.kb.request_knowledge(TypeTapperManager) + 
self._engine = TypeTapperEngine(self.project, self._manager) + + if not self._cfg.normalized: + raise ValueError("CFG must be normalized") + + self._analyze_active_flow() + self._analyze_passive_flow() + + def _analyze_active_flow(self): + node: CFGNode + for node in self._cfg.graph.nodes(): + block: Block = node.block + if block is None: + continue + self._engine.handle_vex_block(block.vex) + + def _analyze_passive_flow(self): + queue = OrderedDict() + for block_addr in self._manager.block_info.keys(): + queue[block_addr] = None + + while queue: + block_addr = next(iter(reversed(queue.keys()))) + queue.pop(block_addr) + node_blockinfo = self._manager.block_info[block_addr] + node = self._cfg.model.get_any_node(block_addr) + fakeret_addr = next((pred.addr for pred, attrs in self._cfg.graph.pred[node].items() if attrs['jumpkind'] == 'Ijk_FakeRet'), None) + for pred, attrs in self._cfg.graph.pred[node].items(): + if attrs['jumpkind'] == 'Ijk_FakeRet': + continue + if pred.block is None: + continue + pred_addr = pred.addr + pred_blockinfo = self._manager.block_info[block_addr] + + # TAKE IT BACK NOW Y'ALL + for input_atom, input_info in node_blockinfo.inputs.items(): + input_info_new = input_info.step(pred_addr, block_addr, attrs['jumpkind'], fakeret_addr) + if input_info_new is None: + continue + output_atom = pred_blockinfo.outputs.get(input_atom.slot_name, None) + if output_atom is not None: + if output_atom.name == input_atom.name: + input_info_new.commit(self._manager.graph, output_atom, input_atom) + else: + pass # alias mismatch + elif input_atom not in pred_blockinfo.inputs: # sketchy... 
this means that we can't account for multiple paths to the same atom + pred_blockinfo.inputs[input_atom] = input_info_new + queue[pred_addr] = None + queue.move_to_end(pred_addr, last=False) diff --git a/typetapper/data.py b/typetapper/data.py new file mode 100644 index 0000000..9e2a82f --- /dev/null +++ b/typetapper/data.py @@ -0,0 +1,248 @@ +from typing import Tuple, Any, List, Set, Optional, Dict +from collections import defaultdict +from enum import Enum, auto +from dataclasses import dataclass, field +import copy + +import networkx + + +@dataclass(frozen=True) +class CodeLoc: + bbl_addr: int + stmt_idx: int + +@dataclass(frozen=True) +class Atom: + loc: CodeLoc + size: int + +@dataclass(frozen=True) +class RegisterAtom(Atom): + name: str + slot_name: str + +@dataclass(frozen=True) +class MemoryAtom(Atom): + endness: str + +@dataclass(frozen=True) +class TmpAtom(Atom): + tmp: int + +@dataclass(frozen=True) +class ConstAtom(Atom): + value: int + + +@dataclass(frozen=True) +class Op: + def invert(self) -> 'Op': + raise NotImplementedError + +@dataclass(frozen=True) +class ConstOffsetOp(Op): + const: int + + def invert(self): + return ConstOffsetOp(-self.const) + +@dataclass(frozen=True) +class VarOffsetOp(Op): + var: Any + + def invert(self): + # TODO ???? + return self + +@dataclass(frozen=True) +class DerefOp(Op): + size: int + + def invert(self): + return RefOp(self.size) + +@dataclass(frozen=True) +class RefOp(Op): + size: int + + def invert(self): + return DerefOp(self.size) + +#@dataclass(frozen=True) +#class OtherOp(Op): +# def invert(self) -> 'Op': +# return self + +@dataclass(frozen=True) +class OpSequence: + ops: Tuple[Op, ...] 
= () + + def __add__(self, other: 'OpSequence') -> 'OpSequence': + seq = list(self.ops) + seq.extend(other.ops) + simplify_op_sequence(seq) + return OpSequence(tuple(seq)) + + def appended(self, *op: Op) -> 'OpSequence': + seq = list(self.ops) + seq.extend(op) + simplify_op_sequence(seq) + return OpSequence(tuple(seq)) + + @staticmethod + def concat(*sequences: 'OpSequence') -> 'OpSequence': + seq = [] + for s in sequences: + seq.extend(s.ops) + simplify_op_sequence(seq) + return OpSequence(tuple(seq)) + + def invert(self) -> 'OpSequence': + return OpSequence(tuple(x.invert() for x in reversed(self.ops))) + +def simplify_op_sequence(seq: List[Op]): + i = 0 + while i < len(seq): + cur = seq[i] + if isinstance(cur, ConstOffsetOp) and cur.const == 0: + seq.pop(i) + continue + nex = seq[i + 1] if i + 1 < len(seq) else None + if isinstance(cur, ConstOffsetOp) and isinstance(nex, ConstOffsetOp): + seq[i] = ConstOffsetOp(cur.const + nex.const) + seq.pop(i + 1) + continue + if isinstance(cur, RefOp) and isinstance(nex, DerefOp) and cur.size == nex.size: + seq.pop(i) + seq.pop(i) + continue + if isinstance(cur, DerefOp) and isinstance(nex, RefOp) and cur.size == nex.size: + seq.pop(i) + seq.pop(i) + continue + + i += 1 + +class DataKind(Enum): + Int = auto() + Float = auto() + Pointer = auto() + +@dataclass +class Prop: + self_data: defaultdict[DataKind, int] = field(default_factory=lambda: defaultdict(int)) + struct_data: defaultdict[int, defaultdict[int, defaultdict[DataKind, int]]] = field(default_factory=lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(int)))) + unifications: Set[Tuple[int, int]] = field(default_factory=set) + + def update(self, other: 'Prop'): + for kind, v in other.self_data.items(): + self.self_data[kind] += v + for offset, v1 in other.struct_data.items(): + for size, v2 in v1.items(): + for kind, v3 in v2.items(): + self.struct_data[offset][size][kind] += v3 + self.unifications.update(other.unifications) + + def __or__(self, other: 
'Prop'): + result = Prop() + result.update(self) + result.update(other) + return result + + def transform(self, ops: OpSequence): + result = copy.deepcopy(self) + for op in ops.ops: + if isinstance(op, RefOp): + result.struct_data.clear() + result.struct_data[0][op.size] = result.self_data + result.self_data = defaultdict(int) + self.unifications.clear() + elif isinstance(op, DerefOp): + result.self_data = result.struct_data[0][op.size] + result.struct_data.clear() + self.unifications.clear() + elif isinstance(op, ConstOffsetOp): + items = list(result.struct_data.items()) + result.struct_data.clear() + for k, v in items: + result.struct_data[k + op.const] = v + result.self_data.clear() # TODO ??? + result.unifications = {(x + op.const, y + op.const) for x, y in result.unifications} + else: + result = Prop() + return result + +@dataclass(frozen=True) +class LiveData: + sources: List[Tuple[Atom, OpSequence]] + const: Optional[int] + size: int + + @classmethod + def new_null(cls, size: int): + return cls([], None, size) + + @classmethod + def new_atom(cls, atom: Atom) -> 'LiveData': + return cls([(atom, OpSequence())], None, atom.size) + + @classmethod + def new_const(cls, value: int, size: int, codeloc: CodeLoc) -> 'LiveData': + return cls([(ConstAtom(codeloc, size, value), OpSequence())], value, size) + + def appended(self, op: Op, size: int) -> 'LiveData': + return LiveData([(atom, seq.appended(op)) for atom, seq in self.sources], self.const, size) + + def unioned(self, other: 'LiveData', size: int, const: Optional[int]=None) -> 'LiveData': + return LiveData(self.sources + other.sources, const, size) + + def commit(self, target: Atom, graph: networkx.DiGraph): + for src, seq in self.sources: + graph.add_edge(src, target, ops=seq) + +@dataclass(frozen=True) +class RegisterInputInfo: + callsites: Tuple[int, ...] + # when we go back through a ret, we push the callsite onto this stack. 
we may then only go back through calls if + # they match the top of the stack, at which point they are popped off + reverse_callsites: Tuple[int, ...] + # when we go back through a call and there is nothing on the callstack, an entry is pushed onto this stack. + # not sure what this indicates yet + + def step(self, pred: int, succ: int, jumpkind: str, callsite: Optional[int]) -> 'Optional[RegisterInputInfo]': + if jumpkind == 'Ijk_Ret': + return RegisterInputInfo(callsites=self.callsites + (callsite,), reverse_callsites=self.reverse_callsites) + elif jumpkind == 'Ijk_Call': + if not self.callsites: + return RegisterInputInfo(callsites=(), reverse_callsites=self.reverse_callsites + (pred,)) + elif self.callsites[-1] == pred: + return RegisterInputInfo(callsites=self.callsites[:-1], reverse_callsites=self.reverse_callsites) + else: + return None + else: + return RegisterInputInfo(callsites=self.callsites, reverse_callsites=self.reverse_callsites) + + def commit(self, graph: networkx.DiGraph, source: RegisterAtom, dest: RegisterAtom): + actions: List[ControlFlowAction] = [ControlFlowActionPop(i) for i in self.callsites] + actions += [ControlFlowActionPush(i) for i in self.reverse_callsites] + graph.add_edge(source, dest, ops=OpSequence(), cf=actions) + + +@dataclass(frozen=True) +class ControlFlowAction: + pass + +@dataclass(frozen=True) +class ControlFlowActionPush(ControlFlowAction): + callsite: int + +@dataclass(frozen=True) +class ControlFlowActionPop(ControlFlowAction): + callsite: int + + +@dataclass +class BlockInfo: + outputs: Dict[str, RegisterAtom] = field(default_factory=lambda: {}) # slot names + inputs: Dict[RegisterAtom, RegisterInputInfo] = field(default_factory=dict) diff --git a/typetapper/engine.py b/typetapper/engine.py new file mode 100644 index 0000000..e9b4355 --- /dev/null +++ b/typetapper/engine.py @@ -0,0 +1,135 @@ +import logging + +import angr +import pyvex + +from .data import * +from .knowledge import TypeTapperManager + +l = 
logging.getLogger(__name__)

def get_type_size_bytes(ty):
    """Size in bytes of a VEX type string (pyvex reports bits)."""
    return pyvex.get_type_size(ty) // 8

class TypeTapperEngine(angr.engines.vex.VEXMixin):
    """VEX-lifting engine that records data-flow atoms/edges into a
    TypeTapperManager instead of executing anything. Expression handlers
    return LiveData; statement handlers commit graph edges."""

    def __init__(self, project: angr.Project, kp: TypeTapperManager, **kwargs):
        super().__init__(project, **kwargs)
        self.kp = kp

    # per-block map from VEX tmp index to its TmpAtom; presumably
    # (re)initialized by the VEXMixin block-handling machinery — TODO confirm
    tmps: List[TmpAtom]

    @property
    def codeloc(self):
        """The current (block address, statement index) location."""
        return CodeLoc(bbl_addr=self.irsb.addr, stmt_idx=self.stmt_idx)

    @property
    def graph(self):
        return self.kp.graph

    @property
    def blockinfo(self):
        """The BlockInfo for the block currently being processed."""
        return self.kp.block_info[self.irsb.addr]

    def _handle_vex_const(self, const):
        return LiveData.new_const(const.value, get_type_size_bytes(const.type), self.codeloc)

    def _perform_vex_expr_RdTmp(self, tmp):
        return LiveData.new_atom(self.tmps[tmp])

    def _perform_vex_expr_Get(self, offset: LiveData, ty, **kwargs):
        """Register read: emit a RegisterAtom, link it to the block's last
        write to the same slot, or record it as a block input."""
        size = get_type_size_bytes(ty)
        if type(offset.const) is not int:
            # register offset not statically known; cannot track
            return LiveData.new_null(size)
        slot_info = self.project.arch.get_base_register(offset.const, size)
        if slot_info is None:
            l.error("???????")
            return LiveData.new_null(size)
        # FIX: the name lookup used to run unguarded (flagged "# unsafe")
        # before the slot check and raised KeyError for unnamed
        # (offset, size) pairs; degrade gracefully like the slot path.
        name = self.project.arch.register_size_names.get((offset.const, size))
        if name is None:
            l.error("???????")
            return LiveData.new_null(size)
        slot_name = self.project.arch.register_size_names[slot_info]
        reg_atom = RegisterAtom(self.codeloc, size, name, slot_name)

        source = self.blockinfo.outputs.get(slot_name, None)
        if source is not None:
            if source.name == reg_atom.name:
                self.graph.add_edge(source, reg_atom, ops=OpSequence())
            else:
                pass # alias mismatch
        else:
            # first read of this slot in the block: an unresolved input,
            # starting with empty call stacks
            self.blockinfo.inputs[reg_atom] = RegisterInputInfo(callsites=(), reverse_callsites=())

        return LiveData.new_atom(reg_atom)

    def _perform_vex_expr_Load(self, addr: LiveData, ty, endness, **kwargs):
        """Memory read: the address's sources flow (through a deref) into a
        fresh MemoryAtom, which becomes the loaded value's source."""
        size = get_type_size_bytes(ty)
        mem_atom = MemoryAtom(self.codeloc, size, endness)
        addr.appended(DerefOp(size), size).commit(mem_atom, self.graph)
        return LiveData.new_atom(mem_atom)

    def _perform_vex_expr_CCall(self, func_name, ty, args, func=None):
        # clean-call results are not tracked
        return
LiveData.new_null(get_type_size_bytes(ty)) + + def _perform_vex_expr_ITE(self, cond, ifTrue: LiveData, ifFalse: LiveData): + assert ifTrue.size == ifFalse.size + return ifTrue.unioned(ifFalse, ifTrue.size) + + def _perform_vex_expr_Op(self, op, args: List[LiveData]): + size = get_type_size_bytes(pyvex.get_op_retty(op)) + if op in ('Add8', 'Add16', 'Add32', 'Add64'): + sign = 1 + elif op in ('Sub8', 'Sub16', 'Sub32', 'Sub64'): + sign = -1 + else: + sign = None + if sign is not None: + assert size == args[0].size == args[1].size + addend0 = args[0].const + addend1 = args[1].const + if addend0 is not None and addend1 is not None: + const = addend0 + addend1 * sign + else: + const = None + input0 = args[0].appended(ConstOffsetOp(addend1 * sign) if addend1 is not None else VarOffsetOp(args[1]), size) + input1 = args[1].appended(ConstOffsetOp(addend0) if addend0 is not None else VarOffsetOp(args[0]), size) + result = input0.unioned(input1, size, const) + else: + result = LiveData.new_null(size) + + return result + + def _handle_vex_expr_GSPTR(self, expr: pyvex.expr.GSPTR): + return LiveData.new_null(get_type_size_bytes(expr.result_type(self.irsb.tyenv))) + + def _handle_vex_expr_VECRET(self, expr: pyvex.expr.VECRET): + return LiveData.new_null(get_type_size_bytes(expr.result_type(self.irsb.tyenv))) + + def _handle_vex_expr_Binder(self, expr: pyvex.expr.Binder): + return LiveData.new_null(get_type_size_bytes(expr.result_type(self.irsb.tyenv))) + + + def _perform_vex_stmt_Put(self, offset: LiveData, data: LiveData, **kwargs): + if type(offset.const) is not int: + return LiveData.new_null(data.size) + name = self.project.arch.register_size_names[(offset.const, data.size)] # unsafe + slot_info = self.project.arch.get_base_register(offset.const, data.size) + if slot_info is None: + l.error("???????") + return LiveData.new_null(data.size) + slot_name = self.project.arch.register_size_names[slot_info] + reg_atom = RegisterAtom(self.codeloc, data.size, name, slot_name) + 
data.commit(reg_atom, self.graph) + self.blockinfo.outputs[slot_name] = reg_atom + + def _perform_vex_stmt_WrTmp(self, tmp, data: LiveData): + tmp_atom = TmpAtom(self.codeloc, get_type_size_bytes(self.irsb.tyenv.lookup(tmp)), tmp) + self.tmps[tmp] = tmp_atom + data.commit(tmp_atom, self.graph) + + def _perform_vex_stmt_Store(self, addr: LiveData, data: LiveData, endness, **kwargs): + mem_atom = MemoryAtom(self.codeloc, data.size, endness) + addr.appended(DerefOp(data.size), data.size).commit(mem_atom, self.graph) + data.commit(mem_atom, self.graph) + + def _perform_vex_stmt_Dirty_call(self, func_name, ty, args, func=None): + if ty is None: + return None + return LiveData.new_null(get_type_size_bytes(ty)) diff --git a/typetapper/knowledge.py b/typetapper/knowledge.py new file mode 100644 index 0000000..5d4ed45 --- /dev/null +++ b/typetapper/knowledge.py @@ -0,0 +1,12 @@ +from typing import Dict +from collections import defaultdict +import angr +import networkx + +from .data import BlockInfo + +class TypeTapperManager(angr.knowledge_plugins.plugin.KnowledgeBasePlugin): + def __init__(self, kb: angr.KnowledgeBase): + self.kb = kb + self.graph = networkx.DiGraph() + self.block_info: Dict[int, BlockInfo] = defaultdict(BlockInfo)