Python源码示例:datetime.datetime()
示例1
def __getDateTime(cls, value):
    """Coerce *value* into a ``GXDateTime``.

    A ``GXDateTime`` is handed back untouched.  A ``datetime`` or ``str`` is
    wrapped in a new ``GXDateTime`` whose ``skip`` mask additionally gets
    ``DateTimeSkips.MILLISECOND`` set.

    Raises:
        ValueError: if *value* is none of the accepted types.
    """
    if isinstance(value, GXDateTime):
        return value
    if not isinstance(value, (datetime, str)):
        raise ValueError("Invalid date format.")
    wrapped = GXDateTime(value)
    wrapped.skip |= DateTimeSkips.MILLISECOND
    return wrapped
#
# Convert date time to DLMS bytes.
#
# buff
# Byte buffer where data is written.
# value
# Added value.
#
示例2
def getGeneralizedTime(cls, dateString):
    """Parse a positional ``YYYYMMDDhhmm[ss]`` date string into a ``datetime``.

    The first twelve characters are read as year, month, day, hour and
    minute.  Seconds are optional and default to 0 when absent.

    * If the string ends with ``"Z"`` the result uses ``GXTimeZone(0)``;
      seconds are read from positions 12-13 when the string is longer than
      13 characters.
    * Otherwise seconds are read when the string is longer than 17
      characters, and the last four characters are passed to ``GXTimeZone``.

    Raises:
        ValueError: if a numeric field cannot be converted with ``int``.
    """
    year = int(dateString[0:4])
    month = int(dateString[4:6])
    day = int(dateString[6:8])
    hour = int(dateString[8:10])
    minute = int(dateString[10:12])
    # Seconds are optional; default to 0 so the name is always bound.
    second = 0
    # If UTC time.
    if dateString.endswith("Z"):
        if len(dateString) > 13:
            second = int(dateString[12:14])
        return datetime(year, month, day, hour, minute, second, 0, tzinfo=GXTimeZone(0))
    if len(dateString) > 17:
        second = int(dateString[12:14])
    # NOTE(review): the trailing four characters are handed to GXTimeZone
    # as a string, exactly as before -- confirm GXTimeZone accepts that form.
    tz = dateString[-4:]
    return datetime(year, month, day, hour, minute, second, 0, tzinfo=GXTimeZone(tz))
示例3
def test_get_status_summary(mock_trial_results):
    """The summary reports one trial per status plus totals and run time."""
    statuses = ("SURVIVED", "DETECTED", "ERROR", "UNKNOWN", "TIMEOUT")
    expected = dict.fromkeys(statuses, 1)
    expected["TOTAL RUNS"] = 5
    expected["RUN DATETIME"] = str(datetime(2019, 1, 1))

    result = get_status_summary(mock_trial_results)

    print(expected)
    assert result == expected
示例4
def timestamp_parameter(timestamp, allow_none=True):
    """Normalise a timestamp argument to an ISO-8601 string.

    * ``None`` is returned as ``None`` when *allow_none* is true.
    * A ``datetime`` is converted with ``isoformat()``.
    * A string is returned unchanged if it matches ``ISO_8601``.

    Raises:
        ValueError: for ``None`` when *allow_none* is false, for a string
            that does not match ``ISO_8601``, or for any other type.
    """
    if timestamp is None:
        if not allow_none:
            raise ValueError("Timestamp value cannot be None")
        return None

    if isinstance(timestamp, datetime):
        return timestamp.isoformat()

    if not isinstance(timestamp, basestring):
        raise ValueError("Cannot accept type %s for timestamp" % type(timestamp))

    if ISO_8601.match(timestamp):
        return timestamp
    raise ValueError(
        "Invalid timestamp: %s is not a valid ISO-8601 formatted date" % timestamp
    )
