Hierarchy is implemented. I can't tell if it's correct until I start the vis

This commit is contained in:
Audrey 2022-10-22 21:07:01 -07:00
parent a7ea0c07c2
commit 2f6e2035f5
6 changed files with 605 additions and 24 deletions

View File

@ -1,3 +1,4 @@
import logging
from collections import OrderedDict
import angr
@ -8,17 +9,23 @@ from angr.knowledge_plugins.cfg import CFGNode
from .engine import TypeTapperEngine
from .knowledge import TypeTapperManager
l = logging.getLogger(__name__)
class TypeTapper(angr.Analysis):
def __init__(self, cfg: CFGBase):
self._cfg = cfg
self._manager = self.kb.request_knowledge(TypeTapperManager)
self._engine = TypeTapperEngine(self.project, self._manager)
self.manager = self.kb.request_knowledge(TypeTapperManager)
self.manager.cfg = cfg.model
self._engine = TypeTapperEngine(self.project, self.manager)
if not self._cfg.normalized:
raise ValueError("CFG must be normalized")
l.debug('Starting active flow analysis')
self._analyze_active_flow()
l.debug('Starting passive flow analysis')
self._analyze_passive_flow()
l.debug('Done')
def _analyze_active_flow(self):
node: CFGNode
@ -30,13 +37,13 @@ class TypeTapper(angr.Analysis):
def _analyze_passive_flow(self):
queue = OrderedDict()
for block_addr in self._manager.block_info.keys():
for block_addr in self.manager.block_info.keys():
queue[block_addr] = None
while queue:
block_addr = next(iter(reversed(queue.keys())))
queue.pop(block_addr)
node_blockinfo = self._manager.block_info[block_addr]
node_blockinfo = self.manager.block_info[block_addr]
node = self._cfg.model.get_any_node(block_addr)
fakeret_addr = next((pred.addr for pred, attrs in self._cfg.graph.pred[node].items() if attrs['jumpkind'] == 'Ijk_FakeRet'), None)
for pred, attrs in self._cfg.graph.pred[node].items():
@ -45,7 +52,7 @@ class TypeTapper(angr.Analysis):
if pred.block is None:
continue
pred_addr = pred.addr
pred_blockinfo = self._manager.block_info[block_addr]
pred_blockinfo = self.manager.block_info[pred_addr]
# TAKE IT BACK NOW Y'ALL
for input_atom, input_info in node_blockinfo.inputs.items():
@ -55,7 +62,7 @@ class TypeTapper(angr.Analysis):
output_atom = pred_blockinfo.outputs.get(input_atom.slot_name, None)
if output_atom is not None:
if output_atom.name == input_atom.name:
input_info_new.commit(self._manager.graph, output_atom, input_atom)
input_info_new.commit(self.manager.graph, output_atom, input_atom)
else:
pass # alias mismatch
elif input_atom not in pred_blockinfo.inputs: # sketchy... this means that we can't account for multiple paths to the same atom

View File

@ -1,5 +1,5 @@
from typing import Tuple, Any, List, Set, Optional, Dict
from collections import defaultdict
from collections import defaultdict, Counter
from enum import Enum, auto
from dataclasses import dataclass, field
import copy
@ -11,6 +11,7 @@ import networkx
class CodeLoc:
bbl_addr: int
stmt_idx: int
ins_addr: int
@dataclass(frozen=True)
class Atom:
@ -92,7 +93,7 @@ class OpSequence:
@staticmethod
def concat(*sequences: 'OpSequence') -> 'OpSequence':
seq = []
seq: List[Op] = []
for s in sequences:
seq.extend(s.ops)
simplify_op_sequence(seq)
@ -124,6 +125,8 @@ def simplify_op_sequence(seq: List[Op]):
i += 1
# noinspection PyArgumentList
class DataKind(Enum):
Int = auto()
Float = auto()
@ -131,19 +134,24 @@ class DataKind(Enum):
@dataclass
class Prop:
self_data: defaultdict[DataKind, int] = field(default_factory=lambda: defaultdict(int))
struct_data: defaultdict[int, defaultdict[int, defaultdict[DataKind, int]]] = field(default_factory=lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(int))))
unifications: Set[Tuple[int, int]] = field(default_factory=set)
self_data: Counter[DataKind] = field(default_factory=Counter)
struct_data: defaultdict[int, defaultdict[int, Counter[DataKind]]] = field(default_factory=lambda: defaultdict(lambda: defaultdict(Counter)))
unifications: Counter[Tuple[int, int]] = field(default_factory=Counter)
def update(self, other: 'Prop'):
for kind, v in other.self_data.items():
self.self_data[kind] += v
self.self_data.update(other.self_data)
for offset, v1 in other.struct_data.items():
for size, v2 in v1.items():
for kind, v3 in v2.items():
self.struct_data[offset][size][kind] += v3
self.struct_data[offset][size].update(v2)
self.unifications.update(other.unifications)
def subtract(self, other: 'Prop'):
    """
    Remove the counts recorded in ``other`` from this Prop, in place.

    This is the inverse of ``update``: every counter held here has the
    matching counter from ``other`` subtracted from it.
    """
    self.unifications.subtract(other.unifications)
    self.self_data.subtract(other.self_data)
    for offset, by_size in other.struct_data.items():
        for size, kinds in by_size.items():
            # Index inside the inner loop so that an empty ``by_size`` does
            # not make the defaultdict create an entry for ``offset``.
            self.struct_data[offset][size].subtract(kinds)
