Python源码示例:datetime.timedelta()
示例1
def run(which):
    """Run the requested test suites, then print and save a summary.

    Args:
        which: collection of suite names to run ("chrom_ends", "demos",
            "counts", "rendering"). An empty collection runs every suite.
    """
    print("running all tests...")

    summary = pandas.DataFrame(columns=["pass", "info", "timing"])

    def selected(suite):
        # An empty selection means "run everything".
        return len(which) == 0 or suite in which

    def as_duration(seconds):
        # Render a number of seconds as H:MM:SS via timedelta's str().
        return "{}".format(datetime.timedelta(seconds=int(seconds)))

    # Chromosome-end checks (driven by the "issues" test runner).
    if selected("chrom_ends"):
        summary.loc["chrom_ends"] = _runTest(runTestIssues, "issues")

    # Demos.
    if selected("demos"):
        summary.loc["demos"] = _runTest(testDemos.run, "demos")

    # Regression tests on ref/alt/amb counts.
    if selected("counts"):
        summary.loc["counts"] = _runTest(runTestCounts, "counts")

    # Render regression tests.
    if selected("rendering"):
        summary.loc["rendering"] = _runTest(rendertest.run, "rendering")

    summary["timing"] = summary["timing"].apply(as_duration)

    print(summary)
    saveTimingInfo(summary)
示例2
def get_kms_auth_token(session, bless_config, lambda_regional_config):
    """Build a time-boxed auth token and return it KMS-encrypted, base64 encoded.

    The token is valid from one minute before "now" (UTC) until one hour after
    that start time. It is encrypted with the regional KMS key under an
    encryption context naming the current IAM user and the target function.

    Args:
        session: object exposing ``resource("iam")`` and ``client("kms", ...)``
            (presumably a boto3 session; confirm against the caller).
        bless_config: mapping providing ``["lambda_config"]["function_name"]``.
        lambda_regional_config: mapping providing ``"aws_region"`` and
            ``"kms_auth_key_id"``.

    Returns:
        The ciphertext blob as a base64 text string.
    """
    stamp_format = "%Y%m%dT%H%M%SZ"
    logger.info("Requesting new KMS auth token in %s", lambda_regional_config["aws_region"])

    # Back-date the start by a minute (presumably to tolerate clock skew).
    valid_from = datetime.datetime.utcnow() - datetime.timedelta(minutes=1)
    valid_until = valid_from + datetime.timedelta(hours=1)
    token = {
        "not_before": valid_from.strftime(stamp_format),
        "not_after": valid_until.strftime(stamp_format),
    }

    encryption_context = {
        "from": session.resource("iam").CurrentUser().user_name,
        "to": bless_config["lambda_config"]["function_name"],
        "user_type": "user",
    }

    kms = session.client('kms', region_name=lambda_regional_config["aws_region"])
    encrypted = kms.encrypt(
        KeyId=lambda_regional_config["kms_auth_key_id"],
        Plaintext=json.dumps(token),
        EncryptionContext=encryption_context,
    )
    blob = encrypted["CiphertextBlob"]
    return base64.b64encode(blob).decode()
示例3
def describe_bucket_worker(bucket):
    """Annotate ``bucket`` with its location, object count and size.

    Sets ``LocationConstraint``, ``NumberOfObjects`` and ``BucketSizeBytes``
    on the given object. The two metrics come from the last datapoint of the
    past two days of hourly "AWS/S3" CloudWatch statistics, and are ``None``
    when no datapoints are returned.

    Returns:
        The same ``bucket`` object, mutated in place.
    """
    bucket.LocationConstraint = clients.s3.get_bucket_location(Bucket=bucket.name)["LocationConstraint"]

    # A falsy location constraint is treated as us-east-1.
    bucket_region = bucket.LocationConstraint or "us-east-1"

    # Query CloudWatch in the bucket's own region.
    cloudwatch = resources.cloudwatch
    if cloudwatch.meta.client.meta.region_name != bucket_region:
        cloudwatch = boto3.Session(region_name=bucket_region).resource("cloudwatch")

    def fetch_datapoints(metric_name, storage_type):
        # Two days of hourly datapoints for one S3 storage metric.
        stats = get_cloudwatch_metric_stats(
            "AWS/S3", metric_name,
            start_time=datetime.utcnow() - timedelta(days=2),
            end_time=datetime.utcnow(), period=3600, BucketName=bucket.name,
            StorageType=storage_type, resource=cloudwatch)
        return stats["Datapoints"]

    count_points = fetch_datapoints("NumberOfObjects", "AllStorageTypes")
    bucket.NumberOfObjects = int(count_points[-1]["Average"]) if count_points else None

    size_points = fetch_datapoints("BucketSizeBytes", "StandardStorage")
    bucket.BucketSizeBytes = format_number(size_points[-1]["Average"]) if size_points else None

    return bucket
