Python源码示例:capstone.CS_MODE_THUMB
示例1
def init_disassembler_engine(self):
    """Create the Capstone engine and the lexer matching the plugin's target.

    The architecture/mode pair is obtained from ``self.plugin.hintDisasm()``.
    On success ``self.disasm_engine``, ``self.ASMLine`` and ``self.lexer``
    are populated.

    Raises:
        ValueError: if no lexer is available for the hinted architecture/mode
            (previously this surfaced as an ``UnboundLocalError``).
    """
    # init state for disassembler
    # set capstone, lexer, asmline
    arch, mode = self.plugin.hintDisasm()

    self.disasm_engine = capstone.Cs(arch, mode)
    self.disasm_engine.detail = True

    # The architectures are mutually exclusive, so pick at most one lexer.
    lexer_builder = None
    if arch == capstone.CS_ARCH_X86:
        lexer_builder = X86_Lexer()
    elif arch == capstone.CS_ARCH_ARM:
        if mode in (capstone.CS_MODE_ARM, capstone.CS_MODE_THUMB):
            lexer_builder = ARM_Lexer()
    elif arch == capstone.CS_ARCH_ARM64:
        lexer_builder = ARM64_Lexer()

    # todo: ASM_ARM_Line?
    self.ASMLine = ASMx86Line

    if lexer_builder is None:
        raise ValueError(
            "no lexer available for arch=%r mode=%r" % (arch, mode)
        )

    lexer_builder.build()
    self.lexer = lexer_builder.lexer()
示例2
def __setup_available_disassemblers(self):
    """Build one detail-enabled Capstone ARM engine per supported mode.

    The engines are stored in ``self._available_disassemblers`` keyed by the
    ``ARCH_ARM_MODE_*`` constants.
    """
    capstone_mode_for = {
        ARCH_ARM_MODE_ARM: CS_MODE_ARM,
        ARCH_ARM_MODE_THUMB: CS_MODE_THUMB,
    }

    engines = {}
    for arch_mode, capstone_mode in capstone_mode_for.items():
        engine = Cs(CS_ARCH_ARM, capstone_mode)
        # Operand-level detail is required by the consumers of these engines.
        engine.detail = True
        engines[arch_mode] = engine

    self._available_disassemblers = engines
# Capstone to BARF translation
# ======================================================================== #
示例3
def __init__(self, trace=True, sca_mode=False, local_vars=None):
    """Set up a Unicorn ARM emulator paired with a Capstone disassembler.

    Args:
        trace: forwarded to the base-class initializer.
        sca_mode: forwarded to the base-class initializer and to ``setup``.
        local_vars: mapping stored as ``self.stubbed_functions``.  Defaults
            to a fresh empty dict per instance (a shared ``{}`` default
            would leak entries between instances).
    """
    super().__init__(trace, sca_mode)
    self.emu = uc.Uc(uc.UC_ARCH_ARM, uc.UC_MODE_ARM)
    self.disasm = cs.Cs(cs.CS_ARCH_ARM, cs.CS_MODE_ARM | cs.CS_MODE_THUMB)
    self.disasm.detail = True
    self.word_size = 4
    self.endianness = "little"
    self.page_size = self.emu.query(uc.UC_QUERY_PAGE_SIZE)
    self.page_shift = self.page_size.bit_length() - 1
    self.pc = uc.arm_const.UC_ARM_REG_PC

    # Map lower-case register names ("r0", "pc", ...) to Unicorn register ids.
    # Only names that really carry the prefix are sliced, so an unrelated
    # constant containing "_REG" cannot produce a bogus lookup.
    reg_prefix = 'UC_ARM_REG_'
    self.reg_map = {
        const_name[len(reg_prefix):].lower(): getattr(uc.arm_const, const_name)
        for const_name in dir(uc.arm_const)
        if const_name.startswith(reg_prefix)
    }

    self.stubbed_functions = {} if local_vars is None else local_vars
    self.setup(sca_mode)
    self.reset_stack()
