Python源码示例:cffi.FFI
示例1
def cast_int_addr(n):
    """Cast an address to a Python int.

    Args:
        n: a Python integer or a CFFI pointer (cdata) object.

    Returns:
        ``n`` unchanged when it is already an integer, otherwise the
        pointer value converted through ``size_t``.

    Raises:
        ValueError: if ``n`` is neither an integer nor a CFFI cdata object
            (or cffi cannot be imported to recognise it).
    """
    # `long` only exists on Python 2; referencing it unguarded raises
    # NameError on Python 3 before any value could be returned.
    try:
        integer_types = (int, long)
    except NameError:
        integer_types = (int,)

    if isinstance(n, integer_types):
        return n

    try:
        import cffi
    except ImportError:
        # Without cffi there is no cdata type to recognise; fall through
        # to the ValueError below.
        pass
    else:
        # from pyzmq, this is an FFI void *
        ffi = cffi.FFI()
        if isinstance(n, ffi.CData):
            return int(ffi.cast("size_t", n))

    # Wrap in a 1-tuple so a tuple argument is formatted with %r instead of
    # being unpacked as multiple format arguments.
    raise ValueError("Cannot cast %r to int" % (n,))
示例2
def __init__(self, on_device='cpu', blank_label=0):
    """Load the CTC library through cffi and prepare its options struct.

    Args:
        on_device: ``'cpu'`` or ``'gpu'``. Any other value prints a message
            to stderr and terminates the process with exit status 1.
        blank_label: value stored in the ``blank_label`` field of the
            ``ctcOptions`` struct.
    """
    libpath = get_ctc_lib()
    self.ffi = FFI()
    self.ffi.cdef(ctc_header())
    self.ctclib = self.ffi.dlopen(libpath)

    supported_devices = ['cpu', 'gpu']
    if on_device not in supported_devices:
        print("the requested device {} is not supported".format(
            on_device), file=sys.stderr)
        sys.exit(1)

    # Compare by value: `is` checks object identity, which is only true for
    # equal strings when they happen to be interned, so a 'cpu' string built
    # at runtime could silently be treated as the GPU.
    assign_device = 0 if on_device == 'cpu' else 1

    self.options = self.ffi.new('ctcOptions*',
                                {"loc": assign_device,
                                 "blank_label": blank_label})[0]
    self.size_in_bytes = self.ffi.new("size_t*")
    # Not known at construction time; initialised to None here.
    self.nout = None
    self.bsz = None
示例3
def test_callback_onerror(self):
    """Check that ``onerror`` receives the exception raised inside a callback.

    The callback raises LookupError; the call is expected to return the
    ``error`` value (42) and the handler to be invoked exactly once with
    the ``(type, value, traceback)`` triple.
    """
    ffi = FFI(backend=self.Backend())
    reports = []

    def record(*exc_info):
        reports.append(exc_info)

    def otherfunc():
        raise LookupError

    # The names `cb` and `n` are inspected through the traceback below.
    def cb(n):
        otherfunc()

    callback = ffi.callback("int(*)(int)", cb, error=42, onerror=record)
    result = callback(234)
    assert result == 42

    assert len(reports) == 1
    exc, val, tb = reports[0]
    assert exc is LookupError
    assert isinstance(val, LookupError)
    frame = tb.tb_frame
    assert frame.f_code.co_name == 'cb'
    assert frame.f_locals['n'] == 234
示例4
def test_ffi_new_allocator_4(self):
    """Exercise the error paths of ``ffi.new_allocator``.

    Covers: ``free`` given without ``alloc``, an alloc function that
    raises, and alloc functions returning a non-cdata object, a
    non-pointer cdata, or NULL.
    """
    ffi = FFI(backend=self.Backend())
    py.test.raises(TypeError, ffi.new_allocator, free=lambda x: None)

    # alloc raising: the exception is expected to reach the caller.
    def raising_alloc(size):
        raise LookupError
    raising = ffi.new_allocator(raising_alloc)
    py.test.raises(LookupError, raising, "int[5]")

    # alloc returning a plain Python int.
    def int_alloc(size):
        return 42
    returns_int = ffi.new_allocator(int_alloc)
    excinfo = py.test.raises(TypeError, returns_int, "int[5]")
    assert str(excinfo.value) == "alloc() must return a cdata object (got int)"

    # alloc returning a cdata that is not a pointer.
    def nonpointer_alloc(size):
        return ffi.cast("int", 42)
    returns_nonpointer = ffi.new_allocator(nonpointer_alloc)
    excinfo = py.test.raises(TypeError, returns_nonpointer, "int[5]")
    assert str(excinfo.value) == "alloc() must return a cdata pointer, not 'int'"

    # alloc returning NULL.
    def null_alloc(size):
        return ffi.NULL
    returns_null = ffi.new_allocator(null_alloc)
    py.test.raises(MemoryError, returns_null, "int[5]")