示例4
def cluster_memory_reservation(cwClient, clusterName):
    """Return the cluster's MemoryReservation average from CloudWatch.

    Queries the "AWS/ECS" namespace over the last 120 seconds at a 60-second
    period and returns the ``Average`` of the first datapoint in the response.

    Args:
        cwClient: client exposing ``get_metric_statistics`` (presumably a
            boto3 CloudWatch client).
        clusterName: value for the ``ClusterName`` dimension.

    Returns:
        The average value, or ``None`` (after reporting through ``logger``)
        when the lookup raises for any reason, including an empty datapoint
        list.
    """
    try:
        query = {
            'Namespace': 'AWS/ECS',
            'MetricName': 'MemoryReservation',
            'Dimensions': [{'Name': 'ClusterName', 'Value': clusterName}],
            'StartTime': datetime.datetime.utcnow() - datetime.timedelta(seconds=120),
            'EndTime': datetime.datetime.utcnow(),
            'Period': 60,
            'Statistics': ['Average'],
        }
        stats = cwClient.get_metric_statistics(**query)
        first_point = stats['Datapoints'][0]
        return first_point['Average']
    except Exception:
        logger({'ClusterMemoryError': 'Could not retrieve mem reservation for {}'.format(clusterName)})
示例5
def ec2_avg_cpu_utilization(clusterName, asgData, cwclient):
    """Return the CPUUtilization average for the cluster's auto scaling group.

    The group is resolved with ``find_asg(clusterName, asgData)``. The
    "AWS/EC2" metric is then queried over the last 120 seconds at a 60-second
    period, and the ``Average`` of the first returned datapoint is returned.

    Raises:
        IndexError: if the response contains no datapoints (nothing here
            catches it).
    """
    asg_name = find_asg(clusterName, asgData)
    dimensions = [{'Name': 'AutoScalingGroupName', 'Value': asg_name}]
    stats = cwclient.get_metric_statistics(
        Namespace='AWS/EC2',
        MetricName='CPUUtilization',
        Dimensions=dimensions,
        StartTime=datetime.datetime.utcnow() - datetime.timedelta(seconds=120),
        EndTime=datetime.datetime.utcnow(),
        Period=60,
        Statistics=['Average'],
    )
    first_point = stats['Datapoints'][0]
    return first_point['Average']
示例6
def natural_time(timedelta, precision=2, default='NA'):
    """Render a duration as a short human-readable string.

    Args:
        timedelta: a ``datetime.timedelta`` or a plain number of seconds.
            For a timedelta only whole seconds (days and seconds) are used.
        precision: maximum number of units to include in the result.
        default: accepted for interface compatibility; not used by this
            implementation.

    Returns:
        Comma-separated "<count> <unit>" parts taken from the module-level
        ``SECONDS`` table, or "0 <last unit>" when no unit applies.
    """
    # Normalise the input to a number of seconds.
    seconds_left = timedelta
    if isinstance(timedelta, datetime.timedelta):
        seconds_left = timedelta.days * 86400 + timedelta.seconds

    parts = []
    for unit_name, unit_seconds in SECONDS:
        # A unit is only used once more than two of it remain.
        if seconds_left > unit_seconds * 2:
            count = int(float(seconds_left) / float(unit_seconds))
            seconds_left = seconds_left - (count * unit_seconds)
            parts.append('%s %s' % (count, unit_name))
            precision -= 1
        if precision <= 0 or seconds_left <= 0:
            break

    if not parts:
        smallest_unit = SECONDS[-1][0]
        parts.append('0 %s' % smallest_unit)
    return ', '.join(parts)