def __or__(self, other: 'Prop'):
result = Prop()
result.update(self)
@ -156,7 +164,7 @@ class Prop:
if isinstance(op, RefOp):
result.struct_data.clear()
result.struct_data[0][op.size] = result.self_data
result.self_data = defaultdict(int)
result.self_data = Counter()
self.unifications.clear()
elif isinstance(op, DerefOp):
result.self_data = result.struct_data[0][op.size]
@ -167,8 +175,16 @@ class Prop:
result.struct_data.clear()
for k, v in items:
result.struct_data[k + op.const] = v
result.self_data.clear() # TODO ???
result.unifications = {(x + op.const, y + op.const) for x, y in result.unifications}
saved = result.self_data.get(DataKind.Pointer, None)
result.self_data.clear()
if saved:
result.self_data[DataKind.Pointer] = saved
result.unifications = Counter((x + op.const, y + op.const) for x, y in result.unifications)
elif isinstance(op, VarOffsetOp):
saved = result.self_data.get(DataKind.Pointer, None)
result = Prop()
if saved:
result.self_data[DataKind.Pointer] = saved
else:
result = Prop()
return result
@ -199,7 +215,20 @@ class LiveData:
def commit(self, target: Atom, graph: networkx.DiGraph):
for src, seq in self.sources:
graph.add_edge(src, target, ops=seq)
graph.add_edge(src, target, ops=seq, cf=[])
def prop(self, prop: Prop, graph: networkx.DiGraph):
    """
    Record ``prop`` on every source atom of this live value.

    For each ``(atom, ops)`` source, ``prop`` is transformed through the
    inverse of ``ops`` and then stored in the ``'prop'`` attribute of the
    atom's node in ``graph``. It is merged into an existing Prop if there is
    one, and the node is created if it is missing.
    """
    for atom, ops in self.sources:
        transformed = prop.transform(ops.invert())
        try:
            node_attrs = graph.nodes[atom]
        except KeyError:
            # The atom has no node yet: create it with the property attached.
            graph.add_node(atom, prop=transformed)
            continue
        existing: Prop = node_attrs.get('prop')
        if existing:
            existing.update(transformed)
        else:
            node_attrs['prop'] = transformed
@dataclass(frozen=True)
class RegisterInputInfo:
@ -212,6 +241,8 @@ class RegisterInputInfo:
def step(self, pred: int, succ: int, jumpkind: str, callsite: Optional[int]) -> 'Optional[RegisterInputInfo]':
if jumpkind == 'Ijk_Ret':
if callsite is None:
raise TypeError("Must specify callsite if jumpkind is Ret")
return RegisterInputInfo(callsites=self.callsites + (callsite,), reverse_callsites=self.reverse_callsites)
elif jumpkind == 'Ijk_Call':
if not self.callsites:
@ -244,5 +275,6 @@ class ControlFlowActionPop(ControlFlowAction):
@dataclass
class BlockInfo:
outputs: Dict[str, RegisterAtom] = field(default_factory=lambda: {}) # slot names
outputs: Dict[str, RegisterAtom] = field(default_factory=dict) # slot names
inputs: Dict[RegisterAtom, RegisterInputInfo] = field(default_factory=dict)
atoms: List[Atom] = field(default_factory=list)

View File

@ -15,12 +15,13 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
def __init__(self, project: angr.Project, kp: TypeTapperManager, **kwargs):
super().__init__(project, **kwargs)
self.kp = kp
self.last_imark: int = 0
tmps: List[TmpAtom]
@property
def codeloc(self):
return CodeLoc(bbl_addr=self.irsb.addr, stmt_idx=self.stmt_idx)
return CodeLoc(bbl_addr=self.irsb.addr, stmt_idx=self.stmt_idx, ins_addr=self.last_imark)
@property
def graph(self):
@ -31,7 +32,9 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
return self.kp.block_info[self.irsb.addr]
def _handle_vex_const(self, const):
return LiveData.new_const(const.value, get_type_size_bytes(const.type), self.codeloc)
atom = LiveData.new_const(const.value, get_type_size_bytes(const.type), self.codeloc)
self.blockinfo.atoms.append(atom.sources[0][0])
return atom
def _perform_vex_expr_RdTmp(self, tmp):
return LiveData.new_atom(self.tmps[tmp])
@ -47,6 +50,7 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
return LiveData.new_null(size)
slot_name = self.project.arch.register_size_names[slot_info]
reg_atom = RegisterAtom(self.codeloc, size, name, slot_name)
self.blockinfo.atoms.append(reg_atom)
source = self.blockinfo.outputs.get(slot_name, None)
if source is not None:
@ -60,8 +64,13 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
return LiveData.new_atom(reg_atom)
def _perform_vex_expr_Load(self, addr: LiveData, ty, endness, **kwargs):
prop = Prop()
prop.self_data[DataKind.Pointer] += 1
addr.prop(prop, self.graph)
size = get_type_size_bytes(ty)
mem_atom = MemoryAtom(self.codeloc, size, endness)
self.blockinfo.atoms.append(mem_atom)
addr.appended(DerefOp(size), size).commit(mem_atom, self.graph)
return LiveData.new_atom(mem_atom)
@ -73,7 +82,18 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
return ifTrue.unioned(ifFalse, ifTrue.size)
def _perform_vex_expr_Op(self, op, args: List[LiveData]):
size = get_type_size_bytes(pyvex.get_op_retty(op))
ret_ty, arg_tys = pyvex.expr.op_arg_types(op)
for arg, ty in zip(args, arg_tys):
if ty.startswith('Ity_F'):
prop = Prop()
prop.self_data[DataKind.Float] += 1
arg.prop(prop, self.graph)
elif ty.startswith('Ity_I'):
prop = Prop()
prop.self_data[DataKind.Int] += 1
arg.prop(prop, self.graph)
size = get_type_size_bytes(ret_ty)
if op in ('Add8', 'Add16', 'Add32', 'Add64'):
sign = 1
elif op in ('Sub8', 'Sub16', 'Sub32', 'Sub64'):
@ -106,6 +126,9 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
return LiveData.new_null(get_type_size_bytes(expr.result_type(self.irsb.tyenv)))
def _handle_vex_stmt_IMark(self, stmt: pyvex.stmt.IMark):
    """
    Remember the address of the instruction that this IMark introduces.

    The stored value is ``stmt.addr + stmt.delta``. It is what ``codeloc``
    reports as ``ins_addr`` for the statements that follow.
    """
    instruction_addr = stmt.addr + stmt.delta
    self.last_imark = instruction_addr