示例5
def ensure_datetime(obj):
    """Pass `obj` through when it is datetime-like, otherwise raise.

    Parameters
    ----------
    obj : Object to be tested.

    Returns
    -------
    `obj` itself, unchanged, when it is an instance of `str`,
    `datetime.datetime`, `cftime.datetime` or `np.datetime64`

    Raises
    ------
    TypeError if `obj` is none of those types
    """
    datetime_like = (str, datetime.datetime, cftime.datetime, np.datetime64)
    if not isinstance(obj, datetime_like):
        raise TypeError("datetime-like object required. "
                        "Type given: {}".format(type(obj)))
    return obj
示例6
def datetime_or_default(date, default):
    """Validate `date` as datetime-like, or substitute `default` for `None`.

    Parameters
    ----------
    date : `None` or datetime-like object or str
    default : The value to return if `date` is `None`

    Returns
    -------
    `default` when `date` is `None`; otherwise whatever
    `ensure_datetime(date)` returns (it raises for unsupported types)
    """
    return default if date is None else ensure_datetime(date)
示例7
def recursive_test_params():
    """Yield a ``(basic_params, recursive_params)`` pair, then clean up.

    Both dicts share every entry except ``'var'``: the recursive variant
    uses a ``Var`` computed as ``precip - convection_rain``.  When the
    generator is resumed after the yield, ``_clean_test_direcs()`` runs.
    """
    date_range = (datetime.datetime(4, 1, 1), datetime.datetime(6, 12, 31))
    basic_params = dict(
        proj=example_proj,
        model=example_model,
        run=example_run,
        var=condensation_rain,
        date_range=date_range,
        intvl_in='monthly',
        dtype_in_time='ts',
    )

    recursive_condensation_rain = Var(
        name='recursive_condensation_rain',
        variables=(precip, convection_rain),
        func=lambda x, y: x - y,
        def_time=True,
    )
    # Same settings as the basic run, only the variable is swapped.
    recursive_params = dict(basic_params, var=recursive_condensation_rain)

    yield (basic_params, recursive_params)

    _clean_test_direcs()
示例8
def download_bill(self, bill_date, bill_type="ALL", device_info=None):
    """
    Download the reconciliation bill.

    :param bill_date: date of the bill to download; a ``datetime``/``date``
                      is formatted as ``YYYYMMDD``, anything else is sent as-is
    :param bill_type: bill type. ALL (default) returns all orders of the day;
                      SUCCESS returns the day's successfully paid orders;
                      REFUND returns the day's refunded orders;
                      REVOKED returns revoked orders
    :param device_info: terminal device number assigned by WeChat Pay; when
                        given, only that device's bill is downloaded
    :return: the returned result data
    """
    formatted_date = bill_date
    if isinstance(bill_date, (datetime, date)):
        formatted_date = bill_date.strftime("%Y%m%d")

    payload = dict(
        appid=self.appid,
        bill_date=formatted_date,
        bill_type=bill_type,
        device_info=device_info,
    )
    return self._post("pay/downloadbill", data=payload)
示例9
def download_fundflow(self, bill_date, account_type="Basic", tar_type=None):
    """
    Download the fund-flow bill.
    https://pay.weixin.qq.com/wiki/doc/api/jsapi.php?chapter=9_18&index=7

    :param bill_date: date of the bill to download; a ``datetime``/``date``
                      is formatted as ``YYYYMMDD``, anything else is sent as-is
    :param account_type: funding account the bill is drawn from:
                         Basic (basic account),
                         Operation (operating account),
                         Fees (service-fee account)
    :param tar_type: optional; fixed value GZIP returns the bill as a .gzip
                     archive. When omitted the bill is returned as a data
                     stream.
    """
    formatted_date = bill_date
    if isinstance(bill_date, (datetime, date)):
        formatted_date = bill_date.strftime("%Y%m%d")

    payload = dict(
        appid=self.appid,
        bill_date=formatted_date,
        account_type=account_type,
        sign_type="HMAC-SHA256",
    )
    # tar_type is only sent when the caller supplied one.
    if tar_type is not None:
        payload["tar_type"] = tar_type
    return self._post("pay/downloadfundflow", data=payload)
