Python源码示例:idaapi.FlowChart()
示例1
def find_all_ioctls():
    """
    Scan the function under the cursor for probable IOCTL codes.

    Every basic block of the function containing the currently selected address
    is inspected. When the last instruction of a block is a 'jz' and the
    instruction before it is a 'cmp' or 'sub' whose second operand is an
    immediate value, that instruction is recorded as a likely IOCTL check
    (a decent guess when the function is an IOCTL dispatch routine).

    Each hit is also registered through `ioctl_tracker.add_ioctl`.

    Returns a list of (instruction address, value) pairs, where the value is
    whatever `get_operand_value` returns for that instruction. The list is
    empty when the cursor is not inside a function.
    """
    ioctls = []
    # Find the currently selected function and get a list of all of its basic blocks
    addr = idc.ScreenEA()
    f = idaapi.get_func(addr)
    if f is None:
        # The cursor is not inside any function, so there is nothing to traverse.
        return ioctls
    fc = idaapi.FlowChart(f, flags=idaapi.FC_PREDS)
    for block in fc:
        # Grab the last two instructions in the block
        last_inst = idc.PrevHead(block.endEA)
        penultimate_inst = idc.PrevHead(last_inst)
        if idc.GetMnem(last_inst) != 'jz':
            continue
        if idc.GetMnem(penultimate_inst) not in ['cmp', 'sub']:
            continue
        # Operand type 5 is the immediate-value operand type (o_imm)
        if idc.GetOpType(penultimate_inst, 1) != 5:
            continue
        value = get_operand_value(penultimate_inst)
        ioctls.append((penultimate_inst, value))
        ioctl_tracker.add_ioctl(penultimate_inst, value)
    return ioctls
示例2
def enum_function_addrs(fva):
    '''
    yield the effective addresses of each instruction in the given function.
    these addresses are not guaranteed to be in any order.

    Args:
      fva (int): the starting address of a function

    Returns:
      sequence[int]: the addresses of each instruction

    Raises:
      ValueError: if no function exists at `fva`. Because this is a generator,
        the error surfaces when iteration starts, not when it is called.
    '''
    f = idaapi.get_func(fva)
    if not f:
        raise ValueError('not a function')

    for block in idaapi.FlowChart(f):
        ea = block.startEA
        # A basic block's end address is exclusive: it is the first address
        # past the block, so it must not be yielded as part of this block.
        while ea < block.endEA:
            yield ea
            ea = idc.NextHead(ea)
示例3
def _onFuncButtonClicked(self):
    """Add target nodes covering the whole function under the cursor.

    Does nothing (apart from printing a warning) when the CFG has not been
    loaded. Otherwise, if the screen address lies inside a function:

    * the function entry point becomes the pattern root node when no root
      node has been set yet;
    * for every basic block, the head preceding the block's end address is
      added as a target node;
    * `_render_if_real_time` is invoked afterwards.
    """
    generator = self.cc.PatternGenerator
    if not generator.graph.graph:
        print("WARNING: Unloaded CFG. Make sure to first \"Load the CFG\"")
        return

    ea = idaapi.get_screen_ea()
    if not ea:
        return

    func = idaapi.ida_funcs.get_func(ea)
    if not func:
        return

    if generator.rootNode is None:
        # Use the % operator: passing the address as a second print argument
        # leaves the "%x" placeholder unformatted in the output.
        print("[I] Adding root node as function entrypoint: %x" % func.start_ea)
        generator.setRootNode(func.start_ea)

    print("[I] Adding nodes to cover whole function")
    for bb in idaapi.FlowChart(func):
        last_inst_addr = idc.prev_head(bb.end_ea)
        generator.addTargetNode(last_inst_addr)

    self._render_if_real_time()