def _perform_vex_stmt_Put(self, offset: LiveData, data: LiveData, **kwargs):
if type(offset.const) is not int:
return LiveData.new_null(data.size)
@ -116,16 +139,23 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
return LiveData.new_null(data.size)
slot_name = self.project.arch.register_size_names[slot_info]
reg_atom = RegisterAtom(self.codeloc, data.size, name, slot_name)
self.blockinfo.atoms.append(reg_atom)
data.commit(reg_atom, self.graph)
self.blockinfo.outputs[slot_name] = reg_atom
def _perform_vex_stmt_WrTmp(self, tmp, data: LiveData):
tmp_atom = TmpAtom(self.codeloc, get_type_size_bytes(self.irsb.tyenv.lookup(tmp)), tmp)
self.blockinfo.atoms.append(tmp_atom)
self.tmps[tmp] = tmp_atom
data.commit(tmp_atom, self.graph)
def _perform_vex_stmt_Store(self, addr: LiveData, data: LiveData, endness, **kwargs):
prop = Prop()
prop.self_data[DataKind.Pointer] += 1
addr.prop(prop, self.graph)
mem_atom = MemoryAtom(self.codeloc, data.size, endness)
self.blockinfo.atoms.append(mem_atom)
addr.appended(DerefOp(data.size), data.size).commit(mem_atom, self.graph)
data.commit(mem_atom, self.graph)

View File

@ -0,0 +1,312 @@
from typing import Set, Union, TYPE_CHECKING, List, Optional, Dict, Iterable, Tuple
from itertools import pairwise
import networkx
from .relative_graph import RelativeAtomGraph, RelativeAtom
from .data import Atom, OpSequence, Prop
if TYPE_CHECKING:
from .knowledge import TypeTapperManager
class RelativeAtomGroup:
    """
    A container node in a HierarchicalGraph.

    A group holds atoms and/or nested groups in ``children`` and keeps an
    aggregate ``prop`` that the owning graph maintains as members come and go.
    """

    def __init__(self, graph: 'HierarchicalGraph', parent: Optional['RelativeAtomGroup']):
        # ``parent`` is None for a group that has no enclosing group (the root).
        self.parent = parent
        self.graph = graph
        self.prop: Prop = Prop()
        self.children: Set['RelativeAtomOrGroup'] = set()
# Any node that can appear in the hierarchy graph: a single atom or a group.
RelativeAtomOrGroup = Union[RelativeAtom, RelativeAtomGroup]
# A fully-qualified multigraph edge: (source node, target node, edge key).
MultiGraphEdge = Tuple[RelativeAtomOrGroup, RelativeAtomOrGroup, int]
# THIS IS NOT THREAD SAFE. YOU HAVE BEEN WARNED
class HierarchicalGraph(RelativeAtomGraph):
def __init__(self, kp: 'TypeTapperManager', baseline: List[Atom]):
    """
    Create a hierarchy whose baseline atoms all start out in the root group.

    :param kp:       The TypeTapper knowledge plugin to draw atoms from.
    :param baseline: The atoms to seed the graph with.
    """
    root = RelativeAtomGroup(self, None)
    self._root_group = root
    self._current_group = root
    self._atom_parents: Dict[RelativeAtom, RelativeAtomGroup] = {}
    # Name-mangled per class, so this is a separate graph from the base class's own.
    self.__graph = networkx.MultiDiGraph()
    # Must run last: the base initializer calls _add_node for every baseline
    # atom, and the override here relies on the attributes set above.
    super().__init__(kp, baseline)
@property
def root_group(self) -> RelativeAtomGroup:
    """The top-level group, created together with this graph."""
    root = self._root_group
    return root
def local_graph(self, group: RelativeAtomGroup) -> networkx.MultiDiGraph:
    """
    Return the subgraph view induced by ``group`` and its direct children.
    """
    members = [group, *group.children]
    return self.__graph.subgraph(members)
def prop(self, node: RelativeAtomOrGroup) -> Prop:
    """
    Return the Prop attached to ``node``.

    Atoms keep theirs in the node attrs, while groups carry it directly.
    """
    if not isinstance(node, RelativeAtomGroup):
        return self.attrs(node).prop
    return node.prop
def expand(self, relatom: RelativeAtom, group: Optional[RelativeAtomGroup]=None) -> Set[RelativeAtom]:
    """
    Expand ``relatom`` and place any newly created atoms into ``group``.

    If ``group`` is given it becomes the current group, and it stays
    current for later calls too. Otherwise the current group is kept.

    :return: The result of the base class's ``expand``.
    """
    if group is None:
        return super().expand(relatom)
    self._current_group = group
    return super().expand(relatom)
def _prop_propagate(self, node: RelativeAtomOrGroup, add: bool):
    """
    Fold ``node``'s Prop into every ancestor group's Prop, or take it out again.

    :param node: The atom or group whose Prop is being accounted for.
    :param add:  True to add the Prop to each ancestor, False to subtract it.
    """
    delta = self.prop(node)
    for ancestor in self._ancestry(node):
        apply = ancestor.prop.update if add else ancestor.prop.subtract
        apply(delta)