示例4
def __init__(self, trace=True, sca_mode=False, local_vars=None):
    """Set up a Unicorn Thumb/M-class emulator with a matching disassembler.

    Args:
        trace: forwarded to the base-class initializer.
        sca_mode: forwarded to the base-class initializer and to ``setup``.
        local_vars: mapping stored as ``self.stubbed_functions``.  Defaults
            to a fresh empty dict per instance (a shared ``{}`` default
            would leak entries between instances).
    """
    super().__init__(trace, sca_mode)
    self.emu = uc.Uc(uc.UC_ARCH_ARM, uc.UC_MODE_THUMB | uc.UC_MODE_MCLASS)
    self.disasm = cs.Cs(cs.CS_ARCH_ARM, cs.CS_MODE_THUMB | cs.CS_MODE_MCLASS)
    self.disasm.detail = True
    self.word_size = 4
    self.endianness = "little"
    self.page_size = self.emu.query(uc.UC_QUERY_PAGE_SIZE)
    self.page_shift = self.page_size.bit_length() - 1
    self.pc = uc.arm_const.UC_ARM_REG_PC

    # Map lower-case register names ("r0", "pc", ...) to Unicorn register ids.
    # Only names that really carry the prefix are sliced, so an unrelated
    # constant containing "_REG" cannot produce a bogus lookup.
    reg_prefix = 'UC_ARM_REG_'
    self.reg_map = {
        const_name[len(reg_prefix):].lower(): getattr(uc.arm_const, const_name)
        for const_name in dir(uc.arm_const)
        if const_name.startswith(reg_prefix)
    }

    self.stubbed_functions = {} if local_vars is None else local_vars
    self.setup(sca_mode)
    self.reset_stack()

    # Force mapping of those addresses so that
    # exception returns can be caught in the base
    # block hook rather than a code fetch hook
    self.map_space(0xfffffff0, 0xffffffff)

    self.emu.hook_add(uc.UC_HOOK_INTR, self.intr_hook)
示例5
def _set_mode_by_val(self, val):
    """Select Thumb or ARM mode from the least-significant bit of ``val``.

    Bit 0 set selects ``CS_MODE_THUMB``, otherwise ``CS_MODE_ARM``.  When the
    resulting mode is symbolic, a ``Concretize`` request is raised whose
    ``setstate`` callback stores the chosen concrete mode on the state's cpu.
    """
    low_bit_set = (val & 0x1) == 0x1
    candidate_mode = Operators.ITEBV(
        self.address_bit_size, low_bit_set, cs.CS_MODE_THUMB, cs.CS_MODE_ARM
    )

    if not issymbolic(candidate_mode):
        self.mode = candidate_mode
        return

    from ..state import Concretize

    def apply_concrete_mode(state, value):
        state.cpu.mode = value

    raise Concretize(
        "Concretizing ARMv7 mode",
        expression=candidate_mode,
        setstate=apply_concrete_mode,
    )
示例6
def _ks_assemble(asm: str, mode=CS_MODE_ARM) -> bytes:
    """Assemble ``asm`` with Keystone for the given CPU mode.

    Returns the hex-encoded machine code.  Raises ``Exception`` when the mode
    is neither ARM nor Thumb, or when Keystone produces no encoding.
    """
    # Keystone is imported lazily so that it is only required when this
    # helper is actually called (tests can run without it installed).
    global ks, ks_thumb
    from keystone import Ks, KS_ARCH_ARM, KS_MODE_ARM, KS_MODE_THUMB

    # Both assemblers are created on first use and then cached module-wide.
    if ks is None:
        ks = Ks(KS_ARCH_ARM, KS_MODE_ARM)
    if ks_thumb is None:
        ks_thumb = Ks(KS_ARCH_ARM, KS_MODE_THUMB)

    if CS_MODE_ARM == mode:
        assembler = ks
    elif CS_MODE_THUMB == mode:
        assembler = ks_thumb
    else:
        raise Exception(f"bad processor mode for assembly: {mode}")

    encoding = assembler.asm(asm)[0]
    if not encoding:
        raise Exception(f"bad assembly: {asm}")
    return binascii.hexlify(bytearray(encoding))