def test_callback_returning_void(self):
    """Check a ``void(*)(void)`` callback whose Python function returns a value.

    The call must always yield None. When the Python function returns
    something other than None, text containing "None" is expected on
    stderr; otherwise stderr must stay empty.
    """
    ffi = FFI(backend=self.Backend())
    for returnvalue in (None, 42):
        def cb():
            return returnvalue

        fptr = ffi.callback("void(*)(void)", cb)

        # Temporarily swap sys.stderr for an in-memory buffer so whatever
        # is printed during the call can be inspected.
        saved_stderr = sys.stderr
        try:
            capture = StringIO()
            sys.stderr = capture
            returned = fptr()
            printed = capture.getvalue()
        finally:
            sys.stderr = saved_stderr

        assert returned is None
        if returnvalue is not None:
            assert "None" in printed
        else:
            assert printed == ''
def test_fputs_custom_FILE(self):
    """Interleave Python writes and C ``fputs`` calls on one file object.

    A Python file opened in binary mode is passed where the C function
    expects ``FILE *``; the file content must show both kinds of writes in
    call order. Skipped on the ctypes backend.
    """
    if self.Backend is CTypesBackend:
        py.test.skip("FILE not supported with the ctypes backend")

    filename = str(udir.join('fputs_custom_FILE'))
    ffi = FFI(backend=self.Backend())
    ffi.cdef("int fputs(const char *, FILE *);")
    libc = ffi.dlopen(None)

    with open(filename, 'wb') as out:
        out.write(b'[')
        libc.fputs(b"hello from custom file", out)
        out.write(b'][')
        libc.fputs(b"some more output", out)
        out.write(b']')

    with open(filename, 'rb') as written:
        content = written.read()
    assert content == b'[hello from custom file][some more output]'
示例7
def test_free_callback_cycle(self):
    """Check that an object caught in a callback reference cycle is freed.

    A callback is created whose closure holds a list that in turn holds
    the callback itself; a weak reference to the payload object must be
    dead after at most three garbage-collection passes.
    Skipped on the ctypes backend.
    """
    if self.Backend is CTypesBackend:
        py.test.skip("seems to fail with the ctypes backend on windows")
    import weakref
    def make_callback(data):
        # `ffi` is resolved from the enclosing scope at call time; it is
        # assigned further down, before make_callback() is invoked.
        container = [data]
        callback = ffi.callback('int()', lambda: len(container))
        container.append(callback)
        # Ref cycle: callback -> lambda (closure) -> container -> callback
        return callback
    class Data(object):
        pass
    ffi = FFI(backend=self.Backend())
    data = Data()
    callback = make_callback(data)
    wr = weakref.ref(data)
    # Drop the only direct references; only the cycle keeps `data` alive now.
    del callback, data
    # Run the collector up to three times, stopping as soon as it is freed.
    for i in range(3):
        if wr() is not None:
            import gc; gc.collect()
    assert wr() is None    # 'data' does not leak
示例8
def test_setting_errno(self):
    """Check that a value assigned to ``ffi.errno`` is visible to C code.

    Sets ``ffi.errno`` to 42, calls ``test_setting_errno()`` from the test
    library and expects both the return value and ``ffi.errno`` afterwards
    to be 42. Skipped when the test library is missing, on win32, and for
    the ctypes backend on PyPy.
    """
    if self.module is None:
        py.test.skip("fix the auto-generation of the tiny test lib")
    if sys.platform == 'win32':
        py.test.skip("fails, errno at multiple addresses")
    running_on_pypy = '__pypy__' in sys.modules
    if self.Backend is CTypesBackend and running_on_pypy:
        py.test.skip("XXX errno issue with ctypes on pypy?")

    ffi = FFI(backend=self.Backend())
    ffi.cdef("""
int test_setting_errno(void);
""")
    testlib = ffi.dlopen(self.module)
    ffi.errno = 42
    returned = testlib.test_setting_errno()
    assert returned == 42
    assert ffi.errno == 42
