A lot of things
- Added homespun simprocedures - Follow fakeret edges - Prop unification uses max rather than sum - Sign errors - Down-then-up heuristic - Render negative offsets correctly
This commit is contained in:
parent
0583d70c05
commit
9a0c4c6607
|
@ -9,6 +9,7 @@ from angr.knowledge_plugins.cfg import CFGNode, CFGModel
|
|||
from .engine import TypeTapperEngine
|
||||
from .knowledge import TypeTapperManager
|
||||
from .data import Atom, RegisterAtom, CodeLoc, OpSequence, ControlFlowActionPop, ControlFlowActionPush
|
||||
from . import procedures
|
||||
|
||||
l = logging.getLogger(__name__)
|
||||
|
||||
|
@ -32,9 +33,13 @@ class TypeTapperAnalysis(angr.Analysis):
|
|||
node: CFGNode
|
||||
for node in self._cfg.graph.nodes():
|
||||
block: Block = node.block
|
||||
if block is None:
|
||||
continue
|
||||
self._engine.handle_vex_block(block.vex)
|
||||
if block is not None:
|
||||
self._engine.handle_vex_block(block.vex)
|
||||
elif node.simprocedure_name is not None:
|
||||
proc_cls = getattr(procedures, node.simprocedure_name, None)
|
||||
if proc_cls is not None and issubclass(proc_cls, procedures.TypeTapperProcedure):
|
||||
proc = proc_cls(self._engine, self.project.factory.cc())
|
||||
proc.analyze(node.addr)
|
||||
|
||||
def _analyze_passive_flow(self):
|
||||
queue = OrderedDict()
|
||||
|
@ -48,15 +53,20 @@ class TypeTapperAnalysis(angr.Analysis):
|
|||
node = self._cfg.get_any_node(block_addr)
|
||||
fakeret_addr = next((pred.addr for pred, attrs in self._cfg.graph.pred[node].items() if attrs['jumpkind'] == 'Ijk_FakeRet'), None)
|
||||
for pred, attrs in self._cfg.graph.pred[node].items():
|
||||
if attrs['jumpkind'] == 'Ijk_FakeRet':
|
||||
continue
|
||||
pred_addr = pred.addr
|
||||
pred_blockinfo = self.manager.block_info[pred_addr]
|
||||
callsite_addr = fakeret_addr if attrs['jumpkind'] == 'Ijk_Ret' else pred_addr if attrs['jumpkind'] == 'Ijk_Call' else None
|
||||
callsite_addr = fakeret_addr if attrs['jumpkind'] == 'Ijk_Ret' else pred_addr if attrs['jumpkind'] in ('Ijk_Call', 'Ijk_FakeRet') else None
|
||||
|
||||
# TAKE IT BACK NOW Y'ALL
|
||||
for name in node_blockinfo.ready_inputs:
|
||||
input_atom = node_blockinfo.inputs[name]
|
||||
if attrs['jumpkind'] == 'Ijk_FakeRet':
|
||||
# determine which registers are clobbered; determine the cc
|
||||
func_addr: CFGNode = next((succ.addr for succ, attrs in self._cfg.graph.succ[pred].items() if attrs['jumpkind'] == 'Ijk_Call'), None)
|
||||
function = self.kb.functions[func_addr]
|
||||
if function.calling_convention is None or input_atom.slot_name in function.calling_convention.CALLER_SAVED_REGS:
|
||||
continue
|
||||
|
||||
output_atom = pred_blockinfo.outputs.get(input_atom.slot_name, None)
|
||||
if output_atom is not None:
|
||||
if output_atom.name == name:
|
||||
|
@ -71,8 +81,8 @@ class TypeTapperAnalysis(angr.Analysis):
|
|||
queue.move_to_end(pred_addr, last=False)
|
||||
self._passive_link(input_atom, pred_blockinfo.inputs[name], attrs['jumpkind'], callsite_addr)
|
||||
|
||||
# this is safe because if there is a loop edge, the atom will already be in the inputs so it won't be re-upped
|
||||
node_blockinfo.ready_inputs.clear()
|
||||
# this is safe because if there is a loop edge, the atom will already be in the inputs, so it won't be re-upped
|
||||
node_blockinfo.ready_inputs.clear()
|
||||
|
||||
def _passive_link(self, input_atom: Atom, output_atom: Atom, jumpkind: str, callsite: Optional[int]):
|
||||
if jumpkind == 'Ijk_Ret':
|
||||
|
|
|
@ -164,10 +164,20 @@ class Prop:
|
|||
self.struct_data[offset][size].subtract(v2)
|
||||
self.unifications.subtract(other.unifications)
|
||||
|
||||
def maximize(self, other: 'Prop'):
|
||||
for key, val in other.self_data.items():
|
||||
self.self_data[key] = max(self.self_data[key], val)
|
||||
for offset, v1 in other.struct_data.items():
|
||||
for size, v2 in v1.items():
|
||||
for kind, val in v2.items():
|
||||
self.struct_data[offset][size][kind] = max(self.struct_data[offset][size][kind], val)
|
||||
for key, val in other.unifications.items():
|
||||
self.unifications[key] = max(self.unifications[key], val)
|
||||
|
||||
def __or__(self, other: 'Prop'):
|
||||
result = Prop()
|
||||
result.update(self)
|
||||
result.update(other)
|
||||
result.maximize(self)
|
||||
result.maximize(other)
|
||||
return result
|
||||
|
||||
def transform(self, ops: OpSequence):
|
||||
|
@ -186,12 +196,12 @@ class Prop:
|
|||
items = list(result.struct_data.items())
|
||||
result.struct_data.clear()
|
||||
for k, v in items:
|
||||
result.struct_data[k + op.const] = v
|
||||
result.struct_data[k - op.const] = v # there is some JANK shit going on with this sign
|
||||
saved = result.self_data.get(DataKind.Pointer, None)
|
||||
result.self_data.clear()
|
||||
if saved:
|
||||
result.self_data[DataKind.Pointer] = saved
|
||||
result.unifications = Counter((x + op.const, y + op.const) for x, y in result.unifications)
|
||||
result.unifications = Counter((x - op.const, y - op.const) for x, y in result.unifications)
|
||||
elif isinstance(op, VarOffsetOp):
|
||||
saved = result.self_data.get(DataKind.Pointer, None)
|
||||
result = Prop()
|
||||
|
@ -203,6 +213,9 @@ class Prop:
|
|||
|
||||
@dataclass(frozen=True)
|
||||
class LiveData:
|
||||
"""
|
||||
The in-flight data representation for the analysis. All sizes are in bytes
|
||||
"""
|
||||
sources: List[Tuple[Atom, OpSequence]]
|
||||
const: Optional[int]
|
||||
size: int
|
||||
|
@ -242,6 +255,16 @@ class LiveData:
|
|||
else:
|
||||
graph.nodes[atom]['prop'] = tprop
|
||||
|
||||
def prop_self(self, kind: DataKind, graph: networkx.DiGraph):
|
||||
prop = Prop()
|
||||
prop.self_data[kind] += 1
|
||||
self.prop(prop, graph)
|
||||
|
||||
def prop_union(self, offset1: int, offset2: int, graph: networkx.DiGraph):
|
||||
prop = Prop()
|
||||
prop.unifications[(offset1, offset2)] += 1
|
||||
self.prop(prop, graph)
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class RegisterInputInfo:
|
||||
atom: RegisterAtom
|
||||
|
|
|
@ -18,23 +18,39 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
|
|||
self.kp = kp
|
||||
self.last_imark: int = 0
|
||||
self.tmp_atoms = tmp_atoms
|
||||
self._force_addr: Optional[int] = None
|
||||
|
||||
tmps: List[Union[LiveData, TmpAtom]]
|
||||
|
||||
@property
|
||||
def codeloc(self):
|
||||
def codeloc(self) -> CodeLoc:
|
||||
if self._force_addr is not None:
|
||||
return CodeLoc(bbl_addr=self._force_addr, stmt_idx=self.stmt_idx, ins_addr=self._force_addr)
|
||||
|
||||
return CodeLoc(bbl_addr=self.irsb.addr, stmt_idx=self.stmt_idx, ins_addr=self.last_imark)
|
||||
|
||||
def force_addr(self, v: int):
|
||||
self._force_addr = v
|
||||
|
||||
@property
|
||||
def graph(self):
|
||||
return self.kp.graph
|
||||
|
||||
@property
|
||||
def blockinfo(self):
|
||||
if self._force_addr is not None:
|
||||
return self.kp.block_info[self._force_addr]
|
||||
return self.kp.block_info[self.irsb.addr]
|
||||
|
||||
def handle_vex_block(self, irsb):
|
||||
self._force_addr = None
|
||||
super().handle_vex_block(irsb)
|
||||
|
||||
def _handle_vex_const(self, const):
|
||||
atom = LiveData.new_const(const.value, get_type_size_bytes(const.type), self.codeloc)
|
||||
return self.const(const.value, get_type_size_bytes(const.type))
|
||||
|
||||
def const(self, val, size) -> LiveData:
|
||||
atom = LiveData.new_const(val, size, self.codeloc)
|
||||
self.blockinfo.atoms.append(atom.sources[0][0])
|
||||
return atom
|
||||
|
||||
|
@ -49,9 +65,14 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
|
|||
if type(offset.const) is not int:
|
||||
return LiveData.new_null(size)
|
||||
name = self.project.arch.register_size_names[(offset.const, size)] # unsafe
|
||||
slot_info = self.project.arch.get_base_register(offset.const, size)
|
||||
return self.get(name, offset.const, size)
|
||||
|
||||
def get(self, name, offset=None, size=None):
|
||||
if size is None or offset is None:
|
||||
offset, size = self.project.arch.registers[name]
|
||||
slot_info = self.project.arch.get_base_register(offset, size)
|
||||
if slot_info is None:
|
||||
l.error("???????")
|
||||
l.error("??????? (%s, %s)", offset, size)
|
||||
return LiveData.new_null(size)
|
||||
slot_name = self.project.arch.register_size_names[slot_info]
|
||||
reg_atom = RegisterAtom(self.codeloc, size, name, slot_name)
|
||||
|
@ -72,11 +93,12 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
|
|||
return LiveData.new_atom(reg_atom)
|
||||
|
||||
def _perform_vex_expr_Load(self, addr: LiveData, ty, endness, **kwargs):
|
||||
prop = Prop()
|
||||
prop.self_data[DataKind.Pointer] += 1
|
||||
addr.prop(prop, self.graph)
|
||||
|
||||
size = get_type_size_bytes(ty)
|
||||
return self.load(addr, size, endness)
|
||||
|
||||
def load(self, addr, size, endness):
|
||||
addr.prop_self(DataKind.Pointer, self.graph)
|
||||
|
||||
mem_atom = MemoryAtom(self.codeloc, size, endness)
|
||||
self.blockinfo.atoms.append(mem_atom)
|
||||
addr.appended(DerefOp(size), size).commit(mem_atom, self.graph)
|
||||
|
@ -90,16 +112,15 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
|
|||
return ifTrue.unioned(ifFalse, ifTrue.size)
|
||||
|
||||
def _perform_vex_expr_Op(self, op, args: List[LiveData]):
|
||||
return self.op(op, args)
|
||||
|
||||
def op(self, op, args: List[LiveData]) -> LiveData:
|
||||
ret_ty, arg_tys = pyvex.expr.op_arg_types(op)
|
||||
for arg, ty in zip(args, arg_tys):
|
||||
if ty.startswith('Ity_F'):
|
||||
prop = Prop()
|
||||
prop.self_data[DataKind.Float] += 1
|
||||
arg.prop(prop, self.graph)
|
||||
arg.prop_self(DataKind.Float, self.graph)
|
||||
elif ty.startswith('Ity_I'):
|
||||
prop = Prop()
|
||||
prop.self_data[DataKind.Int] += 1
|
||||
arg.prop(prop, self.graph)
|
||||
arg.prop_self(DataKind.Int, self.graph)
|
||||
|
||||
size = get_type_size_bytes(ret_ty)
|
||||
if op in ('Iop_Add8', 'Iop_Add16', 'Iop_Add32', 'Iop_Add64'):
|
||||
|
@ -141,9 +162,15 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
|
|||
if type(offset.const) is not int:
|
||||
return LiveData.new_null(data.size)
|
||||
name = self.project.arch.register_size_names[(offset.const, data.size)] # unsafe
|
||||
slot_info = self.project.arch.get_base_register(offset.const, data.size)
|
||||
return self.put(name, data, offset.const)
|
||||
|
||||
def put(self, name, data, offset=None):
|
||||
if offset is None:
|
||||
offset = self.project.arch.registers[name][0]
|
||||
|
||||
slot_info = self.project.arch.get_base_register(offset, data.size)
|
||||
if slot_info is None:
|
||||
l.error("???????")
|
||||
l.error("??????? (%s, %s)", offset, data.size)
|
||||
return LiveData.new_null(data.size)
|
||||
slot_name = self.project.arch.register_size_names[slot_info]
|
||||
reg_atom = RegisterAtom(self.codeloc, data.size, name, slot_name)
|
||||
|
@ -161,6 +188,9 @@ class TypeTapperEngine(angr.engines.vex.VEXMixin):
|
|||
self.tmps[tmp] = data
|
||||
|
||||
def _perform_vex_stmt_Store(self, addr: LiveData, data: LiveData, endness, **kwargs):
|
||||
self.store(addr, data, endness)
|
||||
|
||||
def store(self, addr, data, endness):
|
||||
prop = Prop()
|
||||
prop.self_data[DataKind.Pointer] += 1
|
||||
addr.prop(prop, self.graph)
|
||||
|
|
|
@ -92,8 +92,8 @@ class HierarchicalGraph(RelativeAtomGraph):
|
|||
else:
|
||||
parent.prop.subtract(prop)
|
||||
|
||||
def _add_node(self, relatom: RelativeAtom, path: OpSequence) -> bool:
|
||||
res = super()._add_node(relatom, path)
|
||||
def _add_node(self, relatom: RelativeAtom, path: OpSequence, has_gone_down: bool) -> bool:
|
||||
res = super()._add_node(relatom, path, has_gone_down)
|
||||
if res:
|
||||
self._atom_parents[relatom] = self._current_group
|
||||
self._current_group.children.add(relatom)
|
||||
|
|
|
@ -133,6 +133,11 @@ class HierarchicalGraphWidget(QZoomableDraggableGraphicsView):
|
|||
|
||||
self.nodes = {chart.nav: chart for chart in charts}
|
||||
|
||||
rect = self.scene().itemsBoundingRect()
|
||||
self.resetTransform()
|
||||
self.setSceneRect(rect)
|
||||
self.centerOn(rect.center())
|
||||
|
||||
def label(self, node: RelativeAtomOrGroup) -> str:
|
||||
if node not in self._labels:
|
||||
if isinstance(node, RelativeAtom):
|
||||
|
@ -199,7 +204,7 @@ class PropChart(QGraphicsItem):
|
|||
data.sort()
|
||||
cols_allocated = defaultdict(int)
|
||||
row_allocated = 0
|
||||
offset_allocated = 0
|
||||
offset_allocated = None
|
||||
marks = []
|
||||
ticks = []
|
||||
for offset, kind, size, count in data:
|
||||
|
@ -207,7 +212,7 @@ class PropChart(QGraphicsItem):
|
|||
for suboffset in range(offset, offset + size):
|
||||
cols_allocated[suboffset] = xpos + count
|
||||
|
||||
if offset >= offset_allocated:
|
||||
if offset_allocated is None or offset >= offset_allocated:
|
||||
ypos = row_allocated
|
||||
row_allocated += size
|
||||
offset_allocated = offset + size
|
||||
|
|
|
@ -0,0 +1,142 @@
|
|||
from typing import List, Optional
|
||||
|
||||
import networkx
|
||||
|
||||
from angr.calling_conventions import SimCC, SimFunctionArgument, SimRegArg, SimStackArg
|
||||
from angr.sim_type import SimType, parse_signature, SimTypeFunction, SimTypeBottom
|
||||
|
||||
from .data import BlockInfo, LiveData, CodeLoc, DataKind
|
||||
from .engine import TypeTapperEngine
|
||||
|
||||
class TypeTapperProcedure:
|
||||
PROTOTYPE: str = NotImplemented
|
||||
|
||||
def run(self, args: List[LiveData]) -> Optional[LiveData]:
|
||||
raise NotImplementedError
|
||||
|
||||
def __init__(self, engine: TypeTapperEngine, cc: SimCC):
|
||||
self._engine = engine
|
||||
self.cc = cc
|
||||
self.arg_session = None
|
||||
self._stackptr = None
|
||||
|
||||
@property
|
||||
def graph(self) -> networkx.DiGraph:
|
||||
return self._engine.graph
|
||||
|
||||
@property
|
||||
def stackptr(self) -> LiveData:
|
||||
if self._stackptr is None:
|
||||
self._stackptr = self.get(self._engine.project.arch.register_names[self._engine.project.arch.sp_offset])
|
||||
return self._stackptr
|
||||
|
||||
@stackptr.setter
|
||||
def stackptr(self, value):
|
||||
self._stackptr = value
|
||||
self.put(self._engine.project.arch.register_names[self._engine.project.arch.sp_offset], value)
|
||||
|
||||
def load(self, addr, size, endness=None):
|
||||
if endness is None:
|
||||
endness = self._engine.project.arch.memory_endness
|
||||
self._engine.stmt_idx += 1
|
||||
return self._engine.load(addr, size, endness)
|
||||
|
||||
def store(self, addr, data, endness=None):
|
||||
if endness is None:
|
||||
endness = self._engine.project.arch.memory_endness
|
||||
self._engine.stmt_idx += 1
|
||||
self._engine.store(addr, data, endness)
|
||||
|
||||
def get(self, reg):
|
||||
self._engine.stmt_idx += 1
|
||||
return self._engine.get(reg)
|
||||
|
||||
def put(self, reg, data):
|
||||
self._engine.stmt_idx += 1
|
||||
self._engine.put(reg, data)
|
||||
|
||||
def op(self, op, args: List[LiveData], append_size=True):
|
||||
if append_size:
|
||||
op = 'Iop_' + op + str(args[0].size * 8)
|
||||
return self._engine.op(op, args)
|
||||
|
||||
def const(self, val, size):
|
||||
self._engine.stmt_idx += 1
|
||||
return self._engine.const(val, size)
|
||||
|
||||
def _translate_reg_arg(self, loc: SimRegArg) -> str:
|
||||
slot_offset = self._engine.project.arch.registers[loc.reg_name][0]
|
||||
return self._engine.project.arch.register_size_names[(slot_offset + loc.reg_offset, loc.size)]
|
||||
|
||||
def _translate_stack_offset(self, loc: SimStackArg) -> LiveData:
|
||||
return self.op('Add', [self.stackptr, self.const(loc.stack_offset, self.stackptr.size)])
|
||||
|
||||
def _read_argloc(self, loc: SimFunctionArgument) -> LiveData:
|
||||
if isinstance(loc, SimRegArg):
|
||||
name = self._translate_reg_arg(loc)
|
||||
return self.get(name)
|
||||
elif isinstance(loc, SimStackArg):
|
||||
return self.load(self._translate_stack_offset(loc), loc.size, self._engine.project.arch.memory_endness)
|
||||
else:
|
||||
return LiveData.new_null(loc.size)
|
||||
|
||||
def _write_argloc(self, loc: SimFunctionArgument, value: LiveData):
|
||||
if isinstance(loc, SimRegArg):
|
||||
name = self._translate_reg_arg(loc)
|
||||
self.put(name, value)
|
||||
|
||||
def analyze(self, addr: int):
|
||||
self._engine.force_addr(addr)
|
||||
prototype = parse_signature(self.PROTOTYPE, arch=self._engine.project.arch)
|
||||
arg_locs = self.cc.arg_locs(prototype)
|
||||
args = [self._read_argloc(loc) for loc in arg_locs]
|
||||
result = self.run(args)
|
||||
if result is None and not isinstance(prototype.returnty, SimTypeBottom):
|
||||
result = LiveData.new_null(prototype.returnty.size // 8)
|
||||
if result is not None:
|
||||
self._write_argloc(self.cc.return_val(prototype.returnty), result)
|
||||
if self._engine.project.arch.call_sp_fix:
|
||||
self.stackptr = self.op('Add', [self.stackptr, self.const(-self._engine.project.arch.call_sp_fix, self.stackptr.size)])
|
||||
|
||||
|
||||
class strrchr(TypeTapperProcedure):
|
||||
PROTOTYPE: str = "char *strrchr(const char *s, int c);"
|
||||
def run(self, args: List[LiveData]) -> Optional[LiveData]:
|
||||
self.load(args[0], 1).prop_self(DataKind.Int, self.graph)
|
||||
args[0].prop_union(0, 1, self.graph)
|
||||
args[1].prop_self(DataKind.Int, self.graph)
|
||||
return None
|
||||
|
||||
class strchr(TypeTapperProcedure):
|
||||
PROTOTYPE: str = "char *strchr(const char *s, int c);"
|
||||
def run(self, args: List[LiveData]) -> Optional[LiveData]:
|
||||
self.load(args[0], 1).prop_self(DataKind.Int, self.graph)
|
||||
args[0].prop_union(0, 1, self.graph)
|
||||
args[1].prop_self(DataKind.Int, self.graph)
|
||||
return None
|
||||
|
||||
class strlen(TypeTapperProcedure):
|
||||
PROTOTYPE = "size_t strlen(const char *s);"
|
||||
def run(self, args: List[LiveData]) -> Optional[LiveData]:
|
||||
self.load(args[0], 1).prop_self(DataKind.Int, self.graph)
|
||||
args[0].prop_union(0, 1, self.graph)
|
||||
return None
|
||||
|
||||
class strcmp(TypeTapperProcedure):
|
||||
PROTOTYPE = "size_t strcmp(const char *s1, const char *s2);"
|
||||
def run(self, args: List[LiveData]) -> Optional[LiveData]:
|
||||
self.load(args[0], 1).prop_self(DataKind.Int, self.graph)
|
||||
args[0].prop_union(0, 1, self.graph)
|
||||
self.load(args[1], 1).prop_self(DataKind.Int, self.graph)
|
||||
args[1].prop_union(0, 1, self.graph)
|
||||
return None
|
||||
|
||||
class strncmp(TypeTapperProcedure):
|
||||
PROTOTYPE = "size_t strncmp(const char *s1, const char *s2, size_t n);"
|
||||
def run(self, args: List[LiveData]) -> Optional[LiveData]:
|
||||
self.load(args[0], 1).prop_self(DataKind.Int, self.graph)
|
||||
args[0].prop_union(0, 1, self.graph)
|
||||
self.load(args[1], 1).prop_self(DataKind.Int, self.graph)
|
||||
args[1].prop_union(0, 1, self.graph)
|
||||
args[2].prop_self(DataKind.Int, self.graph)
|
||||
return None
|
|
@ -4,7 +4,7 @@ import logging
|
|||
|
||||
import networkx
|
||||
|
||||
from .data import Atom, Prop, OpSequence, ControlFlowActionPop, ControlFlowActionPush, ControlFlowAction
|
||||
from .data import Atom, Prop, OpSequence, ControlFlowActionPop, ControlFlowActionPush, ControlFlowAction, RefOp, DerefOp
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from .knowledge import TypeTapperManager
|
||||
|
@ -21,9 +21,11 @@ class RelativeAtom:
|
|||
class RelativeAtomAttrs:
|
||||
path: OpSequence
|
||||
prop: Prop
|
||||
has_gone_down: bool = False
|
||||
|
||||
def merge(self, other: 'RelativeAtomAttrs'):
|
||||
self.prop.update(other.prop)
|
||||
self.prop.maximize(other.prop)
|
||||
# TODO has_gone_down
|
||||
if self.path != other.path:
|
||||
# TODO unifications
|
||||
pass
|
||||
|
@ -41,10 +43,10 @@ class RelativeAtomGraph:
|
|||
|
||||
for atom in baseline:
|
||||
relative = RelativeAtom(atom=atom, callstack=(), rcallstack=())
|
||||
self._add_node(relative, OpSequence())
|
||||
self._add_node(relative, OpSequence(), False)
|
||||
self.frontier.add(relative) # TODO ???
|
||||
|
||||
def _add_node(self, relatom: RelativeAtom, path: OpSequence) -> bool:
|
||||
def _add_node(self, relatom: RelativeAtom, path: OpSequence, has_gone_down: bool) -> bool:
|
||||
"""
|
||||
If relatom is not present in the graph, add it.
|
||||
If it is present in the graph, merge the new information into its attrs
|
||||
|
@ -52,6 +54,7 @@ class RelativeAtomGraph:
|
|||
newattrs = RelativeAtomAttrs(
|
||||
prop=self.kp.graph.nodes[relatom.atom].get('prop', Prop()).transform(path.invert()),
|
||||
path=path,
|
||||
has_gone_down=has_gone_down,
|
||||
)
|
||||
if relatom in self.__graph.nodes:
|
||||
return self.__graph.nodes[relatom]['attr'].merge(newattrs)
|
||||
|
@ -106,17 +109,23 @@ class RelativeAtomGraph:
|
|||
edge_ops: OpSequence,
|
||||
is_pred: bool,
|
||||
) -> Optional[RelativeAtom]:
|
||||
callstack, rcallstack = self._update_callstack(relatom.callstack, relatom.rcallstack, edge_cf, is_pred)
|
||||
if callstack is None:
|
||||
goes_down = any(isinstance(op, DerefOp) for op in edge_ops.ops)
|
||||
goes_up = any(isinstance(op, DerefOp) for op in edge_ops.ops)
|
||||
if is_pred:
|
||||
goes_down, goes_up = goes_up, goes_down
|
||||
if attrs.has_gone_down and goes_up:
|
||||
return None
|
||||
|
||||
if is_pred:
|
||||
path = edge_ops.invert() + attrs.path
|
||||
else:
|
||||
path = attrs.path + edge_ops
|
||||
weh = self._update_callstack(relatom.callstack, relatom.rcallstack, edge_cf, is_pred)
|
||||
if weh is None:
|
||||
return None
|
||||
callstack, rcallstack = weh
|
||||
|
||||
path = attrs.path
|
||||
path += edge_ops.invert() if is_pred else edge_ops
|
||||
|
||||
relsucc = RelativeAtom(atom=succ, callstack=callstack, rcallstack=rcallstack)
|
||||
res = self._add_node(relsucc, path)
|
||||
res = self._add_node(relsucc, path, attrs.has_gone_down or goes_down)
|
||||
if is_pred:
|
||||
if not self.__graph.has_edge(relsucc, relatom):
|
||||
self._add_edge(relsucc, relatom)
|
||||
|
|
Loading…
Reference in New Issue