Python源码示例:cStringIO.StringIO()
示例1
def dumps(obj, protocol=None):
    """Pickle ``obj`` with ``CloudPickler`` and return the in-memory result.

    The object is dumped into a ``StringIO`` buffer whose contents are
    returned; the buffer is closed whether or not pickling succeeds.

    protocol defaults to cloudpickle.DEFAULT_PROTOCOL which is an alias to
    pickle.HIGHEST_PROTOCOL. This setting favors maximum communication speed
    between processes running the same Python version.

    Set protocol=pickle.DEFAULT_PROTOCOL instead if you need to ensure
    compatibility with older versions of Python.
    """
    buffer = StringIO()
    try:
        CloudPickler(buffer, protocol=protocol).dump(obj)
        return buffer.getvalue()
    finally:
        buffer.close()
# including pickles unloading functions in this namespace
示例2
def open_local_file(self, req):
    """Open the local file named by ``req`` and wrap it with headers.

    Content-Type, Content-Length and Last-modified headers are built from
    the guessed MIME type (``text/plain`` when none is guessed) and the
    file's ``os.stat`` result.

    Args:
        req: request object providing ``get_host()`` and ``get_selector()``.

    Returns:
        The ``addinfourl`` wrapping the file opened in binary mode.

    Raises:
        URLError: if the request names a host and either a port is given
            or the host does not resolve to one of ``self.get_names()``.
    """
    host = req.get_host()
    selector = req.get_selector()
    localfile = url2pathname(selector)
    # One stat call supplies both the size and the modification time.
    stats = os.stat(localfile)
    size = stats[stat.ST_SIZE]
    modified = rfc822.formatdate(stats[stat.ST_MTIME])
    mtype = mimetypes.guess_type(selector)[0]
    headers = mimetools.Message(StringIO(
        'Content-Type: %s\nContent-Length: %d\nLast-modified: %s\n' %
        (mtype or 'text/plain', size, modified)))
    if host:
        host, port = splitport(host)
    # ``port`` is only bound when a host was given; the ``not host``
    # short-circuit keeps it from being read otherwise.
    if not host or \
       (not port and socket.gethostbyname(host) in self.get_names()):
        return addinfourl(open(localfile, 'rb'),
                          headers, 'file:'+selector)
    raise URLError('file not on local host')
示例3
def convert_flow(self, flow):
    """Convert the flow into XHTML and write it into the internal file.

    The parent class renders plain HTML into a temporary buffer; that
    output is then patched into XHTML and written to the real output file.

    Args:
        flow: the flow to render, passed through to the parent class.
    """
    # first write the normal HTML into a buffer
    orig_file = self._file
    html_file = StringIO.StringIO()
    self._file = NewlineWriteFile(html_file)
    try:
        super(HiNetXHTMLVisitor, self).convert_flow(flow)
    finally:
        # always put the real output file back, even if rendering fails
        self._file = orig_file
    # now convert it to XHTML
    html_code = html_file.getvalue()
    html_code = html_code.replace('<br>', '<br />')
    # &nbsp; is not one of XML's predefined entities, so emit the numeric
    # character reference instead
    html_code = html_code.replace('&nbsp;', '&#160;')
    self._file.write(html_code)
## Helper functions ##
示例4
def emit(events, stream=None, Dumper=Dumper,
         canonical=None, indent=None, width=None,
         allow_unicode=None, line_break=None):
    """
    Emit YAML parsing events into a stream.
    If stream is None, return the produced string instead.

    The dumper is disposed of even when emitting an event fails.
    """
    buffer = None
    if stream is None:
        from StringIO import StringIO
        # No target given: collect the output in memory and return it.
        buffer = stream = StringIO()
    dumper = Dumper(stream, canonical=canonical, indent=indent, width=width,
                    allow_unicode=allow_unicode, line_break=line_break)
    try:
        for event in events:
            dumper.emit(event)
    finally:
        dumper.dispose()
    if buffer is not None:
        return buffer.getvalue()