def _add_node(self, relatom: RelativeAtom, path: OpSequence) -> bool:
    """
    Add ``relatom`` via the base class and register it in the hierarchy.

    When the base class reports a truthy result, the atom becomes a child
    of the current group, gets a node in the hierarchy graph, and has its
    Prop added to all ancestor groups.

    :return: Whatever the base class's ``_add_node`` returned.
    """
    added = super()._add_node(relatom, path)
    if not added:
        return added
    group = self._current_group
    self._atom_parents[relatom] = group
    group.children.add(relatom)
    self.__graph.add_node(relatom)
    self._prop_propagate(relatom, True)
    return added
def _add_edge(self, relatom1: RelativeAtom, relatom2: RelativeAtom):
    """
    Add the atom-level edge, then mirror it as a chain of hierarchy edges.

    One multigraph edge is created for each consecutive pair on the
    hierarchy path from ``relatom1`` to ``relatom2``. The edges are linked
    through their ``'prev'`` and ``'next'`` attributes, with None at both
    ends of the chain.
    """
    super()._add_edge(relatom1, relatom2)

    edges = self.__graph.edges
    prev_edge = None
    for u, v in pairwise(self._hierarchy_path(relatom1, relatom2)):
        edge = (u, v, self.__graph.add_edge(u, v))
        edges[edge]['prev'] = prev_edge
        if prev_edge is not None:
            edges[prev_edge]['next'] = edge
        prev_edge = edge
    # Terminate the chain on the last edge created.
    edges[prev_edge]['next'] = None
def _remove_node(self, relatom: RelativeAtom):
    """
    Remove ``relatom`` from both the atom graph and the hierarchy.

    The atom's Prop is subtracted from its ancestor groups *before* the
    base class removes the node. Reading the Prop goes through ``attrs``,
    which looks the node up in the base class's graph and would raise
    KeyError once the node is gone.
    """
    self._prop_propagate(relatom, False)
    super()._remove_node(relatom)
    # The base class removes every incident edge through _remove_edge, which
    # this class overrides, so no hierarchy edges should remain.
    assert len(list(self.__graph.succ[relatom])) == 0
    assert len(list(self.__graph.pred[relatom])) == 0
    self._atom_parents[relatom].children.remove(relatom)
    del self._atom_parents[relatom]
    self.__graph.remove_node(relatom)
def _remove_edge(self, relatom1: RelativeAtom, relatom2: RelativeAtom):
    """
    Remove the atom-level edge and the hierarchy edges along its path.
    """
    super()._remove_edge(relatom1, relatom2)
    # NOTE(review): the pairs carry no edge key, so when parallel edges exist
    # it is presumably unspecified which one goes, and the 'prev'/'next'
    # links of other chains are not updated. Confirm this is acceptable.
    hops = pairwise(self._hierarchy_path(relatom1, relatom2))
    self.__graph.remove_edges_from(hops)
def _add_group(self, parent: RelativeAtomGroup) -> RelativeAtomGroup:
    """
    Create a new, empty group as a child of ``parent`` and return it.
    """
    child = RelativeAtomGroup(self, parent)
    parent.children.add(child)
    return child
def _paths_through(self, item: RelativeAtomOrGroup) -> Iterable[Tuple[Optional[MultiGraphEdge], Optional[MultiGraphEdge]]]:
    """
    Yield ``(incoming_edge, outgoing_edge)`` pairs for edges touching ``item``.

    Edges are ``(source, target, key)`` triples. The result is lazy, so
    callers that modify the graph must materialize it first.
    """
    # if item is a group, all edges will have two sides
    # if item is an atom, all edges will have one side
    #
    # succ[item] and pred[item] map each neighbour to its dict of parallel
    # edge keys. The keys must come from that dict, not from the neighbour.
    if isinstance(item, RelativeAtom):
        for succ, keys in self.__graph.succ[item].items():
            for key in keys:
                yield None, (item, succ, key)
        for pred, keys in self.__graph.pred[item].items():
            for key in keys:
                yield (pred, item, key), None
    else:
        for succ, keys in self.__graph.succ[item].items():
            for key in keys:
                edge = (item, succ, key)
                yield self.__graph.edges[edge]['prev'], edge