示例10
def get_records(self, start_time, end_time, msgid=1, number=10000):
    """
    Fetch customer-service chat records.

    :param start_time: query start time, UNIX timestamp (a
                       ``datetime.datetime`` is converted with ``time.mktime``)
    :param end_time: query end time, UNIX timestamp; a single query must not
                     span more than one day
    :param msgid: message ids are ordered ascending, starting from 1
    :param number: number of records per request, at most 10000
    :return: the returned JSON data
    """
    def _as_unix(moment):
        # Only datetime objects are converted; other values pass through.
        if isinstance(moment, datetime.datetime):
            return time.mktime(moment.timetuple())
        return moment

    start_stamp = _as_unix(start_time)
    end_stamp = _as_unix(end_time)
    record_data = dict(
        starttime=int(start_stamp),
        endtime=int(end_stamp),
        msgid=msgid,
        number=number,
    )
    return self._post(
        "https://api.weixin.qq.com/customservice/msgrecord/getmsglist",
        data=record_data,
    )
示例11
def list_statistics(self, begin_date, end_date, shop_id=-1):
    """
    Wi-Fi data statistics.

    For details see
    http://mp.weixin.qq.com/wiki/8/dfa2b756b66fca5d9b1211bc18812698.html

    :param begin_date: start date; the maximum time span is 30 days
    :param end_date: end date; the maximum time span is 30 days
    :param shop_id: optional shop ID to search by; -1 means overall statistics
    :return: the ``"data"`` entry of the returned JSON
    """
    def _as_day_string(day):
        # datetime/date objects become YYYY-MM-DD; other values pass through.
        if isinstance(day, (datetime, date)):
            return day.strftime("%Y-%m-%d")
        return day

    begin_date = _as_day_string(begin_date)
    end_date = _as_day_string(end_date)
    query = {"begin_date": begin_date, "end_date": end_date, "shop_id": shop_id}
    return self._post(
        "statistics/list",
        data=query,
        result_processor=lambda x: x["data"],
    )
示例12
def test_qualification_verify_fail_event(self):
    """A ``qualification_verify_fail`` payload parses to its event class."""
    from wechatpy.events import QualificationVerifyFailEvent

    payload = """
<xml>
<ToUserName><![CDATA[toUser]]></ToUserName>
<FromUserName><![CDATA[FromUser]]></FromUserName>
<CreateTime>1442401156</CreateTime>
<MsgType><![CDATA[event]]></MsgType>
<Event><![CDATA[qualification_verify_fail]]></Event>
<FailTime>1442401122</FailTime>
<FailReason><![CDATA[by time]]></FailReason>
</xml>"""

    parsed = parse_message(payload)

    # Right event type, FailTime exposed as a datetime, reason kept verbatim.
    self.assertTrue(isinstance(parsed, QualificationVerifyFailEvent))
    self.assertTrue(isinstance(parsed.fail_time, datetime))
    self.assertEqual(parsed.fail_reason, "by time")
示例13
def test_naming_verify_fail_event(self):
    """A ``naming_verify_fail`` payload parses to its event class."""
    from wechatpy.events import NamingVerifyFailEvent

    payload = """
<xml>
<ToUserName><![CDATA[toUser]]></ToUserName>
<FromUserName><![CDATA[FromUser]]></FromUserName>
<CreateTime>1442401061</CreateTime>
<MsgType><![CDATA[event]]></MsgType>
<Event><![CDATA[naming_verify_fail]]></Event>
<FailTime>1442401061</FailTime>
<FailReason><![CDATA[by time]]></FailReason>
</xml>"""

    parsed = parse_message(payload)

    # Right event type, FailTime exposed as a datetime, reason kept verbatim.
    self.assertTrue(isinstance(parsed, NamingVerifyFailEvent))
    self.assertTrue(isinstance(parsed.fail_time, datetime))
    self.assertEqual(parsed.fail_reason, "by time")