示例5
def emit(events, stream=None, Dumper=Dumper,
         canonical=None, indent=None, width=None,
         allow_unicode=None, line_break=None):
    """
    Emit YAML parsing events into a stream.
    If stream is None, return the produced string instead.

    The dumper is disposed of even when emitting an event fails.
    """
    buffer = None
    if stream is None:
        from StringIO import StringIO
        # No target given: collect the output in memory and return it.
        buffer = stream = StringIO()
    dumper = Dumper(stream, canonical=canonical, indent=indent, width=width,
                    allow_unicode=allow_unicode, line_break=line_break)
    try:
        for event in events:
            dumper.emit(event)
    finally:
        dumper.dispose()
    if buffer is not None:
        return buffer.getvalue()
示例6
def _get_suite(buf, default, indent=' '):
    """Extract a python code suite from a StringIO object.

    The buffer's content is dedented, stripped of leading and trailing
    newlines, split into lines, and every line is prefixed with `indent`.

    Args:
      * buf (StringIO) - The buffer to take the content from.
      * default (str) - The content to use if `buf` is empty.
      * indent (str) - The string to prepend to all de-indented lines.

    Returns the transformed string.
    """
    content = buf.getvalue()
    if not content:
        content = default
    dedented = textwrap.dedent(content).strip('\n')
    indented_lines = [indent + line for line in dedented.splitlines()]
    return '\n'.join(indented_lines)
示例7
def capture_sys_output(use_fake_tty=False):
    """
    Generator that redirects standard out and standard error into buffers.

    Yields the ``(stdout, stderr)`` capture buffers once, and restores the
    previous ``sys.stdout``, ``sys.stderr`` and ``sys.stdin`` afterwards.
    When ``use_fake_tty`` is true, ``sys.stdin`` is wrapped in ``FakeTTY``
    for the duration. Intended usage (presumably via a context-manager
    decorator that is not visible here):

        with capture_sys_output() as (stdout, stderr):
            self.cmd.run()

    More info: https://stackoverflow.com/questions/18651705/
    """
    stdout_buffer = StringIO()
    stderr_buffer = StringIO()
    saved_stdout = sys.stdout
    saved_stderr = sys.stderr
    saved_stdin = sys.stdin
    try:
        if use_fake_tty:
            sys.stdin = FakeTTY(saved_stdin)
        sys.stdout = stdout_buffer
        sys.stderr = stderr_buffer
        yield stdout_buffer, stderr_buffer
    finally:
        sys.stdout = saved_stdout
        sys.stderr = saved_stderr
        sys.stdin = saved_stdin
示例8
def _run_python_script(self, script):
    """
    Execute a python one-liner script and return its output.

    The command and the captured output are both logged through
    ``self.output.info``.

    :param script: string containing python script to be executed
    :return: stripped output of the script, or None if the script has
             failed or printed the literal text "None"
    """
    captured = StringIO()
    command = '"%s" -c "%s"' % (self._python_executable, script)
    self.output.info('running %s' % command)
    try:
        self.run(command=command, output=captured)
    except ConanException:
        self.output.info("(failed)")
        return None
    result = captured.getvalue().strip()
    self.output.info(result)
    if result == "None":
        return None
    return result
示例9
def make_input_stream(input, charset):
    """Return a binary input stream for ``input``.

    Objects that already have a ``read`` attribute are returned as-is on
    Python 2; otherwise their binary reader is looked up and a TypeError is
    raised when none is found. ``None`` becomes an empty byte stream, and
    non-bytes values are encoded with ``charset`` before being wrapped.
    """
    # Is already an input stream.
    if hasattr(input, 'read'):
        if PY2:
            return input
        reader = _find_binary_reader(input)
        if reader is None:
            raise TypeError('Could not find binary reader for input stream.')
        return reader

    if input is None:
        payload = b''
    elif isinstance(input, bytes):
        payload = input
    else:
        payload = input.encode(charset)
    return StringIO(payload) if PY2 else io.BytesIO(payload)