def move_node_out(self, item: RelativeAtomOrGroup):
    """
    Move ``item`` one level up, from its current group into that group's parent.

    Every edge chain passing through ``item`` is rewired so that the
    ``'prev'``/``'next'`` links still describe a path through the new
    hierarchy. Ancestor Props are then updated: ``item``'s Prop is
    subtracted along the old ancestry and added along the new one.

    :raises ValueError: If ``item`` is the root group, or if its group is
                        the root group.
    """
    # item will now be a neighbor of its parent
    if item is self._root_group:
        raise ValueError("Cannot reparent root group")
    parent = self._parent(item)
    if parent is self._root_group:
        raise ValueError("There is no parent of the root group")
    new_parent = parent.parent
    assert new_parent is not None

    # Snapshot the paths up front, because the loop adds and removes edges.
    paths = list(self._paths_through(item))
    # Indexed with a bool: prev_next[False] is 'prev', prev_next[True] is 'next'.
    prev_next = ['prev', 'next']
    for prev_edge, next_edge in paths:
        pred = prev_edge[0] if prev_edge else None
        succ = next_edge[1] if next_edge else None
        if pred is None or self._parent(pred) is item:
            # The path leaves item through next_edge.
            assert succ is not None
            inner_edge = prev_edge
            outer = succ
            outer_edge = next_edge
            outward = True
        else:
            # The path enters item through prev_edge.
            assert succ is None or self._parent(succ) is item
            assert pred is not None
            inner_edge = next_edge
            outer = pred
            outer_edge = prev_edge
            outward = False
        further_edge = self.__graph.edges[outer_edge][prev_next[outward]]

        # we only need to break the outer edge
        self.__graph.remove_edge(*outer_edge)

        # is this a collapse or an expand operation?
        is_neighbor = self._parent(outer) is parent
        if is_neighbor:
            # expand: item <-> outer becomes item <-> parent <-> outer
            parent_item_edge: MultiGraphEdge
            parent_neighbor_edge: MultiGraphEdge
            if outward:
                parent_item_edge = (item, parent, self.__graph.add_edge(item, parent))
                parent_neighbor_edge = (parent, outer, self.__graph.add_edge(parent, outer))
            else:
                parent_item_edge = (parent, item, self.__graph.add_edge(parent, item))
                parent_neighbor_edge = (outer, parent, self.__graph.add_edge(outer, parent))
            if inner_edge:
                self.__graph.edges[inner_edge][prev_next[outward]] = parent_item_edge
            self.__graph.edges[parent_item_edge][prev_next[not outward]] = inner_edge
            self.__graph.edges[parent_item_edge][prev_next[outward]] = parent_neighbor_edge
            self.__graph.edges[parent_neighbor_edge][prev_next[not outward]] = parent_item_edge
            self.__graph.edges[parent_neighbor_edge][prev_next[outward]] = further_edge
            if further_edge:
                # The back link must point at the new adjacent edge, not at further_edge itself.
                self.__graph.edges[further_edge][prev_next[not outward]] = parent_neighbor_edge
        else:
            # contract: item <-> parent <-> further becomes item <-> further
            assert outer is parent
            assert further_edge
            further = further_edge[outward]
            even_further_edge = self.__graph.edges[further_edge][prev_next[outward]]
            self.__graph.remove_edge(*further_edge)
            if outward:
                new_edge = (item, further, self.__graph.add_edge(item, further))
            else:
                new_edge = (further, item, self.__graph.add_edge(further, item))
            if inner_edge:
                self.__graph.edges[inner_edge][prev_next[outward]] = new_edge
            self.__graph.edges[new_edge][prev_next[not outward]] = inner_edge
            self.__graph.edges[new_edge][prev_next[outward]] = even_further_edge
            if even_further_edge:
                self.__graph.edges[even_further_edge][prev_next[not outward]] = new_edge

    # Reparent, keeping the ancestors' aggregate Props consistent on both sides.
    self._prop_propagate(item, False)
    new_parent.children.add(item)
    parent.children.remove(item)
    if isinstance(item, RelativeAtomGroup):
        item.parent = new_parent
    else:
        self._atom_parents[item] = new_parent
    self._prop_propagate(item, True)
def move_node_in(self, item: RelativeAtomOrGroup, new_parent: RelativeAtomGroup):
    """
    Move ``item`` one level down, into its sibling group ``new_parent``.

    Every edge chain passing through ``item`` is rewired so that the
    ``'prev'``/``'next'`` links still describe a path through the new
    hierarchy. Ancestor Props are then updated: ``item``'s Prop is
    subtracted along the old ancestry and added along the new one.

    :raises ValueError: If ``item`` is the root group, is None, is
                        ``new_parent`` itself, or does not share a parent
                        with ``new_parent``.
    """
    if item is self._root_group:
        raise ValueError("Cannot reparent root group")
    # NOTE(review): the message suggests this was meant to guard new_parent
    # rather than item. Confirm the intent.
    if item is None:
        raise ValueError("There can only be one root group")
    parent = self._parent(item)
    if item is new_parent:
        raise ValueError("Bro are you for serious")
    if self._parent(new_parent) is not parent:
        raise ValueError("Can only move something into a neighbor")

    # Snapshot the paths up front, because the loop adds and removes edges.
    paths = list(self._paths_through(item))
    # Indexed with a bool: prev_next[False] is 'prev', prev_next[True] is 'next'.
    prev_next = ['prev', 'next']
    for prev_edge, next_edge in paths:
        pred = prev_edge[0] if prev_edge else None
        succ = next_edge[1] if next_edge else None
        if pred is None or self._parent(pred) is item:
            # The path leaves item through next_edge.
            assert succ is not None
            inner_edge = prev_edge
            outer = succ
            outer_edge = next_edge
            outward = True
        else:
            # The path enters item through prev_edge.
            assert succ is None or self._parent(succ) is item
            assert pred is not None
            inner_edge = next_edge
            outer = pred
            outer_edge = prev_edge
            outward = False
        further_edge = self.__graph.edges[outer_edge][prev_next[outward]]

        # we only need to break the outer edge
        self.__graph.remove_edge(*outer_edge)

        # is this a collapse or an expand operation?
        going_out = outer is not new_parent
        if going_out:
            # expand: item <-> outer becomes item <-> new_parent <-> outer
            parent_item_edge: MultiGraphEdge
            parent_outer_edge: MultiGraphEdge
            if outward:
                parent_item_edge = (item, new_parent, self.__graph.add_edge(item, new_parent))
                parent_outer_edge = (new_parent, outer, self.__graph.add_edge(new_parent, outer))
            else:
                parent_item_edge = (new_parent, item, self.__graph.add_edge(new_parent, item))
                parent_outer_edge = (outer, new_parent, self.__graph.add_edge(outer, new_parent))
            if inner_edge:
                self.__graph.edges[inner_edge][prev_next[outward]] = parent_item_edge
            self.__graph.edges[parent_item_edge][prev_next[not outward]] = inner_edge
            self.__graph.edges[parent_item_edge][prev_next[outward]] = parent_outer_edge
            self.__graph.edges[parent_outer_edge][prev_next[not outward]] = parent_item_edge
            self.__graph.edges[parent_outer_edge][prev_next[outward]] = further_edge
            if further_edge:
                # The back link must point at the new adjacent edge, not at further_edge itself.
                self.__graph.edges[further_edge][prev_next[not outward]] = parent_outer_edge
        else:
            # contract: item <-> new_parent <-> further becomes item <-> further
            assert further_edge
            further = further_edge[outward]
            even_further_edge = self.__graph.edges[further_edge][prev_next[outward]]
            self.__graph.remove_edge(*further_edge)
            if outward:
                new_edge = (item, further, self.__graph.add_edge(item, further))
            else:
                new_edge = (further, item, self.__graph.add_edge(further, item))
            if inner_edge:
                self.__graph.edges[inner_edge][prev_next[outward]] = new_edge
            self.__graph.edges[new_edge][prev_next[not outward]] = inner_edge
            self.__graph.edges[new_edge][prev_next[outward]] = even_further_edge
            if even_further_edge:
                self.__graph.edges[even_further_edge][prev_next[not outward]] = new_edge

    # Reparent, keeping the ancestors' aggregate Props consistent on both sides.
    self._prop_propagate(item, False)
    new_parent.children.add(item)
    parent.children.remove(item)
    if isinstance(item, RelativeAtomGroup):
        item.parent = new_parent
    else:
        self._atom_parents[item] = new_parent
    self._prop_propagate(item, True)
