diff --git a/typetapper/analysis.py b/typetapper/analysis.py index 83b7cf6..d82d8f8 100644 --- a/typetapper/analysis.py +++ b/typetapper/analysis.py @@ -9,6 +9,7 @@ from angr.knowledge_plugins.cfg import CFGNode, CFGModel from .engine import TypeTapperEngine from .knowledge import TypeTapperManager from .data import Atom, RegisterAtom, CodeLoc, OpSequence, ControlFlowActionPop, ControlFlowActionPush +from . import procedures l = logging.getLogger(__name__) @@ -32,9 +33,13 @@ class TypeTapperAnalysis(angr.Analysis): node: CFGNode for node in self._cfg.graph.nodes(): block: Block = node.block - if block is None: - continue - self._engine.handle_vex_block(block.vex) + if block is not None: + self._engine.handle_vex_block(block.vex) + elif node.simprocedure_name is not None: + proc_cls = getattr(procedures, node.simprocedure_name, None) + if proc_cls is not None and issubclass(proc_cls, procedures.TypeTapperProcedure): + proc = proc_cls(self._engine, self.project.factory.cc()) + proc.analyze(node.addr) def _analyze_passive_flow(self): queue = OrderedDict() @@ -48,15 +53,20 @@ class TypeTapperAnalysis(angr.Analysis): node = self._cfg.get_any_node(block_addr) fakeret_addr = next((pred.addr for pred, attrs in self._cfg.graph.pred[node].items() if attrs['jumpkind'] == 'Ijk_FakeRet'), None) for pred, attrs in self._cfg.graph.pred[node].items(): - if attrs['jumpkind'] == 'Ijk_FakeRet': - continue pred_addr = pred.addr pred_blockinfo = self.manager.block_info[pred_addr] - callsite_addr = fakeret_addr if attrs['jumpkind'] == 'Ijk_Ret' else pred_addr if attrs['jumpkind'] == 'Ijk_Call' else None + callsite_addr = fakeret_addr if attrs['jumpkind'] == 'Ijk_Ret' else pred_addr if attrs['jumpkind'] in ('Ijk_Call', 'Ijk_FakeRet') else None # TAKE IT BACK NOW Y'ALL for name in node_blockinfo.ready_inputs: input_atom = node_blockinfo.inputs[name] + if attrs['jumpkind'] == 'Ijk_FakeRet': + # determine which registers are clobbered; determine the cc + func_addr: CFGNode = next((succ.addr 
for succ, attrs in self._cfg.graph.succ[pred].items() if attrs['jumpkind'] == 'Ijk_Call'), None) + function = self.kb.functions[func_addr] + if function.calling_convention is None or input_atom.slot_name in function.calling_convention.CALLER_SAVED_REGS: + continue + output_atom = pred_blockinfo.outputs.get(input_atom.slot_name, None) if output_atom is not None: if output_atom.name == name: @@ -71,8 +81,8 @@ class TypeTapperAnalysis(angr.Analysis): queue.move_to_end(pred_addr, last=False) self._passive_link(input_atom, pred_blockinfo.inputs[name], attrs['jumpkind'], callsite_addr) - # this is safe because if there is a loop edge, the atom will already be in the inputs so it won't be re-upped - node_blockinfo.ready_inputs.clear() + # this is safe because if there is a loop edge, the atom will already be in the inputs, so it won't be re-upped + node_blockinfo.ready_inputs.clear() def _passive_link(self, input_atom: Atom, output_atom: Atom, jumpkind: str, callsite: Optional[int]): if jumpkind == 'Ijk_Ret': diff --git a/typetapper/data.py b/typetapper/data.py index 84d4291..e054045 100644 --- a/typetapper/data.py +++ b/typetapper/data.py @@ -164,10 +164,20 @@ class Prop: self.struct_data[offset][size].subtract(v2) self.unifications.subtract(other.unifications) + def maximize(self, other: 'Prop'): + for key, val in other.self_data.items(): + self.self_data[key] = max(self.self_data[key], val) + for offset, v1 in other.struct_data.items(): + for size, v2 in v1.items(): + for kind, val in v2.items(): + self.struct_data[offset][size][kind] = max(self.struct_data[offset][size][kind], val) + for key, val in other.unifications.items(): + self.unifications[key] = max(self.unifications[key], val) + def __or__(self, other: 'Prop'): result = Prop() - result.update(self) - result.update(other) + result.maximize(self) + result.maximize(other) return result def transform(self, ops: OpSequence): @@ -186,12 +196,12 @@ class Prop: items = list(result.struct_data.items()) 
result.struct_data.clear() for k, v in items: - result.struct_data[k + op.const] = v + result.struct_data[k - op.const] = v # there is some JANK shit going on with this sign saved = result.self_data.get(DataKind.Pointer, None) result.self_data.clear() if saved: result.self_data[DataKind.Pointer] = saved - result.unifications = Counter((x + op.const, y + op.const) for x, y in result.unifications) + result.unifications = Counter((x - op.const, y - op.const) for x, y in result.unifications) elif isinstance(op, VarOffsetOp): saved = result.self_data.get(DataKind.Pointer, None) result = Prop() @@ -203,6 +213,9 @@ class Prop: @dataclass(frozen=True) class LiveData: + """ + The in-flight data representation for the analysis. All sizes are in bytes + """ sources: List[Tuple[Atom, OpSequence]] const: Optional[int] size: int @@ -242,6 +255,16 @@ class LiveData: else: graph.nodes[atom]['prop'] = tprop + def prop_self(self, kind: DataKind, graph: networkx.DiGraph): + prop = Prop() + prop.self_data[kind] += 1 + self.prop(prop, graph) + + def prop_union(self, offset1: int, offset2: int, graph: networkx.DiGraph): + prop = Prop() + prop.unifications[(offset1, offset2)] += 1 + self.prop(prop, graph) + @dataclass(frozen=True) class RegisterInputInfo: atom: RegisterAtom diff --git a/typetapper/engine.py b/typetapper/engine.py index c9081b2..f1407f7 100644 --- a/typetapper/engine.py +++ b/typetapper/engine.py @@ -18,23 +18,39 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin): self.kp = kp self.last_imark: int = 0 self.tmp_atoms = tmp_atoms + self._force_addr: Optional[int] = None tmps: List[Union[LiveData, TmpAtom]] @property - def codeloc(self): + def codeloc(self) -> CodeLoc: + if self._force_addr is not None: + return CodeLoc(bbl_addr=self._force_addr, stmt_idx=self.stmt_idx, ins_addr=self._force_addr) + return CodeLoc(bbl_addr=self.irsb.addr, stmt_idx=self.stmt_idx, ins_addr=self.last_imark) + def force_addr(self, v: int): + self._force_addr = v + @property def graph(self): 
return self.kp.graph @property def blockinfo(self): + if self._force_addr is not None: + return self.kp.block_info[self._force_addr] return self.kp.block_info[self.irsb.addr] + def handle_vex_block(self, irsb): + self._force_addr = None + super().handle_vex_block(irsb) + def _handle_vex_const(self, const): - atom = LiveData.new_const(const.value, get_type_size_bytes(const.type), self.codeloc) + return self.const(const.value, get_type_size_bytes(const.type)) + + def const(self, val, size) -> LiveData: + atom = LiveData.new_const(val, size, self.codeloc) self.blockinfo.atoms.append(atom.sources[0][0]) return atom @@ -49,9 +65,14 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin): if type(offset.const) is not int: return LiveData.new_null(size) name = self.project.arch.register_size_names[(offset.const, size)] # unsafe - slot_info = self.project.arch.get_base_register(offset.const, size) + return self.get(name, offset.const, size) + + def get(self, name, offset=None, size=None): + if size is None or offset is None: + offset, size = self.project.arch.registers[name] + slot_info = self.project.arch.get_base_register(offset, size) if slot_info is None: - l.error("???????") + l.error("??????? 
(%s, %s)", offset, size) return LiveData.new_null(size) slot_name = self.project.arch.register_size_names[slot_info] reg_atom = RegisterAtom(self.codeloc, size, name, slot_name) @@ -72,11 +93,12 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin): return LiveData.new_atom(reg_atom) def _perform_vex_expr_Load(self, addr: LiveData, ty, endness, **kwargs): - prop = Prop() - prop.self_data[DataKind.Pointer] += 1 - addr.prop(prop, self.graph) - size = get_type_size_bytes(ty) + return self.load(addr, size, endness) + + def load(self, addr, size, endness): + addr.prop_self(DataKind.Pointer, self.graph) + mem_atom = MemoryAtom(self.codeloc, size, endness) self.blockinfo.atoms.append(mem_atom) addr.appended(DerefOp(size), size).commit(mem_atom, self.graph) @@ -90,16 +112,15 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin): return ifTrue.unioned(ifFalse, ifTrue.size) def _perform_vex_expr_Op(self, op, args: List[LiveData]): + return self.op(op, args) + + def op(self, op, args: List[LiveData]) -> LiveData: ret_ty, arg_tys = pyvex.expr.op_arg_types(op) for arg, ty in zip(args, arg_tys): if ty.startswith('Ity_F'): - prop = Prop() - prop.self_data[DataKind.Float] += 1 - arg.prop(prop, self.graph) + arg.prop_self(DataKind.Float, self.graph) elif ty.startswith('Ity_I'): - prop = Prop() - prop.self_data[DataKind.Int] += 1 - arg.prop(prop, self.graph) + arg.prop_self(DataKind.Int, self.graph) size = get_type_size_bytes(ret_ty) if op in ('Iop_Add8', 'Iop_Add16', 'Iop_Add32', 'Iop_Add64'): @@ -141,9 +162,15 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin): if type(offset.const) is not int: return LiveData.new_null(data.size) name = self.project.arch.register_size_names[(offset.const, data.size)] # unsafe - slot_info = self.project.arch.get_base_register(offset.const, data.size) + return self.put(name, data, offset.const) + + def put(self, name, data, offset=None): + if offset is None: + offset = self.project.arch.registers[name][0] + + slot_info = 
self.project.arch.get_base_register(offset, data.size) if slot_info is None: - l.error("???????") + l.error("??????? (%s, %s)", offset, data.size) return LiveData.new_null(data.size) slot_name = self.project.arch.register_size_names[slot_info] reg_atom = RegisterAtom(self.codeloc, data.size, name, slot_name) @@ -161,6 +188,9 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin): self.tmps[tmp] = data def _perform_vex_stmt_Store(self, addr: LiveData, data: LiveData, endness, **kwargs): + self.store(addr, data, endness) + + def store(self, addr, data, endness): prop = Prop() prop.self_data[DataKind.Pointer] += 1 addr.prop(prop, self.graph) diff --git a/typetapper/hierarchy_graph.py b/typetapper/hierarchy_graph.py index 4cf508d..cbb93f8 100644 --- a/typetapper/hierarchy_graph.py +++ b/typetapper/hierarchy_graph.py @@ -92,8 +92,8 @@ class HierarchicalGraph(RelativeAtomGraph): else: parent.prop.subtract(prop) - def _add_node(self, relatom: RelativeAtom, path: OpSequence) -> bool: - res = super()._add_node(relatom, path) + def _add_node(self, relatom: RelativeAtom, path: OpSequence, has_gone_down: bool) -> bool: + res = super()._add_node(relatom, path, has_gone_down) if res: self._atom_parents[relatom] = self._current_group self._current_group.children.add(relatom) diff --git a/typetapper/hierarchy_graph_view.py b/typetapper/hierarchy_graph_view.py index 98a8b12..79b7220 100644 --- a/typetapper/hierarchy_graph_view.py +++ b/typetapper/hierarchy_graph_view.py @@ -133,6 +133,11 @@ class HierarchicalGraphWidget(QZoomableDraggableGraphicsView): self.nodes = {chart.nav: chart for chart in charts} + rect = self.scene().itemsBoundingRect() + self.resetTransform() + self.setSceneRect(rect) + self.centerOn(rect.center()) + def label(self, node: RelativeAtomOrGroup) -> str: if node not in self._labels: if isinstance(node, RelativeAtom): @@ -199,7 +204,7 @@ class PropChart(QGraphicsItem): data.sort() cols_allocated = defaultdict(int) row_allocated = 0 - offset_allocated = 0 + 
offset_allocated = None marks = [] ticks = [] for offset, kind, size, count in data: @@ -207,7 +212,7 @@ class PropChart(QGraphicsItem): for suboffset in range(offset, offset + size): cols_allocated[suboffset] = xpos + count - if offset >= offset_allocated: + if offset_allocated is None or offset >= offset_allocated: ypos = row_allocated row_allocated += size offset_allocated = offset + size diff --git a/typetapper/procedures.py b/typetapper/procedures.py new file mode 100644 index 0000000..d964100 --- /dev/null +++ b/typetapper/procedures.py @@ -0,0 +1,142 @@ +from typing import List, Optional + +import networkx + +from angr.calling_conventions import SimCC, SimFunctionArgument, SimRegArg, SimStackArg +from angr.sim_type import SimType, parse_signature, SimTypeFunction, SimTypeBottom + +from .data import BlockInfo, LiveData, CodeLoc, DataKind +from .engine import TypeTapperEngine + +class TypeTapperProcedure: + PROTOTYPE: str = NotImplemented + + def run(self, args: List[LiveData]) -> Optional[LiveData]: + raise NotImplementedError + + def __init__(self, engine: TypeTapperEngine, cc: SimCC): + self._engine = engine + self.cc = cc + self.arg_session = None + self._stackptr = None + + @property + def graph(self) -> networkx.DiGraph: + return self._engine.graph + + @property + def stackptr(self) -> LiveData: + if self._stackptr is None: + self._stackptr = self.get(self._engine.project.arch.register_names[self._engine.project.arch.sp_offset]) + return self._stackptr + + @stackptr.setter + def stackptr(self, value): + self._stackptr = value + self.put(self._engine.project.arch.register_names[self._engine.project.arch.sp_offset], value) + + def load(self, addr, size, endness=None): + if endness is None: + endness = self._engine.project.arch.memory_endness + self._engine.stmt_idx += 1 + return self._engine.load(addr, size, endness) + + def store(self, addr, data, endness=None): + if endness is None: + endness = self._engine.project.arch.memory_endness + 
self._engine.stmt_idx += 1 + self._engine.store(addr, data, endness) + + def get(self, reg): + self._engine.stmt_idx += 1 + return self._engine.get(reg) + + def put(self, reg, data): + self._engine.stmt_idx += 1 + self._engine.put(reg, data) + + def op(self, op, args: List[LiveData], append_size=True): + if append_size: + op = 'Iop_' + op + str(args[0].size * 8) + return self._engine.op(op, args) + + def const(self, val, size): + self._engine.stmt_idx += 1 + return self._engine.const(val, size) + + def _translate_reg_arg(self, loc: SimRegArg) -> str: + slot_offset = self._engine.project.arch.registers[loc.reg_name][0] + return self._engine.project.arch.register_size_names[(slot_offset + loc.reg_offset, loc.size)] + + def _translate_stack_offset(self, loc: SimStackArg) -> LiveData: + return self.op('Add', [self.stackptr, self.const(loc.stack_offset, self.stackptr.size)]) + + def _read_argloc(self, loc: SimFunctionArgument) -> LiveData: + if isinstance(loc, SimRegArg): + name = self._translate_reg_arg(loc) + return self.get(name) + elif isinstance(loc, SimStackArg): + return self.load(self._translate_stack_offset(loc), loc.size, self._engine.project.arch.memory_endness) + else: + return LiveData.new_null(loc.size) + + def _write_argloc(self, loc: SimFunctionArgument, value: LiveData): + if isinstance(loc, SimRegArg): + name = self._translate_reg_arg(loc) + self.put(name, value) + + def analyze(self, addr: int): + self._engine.force_addr(addr) + prototype = parse_signature(self.PROTOTYPE, arch=self._engine.project.arch) + arg_locs = self.cc.arg_locs(prototype) + args = [self._read_argloc(loc) for loc in arg_locs] + result = self.run(args) + if result is None and not isinstance(prototype.returnty, SimTypeBottom): + result = LiveData.new_null(prototype.returnty.size // 8) + if result is not None: + self._write_argloc(self.cc.return_val(prototype.returnty), result) + if self._engine.project.arch.call_sp_fix: + self.stackptr = self.op('Add', [self.stackptr, 
self.const(-self._engine.project.arch.call_sp_fix, self.stackptr.size)]) + + +class strrchr(TypeTapperProcedure): + PROTOTYPE: str = "char *strrchr(const char *s, int c);" + def run(self, args: List[LiveData]) -> Optional[LiveData]: + self.load(args[0], 1).prop_self(DataKind.Int, self.graph) + args[0].prop_union(0, 1, self.graph) + args[1].prop_self(DataKind.Int, self.graph) + return None + +class strchr(TypeTapperProcedure): + PROTOTYPE: str = "char *strchr(const char *s, int c);" + def run(self, args: List[LiveData]) -> Optional[LiveData]: + self.load(args[0], 1).prop_self(DataKind.Int, self.graph) + args[0].prop_union(0, 1, self.graph) + args[1].prop_self(DataKind.Int, self.graph) + return None + +class strlen(TypeTapperProcedure): + PROTOTYPE = "size_t strlen(const char *s);" + def run(self, args: List[LiveData]) -> Optional[LiveData]: + self.load(args[0], 1).prop_self(DataKind.Int, self.graph) + args[0].prop_union(0, 1, self.graph) + return None + +class strcmp(TypeTapperProcedure): + PROTOTYPE = "size_t strcmp(const char *s1, const char *s2);" + def run(self, args: List[LiveData]) -> Optional[LiveData]: + self.load(args[0], 1).prop_self(DataKind.Int, self.graph) + args[0].prop_union(0, 1, self.graph) + self.load(args[1], 1).prop_self(DataKind.Int, self.graph) + args[1].prop_union(0, 1, self.graph) + return None + +class strncmp(TypeTapperProcedure): + PROTOTYPE = "size_t strncmp(const char *s1, const char *s2, size_t n);" + def run(self, args: List[LiveData]) -> Optional[LiveData]: + self.load(args[0], 1).prop_self(DataKind.Int, self.graph) + args[0].prop_union(0, 1, self.graph) + self.load(args[1], 1).prop_self(DataKind.Int, self.graph) + args[1].prop_union(0, 1, self.graph) + args[2].prop_self(DataKind.Int, self.graph) + return None diff --git a/typetapper/relative_graph.py b/typetapper/relative_graph.py index 50f9ac1..73aed09 100644 --- a/typetapper/relative_graph.py +++ b/typetapper/relative_graph.py @@ -4,7 +4,7 @@ import logging import networkx -from 
.data import Atom, Prop, OpSequence, ControlFlowActionPop, ControlFlowActionPush, ControlFlowAction +from .data import Atom, Prop, OpSequence, ControlFlowActionPop, ControlFlowActionPush, ControlFlowAction, RefOp, DerefOp if TYPE_CHECKING: from .knowledge import TypeTapperManager @@ -21,9 +21,11 @@ class RelativeAtom: class RelativeAtomAttrs: path: OpSequence prop: Prop + has_gone_down: bool = False def merge(self, other: 'RelativeAtomAttrs'): - self.prop.update(other.prop) + self.prop.maximize(other.prop) + # TODO has_gone_down if self.path != other.path: # TODO unifications pass @@ -41,10 +43,10 @@ class RelativeAtomGraph: for atom in baseline: relative = RelativeAtom(atom=atom, callstack=(), rcallstack=()) - self._add_node(relative, OpSequence()) + self._add_node(relative, OpSequence(), False) self.frontier.add(relative) # TODO ??? - def _add_node(self, relatom: RelativeAtom, path: OpSequence) -> bool: + def _add_node(self, relatom: RelativeAtom, path: OpSequence, has_gone_down: bool) -> bool: """ If relatom is not present in the graph, add it. 
If it is present in the graph, merge the new information into its attrs @@ -52,6 +54,7 @@ class RelativeAtomGraph: newattrs = RelativeAtomAttrs( prop=self.kp.graph.nodes[relatom.atom].get('prop', Prop()).transform(path.invert()), path=path, + has_gone_down=has_gone_down, ) if relatom in self.__graph.nodes: return self.__graph.nodes[relatom]['attr'].merge(newattrs) @@ -106,17 +109,23 @@ class RelativeAtomGraph: edge_ops: OpSequence, is_pred: bool, ) -> Optional[RelativeAtom]: - callstack, rcallstack = self._update_callstack(relatom.callstack, relatom.rcallstack, edge_cf, is_pred) - if callstack is None: + goes_down = any(isinstance(op, DerefOp) for op in edge_ops.ops) + goes_up = any(isinstance(op, RefOp) for op in edge_ops.ops) + if is_pred: + goes_down, goes_up = goes_up, goes_down + if attrs.has_gone_down and goes_up: return None - if is_pred: - path = edge_ops.invert() + attrs.path - else: - path = attrs.path + edge_ops + weh = self._update_callstack(relatom.callstack, relatom.rcallstack, edge_cf, is_pred) + if weh is None: + return None + callstack, rcallstack = weh + + path = attrs.path + path += edge_ops.invert() if is_pred else edge_ops relsucc = RelativeAtom(atom=succ, callstack=callstack, rcallstack=rcallstack) - res = self._add_node(relsucc, path) + res = self._add_node(relsucc, path, attrs.has_gone_down or goes_down) if is_pred: if not self.__graph.has_edge(relsucc, relatom): self._add_edge(relsucc, relatom)