示例14
def update(self):
    """Reload events from every calendar and compute the next-event label.

    Steps:
      1. Reset ``self.data['events']`` and the cached ``tzutc``/``tzlocal``.
      2. Fetch every calendar URL (5 s timeout) and parse each successful
         response, adding its events with the calendar's colour.
      3. Sort the events by ``'start'``.
      4. Set ``self.data['next']`` to ``'Now'`` if the first future event
         starts within 1.5 x ``DEFAULT_INTERVAL`` seconds, otherwise to a
         natural-language delta.
      5. Delegate to the parent ``update()``.
    """
    self.data['events'] = []
    self.tzutc = tz.tzutc()
    self.tzlocal = tz.tzlocal()

    urls, colors = [], {}
    for cal in self._iter_calendars():
        urls.append(cal.url)
        colors[cal.url] = cal.color

    for result in utils.iter_responses(urls, timeout=5):
        response = result.get('response')
        if response:
            ical = Calendar.from_ical(response.read().decode('utf-8'))
            color = colors[result.get('url')]
            self.data['events'] += self._parse_events(ical, color)
    self.data['events'] = sorted(self.data['events'], key=lambda e: e['start'])

    # Calculate time to next event.
    now = datetime.datetime.now()
    # Fall back to DELTANONE both when there are no events at all and when
    # every event has already started; the old `[...][0]` lookup raised
    # IndexError in the second case.
    upcoming = next(
        (e['start'] for e in self.data['events'] if e['start'] > now),
        self.DELTANONE,
    )
    if upcoming < now + datetime.timedelta(seconds=self.DEFAULT_INTERVAL * 1.5):
        self.data['next'] = 'Now'
    else:
        self.data['next'] = utils.natural_time(upcoming - now, 1)
    super(Plugin, self).update()
示例15
def _parse_events(self, ical, color):
    """Return the calendar's VEVENTs that start within the next 14 days.

    The window runs from today's midnight to 14 days later, inclusive at
    both ends.  Each hit becomes a dict with ``title``, ``calendar``,
    ``color``, ``start``, ``where`` and ``status`` keys.  The calendar name
    comes from ``x-wr-calname``, falling back to ``version``, then ``''``.
    """
    window_start = datetime.datetime.combine(datetime.date.today(), datetime.time.min)
    calendar_name = ical.get('x-wr-calname', ical.get('version', ''))

    upcoming = []
    for component in ical.walk():
        if component.name != "VEVENT":
            continue
        begins = self._event_start(component)
        if not (window_start <= begins <= window_start + datetime.timedelta(days=14)):
            continue
        upcoming.append(dict(
            title=component.get('summary'),
            calendar=calendar_name,
            color=color,
            start=begins,
            where=component.get('location'),
            status=component.get('description'),
        ))
    return upcoming
示例16
def get(self, request, id, format=None):
    """Queue a back-test for the strategy with the given *id*.

    Responds with ``{"status": "Error"}`` when *id* is falsy.  Otherwise the
    strategy's source code is loaded and ``backtest.delay`` is called with a
    fixed configuration, and the response is ``{"status": "Process"}``.
    """
    if not id:
        return Response({"status": "Error"})

    strategy = Strategy.objects.get(id=id)
    source = strategy.source_code.code_text
    job_settings = dict(
        strategy_id=id,
        code_text=source,
        class_name="DoubleMaStrategy",
        vt_symbol="IF88.CFFEX",
        interval="1m",
        start_date=datetime(2016, 1, 1),
        end_date=datetime(2019, 1, 1),
        rate=3.0 / 10000,
        slippage=0.2,
        size=300,
        pricetick=0.2,
        capital=1_000_000,
    )
    backtest.delay(**job_settings)
    return Response({"status": "Process"})