示例4
def get_nx_graph(ea, ignore_external=False):
    """Build a NetworkX directed graph from the flowchart of the function at `ea`.

    Nodes are basic-block start addresses; an edge is added for every
    predecessor and successor relation reported by the flowchart.
    `ignore_external` is forwarded to `FlowChart`.
    """
    graph = networkx.DiGraph()
    func = idaapi.get_func(ea)
    for node in FlowChart(func, ignore_external=ignore_external):
        here = node.start_ea
        # Register the node explicitly so blocks without edges still appear
        graph.add_node(here)
        for parent in node.preds():
            graph.add_edge(parent.start_ea, here)
        for child in node.succs():
            graph.add_edge(here, child.start_ea)
    return graph
示例5
def codeblocks(start=None, end=None, full=True):
    """Yield every `CodeBlock` in the given address range.

    Args:
        start - first address of the range. `None` means the IDB start.
        end - last address of the range. `None` means the IDB end.
        full - `True` builds one flowchart per function, which is required to
               change node info (e.g. color). `False` builds a single
               flowchart over the address bounds and iterates faster.
    """
    if not full:
        start, end = fix_addresses(start, end)
        for code_block in FlowChart(bounds=(start, end)):
            yield code_block
        return

    for function in functions(start, end):
        for block in FlowChart(f=function.func_t):
            yield block
示例6
def __init__(self, fun_addr):
    """Build the flow graph of the function containing `fun_addr`.

    Each basic block of the function is wrapped in a `MyBasicBlock` and stored
    under its block id. Links, the edge map, the shortest-path map, the total
    size and the viewer are then derived, in that order.
    """
    super(MyFlowGraph, self).__init__()
    func = idaapi.get_func(fun_addr)
    self.fun = func
    self.startEA = func.startEA
    self.endEA = func.endEA

    for block in idaapi.FlowChart(func):
        self[block.id] = MyBasicBlock(block)

    self._compute_links()
    self.edge_map = self.make_graph()
    self.shortest_path_map = self.dijkstra(self.edge_map)
    # Total size is the sum of the sizes reported by the stored blocks
    self.size = sum(node.size() for node in self.values())
    title = "Extract(%s)" % idc.GetFunctionName(self.startEA)
    self.viewer = MyFlowGraphViewer(self, title)
示例7
def blocksAt(self, func_ctx):
    """Return the basic blocks of the given function.

    Args:
        func_ctx (func): function instance exposing a `func_t` attribute

    Return Value:
        An `idaapi.FlowChart` built from `func_ctx.func_t`; iterating it
        yields the basic blocks
    """
    flow_chart = idaapi.FlowChart(func_ctx.func_t)
    return flow_chart
# Overridden base function
示例8
def searchIslands(self, func_ea, range_start, range_end):
    """Search a given function for "Islands" from a specific code range.

    An island candidate is the lowest-addressed basic block of the function
    that lies completely inside [range_start, range_end]. Starting from it,
    successors that also lie inside the range are followed breadth-first.

    Args:
        func_ea (int): effective address of the wanted function
        range_start (int): effective address of the start of the island range
        range_end (int): effective address of the end of the island range

    Return Value:
        List of code blocks for the found island in discovery order, or None if found nothing
    """
    def in_range(block):
        return range_start <= block.start_ea and block.end_ea <= range_end

    island_guess = None
    func = sark.Function(func_ea)
    for block in idaapi.FlowChart(func.func_t):
        if not in_range(block):
            continue
        if island_guess is None or block.start_ea < island_guess.start_ea:
            island_guess = block
    # quit if found nothing
    if island_guess is None:
        return None
    # make sure that the island is indeed an island, and not a well known function
    if sark.Function(island_guess.start_ea).start_ea == island_guess.start_ea:
        return None
    # find the contained flow, that island_guess is the start of
    island_blocks = []
    # Visited blocks are tracked by start address rather than by block object,
    # so the traversal terminates on cyclic flow regardless of how the block
    # objects returned by succs() compare to each other.
    visited_starts = set()
    candidate_list = [island_guess]
    while candidate_list:
        new_candidate_list = []
        for candidate_block in candidate_list:
            if candidate_block.start_ea in visited_starts:
                continue
            visited_starts.add(candidate_block.start_ea)
            island_blocks.append(candidate_block)
            new_candidate_list.extend(succ for succ in candidate_block.succs() if in_range(succ))
        candidate_list = new_candidate_list
    # return the results
    return island_blocks