示例7
def update(self):
    """Refresh the calendar events and the "time to next event" label.

    Downloads every configured calendar, parses its events, stores them
    sorted by start time in ``self.data['events']`` and sets
    ``self.data['next']`` to either ``'Now'`` or a natural-language
    duration until the next upcoming event.
    """
    self.data['events'] = []
    self.tzutc = tz.tzutc()
    self.tzlocal = tz.tzlocal()

    # Remember each calendar's colour by URL so responses can be matched up.
    urls, colors = [], {}
    for cal in self._iter_calendars():
        urls.append(cal.url)
        colors[cal.url] = cal.color

    for result in utils.iter_responses(urls, timeout=5):
        response = result.get('response')
        if response:
            ical = Calendar.from_ical(response.read().decode('utf-8'))
            color = colors[result.get('url')]
            self.data['events'] += self._parse_events(ical, color)
    self.data['events'] = sorted(self.data['events'], key=lambda e: e['start'])

    # Calculate time to next event. Events are sorted, so the first one that
    # starts after "now" is the next one. Fall back to DELTANONE both when
    # there are no events at all and when every event has already started
    # (the latter used to raise IndexError).
    now = datetime.datetime.now()
    next_start = next(
        (event['start'] for event in self.data['events'] if event['start'] > now),
        None,
    )
    if next_start is None:
        next_start = self.DELTANONE

    if next_start < now + datetime.timedelta(seconds=self.DEFAULT_INTERVAL * 1.5):
        self.data['next'] = 'Now'
    else:
        self.data['next'] = utils.natural_time(next_start - now, 1)
    super(Plugin, self).update()
示例8
def _parse_events(self, ical, color):
    """Collect the calendar's events that start within the next 14 days.

    Args:
        ical: parsed calendar exposing ``get`` and ``walk``.
        color: colour value copied onto every returned event.

    Returns:
        A list of dicts (title, calendar, color, start, where, status) for
        each VEVENT whose start lies between today at midnight and 14 days
        after that, inclusive.
    """
    window_start = datetime.datetime.combine(datetime.date.today(), datetime.time.min)
    window_end = window_start + datetime.timedelta(days=14)

    # Prefer the calendar's display name; fall back to its version string.
    calendar_title = ical.get('x-wr-calname', ical.get('version', ''))

    upcoming = []
    for component in ical.walk():
        if component.name != "VEVENT":
            continue
        start = self._event_start(component)
        if not (window_start <= start <= window_end):
            continue
        upcoming.append({
            'title': component.get('summary'),
            'calendar': calendar_title,
            'color': color,
            'start': start,
            'where': component.get('location'),
            'status': component.get('description'),
        })
    return upcoming
示例9
def save(self):
    """Persist the session data and release any held lock.

    Data is only written when the session was loaded; an untouched session
    has nothing to save. The expiry passed to ``_save`` is ``self.now()``
    plus ``self.timeout`` minutes. The lock is released in all cases.
    """
    try:
        if not self.loaded:
            # Never loaded means never accessed: nothing to write.
            if self.debug:
                cherrypy.log(
                    'Skipping save of session %r (no session loaded).' %
                    self.id, 'TOOLS.SESSIONS')
        else:
            lifetime = datetime.timedelta(seconds=self.timeout * 60)
            expiration_time = self.now() + lifetime
            if self.debug:
                cherrypy.log('Saving session %r with expiry %s' %
                             (self.id, expiration_time),
                             'TOOLS.SESSIONS')
            self._save(expiration_time)
    finally:
        # Release the lock on behalf of a caller that left it held.
        if self.locked:
            self.release_lock()
            if self.debug:
                cherrypy.log('Lock released after save.', 'TOOLS.SESSIONS')
def time_since_initialized(self):
    """
    Elapsed time since this device was initialized, as a
    :class:`~datetime.timedelta`.

    The value is only meaningful for devices that store properties in the
    udev database; for every other device it is a zero
    :class:`~datetime.timedelta`.

    .. seealso:: :attr:`is_initialized`
    .. udevversion:: 165
    .. versionadded:: 0.8
    """
    # libudev reports the elapsed time in microseconds.
    elapsed_usec = self._libudev.udev_device_get_usec_since_initialized(self)
    return timedelta(microseconds=elapsed_usec)