def move_node(self, node: RelativeAtomOrGroup, new_parent: RelativeAtomGroup):
    """
    Move ``node`` to become a direct child of ``new_parent``.

    The node is moved out one level at a time until its parent is an
    ancestor of ``new_parent``. It is then moved in one level at a time
    until it sits directly inside ``new_parent``. Nothing happens if the
    node is already there.

    :raises ValueError: If ``node`` is an ancestor of ``new_parent``, or if
                        no shared ancestor is found.
    """
    target_ancestors = set(self._ancestry(new_parent))
    if node in target_ancestors:
        raise ValueError("Cannot move node into itself")

    # _ancestry is lazy: after each move_node_out the next value it yields
    # is the node's new parent, so ``ancestor`` always tracks where node is.
    for ancestor in self._ancestry(node):
        if ancestor in target_ancestors:
            break
        if ancestor is new_parent:
            return
        self.move_node_out(node)
    else:
        raise ValueError("ruh roh")

    # Descend from just below the shared ancestor towards new_parent.
    inward = list(self._ancestry_until(new_parent, ancestor))
    for step in reversed(inward):
        self.move_node_in(node, step)
    self.move_node_in(node, new_parent)
def _parent(self, item: RelativeAtomOrGroup) -> RelativeAtomGroup:
    """
    Return the group that directly contains ``item``.

    :raises ValueError: If ``item`` has no parent.
    """
    if isinstance(item, RelativeAtomGroup):
        owner = item.parent
    else:
        owner = self._atom_parents[item]
    if owner is None:
        raise ValueError("Has no parent")
    return owner
def _ancestry(self, atom: RelativeAtomOrGroup) -> Iterable[RelativeAtomGroup]:
    """
    Lazily yield the groups containing ``atom``, innermost first.

    Each next group is looked up only when the generator is resumed.
    """
    if isinstance(atom, RelativeAtomGroup):
        current = atom.parent
    else:
        current = self._atom_parents[atom]
    while current is not None:
        yield current
        current = current.parent
def _common_ancestor(self, atom1: RelativeAtomOrGroup, atom2: RelativeAtomOrGroup) -> RelativeAtomGroup:
    """
    Return the innermost ancestor of ``atom2`` that is also an ancestor of ``atom1``.

    :raises ValueError: If the two nodes share no ancestor.
    """
    seen = set(self._ancestry(atom1))
    shared = next((group for group in self._ancestry(atom2) if group in seen), None)
    if shared is None:
        raise ValueError("Hierarchy structure is fucked")
    return shared
def _ancestry_until(self, start: RelativeAtomOrGroup, stop: RelativeAtomGroup, inclusive=False) -> Iterable[RelativeAtomGroup]:
    """
    Yield the ancestors of ``start``, innermost first, stopping at ``stop``.

    ``stop`` itself is yielded only when ``inclusive`` is true. If ``stop``
    is never reached, the whole ancestry is yielded.
    """
    for group in self._ancestry(start):
        if group is not stop:
            yield group
            continue
        if inclusive:
            yield group
        return
def _hierarchy_path(self, start: RelativeAtomOrGroup, end: RelativeAtomOrGroup):
    """
    Yield the nodes on the route from ``start`` to ``end`` through the hierarchy.

    The route is ``start``, then its ancestors below the common ancestor
    (innermost first), then ``end``'s ancestors below the common ancestor
    (outermost first), then ``end``. The common ancestor is not yielded.
    """
    meeting = self._common_ancestor(start, end)
    up = self._ancestry_until(start, meeting)
    down = list(self._ancestry_until(end, meeting))
    down.reverse()
    yield start
    yield from up
    yield from down
    yield end

View File

@ -3,10 +3,64 @@ from collections import defaultdict
import angr
import networkx
from .data import BlockInfo
from .data import BlockInfo, RegisterAtom, MemoryAtom, TmpAtom, Atom
from .hierarchy_graph import HierarchicalGraph
class TypeTapperManager(angr.knowledge_plugins.plugin.KnowledgeBasePlugin):
def __init__(self, kb: angr.KnowledgeBase):
    """
    Set up empty analysis state attached to the knowledge base ``kb``.
    """
    self.kb = kb
    # Per-block bookkeeping, with an empty BlockInfo created on first access.
    self.block_info: Dict[int, BlockInfo] = defaultdict(BlockInfo)
    # The data-flow graph that the analysis fills in.
    self.graph = networkx.DiGraph()
    # Starts unset and is assigned by whoever drives the analysis.
    self.cfg: angr.knowledge_plugins.cfg.CFGModel = None