示例9
def test_my_array_7(self):
    """Read and overwrite the global ``int my_array[7]`` of the test library.

    Reading and ``len()`` are checked for every backend; whole-array
    assignment is skipped on the ctypes backend. The array is restored to
    0..6 at the end.
    """
    if self.module is None:
        py.test.skip("fix the auto-generation of the tiny test lib")

    ffi = FFI(backend=self.Backend())
    ffi.cdef("""
int my_array[7];
""")
    testlib = ffi.dlopen(self.module)

    # Initial content is expected to be 0..6.
    for index in range(7):
        assert testlib.my_array[index] == index
    assert len(testlib.my_array) == 7

    if self.Backend is CTypesBackend:
        py.test.skip("not supported by the ctypes backend")

    # Overwrite with 10..16 and read it back.
    testlib.my_array = list(range(10, 17))
    for index in range(7):
        assert testlib.my_array[index] == 10 + index

    # Put the original values back and verify.
    testlib.my_array = list(range(7))
    for index in range(7):
        assert testlib.my_array[index] == index
示例10
def test_my_array_no_length(self):
    """Access the global array when it is declared as ``int my_array[]``.

    Indexing and whole-array assignment must work, while ``len()`` is
    expected to raise TypeError because no length was declared.
    Skipped when the test library is missing or on the ctypes backend.
    """
    if self.module is None:
        py.test.skip("fix the auto-generation of the tiny test lib")
    if self.Backend is CTypesBackend:
        py.test.skip("not supported by the ctypes backend")

    ffi = FFI(backend=self.Backend())
    ffi.cdef("""
int my_array[];
""")
    testlib = ffi.dlopen(self.module)

    # Initial content is expected to be 0..6.
    for index in range(7):
        assert testlib.my_array[index] == index
    py.test.raises(TypeError, len, testlib.my_array)

    # Overwrite with 10..16 and read it back.
    testlib.my_array = list(range(10, 17))
    for index in range(7):
        assert testlib.my_array[index] == 10 + index

    # Put the original values back and verify.
    testlib.my_array = list(range(7))
    for index in range(7):
        assert testlib.my_array[index] == index
示例11
def test_keepalive_lib(self):
    """Check that a loaded library keeps its FFI object alive.

    After the local ``ffi`` name is deleted and a GC pass is run, both
    the library and the FFI object must still be reachable through weak
    references, and a function fetched earlier must still be callable.
    """
    if self.module is None:
        py.test.skip("fix the auto-generation of the tiny test lib")
    ffi = FFI(backend=self.Backend())
    ffi.cdef("""
int test_getting_errno(void);
""")
    ownlib = ffi.dlopen(self.module)
    # Weak references let the test observe liveness without extending it.
    ffi_r = weakref.ref(ffi)
    ownlib_r = weakref.ref(ownlib)
    func = ownlib.test_getting_errno
    del ffi
    import gc; gc.collect()       # ownlib stays alive
    assert ownlib_r() is not None
    assert ffi_r() is not None    # kept alive by ownlib
    res = func()
    assert res == -1
示例12
def test_remove_comments():
    """Check that ``cdef()`` copes with comments scattered through a declaration.

    Declares ``sin`` with block and line comments interleaved between
    tokens, then checks the name and type recorded by the fake backend.
    """
    ffi = FFI(backend=FakeBackend())
    ffi.cdef("""
double /*comment here*/ sin // blah blah
/* multi-
line-
//comment */ (
// foo
double // bar /* <- ignored, because it's in a comment itself
x, double/*several*//*comment*/y) /*on the same line*/
;
""")
    math_lib = ffi.dlopen(lib_m)
    sin_func = math_lib.sin
    assert sin_func.name == 'sin'
    expected_btype = '<func (<double>, <double>), <double>, False>'
    assert sin_func.BType == expected_btype
示例13
def test_remove_line_continuation_comments():
    """Check ``cdef()`` on line comments that end with backslashes.

    Three functions (x, y, z) are declared, each preceded by a line
    comment ending in a different backslash sequence. The test passes if
    all three names can be looked up on the loaded library.
    """
    ffi = FFI(backend=FakeBackend())
    # The literal below is not a raw string, so its backslash sequences are
    # processed by Python before cdef() sees the text.
    ffi.cdef("""
double // blah \\
more comments
x(void);
double // blah\\\\
y(void);
double // blah\\ \
etc
z(void);
""")
    m = ffi.dlopen(lib_m)
    # Bare attribute accesses: a failing lookup would raise here.
    m.x
    m.y
    m.z