示例7
def _ks_assemble(asm: str, mode=CS_MODE_ARM) -> bytes:
    """Return the hex-encoded Keystone assembly of ``asm`` for ``mode``.

    ``mode`` must be ``CS_MODE_ARM`` or ``CS_MODE_THUMB``; any other value, or
    an empty encoding from Keystone, raises ``Exception``.
    """
    # Late import: Keystone is only needed once this function is called,
    # which keeps it optional for running the tests.
    global ks, ks_thumb
    from keystone import Ks, KS_ARCH_ARM, KS_MODE_ARM, KS_MODE_THUMB

    if ks is None:
        ks = Ks(KS_ARCH_ARM, KS_MODE_ARM)
    if ks_thumb is None:
        ks_thumb = Ks(KS_ARCH_ARM, KS_MODE_THUMB)

    wants_arm = CS_MODE_ARM == mode
    if not wants_arm and not (CS_MODE_THUMB == mode):
        raise Exception(f"bad processor mode for assembly: {mode}")

    engine = ks if wants_arm else ks_thumb
    ords = engine.asm(asm)[0]
    if ords:
        return binascii.hexlify(bytearray(ords))
    raise Exception(f"bad assembly: {asm}")
示例8
def test_blx_reg_sym(self):
    """Executing with a symbolic R1 in [0x1000, 0x1001] must request mode concretization."""
    constraints = self.cpu.memory.constraints
    target = constraints.new_bitvec(32, "dest")
    constraints.add(target >= 0x1000)
    constraints.add(target <= 0x1001)
    self.cpu.R1 = target

    # The mode is symbolic and ambiguous, so execution has to raise.
    with self.assertRaises(Concretize) as ctx:
        self.cpu.execute()

    # The raised expression must admit both modes.
    request = ctx.exception
    possible_modes = solver.get_all_values(self.cpu.memory.constraints, request.expression)
    for expected_mode in (CS_MODE_THUMB, CS_MODE_ARM):
        self.assertIn(expected_mode, possible_modes)

    # Starting from ARM mode, the callback must switch the cpu to Thumb.
    self.assertEqual(self.cpu.mode, CS_MODE_ARM)
    # setstate expects a State as first argument; the test case itself is a
    # good enough stand-in because it exposes the cpu as self.cpu.
    request.setstate(self, CS_MODE_THUMB)
    self.assertEqual(self.cpu.mode, CS_MODE_THUMB)
示例9
def _import_dependencies(self):
    """Load Capstone lazily, build the arch table and probe for a known bug.

    Sets ``self.__constants`` (win32 architecture -> Capstone ``(arch, mode)``)
    and ``self.__bug``; emits a ``RuntimeWarning`` when the probe indicates
    unstable bindings.
    """
    # Load the Capstone bindings.
    global capstone
    if capstone is None:
        import capstone

    # Load the constants for the requested architecture.
    self.__constants = {
        win32.ARCH_I386:
            (capstone.CS_ARCH_X86, capstone.CS_MODE_32),
        win32.ARCH_AMD64:
            (capstone.CS_ARCH_X86, capstone.CS_MODE_64),
        win32.ARCH_THUMB:
            (capstone.CS_ARCH_ARM, capstone.CS_MODE_THUMB),
        win32.ARCH_ARM:
            (capstone.CS_ARCH_ARM, capstone.CS_MODE_ARM),
        win32.ARCH_ARM64:
            (capstone.CS_ARCH_ARM64, capstone.CS_MODE_ARM),
    }

    # Test for the bug in early versions of Capstone.
    # If found, warn the user about it.
    try:
        # cs_disasm_quick may return a generator, which cannot be indexed;
        # materialize it first.  The code must be a bytes object (a single
        # x86 NOP), so use a bytes literal rather than a text string.
        probe = list(capstone.cs_disasm_quick(
            capstone.CS_ARCH_X86, capstone.CS_MODE_32, b"\x90", 1))
        self.__bug = not isinstance(probe[0], capstone.capstone.CsInsn)
    except AttributeError:
        self.__bug = False
    if self.__bug:
        warnings.warn(
            "This version of the Capstone bindings is unstable,"
            " please upgrade to a newer one!",
            RuntimeWarning, stacklevel=4)