示例17
def registerHciCallback(self, callback):
    """
    Register *callback* in self.registeredHciCallbacks.

    According to the original notes, the recvThread invokes every
    registered callback for each HCI packet it receives, passing a single
    tuple argument made of:
     - HCI packet (subclass of HCI, see hci.py)
     - original length
     - inc_len
     - flags
     - drops
     - timestamp (python datetime object)

    A callback that is already registered is not added again; a warning is
    logged instead.
    """
    if callback not in self.registeredHciCallbacks:
        self.registeredHciCallbacks.append(callback)
        return

    log.warn("registerHciCallback: callback already registered!")
示例18
def registerHciRecvQueue(self, queue, filter_function=None):
    """
    Add a new queue to self.registeredHciRecvQueues.

    According to the original notes, the recvThread fills the queue every
    time it receives a HCI packet. Each item is a tuple containing:
     - HCI packet (subclass of HCI, see hci.py)
     - original length
     - inc_len
     - flags
     - drops
     - timestamp (python datetime object)

    If filter_function is not None, the tuple will first be passed
    to the function and only if the function returns True, the packet
    is put into the queue.

    A queue that is already registered is not added again; a warning is
    logged instead.
    """
    # Entries are stored as (queue, filter_function) tuples, so compare
    # against the first element of each entry. Testing `queue in list`
    # directly could never match a registered queue.
    already_registered = any(
        registered is queue for registered, _ in self.registeredHciRecvQueues
    )
    if already_registered:
        log.warn("registerHciRecvQueue: queue already registered!")
        return

    self.registeredHciRecvQueues.append((queue, filter_function))
示例19
def test_voice_recording_view(self):
    """Viewing a recording issues one GET and maps the JSON onto the result."""
    call_id = '12348765-4321-0987-6543-210987654321'
    leg_id = '87654321-0987-6543-2109-876543210987'
    recording_id = '12345678-9012-3456-7890-123456789012'

    http_client = Mock()
    http_client.request.return_value = '{"data":[{"id":"12345678-9012-3456-7890-123456789012","format":"wav","legId":"87654321-0987-6543-2109-876543210987","status":"done","duration":32,"type":"transfer","createdAt":"2018-01-01T00:00:01Z","updatedAt":"2018-01-01T00:00:05Z","deletedAt":null}],"_links":{"file":"/calls/12348765-4321-0987-6543-210987654321/legs/87654321-0987-6543-2109-876543210987/recordings/12345678-9012-3456-7890-123456789012.wav","self":"/calls/12345678-9012-3456-7890-123456789012/legs/12348765-4321-0987-6543-210987654321/recordings/12345678-9012-3456-7890-123456789012"},"pagination":{"totalCount":0,"pageCount":0,"currentPage":0,"perPage":0}}'

    voice_recording = Client('', http_client).voice_recording_view(call_id, leg_id, recording_id)

    expected_url = 'https://voice.messagebird.com/calls/%s/legs/%s/recordings/%s' % (
        call_id, leg_id, recording_id)
    http_client.request.assert_called_once_with(expected_url, 'GET', None)

    self.assertEqual(recording_id, voice_recording.id)
    self.assertEqual('done', voice_recording.status)
    self.assertEqual('wav', voice_recording.format)
    utc = tzutc()
    self.assertEqual(datetime(2018, 1, 1, 0, 0, 1, tzinfo=utc), voice_recording.createdAt)
    self.assertEqual(datetime(2018, 1, 1, 0, 0, 5, tzinfo=utc), voice_recording.updatedAt)
    self.assertEqual(2, len(voice_recording._links))
    self.assertIsInstance(str(voice_recording), str)