示例11
def test_should_refresh(self):
    """ Unit test _should_refresh private method """

    def ten_seconds_later():
        # Clock source handed to freeze_time: "now" shifted by 10 seconds.
        return datetime.utcnow() + timedelta(seconds=10)

    # No max age: a refresh is never required.
    never_expires = Refreshable(None)
    self.assertFalse(never_expires._should_refresh())

    # Max age set but nothing fetched yet: a refresh is required.
    expiring = Refreshable(max_age=10)
    self.assertTrue(expiring._should_refresh())

    # Pretend a refresh just happened: still fresh.
    expiring._last_refresh_time = datetime.utcnow()
    self.assertFalse(expiring._should_refresh())

    # Once 10 seconds have (apparently) passed, the data is stale.
    with freeze_time(ten_seconds_later):
        self.assertTrue(expiring._should_refresh())
示例12
def test_main_with_expiration_group(self):
    """ Test group case with expiration """
    group = SSMParameterGroup(max_age=300)
    members = [
        group.parameter(name)
        for name in ("my_param_1", "my_param_2", "my_param_3")
    ]

    # Individual params do not carry the group's max_age themselves (for now).
    for member in members:
        self.assertEqual(member._max_age, None)

    # Force a fetch, then back-date the group's refresh past its max age.
    group.refresh()
    group._last_refresh_time = datetime.utcnow() - timedelta(seconds=301)

    # The group and every member must now report that a refresh is due.
    self.assertTrue(group._should_refresh())
    for member in members:
        self.assertTrue(member._should_refresh())
示例13
def CertificateExpired(cert, expires=0):
    """Checks whether a certificate expires within a given time frame.

    Args:
      cert: Certificate object exposing an ``enddate`` sequence whose second
        item is the expiry datetime (or None).
      expires: int, the number of seconds from now to check against.
        0 means "expired right now".

    Returns:
      boolean, whether the certificate will have expired in expires seconds

    Raises:
      CertError: the certificate has no usable end date
    """
    deadline = datetime.datetime.today() + datetime.timedelta(seconds=expires)

    # enddate is [str, (datetime|None)]; only the datetime is of interest.
    cert_end = cert.enddate[1]
    if not cert_end:
        raise CertError('Certificate has a malformed enddate.')
    return deadline > cert_end
示例14
def __init__(
    self,
    slug: str,
    expiry: datetime.timedelta = datetime.timedelta(days=7),
    src: str = None,
) -> None:
    """Create a paste with a removal slug and publication/expiry dates.

    Args:
        slug: public identifier of the paste.
        expiry: lifetime counted from the publication date; a falsy value
            means the paste has no expiry date.
        src: stored unchanged on the instance.
    """
    self.slug = slug
    self.src = src

    # The removal slug is generated without a collision check; the odds of
    # a clash are considered astronomically small.
    self.removal = utility.slug_create(auto_scale=False)

    self.pub_date = datetime.datetime.utcnow()
    self.chg_date = datetime.datetime.utcnow()

    # The expiry date is the publication date shifted by the expiry delta.
    self.exp_date = self.pub_date + expiry if expiry else None
示例15
def add(lexer: str) -> None:
    """Add a paste to pinnwand's database from stdin.

    The paste expires after one day. If ``lexer`` is not among the known
    languages an error is logged and nothing is stored.
    """
    from pinnwand import database
    from pinnwand import utility

    known_lexers = utility.list_languages()
    if lexer not in known_lexers:
        log.error("add: unknown lexer")
        return

    paste = database.Paste(utility.slug_create(), expiry=timedelta(days=1))
    content = database.File(paste.slug, sys.stdin.read(), lexer=lexer)
    paste.files.append(content)

    with database.session() as session:
        session.add(paste)
        session.commit()

        log.info("add: paste created: %s", paste.slug)
示例16
def __init__(self, offset):
    """Create a fixed-offset time zone.

    Args:
        offset: offset from UTC in minutes (negative for zones behind UTC).

    The zone name is "Z" for a zero offset, otherwise the sign followed by
    zero-padded hours and minutes, e.g. "+01:30" or "-05:00".
    """
    self._offset = datetime.timedelta(seconds=offset * 60)
    if offset == 0:
        self._name = "Z"
    else:
        sign = "+" if offset > 0 else "-"
        # Split the magnitude, not the signed value. Formatting the signed
        # hour count used to yield names such as "--1:30" for -90 minutes.
        hours, minutes = divmod(abs(offset), 60)
        self._name = "%s%02d:%02d" % (sign, hours, minutes)