示例14
def test__is_constant_globalvar():
    """Check which global declarations the parser records as constants.

    For each C declaration of ``a``, the parser's declaration table must
    contain exactly one of 'constant a' or 'variable a', as given by the
    expected flag.
    """
    cases = [
        ("int a;", False),
        ("const int a;", True),
        ("int *a;", False),
        ("const int *a;", False),
        ("int const *a;", False),
        ("int *const a;", True),
        ("int a[5];", False),
        ("const int a[5];", False),
        ("int *a[5];", False),
        ("const int *a[5];", False),
        ("int const *a[5];", False),
        ("int *const a[5];", False),
        ("int a[5][6];", False),
        ("const int a[5][6];", False),
    ]
    for c_source, is_constant in cases:
        # A fresh FFI per case so the name `a` is never declared twice.
        ffi = FFI()
        ffi.cdef(c_source)
        declared = ffi._parser._declarations
        assert ('constant a' in declared) == is_constant
        assert ('variable a' in declared) == (not is_constant)
示例15
def _import_cffi():
    """Populate the module globals ``ffi`` and ``CData`` on first use.

    Does nothing when ``ffi`` is already something other than None. On
    ImportError ``ffi`` is set to False, so the import is not retried on
    later calls; ``CData`` is left untouched in that case.
    """
    global ffi, CData
    if ffi is None:
        try:
            # The FFI() construction stays inside the try block so that an
            # ImportError raised there is handled like a failed import.
            import cffi
            ffi = cffi.FFI()
            CData = ffi.CData
        except ImportError:
            ffi = False
示例16
def _getffi():
    """Build a ``cffi.FFI`` object from the packaged ``libmyo.h`` header.

    The header text is loaded with ``pkgutil.get_data``, stripped of the
    constructs cffi cannot parse, and passed to ``cdef()``.

    Returns:
        The ``cffi.FFI`` instance holding the parsed declarations.
    """
    string = pkgutil.get_data(__name__, 'libmyo.h').decode('utf8')
    string = string.replace('\r\n', '\n')
    # Remove stuff that cffi can not parse.
    # Raw string: '\s' in a normal literal is an invalid escape sequence
    # (a warning today, slated to become an error); the pattern itself is
    # unchanged. It blanks every preprocessor line.
    string = re.sub(r'^\s*#.*$', '', string, flags=re.M)
    string = string.replace('LIBMYO_EXPORT', '')
    string = string.replace('extern "C" {', '')
    string = string.replace('} // extern "C"', '')
    ffi = cffi.FFI()
    ffi.cdef(string)
    return ffi
示例17
def zmq_version_info():
    """Return the libzmq version as a ``(major, minor, patch)`` tuple of ints.

    A throw-away FFI instance declares ``zmq_version`` and compiles a
    small verifier module against ``zmq.h`` to call it.
    """
    ffi_check = FFI()
    ffi_check.cdef('void zmq_version(int *major, int *minor, int *patch);')
    C_check_version = ffi_check.verify('#include <zmq.h>',
                                       libraries=['c', 'zmq'])
    # Allocate the out-parameters from the FFI instance created above rather
    # than from a separate module-level `ffi`, so this function does not
    # depend on a global that may not be initialised when it is called.
    major = ffi_check.new('int*')
    minor = ffi_check.new('int*')
    patch = ffi_check.new('int*')
    C_check_version.zmq_version(major, minor, patch)
    return (int(major[0]), int(minor[0]), int(patch[0]))
示例18
def __init__(self, stride, padding, dilation, bias=True):
    """Store the convolution settings and a cffi NULL pointer.

    Args:
        stride: stored as ``self.stride``.
        padding: stored as ``self.padding``.
        dilation: stored as ``self.dilation``.
        bias: stored as ``self.bias``; defaults to True.
    """
    super(DepthconvFunction, self).__init__()
    self.stride, self.padding, self.dilation = stride, padding, dilation
    # An FFI instance is created only to obtain its NULL constant.
    self.null = cffi.FFI().NULL
    self.bias = bias