示例20
def test_create_message(self):
    """Creating a conversation message POSTs the data and parses timestamps."""
    http_client = Mock()
    http_client.request.return_value = '{"id":"id","conversationId":"conversation-id","channelId":"channel-id","type":"text","content":{"text":"Example Text Message"},"direction":"sent","status":"pending","createdDatetime":"2019-04-02T11:57:52.142641447Z","updatedDatetime":"2019-04-02T11:57:53.142641447Z"}'

    data = dict(
        channelId=1234,
        type='text',
        content=dict(text='this is a message'),
    )

    client = Client('', http_client)
    msg = client.conversation_create_message('conversation-id', data)

    expected_updated = datetime(2019, 4, 2, 11, 57, 53, tzinfo=tzutc())
    expected_created = datetime(2019, 4, 2, 11, 57, 52, tzinfo=tzutc())
    self.assertEqual(expected_updated, msg.updatedDatetime)
    self.assertEqual(expected_created, msg.createdDatetime)
    http_client.request.assert_called_once_with(
        'conversations/conversation-id/messages', 'POST', data)
示例21
def test_conversation_start(self):
    """Starting a conversation POSTs the data and maps contact and channels."""
    http_client = Mock()
    http_client.request.return_value = '{"id":"1234","contactId":"1234","contact":{"id":"1234","href":"https://contacts.messagebird.com/v2/contacts/1234","msisdn":99999999999,"displayName":"99999999999","firstName":"","lastName":"","customDetails":{},"attributes":{},"createdDatetime":"2019-04-02T08:19:37Z","updatedDatetime":"2019-04-02T08:19:38Z"},"channels":[{"id":"1234","name":"channel-name","platformId":"sms","status":"active","createdDatetime":"2019-04-01T15:25:12Z","updatedDatetime":"0001-01-01T00:00:00Z"}],"status":"active","createdDatetime":"2019-04-02T08:19:37Z","updatedDatetime":"2019-04-02T08:54:42.497114599Z","lastReceivedDatetime":"2019-04-02T08:54:42.464955904Z","lastUsedChannelId":"1234","messages":{"totalCount":1,"href":"https://conversations.messagebird.com/v1/conversations/1234/messages"}}'

    data = dict(
        channelId='1234',
        to='+99999999999',
        type="text",
        content=dict(text='Message Example'),
    )

    client = Client('', http_client)
    msg = client.conversation_start(data)

    http_client.request.assert_called_once_with('conversations/start', 'POST', data)

    self.assertEqual('1234', msg.id)
    contact = msg.contact
    self.assertEqual(99999999999, contact.msisdn)
    self.assertEqual(datetime(2019, 4, 2, 8, 19, 37, tzinfo=tzutc()), contact.createdDatetime)
    self.assertEqual(datetime(2019, 4, 2, 8, 19, 38, tzinfo=tzutc()), contact.updatedDatetime)
    self.assertEqual('channel-name', msg.channels[0].name)
示例22
def test_get_coediting_network(sqlite_db_file):
    """Every temporal edge in the 11:00-11:15 window is a known edge."""
    window = dict(
        time_from=datetime(2019, 2, 12, 11, 0, 0),
        time_to=datetime(2019, 2, 12, 11, 15, 0),
    )
    t, _node_info, _edge_info = git2net.get_coediting_network(sqlite_db_file, **window)

    expected_edges = [
        ('Author B', 'Author A', 1549965657),
        ('Author A', 'Author B', 1549966134),
        ('Author B', 'Author A', 1549966184),
        ('Author C', 'Author B', 1549966309),
        ('Author C', 'Author A', 1549966309),
        ('Author C', 'Author A', 1549966309),
        ('Author B', 'Author A', 1549966356),
        ('Author B', 'Author A', 1549965738),
        ('Author C', 'Author A', 1549966451),
        ('Author C', 'Author A', 1549966451),
        ('Author C', 'Author A', 1549966451),
    ]

    # Only checks that no unexpected edge appears; missing edges are not detected.
    unexpected = set(t.tedges) - set(expected_edges)
    assert not unexpected