示例17
def dst(self, dt):
    """Return a zero daylight-saving adjustment, whatever ``dt`` is."""
    return datetime.timedelta()
示例18
def __toLocal(cls, value):
    """Convert a time-zone aware datetime to naive local time.

    A value without ``tzinfo`` is returned unchanged. Otherwise the value is
    converted through its UTC timestamp into the local time of this machine,
    keeping the original microseconds.
    """
    if value.tzinfo is not None:
        epoch_seconds = calendar.timegm(value.utctimetuple())
        local_value = datetime.datetime.fromtimestamp(epoch_seconds)
        # The timestamp has whole-second precision, so the microseconds are
        # restored from the source value below.
        assert value.resolution >= datetime.timedelta(microseconds=1)
        return local_value.replace(microsecond=value.microsecond)
    # The meter does not use a time zone: nothing to convert.
    return value
示例19
def clean_trial(src_loc: Path, test_cmds: List[str]) -> timedelta:
    """Remove all existing cache files, run the test suite and time it.

    Args:
        src_loc: the directory of the package for cache removal, may be a file
        test_cmds: test running commands for subprocess.run()

    Returns:
        The wall-clock duration of the test run.

    Raises:
        BaselineTestException: if the clean trial does not pass from the test run.
    """
    cache.remove_existing_cache_files(src_loc)

    LOGGER.info("Running clean trial")

    # Output is never captured so the clean trial is always visible for
    # diagnostic purposes.
    started_at = datetime.now()
    completed = subprocess.run(test_cmds, capture_output=False)
    elapsed = datetime.now() - started_at

    if completed.returncode != 0:
        raise BaselineTestException(
            f"Clean trial does not pass, mutant tests will be meaningless.\n"
            f"Output: {str(completed.stdout)}"
        )

    return elapsed
####################################################################################################
# MUTATION SAMPLE GENERATION
####################################################################################################
示例20
def mock_TrialTimes():
    """Mock Trial Time fixture for the CLI: every trial lasts six seconds."""
    six_seconds = timedelta(seconds=6)
    return TrialTimes(
        clean_trial_1=six_seconds,
        clean_trial_2=six_seconds,
        mutation_trials=six_seconds,
    )
示例21
def mock_results_summary(mock_trial_results):
    """Mock results summary from multiple trials.

    Wraps the given trial results with four identified and four mutated
    locations and a total runtime of six seconds.
    """
    six_seconds = timedelta(seconds=6)
    summary = ResultsSummary(
        results=mock_trial_results,
        n_locs_identified=4,
        n_locs_mutated=4,
        total_runtime=six_seconds,
    )
    return summary
示例22
def test_clean_trial_timedelta(binop_file, monkeypatch):
    """Clean trial results in a timedelta object."""
    # Stand-in for subprocess.run that always reports a passing pytest run.
    monkeypatch.setattr(
        subprocess,
        "run",
        lambda *args, **kwargs: CompletedProcess(args="pytest", returncode=0),
    )

    elapsed = run.clean_trial(binop_file.parent, ["pytest"])

    assert isinstance(elapsed, timedelta)
示例23
def process_game_data(self, game_date, num_days=1):
    """Collect game records for a run of consecutive days.

    Args:
        game_date: first day to fetch, as a "YYYY-MM-DD" string.
        num_days: number of consecutive days to look at.

    Returns:
        A list of ``(date_string, records)`` tuples, one per day for which
        ``_get_games_by_date`` returned something other than ``None``.
    """
    date_format = "%Y-%m-%d"
    collected = []
    for _ in range(num_days):
        records = self._get_games_by_date(game_date)
        if records is not None:
            collected.append((game_date, records))
        # Step to the following calendar day, keeping the string format.
        following_day = datetime.strptime(game_date, date_format) + timedelta(days=1)
        game_date = following_day.strftime(date_format)
    return collected
