diff --git a/typetapper/analysis.py b/typetapper/analysis.py index 72f398e..ec2daff 100644 --- a/typetapper/analysis.py +++ b/typetapper/analysis.py @@ -12,20 +12,20 @@ from .knowledge import TypeTapperManager l = logging.getLogger(__name__) class TypeTapper(angr.Analysis): - def __init__(self, cfg: CFGBase): + def __init__(self, cfg: CFGBase, tmp_atoms: bool=False): self._cfg = cfg self.manager = self.kb.request_knowledge(TypeTapperManager) self.manager.cfg = cfg.model - self._engine = TypeTapperEngine(self.project, self.manager) + self._engine = TypeTapperEngine(self.project, self.manager, tmp_atoms=tmp_atoms) if not self._cfg.normalized: raise ValueError("CFG must be normalized") - l.debug('Starting active flow analysis') + l.info('Starting active flow analysis') self._analyze_active_flow() - l.debug('Starting passive flow analysis') + l.info('Starting passive flow analysis') self._analyze_passive_flow() - l.debug('Done') + l.info('Done') def _analyze_active_flow(self): node: CFGNode @@ -55,17 +55,22 @@ class TypeTapper(angr.Analysis): pred_blockinfo = self.manager.block_info[pred_addr] # TAKE IT BACK NOW Y'ALL - for input_atom, input_info in node_blockinfo.inputs.items(): + for name in node_blockinfo.ready_inputs: + input_info = node_blockinfo.inputs[name] input_info_new = input_info.step(pred_addr, block_addr, attrs['jumpkind'], fakeret_addr) if input_info_new is None: continue - output_atom = pred_blockinfo.outputs.get(input_atom.slot_name, None) + output_atom = pred_blockinfo.outputs.get(input_info.atom.slot_name, None) if output_atom is not None: - if output_atom.name == input_atom.name: - input_info_new.commit(self.manager.graph, output_atom, input_atom) + if output_atom.name == name: + input_info_new.commit(self.manager.graph, output_atom) else: pass # alias mismatch - elif input_atom not in pred_blockinfo.inputs: # sketchy... this means that we can't account for multiple paths to the same atom - pred_blockinfo.inputs[input_atom] = input_info_new + elif name not in pred_blockinfo.inputs: # sketchy... this means that we can't account for multiple paths to the same atom + pred_blockinfo.inputs[name] = input_info_new + pred_blockinfo.ready_inputs.add(name) queue[pred_addr] = None queue.move_to_end(pred_addr, last=False) + + # this is safe because if there is a loop edge, the atom will already be in the inputs so it won't be re-upped + node_blockinfo.ready_inputs.clear() diff --git a/typetapper/data.py b/typetapper/data.py index ec7e345..e6ae9db 100644 --- a/typetapper/data.py +++ b/typetapper/data.py @@ -232,6 +232,7 @@ class LiveData: @dataclass(frozen=True) class RegisterInputInfo: + atom: RegisterAtom callsites: Tuple[int, ...] # when we go back through a ret, we push the callsite onto this stack. we may then only go back through calls if # they match the top of the stack, at which point they are popped off @@ -243,21 +244,21 @@ class RegisterInputInfo: if jumpkind == 'Ijk_Ret': if callsite is None: raise TypeError("Must specify callsite if jumpkind is Ret") - return RegisterInputInfo(callsites=self.callsites + (callsite,), reverse_callsites=self.reverse_callsites) + return RegisterInputInfo(atom=self.atom, callsites=self.callsites + (callsite,), reverse_callsites=self.reverse_callsites) elif jumpkind == 'Ijk_Call': if not self.callsites: - return RegisterInputInfo(callsites=(), reverse_callsites=self.reverse_callsites + (pred,)) + return RegisterInputInfo(atom=self.atom, callsites=(), reverse_callsites=self.reverse_callsites + (pred,)) elif self.callsites[-1] == pred: - return RegisterInputInfo(callsites=self.callsites[:-1], reverse_callsites=self.reverse_callsites) + return RegisterInputInfo(atom=self.atom, callsites=self.callsites[:-1], reverse_callsites=self.reverse_callsites) else: return None else: - return RegisterInputInfo(callsites=self.callsites, reverse_callsites=self.reverse_callsites) + return RegisterInputInfo(atom=self.atom, callsites=self.callsites, reverse_callsites=self.reverse_callsites) - def commit(self, graph: networkx.DiGraph, source: RegisterAtom, dest: RegisterAtom): + def commit(self, graph: networkx.DiGraph, source: RegisterAtom): actions: List[ControlFlowAction] = [ControlFlowActionPop(i) for i in self.callsites] actions += [ControlFlowActionPush(i) for i in self.reverse_callsites] - graph.add_edge(source, dest, ops=OpSequence(), cf=actions) + graph.add_edge(source, self.atom, ops=OpSequence(), cf=actions) @dataclass(frozen=True) @@ -276,5 +277,6 @@ class ControlFlowActionPop(ControlFlowAction): @dataclass class BlockInfo: outputs: Dict[str, RegisterAtom] = field(default_factory=dict) # slot names - inputs: Dict[RegisterAtom, RegisterInputInfo] = field(default_factory=dict) + inputs: Dict[str, RegisterInputInfo] = field(default_factory=dict) # alias names atoms: List[Atom] = field(default_factory=list) + ready_inputs: Set[str] = field(default_factory=set) # alias names diff --git a/typetapper/engine.py b/typetapper/engine.py index cb8e4ac..c6ababa 100644 --- a/typetapper/engine.py +++ b/typetapper/engine.py @@ -1,3 +1,4 @@ +from typing import Union import logging import angr @@ -12,12 +13,13 @@ def get_type_size_bytes(ty): return pyvex.get_type_size(ty) // 8 class TypeTapperEngine(angr.engines.vex.VEXMixin): - def __init__(self, project: angr.Project, kp: TypeTapperManager, **kwargs): + def __init__(self, project: angr.Project, kp: TypeTapperManager, tmp_atoms: bool=False, **kwargs): super().__init__(project, **kwargs) self.kp = kp self.last_imark: int = 0 + self.tmp_atoms = tmp_atoms - tmps: List[TmpAtom] + tmps: List[Union[LiveData, TmpAtom]] @property def codeloc(self): @@ -37,7 +39,10 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin): return atom def _perform_vex_expr_RdTmp(self, tmp): - return LiveData.new_atom(self.tmps[tmp]) + if self.tmp_atoms: + return LiveData.new_atom(self.tmps[tmp]) + else: + return self.tmps[tmp] def _perform_vex_expr_Get(self, offset: LiveData, ty, **kwargs): size = get_type_size_bytes(ty) @@ -58,8 +63,11 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin): self.graph.add_edge(source, reg_atom, ops=OpSequence()) else: pass # alias mismatch + elif name in self.blockinfo.inputs: + return LiveData.new_atom(self.blockinfo.inputs[name].atom) else: - self.blockinfo.inputs[reg_atom] = RegisterInputInfo(callsites=(), reverse_callsites=()) + self.blockinfo.inputs[name] = RegisterInputInfo(atom=reg_atom, callsites=(), reverse_callsites=()) + self.blockinfo.ready_inputs.add(name) return LiveData.new_atom(reg_atom) @@ -94,9 +102,9 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin): arg.prop(prop, self.graph) size = get_type_size_bytes(ret_ty) - if op in ('Add8', 'Add16', 'Add32', 'Add64'): + if op in ('Iop_Add8', 'Iop_Add16', 'Iop_Add32', 'Iop_Add64'): sign = 1 - elif op in ('Sub8', 'Sub16', 'Sub32', 'Sub64'): + elif op in ('Iop_Sub8', 'Iop_Sub16', 'Iop_Sub32', 'Iop_Sub64'): sign = -1 else: sign = None @@ -144,10 +152,13 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin): self.blockinfo.outputs[slot_name] = reg_atom def _perform_vex_stmt_WrTmp(self, tmp, data: LiveData): - tmp_atom = TmpAtom(self.codeloc, get_type_size_bytes(self.irsb.tyenv.lookup(tmp)), tmp) - self.blockinfo.atoms.append(tmp_atom) - self.tmps[tmp] = tmp_atom - data.commit(tmp_atom, self.graph) + if self.tmp_atoms: + tmp_atom = TmpAtom(self.codeloc, get_type_size_bytes(self.irsb.tyenv.lookup(tmp)), tmp) + self.blockinfo.atoms.append(tmp_atom) + self.tmps[tmp] = tmp_atom + data.commit(tmp_atom, self.graph) + else: + self.tmps[tmp] = data def _perform_vex_stmt_Store(self, addr: LiveData, data: LiveData, endness, **kwargs): prop = Prop() diff --git a/typetapper/knowledge.py b/typetapper/knowledge.py index 07c74e6..b4a2fb9 100644 --- a/typetapper/knowledge.py +++ b/typetapper/knowledge.py @@ -40,12 +40,12 @@ class TypeTapperManager(angr.knowledge_plugins.plugin.KnowledgeBasePlugin): continue if atom.slot_name == register and atom.loc.ins_addr == addr: return atom - for atom in blockinfo.inputs.keys(): - if atom.name == register: - return atom - for atom in blockinfo.inputs.keys(): - if atom.slot_name == register: - return atom + for info in blockinfo.inputs.values(): + if info.atom.name == register: + return info.atom + for info in blockinfo.inputs.values(): + if info.atom.slot_name == register: + return info.atom raise LookupError("Cannot find register %s in instruction %#x" % (register, addr)) def lookup_mem(self, addr: int) -> MemoryAtom: diff --git a/typetapper/relative_graph.py b/typetapper/relative_graph.py index fcfcc7f..3e953ff 100644 --- a/typetapper/relative_graph.py +++ b/typetapper/relative_graph.py @@ -115,8 +115,8 @@ class RelativeAtomGraph: if not self.__graph.has_edge(relsucc, relatom): self._add_edge(relsucc, relatom) else: - if not self.__graph.has_edge(relsucc, relatom): - self._add_edge(relsucc, relatom) + if not self.__graph.has_edge(relatom, relsucc): + self._add_edge(relatom, relsucc) return relsucc if res else None @staticmethod