示例9
def get_basic_blocks(fva):
    '''
    Build a `BasicBlock` (start address and byte size) for every basic block
    of the function at `fva`.

    Returns an empty list when there is no function at `fva`.
    '''
    func = idaapi.get_func(fva)
    if func is None:
        return []
    return [BasicBlock(va=bb.start_ea, size=bb.end_ea - bb.start_ea)
            for bb in idaapi.FlowChart(func)]
示例10
def get_basic_blocks(fva):
    '''
    Collect one `BasicBlock` per basic block of the function at `fva`.

    Each entry records the block's start address and its size in bytes
    (end address minus start address). The result is empty when `fva` does
    not belong to a function.
    '''
    func = idaapi.get_func(fva)
    blocks = []
    if func is not None:
        for node in idaapi.FlowChart(func):
            first = node.start_ea
            blocks.append(BasicBlock(va=first, size=node.end_ea - first))
    return blocks
示例11
def __init__(self, f=None, bounds=None, flags=idaapi.FC_PREDS, ignore_external=False):
    """Create the flowchart for a function or an address range.

    When neither `f` nor `bounds` is given, the current screen address is
    used as `f`. A non-None `f` is resolved through `get_func` before being
    passed to the base class. `ignore_external` adds `idaapi.FC_NOEXT` to
    `flags`.
    """
    nothing_specified = f is None and bounds is None
    if nothing_specified:
        f = idaapi.get_screen_ea()

    if f is not None:
        f = get_func(f)

    effective_flags = (flags | idaapi.FC_NOEXT) if ignore_external else flags
    super(FlowChart, self).__init__(f=f, bounds=bounds, flags=effective_flags)
示例12
def get_flowchart(ea=None):
    """Return a `FlowChart` for the function containing `ea`.

    When `ea` is None, the current screen address is used instead.
    """
    target = idaapi.get_screen_ea() if ea is None else ea
    return FlowChart(idaapi.get_func(target))
示例13
def _create_flow(function, bounds):
    """Create a FlowChart for a function and/or an address range.

    Args:
        function: an address inside the function to chart, or None.
        bounds: a (start, end) pair of addresses delimiting the region to
            chart, or None.

    Returns:
        The `idaapi.FlowChart`, or None (after logging) when `function` is
        given but no function exists at that address.
    """
    f, b = None, None
    if function is not None:
        f = idaapi.get_func(function)
        if f is None:
            # Log the address that was passed in; the parameter is `function`.
            _log(0, 'Bad func {:#x}', function)
            return None
    if bounds is not None:
        # Unpacking also validates that bounds is a two-element pair.
        start, end = bounds
        b = (start, end)
    return idaapi.FlowChart(f=f, bounds=b)
示例14
def get_basic_blocks(fva):
    """
    Describe each basic block of the function at `fva` as a `BasicBlock`
    holding its start address and its size (endEA - startEA).

    An empty list is returned when `fva` is not inside a function.
    """
    func = ida_funcs.get_func(fva)
    if func is None:
        return []
    return [
        BasicBlock(va=node.startEA, size=node.endEA - node.startEA)
        for node in idaapi.FlowChart(func)
    ]
示例15
def bottom(func):
    '''Return a tuple with the exit-points of the function `func`.

    A block counts as an exit when its type is one of the return, conditional
    return, no-return, external no-return or error block types. For each such
    block, the address preceding the block's end is reported.
    '''
    fn = by(func)
    fc = idaapi.FlowChart(f=fn, flags=idaapi.FC_PREDS)
    block_types = interface.fc_block_type_t
    exit_types = (
        block_types.fcb_ret,
        block_types.fcb_cndret,
        block_types.fcb_noret,
        block_types.fcb_enoret,
        block_types.fcb_error,
    )
    exits = []
    for item in fc:
        if item.type in exit_types:
            exits.append(database.address.prev(interface.range.end(item)))
    return tuple(exits)
