Python源码示例:idaapi.execute_sync()
示例1
def safe_generator(iterator):
    """Yield the items of ``iterator``, advancing it through ``idaapi.execute_sync``.

    Every ``next()`` call on ``iterator`` is made inside a trampoline that is
    handed to ``idaapi.execute_sync`` with ``idaapi.MFF_WRITE``. The value it
    produces is passed back to this generator through a one-element list.

    Args:
        iterator: Any object accepted by the built-in ``next()``.

    Yields:
        Each item produced by ``iterator``, in order, until it is exhausted.
    """
    # A fresh object() compared by identity can never be equal to a value
    # returned by an API call, and cannot be forged by data planted in the
    # program being analysed (unlike a string built from a small random int).
    sentinel = object()
    holder = [sentinel]  # mutable cell written by the trampoline

    def trampoline():
        try:
            holder[0] = next(iterator)
        except StopIteration:
            holder[0] = sentinel
        return 1

    while True:
        # Reset the cell first: if the trampoline never assigns to it (for
        # example because it did not run to completion), the loop ends
        # instead of yielding the previous item a second time.
        holder[0] = sentinel

        # MFF_WRITE is the flag used for every request made here.
        idaapi.execute_sync(trampoline, idaapi.MFF_WRITE)

        if holder[0] is sentinel:
            return

        yield holder[0]
# Main Plug-in Form Class
#-------------------------------------------------------------------------------
示例2
def __getattribute__(self, name):
    """Resolve ``name`` against ``idaapi``, ``idautils`` and ``idc``, in that order.

    When ``idaapi.IDA_SDK_VERSION`` is at least 700 and ``name`` is a key of
    ``IDAWrapper.mapping``, the mapped name is looked up instead.

    Args:
        name: Attribute name requested on the wrapper.

    Returns:
        * ``None`` if none of the three modules has the attribute (a print
          request carrying an explanatory message is submitted through
          ``idaapi.execute_ui_requests`` first).
        * For a callable attribute, a function that forwards its arguments
          to the attribute via ``idaapi.execute_sync`` with ``MFF_WRITE``
          and returns the attribute's result.
        * Otherwise, the attribute value itself.
    """
    # Unique marker compared by identity: unlike a fixed string compared
    # with ``==``, it cannot collide with a real attribute value and never
    # invokes ``__eq__`` on the objects exposed by the IDA modules.
    missing = object()

    if (idaapi.IDA_SDK_VERSION >= 700) and (name in IDAWrapper.mapping):
        name = IDAWrapper.mapping[name]

    val = getattr(idaapi, name, missing)
    if val is missing:
        val = getattr(idautils, name, missing)

    if val is missing:
        val = getattr(idc, name, missing)

    if val is missing:
        msg = 'Unable to find {}'.format(name)
        idaapi.execute_ui_requests((FIRSTUI.Requests.Print(msg),))
        return None

    if not callable(val):
        return val

    def call(*args, **kwargs):
        holder = [None]  # mutable cell written by the trampoline

        def trampoline():
            holder[0] = val(*args, **kwargs)
            return 1

        # Execute the request using MFF_WRITE, which should be safe for
        # any possible request at the expense of speed. In the original
        # author's testing it wasn't noticeably slower than MFF_FAST. If
        # this is observed to impact performance, consider creating a list
        # that maps API calls to the most appropriate flag.
        idaapi.execute_sync(trampoline, idaapi.MFF_WRITE)
        return holder[0]

    return call
示例3
def idawrite(f):
    """Decorator that runs ``f`` through ``idaapi.execute_sync`` with ``MFF_WRITE``.

    The decorated function returns whatever ``idaapi.execute_sync`` returns
    for the call.
    """
    @functools.wraps(f)
    def wrapper(*args, **kwargs):
        # Freeze the call so execute_sync receives a zero-argument callable.
        def bound_call():
            return f(*args, **kwargs)

        status = idaapi.execute_sync(bound_call, idaapi.MFF_WRITE)
        return status

    return wrapper