示例10
def __init__(self):
    """Initialize a 32-bit ARM translation context that disassembles Thumb code.

    Populates ``self.registers`` (Capstone register id -> register object),
    the stack-pointer / link-register / program-counter shortcuts, and a
    detail-enabled Capstone Thumb disassembler.
    """
    TranslationContext.__init__(self)

    self.registers = {
        capstone.arm.ARM_REG_R0: r('r0', 32),
        capstone.arm.ARM_REG_R1: r('r1', 32),
        capstone.arm.ARM_REG_R2: r('r2', 32),
        capstone.arm.ARM_REG_R3: r('r3', 32),
        capstone.arm.ARM_REG_R4: r('r4', 32),
        capstone.arm.ARM_REG_R5: r('r5', 32),
        capstone.arm.ARM_REG_R6: r('r6', 32),
        capstone.arm.ARM_REG_R7: r('r7', 32),
        capstone.arm.ARM_REG_R8: r('r8', 32),
        capstone.arm.ARM_REG_R9: r('r9', 32),
        capstone.arm.ARM_REG_R10: r('r10', 32),
        capstone.arm.ARM_REG_R11: r('r11', 32),
        # R12 was missing from the table, so any instruction referencing it
        # would fail the register lookup.
        capstone.arm.ARM_REG_R12: r('r12', 32),
        capstone.arm.ARM_REG_R13: r('sp', 32),
        capstone.arm.ARM_REG_R14: r('lr', 32),
        capstone.arm.ARM_REG_R15: r('pc', 32),
    }

    self.word_size = 32
    self.thumb = True
    self.stack_ptr = self.registers[capstone.arm.ARM_REG_R13]
    self.link_reg = self.registers[capstone.arm.ARM_REG_R14]
    self.program_ctr = self.registers[capstone.arm.ARM_REG_R15]
    self.disassembler = capstone.Cs(capstone.CS_ARCH_ARM, capstone.CS_MODE_THUMB)
    self.disassembler.detail = True
示例11
def mode(self, new_mode):
    """Store ``new_mode`` and propagate it to the wrapped disassembler.

    ``new_mode`` must be ``CS_MODE_ARM`` or ``CS_MODE_THUMB``.  A debug line
    is logged only when the mode actually changes.
    """
    assert new_mode in (cs.CS_MODE_ARM, cs.CS_MODE_THUMB)

    is_switch = self._mode != new_mode
    if is_switch:
        logger.debug(f'swapping into {"ARM" if new_mode == cs.CS_MODE_ARM else "THUMB"} mode')

    self._mode = new_mode
    self.disasm.disasm.mode = new_mode
示例12
def _swap_mode(self):
    """Flip the current mode: ARM becomes Thumb, Thumb becomes ARM."""
    assert self.mode in (cs.CS_MODE_ARM, cs.CS_MODE_THUMB)
    self.mode = cs.CS_MODE_THUMB if self.mode == cs.CS_MODE_ARM else cs.CS_MODE_ARM
# Flags that are the result of arithmetic instructions. Unconditionally
# set, but conditionally committed.
#
# Register file has the actual CPU flags
示例13
def BL(cpu, label):
    """Write the return address to LR, then set PC to the value of ``label``.

    The return address is the current PC register value; in Thumb mode it is
    stored with 1 added.
    """
    return_addr = cpu.regfile.read("PC")
    in_thumb = cpu.mode == cs.CS_MODE_THUMB
    link_value = return_addr + 1 if in_thumb else return_addr
    cpu.regfile.write("LR", link_value)
    cpu.regfile.write("PC", label.read())
示例14
def test_thumb_mode_emulation(self):
    """Emulating ``add r0, r1, r2`` in Thumb mode sums the operands and keeps Unicorn in Thumb."""
    lhs, rhs = 0x1234, 0x5678
    self._setupCpu("add r0, r1, r2", mode=CS_MODE_THUMB)

    for reg_name, initial in (("R0", 0), ("R1", lhs), ("R2", rhs)):
        self.rf.write(reg_name, initial)

    emulator = emulate_next(self.cpu)

    self.assertEqual(self.rf.read("R0"), lhs + rhs)
    self.assertEqual(emulator._emu.query(UC_QUERY_MODE), UC_MODE_THUMB)
示例15
def itest_thumb(asm):
    """Decorator factory for single-instruction Thumb tests.

    The decorated test first sets up the cpu with ``asm`` in Thumb mode,
    executes one step, and then runs the wrapped assertions function.
    """
    def instr_dec(assertions_func):
        def run_then_check(self):
            self._setupCpu(asm, mode=CS_MODE_THUMB)
            self.cpu.execute()
            assertions_func(self)

        # Keep the wrapped function's metadata on the generated test.
        return wraps(assertions_func)(run_then_check)

    return instr_dec
示例16
def test_ldr_imm_off_none_to_thumb(self):
    """After pushing 43 and executing one step, R15 must be 42 and the cpu in Thumb mode."""
    cpu = self.cpu
    cpu.stack_push(43)
    cpu.execute()

    loaded_pc = self.rf.read("R15")
    self.assertEqual(loaded_pc, 42)
    self.assertEqual(cpu.mode, CS_MODE_THUMB)