示例24
def check_nonce(request: LocalProxy, session: Session) -> bool:
    """Check validity of the nonce passed by the user.

    The nonce id is read from the ``X-Authentication`` header and looked up
    in the database. A nonce that is found is always deleted, so it can be
    presented only once.

    Args:
        request: request object whose ``headers`` carry the nonce id.
        session: database session used to look up and delete the nonce.

    Returns:
        True when the nonce exists and is at most one minute old; False when
        it is older, missing, or any error occurs while checking it.
    """
    max_age = timedelta(minutes=1)
    try:
        id_ = request.headers['X-Authentication']
        nonce = session.query(Nonce).filter(Nonce.id == id_).one()
        age = datetime.now() - nonce.timestamp
        # Consume the nonce before judging it.
        session.delete(nonce)
        session.commit()
        if age > max_age:
            return False
    except Exception:
        # Any failure counts as an invalid nonce. Exception rather than
        # BaseException, so KeyboardInterrupt and SystemExit are not turned
        # into a "False" result.
        return False
    return True
示例25
def download_delta_data(self):
    """
    Fetch the history newer than what is stored and append it to the
    collection.

    1. Initialise the collection; stop if it was newly added.
    2. Request history from tushare starting the day after the latest stored
       date, and strip the unused columns.
    3. Append the result to the arctic library.

    The original notes say this is planned to run daily at 8:30am.

    :return: None
    """
    self._init_coll()
    if self._coll_name in self._new_added_colls:
        return

    # The request starts the day after the most recent stored row.
    stored = self.get_data()
    latest_date = stored.index[-1]
    next_day = latest_date + dt.timedelta(days=1)
    start = dt.datetime.strftime(next_day, '%Y-%m-%d')

    his_data = ts.get_hist_data(
        code=self._coll_name,
        start=start,
        retry_count=5
    )

    # Nothing new to store.
    if len(his_data) == 0:
        logger.info(
            f'delta data of stock {self._coll_name} is empty, after {start}')
        return

    his_data = bdu.Utils.strip_unused_cols(his_data, *self._unused_cols)
    logger.info(f'got delta data of stock: {self._coll_name}, after {start}')
    self._library.append(self._coll_name, his_data)
示例26
def next(self):
    """Place a buy or sell order on a moving-average crossover.

    Does nothing while an order is pending. Without a position, a short SMA
    above the long SMA targets 80% exposure; with a position, a short SMA at
    or below the long SMA targets 0%. When the current bar is dated
    yesterday, the signal is also logged and written as a daily alert.
    """
    if self.order:
        return

    def announce(action, target):
        # Only signals on yesterday's bar are reported.
        if self.datas[0].datetime.date() != dt.datetime.now().date() - dt.timedelta(days=1):
            return
        stock_id = self.params.ma_periods.get('stock_id')
        bsu.Utils.log(
            self.datas[0].datetime.date(),
            f'Market Signal: stock {stock_id}, action: {action}, '
            f'adjust position to {target:.2f}')
        symbol = dt.datetime.now().strftime('%Y-%m-%d')
        bsu.Utils.write_daily_alert(symbol, stock_id, action)

    if not self.position:
        if self.sma_s[0] > self.sma_l[0]:
            # The size is computed from the current close but the order
            # executes at the next open, so it may be rejected for margin.
            # Targeting 0.8 instead of 1.0 reduces the odds of that.
            target_long = 0.8
            self.order = self.order_target_percent(target=target_long, valid=bt.Order.DAY)
            announce('buy', target_long)
    elif self.sma_s[0] <= self.sma_l[0]:
        target_short = 0.0
        self.order = self.order_target_percent(target=target_short, valid=bt.Order.DAY)
        announce('sell', target_short)
示例27
def update_sina_stock_match():
    """Buy every stock flagged by yesterday's market signal.

    Looks up the market signal for yesterday's date, logs in with the
    configured Sina credentials and places one buy per stock code in the
    signal's ``'buy'`` entry. When there was nothing to buy, that is logged
    instead.
    """
    date = (datetime.now() - timedelta(days=1)).strftime('%Y-%m-%d')
    msg = get_market_signal_by_date(date)
    user = StockMatch(
        username=conf.SINA_CONFIG['username'],
        password=conf.SINA_CONFIG['password'],
    )

    bought_any = False
    for stock_code in msg['buy']:
        user.buy(stock_code)
        bought_any = True
        # Testing showed that one buy every 3 seconds is the most suitable rate.
        time.sleep(3)

    # The original attached this message to the loop's `else` clause, which
    # runs whenever the loop finishes without `break` -- i.e. always, even
    # after buying. Log it only when nothing was bought.
    if not bought_any:
        logger.info("没有股票需要买入")