示例4
def sync_wrapper(ff, safety_mode):
    """
    Call a function ff with a specific IDA safety_mode.

    ff is invoked with no arguments from inside a callback handed to
    idaapi.execute_sync, and its return value is returned to the caller.

    Raises IDASyncError if safety_mode is neither IDASafety.SAFE_READ nor
    IDASafety.SAFE_WRITE, or if the call stack is not empty when the
    callback starts (i.e. we are already inside a sync_wrapper). Any
    exception raised by ff is re-raised in the caller.
    """
    logger.debug('sync_wrapper: {}, {}'.format(ff.__name__, safety_mode))

    if safety_mode not in [IDASafety.SAFE_READ, IDASafety.SAFE_WRITE]:
        error_str = 'Invalid safety mode {} over function {}'\
                .format(safety_mode, ff.__name__)
        logger.error(error_str)
        raise IDASyncError(error_str)

    # No safety level is set up:
    # The container carries exactly one (succeeded, payload) pair out of
    # runned(): the result of ff() on success, the exception on failure.
    res_container = Queue.Queue()

    def runned():
        logger.debug('Inside runned')
        try:
            # Make sure that we are not already inside a sync_wrapper:
            if not call_stack.empty():
                # Peek at the entry: take it out for the message, then put
                # it back so the outer call's own cleanup still finds it
                # and does not block on an empty call stack.
                last_func_name = call_stack.get()
                call_stack.put(last_func_name)
                error_str = ('Call stack is not empty while calling the '
                    'function {} from {}').format(ff.__name__, last_func_name)
                logger.error(error_str)
                raise IDASyncError(error_str)

            call_stack.put((ff.__name__))
            try:
                res_container.put((True, ff()))
            finally:
                call_stack.get()
        except Exception as exc:
            # Always hand something back: without this the res_container
            # get() below would wait forever for a result that never comes.
            res_container.put((False, exc))
        else:
            logger.debug('Finished runned')

    idaapi.execute_sync(runned, safety_mode)
    succeeded, payload = res_container.get()
    if not succeeded:
        raise payload
    return payload
示例5
def execute_sync(function, sync_type):
    """
    Wrap ``function`` so that it is run synchronized with the disassembler.

    The returned wrapper calls ``function`` directly when ``is_mainthread()``
    is true; otherwise it hands the call to ``idaapi.execute_sync`` with
    ``sync_type``. In both cases the wrapper returns the value ``function``
    produced.

    Modified from https://github.com/vrtadmin/FIRST-plugin-ida
    """
    @functools.wraps(function)
    def wrapper(*args, **kwargs):
        # single-slot box used to carry the result out of the nested callable
        result_box = [None]

        def run_and_capture():
            # this is the piece that actually executes (possibly via
            # idaapi.execute_sync); it stashes whatever the wrapped function
            # returns so the wrapper can give it back to the caller.
            result_box[0] = function(*args, **kwargs)
            return 1

        if not is_mainthread():
            idaapi.execute_sync(run_and_capture, sync_type)
        else:
            run_and_capture()

        # hand back the captured output of the synchronized execution
        return result_box[0]

    return wrapper
#------------------------------------------------------------------------------
# Disassembler Core API (universal)
#------------------------------------------------------------------------------
示例6
def execute_read(function):
    """Return ``function`` wrapped by ``execute_sync`` using ``idaapi.MFF_READ``."""
    sync_flag = idaapi.MFF_READ
    return execute_sync(function, sync_flag)
示例7
def execute_write(function):
    """Return ``function`` wrapped by ``execute_sync`` using ``idaapi.MFF_WRITE``."""
    sync_flag = idaapi.MFF_WRITE
    return execute_sync(function, sync_flag)
示例8
def execute_ui(function):
    """Return ``function`` wrapped by ``execute_sync`` using ``idaapi.MFF_FAST``."""
    sync_flag = idaapi.MFF_FAST
    return execute_sync(function, sync_flag)
#--------------------------------------------------------------------------
# API Shims
#--------------------------------------------------------------------------
示例9
def __getattribute__(self, name):
    """Resolve ``name`` against ``idaapi``, ``idautils`` and ``idc``, in that order.

    When ``idaapi.IDA_SDK_VERSION`` is at least 700 and ``name`` is a key of
    ``IDAWrapper.mapping``, the mapped name is looked up instead.

    Args:
        name: Attribute name requested on the wrapper.

    Returns:
        * ``None`` if none of the three modules has the attribute (a print
          request carrying an explanatory message is submitted through
          ``idaapi.execute_ui_requests`` first).
        * For a callable attribute, a function that forwards its arguments
          to the attribute via ``idaapi.execute_sync`` with ``MFF_FAST``
          and returns the attribute's result.
        * Otherwise, the attribute value itself.
    """
    # Unique marker compared by identity: unlike a fixed string compared
    # with ``==``, it cannot collide with a real attribute value and never
    # invokes ``__eq__`` on the objects exposed by the IDA modules.
    missing = object()

    if (idaapi.IDA_SDK_VERSION >= 700) and (name in IDAWrapper.mapping):
        name = IDAWrapper.mapping[name]

    val = getattr(idaapi, name, missing)
    if val is missing:
        val = getattr(idautils, name, missing)

    if val is missing:
        val = getattr(idc, name, missing)

    if val is missing:
        msg = 'Unable to find {}'.format(name)
        idaapi.execute_ui_requests((FIRSTUI.Requests.Print(msg),))
        return None

    if not callable(val):
        return val

    def call(*args, **kwargs):
        holder = [None]  # mutable cell written by the trampoline

        def trampoline():
            holder[0] = val(*args, **kwargs)
            return 1

        idaapi.execute_sync(trampoline, idaapi.MFF_FAST)
        return holder[0]

    return call