示例17
def test_bx_thumb(self):
    """One executed step must advance PC by 4 and leave the cpu in Thumb mode."""
    pc_before = self.rf.read("PC")
    self.cpu.execute()
    pc_after = self.rf.read("PC")

    self.assertEqual(pc_after, pc_before + 4)
    self.assertEqual(self.cpu.mode, CS_MODE_THUMB)
# ORR
示例18
def test_blx_reg_thumb(self):
    """PC and LR must both read 0x1008 and the cpu must be in Thumb mode."""
    for reg_name in ("PC", "LR"):
        self.assertEqual(self.rf.read(reg_name), 0x1008)
    self.assertEqual(self.cpu.mode, CS_MODE_THUMB)
示例19
def test_symbolic_conditional(self):
    """Conditional branches on a symbolic R0 must request PC concretization.

    For both ``beq`` and ``bne`` the step following the branch has to raise
    ``ConcretizeRegister``, and the register it names must have exactly the
    two expected candidate values.
    """
    asm = "".join((
        " tst r0, r0\n",  # 0x1004
        " beq label\n",  # 0x1006
        " bne label\n",  # 0x1008
        "label:\n",
        " nop",  # 0x100a
    ))
    self._setupCpu(asm, mode=CS_MODE_THUMB)  # code starts at 0x1004

    def candidate_values_after_step():
        # The step must ask for a register to be concretized; return every
        # value the solver allows for that register, in ascending order.
        with self.assertRaises(ConcretizeRegister) as ctx:
            self.cpu.execute()
        reg_expr = self.cpu.read_register(ctx.exception.reg_name)
        return sorted(solver.get_all_values(self.cpu.memory.constraints, reg_expr))

    # Make R0 symbolic so the flags set by tst are symbolic too.
    self.cpu.R0 = self.cpu.memory.constraints.new_bitvec(32, "val")
    self.cpu.execute()  # tst r0, r0
    self.cpu.execute()  # beq label

    # Two candidate PCs, one per outcome of the beq.
    self.assertEqual(candidate_values_after_step(), [0x1006, 0x1008])

    # Continue from the second branch instruction.
    self.cpu.PC = 0x1008
    self.cpu.execute()  # bne label

    # Two candidate PCs again, one per outcome of the bne.
    self.assertEqual(candidate_values_after_step(), [0x1008, 0x100A])
示例20
def __init__(self, arch=None):
    """Initialize the Capstone-backed engine for ``arch``.

    Builds the win32-architecture -> Capstone ``(arch, mode)`` table in
    ``self.__constants`` and probes the bindings for a known instability,
    recording the result in ``self.__bug`` and warning when it is present.

    Args:
        arch: architecture identifier, forwarded to the base initializer.
    """
    super(CapstoneEngine, self).__init__(arch)

    # Load the constants for the requested architecture.
    self.__constants = {
        win32.ARCH_I386:
            (capstone.CS_ARCH_X86, capstone.CS_MODE_32),
        win32.ARCH_AMD64:
            (capstone.CS_ARCH_X86, capstone.CS_MODE_64),
        win32.ARCH_THUMB:
            (capstone.CS_ARCH_ARM, capstone.CS_MODE_THUMB),
        win32.ARCH_ARM:
            (capstone.CS_ARCH_ARM, capstone.CS_MODE_ARM),
        win32.ARCH_ARM64:
            (capstone.CS_ARCH_ARM64, capstone.CS_MODE_ARM),
    }

    # Test for the bug in early versions of Capstone.
    # If found, warn the user about it.
    try:
        # The probe code (one x86 NOP) must be a bytes object; a text
        # literal is rejected by the bindings on Python 3.
        probe = list(capstone.cs_disasm_quick(
            capstone.CS_ARCH_X86, capstone.CS_MODE_32, b"\x90", 1
        ))
        self.__bug = not isinstance(probe[0], capstone.capstone.CsInsn)
    except AttributeError:
        self.__bug = False
    if self.__bug:
        warnings.warn(
            "This version of the Capstone bindings is unstable,"
            " please upgrade to a newer one!",
            RuntimeWarning, stacklevel=4)
