Python源码示例:ctypes.Structure()
示例1
def windowsRam(self):
"""
Uses Windows API to check RAM
"""
kernel32 = ctypes.windll.kernel32
c_ulong = ctypes.c_ulong
class MEMORYSTATUS(ctypes.Structure):
_fields_ = [
("dwLength", c_ulong),
("dwMemoryLoad", c_ulong),
("dwTotalPhys", c_ulong),
("dwAvailPhys", c_ulong),
("dwTotalPageFile", c_ulong),
("dwAvailPageFile", c_ulong),
("dwTotalVirtual", c_ulong),
("dwAvailVirtual", c_ulong)
]
memoryStatus = MEMORYSTATUS()
memoryStatus.dwLength = ctypes.sizeof(MEMORYSTATUS)
kernel32.GlobalMemoryStatus(ctypes.byref(memoryStatus))
return int(memoryStatus.dwTotalPhys / 1024 ** 2)
示例2
def _is_gui_available():
UOI_FLAGS = 1
WSF_VISIBLE = 0x0001
class USEROBJECTFLAGS(ctypes.Structure):
_fields_ = [("fInherit", ctypes.wintypes.BOOL),
("fReserved", ctypes.wintypes.BOOL),
("dwFlags", ctypes.wintypes.DWORD)]
dll = ctypes.windll.user32
h = dll.GetProcessWindowStation()
if not h:
raise ctypes.WinError()
uof = USEROBJECTFLAGS()
needed = ctypes.wintypes.DWORD()
res = dll.GetUserObjectInformationW(h,
UOI_FLAGS,
ctypes.byref(uof),
ctypes.sizeof(uof),
ctypes.byref(needed))
if not res:
raise ctypes.WinError()
return bool(uof.dwFlags & WSF_VISIBLE)
示例3
def test_union_with_struct_packed(self):
class Struct(ctypes.Structure):
_pack_ = 1
_fields_ = [
('one', ctypes.c_uint8),
('two', ctypes.c_uint32)
]
class Union(ctypes.Union):
_fields_ = [
('a', ctypes.c_uint8),
('b', ctypes.c_uint16),
('c', ctypes.c_uint32),
('d', Struct),
]
expected = np.dtype(dict(
names=['a', 'b', 'c', 'd'],
formats=['u1', np.uint16, np.uint32, [('one', 'u1'), ('two', np.uint32)]],
offsets=[0, 0, 0, 0],
itemsize=ctypes.sizeof(Union)
))
self.check(Union, expected)
示例4
def _is_gui_available():
UOI_FLAGS = 1
WSF_VISIBLE = 0x0001
class USEROBJECTFLAGS(ctypes.Structure):
_fields_ = [("fInherit", ctypes.wintypes.BOOL),
("fReserved", ctypes.wintypes.BOOL),
("dwFlags", ctypes.wintypes.DWORD)]
dll = ctypes.windll.user32
h = dll.GetProcessWindowStation()
if not h:
raise ctypes.WinError()
uof = USEROBJECTFLAGS()
needed = ctypes.wintypes.DWORD()
res = dll.GetUserObjectInformationW(h,
UOI_FLAGS,
ctypes.byref(uof),
ctypes.sizeof(uof),
ctypes.byref(needed))
if not res:
raise ctypes.WinError()
return bool(uof.dwFlags & WSF_VISIBLE)
示例5
def test_union_packed(self):
class Struct(ctypes.Structure):
_fields_ = [
('one', ctypes.c_uint8),
('two', ctypes.c_uint32)
]
_pack_ = 1
class Union(ctypes.Union):
_pack_ = 1
_fields_ = [
('a', ctypes.c_uint8),
('b', ctypes.c_uint16),
('c', ctypes.c_uint32),
('d', Struct),
]
expected = np.dtype(dict(
names=['a', 'b', 'c', 'd'],
formats=['u1', np.uint16, np.uint32, [('one', 'u1'), ('two', np.uint32)]],
offsets=[0, 0, 0, 0],
itemsize=ctypes.sizeof(Union)
))
self.check(Union, expected)
示例6
def MessageClass(length=DMX_LENGTH):
assert 0 <= length <= DMX_LENGTH
assert length % 2 == 0, 'artnet only takes messages of even length'
Char, Int8, Int16 = ctypes.c_char, ctypes.c_ubyte, ctypes.c_ushort
class DMXMessage(ctypes.Structure):
# http://artisticlicence.com/WebSiteMaster/User%20Guides/art-net.pdf p47
_fields_ = [
('id', Char * 8),
('opCode', Int16),
('protVerHi', Int8),
('protVerLo', Int8),
('sequence', Int8),
('physical', Int8),
('subUni', Int8),
('net', Int8),
('lengthHi', Int8),
('length', Int8),
('data', Int8 * length), # At position 18
]
return DMXMessage
示例7
def __init__(self, **kwargs):
"""
Ctypes.Structure with integrated default values.
https://stackoverflow.com/questions/7946519/default-values-in-a-ctypes-structure/25892189#25892189
:param kwargs: values different to defaults
:type kwargs: dict
"""
# sanity checks
defaults = type(self)._defaults_
assert type(defaults) is types.DictionaryType
# use defaults, but override with keyword arguments, if any
values = defaults.copy()
for (key, val) in kwargs.items():
values[key] = val
# appropriately initialize ctypes.Structure
#super().__init__(**values) # Python 3 syntax
return Structure.__init__(self, **values) # Python 2 syntax
# http://stackoverflow.com/questions/1825715/how-to-pack-and-unpack-using-ctypes-structure-str/1827666#1827666
# https://wiki.python.org/moin/ctypes
示例8
def create_remote_array(subtype, len):
class RemoteArray(_ctypes.Array):
_length_ = len
_type_ = subtype
def __init__(self, addr, target):
self._base_addr = addr
self.target = target
def __getitem__(self, slice):
if not isinstance(slice, (int, long)):
raise NotImplementedError("RemoteArray slice __getitem__")
if slice >= len:
raise IndexError("Access to {0} for a RemoteArray of size {1}".format(slice, len))
item_addr = self._base_addr + (ctypes.sizeof(subtype) * slice)
# TODO: do better ?
class TST(ctypes.Structure):
_fields_ = [("TST", subtype)]
return RemoteStructure.from_structure(TST)(item_addr, target=self.target).TST
return RemoteArray
# 64bits pointers
示例9
def _handle_field_getattr(self, ftype, fosset, fsize):
s = self._target.read_memory(self._base_addr + fosset, fsize)
if ftype in self._field_type_to_remote_type:
return self._field_type_to_remote_type[ftype].from_buffer_with_target(bytearray(s), target=self._target).value
if issubclass(ftype, _ctypes._Pointer): # Pointer
return RemoteStructurePointer.from_buffer_with_target_and_ptr_type(bytearray(s), target=self._target, ptr_type=ftype)
if issubclass(ftype, RemotePtr64): # Pointer to remote64 bits process
return RemoteStructurePointer64.from_buffer_with_target_and_ptr_type(bytearray(s), target=self._target, ptr_type=ftype)
if issubclass(ftype, RemoteStructureUnion): # Structure|Union already transfomed in remote
return ftype(self._base_addr + fosset, self._target)
if issubclass(ftype, ctypes.Structure): # Structure that must be transfomed
return RemoteStructure.from_structure(ftype)(self._base_addr + fosset, self._target)
if issubclass(ftype, ctypes.Union): # Union that must be transfomed
return RemoteUnion.from_structure(ftype)(self._base_addr + fosset, self._target)
if issubclass(ftype, _ctypes.Array): # Arrays
return create_remote_array(ftype._type_, ftype._length_)(self._base_addr + fosset, self._target)
# Normal types
# Follow the ctypes usage: if it's not directly inherited from _SimpleCData
# We do not apply the .value
# Seems weird but it's mandatory AND useful :D (in pe_parse)
if _SimpleCData not in ftype.__bases__:
return ftype.from_buffer(bytearray(s))
return ftype.from_buffer(bytearray(s)).value
示例10
def write_virtual_memory(self, addr, data):
"""Write data to a given virtual address
:param addr: The Symbol to write to
:type addr: Symbol
:param size: The Data to write
:type size: str or ctypes.Structure
:returns: the size written -- :class:`int`
"""
try:
# ctypes structure
size = ctypes.sizeof(data)
buffer = ctypes.byref(data)
except TypeError:
# buffer
size = len(data)
buffer = data
written = ULONG(0)
addr = self.resolve_symbol(addr)
self.DebugDataSpaces.WriteVirtual(c_uint64(addr), buffer, size, byref(written))
return written.value
示例11
def write_physical_memory(self, addr, data):
"""Write data to a given physical address
:param addr: The Symbol to write to
:type addr: Symbol
:param size: The Data to write
:type size: str or ctypes.Structure
:returns: the size written -- :class:`int`
"""
try:
# ctypes structure
size = ctypes.sizeof(data)
buffer = ctypes.byref(data)
except TypeError:
# buffer
size = len(data)
buffer = data
written = ULONG(0)
self.DebugDataSpaces.WritePhysical(c_uint64(addr), buffer, size, byref(written))
return written.value
示例12
def create_vtable(cls, **implem_overwrite):
vtables_names = [x[0] for x in cls._funcs_]
non_expected_args = [func_name for func_name in implem_overwrite if func_name not in vtables_names]
if non_expected_args:
raise ValueError("Non expected function : {0}".format(non_expected_args))
implems = []
for name, types, func_implem in cls._funcs_:
func_implem = implem_overwrite.get(name, func_implem)
if func_implem is None:
raise ValueError("Missing implementation for function <{0}>".format(name))
implems.append(create_c_callable(func_implem, types))
class Vtable(ctypes.Structure):
_fields_ = [(name, ctypes.c_void_p) for name in vtables_names]
return Vtable(*implems)
示例13
def test_union_with_struct_packed(self):
class Struct(ctypes.Structure):
_pack_ = 1
_fields_ = [
('one', ctypes.c_uint8),
('two', ctypes.c_uint32)
]
class Union(ctypes.Union):
_fields_ = [
('a', ctypes.c_uint8),
('b', ctypes.c_uint16),
('c', ctypes.c_uint32),
('d', Struct),
]
expected = np.dtype(dict(
names=['a', 'b', 'c', 'd'],
formats=['u1', np.uint16, np.uint32, [('one', 'u1'), ('two', np.uint32)]],
offsets=[0, 0, 0, 0],
itemsize=ctypes.sizeof(Union)
))
self.check(Union, expected)
示例14
def test_union_packed(self):
class Struct(ctypes.Structure):
_fields_ = [
('one', ctypes.c_uint8),
('two', ctypes.c_uint32)
]
_pack_ = 1
class Union(ctypes.Union):
_pack_ = 1
_fields_ = [
('a', ctypes.c_uint8),
('b', ctypes.c_uint16),
('c', ctypes.c_uint32),
('d', Struct),
]
expected = np.dtype(dict(
names=['a', 'b', 'c', 'd'],
formats=['u1', np.uint16, np.uint32, [('one', 'u1'), ('two', np.uint32)]],
offsets=[0, 0, 0, 0],
itemsize=ctypes.sizeof(Union)
))
self.check(Union, expected)
示例15
def _is_gui_available():
UOI_FLAGS = 1
WSF_VISIBLE = 0x0001
class USEROBJECTFLAGS(ctypes.Structure):
_fields_ = [("fInherit", ctypes.wintypes.BOOL),
("fReserved", ctypes.wintypes.BOOL),
("dwFlags", ctypes.wintypes.DWORD)]
dll = ctypes.windll.user32
h = dll.GetProcessWindowStation()
if not h:
raise ctypes.WinError()
uof = USEROBJECTFLAGS()
needed = ctypes.wintypes.DWORD()
res = dll.GetUserObjectInformationW(h,
UOI_FLAGS,
ctypes.byref(uof),
ctypes.sizeof(uof),
ctypes.byref(needed))
if not res:
raise ctypes.WinError()
return bool(uof.dwFlags & WSF_VISIBLE)
示例16
def compile(self, structure):
source = self.gen_struct_class(structure)
c = compile(source, '<compiled>', 'exec')
env = {
'OrderedDict': OrderedDict,
'Structure': Structure,
'Instance': Instance,
'Expression': Expression,
'EnumInstance': EnumInstance,
'PointerInstance': PointerInstance,
'BytesInteger': BytesInteger,
'BitBuffer': BitBuffer,
'struct': struct,
'xrange': xrange,
}
exec(c, env)
sc = env[structure.name](self.cstruct, structure, source)
return sc
示例17
def __init__(self, this):
if not this:
raise WindowsError('Could not construct {}'.format(self.__class__))
self.this = ctypes.cast(this, ctypes.POINTER(self.__class__))
ctypes.Structure.__init__(self)
示例18
def getWindowsBuild():
class OSVersionInfo(ctypes.Structure):
_fields_ = [
("dwOSVersionInfoSize" , ctypes.c_int),
("dwMajorVersion" , ctypes.c_int),
("dwMinorVersion" , ctypes.c_int),
("dwBuildNumber" , ctypes.c_int),
("dwPlatformId" , ctypes.c_int),
("szCSDVersion" , ctypes.c_char*128)];
GetVersionEx = getattr( ctypes.windll.kernel32 , "GetVersionExA")
version = OSVersionInfo()
version.dwOSVersionInfoSize = ctypes.sizeof(OSVersionInfo)
GetVersionEx( ctypes.byref(version) )
return version.dwBuildNumber
示例19
def as_ctypes_type(dtype):
"""
Convert a dtype into a ctypes type.
Parameters
----------
dtype : dtype
The dtype to convert
Returns
-------
ctypes
A ctype scalar, union, array, or struct
Raises
------
NotImplementedError
If the conversion is not possible
Notes
-----
This function does not losslessly round-trip in either direction.
``np.dtype(as_ctypes_type(dt))`` will:
- insert padding fields
- reorder fields to be sorted by offset
- discard field titles
``as_ctypes_type(np.dtype(ctype))`` will:
- discard the class names of ``Structure``s and ``Union``s
- convert single-element ``Union``s into single-element ``Structure``s
- insert padding fields
"""
return _ctype_from_dtype(_dtype(dtype))
示例20
def test_struct_array_pointer(self):
from ctypes import c_int16, Structure, pointer
class Struct(Structure):
_fields_ = [('a', c_int16)]
Struct3 = 3 * Struct
c_array = (2 * Struct3)(
Struct3(Struct(a=1), Struct(a=2), Struct(a=3)),
Struct3(Struct(a=4), Struct(a=5), Struct(a=6))
)
expected = np.array([
[(1,), (2,), (3,)],
[(4,), (5,), (6,)],
], dtype=[('a', np.int16)])
def check(x):
assert_equal(x.dtype, expected.dtype)
assert_equal(x, expected)
# all of these should be equivalent
check(as_array(c_array))
check(as_array(pointer(c_array), shape=()))
check(as_array(pointer(c_array[0]), shape=(2,)))
check(as_array(pointer(c_array[0][0]), shape=(2, 3)))
示例21
def test_structure(self):
dt = np.dtype([
('a', np.uint16),
('b', np.uint32),
])
ct = np.ctypeslib.as_ctypes_type(dt)
assert_(issubclass(ct, ctypes.Structure))
assert_equal(ctypes.sizeof(ct), dt.itemsize)
assert_equal(ct._fields_, [
('a', ctypes.c_uint16),
('b', ctypes.c_uint32),
])
示例22
def test_structure_aligned(self):
dt = np.dtype([
('a', np.uint16),
('b', np.uint32),
], align=True)
ct = np.ctypeslib.as_ctypes_type(dt)
assert_(issubclass(ct, ctypes.Structure))
assert_equal(ctypes.sizeof(ct), dt.itemsize)
assert_equal(ct._fields_, [
('a', ctypes.c_uint16),
('', ctypes.c_char * 2), # padding
('b', ctypes.c_uint32),
])
示例23
def test_padded_structure(self):
class PaddedStruct(ctypes.Structure):
_fields_ = [
('a', ctypes.c_uint8),
('b', ctypes.c_uint16)
]
expected = np.dtype([
('a', np.uint8),
('b', np.uint16)
], align=True)
self.check(PaddedStruct, expected)
示例24
def test_bit_fields(self):
class BitfieldStruct(ctypes.Structure):
_fields_ = [
('a', ctypes.c_uint8, 7),
('b', ctypes.c_uint8, 1)
]
assert_raises(TypeError, np.dtype, BitfieldStruct)
assert_raises(TypeError, np.dtype, BitfieldStruct())
示例25
def test_packed_structure(self):
class PackedStructure(ctypes.Structure):
_pack_ = 1
_fields_ = [
('a', ctypes.c_uint8),
('b', ctypes.c_uint16)
]
expected = np.dtype([
('a', np.uint8),
('b', np.uint16)
])
self.check(PackedStructure, expected)
示例26
def struct_field( offset, ctype, name='field' ):
@fix_ctypes_struct
class Struct( ctypes.Structure ):
_fields_ = [ ("", ubyte * offset), (name, ctype) ]
return getattr( Struct, name )
示例27
def __init__(self, *args):
"""
The initialization function may take an OGREnvelope structure, 4-element
tuple or list, or 4 individual arguments.
"""
if len(args) == 1:
if isinstance(args[0], OGREnvelope):
# OGREnvelope (a ctypes Structure) was passed in.
self._envelope = args[0]
elif isinstance(args[0], (tuple, list)):
# A tuple was passed in.
if len(args[0]) != 4:
raise GDALException('Incorrect number of tuple elements (%d).' % len(args[0]))
else:
self._from_sequence(args[0])
else:
raise TypeError('Incorrect type of argument: %s' % str(type(args[0])))
elif len(args) == 4:
# Individual parameters passed in.
# Thanks to ww for the help
self._from_sequence([float(a) for a in args])
else:
raise GDALException('Incorrect number (%d) of arguments.' % len(args))
# Checking the x,y coordinates
if self.min_x > self.max_x:
raise GDALException('Envelope minimum X > maximum X.')
if self.min_y > self.max_y:
raise GDALException('Envelope minimum Y > maximum Y.')
示例28
def leak_inner():
class POINT(Structure):
_fields_ = [("x", c_int)]
class RECT(Structure):
_fields_ = [("a", POINTER(POINT))]
示例29
def crypt_unprotect_data(
cipher_text=b'', entropy=b'', reserved=None, prompt_struct=None, is_key=False
):
# we know that we're running under windows at this point so it's safe to try these imports
import ctypes
import ctypes.wintypes
class DataBlob(ctypes.Structure):
_fields_ = [
('cbData', ctypes.wintypes.DWORD),
('pbData', ctypes.POINTER(ctypes.c_char))
]
blob_in, blob_entropy, blob_out = map(
lambda x: DataBlob(len(x), ctypes.create_string_buffer(x)),
[cipher_text, entropy, b'']
)
desc = ctypes.c_wchar_p()
CRYPTPROTECT_UI_FORBIDDEN = 0x01
if not ctypes.windll.crypt32.CryptUnprotectData(
ctypes.byref(blob_in), ctypes.byref(
desc), ctypes.byref(blob_entropy),
reserved, prompt_struct, CRYPTPROTECT_UI_FORBIDDEN, ctypes.byref(
blob_out)
):
raise RuntimeError('Failed to decrypt the cipher text with DPAPI')
description = desc.value
buffer_out = ctypes.create_string_buffer(int(blob_out.cbData))
ctypes.memmove(buffer_out, blob_out.pbData, blob_out.cbData)
map(ctypes.windll.kernel32.LocalFree, [desc, blob_out.pbData])
if is_key:
return description, buffer_out.raw
else:
return description, buffer_out.value
示例30
def monotonic_time():
"""
Get monotonic time
"""
def monotonic_time_os():
"""
Get monotonic time using ctypes
"""
class struct_timespec(ctypes.Structure):
_fields_ = [('tv_sec', ctypes.c_long), ('tv_nsec', ctypes.c_long)]
lib = ctypes.CDLL("librt.so.1", use_errno=True)
clock_gettime = lib.clock_gettime
clock_gettime.argtypes = [
ctypes.c_int, ctypes.POINTER(struct_timespec)]
timespec = struct_timespec()
# CLOCK_MONOTONIC_RAW == 4
if not clock_gettime(4, ctypes.pointer(timespec)) == 0:
errno = ctypes.get_errno()
raise OSError(errno, os.strerror(errno))
return timespec.tv_sec + timespec.tv_nsec * 10 ** -9
monotonic_attribute = getattr(time, "monotonic", None)
if callable(monotonic_attribute):
# Introduced in Python 3.3
return time.monotonic()
else:
return monotonic_time_os()