示例16
def iterate(cls, func):
    '''Yield each ``idaapi.BasicBlock`` of the function `func`.

    The flowchart is built with the ``idaapi.FC_PREDS`` flag.
    '''
    fn = by(func)
    for block in idaapi.FlowChart(f=fn, flags=idaapi.FC_PREDS):
        yield block
示例17
def flowchart(cls):
    '''Return an ``idaapi.FlowChart`` object for the function currently selected in the UI.'''
    current = ui.current.function()
    return cls.flowchart(current)
示例18
def flowchart(cls, func):
    '''Return an ``idaapi.FlowChart`` object, built with ``idaapi.FC_PREDS``, for the function `func`.'''
    fn = by(func)
    chart = idaapi.FlowChart(f=fn, flags=idaapi.FC_PREDS)
    return chart
示例19
def getBlocks(self, function_offset):
    """Return the code heads of the function at `function_offset`, grouped by basic block.

    Each basic block contributes a list with the addresses of its heads that
    are flagged as code; blocks without any code head are skipped. The list
    of blocks is returned sorted.
    """
    collected = []
    chart = ida_gdl.FlowChart(ida_funcs.get_func(function_offset))
    for bb in chart:
        code_heads = [
            head
            for head in idautils.Heads(bb.start_ea, bb.end_ea)
            if ida_bytes.is_code(ida_bytes.get_flags(head))
        ]
        if code_heads:
            collected.append(code_heads)
    return sorted(collected)
示例20
def getBlocks(self, function_offset):
    """Group the code heads of the function at `function_offset` by basic block.

    For every basic block, the heads between startEA and endEA whose flags
    mark them as code are gathered into a list; empty lists are dropped and
    the remaining lists are returned in sorted order.
    """
    def is_code_head(head):
        return idc.isCode(idc.GetFlags(head))

    collected = []
    chart = idaapi.FlowChart(idaapi.get_func(function_offset))
    for bb in chart:
        code_heads = [head for head in idautils.Heads(bb.startEA, bb.endEA) if is_code_head(head)]
        if code_heads:
            collected.append(code_heads)
    return sorted(collected)
示例21
def get_bb_ends(address):
    """
    Collect, for every basic block of the function containing `address`,
    the head that immediately precedes the block's end address.
    :param address: address in function
    :return: list with one such address per basic block
    """
    func = idaapi.get_func(address)
    ends = []
    for block in idaapi.FlowChart(func):
        ends.append(idc.prev_head(block.end_ea))
    return ends
示例22
def calc_fn_machoc(self, fva, fname): # based on Machoc hash implementation (https://github.com/0x00ach/idadiff)
    """Compute a Machoc-style hash of the control-flow graph of the function at `fva`.

    A textual description of the CFG is built block by block: each block
    contributes its start address, a "c," marker per call instruction (the
    instruction following a call starts a new numbered sub-block), and the
    sorted start addresses of its successors. Addresses are then replaced by
    sequential indexes and the text is hashed with mmh3.

    Returns a (hash, block count) pair, or (None, None) when there is no
    function object at `fva`. `fname` is only used in debug messages.
    """
    func = idaapi.get_func(fva)
    if func is None:
        self.debug('{}: ignored due to lack of function object'.format(fname))
        return None, None

    flow = idaapi.FlowChart(f=func)
    cfg_text = ""
    addr_to_id = []
    next_id = 1
    for index in range(flow.size):
        bb = flow[index]
        start_hex = shex(bb.start_ea)
        cfg_text += start_hex + ":"
        addr_to_id.append((start_hex, str(next_id)))
        next_id += 1

        addr = bb.start_ea
        block_end = bb.end_ea
        mnem = GetMnem(addr)
        while mnem != "":
            is_call = mnem == "call"
            if is_call:
                # should be separated into 2 blocks by call
                cfg_text += "c,"
            addr = NextHead(addr, block_end)
            mnem = GetMnem(addr)
            if is_call and addr != BADADDR:
                # the instruction after the call opens a new numbered sub-block
                next_hex = shex(addr)
                cfg_text += next_hex + ";" + next_hex + ":"
                addr_to_id.append((next_hex, str(next_id)))
                next_id += 1

        succ_starts = sorted(succ.start_ea for succ in bb.succs())
        cfg_text += ",".join(shex(ea) for ea in succ_starts) + ";"

    # change addr to index (sequential replacement, in registration order)
    for hex_addr, block_id in addr_to_id:
        cfg_text = cfg_text.replace(hex_addr, block_id)

    # calculate machoc hash value
    self.debug('{}: CFG = {}'.format(fname, cfg_text))
    return mmh3.hash(cfg_text) & 0xFFFFFFFF, next_id - 1