示例21
def uboot_mux_init(self):
    """Prepare a disassembler and emulator for the ``set_muxconf_regs`` routine.

    Locates the routine in ``self.stage``, picks Thumb or ARM engines
    depending on ``self.thumbranges``, maps the hardware address ranges into
    the emulator, and loads the routine's bytes at its start address.

    Fixes relative to the previous version:
      * ``elf`` was read before it was assigned, and came from a bare
        ``stage`` name instead of ``self.stage``;
      * the ELF file handle was never closed;
      * the size of ranges not starting at 0 was computed as
        ``begin - end`` (negative);
      * ``reg_write`` received its arguments in (value, register) order.
    """
    self._mux_name = "set_muxconf_regs"
    (self._mux_start, self._mux_end) = utils.get_symbol_location_start_end(self._mux_name,
                                                                           self.stage)
    # Trim two bytes from both ends of the symbol's range.
    self._mux_start += 2
    self._mux_end -= 2

    if self.thumbranges.overlaps_point(self._mux_start):
        self.cs = capstone.Cs(CS_ARCH_ARM, CS_MODE_THUMB)
        self.cs.detail = True
        self._thumb = True
        self.emu = Uc(UC_ARCH_ARM, UC_MODE_THUMB)
    else:
        self.cs = capstone.Cs(CS_ARCH_ARM, CS_MODE_ARM)
        self.cs.detail = True
        self._thumb = False
        self.emu = Uc(UC_ARCH_ARM, UC_MODE_ARM)

    elf = self.stage.elf

    # NOTE(review): codeaddr (virtual address of the first non-empty section)
    # is computed but not used below; kept so the header lookup still runs.
    headers = pure_utils.get_section_headers(elf)
    for h in headers:
        if h['size'] > 0:
            codeaddr = h['virtaddr']
            break

    # NOTE(review): this mask clears the low 16 bits of the start address and
    # the result is used as a file offset base; confirm that the ELF's file
    # layout really matches this assumption.
    alignedstart = self._mux_start & 0xFFFFF0000
    fileoffset = alignedstart
    with open(elf, "rb") as elf_file:
        code = elf_file.read()[self._mux_start - fileoffset:self._mux_end - fileoffset]

    hw = Main.get_hardwareclass_config()
    for i in hw.addr_range:
        # A range's size is end - begin (which is simply `end` when begin is 0).
        self.emu.mem_map(i.begin, i.end - i.begin, UC_PROT_ALL)

    self.emu.mem_write(self._mux_start, code)
    # Unicorn's reg_write takes the register id first, then the value.
    self.emu.reg_write(ARM_REG_SP, self.stage.elf.entrypoint)
示例22
def mdthumb(self):
    """Return the cached Thumb/V8 Capstone engine, creating it on first use."""
    if self._mdthumb is not None:
        return self._mdthumb

    thumb_v8_mode = capstone.CS_MODE_THUMB + capstone.CS_MODE_V8
    self._mdthumb = capstone.Cs(capstone.CS_ARCH_ARM, thumb_v8_mode)
    self._mdthumb.detail = True
    return self._mdthumb
示例23
def _import_dependencies(self):
    """Import Capstone on demand and record architecture constants.

    Fills ``self.__constants`` with the Capstone ``(arch, mode)`` pair for
    each supported win32 architecture, then disassembles a single x86 NOP to
    detect unstable bindings (``self.__bug``), warning when they are found.
    """
    # Load the Capstone bindings.
    global capstone
    if capstone is None:
        import capstone

    x86, arm = capstone.CS_ARCH_X86, capstone.CS_ARCH_ARM

    # Load the constants for the requested architecture.
    self.__constants = {
        win32.ARCH_I386: (x86, capstone.CS_MODE_32),
        win32.ARCH_AMD64: (x86, capstone.CS_MODE_64),
        win32.ARCH_THUMB: (arm, capstone.CS_MODE_THUMB),
        win32.ARCH_ARM: (arm, capstone.CS_MODE_ARM),
        win32.ARCH_ARM64: (capstone.CS_ARCH_ARM64, capstone.CS_MODE_ARM),
    }

    # Test for the bug in early versions of Capstone.
    # If found, warn the user about it.
    try:
        # Materialize the result before indexing (it may be a generator) and
        # pass the code as bytes, as the bindings require.
        decoded = list(capstone.cs_disasm_quick(
            x86, capstone.CS_MODE_32, b"\x90", 1))
        self.__bug = not isinstance(decoded[0], capstone.capstone.CsInsn)
    except AttributeError:
        self.__bug = False

    if self.__bug:
        warnings.warn(
            "This version of the Capstone bindings is unstable,"
            " please upgrade to a newer one!",
            RuntimeWarning, stacklevel=4)
