Python源码示例:asyncio.BaseEventLoop()
示例1
def _record_perf_async(
loop: asyncio.BaseEventLoop, event: str, message: str) -> None:
"""Record timing metric async
:param asyncio.BaseEventLoop loop: event loop
:param str event: event
:param str message: message
"""
if not _RECORD_PERF:
return
proc = await asyncio.create_subprocess_shell(
'./perf.py cascade {ev} --prefix {pr} --message "{msg}"'.format(
ev=event, pr=_PREFIX, msg=message), loop=loop)
await proc.wait()
if proc.returncode != 0:
logger.error(
'could not record perf to storage for event: {}'.format(event))
示例2
def execute_for(self, batch, new_loop=False):
""" Run a pipeline for one batch
Parameters
----------
batch
an input batch
new_loop : bool
whether to create a new :class:`async loop <asyncio.BaseEventLoop>`.
Returns
-------
a batch - an output from the last action in the pipeline
"""
if new_loop:
asyncio.set_event_loop(asyncio.new_event_loop())
batch.pipeline = self
batch_res = self._exec_all_actions(batch)
batch_res.pipeline = self
return batch_res
示例3
def _record_perf_async(
loop: asyncio.BaseEventLoop, event: str, message: str) -> None:
"""Record timing metric async
:param asyncio.BaseEventLoop loop: event loop
:param str event: event
:param str message: message
"""
if not _RECORD_PERF:
return
proc = await asyncio.subprocess.create_subprocess_shell(
'./perf.py cascade {ev} --prefix {pr} --message "{msg}"'.format(
ev=event, pr=_PREFIX, msg=message), loop=loop)
await proc.wait()
if proc.returncode != 0:
logger.error(
'could not record perf to storage for event: {}'.format(event))
示例4
def _renew_queue_message_lease(
loop: asyncio.BaseEventLoop,
queue_client: azure.storage.queue.QueueService,
queue_key: str, cb_key: str, msg_id: str):
"""Renew a storage queue message lease
:param asyncio.BaseEventLoop loop: event loop
:param azure.storage.queue.QueueService queue_client: queue client
:param str queue_key: queue name key index into _STORAGE_CONTAINERS
:param str cb_key: callback handle key
:param str msg_id: message id
"""
msg = queue_client.update_message(
_STORAGE_CONTAINERS[queue_key],
message_id=msg_id,
pop_receipt=_QUEUE_MESSAGES[msg_id],
visibility_timeout=45)
if msg.pop_receipt is None:
raise RuntimeError(
'update message failed for id={} pr={}'.format(
msg_id, _QUEUE_MESSAGES[msg_id]))
_QUEUE_MESSAGES[msg_id] = msg.pop_receipt
_CBHANDLES[cb_key] = loop.call_later(
15, _renew_queue_message_lease, loop, queue_client, queue_key, cb_key,
msg_id)
示例5
def run(coro, loop=None):
"""
Convenient shortcut alias to ``loop.run_until_complete``.
Arguments:
coro (coroutine): coroutine object to schedule.
loop (asyncio.BaseEventLoop): optional event loop to use.
Defaults to: ``asyncio.get_event_loop()``.
Returns:
mixed: returned value by coroutine.
Usage::
async def mul_2(num):
return num * 2
paco.run(mul_2(4))
# => 8
"""
loop = loop or asyncio.get_event_loop()
return loop.run_until_complete(coro)
示例6
def listen_for_trades(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue):
while True:
try:
ws_message: str = await self.get_ws_subscription_message("trade")
async with websockets.connect(DIFF_STREAM_URL) as ws:
ws: websockets.WebSocketClientProtocol = ws
await ws.send(ws_message)
async for raw_msg in self._inner_messages(ws):
msg: List[Any] = ujson.loads(raw_msg)
trades: List[Dict[str, Any]] = [{"pair": msg[-1], "trade": trade} for trade in msg[1]]
for trade in trades:
trade_msg: OrderBookMessage = KrakenOrderBook.trade_message_from_exchange(trade)
output.put_nowait(trade_msg)
except asyncio.CancelledError:
raise
except Exception:
self.logger().error("Unexpected error with WebSocket connection. Retrying after 30 seconds...",
exc_info=True)
await asyncio.sleep(30.0)
示例7
def listen_for_order_book_diffs(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue):
while True:
try:
trading_pairs: List[str] = await self.get_trading_pairs()
ws_path: str = "/".join([f"{trading_pair.lower()}@depth" for trading_pair in trading_pairs])
stream_url: str = f"{DIFF_STREAM_URL}/{ws_path}"
async with websockets.connect(stream_url) as ws:
ws: websockets.WebSocketClientProtocol = ws
async for raw_msg in self._inner_messages(ws):
msg = ujson.loads(raw_msg)
order_book_message: OrderBookMessage = BinanceOrderBook.diff_message_from_exchange(
msg, time.time())
output.put_nowait(order_book_message)
except asyncio.CancelledError:
raise
except Exception:
self.logger().error("Unexpected error with WebSocket connection. Retrying after 30 seconds...",
exc_info=True)
await asyncio.sleep(30.0)
示例8
def listen_for_user_stream(self, ev_loop: asyncio.BaseEventLoop, output: asyncio.Queue):
while True:
try:
ws: websockets.WebSocketClientProtocol
async with websockets.connect(BITFINEX_WS_URI) as ws:
ws: websockets.WebSocketClientProtocol = ws
payload = self._bitfinex_auth.generate_auth_payload()
await ws.send(json.dumps(payload))
async for raw_msg in self._get_response(ws):
transformed_msg: BitfinexOrderBookMessage = self._transform_message_from_exchange(raw_msg)
if transformed_msg:
output.put_nowait(transformed_msg)
except asyncio.CancelledError:
raise
except Exception:
self.logger().error(
"Unexpected error with Bitfinex WebSocket connection. " "Retrying after 30 seconds...",
exc_info=True,
)
await asyncio.sleep(self.MESSAGE_TIMEOUT)
示例9
def __init__(
self,
data_source_type: OrderBookTrackerDataSourceType = OrderBookTrackerDataSourceType.EXCHANGE_API,
trading_pairs: Optional[List[str]] = None,
rest_api_url: str = "",
websocket_url: str = "",
):
super().__init__(data_source_type=data_source_type)
self._order_books: Dict[str, DolomiteOrderBook] = {}
self._saved_message_queues: Dict[str, Deque[DolomiteOrderBookMessage]] = defaultdict(lambda: deque(maxlen=1000))
self._order_book_snapshot_stream: asyncio.Queue = asyncio.Queue()
self._ev_loop: asyncio.BaseEventLoop = asyncio.get_event_loop()
self._data_source: Optional[OrderBookTrackerDataSource] = None
self._active_order_trackers: Dict[str, DolomiteActiveOrderTracker] = defaultdict(DolomiteActiveOrderTracker)
self._trading_pairs: Optional[List[str]] = trading_pairs
self.rest_api_url = rest_api_url
self.websocket_url = websocket_url
示例10
def apply(loop=None):
"""Patch asyncio to make its event loop reentrent."""
loop = loop or asyncio.get_event_loop()
if not isinstance(loop, asyncio.BaseEventLoop):
raise ValueError('Can\'t patch loop of type %s' % type(loop))
if getattr(loop, '_nest_patched', None):
# already patched
return
_patch_asyncio()
_patch_loop(loop)
_patch_task()
_patch_handle()
示例11
def setup_test_loop(loop_factory=asyncio.new_event_loop):
"""Create and return an asyncio.BaseEventLoop instance.
The caller should also call teardown_test_loop, once they are done
with the loop.
"""
loop = loop_factory()
asyncio.set_event_loop(loop)
if sys.platform != "win32":
policy = asyncio.get_event_loop_policy()
watcher = asyncio.SafeChildWatcher()
watcher.attach_loop(loop)
policy.set_child_watcher(watcher)
return loop
示例12
def _renew_blob_lease(
loop: asyncio.BaseEventLoop,
blob_client: azureblob.BlockBlobService,
container_key: str, resource: str, blob_name: str):
"""Renew a storage blob lease
:param asyncio.BaseEventLoop loop: event loop
:param azureblob.BlockBlobService blob_client: blob client
:param str container_key: blob container index into _STORAGE_CONTAINERS
:param str resource: resource
:param str blob_name: blob name
"""
try:
lease_id = blob_client.renew_blob_lease(
container_name=_STORAGE_CONTAINERS[container_key],
blob_name=blob_name,
lease_id=_BLOB_LEASES[resource],
)
except azure.common.AzureException as e:
logger.exception(e)
_BLOB_LEASES.pop(resource)
_CBHANDLES.pop(resource)
else:
_BLOB_LEASES[resource] = lease_id
_CBHANDLES[resource] = loop.call_later(
15, _renew_blob_lease, loop, blob_client, container_key, resource,
blob_name)
示例13
def distribute_global_resources(
loop: asyncio.BaseEventLoop,
blob_client: azureblob.BlockBlobService,
table_client: azuretable.TableService) -> None:
"""Distribute global services/resources
:param asyncio.BaseEventLoop loop: event loop
:param azureblob.BlockBlobService blob_client: blob client
:param azuretable.TableService table_client: table client
"""
# get globalresources from table
try:
entities = table_client.query_entities(
_STORAGE_CONTAINERS['table_globalresources'],
filter='PartitionKey eq \'{}\''.format(_PARTITION_KEY))
except azure.common.AzureMissingResourceHttpError:
entities = []
nentities = 0
for ent in entities:
resource = ent['Resource']
grtype, image = get_container_image_name_from_resource(resource)
if grtype == _CONTAINER_MODE.name.lower():
nentities += 1
_DIRECTDL_QUEUE.put(resource)
key_fingerprint = ent.get('KeyFingerprint', None)
if key_fingerprint is not None:
_DIRECTDL_KEY_FINGERPRINT_DICT[image] = key_fingerprint
else:
logger.info(
'skipping resource {}: not matching container '
'mode "{}"'.format(resource, _CONTAINER_MODE.name.lower()))
if nentities == 0:
logger.info('no global resources specified')
return
logger.info('{} global resources matching container mode "{}"'.format(
nentities, _CONTAINER_MODE.name.lower()))
# run async func in loop
loop.run_until_complete(download_monitor_async(
loop, blob_client, nentities))
示例14
def poll_for_monitoring_changes(
loop: asyncio.BaseEventLoop,
config: Dict,
cloud: msrestazure.azure_cloud.Cloud,
table_client: azure.cosmosdb.table.TableService,
compute_client: azure.mgmt.compute.ComputeManagementClient,
network_client: azure.mgmt.network.NetworkManagementClient
) -> Generator[None, None, None]:
"""Poll for monitoring changes
:param loop: asyncio loop
:param config: configuration
:param cloud: cloud object
:param table_client: table client
:param compute_client: compute client
:param network_client: network client
"""
polling_interval = config.get('polling_interval', 10)
table_name = config['storage']['table_name']
prom_var_dir = config['prometheus_var_dir']
pool_targets_file = pathlib.Path(prom_var_dir) / 'batch_pools.json'
remotefs_targets_file = pathlib.Path(prom_var_dir) / 'remotefs.json'
logger.debug('polling table {} every {} sec'.format(
table_name, polling_interval))
last_pool_hash = None
last_remotefs_hash = None
while True:
last_pool_hash = _construct_pool_monitoring_targets(
cloud, table_client, table_name, pool_targets_file, last_pool_hash)
last_remotefs_hash = _construct_remotefs_monitoring_targets(
cloud, table_client, compute_client, network_client, table_name,
remotefs_targets_file, last_remotefs_hash)
await asyncio.sleep(polling_interval)
示例15
def poll_for_federations(
self,
loop: asyncio.BaseEventLoop,
) -> Generator[None, None, None]:
"""Poll federations
:param loop: asyncio loop
"""
# lease global lock blob
self.fdh.lease_global_lock(loop)
# block until global lock acquired
while not await self.check_global_lock():
pass
# mount log storage
log_path = self.fdh.mount_file_storage()
# set logging configuration
self.fdh.set_log_configuration(log_path)
self._service_proxy.log_configuration()
logger.debug('polling federation table {} every {} sec'.format(
self._service_proxy.table_name_global, self.fed_refresh_interval))
logger.debug('polling action queues {} every {} sec'.format(
self._service_proxy.table_name_jobs, self.action_refresh_interval))
# begin message processing
asyncio.ensure_future(
self.iterate_and_process_federation_queues(), loop=loop)
# continuously update federations
while True:
if not await self.check_global_lock():
continue
try:
self.update_federations()
except Exception as exc:
logger.exception(str(exc))
await asyncio.sleep(self.fed_refresh_interval)
示例16
def __concurrent_future__await__(self: futures.Future):
def _callback():
try:
future.set_result(self.result())
except BaseException as e:
future.set_exception(e)
loop: asyncio.BaseEventLoop = asyncio.get_event_loop()
self.add_done_callback(lambda _: loop.call_soon_threadsafe(_callback))
future = asyncio.Future()
yield from future
return future.result()
示例17
def __init__(self, block_manager: 'BlockManager'):
super().__init__(block_manager)
self.__block_generation_timer = None
self.__lock = None
self._loop: asyncio.BaseEventLoop = None
self._vote_queue: asyncio.Queue = None
util.logger.debug(f"Stop previous broadcast!")
self.stop_broadcast_send_unconfirmed_block_timer()
示例18
def __init__(self):
CommonThread.__init__(self)
self.__timer_list: Dict[str, Timer] = {}
self.__loop: asyncio.BaseEventLoop = asyncio.new_event_loop()
# self.__loop.set_debug(True)
# Deprecated function, need to review delete.
示例19
def __await__(self: _Rendezvous):
def _callback():
try:
future.set_result(self.result())
except BaseException as e:
future.set_exception(e)
loop: asyncio.BaseEventLoop = asyncio.get_event_loop()
self.add_done_callback(lambda _: loop.call_soon_threadsafe(_callback))
future = asyncio.Future()
yield from future
return future.result()
示例20
def _load_and_register_async(
loop: asyncio.BaseEventLoop,
table_client: azure.storage.table.TableService,
nglobalresources: int) -> None:
"""Load and register image
:param asyncio.BaseEventLoop loop: event loop
:param azure.storage.table.TableService table_client: table client
:param int nglobalresource: number of global resources
"""
global _LR_LOCK_ASYNC
async with _LR_LOCK_ASYNC:
for resource in _TORRENTS:
# if torrent is seeding, load container/file and register
if (_TORRENTS[resource]['started'] and
_TORRENTS[resource]['handle'].is_seed()):
if (not _TORRENTS[resource]['loaded'] and
not _TORRENTS[resource]['loading']):
# docker load image
if resource.startswith(_DOCKER_TAG):
thr = DockerLoadThread(resource)
thr.start()
else:
# TODO "load blob" - move to appropriate path
raise NotImplementedError()
# register to services table
if (not _TORRENTS[resource]['registered'] and
_TORRENTS[resource]['loaded'] and
not _TORRENTS[resource]['loading']):
_merge_service(
table_client, resource, nglobalresources)
_TORRENTS[resource]['registered'] = True
示例21
def _get_ipaddress_async(loop: asyncio.BaseEventLoop) -> str:
"""Get IP address
:param asyncio.BaseEventLoop loop: event loop
:rtype: str
:return: ip address
"""
if _ON_WINDOWS:
raise NotImplementedError()
else:
proc = await asyncio.subprocess.create_subprocess_shell(
'ip addr list eth0 | grep "inet " | cut -d\' \' -f6 | cut -d/ -f1',
stdout=asyncio.subprocess.PIPE, loop=loop)
output = await proc.communicate()
return output[0].decode('ascii').strip()
示例22
def setup_private_registry_async(
loop: asyncio.BaseEventLoop,
table_client: azure.storage.table.TableService,
ipaddress: str, container: str, registry_archive: str,
registry_image_id: str) -> None:
"""Set up a docker private registry if a ticket exists
:param asyncio.BaseEventLoop loop: event loop
:param azure.storage.table.TableService table_client: table client
:param str ipaddress: ip address
:param str container: container holding registry
:param str registry_archive: registry archive file
:param str registry_image_id: registry image id
"""
# first check if we've registered before
try:
entity = table_client.get_entity(
_STORAGE_CONTAINERS['table_registry'], _PARTITION_KEY, _NODEID)
exists = True
print('private registry row already exists: {}'.format(entity))
except azure.common.AzureMissingResourceHttpError:
exists = False
# install/start docker registy container
await _start_private_registry_instance_async(
loop, container, registry_archive, registry_image_id)
# register self into registry table
if not exists:
entity = {
'PartitionKey': _PARTITION_KEY,
'RowKey': _NODEID,
'IpAddress': ipaddress,
'Port': _DEFAULT_PRIVATE_REGISTRY_PORT,
'StorageAccount': _SHIPYARD_STORAGEACCOUNT,
'Container': container,
}
table_client.insert_or_replace_entity(
_STORAGE_CONTAINERS['table_registry'], entity=entity)
示例23
def setUp(self):
self.loop = asyncio.BaseEventLoop()
self.loop._process_events = mock.Mock()
self.loop._selector = mock.Mock()
self.loop._selector.select.return_value = ()
self.set_event_loop(self.loop)
示例24
def get_event_loops():
loops = []
threads = {x.ident: x for x in threading.enumerate()}
for obj in gc.get_objects():
if isinstance(obj, BaseEventLoop):
thread_id = getattr(obj, '_thread_id', None)
if thread_id is not None:
thread = threads.get(thread_id)
else:
thread = None
loops.append((obj, thread))
return loops
示例25
def get_or_create_event_loop() -> asyncio.BaseEventLoop:
try:
loop = asyncio.get_event_loop()
except RuntimeError as ex:
if 'no current event loop' not in ex.args[0]:
raise
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
return loop
示例26
def repeat(coro, times=1, step=1, limit=1, loop=None):
"""
Executes the coroutine function ``x`` number of times,
and accumulates results in order as you would use with ``map``.
Execution concurrency is configurable using ``limit`` param.
This function is a coroutine.
Arguments:
coro (coroutinefunction): coroutine function to schedule.
times (int): number of times to execute the coroutine.
step (int): increment iteration step, as with ``range()``.
limit (int): concurrency execution limit. Defaults to 10.
loop (asyncio.BaseEventLoop): optional event loop to use.
Raises:
TypeError: if coro is not a coroutine function.
Returns:
list: accumulated yielded values returned by coroutine.
Usage::
async def mul_2(num):
return num * 2
await paco.repeat(mul_2, times=5)
# => [2, 4, 6, 8, 10]
"""
assert_corofunction(coro=coro)
# Iterate and attach coroutine for defer scheduling
times = max(int(times), 1)
iterable = range(1, times + 1, step)
# Run iterable times
return (yield from map(coro, iterable, limit=limit, loop=loop))
示例27
def timeout(coro, timeout=None, loop=None):
"""
Wraps a given coroutine function, that when executed, if it takes more
than the given timeout in seconds to execute, it will be canceled and
raise an `asyncio.TimeoutError`.
This function is equivalent to Python standard
`asyncio.wait_for()` function.
This function can be used as decorator.
Arguments:
coro (coroutinefunction|coroutine): coroutine to wrap.
timeout (int|float): max wait timeout in seconds.
loop (asyncio.BaseEventLoop): optional event loop to use.
Raises:
TypeError: if coro argument is not a coroutine function.
Returns:
coroutinefunction: wrapper coroutine function.
Usage::
await paco.timeout(coro, timeout=10)
"""
@asyncio.coroutine
def _timeout(coro):
return (yield from asyncio.wait_for(coro, timeout, loop=loop))
@asyncio.coroutine
def wrapper(*args, **kw):
return (yield from _timeout(coro(*args, **kw)))
return _timeout(coro) if asyncio.iscoroutine(coro) else wrapper
示例28
def get_ioloop() -> asyncio.BaseEventLoop:
loop = asyncio.get_event_loop()
if sys.platform == 'win32' and not isinstance(loop, asyncio.ProactorEventLoop):
loop = asyncio.ProactorEventLoop()
ctlc_hotfix(loop)
asyncio.set_event_loop(loop)
return loop
示例29
def setUp(self):
self.loop = mock.Mock(spec=asyncio.BaseEventLoop)
self.loop.time = lambda: 0.0
self.peer_manager = PeerManager(self.loop)
self.data_store = DictDataStore(self.loop, self.peer_manager)
示例30
def loop(self) -> asyncio.BaseEventLoop:
"""Return client loop."""