# -*- coding: utf-8 -*-
"""
equip.analysis.flow
~~~~~~~~~~~~~~~~~~~
Extract the control flow graphs from the bytecode.
:copyright: (c) 2014 by Romain Gaucher (@rgaucher)
:license: Apache 2, see LICENSE for more details.
"""
import opcode
from operator import itemgetter, attrgetter
from itertools import tee, izip
from .graph import DiGraph, Edge, Node, Walker, EdgeVisitor
from .graph.dominators import DominatorTree
from .block import BasicBlock
from ..utils.log import logger
from ..bytecode.utils import show_bytecode
BREAK_LOOP = 80
RETURN_VALUE = 83
FOR_ITER = 93
JUMP_FORWARD = 110
JUMP_IF_FALSE_OR_POP = 111
JUMP_IF_TRUE_OR_POP = 112
JUMP_ABSOLUTE = 113
POP_JUMP_IF_FALSE = 114
POP_JUMP_IF_TRUE = 115
JUMP_OPCODES = opcode.hasjabs + opcode.hasjrel
SETUP_LOOP = 120
SETUP_EXCEPT = 121
SETUP_FINALLY = 122
RAISE_VARARGS = 130
SETUP_WITH = 143
NO_FALL_THROUGH = (JUMP_ABSOLUTE, JUMP_FORWARD)
[docs]class ControlFlow(object):
"""
Performs the control-flow analysis on a ``Declaration`` object. It iterates
over its bytecode and builds the basic block. The final representation
leverages the ``DiGraph`` structure, and contains an instance of the
``DominatorTree``.
"""
E_TRUE = 'TRUE'
E_FALSE = 'FALSE'
E_UNCOND = 'UNCOND'
E_COND = 'COND'
E_EXCEPT = 'EXCEPT'
E_FINALLY = 'FINALLY'
E_RETURN = 'RETURN'
E_RAISE = 'RAISE'
E_END_LOOP = 'END_LOOP'
N_ENTRY = 'ENTRY'
N_IMPLICIT_RETURN = 'IMPLICIT_RETURN'
N_UNKNOWN = 'UNKNOWN'
N_LOOP = 'LOOP'
N_IF = 'IF'
N_EXCEPT = 'EXCEPT'
N_CONDITION = 'CONDITION'
CFG_TMP_RETURN = -1
CFG_TMP_BREAK = -2
CFG_TMP_RAISE = -3
def __init__(self, decl):
self._decl = decl
self._blocks = None
self._block_idx_map = {}
self._block_nodes = {}
self._frames = None
self._graph = None
self._entry = None
self._exit = None
self._entry_node = None
self._exit_node = None
self._dom = None
self.analyze()
@property
def decl(self):
return self._decl
@decl.setter
def decl(self, value):
self._decl = value
@property
def entry(self):
return self._entry
@entry.setter
def entry(self, value):
self._entry = value
@property
def entry_node(self):
return self._entry_node
@entry_node.setter
def entry_node(self, value):
self._entry_node = value
@property
def exit(self):
return self._exit
@exit.setter
def exit(self, value):
self._exit = value
@property
def exit_node(self):
return self._exit_node
@exit_node.setter
def exit_node(self, value):
self._exit_node = value
@property
def blocks(self):
"""
Returns the basic blocks created during the control flow analysis.
"""
return self._blocks
@property
def block_indices_dict(self):
"""
Returns the mapping of a bytecode indices and a basic blocks.
"""
return self._block_idx_map
@property
def block_nodes_dict(self):
"""
Returns the mapping of a basic bocks and CFG nodes.
"""
return self._block_nodes
@property
def frames(self):
return self._frames
@property
def graph(self):
"""
Returns the underlying graph that holds the CFG.
"""
return self._graph
@property
def dominators(self):
"""
Returns the ``DominatorTree`` that contains:
- Dominator tree (dict of IDom)
- Post dominator tree (doc of PIDom)
- Dominance frontier (dict of CFG node -> set CFG nodes)
"""
if self._dom is None:
self._dom = DominatorTree(self)
return self._dom
[docs] def analyze(self):
"""
Performs the CFA and stores the resulting CFG.
"""
bytecode = self.decl.bytecode
self.entry = BasicBlock(BasicBlock.ENTRY, self.decl, -1)
self.exit = BasicBlock(BasicBlock.IMPLICIT_RETURN, self.decl, -1)
self._blocks = ControlFlow.make_blocks(self.decl, bytecode)
self.__build_flowgraph(bytecode)
# logger.debug("CFG(%s) :=\n%s", self.decl, self.graph.to_dot())
def __build_flowgraph(self, bytecode):
g = DiGraph(multiple_edges=False)
self.entry_node = g.make_add_node(kind=ControlFlow.N_ENTRY, data=self._entry)
self.exit_node = g.make_add_node(kind=ControlFlow.N_IMPLICIT_RETURN, data=self._exit)
self._block_idx_map = {}
self._block_nodes = {}
# Connect entry/implicit return blocks
last_block_index, last_block = -1, None
for block in self.blocks:
self._block_idx_map[block.index] = block
node_kind = ControlFlow.get_kind_from_block(block)
block_node = g.make_add_node(kind=node_kind, data=block)
self._block_nodes[block] = block_node
if block.index == 0:
g.make_add_edge(self.entry_node,
self._block_nodes[block],
kind=ControlFlow.E_UNCOND)
if block.index >= last_block_index:
last_block = block
last_block_index = block.index
g.make_add_edge(self._block_nodes[last_block],
self.exit_node,
kind=ControlFlow.E_UNCOND)
sorted_blocks = sorted(self.blocks, key=attrgetter('_index'))
i, length = 0, len(sorted_blocks)
while i < length:
cur_block = sorted_blocks[i]
if cur_block.jumps:
# Connect the current block to its jump targets
for (jump_index, branch_kind) in cur_block.jumps:
if jump_index <= ControlFlow.CFG_TMP_RETURN:
continue
target_block = self._block_idx_map[jump_index]
g.make_add_edge(self._block_nodes[cur_block],
self._block_nodes[target_block],
kind=branch_kind)
i += 1
self._graph = g
self.__finalize()
def __finalize(self):
def has_true_false_branches(list_edges):
has_true, has_false = False, False
for edge in list_edges:
if edge.kind == ControlFlow.E_TRUE: has_true = True
elif edge.kind == ControlFlow.E_FALSE: has_false = True
return has_true and has_false
def get_cfg_tmp_values(node):
values = set()
for (jump_index, branch_kind) in node.data.jumps:
if jump_index <= ControlFlow.CFG_TMP_RETURN:
values.add(jump_index)
return values
def get_parent_loop(node):
class BwdEdges(EdgeVisitor):
def __init__(self):
EdgeVisitor.__init__(self)
self.edges = []
def visit(self, edge):
self.edges.append(edge)
visitor = BwdEdges()
walker = Walker(self.graph, visitor, backwards=True)
walker.traverse(node)
parents = visitor.edges
node_bc_index = node.data.index
for parent_edge in parents:
parent = parent_edge.source
if parent.kind != ControlFlow.N_LOOP:
continue
# Find the loop in which the break/current node is nested in
if parent.data.index < node_bc_index and parent.data.end_target > node_bc_index:
return parent
return None
# Burn N_CONDITION nodes
for node in self.graph.nodes:
out_edges = self.graph.out_edges(node)
if len(out_edges) < 2 or not has_true_false_branches(out_edges):
continue
node.kind = ControlFlow.N_CONDITION
# Handle return/break statements:
# - blocks with returns are simply connected to the IMPLICIT_RETURN
# and previous out edges removed
# - blocks with breaks are connected to the end of the current loop
# and previous out edges removed
for node in self.graph.nodes:
cfg_tmp_values = get_cfg_tmp_values(node)
if not cfg_tmp_values:
continue
if ControlFlow.CFG_TMP_BREAK in cfg_tmp_values:
parent_loop = get_parent_loop(node)
if not parent_loop:
logger.error("Cannot find parent loop for %s", node)
continue
target_block = self._block_idx_map[parent_loop.data.end_target]
out_edges = self.graph.out_edges(node)
for edge in out_edges:
self.graph.remove_edge(edge)
self.graph.make_add_edge(node,
self.block_nodes_dict[target_block],
kind=ControlFlow.E_UNCOND)
if ControlFlow.CFG_TMP_RETURN in cfg_tmp_values:
# Remove existing out edges and add a RETURN edge to the IMPLICIT_RETURN
out_edges = self.graph.out_edges(node)
for edge in out_edges:
self.graph.remove_edge(edge)
self.graph.make_add_edge(node,
self._exit_node,
kind=ControlFlow.E_RETURN)
BLOCK_NODE_KIND = {
BasicBlock.UNKNOWN: N_UNKNOWN,
BasicBlock.ENTRY: N_ENTRY,
BasicBlock.IMPLICIT_RETURN: N_IMPLICIT_RETURN,
BasicBlock.LOOP: N_LOOP,
BasicBlock.IF: N_IF,
BasicBlock.EXCEPT: N_EXCEPT,
}
@staticmethod
[docs] def get_kind_from_block(block):
return ControlFlow.BLOCK_NODE_KIND[block.kind]
@staticmethod
[docs] def get_pairs(iterable):
a, b = tee(iterable)
next(b, None)
return izip(a, b)
@staticmethod
[docs] def make_blocks(decl, bytecode):
"""
Returns the set of ``BasicBlock`` that are encountered in the current bytecode.
Each block is annotated with its qualified jump targets (if any).
:param decl: The current declaration object.
:param bytecode: The bytecode associated with the declaration object.
"""
blocks = set()
block_map = {} # bytecode index -> block
i, length = 0, len(bytecode)
start_index = [j for j in range(length) if bytecode[j][0] == 0][0]
prev_co = bytecode[start_index][5]
slice_bytecode = [tpl for tpl in bytecode[start_index:] if tpl[5] == prev_co]
# logger.debug("Current bytecode:\n%s", show_bytecode(slice_bytecode))
slice_length = len(slice_bytecode)
known_targets = ControlFlow.find_targets(slice_bytecode)
known_targets.add(0)
known_targets.add(1 + max([tpl[0] for tpl in slice_bytecode]))
known_targets = list(known_targets)
known_targets.sort()
# logger.debug("Targets: %s", [d for d in ControlFlow.get_pairs(known_targets)])
slice_bytecode_indexed = {}
idx = 0
for l in slice_bytecode:
index = l[0]
slice_bytecode_indexed[index] = (l, idx)
idx += 1
for start_index, end_index in ControlFlow.get_pairs(known_targets):
index, lineno, op, arg, cflow_in, code_object = slice_bytecode_indexed[start_index][0]
block_kind = ControlFlow.block_kind_from_op(op)
cur_block = BasicBlock(block_kind, decl, start_index)
cur_block.length = end_index - start_index - 1
i = slice_bytecode_indexed[start_index][1]
try:
length = slice_bytecode_indexed[end_index][1]
if length >= slice_length:
length = slice_length
except:
length = slice_length
while i < length:
index, lineno, op, arg, cflow_in, code_object = slice_bytecode[i]
if op in JUMP_OPCODES:
jump_address = arg
if op in opcode.hasjrel:
jump_address = arg + index + 3
if op in (SETUP_FINALLY, SETUP_EXCEPT, SETUP_WITH):
kind = ControlFlow.E_UNCOND
if op == SETUP_FINALLY: kind = ControlFlow.E_FINALLY
if op in (SETUP_EXCEPT, SETUP_WITH): kind = ControlFlow.E_EXCEPT
cur_block.end_target = jump_address
cur_block.add_jump(jump_address, kind)
elif op in (JUMP_ABSOLUTE, JUMP_FORWARD):
cur_block.add_jump(jump_address, ControlFlow.E_UNCOND)
elif op in (POP_JUMP_IF_FALSE, JUMP_IF_FALSE_OR_POP, FOR_ITER):
cur_block.add_jump(jump_address, ControlFlow.E_FALSE)
elif op in (POP_JUMP_IF_TRUE, JUMP_IF_TRUE_OR_POP):
cur_block.add_jump(jump_address, ControlFlow.E_TRUE)
elif op == SETUP_LOOP:
cur_block.kind = BasicBlock.LOOP
cur_block.end_target = jump_address
elif op == RETURN_VALUE:
cur_block.has_return_path = True
cur_block.add_jump(ControlFlow.CFG_TMP_RETURN, ControlFlow.E_RETURN)
elif op == BREAK_LOOP:
cur_block.has_return_path = True
cur_block.add_jump(ControlFlow.CFG_TMP_BREAK, ControlFlow.E_UNCOND)
elif op == RAISE_VARARGS:
cur_block.has_return_path = False
cur_block.add_jump(ControlFlow.CFG_TMP_RAISE, ControlFlow.E_UNCOND)
i += 1
# If the last block is not a NO_FALL_THROUGH, we connect the fall through
if not cur_block.has_return_path and op not in NO_FALL_THROUGH and i < slice_length:
kind = ControlFlow.E_UNCOND
if op in (POP_JUMP_IF_FALSE, JUMP_IF_FALSE_OR_POP, FOR_ITER):
kind = ControlFlow.E_TRUE
if op in (POP_JUMP_IF_TRUE, JUMP_IF_TRUE_OR_POP):
kind = ControlFlow.E_FALSE
cur_block.fallthrough = True
fallthrough_address = slice_bytecode[i][0]
cur_block.add_jump(fallthrough_address, kind)
else:
cur_block.fallthrough = False
block_map[start_index] = cur_block
blocks.add(cur_block)
return blocks
@staticmethod
[docs] def block_kind_from_op(op):
if op in (FOR_ITER,):
return BasicBlock.LOOP
# Cannot make the decision at this point, need to await the finalization
# of the CFG
return BasicBlock.UNKNOWN
@staticmethod
[docs] def find_targets(bytecode):
targets = set()
i, length = 0, len(bytecode)
while i < length:
index, lineno, op, arg, cflow_in, code_object = bytecode[i]
if op in JUMP_OPCODES:
jump_address = arg
if op in opcode.hasjrel:
jump_address = arg + index + 3
targets.add(jump_address)
if op not in NO_FALL_THROUGH:
targets.add(bytecode[i + 1][0])
i += 1
return targets