Python源码示例:datetime.strptime()
示例1
def detect_date(e):
if is_date(e): return True
for date_type in [ datetime.datetime, datetime.date, np.datetime64 ]:
if isinstance(e, date_type): return True
# Slow!!!
# for date_format in DATE_FORMATS:
# try:
# if datetime.strptime(e, date_format):
# return True
# except:
# continue
# Also slow
# try:
# dparser.parse(e)
# return True
# except: pass
return False
示例2
def testTimestampValue(self):
"""Checks whether the timestamp attribute in the XML output is valid.
Runs a test program that generates an empty XML output, and checks if
the timestamp attribute in the testsuites tag is valid.
"""
actual = self._GetXmlOutput('gtest_no_test_unittest', [], {}, 0)
date_time_str = actual.documentElement.getAttributeNode('timestamp').value
# datetime.strptime() is only available in Python 2.5+ so we have to
# parse the expected datetime manually.
match = re.match(r'(\d+)-(\d\d)-(\d\d)T(\d\d):(\d\d):(\d\d)', date_time_str)
self.assertTrue(
re.match,
'XML datettime string %s has incorrect format' % date_time_str)
date_time_from_xml = datetime.datetime(
year=int(match.group(1)), month=int(match.group(2)),
day=int(match.group(3)), hour=int(match.group(4)),
minute=int(match.group(5)), second=int(match.group(6)))
time_delta = abs(datetime.datetime.now() - date_time_from_xml)
# timestamp value should be near the current local time
self.assertTrue(time_delta < datetime.timedelta(seconds=600),
'time_delta is %s' % time_delta)
actual.unlink()
示例3
def testTimestampValue(self):
"""Checks whether the timestamp attribute in the JSON output is valid.
Runs a test program that generates an empty JSON output, and checks if
the timestamp attribute in the testsuites tag is valid.
"""
actual = self._GetJsonOutput('gtest_no_test_unittest', [], 0)
date_time_str = actual['timestamp']
# datetime.strptime() is only available in Python 2.5+ so we have to
# parse the expected datetime manually.
match = re.match(r'(\d+)-(\d\d)-(\d\d)T(\d\d):(\d\d):(\d\d)', date_time_str)
self.assertTrue(
re.match,
'JSON datettime string %s has incorrect format' % date_time_str)
date_time_from_json = datetime.datetime(
year=int(match.group(1)), month=int(match.group(2)),
day=int(match.group(3)), hour=int(match.group(4)),
minute=int(match.group(5)), second=int(match.group(6)))
time_delta = abs(datetime.datetime.now() - date_time_from_json)
# timestamp value should be near the current local time
self.assertTrue(time_delta < datetime.timedelta(seconds=600),
'time_delta is %s' % time_delta)
示例4
def is_time_format(time):
"""
Check if 'time' variable has the format of one
of the 'time_formats'
"""
if time is None:
return False
for time_format in TIME_FORMATS:
try:
datetime.strptime(time, time_format)
return True
except ValueError:
pass
return False
示例5
def withdraw():
# send xbt withdrawal request
targetAddress = "xxxxxxxxxx"
currency = "xbt"
amount = 0.12345678
result = cfclient.send_withdrawal(targetAddress, currency, amount)
print("send_withdrawal:\n", result)
# get xbt transfers
lastTransferTime = datetime.datetime.strptime("2016-02-01", "%Y-%m-%d").isoformat() + ".000Z"
result = cfclient.get_transfers(lastTransferTime=lastTransferTime)
print("get_transfers:\n", result)
# transfer
fromAccount = "fi_ethusd"
toAccount = "cash"
unit = "eth"
amount = 0.1
result = cfclient.transfer(fromAccount, toAccount, unit, amount)
print("transfer:\n", result)
示例6
def test(self):
cx = self.Symbol_Db['equity'].find()
symbolSet = set([d['code'] for d in cx])
for code in symbolSet:
start = self.Symbol_Db['equity'].find({"code" : code})[0]['timeToMarket']
try:
start = datetime.datetime.strptime(str(start), '%Y%m%d')
except :
print code
start = start.strftime("%Y-%m-%d")
print start
return
#----------------------------------------------------------------------
示例7
def _triggerTime(self):
'''检查定时触发'''
if not self._dataModel.getConfigModel().hasTimerTrigger() or not self.isRealTimeStatus():
return
nowTime = datetime.now()
for i,timeSecond in enumerate(self._dataModel.getConfigTimer()):
specifiedTime = datetime.strptime(timeSecond, "%H%M%S")
if 0<=(nowTime-specifiedTime).seconds<1 and not self._isTimeTriggered[i]:
self._isTimeTriggered[i] = True
key = self._dataModel.getConfigModel().getKLineShowInfoSimple()
dateTimeStamp, tradeDate, lv1Data = self.getTriggerTimeAndData(key[0])
event = Event({
"EventCode" : ST_TRIGGER_TIMER,
"ContractNo": None,
"KLineType" : None,
"KLineSlice": None,
"Data":{
"TradeDate": tradeDate,
"DateTimeStamp": dateTimeStamp,
"Data":timeSecond
}
})
self._triggerQueue.put(event)
示例8
def feature2tile(cls, feature):
""" convert tile field attributes to tile identifier """
fldindex_h = feature.GetFieldIndex("h")
fldindex_v = feature.GetFieldIndex("v")
h = str(int(feature.GetField(fldindex_h))).zfill(2)
v = str(int(feature.GetField(fldindex_v))).zfill(2)
return "h%sv%s" % (h, v)
# @classmethod
# def find_dates(cls, tile):
# """ Get list of dates available in repository for a tile """
# tdir = cls.path(tile=tile)
# if os.path.exists(tdir):
# return [datetime.strptime(os.path.basename(d), cls._datedir).date() for d in os.listdir(tdir)]
# else:
# return []
示例9
def get_total_seconds_from_epoch_for_fluent_logs(self, datetime_string):
# fluentd logs timestamp format : 2018-08-02 19:27:34 +0000
# for python 2.7 or earlier there is no good way to convert it into seconds.
# so we parse upto seconds, and parse utc specific offset seperately.
try:
date_time_format = '%Y-%m-%d %H:%M:%S'
epoch = datetime(1970, 1, 1)
# get hours and minute delta for utc offset.
hours_delta_utc = int(datetime_string[21:23])
minutes_delta_utc= int(datetime_string[23:])
log_time = datetime.strptime(datetime_string[:19], date_time_format) + ((timedelta(hours=hours_delta_utc, minutes=minutes_delta_utc)) * (-1 if datetime_string[20] == "+" else 1))
return (log_time - epoch).total_seconds()
except Exception as e:
self._hutil_error('Error converting timestamp string to seconds. Exception={0}'.format(e))
return 0
示例10
def filter_put(mongodb, slug=None):
base()
data = request.json or {}
data['slug'] = slug
data = dict(data.items())
if 'lastupdate' in data and isinstance(data.get('lastupdate'), basestring):
data['lastupdate'] = datetime.strptime(data.get('lastupdate'),
'%Y-%m-%d %H:%M:%S')
if 'start_process' in data and isinstance(data.get('start_process'),
basestring):
data['start_process'] = datetime.strptime(data.get('start_process'),
'%Y-%m-%d %H:%M:%S')
get = mongodb[collection].find_one({'slug': slug})
if get:
mongodb[collection].update({'slug': slug}, data)
return json.dumps(data, default=parse_dumps)
return {'status': 'error',
'message': 'Object not exist, please send POST to create!'}
示例11
def _olt_version(self):
# Version
# 0 Unknown
# 1 V1 OMCI format
# 2 V2 OMCI format
# 3 2018-01-11 or later
version = 0
info = self._rest_support.get('module-info', [dict()])
hw_mod_ver_str = next((mod.get('revision') for mod in info
if mod.get('module-name', '').lower() == 'gpon-olt-hw'), None)
if hw_mod_ver_str is not None:
try:
from datetime import datetime
hw_mod_dt = datetime.strptime(hw_mod_ver_str, '%Y-%m-%d')
version = 2 if hw_mod_dt >= datetime(2017, 9, 21) else 2
except Exception as e:
self.log.exception('ver-str-check', e=e)
return version
示例12
def str_to_datetime_processor_factory(regexp, type_):
rmatch = regexp.match
# Even on python2.6 datetime.strptime is both slower than this code
# and it does not support microseconds.
has_named_groups = bool(regexp.groupindex)
def process(value):
if value is None:
return None
else:
try:
m = rmatch(value)
except TypeError:
raise ValueError("Couldn't parse %s string '%r' "
"- value is not a string." %
(type_.__name__, value))
if m is None:
raise ValueError("Couldn't parse %s string: "
"'%s'" % (type_.__name__, value))
if has_named_groups:
groups = m.groupdict(0)
return type_(**dict(list(zip(
iter(groups.keys()),
list(map(int, iter(groups.values())))
))))
else:
return type_(*list(map(int, m.groups(0))))
return process
示例13
def get_newest(base_url, url_pattern, links):
'''
Returns a tuple with the newest url in the `links` list matching the
pattern `url_pattern` and a datetime object representing the creation
date of the url.
The creation date is extracted from the url using datetime.strptime().
'''
logger = logging.getLogger('auditor.srmdumps')
times = []
pattern_components = url_pattern.split('/')
date_pattern = '{0}/{1}'.format(base_url, pattern_components[0])
if len(pattern_components) > 1:
postfix = '/' + '/'.join(pattern_components[1:])
else:
postfix = ''
for link in links:
try:
time = datetime.datetime.strptime(link, date_pattern)
except ValueError:
pass
else:
times.append((str(link) + postfix, time))
if not times:
msg = 'No links found matching the pattern {0} in {1}'.format(date_pattern, links)
logger.error(msg)
raise Exception(msg)
return max(times, key=operator.itemgetter(1))
示例14
def get_dt_header(self, header_key):
"""
A helper method to retrieve a response header as a date+time.
Args/kwargs:
`header_key`:
The name of the HTTP response header.
Returns:
`None` or UTC date+time as a `datetime.datetime` instance
(a naive one, i.e., without explicit timezone information).
Example usage:
with RequestPerformer('GET', 'http://example.com/FOO') as perf:
foo_last_modified = perf.get_dt_header('Last-Modified')
if foo_last_modified is None:
print 'I have no idea when FOO was modified.`
else:
print 'FOO modification date+time:', foo_last_modified.isoformat()
"""
raw_value = (self.response.headers.get(header_key) or '').strip()
if raw_value:
for dt_format in self._HTTP_DATETIME_FORMATS:
try:
return datetime.datetime.strptime(raw_value, dt_format)
except ValueError:
pass
try:
return parse_iso_datetime_to_utc(raw_value)
except ValueError:
pass
return None
示例15
def _try_to_set_http_last_modified(self, headers):
http_header = headers.get(self._http_last_modified_header)
if http_header:
for dt_format in self._http_datetime_formats:
try:
parsed_datetime = datetime.datetime.strptime(http_header, dt_format)
except ValueError:
pass
else:
self._http_last_modified = parsed_datetime
break
示例16
def _get_date_facet_counts(self, timespan, date_field, start_date=None, end_date=None):
'''
Returns Range Facet counts based on
'''
if 'DAY' not in timespan:
raise ValueError("At this time, only DAY date range increment is supported. Aborting..... ")
#Need to do this a bit better later. Don't like the string and date concatenations.
if not start_date:
start_date = self._get_edge_date(date_field, 'asc')
start_date = datetime.strptime(start_date,'%Y-%m-%dT%H:%M:%S.%fZ').date().isoformat()+'T00:00:00.000Z'
else:
start_date = start_date+'T00:00:00.000Z'
if not end_date:
end_date = self._get_edge_date(date_field, 'desc')
end_date = datetime.strptime(end_date,'%Y-%m-%dT%H:%M:%S.%fZ').date()
end_date += timedelta(days=1)
end_date = end_date.isoformat()+'T00:00:00.000Z'
else:
end_date = end_date+'T00:00:00.000Z'
self.log.info("Processing Items from {} to {}".format(start_date, end_date))
#Get facet counts for source and destination collections
source_facet = self._source.query(self._source_coll,
self._get_date_range_query(timespan=timespan, start_date=start_date, end_date=end_date)
).get_facets_ranges()[date_field]
dest_facet = self._dest.query(
self._dest_coll, self._get_date_range_query(
timespan=timespan, start_date=start_date, end_date=end_date
)).get_facets_ranges()[date_field]
return source_facet, dest_facet
示例17
def datetime_to_ms(dt):
"""
Convert an unaware datetime object to milliseconds. This will
be a UTC time. The SMC stores all times in UTC and will do the
time conversions based on the local timezone.
Example of converting a datetime to milliseconds::
utc_time = datetime.strptime("2018-06-04T00:00:00", "%Y-%m-%dT%H:%M:%S")
datetime_to_ms(utc_time)
:param dt datetime: pass in python datetime object.
:return: value representing the datetime in milliseconds
:rtype: int
"""
return int(time.mktime(dt.timetuple()) * 1000)
示例18
def str_to_datetime_processor_factory(regexp, type_):
rmatch = regexp.match
# Even on python2.6 datetime.strptime is both slower than this code
# and it does not support microseconds.
has_named_groups = bool(regexp.groupindex)
def process(value):
if value is None:
return None
else:
try:
m = rmatch(value)
except TypeError:
raise ValueError("Couldn't parse %s string '%r' "
"- value is not a string." %
(type_.__name__, value))
if m is None:
raise ValueError("Couldn't parse %s string: "
"'%s'" % (type_.__name__, value))
if has_named_groups:
groups = m.groupdict(0)
return type_(**dict(list(zip(
iter(groups.keys()),
list(map(int, iter(groups.values())))
))))
else:
return type_(*list(map(int, m.groups(0))))
return process
示例19
def get_point_in_the_middle(start_point, end_point, time_diff, point_idx):
"""
Calculates a new point between two points depending of the
time difference between them and the point index.
Parameters
----------
start_point: DataFrame
end_point: DataFrame
time_diff: float
point_idx: int
Point index between the start and the end points
Returns
-------
point: list
A new point between the start and the end points.
"""
time_proportion = (time_diff * point_idx) / end_point['TimeDifference'].item()
distance_proportion = end_point['Distance'].item() * time_proportion
time_diff_proportion = end_point['TimeDifference'].item() * time_proportion
speed = distance_proportion / time_diff_proportion
distance = time_diff * speed
cum_time_diff = int(start_point['CumTimeDiff'].item() + time_diff_proportion)
# date = datetime.strptime(start_point['Date'].item(), '%Y-%m-%d %H:%M:%S') + dt.timedelta(seconds=int(
# time_diff_proportion))
date = pd.to_datetime(start_point['Date'].astype(str), format='%Y-%m-%d %H:%M:%S') + dt.timedelta(
seconds=int(time_diff_proportion))
altitude = (end_point['Altitude'].item() + start_point['Altitude'].item()) / 2
name = start_point['CodeRoute'].item()
geo_start = geopy.Point(start_point['Latitude'].item(), start_point['Longitude'].item())
geo_end = geopy.Point(end_point['Latitude'].item(), end_point['Longitude'].item())
middle_point = get_coordinates(geo_start, geo_end, distance_proportion)
df_middle_point = ([[name, middle_point.latitude, middle_point.longitude, altitude,
date, speed, int(time_diff), distance, None, cum_time_diff]])
return df_middle_point
示例20
def parser(self, x):
return datetime.strptime('190' + x, '%Y-%m')
# convert time series into supervised learning problem
示例21
def load_dataset(self):
series = read_csv(self.dataset_path, sep=',')
header = list(series.columns.values)
raw_time = series[header[0]]
raw_values = series[header[1]]
raw_time = raw_time.values
raw_datetime = [datetime.datetime.strptime(
i, "%Y-%m-%d %H:%M:%S") for i in raw_time]
raw_values = raw_values.values
series_time = Series(raw_time)
series_values = Series(raw_values)
return series, series_values, raw_datetime
示例22
def convert_date(date):
if date is None:
return date
try:
# skip dates already in the right format (like after a downgrade).
datetime.strptime(date, FORMAT_FR)
except ValueError:
pass
else:
return date
return datetime.strptime(date, FORMAT_EN).strftime(FORMAT_FR)
示例23
def str_to_datetime_processor_factory(regexp, type_):
rmatch = regexp.match
# Even on python2.6 datetime.strptime is both slower than this code
# and it does not support microseconds.
has_named_groups = bool(regexp.groupindex)
def process(value):
if value is None:
return None
else:
try:
m = rmatch(value)
except TypeError:
raise ValueError("Couldn't parse %s string '%r' "
"- value is not a string." %
(type_.__name__, value))
if m is None:
raise ValueError("Couldn't parse %s string: "
"'%s'" % (type_.__name__, value))
if has_named_groups:
groups = m.groupdict(0)
return type_(**dict(list(zip(
iter(groups.keys()),
list(map(int, iter(groups.values())))
))))
else:
return type_(*list(map(int, m.groups(0))))
return process
示例24
def APITester():
##### public endpoints #####
# get tickers
result = cfclient.get_tickers()
print("get_tickers:\n", result)
# get order book
symbol = "FI_XBTUSD_180615"
result = cfclient.get_orderbook(symbol)
print("get_orderbook:\n", result)
# get history
"""
symbol = "FI_XBTUSD_180615" # "FI_XBTUSD_180615", "cf-bpi", "cf-hbpi"
lastTime = datetime.datetime.strptime("2016-01-20", "%Y-%m-%d").isoformat() + ".000Z"
result = cfclient.get_history(symbol, lastTime=lastTime)
print("get_history:\n", result)
"""
##### private endpoints #####
# get fills
#lastFillTime = datetime.strptime("2016-02-01", "%Y-%m-%d").isoformat() + ".000Z"
#result = cfclient.get_fills(lastFillTime=lastFillTime)
#print("get_fills:\n", result)
# get open positions
result = cfclient.get_openpositions()
print("get_openpositions:\n", result)
示例25
def get_range_min_tick_data(self, code, start=None, end=None, ktype=1):
start = str(fc.get_stock_timeToMarket(code)) if start is None else start
end = str(datetime.datetime.today().date()) if end is None else end
startD = datetime.datetime.strptime(start, '%Y-%m-%d')
endD = datetime.datetime.strptime(end, '%Y-%m-%d')
delta = datetime.timedelta(days=1)
inDate = endD - delta
while inDate >= startD:
self.parse(code, inDate.strftime("%Y-%m-%d"), ktype)
inDate -= delta
示例26
def downloadEquityAllData(self, code):
start = self.Symbol_Db['equity'].find({"code" : code})[0]['timeToMarket']
try:
start = datetime.datetime.strptime(str(start), '%Y%m%d')
except:
return
start = start.strftime("%Y-%m-%d")
self.get_range_daily_data(code, start) #default上市以来
self.get_range_min_tick_data(code, start)
# 添加index,大幅加快查询速度
self.Tick_Db[code].ensure_index([('date', pymongo.DESCENDING)])
self.OneMin_Db[code].ensure_index([('date', pymongo.DESCENDING)])
示例27
def updateEquityAllData(self, code):
# find the latest timestamp in collection.
latest = self.Daily_Db[code].find_one(sort=[('date', pymongo.DESCENDING)])['date']
latest = datetime.datetime.strptime(str(latest), '%Y-%m-%d')
start = datetime.datetime.strftime(latest + timedelta(days=1), '%Y-%m-%d')
self.get_range_daily_data(code, start) #default上市以来
self.get_range_min_tick_data(code, start)
示例28
def loadMcCsv(self, fileName, dbName, symbol):
"""将Multicharts导出的csv格式的历史数据插入到Mongo数据库中"""
import csv
start = time()
print u'开始读取CSV文件%s中的数据插入到%s的%s中' %(fileName, dbName, symbol)
# 锁定集合,并创建索引
host, port = loadMongoSetting()
client = pymongo.MongoClient(host, port)
collection = client[dbName][symbol]
collection.ensure_index([('datetime', pymongo.ASCENDING)], unique=True)
# 读取数据和插入到数据库
reader = csv.DictReader(file(fileName, 'r'))
for d in reader:
bar = CtaBarData()
bar.vtSymbol = symbol
bar.symbol = symbol
bar.open = float(d['Open'])
bar.high = float(d['High'])
bar.low = float(d['Low'])
bar.close = float(d['Close'])
bar.date = datetime.strptime(d['Date'], '%Y/%m/%d').strftime('%Y%m%d')
bar.time = d['Time']
bar.datetime = datetime.strptime(bar.date + ' ' + bar.time, '%Y%m%d %H:%M:%S')
bar.volume = d['TotalVolume']
flt = {'datetime': bar.datetime}
collection.update_one(flt, {'$set':bar.__dict__}, upsert=True)
print bar.date, bar.time
print u'插入完毕,耗时:%s' % (time()-start)
示例29
def str_to_datetime_processor_factory(regexp, type_):
rmatch = regexp.match
# Even on python2.6 datetime.strptime is both slower than this code
# and it does not support microseconds.
has_named_groups = bool(regexp.groupindex)
def process(value):
if value is None:
return None
else:
try:
m = rmatch(value)
except TypeError:
raise ValueError("Couldn't parse %s string '%r' "
"- value is not a string." %
(type_.__name__, value))
if m is None:
raise ValueError("Couldn't parse %s string: "
"'%s'" % (type_.__name__, value))
if has_named_groups:
groups = m.groupdict(0)
return type_(**dict(list(zip(
iter(groups.keys()),
list(map(int, iter(groups.values())))
))))
else:
return type_(*list(map(int, m.groups(0))))
return process
示例30
def str_to_datetime_processor_factory(regexp, type_):
rmatch = regexp.match
# Even on python2.6 datetime.strptime is both slower than this code
# and it does not support microseconds.
has_named_groups = bool(regexp.groupindex)
def process(value):
if value is None:
return None
else:
try:
m = rmatch(value)
except TypeError:
raise ValueError("Couldn't parse %s string '%r' "
"- value is not a string." %
(type_.__name__, value))
if m is None:
raise ValueError("Couldn't parse %s string: "
"'%s'" % (type_.__name__, value))
if has_named_groups:
groups = m.groupdict(0)
return type_(**dict(list(zip(
iter(groups.keys()),
list(map(int, iter(groups.values())))
))))
else:
return type_(*list(map(int, m.groups(0))))
return process