示例10
def setUp(self):
    """Run waveconverter on the weather sample and keep its output.

    The converter is launched from ``../src`` with its output redirected
    to ``../test/tmp/tmp_output.txt``; the file's contents are stored in
    ``self.outputString``. Unless ``self.verbose`` is set, ``sys.stdout``
    is replaced by an in-memory buffer while this runs.
    """
    self.verbose = False
    saved_stdout = sys.stdout
    if not self.verbose:
        sys.stdout = cStringIO.StringIO()
    try:
        os.chdir('../src')
        try:
            os.system("./waveconverter.py -p 4 -b ../input_files/weather_s100k.bb > ../test/tmp/tmp_output.txt")
        finally:
            # return to the test directory even if the run blows up
            os.chdir('../test')
        # now read back the info contained in the temp file
        with open("./tmp/tmp_output.txt", 'r') as output_file:
            self.outputString = output_file.read()
    finally:
        # turn stdout back on for unittest output
        sys.stdout = saved_stdout
# grep through the output to confirm that the expected transmissions are there
示例11
def test_store_and_build_forward_message_bad_mimes(self):
    """Every MIME type in BAD_MIMES must be rejected as a client error."""
    # One recorded now() call per bad MIME type.
    # Should not require actual time value upon failure.
    for _ in BAD_MIMES:
        self.now()

    self.mox.ReplayAll()

    for mime_type in BAD_MIMES:
        upload_field = FakeForm(name='field1',
                                file=StringIO.StringIO('file1'),
                                type=mime_type,
                                type_options={},
                                filename='file',
                                headers={})
        form = FakeForm({'field1': upload_field})

        self.assertRaisesRegexp(
            webob.exc.HTTPClientError,
            'Incorrectly formatted MIME type: %s' % mime_type,
            self.handler.store_and_build_forward_message,
            form,
            '================1234==')

    self.mox.VerifyAll()
示例12
def test_filename_too_large(self):
    """An over-long upload filename must raise an HTTP client error."""
    self.now().AndReturn(datetime.datetime(2008, 11, 12, 10, 40))
    self.mox.ReplayAll()

    # '.txt' pushes the name past the maximum allowed length.
    oversized_name = 'a' * blob_upload._MAX_STRING_NAME_LENGTH + '.txt'
    upload_field = FakeForm(name='field1',
                            file=StringIO.StringIO('a'),
                            type='image/png',
                            type_options={'a': 'b', 'x': 'y'},
                            filename=oversized_name,
                            headers={})
    form = FakeForm({'field1': upload_field})

    self.assertRaisesRegexp(
        webob.exc.HTTPClientError,
        'The filename exceeds the maximum allowed length of 500.',
        self.handler.store_and_build_forward_message,
        form, '================1234==')

    self.mox.VerifyAll()
示例13
def test_content_type_too_large(self):
    """An over-long Content-Type must raise an HTTP client error."""
    self.now().AndReturn(datetime.datetime(2008, 11, 12, 10, 40))
    self.mox.ReplayAll()

    # The 'text/' prefix pushes the type past the maximum allowed length.
    oversized_type = 'text/' + 'a' * blob_upload._MAX_STRING_NAME_LENGTH
    upload_field = FakeForm(name='field1',
                            file=StringIO.StringIO('a'),
                            type=oversized_type,
                            type_options={'a': 'b', 'x': 'y'},
                            filename='foobar.txt',
                            headers={})
    form = FakeForm({'field1': upload_field})

    self.assertRaisesRegexp(
        webob.exc.HTTPClientError,
        'The Content-Type exceeds the maximum allowed length of 500.',
        self.handler.store_and_build_forward_message,
        form, '================1234==')

    self.mox.VerifyAll()
示例14
def create_blob(self):
    """Store a small blob and record its BlobInfo entity in the datastore.

    The contents go through ``self.blob_storage.StoreBlob``; the metadata
    entity is written with ``datastore.Put``.

    Returns:
      BlobKey of new blob.
    """
    payload = 'a blob'
    key = blobstore.BlobKey('blob-key-1')
    self.blob_storage.StoreBlob(key, cStringIO.StringIO(payload))

    info = datastore.Entity(blobstore.BLOB_INFO_KIND,
                            name=str(key),
                            namespace='')
    properties = (
        ('content_type', 'image/png'),
        ('creation', datetime.datetime(1999, 10, 10, 8, 42, 0)),
        ('filename', 'largeblob.png'),
        ('size', len(payload)),
    )
    for field, value in properties:
        info[field] = value
    datastore.Put(info)
    return key