示例23
def testParsePEMCertificateWithoutEmail(self):
    """Test _ParsePEMCertificate when openssl prints no email line."""
    self.StubSetup()

    pem = 'pem'
    date = 'Oct 31 12:34:56 1971 GMT'
    dt_date = datetime.datetime(1971, 10, 31, 12, 34, 56)

    # Attributes the Certificate instance is expected to end up with.
    parsed = dict(
        subject='subject',
        issuer='issuer',
        certhash='hash',
        startdate=[date, dt_date],
        enddate=[date, dt_date],
        fingerprint='fing:er:print',
        osx_fingerprint='fingerprint',
        email='',
        serial='87654321',
        pem=pem,
    )

    cmd = [certs.CMD_OPENSSL, 'x509', '-sha1', '-nameopt', 'compat', '-noout']
    cmd += ['-hash', '-subject', '-issuer', '-startdate', '-enddate']
    cmd += ['-fingerprint', '-serial', '-email']

    template = ('hash\nsubject= subject\nissuer= issuer\nnotBefore=%s\n'
                'notAfter=%s\nSHA1 Fingerprint=fing:er:print\nserial=87654321\n')
    output = template % (date, date)

    certs.gmacpyutil.RunProcess(cmd, pem).AndReturn((output, '', 0))
    self.mox.ReplayAll()

    c = certs.Certificate(pem)
    self.assertEqual(parsed, c.__dict__)
    self.mox.VerifyAll()
示例24
def testParsePEMCertificateWithEmail(self):
    """Test _ParsePEMCertificate when openssl prints an email line."""
    self.StubSetup()

    pem = 'pem'
    date = 'Oct 31 12:34:56 1971 GMT'
    dt_date = datetime.datetime(1971, 10, 31, 12, 34, 56)

    # Attributes the Certificate instance is expected to end up with.
    parsed = dict(
        subject='subject',
        issuer='issuer',
        certhash='hash',
        startdate=[date, dt_date],
        enddate=[date, dt_date],
        fingerprint='fing:er:print',
        osx_fingerprint='fingerprint',
        serial='87654321',
        email='user@company.com',
        pem=pem,
    )

    cmd = [certs.CMD_OPENSSL, 'x509', '-sha1', '-nameopt', 'compat', '-noout']
    cmd += ['-hash', '-subject', '-issuer', '-startdate', '-enddate']
    cmd += ['-fingerprint', '-serial', '-email']

    template = ('hash\nsubject= subject\nissuer= issuer\nnotBefore=%s'
                '\nnotAfter=%s\nSHA1 Fingerprint=fing:er:print\n'
                'serial=87654321\nuser@company.com\n')
    output_with_email = template % (date, date)

    certs.gmacpyutil.RunProcess(cmd, pem).AndReturn((output_with_email, '', 0))
    self.mox.ReplayAll()

    c = certs.Certificate(pem)
    self.assertEqual(parsed, c.__dict__)
    self.mox.VerifyAll()
示例25
def ReadTimeFile(self):
    """Load the stored timestamp from self.timeplist.

    The parsed value is also cached on ``self.stored_time``.

    Returns:
      datetime.datetime object from timestamp in self.timeplist
    Raises:
      ErrorReadingPlist: the plist key was missing or empty
      ValueError: bad value from plist that doesn't match strptime
    """
    raw_stamp = gmacpyutil.GetPlistKey(self.timeplist, PLIST_TIMESTAMP_KEY)
    if raw_stamp:
        self.stored_time = datetime.datetime.strptime(
            raw_stamp, '%Y-%m-%d %H:%M:%S %Z')
        return self.stored_time

    raise ErrorReadingPlist(
        'Could not read %s from %s' % (PLIST_TIMESTAMP_KEY, self.timeplist))