示例19
def check_lib_python_found(tmpdir):
    """Skip the calling test when an embedding module cannot be built.

    The first call (global ``_link_error`` still equal to '?') compiles an
    empty module in ``tmpdir`` and stores either None or the
    VerificationError in ``_link_error``. Every call then skips the test
    if an error is recorded.
    """
    global _link_error
    if _link_error == '?':
        builder = cffi.FFI()
        build_kwds = {}
        builder._apply_embedding_fix(build_kwds)
        builder.set_source("_test_lib_python_found", "", **build_kwds)
        failure = None
        try:
            builder.compile(tmpdir=tmpdir, verbose=True)
        except cffi.VerificationError as e:
            failure = e
        # Reached only when compile() succeeded or raised VerificationError;
        # any other exception propagates and leaves the '?' marker in place.
        _link_error = failure
    if _link_error:
        py.test.skip(str(_link_error))
示例20
def test_not_supported_bitfield_in_result(self):
    """Check that a callback returning a struct with a bitfield is rejected.

    ``ffi.callback`` must raise NotImplementedError with the exact message
    asserted below.
    """
    ffi = FFI(backend=self.Backend())
    ffi.cdef("struct foo_s { int a,b,c,d,e; int x:1; };")
    excinfo = py.test.raises(NotImplementedError, ffi.callback,
                             "struct foo_s foo(void)", lambda: 42)
    expected_message = ("struct foo_s(*)(): "
        "callback with unsupported argument or return type or with '...'")
    assert str(excinfo.value) == expected_message
示例21
def test_inspecttype(self):
    """Check the ``kind``, ``cname`` and ``ellipsis`` attributes of ctypes."""
    ffi = FFI(backend=self.Backend())
    assert ffi.typeof("long").kind == "primitive"

    variadic_signature = "long(*)(long, long**, ...)"
    # cname is expected in normalized spelling: "long**" becomes "long * *".
    normalized_cname = "long(*)(long, long * *, ...)"
    assert ffi.typeof(variadic_signature).cname == normalized_cname
    assert ffi.typeof(variadic_signature).ellipsis is True
示例22
def test_new_handle(self):
    """Round-trip a Python object through ``new_handle`` / ``from_handle``.

    The handle must have type ``void *``, must give back the identical
    object (also after a cast to ``char *``), and ``from_handle(NULL)``
    must raise RuntimeError.
    """
    ffi = FFI(backend=self.Backend())
    payload = [2, 3, 4]
    handle = ffi.new_handle(payload)
    assert ffi.typeof(handle) == ffi.typeof("void *")
    assert ffi.from_handle(handle) is payload
    as_char_ptr = ffi.cast("char *", handle)
    assert ffi.from_handle(as_char_ptr) is payload
    py.test.raises(RuntimeError, ffi.from_handle, ffi.NULL)
示例23
def test_ffi_new_allocator_3(self):
    """Use ``new_allocator`` with a custom alloc function and no ``free``.

    The alloc function records each requested size and returns a char
    buffer filled with b"X"; the resulting ``int[10]`` is still expected
    to read as zero.
    """
    ffi = FFI(backend=self.Backend())
    requested_sizes = []

    def myalloc(size):
        requested_sizes.append(size)
        return ffi.new("char[]", b"X" * size)

    allocate = ffi.new_allocator(myalloc)    # no 'free'
    array = allocate("int[10]")
    # The test expects a 40-byte request for int[10].
    assert requested_sizes == [40]
    assert ffi.typeof(array) == ffi.typeof("int[10]")
    assert ffi.sizeof(array) == 40
    assert array[5] == 0
示例24
def test_bitfield_anonymous_no_align(self):
    """Run ``self.check`` on structs with named and anonymous bitfields.

    Each row is (struct body, then three expected numbers); the meaning
    of the numbers is defined by ``self.check``, which is not shown here.
    ``L`` is the alignment of ``long long`` on the running platform.
    """
    L = FFI().alignof("long long")
    cases = [
        ("char y; int :1;", 0, 1, 2),
        ("char x; int z:1; char y;", 2, 4, 4),
        ("char x; int :1; char y;", 2, 1, 3),
        ("char x; long long z:48; char y;", 7, L, 8),
        ("char x; long long :48; char y;", 7, 1, 8),
        ("char x; long long z:56; char y;", 8, L, 8 + L),
        ("char x; long long :56; char y;", 8, 1, 9),
        ("char x; long long z:57; char y;", L + 8, L, L + 8 + L),
        ("char x; long long :57; char y;", L + 8, 1, L + 9),
    ]
    for check_args in cases:
        self.check(*check_args)