def normalized_block(self, addr: int) -> angr.Block:
    """
    Return the block of the CFG node that contains ``addr``.

    :raises LookupError: If no CFG node covers ``addr``.
    """
    node = self.cfg.get_any_node(addr, anyaddr=True)
    if node is not None:
        return node.block
    raise LookupError("No such block %#x" % addr)
def _block_info(self, addr: int):
    """
    Return the BlockInfo for the block that contains ``addr``.

    ``addr`` may be any address inside the block. The lookup is keyed on
    the containing node's own address, because indexing ``block_info`` (a
    defaultdict) with a mid-block address would silently create and
    return a fresh, empty BlockInfo.

    :raises LookupError: If no CFG node covers ``addr``.
    """
    block = self.cfg.get_any_node(addr, anyaddr=True)
    if block is None:
        raise LookupError("No such block %#x" % addr)
    # NOTE(review): assumes block_info is keyed by the CFG node's start address. Confirm.
    return self.block_info[block.addr]
def lookup_reg(self, addr: int, register: str) -> RegisterAtom:
    """
    Find a register atom for ``register`` at instruction ``addr``.

    Candidates are tried in this order, and the first match wins:
    1. block atoms at this instruction whose ``name`` matches,
    2. block atoms at this instruction whose ``slot_name`` matches,
    3. block inputs whose ``name`` matches,
    4. block inputs whose ``slot_name`` matches.

    :raises LookupError: If nothing matches, or if no block covers ``addr``.
    """
    blockinfo = self._block_info(addr)
    # this algorithm could use some tweaking once we know how it's used
    reg_atoms = [atom for atom in blockinfo.atoms if isinstance(atom, RegisterAtom)]
    for field_name in ('name', 'slot_name'):
        for atom in reg_atoms:
            if getattr(atom, field_name) == register and atom.loc.ins_addr == addr:
                return atom
    for field_name in ('name', 'slot_name'):
        for atom in blockinfo.inputs:
            if getattr(atom, field_name) == register:
                return atom
    raise LookupError("Cannot find register %s in instruction %#x" % (register, addr))
def lookup_mem(self, addr: int) -> MemoryAtom:
    """
    Return the first memory atom recorded for the instruction at ``addr``.

    :raises LookupError: If the instruction has no memory atom, or if no
                         block covers ``addr``.
    """
    candidates = (
        atom
        for atom in self._block_info(addr).atoms
        if isinstance(atom, MemoryAtom) and atom.loc.ins_addr == addr
    )
    found = next(candidates, None)
    if found is None:
        raise LookupError("Cannot find memory access in instruction %#x" % addr)
    return found
def lookup_tmp(self, addr: int, tmp: int):
    """
    Return the first atom for temporary number ``tmp`` in the block containing ``addr``.

    Only the temporary number is compared. The instruction address is used
    to select the block and in the error message.

    :raises LookupError: If the block has no such temporary, or if no block
                         covers ``addr``.
    """
    candidates = (
        atom
        for atom in self._block_info(addr).atoms
        if isinstance(atom, TmpAtom) and atom.tmp == tmp
    )
    found = next(candidates, None)
    if found is None:
        raise LookupError("Cannot find tmp %d in instruction %#x. Are your temp numbers based on the .normalized_block?" % (tmp, addr))
    return found
def session(self, atom: Atom) -> HierarchicalGraph:
    """
    Start a new HierarchicalGraph exploration seeded with ``atom`` alone.
    """
    baseline = [atom]
    return HierarchicalGraph(self, baseline)

View File

@ -0,0 +1,146 @@
from typing import TYPE_CHECKING, List, Optional, Tuple, Set
from dataclasses import dataclass
import logging
import networkx
from .data import Atom, Prop, OpSequence, ControlFlowActionPop, ControlFlowActionPush, ControlFlowAction
if TYPE_CHECKING:
from .knowledge import TypeTapperManager
l = logging.getLogger(__name__)
@dataclass(frozen=True)
class RelativeAtom:
    """An atom paired with the callstack under which it was reached."""

    # The underlying atom from the knowledge plugin's graph.
    atom: Atom
    # Callsites pushed while walking to this atom, with the most recent last.
    callstack: Tuple[int, ...]
@dataclass
class RelativeAtomAttrs:
    """Attributes stored on each node of a RelativeAtomGraph."""

    # Operation sequence associated with the path to this node.
    path: OpSequence
    # Properties known about this node.
    prop: Prop

    def merge(self, other: 'RelativeAtomAttrs'):
        """
        Fold ``other``'s Prop into this one.

        :return: Always False.
        """
        self.prop.update(other.prop)
        paths_differ = self.path != other.path
        if paths_differ:
            # TODO unifications
            pass
        return False
class RelativeAtomGraph:
def __init__(self, kp: 'TypeTapperManager', baseline: List[Atom]):
    """
    Build a graph seeded with ``baseline`` atoms, each with an empty callstack.

    :param kp:       The TypeTapper knowledge plugin to draw atoms from.
    :param baseline: The atoms to seed the graph with. Each one also
                     starts on the frontier.
    """
    self.kp = kp
    self.__graph = networkx.DiGraph()
    # nodes present in self.graph but haven't had all their edges analyzed
    self.frontier = set()

    for seed in baseline:
        seed_relatom = RelativeAtom(atom=seed, callstack=())
        self._add_node(seed_relatom, OpSequence())
        self.frontier.add(seed_relatom)  # TODO ???