示例24
def capstone_thumb(self):
    """Return a cached, detail-enabled Capstone engine for Thumb mode.

    Returns ``None`` (after logging a warning) when Capstone is not
    installed.
    """
    if _capstone is None:
        l.warning("Capstone is not installed!")
        return None
    if self._cs_thumb is None:
        # Mode values are bit flags: combine them with OR.  Adding them
        # would yield a different, wrong mode if cs_mode already contains
        # the Thumb bit.
        thumb_mode = self.cs_mode | _capstone.CS_MODE_THUMB
        self._cs_thumb = _capstone.Cs(self.cs_arch, thumb_mode)
        self._cs_thumb.detail = True
    return self._cs_thumb
示例25
def read(self, nbits=None, with_carry=False):
    """Read this operand's value.

    Args:
        nbits: width passed to ``cpu.read_int`` for memory operands.
        with_carry: when true, register, immediate and memory operands
            return ``(value, carry)`` instead of just the value.
            Coprocessor operands always return only the immediate.

    Raises:
        NotImplementedError: for an unknown operand type.
    """
    carry = self.cpu.regfile.read("APSR_C")
    kind = self.__type

    if kind == "register":
        result = self.cpu.regfile.read(self.reg)
        # PC in this case has to be set to the instruction after next. PC at
        # this point is already pointing to next instruction; bump it once more.
        if self.reg in ("PC", "R15"):
            result += self.cpu.instruction.size

        if self.is_shifted():
            shift = self.op.shift
            # XXX: This is unnecessary repetition.
            register_shift_types = range(cs.arm.ARM_SFT_ASR_REG, cs.arm.ARM_SFT_RRX_REG + 1)
            if shift.type not in register_shift_types:
                amount = shift.value
            elif self.cpu.mode == cs.CS_MODE_THUMB:
                amount = shift.value.read()
            else:
                shift_reg = self.cpu.instruction.reg_name(shift.value).upper()
                amount = self.cpu.regfile.read(shift_reg)
            result, carry = self.cpu._shift(result, shift.type, amount, carry)

        if self.op.subtracted:
            result = -result
        return (result, carry) if with_carry else result

    if kind == "immediate":
        immediate = self.op.imm
        if self.op.subtracted:
            immediate = -immediate
        if with_carry:
            return immediate, self._get_expand_imm_carry(carry)
        return immediate

    if kind == "coprocessor":
        return self.op.imm

    if kind == "memory":
        loaded = self.cpu.read_int(self.address(), nbits)
        return (loaded, carry) if with_carry else loaded

    raise NotImplementedError("readOperand unknown type", self.op.type)
示例26
def _SR(cpu, insn_id, dest, op, *rest):
    """Shared implementation of the ASR / LSL / LSR shift instructions.

    Notes on Capstone behavior:
      - In ARM mode, _SR reg has `rest`, but _SR imm does not, its baked into `op`.
      - In ARM mode, `lsr r1, r2` will have a `rest[0]`
      - In Thumb mode, `lsr r1, r2` will have an empty `rest`
      - In ARM mode, something like `lsr r1, 3` will not have `rest` and op will be
        the immediate.

    The result is written to ``dest`` and the N, Z and C flags are updated.
    """
    assert insn_id in (cs.arm.ARM_INS_ASR, cs.arm.ARM_INS_LSL, cs.arm.ARM_INS_LSR)

    extra_is_imm = bool(rest) and rest[0].type == "immediate"

    # Immediate shifts use the plain shift type; everything else the *_REG one.
    if insn_id == cs.arm.ARM_INS_ASR:
        srtype = cs.arm.ARM_SFT_ASR if extra_is_imm else cs.arm.ARM_SFT_ASR_REG
    elif insn_id == cs.arm.ARM_INS_LSL:
        srtype = cs.arm.ARM_SFT_LSL if extra_is_imm else cs.arm.ARM_SFT_LSL_REG
    elif insn_id == cs.arm.ARM_INS_LSR:
        srtype = cs.arm.ARM_SFT_LSR if extra_is_imm else cs.arm.ARM_SFT_LSR_REG

    carry = cpu.regfile.read("APSR_C")

    if rest and rest[0].type == "register":
        # FIXME we should make Operand.op private (and not accessible)
        amount_reg = cpu.instruction.reg_name(rest[0].op.reg).upper()
        shift_amount = cpu.regfile.read(amount_reg)
        result, carry = cpu._shift(op.read(), srtype, shift_amount, carry)
    elif extra_is_imm:
        shift_amount = rest[0].read()
        result, carry = cpu._shift(op.read(), srtype, shift_amount, carry)
    elif cpu.mode == cs.CS_MODE_THUMB:
        # No extra operand in Thumb mode: `op` is the amount, `dest` the value.
        shift_amount = op.read()
        result, carry = cpu._shift(dest.read(), srtype, shift_amount, carry)
    else:
        result, carry = op.read(with_carry=True)

    dest.write(result)
    cpu.set_flags(N=HighBit(result), Z=(result == 0), C=carry)
