typetapper/typetapper/engine.py

281 lines
11 KiB
Python

from typing import Union
import logging
from collections import Counter
import angr
import pyvex
from .data import *
from .knowledge import TypeTapperManager
l = logging.getLogger(__name__)
def get_type_size_bytes(ty):
return pyvex.get_type_size(ty) // 8
class TypeTapperEngine(angr.engines.vex.VEXMixin):
def __init__(self, project: angr.Project, kp: TypeTapperManager, tmp_atoms: bool=False, **kwargs):
super().__init__(project, **kwargs)
self.kp = kp
self.last_imark: int = 0
self.tmp_atoms = tmp_atoms
self._force_addr: Optional[int] = None
tmps: List[Union[LiveData, TmpAtom]]
@property
def codeloc(self) -> CodeLoc:
if self._force_addr is not None:
return CodeLoc(bbl_addr=self._force_addr, stmt_idx=self.stmt_idx, ins_addr=self._force_addr)
return CodeLoc(bbl_addr=self.irsb.addr, stmt_idx=self.stmt_idx, ins_addr=self.last_imark)
def force_addr(self, v: int):
self._force_addr = v
@property
def graph(self):
return self.kp.graph
@property
def blockinfo(self):
if self._force_addr is not None:
return self.kp.block_info[self._force_addr]
return self.kp.block_info[self.irsb.addr]
def new_null(self, size: int, strides: Tuple[Tuple[Optional['LiveData'], int], ...]=()):
return LiveData.new_null(self.codeloc, size, strides)
def new_atom(self, atom: Atom) -> 'LiveData':
return LiveData.new_atom(self.codeloc, atom)
def new_const(self, value: int, size: int) -> 'LiveData':
return LiveData.new_const(self.codeloc, value, size)
def handle_vex_block(self, irsb):
self._force_addr = None
super().handle_vex_block(irsb)
def _handle_vex_const(self, const):
return self.const(const.value, get_type_size_bytes(const.type))
def const(self, val, size) -> LiveData:
atom = self.new_const(val, size)
self.blockinfo.atoms.append(atom.sources[0][0])
return atom
def _perform_vex_expr_RdTmp(self, tmp):
if self.tmp_atoms:
return self.new_atom(self.tmps[tmp])
else:
return self.tmps[tmp]
def _perform_vex_expr_Get(self, offset: LiveData, ty, **kwargs):
size = get_type_size_bytes(ty)
if type(offset.const) is not int:
return self.new_null(size)
name = self.project.arch.register_size_names[(offset.const, size)] # unsafe
return self.get(name, offset.const, size)
def get(self, name, offset=None, size=None):
if size is None or offset is None:
offset, size = self.project.arch.registers[name]
slot_info = self.project.arch.get_base_register(offset, size)
if slot_info is None:
l.error("??????? (%s, %s)", offset, size)
return self.new_null(size)
slot_name = self.project.arch.register_size_names[slot_info]
reg_atom = RegisterAtom(self.codeloc, size, name, slot_name)
self.blockinfo.atoms.append(reg_atom)
source = self.blockinfo.outputs.get(slot_name, None)
if source is not None:
if source.name == reg_atom.name:
self.graph.add_edge(source, reg_atom, ops=OpSequence())
else:
pass # alias mismatch
elif name in self.blockinfo.inputs:
return self.new_atom(self.blockinfo.inputs[name])
else:
self.blockinfo.inputs[name] = reg_atom
self.blockinfo.ready_inputs.add(name)
return self.new_atom(reg_atom)
def _perform_vex_expr_Load(self, addr: LiveData, ty, endness, **kwargs):
size = get_type_size_bytes(ty)
return self.load(addr, size, endness)
def load(self, addr, size, endness):
prop = Prop()
prop.self_data[DataKind.Pointer] += 1
prop.struct_data[0][size][DataKind.GenericData] += 1
addr.prop(prop, self.graph)
mem_atom = MemoryAtom(self.codeloc, size, endness)
self.blockinfo.atoms.append(mem_atom)
addr.appended(self.codeloc, DerefOp(size), size).commit(mem_atom, self.graph)
return self.new_atom(mem_atom)
def _perform_vex_expr_CCall(self, func_name, ty, args, func=None):
return self.new_null(get_type_size_bytes(ty))
def _perform_vex_expr_ITE(self, cond, ifTrue: LiveData, ifFalse: LiveData):
assert ifTrue.size == ifFalse.size
return ifTrue.unioned(self.codeloc, ifFalse, ifTrue.size)
def _perform_vex_expr_Op(self, op, args: List[LiveData]):
return self.op(op, args)
def op(self, op, args: List[LiveData]) -> LiveData:
ret_ty, arg_tys = pyvex.expr.op_arg_types(op)
for arg, ty in zip(args, arg_tys):
if ty.startswith('Ity_F'):
arg.prop_self(DataKind.Float, self.graph)
elif ty.startswith('Ity_I'):
arg.prop_self(DataKind.Int, self.graph)
size = get_type_size_bytes(ret_ty)
sign = None
mul0, mul1 = None, None
const0, const1 = None, None
if op in ('Iop_Add8', 'Iop_Add16', 'Iop_Add32', 'Iop_Add64'):
sign = 1
elif op in ('Iop_Sub8', 'Iop_Sub16', 'Iop_Sub32', 'Iop_Sub64'):
sign = -1
elif op in ('Iop_Mul8', 'Iop_Mul16', 'Iop_Mul32', 'Iop_Mul64',
'Iop_MullS8', 'Iop_MullS16', 'Iop_MullS32', 'Iop_MullS64',
'Iop_MullU8', 'Iop_MullU16', 'Iop_MullU32', 'Iop_MullU64'):
mul0 = args[0].strides
mul1 = args[1].strides
const0 = args[0].const
const1 = args[1].const
elif op in ('Iop_Shl8', 'Iop_Shl16', 'Iop_Shl32', 'Iop_Shl64'):
if args[1].const is not None and args[1].const >= 0:
const0 = args[0].const
const1 = 2**args[1].const
mul0 = args[0].strides
mul1 = ((None, const1))
if sign is not None:
assert size == args[0].size == args[1].size
stride0 = args[0].strides
stride1 = args[1].strides
strideC = Counter()
if stride0:
for key, n in stride0:
strideC[key] += n
else:
strideC[args[0]] += 1
if stride1:
for key, n in stride1:
strideC[key] += n * sign
else:
strideC[args[1]] += sign
neg1 = args[1]
if sign == -1:
neg1 = neg1.appended(self.codeloc, NegOp(), neg1.size, tuple((k, -n) for k, n in neg1.strides))
input0 = args[0]
if stride1:
for (key, stride) in stride1:
if key is None:
input0 = input0.appended(self.codeloc, ConstOffsetOp(stride), size)
else:
input0 = input0.appended(self.codeloc, StrideOffsetOp(abs(stride)), size)
else:
input0 = input0.appended(self.codeloc, VarOffsetOp(neg1), size)
input1 = args[1]
if stride0:
for (key, stride) in stride0:
if key is None:
input1 = input1.appended(self.codeloc, ConstOffsetOp(stride), size)
else:
input1 = input1.appended(self.codeloc, StrideOffsetOp(abs(stride)), size)
else:
input1 = input1.appended(self.codeloc, VarOffsetOp(args[0]), size)
result = input0.unioned(self.codeloc, input1, size, tuple((key, n) for key, n in strideC.items() if n != 0))
elif mul0 is not None and mul1 is not None:
if const0 is not None and const1 is not None:
result = self.new_null(size, strides=((None, const0 * const1),))
elif const1 is not None and len(mul0) != 0:
result = self.new_null(size, strides=tuple((key, v * const1) for key, v in mul0))
elif const0 is not None and len(mul1) != 0:
result = self.new_null(size, strides=tuple((key, v * const0) for key, v in mul1))
elif const0 is not None:
result = self.new_null(size, strides=((args[1], const0),))
elif const1 is not None:
result = self.new_null(size, strides=((args[0], const1),))
else:
result = self.new_null(size)
else:
result = self.new_null(size)
return result
def _handle_vex_expr_GSPTR(self, expr: pyvex.expr.GSPTR):
return self.new_null(get_type_size_bytes(expr.result_type(self.irsb.tyenv)))
def _handle_vex_expr_VECRET(self, expr: pyvex.expr.VECRET):
return self.new_null(get_type_size_bytes(expr.result_type(self.irsb.tyenv)))
def _handle_vex_expr_Binder(self, expr: pyvex.expr.Binder):
return self.new_null(get_type_size_bytes(expr.result_type(self.irsb.tyenv)))
def _handle_vex_stmt_IMark(self, stmt: pyvex.stmt.IMark):
self.last_imark = stmt.addr + stmt.delta
def _perform_vex_stmt_Put(self, offset: LiveData, data: LiveData, **kwargs):
if type(offset.const) is not int:
return self.new_null(data.size)
name = self.project.arch.register_size_names[(offset.const, data.size)] # unsafe
return self.put(name, data, offset.const)
def put(self, name, data, offset=None):
if offset is None:
offset = self.project.arch.registers[name][0]
slot_info = self.project.arch.get_base_register(offset, data.size)
if slot_info is None:
l.error("??????? (%s, %s)", offset, data.size)
return self.new_null(data.size)
slot_name = self.project.arch.register_size_names[slot_info]
reg_atom = RegisterAtom(self.codeloc, data.size, name, slot_name)
self.blockinfo.atoms.append(reg_atom)
data.commit(reg_atom, self.graph)
self.blockinfo.outputs[slot_name] = reg_atom
def _perform_vex_stmt_WrTmp(self, tmp, data: LiveData):
if self.tmp_atoms:
tmp_atom = TmpAtom(self.codeloc, get_type_size_bytes(self.irsb.tyenv.lookup(tmp)), tmp)
self.blockinfo.atoms.append(tmp_atom)
self.tmps[tmp] = tmp_atom
data.commit(tmp_atom, self.graph)
else:
self.tmps[tmp] = data
def _perform_vex_stmt_Store(self, addr: LiveData, data: LiveData, endness, **kwargs):
self.store(addr, data, endness)
def store(self, addr, data, endness):
prop = Prop()
prop.self_data[DataKind.Pointer] += 1
prop.struct_data[0][data.size][DataKind.GenericData] += 1
addr.prop(prop, self.graph)
mem_atom = MemoryAtom(self.codeloc, data.size, endness)
self.blockinfo.atoms.append(mem_atom)
addr.appended(self.codeloc, DerefOp(data.size), data.size).commit(mem_atom, self.graph)
data.commit(mem_atom, self.graph)
def _perform_vex_stmt_Dirty_call(self, func_name, ty, args, func=None):
if ty is None:
return None
return self.new_null(get_type_size_bytes(ty))