示例25
def test_bitfield_anonymous_align_arm(self):
    """Run ``self.check`` on bitfield structs with the expectations of the ARM variant.

    Each row is (struct body, then three expected numbers); the meaning
    of the numbers is defined by ``self.check``, which is not shown here.
    ``L`` is the alignment of ``long long`` on the running platform.
    """
    L = FFI().alignof("long long")
    cases = [
        ("char y; int :1;", 0, 4, 4),
        ("char x; int z:1; char y;", 2, 4, 4),
        ("char x; int :1; char y;", 2, 4, 4),
        ("char x; long long z:48; char y;", 7, L, 8),
        ("char x; long long :48; char y;", 7, 8, 8),
        ("char x; long long z:56; char y;", 8, L, 8 + L),
        ("char x; long long :56; char y;", 8, L, 8 + L),
        ("char x; long long z:57; char y;", L + 8, L, L + 8 + L),
        ("char x; long long :57; char y;", L + 8, L, L + 8 + L),
    ]
    for check_args in cases:
        self.check(*check_args)
示例26
def test_bitfield_zero_arm(self):
    """Run ``self.check`` on structs with zero-width bitfields (ARM variant).

    Each row is (struct body, then three expected numbers); the meaning
    of the numbers is defined by ``self.check``, which is not shown here.
    ``L`` is the alignment of ``long long`` on the running platform.
    """
    L = FFI().alignof("long long")
    cases = [
        ("char y; int :0;", 0, 4, 4),
        ("char x; int :0; char y;", 4, 4, 8),
        ("char x; int :0; int :0; char y;", 4, 4, 8),
        ("char x; long long :0; char y;", L, 8, L + 8),
        ("short x, y; int :0; int :0;", 2, 4, 4),
        ("char x; int :0; short b:1; char y;", 5, 4, 8),
        ("int a:1; int :0; int b:1; char y;", 5, 4, 8),
    ]
    for check_args in cases:
        self.check(*check_args)
示例27
def test_error_cases(self):
    """Check that invalid bitfield declarations raise TypeError.

    Covers a float bitfield, a named zero-width bitfield, and a ``char``
    bitfield of width 9. For each, the TypeError may come from either
    ``cdef()`` or ``new()``.
    """
    ffi = FFI()
    # Use the context-manager form: passing a code string to raises() was
    # deprecated and later removed from pytest, where it now fails because
    # a str is not callable.
    with py.test.raises(TypeError):
        ffi.cdef("struct s1 { float x:1; };")
        ffi.new("struct s1 *")
    with py.test.raises(TypeError):
        ffi.cdef("struct s2 { char x:0; };")
        ffi.new("struct s2 *")
    with py.test.raises(TypeError):
        ffi.cdef("struct s3 { char x:9; };")
        ffi.new("struct s3 *")
示例28
def test_struct_with_typedef(self):
    """Check that a typedef'd anonymous struct is shown under its typedef name."""
    ffi = FFI()
    ffi.cdef("typedef struct { float x; } foo_t;")
    instance = ffi.new("foo_t *", [5.2])
    text = repr(instance)
    assert text.startswith("<cdata 'foo_t *' ")
示例29
def test_struct_array_no_length(self):
    """Initialise and read a struct whose last field is ``int a[]``.

    The open-ended array field is expected to be exposed with type
    ``int *`` and to hold the three initialiser values.
    """
    ffi = FFI()
    ffi.cdef("struct foo_s { int x; int a[]; };")
    tail_values = [200, 300, 400]
    p = ffi.new("struct foo_s *", [100, tail_values])
    assert p.x == 100
    assert ffi.typeof(p.a) is ffi.typeof("int *")    # no length available
    for index, value in enumerate(tail_values):
        assert p.a[index] == value
示例30
def test_getwinerror(self):
    """Check ``ffi.getwinerror`` with an explicit code, no argument, and -1.

    Loads Kernel32.dll to set the last error to 2; both the argument-less
    call and the call with -1 are then expected to report error 2.
    """
    ffi = FFI()

    # Explicit error code.
    code, message = ffi.getwinerror(1155)
    assert code == 1155
    assert message == ("No application is associated with the "
                       "specified file for this operation")

    ffi.cdef("void SetLastError(int);")
    kernel32 = ffi.dlopen("Kernel32.dll")
    kernel32.SetLastError(2)

    # No argument: expected to report the error set just above.
    code, message = ffi.getwinerror()
    assert code == 2
    assert message == "The system cannot find the file specified"

    # -1: expected to give the same result as the argument-less call.
    code, message = ffi.getwinerror(-1)
    assert code == 2
    assert message == "The system cannot find the file specified"