Python源码示例:bson.codec.CodecOptions()
示例1
def __init__(self, codec_options, read_preference, write_concern,
read_concern):
if not isinstance(codec_options, CodecOptions):
raise TypeError("codec_options must be an instance of "
"bson.codec_options.CodecOptions")
self.__codec_options = codec_options
if not isinstance(read_preference, _ServerMode):
raise TypeError("%r is not valid for read_preference. See "
"pymongo.read_preferences for valid "
"options." % (read_preference,))
self.__read_preference = read_preference
if not isinstance(write_concern, WriteConcern):
raise TypeError("write_concern must be an instance of "
"pymongo.write_concern.WriteConcern")
self.__write_concern = write_concern
if not isinstance(read_concern, ReadConcern):
raise TypeError("read_concern must be an instance of "
"pymongo.read_concern.ReadConcern")
self.__read_concern = read_concern
示例2
def run(self):
try:
self.oplogs['backup'] = Oplog(self.mongodump_oplog['file'], self.do_gzip(), 'a+', self.flush_docs, self.flush_secs)
self.oplogs['tailed'] = Oplog(self.tailed_oplog['file'], self.do_gzip())
logging.info("Resolving oplog for %s to max ts: %s" % (self.uri, self.max_end_ts))
self.state.set('running', True)
self.state.set('first_ts', self.mongodump_oplog['first_ts'])
if not self.state.get('first_ts'):
self.state.set('first_ts', self.tailed_oplog['first_ts'])
for change in decode_file_iter(self.oplogs['tailed'], CodecOptions(unicode_decode_error_handler="ignore")):
self.last_ts = change['ts']
if not self.mongodump_oplog['last_ts'] or self.last_ts > self.mongodump_oplog['last_ts']:
if self.last_ts < self.max_end_ts:
self.oplogs['backup'].add(change)
self.changes += 1
elif self.last_ts > self.max_end_ts:
break
self.state.set('count', self.mongodump_oplog['count'] + self.changes)
self.state.set('last_ts', self.last_ts)
self.state.set('running', False)
self.exit_code = 0
except Exception, e:
raise Error("Resolving of oplogs failed! Error: %s" % e)
示例3
def __init__(self, codec_options, read_preference, write_concern,
read_concern):
if not isinstance(codec_options, CodecOptions):
raise TypeError("codec_options must be an instance of "
"bson.codec_options.CodecOptions")
self.__codec_options = codec_options
if not isinstance(read_preference, _ServerMode):
raise TypeError("%r is not valid for read_preference. See "
"pymongo.read_preferences for valid "
"options." % (read_preference,))
self.__read_preference = read_preference
if not isinstance(write_concern, WriteConcern):
raise TypeError("write_concern must be an instance of "
"pymongo.write_concern.WriteConcern")
self.__write_concern = write_concern
if not isinstance(read_concern, ReadConcern):
raise TypeError("read_concern must be an instance of "
"pymongo.read_concern.ReadConcern")
self.__read_concern = read_concern
示例4
def __init__(self, codec_options, read_preference, write_concern,
read_concern):
if not isinstance(codec_options, CodecOptions):
raise TypeError("codec_options must be an instance of "
"bson.codec_options.CodecOptions")
self.__codec_options = codec_options
if not isinstance(read_preference, _ServerMode):
raise TypeError("%r is not valid for read_preference. See "
"pymongo.read_preferences for valid "
"options." % (read_preference,))
self.__read_preference = read_preference
if not isinstance(write_concern, WriteConcern):
raise TypeError("write_concern must be an instance of "
"pymongo.write_concern.WriteConcern")
self.__write_concern = write_concern
if not isinstance(read_concern, ReadConcern):
raise TypeError("read_concern must be an instance of "
"pymongo.read_concern.ReadConcern")
self.__read_concern = read_concern
示例5
def __init__(self, codec_options, read_preference, write_concern,
read_concern):
if not isinstance(codec_options, CodecOptions):
raise TypeError("codec_options must be an instance of "
"bson.codec_options.CodecOptions")
self.__codec_options = codec_options
if not isinstance(read_preference, _ServerMode):
raise TypeError("%r is not valid for read_preference. See "
"pymongo.read_preferences for valid "
"options." % (read_preference,))
self.__read_preference = read_preference
if not isinstance(write_concern, WriteConcern):
raise TypeError("write_concern must be an instance of "
"pymongo.write_concern.WriteConcern")
self.__write_concern = write_concern
if not isinstance(read_concern, ReadConcern):
raise TypeError("read_concern must be an instance of "
"pymongo.read_concern.ReadConcern")
self.__read_concern = read_concern
示例6
def index():
'''an endpoint to return alert schedules'''
if request.body:
request.body.read()
request.body.close()
response.content_type = "application/json"
mongoclient = MongoClient(options.mongohost, options.mongoport)
schedulers_db = mongoclient.meteor['alertschedules'].with_options(codec_options=CodecOptions(tz_aware=True))
mongodb_alerts = schedulers_db.find()
alert_schedules_dict = {}
for mongodb_alert in mongodb_alerts:
if mongodb_alert['last_run_at']:
mongodb_alert['last_run_at'] = mongodb_alert['last_run_at'].isoformat()
if 'modifiedat' in mongodb_alert:
mongodb_alert['modifiedat'] = mongodb_alert['modifiedat'].isoformat()
alert_schedules_dict[mongodb_alert['name']] = mongodb_alert
response.body = json.dumps(alert_schedules_dict)
response.status = 200
return response
示例7
def sync_alert_schedules():
'''an endpoint to return alerts schedules'''
if not request.body:
response.status = 503
return response
alert_schedules = json.loads(request.body.read())
request.body.close()
response.content_type = "application/json"
mongoclient = MongoClient(options.mongohost, options.mongoport)
schedulers_db = mongoclient.meteor['alertschedules'].with_options(codec_options=CodecOptions(tz_aware=True))
results = schedulers_db.find()
for result in results:
if result['name'] in alert_schedules:
new_sched = alert_schedules[result['name']]
result['total_run_count'] = new_sched['total_run_count']
result['last_run_at'] = new_sched['last_run_at']
if result['last_run_at']:
result['last_run_at'] = toUTC(result['last_run_at'])
logger.debug("Inserting schedule for {0} into mongodb".format(result['name']))
schedulers_db.save(result)
response.status = 200
return response
示例8
def update_alert_schedules():
'''an endpoint to return alerts schedules'''
if not request.body:
response.status = 503
return response
alert_schedules = json.loads(request.body.read())
request.body.close()
response.content_type = "application/json"
mongoclient = MongoClient(options.mongohost, options.mongoport)
schedulers_db = mongoclient.meteor['alertschedules'].with_options(codec_options=CodecOptions(tz_aware=True))
schedulers_db.remove()
for alert_name, alert_schedule in alert_schedules.items():
if alert_schedule['last_run_at']:
alert_schedule['last_run_at'] = toUTC(alert_schedule['last_run_at'])
logger.debug("Inserting schedule for {0} into mongodb".format(alert_name))
schedulers_db.insert(alert_schedule)
response.status = 200
return response
示例9
def validate_unicode_decode_error_handler(dummy, value):
"""Validate the Unicode decode error handler option of CodecOptions.
"""
if value not in _UNICODE_DECODE_ERROR_HANDLERS:
raise ValueError("%s is an invalid Unicode decode error handler. "
"Must be one of "
"%s" % (value, tuple(_UNICODE_DECODE_ERROR_HANDLERS)))
return value
示例10
def codec_options(self):
"""Read only access to the :class:`~bson.codec_options.CodecOptions`
of this instance.
"""
return self.__codec_options
示例11
def with_options(
self, codec_options=None, read_preference=None,
write_concern=None, read_concern=None):
"""Get a clone of this collection changing the specified settings.
>>> coll1.read_preference
Primary()
>>> from pymongo import ReadPreference
>>> coll2 = coll1.with_options(read_preference=ReadPreference.SECONDARY)
>>> coll1.read_preference
Primary()
>>> coll2.read_preference
Secondary(tag_sets=None)
:Parameters:
- `codec_options` (optional): An instance of
:class:`~bson.codec_options.CodecOptions`. If ``None`` (the
default) the :attr:`codec_options` of this :class:`Collection`
is used.
- `read_preference` (optional): The read preference to use. If
``None`` (the default) the :attr:`read_preference` of this
:class:`Collection` is used. See :mod:`~pymongo.read_preferences`
for options.
- `write_concern` (optional): An instance of
:class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the
default) the :attr:`write_concern` of this :class:`Collection`
is used.
- `read_concern` (optional): An instance of
:class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the
default) the :attr:`read_concern` of this :class:`Collection`
is used.
"""
return Collection(self.__database,
self.__name,
False,
codec_options or self.codec_options,
read_preference or self.read_preference,
write_concern or self.write_concern,
read_concern or self.read_concern)
示例12
def get_oplog_rs(self):
if not self._conn:
self.connect()
db = self._conn['local']
return db.oplog.rs.with_options(codec_options=CodecOptions(unicode_decode_error_handler="ignore"))
示例13
def load(self):
try:
oplog = self.open()
logging.debug("Reading oplog file %s" % self.oplog_file)
for change in decode_file_iter(oplog, CodecOptions(unicode_decode_error_handler="ignore")):
if 'ts' in change:
self._last_ts = change['ts']
if self._first_ts is None and self._last_ts is not None:
self._first_ts = self._last_ts
self._count += 1
oplog.close()
except Exception, e:
logging.fatal("Error reading oplog file %s! Error: %s" % (self.oplog_file, e))
raise OperationError(e)
示例14
def codec_options(self):
"""Read only access to the :class:`~bson.codec_options.CodecOptions`
of this instance.
"""
return self.__codec_options
示例15
def _command(self, sock_info, command, slave_ok=False,
read_preference=None,
codec_options=None, check=True, allowable_errors=None,
read_concern=DEFAULT_READ_CONCERN):
"""Internal command helper.
:Parameters:
- `sock_info` - A SocketInfo instance.
- `command` - The command itself, as a SON instance.
- `slave_ok`: whether to set the SlaveOkay wire protocol bit.
- `codec_options` (optional) - An instance of
:class:`~bson.codec_options.CodecOptions`.
- `check`: raise OperationFailure if there are errors
- `allowable_errors`: errors to ignore if `check` is True
- `read_concern` (optional) - An instance of
:class:`~pymongo.read_concern.ReadConcern`.
:Returns:
# todo: don't return address
(result document, address of server the command was run on)
"""
return sock_info.command(self.__database.name,
command,
slave_ok,
read_preference or self.read_preference,
codec_options or self.codec_options,
check,
allowable_errors,
read_concern=read_concern)
示例16
def with_options(
self, codec_options=None, read_preference=None,
write_concern=None, read_concern=None):
"""Get a clone of this collection changing the specified settings.
>>> coll1.read_preference
Primary()
>>> from pymongo import ReadPreference
>>> coll2 = coll1.with_options(read_preference=ReadPreference.SECONDARY)
>>> coll1.read_preference
Primary()
>>> coll2.read_preference
Secondary(tag_sets=None)
:Parameters:
- `codec_options` (optional): An instance of
:class:`~bson.codec_options.CodecOptions`. If ``None`` (the
default) the :attr:`codec_options` of this :class:`Collection`
is used.
- `read_preference` (optional): The read preference to use. If
``None`` (the default) the :attr:`read_preference` of this
:class:`Collection` is used. See :mod:`~pymongo.read_preferences`
for options.
- `write_concern` (optional): An instance of
:class:`~pymongo.write_concern.WriteConcern`. If ``None`` (the
default) the :attr:`write_concern` of this :class:`Collection`
is used.
- `read_concern` (optional): An instance of
:class:`~pymongo.read_concern.ReadConcern`. If ``None`` (the
default) the :attr:`read_concern` of this :class:`Collection`
is used.
"""
return Collection(self.__database,
self.__name,
False,
codec_options or self.codec_options,
read_preference or self.read_preference,
write_concern or self.write_concern,
read_concern or self.read_concern)
示例17
def _pack_request(self, ns, slave_ok):
flags = 4 if slave_ok else 0
request_id, msg_bytes, max_doc_size = message.query(
flags, ns, 0, 0, {}, None, CodecOptions())
# Skip 16-byte standard header.
return msg_bytes[16:], request_id
示例18
def encrypt(self, database, cmd, check_keys, codec_options):
"""Encrypt a MongoDB command.
:Parameters:
- `database`: The database for this command.
- `cmd`: A command document.
- `check_keys`: If True, check `cmd` for invalid keys.
- `codec_options`: The CodecOptions to use while encoding `cmd`.
:Returns:
The encrypted command to execute.
"""
self._check_closed()
# Workaround for $clusterTime which is incompatible with
# check_keys.
cluster_time = check_keys and cmd.pop('$clusterTime', None)
encoded_cmd = _dict_to_bson(cmd, check_keys, codec_options)
max_cmd_size = _MAX_ENC_BSON_SIZE + _COMMAND_OVERHEAD
if len(encoded_cmd) > max_cmd_size:
raise _raise_document_too_large(
next(iter(cmd)), len(encoded_cmd), max_cmd_size)
with _wrap_encryption_errors():
encrypted_cmd = self._auto_encrypter.encrypt(database, encoded_cmd)
# TODO: PYTHON-1922 avoid decoding the encrypted_cmd.
encrypt_cmd = _inflate_bson(
encrypted_cmd, DEFAULT_RAW_BSON_OPTIONS)
if cluster_time:
encrypt_cmd['$clusterTime'] = cluster_time
return encrypt_cmd
示例19
def validate_unicode_decode_error_handler(dummy, value):
"""Validate the Unicode decode error handler option of CodecOptions.
"""
if value not in _UNICODE_DECODE_ERROR_HANDLERS:
raise ValueError("%s is an invalid Unicode decode error handler. "
"Must be one of "
"%s" % (value, tuple(_UNICODE_DECODE_ERROR_HANDLERS)))
return value
示例20
def codec_options(self):
"""Read only access to the :class:`~bson.codec_options.CodecOptions`
of this instance.
"""
return self.__codec_options
示例21
def to_object(bson_bytes):
"""Return deserialized object from BSON bytes"""
return bson.BSON(bson_bytes).decode(CodecOptions(document_class=SON,
tz_aware=True))
示例22
def validate_unicode_decode_error_handler(dummy, value):
"""Validate the Unicode decode error handler option of CodecOptions.
"""
if value not in _UNICODE_DECODE_ERROR_HANDLERS:
raise ValueError("%s is an invalid Unicode decode error handler. "
"Must be one of "
"%s" % (value, tuple(_UNICODE_DECODE_ERROR_HANDLERS)))
return value
示例23
def codec_options(self):
"""Read only access to the :class:`~bson.codec_options.CodecOptions`
of this instance.
"""
return self.__codec_options
示例24
def list_indexes(self):
"""Get a cursor over the index documents for this collection.
>>> for index in db.test.list_indexes():
... print(index)
...
SON([(u'v', 1), (u'key', SON([(u'_id', 1)])),
(u'name', u'_id_'), (u'ns', u'test.test')])
:Returns:
An instance of :class:`~pymongo.command_cursor.CommandCursor`.
.. versionadded:: 3.0
"""
codec_options = CodecOptions(SON)
coll = self.with_options(codec_options)
with self._socket_for_primary_reads() as (sock_info, slave_ok):
cmd = SON([("listIndexes", self.__name), ("cursor", {})])
if sock_info.max_wire_version > 2:
cursor = self._command(sock_info, cmd, slave_ok,
ReadPreference.PRIMARY,
codec_options)["cursor"]
return CommandCursor(coll, cursor, sock_info.address)
else:
namespace = _UJOIN % (self.__database.name, "system.indexes")
res = helpers._first_batch(
sock_info, self.__database.name, "system.indexes",
{"ns": self.__full_name}, 0, slave_ok, codec_options,
ReadPreference.PRIMARY, cmd,
self.database.client._event_listeners)
data = res["data"]
cursor = {
"id": res["cursor_id"],
"firstBatch": data,
"ns": namespace,
}
# Note that a collection can only have 64 indexes, so we don't
# technically have to pass len(data) here. There will never be
# an OP_GET_MORE call.
return CommandCursor(
coll, cursor, sock_info.address, len(data))
示例25
def _command(self, sock_info, command, slave_ok=False,
read_preference=None,
codec_options=None, check=True, allowable_errors=None,
read_concern=None,
write_concern=None,
collation=None,
session=None,
retryable_write=False):
"""Internal command helper.
:Parameters:
- `sock_info` - A SocketInfo instance.
- `command` - The command itself, as a SON instance.
- `slave_ok`: whether to set the SlaveOkay wire protocol bit.
- `codec_options` (optional) - An instance of
:class:`~bson.codec_options.CodecOptions`.
- `check`: raise OperationFailure if there are errors
- `allowable_errors`: errors to ignore if `check` is True
- `read_concern` (optional) - An instance of
:class:`~pymongo.read_concern.ReadConcern`.
- `write_concern`: An instance of
:class:`~pymongo.write_concern.WriteConcern`. This option is only
valid for MongoDB 3.4 and above.
- `collation` (optional) - An instance of
:class:`~pymongo.collation.Collation`.
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`.
:Returns:
The result document.
"""
with self.__database.client._tmp_session(session) as s:
return sock_info.command(
self.__database.name,
command,
slave_ok,
read_preference or self._read_preference_for(session),
codec_options or self.codec_options,
check,
allowable_errors,
read_concern=read_concern,
write_concern=write_concern,
parse_write_concern_error=True,
collation=collation,
session=s,
client=self.__database.client,
retryable_write=retryable_write)
示例26
def _unpack_response(response, cursor_id=None, codec_options=CodecOptions()):
"""Unpack a response from the database.
Check the response for errors and unpack, returning a dictionary
containing the response data.
Can raise CursorNotFound, NotMasterError, ExecutionTimeout, or
OperationFailure.
:Parameters:
- `response`: byte string as returned from the database
- `cursor_id` (optional): cursor_id we sent to get this response -
used for raising an informative exception when we get cursor id not
valid at server response
- `codec_options` (optional): an instance of
:class:`~bson.codec_options.CodecOptions`
"""
response_flag = struct.unpack("<i", response[:4])[0]
if response_flag & 1:
# Shouldn't get this response if we aren't doing a getMore
assert cursor_id is not None
# Fake a getMore command response. OP_GET_MORE provides no document.
msg = "Cursor not found, cursor id: %d" % (cursor_id,)
errobj = {"ok": 0, "errmsg": msg, "code": 43}
raise CursorNotFound(msg, 43, errobj)
elif response_flag & 2:
error_object = bson.BSON(response[20:]).decode()
# Fake the ok field if it doesn't exist.
error_object.setdefault("ok", 0)
if error_object["$err"].startswith("not master"):
raise NotMasterError(error_object["$err"], error_object)
elif error_object.get("code") == 50:
raise ExecutionTimeout(error_object.get("$err"),
error_object.get("code"),
error_object)
raise OperationFailure("database error: %s" %
error_object.get("$err"),
error_object.get("code"),
error_object)
result = {"cursor_id": struct.unpack("<q", response[4:12])[0],
"starting_from": struct.unpack("<i", response[12:16])[0],
"number_returned": struct.unpack("<i", response[16:20])[0],
"data": bson.decode_all(response[20:], codec_options)}
assert len(result["data"]) == result["number_returned"]
return result
示例27
def _command(self, sock_info, command, slave_ok=False,
read_preference=None,
codec_options=None, check=True, allowable_errors=None,
read_concern=None,
write_concern=None,
collation=None,
session=None,
retryable_write=False,
user_fields=None):
"""Internal command helper.
:Parameters:
- `sock_info` - A SocketInfo instance.
- `command` - The command itself, as a SON instance.
- `slave_ok`: whether to set the SlaveOkay wire protocol bit.
- `codec_options` (optional) - An instance of
:class:`~bson.codec_options.CodecOptions`.
- `check`: raise OperationFailure if there are errors
- `allowable_errors`: errors to ignore if `check` is True
- `read_concern` (optional) - An instance of
:class:`~pymongo.read_concern.ReadConcern`.
- `write_concern`: An instance of
:class:`~pymongo.write_concern.WriteConcern`. This option is only
valid for MongoDB 3.4 and above.
- `collation` (optional) - An instance of
:class:`~pymongo.collation.Collation`.
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`.
- `retryable_write` (optional): True if this command is a retryable
write.
- `user_fields` (optional): Response fields that should be decoded
using the TypeDecoders from codec_options, passed to
bson._decode_all_selective.
:Returns:
The result document.
"""
with self.__database.client._tmp_session(session) as s:
return sock_info.command(
self.__database.name,
command,
slave_ok,
read_preference or self._read_preference_for(session),
codec_options or self.codec_options,
check,
allowable_errors,
read_concern=read_concern,
write_concern=write_concern,
parse_write_concern_error=True,
collation=collation,
session=s,
client=self.__database.client,
retryable_write=retryable_write,
user_fields=user_fields)
示例28
def list_indexes(self, session=None):
"""Get a cursor over the index documents for this collection.
>>> for index in db.test.list_indexes():
... print(index)
...
SON([(u'v', 1), (u'key', SON([(u'_id', 1)])),
(u'name', u'_id_'), (u'ns', u'test.test')])
:Parameters:
- `session` (optional): a
:class:`~pymongo.client_session.ClientSession`.
:Returns:
An instance of :class:`~pymongo.command_cursor.CommandCursor`.
.. versionchanged:: 3.6
Added ``session`` parameter.
.. versionadded:: 3.0
"""
codec_options = CodecOptions(SON)
coll = self.with_options(codec_options=codec_options,
read_preference=ReadPreference.PRIMARY)
read_pref = ((session and session._txn_read_preference())
or ReadPreference.PRIMARY)
def _cmd(session, server, sock_info, slave_ok):
cmd = SON([("listIndexes", self.__name), ("cursor", {})])
if sock_info.max_wire_version > 2:
with self.__database.client._tmp_session(session, False) as s:
try:
cursor = self._command(sock_info, cmd, slave_ok,
read_pref,
codec_options,
session=s)["cursor"]
except OperationFailure as exc:
# Ignore NamespaceNotFound errors to match the behavior
# of reading from *.system.indexes.
if exc.code != 26:
raise
cursor = {'id': 0, 'firstBatch': []}
return CommandCursor(coll, cursor, sock_info.address,
session=s,
explicit_session=session is not None)
else:
res = message._first_batch(
sock_info, self.__database.name, "system.indexes",
{"ns": self.__full_name}, 0, slave_ok, codec_options,
read_pref, cmd,
self.database.client._event_listeners)
cursor = res["cursor"]
# Note that a collection can only have 64 indexes, so there
# will never be a getMore call.
return CommandCursor(coll, cursor, sock_info.address)
return self.__database.client._retryable_read(
_cmd, read_pref, session)
示例29
def _unpack_response(response,
cursor_id=None,
codec_options=_UNICODE_REPLACE_CODEC_OPTIONS):
"""Unpack a response from the database.
Check the response for errors and unpack, returning a dictionary
containing the response data.
Can raise CursorNotFound, NotMasterError, ExecutionTimeout, or
OperationFailure.
:Parameters:
- `response`: byte string as returned from the database
- `cursor_id` (optional): cursor_id we sent to get this response -
used for raising an informative exception when we get cursor id not
valid at server response
- `codec_options` (optional): an instance of
:class:`~bson.codec_options.CodecOptions`
"""
response_flag = struct.unpack("<i", response[:4])[0]
if response_flag & 1:
# Shouldn't get this response if we aren't doing a getMore
if cursor_id is None:
raise ProtocolError("No cursor id for getMore operation")
# Fake a getMore command response. OP_GET_MORE provides no document.
msg = "Cursor not found, cursor id: %d" % (cursor_id,)
errobj = {"ok": 0, "errmsg": msg, "code": 43}
raise CursorNotFound(msg, 43, errobj)
elif response_flag & 2:
error_object = bson.BSON(response[20:]).decode()
# Fake the ok field if it doesn't exist.
error_object.setdefault("ok", 0)
if error_object["$err"].startswith("not master"):
raise NotMasterError(error_object["$err"], error_object)
elif error_object.get("code") == 50:
raise ExecutionTimeout(error_object.get("$err"),
error_object.get("code"),
error_object)
raise OperationFailure("database error: %s" %
error_object.get("$err"),
error_object.get("code"),
error_object)
result = {"cursor_id": struct.unpack("<q", response[4:12])[0],
"starting_from": struct.unpack("<i", response[12:16])[0],
"number_returned": struct.unpack("<i", response[16:20])[0],
"data": bson.decode_all(response[20:], codec_options)}
assert len(result["data"]) == result["number_returned"]
return result
示例30
def _command(self, sock_info, command, slave_ok=False,
read_preference=None,
codec_options=None, check=True, allowable_errors=None,
read_concern=DEFAULT_READ_CONCERN,
write_concern=None,
parse_write_concern_error=False,
collation=None):
"""Internal command helper.
:Parameters:
- `sock_info` - A SocketInfo instance.
- `command` - The command itself, as a SON instance.
- `slave_ok`: whether to set the SlaveOkay wire protocol bit.
- `codec_options` (optional) - An instance of
:class:`~bson.codec_options.CodecOptions`.
- `check`: raise OperationFailure if there are errors
- `allowable_errors`: errors to ignore if `check` is True
- `read_concern` (optional) - An instance of
:class:`~pymongo.read_concern.ReadConcern`.
- `write_concern`: An instance of
:class:`~pymongo.write_concern.WriteConcern`. This option is only
valid for MongoDB 3.4 and above.
- `parse_write_concern_error` (optional): Whether to parse a
``writeConcernError`` field in the command response.
- `collation` (optional) - An instance of
:class:`~pymongo.collation.Collation`.
:Returns:
# todo: don't return address
(result document, address of server the command was run on)
"""
return sock_info.command(
self.__database.name,
command,
slave_ok,
read_preference or self.read_preference,
codec_options or self.codec_options,
check,
allowable_errors,
read_concern=read_concern,
write_concern=write_concern,
parse_write_concern_error=parse_write_concern_error,
collation=collation)