示例27
def __init__(self, controller, r, stage):
    """Create a breakpoint that emulates a long-write region.

    Args:
        controller: passed through to ``TargetBreak.__init__``.
        r: record providing ``breakaddr``, ``contaddr``, ``writeaddr``,
            ``thumb``, ``start``, ``end`` and ``writesize``.
        stage: stage object; its ``elf`` attribute is handed to radare2 and
            to the static analysis.

    The region's basic-block bytes are collected, disassembled to gather the
    registers they access, and loaded into a Unicorn emulator with a
    memory-write hook installed.
    """
    # Local import: hex decoding that works on Python 2 and 3 alike
    # (str.decode("hex") only exists on Python 2).
    import binascii

    # controller.gdb_print("creating longwrite break\n")
    self.emptywrite = {'start': None,
                       'end': None,
                       'pc': None}
    # NOTE(review): writeinfo aliases emptywrite (same dict object); confirm
    # that later code does not mutate one expecting the other to stay empty.
    self.writeinfo = self.emptywrite
    self.breakaddr = r['breakaddr']
    self.contaddr = r['contaddr']
    self.writeaddr = r['writeaddr']
    self.thumb = r['thumb']

    # Seek radare2 to the write instruction and match its bitness to the mode.
    r2.gets(stage.elf, "s 0x%x" % self.writeaddr)
    if self.thumb:
        self.emu = unicorn.Uc(unicorn.UC_ARCH_ARM, unicorn.UC_MODE_THUMB)
        r2.gets(stage.elf, "ahb 16")
        r2.gets(stage.elf, "e asm.bits=16")
        self.cs = capstone.Cs(capstone.CS_ARCH_ARM, capstone.CS_MODE_THUMB)
    else:
        self.emu = unicorn.Uc(unicorn.UC_ARCH_ARM, unicorn.UC_MODE_ARM)
        r2.gets(stage.elf, "ahb 32")
        r2.gets(stage.elf, "e asm.bits=32")
        self.cs = capstone.Cs(capstone.CS_ARCH_ARM, capstone.CS_MODE_ARM)
    r2.get(stage.elf, "pdj 1")
    self.cs.detail = True

    self.info = staticanalysis.LongWriteInfo(stage.elf, r['start'],
                                             r['end'], self.thumb)
    self.inss = []
    self.regs = set()
    self.bytes = b""
    self.dst_addrs = []
    self.write_size = r['writesize']

    for i in self.info.bbs:
        self.inss.append(i)
        bs = binascii.unhexlify(i["bytes"])
        self.bytes += bs
        ci = next(self.cs.disasm(bs, i["offset"], 1))
        if i["offset"] == self.writeaddr:
            self.write_ins = ci
        # Record every register the instruction reads or writes.
        (read, write) = ci.regs_access()
        for rs in (read, write):
            self.regs.update([ci.reg_name(rn).encode('ascii') for rn in rs])

    # Map the whole 32-bit address space so any emulated access is backed.
    self.emu.mem_map(0, 0xFFFFFFFF + 1, unicorn.UC_PROT_ALL)
    self.emu.mem_write(self.inss[0]["offset"], self.bytes)
    self.emu.hook_add(unicorn.UC_HOOK_MEM_WRITE, self.write_hook)
    self.spec = "*(0x%x)" % r['breakaddr']
    TargetBreak.__init__(self, self.spec, controller, True, stage, r=r)