示例23
def get_source_strings(min_len = 4, strtypes = [0, 1]):
    """Find source-file references in the database.

    Two sources are consulted:

    1. Strings longer than `min_len` that match `SOURCE_FILES_REGEXP` and have
       at least one data reference.
    2. Every head of every basic block of every function, for which
       `get_sourcefile` reports a path.

    Each hit is recorded through `add_source_file_to` and counted. After
    `nltk_preprocess(strings)` has run, the per-language percentages are
    printed when anything was found.

    Args:
        min_len: a string is only considered when its length exceeds this value.
        strtypes: string types forwarded to `get_strings`. The default list is
            not modified by this function.

    Returns:
        A (d, strings) pair: the dictionary accumulated by `add_source_file_to`
        and the value returned by `get_strings`.
    """
    strings = get_strings(strtypes)

    # Search string references to source files
    src_langs = Counter()
    total_files = 0
    d = {}
    for s in strings:
        if not s or s.length <= min_len:
            continue
        ret = re.findall(SOURCE_FILES_REGEXP, str(s), re.IGNORECASE)
        if not ret:
            continue
        refs = list(DataRefsTo(s.ea))
        if not refs:
            continue
        total_files += 1
        full_path = ret[0][0]
        d, src_langs = add_source_file_to(d, src_langs, refs, full_path, s)

    # Use the loaded debugging information (if any) to find source files
    for f in list(Functions()):
        func = idaapi.get_func(f)
        if func is None:
            continue
        for block in idaapi.FlowChart(func):
            for head in list(Heads(block.start_ea, block.end_ea)):
                full_path = get_sourcefile(head)
                if full_path is None:
                    continue
                total_files += 1
                d, src_langs = add_source_file_to(d, src_langs, [head], full_path, "Symbol: %s" % full_path)

    nltk_preprocess(strings)
    if len(d) > 0 and total_files > 0:
        print("Programming languages found:\n")
        for key in src_langs:
            print(" %s %f%%" % (key.ljust(10), src_langs[key] * 100. / total_files))
        print("\n")
    return d, strings
#-------------------------------------------------------------------------------
示例24
def pointer_accesses(function=None, bounds=None, initialization=None, accesses=None):
    """Collect the set of accesses to a pointer register.

    Runs a data flow over the flow graph of the given function or code region
    and records every access to the memory region that the given register(s)
    initially point into.

    Options:
        function: The address of the function to analyze. Any address within the function may
            be specified. Default is None.
        bounds: A (start, end) tuple with the start and end addresses of the code region to
            analyze. Default is None.
        initialization: A dictionary of dictionaries. Its keys are the linear addresses of the
            instructions at which some register is known to point into the memory region of
            interest. initialization[address] maps register numbers to deltas:
            initialization[address][register] is the offset between the start of the memory
            region and where that register points (a positive value means the register points
            to a higher address than the start). This option must be supplied.
        accesses: If not None, this dictionary is populated with the accesses instead of a new
            one being created and returned. It must be a collections.defaultdict(set).
            Default is None.

    Returns:
        If accesses is None (the default), a plain dictionary mapping each (offset, size) tuple
        to the set of (address, delta) tuples that performed that access. None is returned when
        the flow chart could not be created.

    Notes:
        Either a function or a code region must be specified. You cannot supply both.

        A common use case is analyzing a function for which one register is known to point to
        a structure on entry. For example, if the function at address 0x4000 takes in register
        10 a pointer 144 bytes into an unknown structure, the initialization dictionary is:
            { 0x4000: { 10: 144 } }
    """
    # Create the FlowChart.
    flow = _create_flow(function, bounds)
    if flow is None:
        return None

    if accesses is not None:
        # Caller-supplied dictionary: it is filled in place.
        _pointer_accesses_data_flow(flow, initialization, accesses)
        return None

    # No dictionary supplied: collect into a fresh one and hand back a plain dict.
    collected = collections.defaultdict(set)
    _pointer_accesses_data_flow(flow, initialization, collected)
    return dict(collected)