def _add_node(self, relatom: RelativeAtom, path: OpSequence) -> bool:
    """
    If relatom is not present in the graph, add it.
    If it is present in the graph, merge the new information into its attrs.

    The node's Prop is the underlying atom's recorded Prop (or an empty
    one), transformed through the inverse of ``path``.

    :return: True if the node was newly added, otherwise the result of the merge.
    """
    base_prop = self.kp.graph.nodes[relatom.atom].get('prop', Prop())
    newattrs = RelativeAtomAttrs(
        prop=base_prop.transform(path.invert()),
        path=path,
    )
    if relatom not in self.__graph.nodes:
        self.__graph.add_node(relatom, attr=newattrs)
        return True
    return self.__graph.nodes[relatom]['attr'].merge(newattrs)
def _remove_node(self, relatom: RelativeAtom):
    """
    Remove ``relatom`` together with every edge touching it.

    Edges go through ``_remove_edge`` one at a time, so subclasses see
    each removal.
    """
    incoming = list(self.__graph.pred[relatom])
    for pred in incoming:
        self._remove_edge(pred, relatom)
    # Collected only after the incoming edges are gone, so that a self-loop
    # is not removed twice.
    outgoing = list(self.__graph.succ[relatom])
    for succ in outgoing:
        self._remove_edge(relatom, succ)
    self.__graph.remove_node(relatom)
def _add_edge(self, relatom1: RelativeAtom, relatom2: RelativeAtom):
    """
    Add a directed edge ``relatom1 -> relatom2``. Both must already be nodes.
    """
    for endpoint in (relatom1, relatom2):
        assert endpoint in self.__graph.nodes
    self.__graph.add_edge(relatom1, relatom2)
def _remove_edge(self, relatom1: RelativeAtom, relatom2: RelativeAtom):
    """
    Remove the directed edge ``relatom1 -> relatom2``. Both must be nodes.
    """
    for endpoint in (relatom1, relatom2):
        assert endpoint in self.__graph.nodes
    self.__graph.remove_edge(relatom1, relatom2)
def attrs(self, relatom: RelativeAtom) -> RelativeAtomAttrs:
    """
    Return the attrs stored on ``relatom``'s node. KeyError if it is absent.
    """
    node_data = self.__graph.nodes[relatom]
    return node_data['attr']
def expand(self, relatom: RelativeAtom) -> Set[RelativeAtom]:
    """
    Follow every edge of ``relatom``'s atom in the knowledge graph.

    Successors are visited first, then predecessors. Nodes that
    ``_expand_single`` reports are collected and added to the frontier.

    :return: The reported nodes, or an empty set if ``relatom`` is not on
             the frontier.
    """
    if relatom not in self.frontier:
        return set()

    # NOTE(review): relatom itself is never taken off the frontier here. Confirm this is intended.
    attrs = self.attrs(relatom)
    result = set()
    source_graph = self.kp.graph
    for is_pred, adjacency in ((False, source_graph.succ), (True, source_graph.pred)):
        for neighbor, edge_attrs in adjacency[relatom.atom].items():
            res = self._expand_single(relatom, attrs, neighbor, edge_attrs['cf'], edge_attrs['ops'], is_pred)
            if res is not None:
                result.add(res)

    self.frontier.update(result)
    return result
def _expand_single(
    self,
    relatom: RelativeAtom,
    attrs: RelativeAtomAttrs,
    succ: Atom,
    edge_cf: List[ControlFlowAction],
    edge_ops: OpSequence,
    is_pred: bool,
) -> Optional[RelativeAtom]:
    """
    Follow one knowledge-graph edge from ``relatom`` to the atom ``succ``.

    :param relatom:  The node being expanded.
    :param attrs:    The attrs of ``relatom``.
    :param succ:     The atom on the other end of the edge. It is a
                     predecessor when ``is_pred`` is true.
    :param edge_cf:  The control-flow actions attached to the edge.
    :param edge_ops: The operation sequence attached to the edge.
    :param is_pred:  True when the edge is walked backwards.
    :return: The neighbouring RelativeAtom when ``_add_node`` reports it,
             otherwise None. None is also returned, with nothing added,
             when the callstack update rejects the edge.
    """
    callstack = self._update_callstack(relatom.callstack, edge_cf, is_pred)
    if callstack is None:
        return None

    path = edge_ops.invert() + attrs.path if is_pred else attrs.path + edge_ops
    relsucc = RelativeAtom(atom=succ, callstack=callstack)
    res = self._add_node(relsucc, path)

    # The edge keeps the direction it has in the knowledge graph.
    endpoints = (relsucc, relatom) if is_pred else (relatom, relsucc)
    self._add_edge(*endpoints)

    if res:
        return relsucc
    return None
@staticmethod
def _update_callstack(
    callstack: Tuple[int, ...],
    cf: List[ControlFlowAction],
    reverse: bool
) -> Optional[Tuple[int, ...]]:
    """
    Apply an edge's control-flow actions to ``callstack``.

    :param callstack: The current callstack, with the most recent callsite last.
    :param cf:        The push and pop actions attached to the edge.
    :param reverse:   True when the edge is walked backwards. The actions
                      are then applied in reverse order with push and pop
                      swapped.
    :return: The new callstack, or None if a pop does not match the
             callsite on top. Popping an empty callstack is accepted and
             leaves it empty.
    :raises TypeError: If an action is neither a push nor a pop.
    """
    directives = reversed(cf) if reverse else cf
    for directive in directives:
        if isinstance(directive, ControlFlowActionPop):
            pop = True
        elif isinstance(directive, ControlFlowActionPush):
            pop = False
        else:
            raise TypeError(type(directive))
        callsite = directive.callsite

        pop ^= reverse
        if not pop:
            callstack = callstack + (callsite,)
            continue
        if callstack:
            if callstack[-1] != callsite:
                return None
            callstack = callstack[:-1]
    return callstack