data can now be generated. I think.

commit f4b68d1ab0
Audrey · 2022-10-17 20:09:38 -07:00
10 changed files with 459 additions and 0 deletions

0  typetapper/__init__.py  Normal file

5 binary files changed (not shown)

64  typetapper/analysis.py  Normal file

@@ -0,0 +1,64 @@
from collections import OrderedDict

import angr
from angr import Block
from angr.analyses.cfg import CFGBase
from angr.knowledge_plugins.cfg import CFGNode

from .engine import TypeTapperEngine
from .knowledge import TypeTapperManager


class TypeTapper(angr.Analysis):
    def __init__(self, cfg: CFGBase):
        self._cfg = cfg
        self._manager = self.kb.request_knowledge(TypeTapperManager)
        self._engine = TypeTapperEngine(self.project, self._manager)

        if not self._cfg.normalized:
            raise ValueError("CFG must be normalized")

        self._analyze_active_flow()
        self._analyze_passive_flow()

    def _analyze_active_flow(self):
        node: CFGNode
        for node in self._cfg.graph.nodes():
            block: Block = node.block
            if block is None:
                continue
            self._engine.handle_vex_block(block.vex)

    def _analyze_passive_flow(self):
        queue = OrderedDict()
        for block_addr in self._manager.block_info.keys():
            queue[block_addr] = None

        while queue:
            block_addr = next(iter(reversed(queue.keys())))
            queue.pop(block_addr)
            node_blockinfo = self._manager.block_info[block_addr]
            node = self._cfg.model.get_any_node(block_addr)
            fakeret_addr = next((pred.addr for pred, attrs in self._cfg.graph.pred[node].items() if attrs['jumpkind'] == 'Ijk_FakeRet'), None)
            for pred, attrs in self._cfg.graph.pred[node].items():
                if attrs['jumpkind'] == 'Ijk_FakeRet':
                    continue
                if pred.block is None:
                    continue
                pred_addr = pred.addr
                pred_blockinfo = self._manager.block_info[pred_addr]
                # walk the data flow backwards: try to resolve this block's register inputs in the predecessor
                for input_atom, input_info in node_blockinfo.inputs.items():
                    input_info_new = input_info.step(pred_addr, block_addr, attrs['jumpkind'], fakeret_addr)
                    if input_info_new is None:
                        continue
                    output_atom = pred_blockinfo.outputs.get(input_atom.slot_name, None)
                    if output_atom is not None:
                        if output_atom.name == input_atom.name:
                            input_info_new.commit(self._manager.graph, output_atom, input_atom)
                        else:
                            pass  # alias mismatch
                    elif input_atom not in pred_blockinfo.inputs:  # sketchy... this means that we can't account for multiple paths to the same atom
                        pred_blockinfo.inputs[input_atom] = input_info_new
                        queue[pred_addr] = None
                        queue.move_to_end(pred_addr, last=False)
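
For reference, a minimal sketch of driving the analysis above end to end, assuming it gets registered with angr's analyses hub (registration is not part of this commit) and using a hypothetical binary path; only the requirement that the CFG be normalized comes from the code itself.

# Sketch only: the registration call and the binary path are assumptions, not part of the commit.
import angr

from typetapper.analysis import TypeTapper
from typetapper.knowledge import TypeTapperManager

angr.AnalysesHub.register_default("TypeTapper", TypeTapper)  # assumed registration point

proj = angr.Project("/path/to/some_binary", auto_load_libs=False)  # hypothetical target
cfg = proj.analyses.CFGFast(normalize=True)  # TypeTapper raises if the CFG is not normalized
proj.analyses.TypeTapper(cfg)

manager = proj.kb.request_knowledge(TypeTapperManager)
print(manager.graph.number_of_nodes(), "atoms,", manager.graph.number_of_edges(), "flow edges")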

248  typetapper/data.py  Normal file

@@ -0,0 +1,248 @@
from typing import Tuple, Any, List, Set, Optional, Dict
from collections import defaultdict
from enum import Enum, auto
from dataclasses import dataclass, field
import copy

import networkx


@dataclass(frozen=True)
class CodeLoc:
    bbl_addr: int
    stmt_idx: int


@dataclass(frozen=True)
class Atom:
    loc: CodeLoc
    size: int


@dataclass(frozen=True)
class RegisterAtom(Atom):
    name: str
    slot_name: str


@dataclass(frozen=True)
class MemoryAtom(Atom):
    endness: str


@dataclass(frozen=True)
class TmpAtom(Atom):
    tmp: int


@dataclass(frozen=True)
class ConstAtom(Atom):
    value: int


@dataclass(frozen=True)
class Op:
    def invert(self) -> 'Op':
        raise NotImplementedError


@dataclass(frozen=True)
class ConstOffsetOp(Op):
    const: int

    def invert(self):
        return ConstOffsetOp(-self.const)


@dataclass(frozen=True)
class VarOffsetOp(Op):
    var: Any

    def invert(self):
        # TODO ????
        return self


@dataclass(frozen=True)
class DerefOp(Op):
    size: int

    def invert(self):
        return RefOp(self.size)


@dataclass(frozen=True)
class RefOp(Op):
    size: int

    def invert(self):
        return DerefOp(self.size)


#@dataclass(frozen=True)
#class OtherOp(Op):
#    def invert(self) -> 'Op':
#        return self


@dataclass(frozen=True)
class OpSequence:
    ops: Tuple[Op, ...] = ()

    def __add__(self, other: 'OpSequence') -> 'OpSequence':
        seq = list(self.ops)
        seq.extend(other.ops)
        simplify_op_sequence(seq)
        return OpSequence(tuple(seq))

    def appended(self, *op: Op) -> 'OpSequence':
        seq = list(self.ops)
        seq.extend(op)
        simplify_op_sequence(seq)
        return OpSequence(tuple(seq))

    @staticmethod
    def concat(*sequences: 'OpSequence') -> 'OpSequence':
        seq = []
        for s in sequences:
            seq.extend(s.ops)
        simplify_op_sequence(seq)
        return OpSequence(tuple(seq))

    def invert(self) -> 'OpSequence':
        return OpSequence(tuple(x.invert() for x in reversed(self.ops)))


def simplify_op_sequence(seq: List[Op]):
    i = 0
    while i < len(seq):
        cur = seq[i]
        if isinstance(cur, ConstOffsetOp) and cur.const == 0:
            seq.pop(i)
            continue
        nex = seq[i + 1] if i + 1 < len(seq) else None
        if isinstance(cur, ConstOffsetOp) and isinstance(nex, ConstOffsetOp):
            seq[i] = ConstOffsetOp(cur.const + nex.const)
            seq.pop(i + 1)
            continue
        if isinstance(cur, RefOp) and isinstance(nex, DerefOp) and cur.size == nex.size:
            seq.pop(i)
            seq.pop(i)
            continue
        if isinstance(cur, DerefOp) and isinstance(nex, RefOp) and cur.size == nex.size:
            seq.pop(i)
            seq.pop(i)
            continue
        i += 1


class DataKind(Enum):
    Int = auto()
    Float = auto()
    Pointer = auto()


@dataclass
class Prop:
    self_data: defaultdict[DataKind, int] = field(default_factory=lambda: defaultdict(int))
    struct_data: defaultdict[int, defaultdict[int, defaultdict[DataKind, int]]] = field(default_factory=lambda: defaultdict(lambda: defaultdict(lambda: defaultdict(int))))
    unifications: Set[Tuple[int, int]] = field(default_factory=set)

    def update(self, other: 'Prop'):
        for kind, v in other.self_data.items():
            self.self_data[kind] += v
        for offset, v1 in other.struct_data.items():
            for size, v2 in v1.items():
                for kind, v3 in v2.items():
                    self.struct_data[offset][size][kind] += v3
        self.unifications.update(other.unifications)

    def __or__(self, other: 'Prop'):
        result = Prop()
        result.update(self)
        result.update(other)
        return result

    def transform(self, ops: OpSequence):
        result = copy.deepcopy(self)
        for op in ops.ops:
            if isinstance(op, RefOp):
                result.struct_data.clear()
                result.struct_data[0][op.size] = result.self_data
                result.self_data = defaultdict(int)
                result.unifications.clear()
            elif isinstance(op, DerefOp):
                result.self_data = result.struct_data[0][op.size]
                result.struct_data.clear()
                result.unifications.clear()
            elif isinstance(op, ConstOffsetOp):
                items = list(result.struct_data.items())
                result.struct_data.clear()
                for k, v in items:
                    result.struct_data[k + op.const] = v
                result.self_data.clear()  # TODO ???
                result.unifications = {(x + op.const, y + op.const) for x, y in result.unifications}
            else:
                result = Prop()
        return result


@dataclass(frozen=True)
class LiveData:
    sources: List[Tuple[Atom, OpSequence]]
    const: Optional[int]
    size: int

    @classmethod
    def new_null(cls, size: int):
        return cls([], None, size)

    @classmethod
    def new_atom(cls, atom: Atom) -> 'LiveData':
        return cls([(atom, OpSequence())], None, atom.size)

    @classmethod
    def new_const(cls, value: int, size: int, codeloc: CodeLoc) -> 'LiveData':
        return cls([(ConstAtom(codeloc, size, value), OpSequence())], value, size)

    def appended(self, op: Op, size: int) -> 'LiveData':
        return LiveData([(atom, seq.appended(op)) for atom, seq in self.sources], self.const, size)

    def unioned(self, other: 'LiveData', size: int, const: Optional[int] = None) -> 'LiveData':
        return LiveData(self.sources + other.sources, const, size)

    def commit(self, target: Atom, graph: networkx.DiGraph):
        for src, seq in self.sources:
            graph.add_edge(src, target, ops=seq)


@dataclass(frozen=True)
class RegisterInputInfo:
    callsites: Tuple[int, ...]
    # when we go back through a ret, we push the callsite onto this stack. we may then only go back through calls if
    # they match the top of the stack, at which point they are popped off
    reverse_callsites: Tuple[int, ...]
    # when we go back through a call and there is nothing on the callstack, an entry is pushed onto this stack.
    # not sure what this indicates yet

    def step(self, pred: int, succ: int, jumpkind: str, callsite: Optional[int]) -> 'Optional[RegisterInputInfo]':
        if jumpkind == 'Ijk_Ret':
            return RegisterInputInfo(callsites=self.callsites + (callsite,), reverse_callsites=self.reverse_callsites)
        elif jumpkind == 'Ijk_Call':
            if not self.callsites:
                return RegisterInputInfo(callsites=(), reverse_callsites=self.reverse_callsites + (pred,))
            elif self.callsites[-1] == pred:
                return RegisterInputInfo(callsites=self.callsites[:-1], reverse_callsites=self.reverse_callsites)
            else:
                return None
        else:
            return RegisterInputInfo(callsites=self.callsites, reverse_callsites=self.reverse_callsites)

    def commit(self, graph: networkx.DiGraph, source: RegisterAtom, dest: RegisterAtom):
        actions: List[ControlFlowAction] = [ControlFlowActionPop(i) for i in self.callsites]
        actions += [ControlFlowActionPush(i) for i in self.reverse_callsites]
        graph.add_edge(source, dest, ops=OpSequence(), cf=actions)


@dataclass(frozen=True)
class ControlFlowAction:
    pass


@dataclass(frozen=True)
class ControlFlowActionPush(ControlFlowAction):
    callsite: int


@dataclass(frozen=True)
class ControlFlowActionPop(ControlFlowAction):
    callsite: int


@dataclass
class BlockInfo:
    outputs: Dict[str, RegisterAtom] = field(default_factory=lambda: {})  # slot names
    inputs: Dict[RegisterAtom, RegisterInputInfo] = field(default_factory=dict)
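
As a quick sanity check on the op-sequence algebra above: adjacent constant offsets fold together, a Ref immediately followed by a Deref of the same size cancels, and invert() reverses the sequence while inverting each step. A small illustration (not part of the commit):

# Sketch only: expected behaviour of OpSequence under the rules in simplify_op_sequence.
from typetapper.data import OpSequence, ConstOffsetOp, DerefOp, RefOp

seq = OpSequence((ConstOffsetOp(8),)).appended(ConstOffsetOp(-8), DerefOp(4))
assert seq.ops == (DerefOp(4),)  # +8 and -8 fold to 0 and drop out

seq = OpSequence((RefOp(8),)).appended(DerefOp(8))
assert seq.ops == ()  # Ref then Deref of the same size cancels

seq = OpSequence((ConstOffsetOp(16), DerefOp(8)))
assert seq.invert().ops == (RefOp(8), ConstOffsetOp(-16))  # reversed, each op inverted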

135  typetapper/engine.py  Normal file

@@ -0,0 +1,135 @@
import logging

import angr
import pyvex

from .data import *
from .knowledge import TypeTapperManager

l = logging.getLogger(__name__)


def get_type_size_bytes(ty):
    return pyvex.get_type_size(ty) // 8


class TypeTapperEngine(angr.engines.vex.VEXMixin):
    def __init__(self, project: angr.Project, kp: TypeTapperManager, **kwargs):
        super().__init__(project, **kwargs)
        self.kp = kp

    tmps: List[TmpAtom]

    @property
    def codeloc(self):
        return CodeLoc(bbl_addr=self.irsb.addr, stmt_idx=self.stmt_idx)

    @property
    def graph(self):
        return self.kp.graph

    @property
    def blockinfo(self):
        return self.kp.block_info[self.irsb.addr]

    def _handle_vex_const(self, const):
        return LiveData.new_const(const.value, get_type_size_bytes(const.type), self.codeloc)

    def _perform_vex_expr_RdTmp(self, tmp):
        return LiveData.new_atom(self.tmps[tmp])

    def _perform_vex_expr_Get(self, offset: LiveData, ty, **kwargs):
        size = get_type_size_bytes(ty)
        if type(offset.const) is not int:
            return LiveData.new_null(size)
        name = self.project.arch.register_size_names[(offset.const, size)]  # unsafe
        slot_info = self.project.arch.get_base_register(offset.const, size)
        if slot_info is None:
            l.error("???????")
            return LiveData.new_null(size)
        slot_name = self.project.arch.register_size_names[slot_info]
        reg_atom = RegisterAtom(self.codeloc, size, name, slot_name)
        source = self.blockinfo.outputs.get(slot_name, None)
        if source is not None:
            if source.name == reg_atom.name:
                self.graph.add_edge(source, reg_atom, ops=OpSequence())
            else:
                pass  # alias mismatch
        else:
            self.blockinfo.inputs[reg_atom] = RegisterInputInfo(callsites=(), reverse_callsites=())
        return LiveData.new_atom(reg_atom)

    def _perform_vex_expr_Load(self, addr: LiveData, ty, endness, **kwargs):
        size = get_type_size_bytes(ty)
        mem_atom = MemoryAtom(self.codeloc, size, endness)
        addr.appended(DerefOp(size), size).commit(mem_atom, self.graph)
        return LiveData.new_atom(mem_atom)

    def _perform_vex_expr_CCall(self, func_name, ty, args, func=None):
        return LiveData.new_null(get_type_size_bytes(ty))

    def _perform_vex_expr_ITE(self, cond, ifTrue: LiveData, ifFalse: LiveData):
        assert ifTrue.size == ifFalse.size
        return ifTrue.unioned(ifFalse, ifTrue.size)

    def _perform_vex_expr_Op(self, op, args: List[LiveData]):
        size = get_type_size_bytes(pyvex.get_op_retty(op))
        if op in ('Add8', 'Add16', 'Add32', 'Add64'):
            sign = 1
        elif op in ('Sub8', 'Sub16', 'Sub32', 'Sub64'):
            sign = -1
        else:
            sign = None
        if sign is not None:
            assert size == args[0].size == args[1].size
            addend0 = args[0].const
            addend1 = args[1].const
            if addend0 is not None and addend1 is not None:
                const = addend0 + addend1 * sign
            else:
                const = None
            input0 = args[0].appended(ConstOffsetOp(addend1 * sign) if addend1 is not None else VarOffsetOp(args[1]), size)
            input1 = args[1].appended(ConstOffsetOp(addend0) if addend0 is not None else VarOffsetOp(args[0]), size)
            result = input0.unioned(input1, size, const)
        else:
            result = LiveData.new_null(size)
        return result

    def _handle_vex_expr_GSPTR(self, expr: pyvex.expr.GSPTR):
        return LiveData.new_null(get_type_size_bytes(expr.result_type(self.irsb.tyenv)))

    def _handle_vex_expr_VECRET(self, expr: pyvex.expr.VECRET):
        return LiveData.new_null(get_type_size_bytes(expr.result_type(self.irsb.tyenv)))

    def _handle_vex_expr_Binder(self, expr: pyvex.expr.Binder):
        return LiveData.new_null(get_type_size_bytes(expr.result_type(self.irsb.tyenv)))

    def _perform_vex_stmt_Put(self, offset: LiveData, data: LiveData, **kwargs):
        if type(offset.const) is not int:
            return LiveData.new_null(data.size)
        name = self.project.arch.register_size_names[(offset.const, data.size)]  # unsafe
        slot_info = self.project.arch.get_base_register(offset.const, data.size)
        if slot_info is None:
            l.error("???????")
            return LiveData.new_null(data.size)
        slot_name = self.project.arch.register_size_names[slot_info]
        reg_atom = RegisterAtom(self.codeloc, data.size, name, slot_name)
        data.commit(reg_atom, self.graph)
        self.blockinfo.outputs[slot_name] = reg_atom

    def _perform_vex_stmt_WrTmp(self, tmp, data: LiveData):
        tmp_atom = TmpAtom(self.codeloc, get_type_size_bytes(self.irsb.tyenv.lookup(tmp)), tmp)
        self.tmps[tmp] = tmp_atom
        data.commit(tmp_atom, self.graph)

    def _perform_vex_stmt_Store(self, addr: LiveData, data: LiveData, endness, **kwargs):
        mem_atom = MemoryAtom(self.codeloc, data.size, endness)
        addr.appended(DerefOp(data.size), data.size).commit(mem_atom, self.graph)
        data.commit(mem_atom, self.graph)

    def _perform_vex_stmt_Dirty_call(self, func_name, ty, args, func=None):
        if ty is None:
            return None
        return LiveData.new_null(get_type_size_bytes(ty))
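
The Get/Put handlers key every register access by its base ("slot") register, so that e.g. a write to eax and a later read through rax meet on the same slot. A small illustration of the archinfo lookups the engine relies on (not part of the commit; the concrete offsets are whatever archinfo defines for AMD64):

# Sketch only: how get_base_register / register_size_names resolve sub-registers to slots.
import archinfo

arch = archinfo.ArchAMD64()
offset, size = arch.registers["eax"]              # VEX guest offset and width of eax
name = arch.register_size_names[(offset, size)]   # -> "eax"
slot = arch.get_base_register(offset, size)       # (offset, size) of the containing register
slot_name = arch.register_size_names[slot]        # -> "rax": the slot both eax and rax map to
print(name, "lives in slot", slot_name)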

12  typetapper/knowledge.py  Normal file

@@ -0,0 +1,12 @@
from typing import Dict
from collections import defaultdict

import angr
import networkx

from .data import BlockInfo


class TypeTapperManager(angr.knowledge_plugins.plugin.KnowledgeBasePlugin):
    def __init__(self, kb: angr.KnowledgeBase):
        self.kb = kb
        self.graph = networkx.DiGraph()
        self.block_info: Dict[int, BlockInfo] = defaultdict(BlockInfo)