示例15
def create_blob(self):
    """Store a small GS object and record its info entity in the datastore.

    Overrides the superclass create_blob method. The contents are stored
    under the URL-safe base64 encoding of the GS filename, and that storage
    key is also saved on the entity.

    Returns:
      The BlobKey of the new object.
    """
    payload = 'a blob'
    gs_filename = '/gs/some_bucket/some_object'
    storage_key = base64.urlsafe_b64encode(gs_filename)
    self.blob_storage.StoreBlob(storage_key, cStringIO.StringIO(payload))

    gs_key = blobstore.create_gs_key(gs_filename)
    info = datastore.Entity(file_service_stub.GS_INFO_KIND,
                            name=gs_key,
                            namespace='')
    properties = (
        ('content_type', 'image/png'),
        ('filename', 'largeblob.png'),
        ('size', len(payload)),
        ('storage_key', storage_key),
    )
    for field, value in properties:
        info[field] = value
    datastore.Put(info)
    return gs_key
示例16
def EndRedirect(self, dispatched_output, original_output):
    """Handle the end of upload complete notification.

    Rewrites the dispatched output into a response, logs its status code,
    and forwards a new buffer to the base dispatcher. The buffer holds a
    ``Status:`` line followed by the response body when that body is
    non-empty, or by the joined response headers otherwise.
    """
    response = dev_appserver.RewriteResponse(dispatched_output)
    logging.info('Upload handler returned %d', response.status_code)

    forwarded = cStringIO.StringIO()
    forwarded.write('Status: %s\n' % response.status_code)

    body = response.body
    # The first read only probes for content; rewind before copying.
    if body and body.read():
        body.seek(0)
        forwarded.write(body.read())
    else:
        forwarded.write(''.join(response.headers.headers))

    forwarded.seek(0)
    dev_appserver.URLDispatcher.EndRedirect(self, forwarded, original_output)
示例17
def EndRedirect(self, dispatched_output, original_output):
    """Process the end of an internal redirect.

    This method is called after all subsequent dispatch requests have
    finished. By default the output from the dispatched process is copied
    to the original.

    This will not be called on dispatchers that do not return an internal
    redirect.

    Args:
      dispatched_output: StringIO buffer containing the results from the
        dispatched request.
      original_output: The original output file.

    Returns:
      None if request handling is complete.
      A new AppServerRequest instance if internal redirect is required.
      This default implementation always returns None.
    """
    dispatched_data = dispatched_output.read()
    original_output.write(dispatched_data)
示例18
def __init__(self, response_file=None, **kwds):
    """Initializer.

    Starts from status 200 / 'Good to go' with ``large_response`` off, then
    either loads ``response_file`` via ``SetResponse`` or sets up an empty
    header message and no body. Keyword arguments are applied last.

    Args:
      response_file: A file-like object that contains the full response
        generated by the user application request handler.  If present
        the headers and body are set from this value, although the values
        may be further overridden by the keyword parameters.
      kwds: All keywords are mapped to attributes of AppServerResponse.
    """
    self.status_code = 200
    self.status_message = 'Good to go'
    self.large_response = False

    if not response_file:
        self.headers = mimetools.Message(cStringIO.StringIO())
        self.body = None
    else:
        self.SetResponse(response_file)

    for attribute, attribute_value in kwds.iteritems():
        setattr(self, attribute, attribute_value)
示例19
def decode_imgstr(imgstr):
    """Decode an encoded image string with ``scipy.misc.imread``.

    The string is wrapped in a ``StringIO`` so it can be read like a file;
    the value returned by ``imread`` is passed straight back.
    """
    return scipy.misc.imread(StringIO(imgstr))
示例20
def compress_img(img, method):
    """Compress an image (numpy array) using the provided image compression
    method.

    The image is saved into an in-memory buffer in the format named by
    ``method``; the buffer's contents are returned.
    """
    buffer = StringIO.StringIO()
    misc.toimage(img).save(buffer, format=method)
    compressed = buffer.getvalue()
    buffer.close()
    return compressed