示例10
def onmsg_safe(key, data, replay=False):
    """Run ``onmsg(key, data, replay=replay)`` through ``idaapi.execute_sync``.

    The call is made with ``MFF_WRITE``. Any ``Exception`` raised by
    ``onmsg`` is caught, reported with ``print`` and followed by a
    traceback dump; it is not re-raised.
    """
    def guarded_dispatch():
        try:
            onmsg(key, data, replay=replay)
        except Exception as err:
            report = 'error during callback for %s: %s' % (data.get('cmd'), err)
            print(report)
            traceback.print_exc()

    idaapi.execute_sync(guarded_dispatch, MFF_WRITE)
示例11
def warning_msgbox(warning_str):
    """Call ``idc.Warning(warning_str)`` via ``idaapi.execute_sync`` (``MFF_FAST``)."""
    def show():
        # The closure captures warning_str, so no pre-bound argument is needed.
        idc.Warning(warning_str)

    idaapi.execute_sync(show, idaapi.MFF_FAST)
# TODO: not sure if this should always work (race condition? or not with GIL?)
示例12
def asklong(defval, prompt):
    """Return the result of ``idc.AskLong(defval, prompt)``.

    The call is made via ``idaapi.execute_sync`` with ``MFF_FAST``. The
    result is ``None`` if the nested callable never stored a value.
    """
    answer = [None]  # one-slot cell: lets the nested function publish its result

    def ask():
        answer[0] = idc.AskLong(defval, prompt)

    idaapi.execute_sync(ask, idaapi.MFF_FAST)
    return answer[0]
# TODO: Put these global functions somewhere in a scope?
示例13
def get_func_codeblocks(f):
    """Return a list of ``sark.codeblocks`` between ``f.startEA`` and ``f.endEA``.

    The list is built inside a callable run via ``idaapi.execute_sync`` with
    ``MFF_READ``. ``None`` is returned if that callable never stored a result.
    """
    collected = [None]

    def collect():
        blocks = sark.codeblocks(start=f.startEA, end=f.endEA)
        collected[0] = list(blocks)

    idaapi.execute_sync(collect, idaapi.MFF_READ)
    return collected[0]
示例14
def get_current_codeblock():
    """Return ``sark.CodeBlock()`` built via ``idaapi.execute_sync`` (``MFF_READ``).

    ``None`` is returned if the nested callable never stored a result.
    """
    current = [None]

    def fetch():
        current[0] = sark.CodeBlock()

    idaapi.execute_sync(fetch, idaapi.MFF_READ)
    return current[0]
示例15
def reset_block_colors(f):
    """Set ``color`` to ``0xFFFFFF`` on every code block of ``f``.

    The blocks come from ``sark.codeblocks`` over ``f.startEA``..``f.endEA``;
    the loop runs via ``idaapi.execute_sync`` with ``MFF_WRITE``.
    """
    def repaint():
        for block in sark.codeblocks(start=f.startEA, end=f.endEA):
            block.color = 0xFFFFFF

    idaapi.execute_sync(repaint, idaapi.MFF_WRITE)
示例16
def get_segment_names(seg):
    """Return ``(address, name)`` pairs from ``idautils.Names()`` inside ``seg``.

    A pair is kept when ``seg.startEA <= address < seg.endEA``. The scan runs
    via ``idaapi.execute_sync`` with ``MFF_READ``; the list is empty if the
    nested callable never ran.
    """
    names = []

    def gather():
        in_segment = [(ea, label) for ea, label in idautils.Names()
                      if seg.startEA <= ea < seg.endEA]
        names.extend(in_segment)

    idaapi.execute_sync(gather, idaapi.MFF_READ)
    return names
示例17
def execute_paint(function):
    """
    A function decorator to safely paint the IDA database from any thread.

    The decorated function must be called with the painter instance as its
    first positional argument. On IDA SDK versions below 710 the wrapper
    returns idaapi.BADADDR; otherwise it returns the value of the
    idaapi.execute_sync call.
    """
    @functools.wraps(function)
    def wrapper(*args, **kwargs):

        #
        # by convention the painter instance (IDAPainter) arrives as the
        # first positional argument of the decorated function
        #

        ida_painter = args[0]

        #
        # bundle the paint function together with all of its arguments into
        # one zero-argument callable
        #

        paint_job = functools.partial(function, *args, **kwargs)

        #
        # per the original author's note, the MFF_NOWAIT bug is fixed as of
        # IDA 7.1, so there the paint action is simply scheduled with it
        #

        if idaapi.IDA_SDK_VERSION >= 710:
            return idaapi.execute_sync(paint_job, idaapi.MFF_NOWAIT | idaapi.MFF_WRITE)

        #
        # on older ('bugged') versions, wrap the job in a second callable
        # that performs a synchronized database write, and hand that to the
        # painter's mainthread signal. the execute_sync should then
        # technically happen in-line, which the original author relied on
        # to avoid deadlocks or aborts.
        #

        synced_job = functools.partial(idaapi.execute_sync, paint_job, idaapi.MFF_WRITE)
        ida_painter._signal.mainthread.emit(synced_job)
        return idaapi.BADADDR

    return wrapper
#------------------------------------------------------------------------------
# IDA Painter
#------------------------------------------------------------------------------