示例28
def _test_download_delta_data(self, mock_get_hist_data):
    """Delta download appends the mocked row for yesterday to the stored data."""
    coll_name = '000651'
    ts_his_data = bdt.TsHisData(coll_name)

    yesterday = dt.datetime.now() - dt.timedelta(days=1)
    yesterday_label = dt.datetime.strftime(yesterday, '%Y-%m-%d')

    # One mocked row dated yesterday; the columns take the values 38..51.
    columns = (
        'open', 'high', 'close', 'low', 'volume', 'price_change', 'p_change',
        'ma5', 'ma10', 'ma20', 'v_ma5', 'v_ma10', 'v_ma20', 'turnover',
    )
    mock_delta_data = pd.DataFrame(
        data=dict(zip(columns, range(38, 52))),
        index=[yesterday_label],
    )
    mock_get_hist_data.return_value = mock_delta_data

    ts_his_data.download_delta_data()

    hist_data_000651 = ts_his_data.get_data()
    self.assertEqual(len(hist_data_000651), 3)
    self.assertEqual(dt.datetime.strftime(hist_data_000651.index[-1], '%Y-%m-%d'),
                     yesterday_label)

    # Clean up the collection written by this test.
    lib = models.get_library(conf.CN_STOCK_LIBNAME)
    lib.delete(coll_name)
示例29
def test_build_data_select_latest(self, blank_state):
    """Test that build data can be selected for only latest instance."""
    latest_fields = {
        'node_name': 'foo',
        'generator': 'hello_world',
        'data_format': 'text/plain',
        'data_element': 'Hello World!',
        'task_id': uuid.uuid4(),
        'collected_date': datetime.utcnow(),
    }

    # Same record, collected a day earlier under a different task.
    older_fields = copy.deepcopy(latest_fields)
    older_fields.update(
        collected_date=latest_fields['collected_date'] - timedelta(days=1),
        task_id=uuid.uuid4(),
    )

    build_data1 = objects.BuildData(**latest_fields)
    build_data2 = objects.BuildData(**older_fields)

    for record in (build_data1, build_data2):
        result = blank_state.post_build_data(record)
        assert result

    bd_list = blank_state.get_build_data(node_name='foo', latest=True)

    assert len(bd_list) == 1
    assert bd_list[0].to_dict() == build_data1.to_dict()
示例30
def ls(args):
    """
    List status of all configured SNS-SQS message buses and instances subscribed to them.

    Only status objects modified within the last 15 days are reported.
    """
    table = []
    queues = list(resources.sqs.queues.filter(QueueNamePrefix="github"))
    max_age = datetime.now(tzutc()) - timedelta(days=15)

    for topic in resources.sns.topics.all():
        topic_arn = ARN(topic.arn)
        account_id = topic_arn.account_id
        topic_resource = topic_arn.resource

        # Topics whose status bucket cannot be listed are skipped.
        try:
            bucket = resources.s3.Bucket("deploy-status-{}".format(account_id))
            recent_status_objects = {}
            for obj in bucket.objects.filter(Prefix=topic_resource):
                if obj.last_modified > max_age:
                    recent_status_objects[obj.key] = obj
        except ClientError:
            continue

        if not topic_resource.startswith("github"):
            continue

        for queue in queues:
            queue_name = os.path.basename(queue.url)
            if not queue_name.startswith(topic_resource):
                continue

            status_object = bucket.Object(os.path.join(queue_name, "status"))
            if status_object.key not in recent_status_objects:
                continue

            row = dict(Topic=topic, Queue=queue)
            try:
                # The key's directory is "<github>-<owner>-<repo>-<events>-<instance>".
                _, owner, repo, _, instance = os.path.dirname(status_object.key).split("-", 4)
                status = json.loads(status_object.get()["Body"].read().decode("utf-8"))
                row.update(status, Owner=owner, Repo=repo, Instance=instance,
                           Updated=status_object.last_modified)
            except Exception:
                # Best effort: a status that cannot be read or parsed still
                # gets a row with only Topic and Queue.
                pass
            table.append(row)

    args.columns = ["Owner", "Repo", "Instance", "Status", "Ref", "Commit", "Updated", "Topic", "Queue"]
    page_output(tabulate(table, args))