示例21
def decompress_img(img_compressed):
    """Decompress a compressed image to a numpy array.

    Args:
        img_compressed: the encoded image data.

    Returns:
        The result of ``ndimage.imread`` on that data in "RGB" mode.
    """
    # Seed the buffer through the constructor so it starts positioned at
    # the beginning; writing into an empty buffer leaves the cursor at EOF
    # and relies on the reader rewinding it.
    img_compressed_buffer = StringIO.StringIO(img_compressed)
    try:
        return ndimage.imread(img_compressed_buffer, mode="RGB")
    finally:
        # release the buffer even when decoding fails
        img_compressed_buffer.close()
示例22
def __init__(self, response):
    """Buffer ``response`` fully in memory and open it as a gzip stream.

    Raises SpeedtestHTTPError when gzip support is not available.
    """
    if not gzip:
        raise SpeedtestHTTPError('HTTP response body is gzip encoded, '
                                 'but gzip support is not available')
    # GzipFile needs file operations (the upstream note cites tell()) that
    # the raw response does not support, so copy it into a buffer first.
    buffer_factory = BytesIO or StringIO
    self.io = buffer_factory()
    chunk = response.read(1024)
    while len(chunk) != 0:
        self.io.write(chunk)
        chunk = response.read(1024)
    self.io.seek(0)
    gzip.GzipFile.__init__(self, mode='rb', fileobj=self.io)