示例26
def __init__(self, value=None, pattern=None):
    """
    Constructor.

    value: Date-time value. Accepted forms:
        - datetime.datetime: stored as-is when timezone-aware; a naive value
          is rebuilt without microseconds and tagged with a GXTimeZone
          derived from time.altzone.
        - str: parsed with fromString(value, pattern).
        - GXDateTime: its value, skip and extra fields are copied.
        - any other falsy value (e.g. None): stored as None.
    pattern: Date-time pattern that is used when value is a string.

    Raises ValueError for any other kind of value.
    """
    self.dayOfWeek = 0
    self.timeZone = 0
    self.status = ClockStatus.OK
    self.skip = DateTimeSkips.NONE
    self.extra = DateTimeExtraInfo.NONE

    if isinstance(value, datetime.datetime):
        if value.tzinfo is not None:
            self.value = value
            return
        # NOTE(review): time.altzone is the DST offset; it is applied here
        # regardless of whether DST is in effect -- confirm this is intended.
        self.value = datetime.datetime(
            value.year, value.month, value.day,
            value.hour, value.minute, value.second, 0,
            tzinfo=GXTimeZone(int(-time.altzone / 60)))
        return

    if isinstance(value, str):
        self.value = self.fromString(value, pattern)
        return

    if isinstance(value, GXDateTime):
        self.value = value.value
        self.skip = value.skip
        self.extra = value.extra
        return

    if value:
        raise ValueError("Invalid datetime value.")
    self.value = None
示例27
def fromUnixTime(cls, unixTime):
    """Build a ``GXDateTime`` from a Unix timestamp.

    unixTime: seconds since 1970-01-01 00:00:00 UTC.

    The previous implementation passed ``unixTime * 1000`` as the only
    argument to the ``datetime.datetime`` constructor, which always raised
    ``TypeError`` because month and day were missing.  The timestamp is now
    converted to a timezone-aware UTC ``datetime``, so the instant does not
    depend on the machine's local zone.
    """
    moment = datetime.datetime.fromtimestamp(unixTime, tz=datetime.timezone.utc)
    return GXDateTime(moment)
#
# Convert date time to Epoch time.
#
# @param value
# Date and time.
# Unix time.
#
示例28
def toUnixTime(cls, value):
    """Convert a date-time to Unix time (whole seconds since the epoch).

    value: a ``datetime.datetime``, a ``GXDateTime`` (its ``value``
        attribute is converted), or any other object with a numeric
        ``value`` attribute, which is divided by 1000 -- presumably a
        millisecond count; verify against callers.

    Returns an ``int``.  The previous implementation returned the
    ``struct_time`` from ``utctimetuple()`` instead of an epoch number.
    """
    # Local imports keep this working whether the file does
    # `import datetime` or `from datetime import datetime`.
    import calendar
    import datetime as _datetime

    if isinstance(value, _datetime.datetime):
        return calendar.timegm(value.utctimetuple())
    if isinstance(value, GXDateTime):
        return calendar.timegm(value.value.utctimetuple())
    return int(value.value / 1000)
示例29
def from_json(cls, json_dict):
    """Create an instance of *cls* and populate it from *json_dict*.

    Only keys that already exist as attributes on a fresh ``cls()`` are
    copied.  When the existing attribute is a ``datetime.datetime`` and the
    incoming value is truthy, the value is parsed with
    ``"%Y-%m-%d %H:%M:%S %z"``; a value made of exactly two
    whitespace-separated parts gets ``" +0000"`` appended first.  A
    ``TypeError`` during that conversion is ignored and the unparsed value
    is stored instead.
    """
    instance = cls()
    for key, raw in json_dict.items():
        value = raw
        expects_datetime = isinstance(getattr(instance, key, None), datetime.datetime)
        if expects_datetime and raw:
            try:
                parts = raw.strip().split()
                if len(parts) == 2:
                    # Date and time only: assume a zero UTC offset.
                    value = raw + " +0000"
                value = datetime.datetime.strptime(value, "%Y-%m-%d %H:%M:%S %z")
            except TypeError:
                pass
        if hasattr(instance, key):
            setattr(instance, key, value)
    return instance
示例30
def _test_parse_data(self):
    """``Utils.parse_date`` turns ``'2017-01-01'`` into the matching datetime."""
    parsed_date = bdu.Utils.parse_date('2017-01-01')
    expected = dt.datetime(2017, 1, 1)
    self.assertEqual(parsed_date, expected)