示例25
def upload(self,ctx):
    """Upload the functions, basic blocks and call relations of the database to Neo4j.

    A Binary node is merged for the input file, indexes are created on
    Function(start) and BasicBlock(start), and then two transactions are run:

    1. For every function: merge a Function node, link it INSIDE the Binary
       node, merge one BasicBlock node per basic block and link each block
       to its function with CONTAINS.
    2. For every code reference to a function whose source lies in a named
       function: create a CALLS relationship from caller to callee.

    Type information of each function is printed along the way, and a summary
    with the elapsed time and the counters is printed at the end.

    `ctx` is not used by this method.
    """
    start = time.time()
    func_count = 0
    bb_count = 0
    call_count = 0
    target = idaapi.get_root_filename()
    binary_hash = idc.GetInputMD5()
    tx = self.neo.cypher.begin()
    insert_binary = "MERGE (n:Binary {name:{N},hash:{H}}) RETURN n"
    insert_func = "MERGE (n:Function {name:{N},start:{S},flags:{F}}) RETURN n"
    insert_bb = "MERGE (n:BasicBlock {start:{S}, end:{E}}) RETURN n"
    create_relationship = "MATCH (u:Function {name:{N}}), (r:Function {start:{S}}) CREATE (u)-[:CALLS]->(r)"
    create_contains = "MATCH (u:BasicBlock {start:{S}}), (f:Function {name:{N}}) CREATE (f)-[:CONTAINS]->(u)"
    # The relationship must start at the matched function node `u`; an unbound
    # identifier in CREATE would produce a brand new, unrelated node.
    create_inside = "MATCH (u:Function {start:{S}}), (b:Binary {hash:{H}}) CREATE (u)-[:INSIDE]->(b)"
    self.neo.cypher.execute(insert_binary, {"N":target, "H":binary_hash})
    self.neo.cypher.execute("CREATE INDEX ON :Function(start)")
    #self.neo.cypher.execute("CREATE INDEX ON :Function(name)")
    self.neo.cypher.execute("CREATE INDEX ON :BasicBlock(start)")
    for f in Functions():
        callee_name = GetFunctionName(f)
        flags = get_flags(f)
        func_type = GetType(f)
        if func_type:
            print(func_type)
            end_return = func_type.find(' ')
            start_args = func_type.find('(')
            print(func_type[end_return +1:start_args])
            print(func_type[start_args+1:].split(','))
        else:
            print(GuessType(f))
        tx.append(insert_func, {"N": callee_name, "S":f, "F":flags})
        # Queue the INSIDE link only after the Function node has been merged,
        # otherwise the MATCH has nothing to bind to.
        tx.append(create_inside, {"S":f, "H":binary_hash})
        func_count += 1
        fc = idaapi.FlowChart(idaapi.get_func(f))
        for block in fc:
            tx.append(insert_bb, {"S":block.startEA,"E":block.endEA})
            # The CONTAINS query matches the function by name, so pass the name
            # rather than the start address.
            tx.append(create_contains,{"S":block.startEA,"N":callee_name})
            bb_count += 1
    tx.process()
    tx.commit()
    tx = self.neo.cypher.begin()
    for f in Functions():
        for xref in CodeRefsTo(f,0):
            caller_name = GetFunctionName(xref)
            if caller_name != '':
                tx.append(create_relationship,{"N":caller_name,"S":f})
                call_count += 1
    tx.process()
    tx.commit()
    print("Upload ran in: " + str(time.time() - start))
    print("Uploaded " + str(func_count) + " functions, " + str(call_count) +" function calls and " + str(bb_count) + " basic blocks.")