示例23
def _create_data(self):
    """Build the in-memory upload payload and store it in ``self._data``.

    The payload is ``content1=`` followed by a repeating run of digits and
    upper-case letters, sized so the whole payload is ``int(self.length)``
    bytes. When the requested length is shorter than the 9-character
    prefix, only the prefix is stored.
    """
    chars = '0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ'
    prefix = 'content1='
    # Clamp at zero so a tiny length cannot turn into a negative slice end.
    filler_length = max(int(self.length) - len(prefix), 0)
    # Round the repeat count UP: rounding to nearest can produce fewer
    # characters than the slice below needs, yielding a short payload.
    multiplier = -(-filler_length // len(chars))
    IO = BytesIO or StringIO
    self._data = IO(
        ('%s%s' % (prefix, (chars * multiplier)[:filler_length])).encode()
    )
示例24
def csv(self, delimiter=','):
    """Return data in CSV format

    A single row is written without a line terminator, holding the server
    id, sponsor and name, the timestamp, the server's ``d`` value, and the
    ping, download and upload figures from ``self.dict()``.
    """
    data = self.dict()
    out = StringIO()
    writer = csv.writer(out, delimiter=delimiter, lineterminator='')
    server = data['server']
    row = [
        server['id'],
        server['sponsor'],
        server['name'],
        data['timestamp'],
        server['d'],
        data['ping'],
        data['download'],
        data['upload'],
    ]
    writer.writerow(row)
    return out.getvalue()
示例25
def gzip_memfile(fname):
    """Return a ``GzipFile`` reading from an in-memory copy of ``fname``.

    The file is opened through ``tf.gfile.Open`` on ``readahead(fname)``
    and read completely before the gzip wrapper is created.
    """
    with tf.gfile.Open(readahead(fname)) as source:
        raw_contents = source.read()
    return gzip.GzipFile(fileobj=StringIO.StringIO(raw_contents))
示例26
def save_file(self, obj):
    """Save a file

    ``sys.stdout`` and ``sys.stderr`` are pickled by reference. Any other
    file must have ``name`` and ``mode`` attributes, be open, not be a
    tty, and be opened for reading; its whole contents are copied into a
    StringIO that keeps the original position and name, and that copy is
    saved in its place.

    Raises pickle.PicklingError for standard input and for any file that
    fails the checks above or cannot be read.
    """
    try:
        # we can't use cStringIO as it lacks the name attribute
        import StringIO as pystringIO
    except ImportError:
        import io as pystringIO

    if not (hasattr(obj, 'name') and hasattr(obj, 'mode')):
        raise pickle.PicklingError("Cannot pickle files that do not map to an actual file")
    for stream_name in ('stdout', 'stderr'):
        if obj is getattr(sys, stream_name):
            return self.save_reduce(getattr, (sys, stream_name), obj=obj)
    if obj is sys.stdin:
        raise pickle.PicklingError("Cannot pickle standard input")
    if obj.closed:
        raise pickle.PicklingError("Cannot pickle closed files")
    if hasattr(obj, 'isatty') and obj.isatty():
        raise pickle.PicklingError("Cannot pickle files that map to tty objects")
    if not ('r' in obj.mode or '+' in obj.mode):
        raise pickle.PicklingError("Cannot pickle files that are not opened for reading: %s" % obj.mode)

    name = obj.name
    snapshot = pystringIO.StringIO()
    try:
        # Read the whole file, then put the cursor back where it was
        position = obj.tell()
        obj.seek(0)
        contents = obj.read()
        obj.seek(position)
    except IOError:
        raise pickle.PicklingError("Cannot pickle file %s as it cannot be read" % name)

    snapshot.write(contents)
    snapshot.seek(position)
    snapshot.name = name
    self.save(snapshot)
    self.memoize(obj)
示例27
def ftp_open(self, req):
    """Open the FTP resource named by ``req``.

    The selector is split into directories and a file name. The transfer
    type is 'I' when a file name is present and 'D' otherwise, unless a
    ``type`` attribute overrides it. Content-Type and Content-Length
    headers are added when they are known.

    Args:
        req: request object providing ``get_host()``, ``get_selector()``
            and ``get_full_url()``.

    Returns:
        The ``addinfourl`` wrapping the retrieved file object.

    Raises:
        IOError: if no host is given, or if an ftplib error occurs.
        URLError: if the host name cannot be resolved.
    """
    host = req.get_host()
    if not host:
        raise IOError('ftp error', 'no host given')
    # Split the port off before resolving: a "host:port" string cannot be
    # resolved, and a resolved address never carries the port.
    host, port = splitport(host)
    if port is None:
        port = ftplib.FTP_PORT
    else:
        port = int(port)
    # XXX handle custom username & password
    try:
        host = socket.gethostbyname(host)
    except socket.error as msg:
        raise URLError(msg)
    path, attrs = splitattr(req.get_selector())
    path = unquote(path)
    dirs = path.split('/')
    dirs, filename = dirs[:-1], dirs[-1]
    if dirs and not dirs[0]:
        dirs = dirs[1:]
    user = passwd = ''  # XXX
    try:
        fw = self.connect_ftp(user, passwd, host, port, dirs)
        transfer_type = 'I' if filename else 'D'
        for attr in attrs:
            # NOTE(review): splitattr() is reused here to split a single
            # "name=value" attribute; confirm it really yields (name, value).
            attr, value = splitattr(attr)
            if attr.lower() == 'type' and \
               value in ('a', 'A', 'i', 'I', 'd', 'D'):
                transfer_type = value.upper()
        fp, retrlen = fw.retrfile(filename, transfer_type)
        headers = ""
        mtype = mimetypes.guess_type(req.get_full_url())[0]
        if mtype:
            headers += "Content-Type: %s\n" % mtype
        if retrlen is not None and retrlen >= 0:
            headers += "Content-Length: %d\n" % retrlen
        sf = StringIO(headers)
        headers = mimetools.Message(sf)
        return addinfourl(fp, headers, req.get_full_url())
    except ftplib.all_errors as msg:
        error = IOError('ftp error', msg)
        # Keep the original traceback where exception instances support
        # having one attached.
        if hasattr(error, 'with_traceback'):
            error = error.with_traceback(sys.exc_info()[2])
        raise error
示例28
def __init__(self, name, value):
    """Constructor from field name and value.

    Both arguments are stored unchanged as ``self.name`` and ``self.value``.
    """
    self.name, self.value = name, value
    # No file object is attached; the assignment below is left disabled.
    # self.file = StringIO(value)
示例29
def read_lines(self):
    """Internal: read lines until EOF or outerboundary.

    A fresh StringIO is installed as both ``self.file`` and the private
    ``__file`` attribute before reading starts. Reading stops at the outer
    boundary when ``self.outerboundary`` is set, and at EOF otherwise.
    """
    buffer = StringIO()
    self.file = self.__file = buffer
    if not self.outerboundary:
        self.read_lines_to_eof()
    else:
        self.read_lines_to_outerboundary()
示例30
def __init__(self, name, value):
    """Constructor from field name and value.

    Both arguments are stored unchanged as ``self.name`` and ``self.value``.
    """
    self.name, self.value = name, value
    # No file object is attached; the assignment below is left disabled.
    # self.file = StringIO(value)