Python源码示例:utime.ticks_diff()
示例1
def test():
    """Bouncing sprite.

    Sets up the SPI display, loads a sprite and animates it in an endless
    loop paced at roughly 30 frames per second.  A KeyboardInterrupt ends
    the loop and calls ``display.cleanup()``.
    """
    try:
        # Baud rate of 14500000 seems about the max
        spi = SPI(2, baudrate=14500000, sck=Pin(18), mosi=Pin(23))
        display = Display(spi, dc=Pin(17), cs=Pin(5), rst=Pin(16))
        display.clear()

        # Load sprite
        logo = BouncingSprite('images/Python41x49.raw',
                              41, 49, 128, 128, 1, display)

        while True:
            timer = ticks_us()
            logo.update_pos()
            logo.draw()
            # Attempt to set framerate to 30 FPS: sleep for whatever is left
            # of the 33333 us frame budget after updating and drawing.
            timer_dif = 33333 - ticks_diff(ticks_us(), timer)
            if timer_dif > 0:
                sleep_us(timer_dif)

    except KeyboardInterrupt:
        display.cleanup()
示例2
def multi_fields(t):
    """Show three labelled fields fed with random values for ``t`` seconds.

    Each pass of the loop writes a new random value to every field,
    refreshes the display and sleeps for one second.  When the period has
    elapsed an ' OK ' label is drawn and displayed for one more second.
    """
    print('Dynamic labels.')
    refresh(ssd, True)  # Clear any prior image
    nfields = []
    dy = wri.height + 6
    y = 2
    col = 15
    width = wri.stringlen('99.99')
    for txt in ('X:', 'Y:', 'Z:'):
        Label(wri, y, 0, txt)  # Use wri default colors
        nfields.append(Label(wri, y, col, width, bdcolor=None))  # Specify a border, color TBD
        y += dy

    # Deadline computed with ticks_add so the comparison below stays valid
    # when the millisecond counter wraps.
    end = utime.ticks_add(utime.ticks_ms(), t * 1000)
    while utime.ticks_diff(end, utime.ticks_ms()) > 0:
        for field in nfields:
            # 3 random bytes (0 .. 2**24 - 1) scaled to roughly 0 .. 100
            value = int.from_bytes(uos.urandom(3),'little')/167772
            overrange = None if value < 70 else YELLOW if value < 90 else RED
            field.value('{:5.2f}'.format(value), fgcolor = overrange, bdcolor = overrange)
        refresh(ssd)
        utime.sleep(1)
    Label(wri, 0, 64, ' OK ', True, fgcolor = RED)
    refresh(ssd)
    utime.sleep(1)
示例3
def multi_fields(t):
    """Show three labelled fields fed with random values for ``t`` seconds.

    Each pass of the loop writes a new random value to every field,
    refreshes the display and sleeps for one second.  When the period has
    elapsed an ' OK ' label is drawn and displayed for one more second.
    """
    print('multi_fields')
    refresh(ssd, True)  # Clear any prior image
    nfields = []
    dy = wri.height + 6
    y = 2
    col = 15
    width = wri.stringlen('99.99')
    for txt in ('X:', 'Y:', 'Z:'):
        Label(wri, y, 0, txt)  # Use wri default colors
        nfields.append(Label(wri, y, col, width, bdcolor=None))  # Specify a border, color TBD
        y += dy

    # Deadline computed with ticks_add so the comparison below stays valid
    # when the millisecond counter wraps.
    end = utime.ticks_add(utime.ticks_ms(), t * 1000)
    while utime.ticks_diff(end, utime.ticks_ms()) > 0:
        for field in nfields:
            # 3 random bytes (0 .. 2**24 - 1) scaled to roughly 0 .. 100
            value = int.from_bytes(uos.urandom(3),'little')/167772
            overrange = None if value < 70 else YELLOW if value < 90 else RED
            field.value('{:5.2f}'.format(value), fgcolor = overrange, bdcolor = overrange)
        refresh(ssd)
        utime.sleep(1)
    Label(wri, 0, 64, ' OK ', True, fgcolor = RED)
    refresh(ssd)
    utime.sleep(1)
示例4
async def broker_up(self):  # Test broker connectivity
    """Return True if the broker is responding, otherwise False.

    Returns False immediately when not connected, and True immediately when
    something was received within the last 1000 ms.  Otherwise a ping is
    sent and ``self.last_rx`` is polled every 100 ms until it advances
    (True) or ``self._timeout`` reports expiry (False).  An OSError from
    the ping also yields False.

    This must be a coroutine because the body uses ``await``.
    """
    if not self.isconnected():
        return False
    tlast = self.last_rx
    if ticks_diff(ticks_ms(), tlast) < 1000:
        return True
    try:
        await self._ping()
    except OSError:
        return False
    t = ticks_ms()
    while not self._timeout(t):
        await asyncio.sleep_ms(100)
        if ticks_diff(self.last_rx, tlast) > 0:  # Response received
            return True
    return False
示例5
def run(self, conversion):  # Uses assembler for speed
    """Run the transform selected by ``conversion``; return the fft() time in us.

    The optional ``self.popfunc`` callback is invoked first to populate the
    data.  Only the ``fft()`` call itself is timed.

    NOTE(review): the source had lost its indentation.  The window function
    is assumed to apply to forward transforms only, and the dB conversion is
    assumed to be nested under the polar conversion - confirm against the
    original module.
    """
    if self.popfunc is not None:
        self.popfunc(self)  # Populate the data (for fwd transfers, just the real data)
    if conversion != REVERSE:  # Forward transform: real data assumed
        setarray(self.im, 0, self._length)  # Fast zero imaginary data
        if self.windata is not None:  # Fast apply the window function
            winapply(self.re, self.windata, self._length)
    start = utime.ticks_us()
    fft(self.ctrl, conversion)
    delta = utime.ticks_diff(utime.ticks_us(), start)
    if (conversion & POLAR) == POLAR:  # Ignore complex conjugates, convert 1st half of arrays
        topolar(self.re, self.im, self._length//2)  # Fast
        if conversion == DB:  # Ignore conjugates: convert 1st half only
            for idx, val in enumerate(self.re[0:self._length//2]):
                # Non-positive magnitudes are clamped to -80.0 because
                # log10 is undefined for them.
                self.re[idx] = -80.0 if val <= 0.0 else 20*math.log10(val) - self.dboffset
    return delta
# Subclass for acquiring data from Pyboard ADC using read_timed() method.
示例6
async def readline(s, timeout):
    """Read one non-blank line from the non-blocking socket ``s``.

    Partial reads are joined until the data ends with a newline.  A line
    consisting only of a newline is treated as a keepalive: it is discarded
    and the timeout restarts.

    Raises OSError if ``s.readline()`` returns ``b''`` or if more than
    ``timeout`` ms elapse without a complete line.

    This must be a coroutine because the body uses ``await``.
    """
    line = b''
    start = utime.ticks_ms()
    while True:
        if line.endswith(b'\n'):
            if len(line) > 1:
                return line
            line = b''
            start = utime.ticks_ms()  # A blank line is just a keepalive
        await asyncio.sleep_ms(100)  # See note above
        d = s.readline()
        if d == b'':
            raise OSError
        if d is not None:
            line = b''.join((line, d))
        if utime.ticks_diff(utime.ticks_ms(), start) > timeout:
            raise OSError
示例7
async def readline(self):
    """Read one non-blank line from ``self.sock``.

    Partial reads are joined until the data ends with a newline.  A line
    consisting only of a newline is treated as a keepalive: it is
    discarded, the timeout restarts and the LED is toggled.

    Raises OSError if ``self.sock.readline()`` returns ``b''`` or if more
    than ``self.timeout`` ms elapse without a complete line.

    This must be a coroutine because the body uses ``await``.

    NOTE(review): the source had lost its indentation; the LED toggle is
    assumed to belong to the keepalive branch - confirm.
    """
    line = b''
    start = utime.ticks_ms()
    while True:
        if line.endswith(b'\n'):
            if len(line) > 1:
                return line
            line = b''
            start = utime.ticks_ms()  # Blank line is keepalive
            self.led(not self.led())
        await asyncio.sleep_ms(100)  # nonzero wait seems empirically necessary
        d = self.sock.readline()
        if d == b'':
            raise OSError
        if d is not None:
            line = b''.join((line, d))
        if utime.ticks_diff(utime.ticks_ms(), start) > self.timeout:
            raise OSError
示例8
def timed_function(f, *args, **kwargs):
    """Decorator that wraps ``f``; the timing instrumentation is disabled.

    The returned wrapper calls ``f`` with the arguments it receives and
    returns its result unchanged.  The extra ``*args``/``**kwargs`` on the
    decorator itself are accepted but unused.  The profiling statements are
    kept below as comments so they can be re-enabled.
    """
    #myname = str(f).split(' ')[1]
    def new_func(*args, **kwargs):
        #t = utime.ticks_us()
        result = f(*args, **kwargs)
        #if myname not in profile_stats_time_including_children:
        #    profile_stats_time_including_children[myname] = 0
        #    profile_stats_calls[myname] = 0
        #profile_stats_time_including_children[myname] += utime.ticks_diff(utime.ticks_us(), t)
        #profile_stats_calls[myname] += 1
        return result
    return new_func
示例9
def perform(self) :
    """Update the lamp colour from the time elapsed since the first call.

    The first call (while ``self.initial_tick`` is falsy) only records the
    current tick.  Later calls compute the elapsed ms: a positive value is
    mapped to a colour via ``self.get_color`` and written to the pixels
    only when the colour changed; a negative value is treated as a tick
    wrap and restarts the reference.  Always returns True.
    """
    current_tick = utime.ticks_ms()
    if not self.initial_tick :
        self.initial_tick = current_tick
    else :
        x = utime.ticks_diff(current_tick, self.initial_tick)
        if x == 0 :
            pass
        elif 0 < x :
            rgb = self.get_color(x)
            if rgb != self.prev_rgb :
                if self.verbose :
                    logging.info("Lamp: setting color to {} from x={}", rgb, x)
                self.fill_pixels(rgb)
                self.prev_rgb = rgb
        else : # wrap around; start over
            logging.info("Lamp: tick wrap")
            self.initial_tick = current_tick
    return True
示例10
def adcread(chan):  # 16 temp 17 vbat 18 vref
    """Read internal ADC channel ``chan`` (16, 17 or 18) via direct register access.

    Channel meanings per the original comment: 16 temp, 17 vbat, 18 vref.
    Returns the value read from the ADC1 data register.

    Raises OSError if end-of-conversion is not seen within 100 ms of entry
    (the timer starts before the ADC is configured).
    """
    assert chan >= 16 and chan <= 18, 'Invalid ADC channel'
    start = utime.ticks_ms()
    timeout = 100
    stm.mem32[stm.RCC + stm.RCC_APB2ENR] |= 0x100  # enable ADC1 clock.0x4100
    stm.mem32[stm.ADC1 + stm.ADC_CR2] = 1  # Turn on ADC
    stm.mem32[stm.ADC1 + stm.ADC_CR1] = 0  # 12 bit
    # Per-channel sample time and common-register setup.
    if chan == 17:
        stm.mem32[stm.ADC1 + stm.ADC_SMPR1] = 0x200000  # 15 cycles
        stm.mem32[stm.ADC + 4] = 1 << 23
    elif chan == 18:
        stm.mem32[stm.ADC1 + stm.ADC_SMPR1] = 0x1000000
        stm.mem32[stm.ADC + 4] = 0xc00000
    else:
        stm.mem32[stm.ADC1 + stm.ADC_SMPR1] = 0x40000
        stm.mem32[stm.ADC + 4] = 1 << 23
    stm.mem32[stm.ADC1 + stm.ADC_SQR3] = chan
    stm.mem32[stm.ADC1 + stm.ADC_CR2] = 1 | (1 << 30) | (1 << 10)  # start conversion
    while not stm.mem32[stm.ADC1 + stm.ADC_SR] & 2:  # wait for EOC
        if utime.ticks_diff(utime.ticks_ms(), start) > timeout:
            raise OSError('ADC timout')
    data = stm.mem32[stm.ADC1 + stm.ADC_DR]  # clear down EOC
    stm.mem32[stm.ADC1 + stm.ADC_CR2] = 0  # Turn off ADC
    return data
示例11
def get_t_split(self):
    """Fill and return ``self._time`` as [hrs, mins, secs, us].

    ``self.t_ms`` and ``self.acquired`` are copied with interrupts disabled
    so the pair is consistent.  The microseconds elapsed since ``acquired``
    are then added: whole seconds go into the seconds element and the
    remainder is combined with the millisecond part of ``t_ms``.
    """
    state = machine.disable_irq()
    t = self.t_ms
    acquired = self.acquired
    machine.enable_irq(state)
    isecs, ims = divmod(t, 1000)  # Get integer secs and ms
    x, secs = divmod(isecs, 60)
    hrs, mins = divmod(x, 60)
    dt = utime.ticks_diff(utime.ticks_us(), acquired)  # μs to time now
    ds, us = divmod(dt, 1000000)
    # If dt > 1e6 can add to secs without risk of rollover: see above.
    self._time[0] = hrs
    self._time[1] = mins
    self._time[2] = secs + ds
    self._time[3] = us + ims*1000
    return self._time
示例12
async def run_cancel_test6(loop):
    """Exercise NamedTask and Cancellable cancellation, printing progress.

    Starts two named tasks and one Cancellable task, cancels the second
    named task after 4.5 s, then starts a third named task and measures how
    long a blocking (``nowait=False``) cancel takes, and finally cancels
    all Cancellable tasks.

    This must be a coroutine because the body uses ``await``.
    """
    for name in ('complete', 'cancel me'):
        loop.create_task(asyn.NamedTask(name, cant60, name)())
    loop.create_task(asyn.Cancellable(cant61)())
    await asyncio.sleep(4.5)
    # ``name`` still holds the last loop value, 'cancel me'.
    print('Cancelling task \"{}\". 1.5 secs latency.'.format(name))
    await asyn.NamedTask.cancel(name)
    await asyncio.sleep(7)
    name = 'cancel wait'
    loop.create_task(asyn.NamedTask(name, cant60, name)())
    await asyncio.sleep(0.5)
    print('Cancelling task \"{}\". 1.5 secs latency.'.format(name))
    t = time.ticks_ms()
    await asyn.NamedTask.cancel('cancel wait', nowait=False)
    print('Was cancelled in {} ms'.format(time.ticks_diff(time.ticks_ms(), t)))
    print('Cancelling cant61')
    await asyn.Cancellable.cancel_all()
    print('Done')
示例13
def main(i2c):
    """Scroll MESSAGE across a Charlieplex matrix wing forever.

    Uses the two display frames as a double buffer: each pass draws into
    the frame that is not shown and then switches the display to it.
    """
    # Initialize Charlieplex matrix wing.
    matrix = is31fl3731.CharlieWing(i2c)
    matrix.fill(0)
    # Initialize font renderer.
    with bitmapfont.BitmapFont(DISPLAY_WIDTH, DISPLAY_HEIGHT, matrix.pixel) as bf:
        # Global state:
        pos = DISPLAY_WIDTH  # X position of the message start.
        message_width = bf.width(MESSAGE)  # Message width in pixels.
        frame = 0  # Currently displayed frame.
        last = utime.ticks_ms()  # Last frame millisecond tick time.
        speed_ms = SPEED / 1000.0  # Scroll speed in pixels/ms.
        while True:
            # Compute the time delta in milliseconds since the last frame.
            # ticks_diff(a, b) returns a - b, so the newer tick goes first;
            # the reversed order gave a negative delta and moved the text
            # the wrong way.
            current = utime.ticks_ms()
            delta_ms = utime.ticks_diff(current, last)
            last = current
            # Compute position using speed and time delta.
            pos -= speed_ms*delta_ms
            if pos < -message_width:
                pos = DISPLAY_WIDTH
            # Swap frames to start drawing on a non-visible frame (double buffering).
            frame = (frame + 1) % 2
            matrix.frame(frame, show=False)
            # Clear the frame and draw the text at the current position.
            matrix.fill(0)
            bf.text(MESSAGE, int(pos), 0, INTENSITY)
            # Swap to the new frame on the display.
            matrix.frame(frame)
            # Sleep a bit to give USB mass storage some processing time (quirk
            # of SAMD21 firmware right now).
            utime.sleep_ms(20)
示例14
def main(i2c):
    """Scroll MESSAGE across a Charlieplex matrix wing forever.

    Uses the two display frames as a double buffer: each pass draws into
    the frame that is not shown and then switches the display to it.
    """
    # Initialize Charlieplex matrix wing.
    matrix = is31fl3731.CharlieWing(i2c)
    matrix.fill(0)
    # Initialize font renderer.
    with bitmapfont.BitmapFont(DISPLAY_WIDTH, DISPLAY_HEIGHT, matrix.pixel) as bf:
        # Global state:
        pos = DISPLAY_WIDTH  # X position of the message start.
        message_width = bf.width(MESSAGE)  # Message width in pixels.
        frame = 0  # Currently displayed frame.
        last = utime.ticks_ms()  # Last frame millisecond tick time.
        speed_ms = SPEED / 1000.0  # Scroll speed in pixels/ms.
        while True:
            # Compute the time delta in milliseconds since the last frame
            # (newer tick first so the result is positive).
            current = utime.ticks_ms()
            delta_ms = utime.ticks_diff(current, last)
            last = current
            # Compute position using speed and time delta.
            pos -= speed_ms*delta_ms
            if pos < -message_width:
                pos = DISPLAY_WIDTH
            # Swap frames to start drawing on a non-visible frame (double buffering).
            frame = (frame + 1) % 2
            matrix.frame(frame, show=False)
            # Clear the frame and draw the text at the current position.
            matrix.fill(0)
            bf.text(MESSAGE, int(pos), 0, INTENSITY)
            # Swap to the new frame on the display.
            matrix.frame(frame)
            # Sleep a bit to give USB mass storage some processing time (quirk
            # of SAMD21 firmware right now).
            utime.sleep_ms(20)
示例15
def main(i2c):
    """Scroll MESSAGE across a 16x8 HT16K33 LED matrix forever."""
    # Initialize LED matrix.
    matrix = ht16k33_matrix.Matrix16x8(i2c)
    # Initialize font renderer using a helper function to flip the Y axis
    # when rendering so the origin is in the upper left.
    def matrix_pixel(x, y):
        matrix.pixel(x, DISPLAY_HEIGHT-1-y, 1)
    with bitmapfont.BitmapFont(DISPLAY_WIDTH, DISPLAY_HEIGHT, matrix_pixel) as bf:
        # Global state:
        pos = DISPLAY_WIDTH  # X position of the message start.
        message_width = bf.width(MESSAGE)  # Message width in pixels.
        last = utime.ticks_ms()  # Last frame millisecond tick time.
        speed_ms = SPEED / 1000.0  # Scroll speed in pixels/ms.
        # Main loop:
        while True:
            # Compute the time delta in milliseconds since the last frame
            # (newer tick first so the result is positive).
            current = utime.ticks_ms()
            delta_ms = utime.ticks_diff(current, last)
            last = current
            # Compute position using speed and time delta.
            pos -= speed_ms*delta_ms
            if pos < -message_width:
                pos = DISPLAY_WIDTH
            # Clear the matrix and draw the text at the current position.
            matrix.fill(0)
            bf.text(MESSAGE, int(pos), 0)
            # Update the matrix LEDs.
            matrix.show()
            # Sleep a bit to give USB mass storage some processing time (quirk
            # of SAMD21 firmware right now).
            utime.sleep_ms(20)
示例16
def main(i2c):
    """Scroll MESSAGE across a 16x8 HT16K33 LED matrix forever."""
    # Initialize LED matrix.
    matrix = ht16k33_matrix.Matrix16x8(i2c)
    # Initialize font renderer using a helper function to flip the Y axis
    # when rendering so the origin is in the upper left.
    def matrix_pixel(x, y):
        matrix.pixel(x, DISPLAY_HEIGHT-1-y, 1)
    with bitmapfont.BitmapFont(DISPLAY_WIDTH, DISPLAY_HEIGHT, matrix_pixel) as bf:
        # Global state:
        pos = DISPLAY_WIDTH  # X position of the message start.
        message_width = bf.width(MESSAGE)  # Message width in pixels.
        last = utime.ticks_ms()  # Last frame millisecond tick time.
        speed_ms = SPEED / 1000.0  # Scroll speed in pixels/ms.
        # Main loop:
        while True:
            # Compute the time delta in milliseconds since the last frame.
            # ticks_diff(a, b) returns a - b, so the newer tick goes first;
            # the reversed order gave a negative delta and moved the text
            # the wrong way.
            current = utime.ticks_ms()
            delta_ms = utime.ticks_diff(current, last)
            last = current
            # Compute position using speed and time delta.
            pos -= speed_ms*delta_ms
            if pos < -message_width:
                pos = DISPLAY_WIDTH
            # Clear the matrix and draw the text at the current position.
            matrix.fill(0)
            bf.text(MESSAGE, int(pos), 0)
            # Update the matrix LEDs.
            matrix.show()
            # Sleep a bit to give USB mass storage some processing time (quirk
            # of SAMD21 firmware right now).
            utime.sleep_ms(20)
示例17
def time_since_fix(self):
    """Return the number of milliseconds since the last sentence with a valid fix.

    Returns -1 if no fix has been found (``self.fix_time == 0``).
    """
    # Test if a Fix has been found
    if self.fix_time == 0:
        return -1

    # Try calculating fix time using utime; if not running MicroPython
    # time.time() returns a floating point value in secs
    try:
        current = utime.ticks_diff(utime.ticks_ms(), self.fix_time)
    except NameError:
        current = (time.time() - self.fix_time) * 1000  # ms

    return current
示例18
def _timeout(self, t):
    """Return True once more than ``self._response_time`` ms have passed since tick ``t``."""
    return ticks_diff(ticks_ms(), t) > self._response_time
示例19
async def _keep_alive(self):
    """Ping the broker while connected; trigger a reconnect on failure.

    Once per second the time since ``self.last_rx`` is expressed in whole
    ping intervals.  One to three intervals overdue sends a ping; four or
    more, or an OSError from the ping, ends the loop.  ``self._reconnect()``
    is called whenever the loop ends.

    This must be a coroutine because the body uses ``await``.
    """
    while self.isconnected():
        pings_due = ticks_diff(ticks_ms(), self.last_rx) // self._ping_interval
        if pings_due >= 4:
            self.dprint('Reconnect: broker fail.')
            break
        elif pings_due >= 1:
            try:
                await self._ping()
            except OSError:
                break
        await asyncio.sleep(1)
    self._reconnect()  # Broker or WiFi fail.
# DEBUG: show RAM messages.
示例20
async def _synchronise(self):  # wait for clock
    """Wait for the incoming clock to change, then exchange one bit.

    While waiting, control is yielded to the scheduler on every pass.
    After the wait the input bit is shifted into ``self.indata``, the low
    bit of ``self.odata`` is output, and the local clock phase is toggled
    and driven out.

    Raises SynComError if ``self._timeout`` is nonzero and exceeded, or if
    ``self._running`` has been cleared.

    This must be a coroutine because the body uses ``await``.
    """
    t = ticks_ms()
    while self.ckin() == self.phase ^ self.passive ^ 1:
        # Other tasks can clear self._running by calling stop()
        if (self._timeout and ticks_diff(ticks_ms(), t) > self._timeout) or not self._running:
            raise SynComError
        await asyncio.sleep_ms(0)
    self.indata = (self.indata | (self.din() << _BITS_SYN)) >> 1
    odata = self.odata
    self.dout(odata & 1)
    self.odata = odata >> 1
    self.phase ^= 1
    self.ckout(self.phase)  # set clock
示例21
def _get_bit(self, dest):
    """Generator: wait for the incoming clock, exchange one bit, return the updated ``dest``.

    Yields while waiting for the clock to change.  The input bit is then
    shifted into ``dest``, the low bit of ``self.odata`` is output, and the
    local clock phase is toggled and driven out.  The new ``dest`` is the
    generator's return value.

    Raises SynComError if ``self._timeout`` is nonzero and exceeded, or if
    ``self._running`` has been cleared.
    """
    t = ticks_ms()
    while self.ckin() == self.phase ^ self.passive ^ 1:
        if (self._timeout and ticks_diff(ticks_ms(), t) > self._timeout) or not self._running:
            raise SynComError
        yield  # Faster than await asyncio.sleep_ms()
    dest = (dest | (self.din() << _BITS_PER_CH)) >> 1
    obyte = self.odata
    self.dout(obyte & 1)
    self.odata = obyte >> 1
    self.phase ^= 1
    self.ckout(self.phase)
    return dest
示例22
def waitfor(self, val):  # Wait for response for 1 sec
    """Busy-wait until ``self.rem()`` equals ``val``.

    Raises OSError if that has not happened after 1000 ms.
    """
    tim = utime.ticks_ms()
    while self.rem() != val:
        if utime.ticks_diff(utime.ticks_ms(), tim) > 1000:
            raise OSError
示例23
async def _run(self):
    """Main Initiator loop: (re)boot and sync the Responder, then poll it.

    The inner loop calls ``self._sendrx()`` every ``Initiator.t_poll`` ms
    and records how long each call took (max, count, sum).  An OSError
    from ``_sendrx`` ends the inner loop: the boot counter is incremented,
    the optional ``cr_fail`` coroutine is awaited and the outer loop starts
    over with a reboot.

    Raises OSError('Responder fail.') after a failure when ``self.reset``
    is None.

    This must be a coroutine because the body uses ``await``.
    """
    while True:
        # If hardware link exists reboot Responder
        await self.reboot()
        self.txbyt = b''
        self.rxbyt = b''
        await self._sync()
        await asyncio.sleep(1)  # Ensure Responder is ready
        if self.cr_go:
            asyncio.create_task(self.cr_go(*self.go_args))
        while True:
            gc.collect()
            try:
                tstart = utime.ticks_us()
                self._sendrx()
                t = utime.ticks_diff(utime.ticks_us(), tstart)
            except OSError:  # Reboot remote.
                break
            await asyncio.sleep_ms(Initiator.t_poll)
            self.block_max = max(self.block_max, t)  # self measurement
            self.block_cnt += 1
            self.block_sum += t
        self.nboots += 1
        if self.cr_fail:
            await self.cr_fail(*self.f_args)
        if self.reset is None:  # No means of recovery
            raise OSError('Responder fail.')
示例24
async def _do_qos(self, mid, line):
    """Re-send ``line`` until message id ``mid`` has been acknowledged.

    Each round awaits ``self``, then polls ``self._acks_pend`` every
    ``self._tim_short`` ms for up to ``self._to`` ms.  Returns as soon as
    ``mid`` is no longer pending; otherwise writes ``line`` again and
    repeats.

    This must be a coroutine because the body uses ``await``.
    """
    while True:
        # Wait for any outage to clear
        await self
        # Wait for the matching ACK.
        tstart = utime.ticks_ms()
        while utime.ticks_diff(utime.ticks_ms(), tstart) < self._to:
            await asyncio.sleep_ms(self._tim_short)
            if mid not in self._acks_pend:
                return  # ACK was received
        # ACK was not received. Re-send.
        await self._write(line)
        self._verbose and print('Repeat', line, 'to server app')
# Make an attempt to connect to WiFi. May not succeed.
示例25
async def _keepalive(self):
    """Send a blank-line keepalive whenever ``self._tim_ka`` ms pass without a write.

    Runs forever.  ``due`` is the time remaining until the next keepalive,
    measured from ``self._last_wr``; the coroutine sleeps for that long, or
    sends immediately when it is already overdue.

    This must be a coroutine because the body uses ``await``.
    """
    while True:
        due = self._tim_ka - utime.ticks_diff(utime.ticks_ms(), self._last_wr)
        if due <= 0:
            # error sets ._evfail, .run cancels this coro
            await self._send(b'\n')
        else:
            await asyncio.sleep_ms(due)
# Read a line from nonblocking socket: reads can return partial data which
# are joined into a line. Blank lines are keepalive packets which reset
# the timeout: _readline() pauses until a complete line has been received.
示例26
async def _readline(self, to):
    """Read one non-blank line from the non-blocking socket ``self._sock``.

    Partial reads are joined until the data ends with a newline.  A line
    consisting only of a newline is a keepalive: it is discarded,
    ``self._feed(0)`` is called and the LED (if any) is toggled.  The
    timeout restarts whenever any data arrives.

    Raises OSError if the peer disconnects (``readline()`` returns ``b''``)
    or if nothing is received for more than ``to`` ms.  Any exception from
    ``readline()`` itself is re-raised.

    This must be a coroutine because the body uses ``await``.
    """
    led = self._led
    line = b''
    start = utime.ticks_ms()
    while True:
        if line.endswith(b'\n'):
            self._ok = True  # Got at least 1 packet after an outage.
            if len(line) > 1:
                return line
            # Got a keepalive: discard, reset timers, toggle LED.
            self._feed(0)
            line = b''
            if led is not None:
                if isinstance(led, machine.Pin):
                    led(not led())
                else:  # On Pyboard D
                    led.toggle()
        try:
            d = self._sock.readline()
        except Exception as e:
            # Report the exception itself: ``d`` is unbound on the first
            # pass (or stale afterwards) when readline() raises.
            self._verbose and print('_readline exception', e)
            raise
        if d == b'':
            self._verbose and print('_readline peer disconnect')
            raise OSError
        if d is None:  # Nothing received: wait on server
            if utime.ticks_diff(utime.ticks_ms(), start) > to:
                self._verbose and print('_readline timeout')
                raise OSError
            await asyncio.sleep_ms(0)
        else:  # Something received: reset timer
            start = utime.ticks_ms()
            line = b''.join((line, d)) if line else d
示例27
def test():
    """Bouncing box.

    Sets up the SPI display, creates six boxes of decreasing size in
    different colours and animates them in an endless loop paced at roughly
    30 frames per second.  A KeyboardInterrupt ends the loop and calls
    ``display.cleanup()``.
    """
    try:
        # Baud rate of 14500000 seems about the max
        spi = SPI(2, baudrate=14500000, sck=Pin(18), mosi=Pin(23))
        display = Display(spi, dc=Pin(17), cs=Pin(5), rst=Pin(16))
        display.clear()

        colors = [color565(255, 0, 0),
                  color565(0, 255, 0),
                  color565(0, 0, 255),
                  color565(255, 255, 0),
                  color565(0, 255, 255),
                  color565(255, 0, 255)]
        sizes = [12, 11, 10, 9, 8, 7]
        boxes = [Box(128, 128, sizes[i], display,
                     colors[i]) for i in range(6)]

        while True:
            timer = ticks_us()
            for b in boxes:
                b.update_pos()
                b.draw()
            # Attempt to set framerate to 30 FPS: sleep for whatever is left
            # of the 33333 us frame budget after updating and drawing.
            timer_dif = 33333 - ticks_diff(ticks_us(), timer)
            if timer_dif > 0:
                sleep_us(timer_dif)

    except KeyboardInterrupt:
        display.cleanup()
示例28
def run(self, conversion, duration):
    """Acquire a buffer of ADC samples over ``duration``, then run the transform.

    The timer is re-initialised to ``int(self._length / duration)`` Hz so
    that filling the buffer takes about ``duration``.  Returns the elapsed
    microseconds for copying the samples and running the parent class's
    ``run(conversion)``; acquisition time is not included.
    """
    tim = self.timer
    tim.deinit()
    tim.init(freq = int(self._length/duration))
    self.adc.read_timed(self.buff, tim)  # Note: blocks for duration
    start = utime.ticks_us()
    icopy(self.buff, self.re, self._length)  # Fast copy integer array into real
    super().run(conversion)
    return utime.ticks_diff(utime.ticks_us(), start)
示例29
def rtc_test(self, runtime=600, ppm=False, verbose=True):
    """Compare the system RTC against the DS3231 over ``runtime`` seconds.

    At the start and at the end, the code waits for a DS3231 seconds
    transition and then measures the ms until the system RTC's seconds
    field changes.  The difference between the two elapsed times is
    expressed as a ratio of the DS3231 elapsed time.

    Returns the ratio multiplied by 1_000_000 when ``ppm`` is true,
    otherwise by 114_155_200.  Blocks for at least ``runtime`` seconds.

    Raises RuntimeError if the module-level ``rtc`` is None.
    """
    if rtc is None:
        raise RuntimeError('machine.RTC does not exist')
    verbose and print('Waiting {} minutes for result'.format(runtime//60))
    factor = 1_000_000 if ppm else 114_155_200  # seconds per year

    self.await_transition()  # Start on transition of DS3231. Record time in .timebuf
    t = utime.ticks_ms()  # Get system time now
    ss = rtc.datetime()[6]  # Seconds from system RTC
    while ss == rtc.datetime()[6]:
        pass
    ds = utime.ticks_diff(utime.ticks_ms(), t)  # ms to transition of RTC
    ds3231_start = utime.mktime(self.convert())  # Time when transition occurred
    t = rtc.datetime()
    rtc_start = utime.mktime((t[0], t[1], t[2], t[4], t[5], t[6], t[3] - 1, 0))  # y m d h m s wday 0

    utime.sleep(runtime)  # Wait a while (precision doesn't matter)

    self.await_transition()  # of DS3231 and record the time
    t = utime.ticks_ms()  # and get system time now
    ss = rtc.datetime()[6]  # Seconds from system RTC
    while ss == rtc.datetime()[6]:
        pass
    de = utime.ticks_diff(utime.ticks_ms(), t)  # ms to transition of RTC
    ds3231_end = utime.mktime(self.convert())  # Time when transition occurred
    t = rtc.datetime()
    rtc_end = utime.mktime((t[0], t[1], t[2], t[4], t[5], t[6], t[3] - 1, 0))  # y m d h m s wday 0

    d_rtc = 1000 * (rtc_end - rtc_start) + de - ds  # ms recorded by RTC
    d_ds3231 = 1000 * (ds3231_end - ds3231_start)  # ms recorded by DS3231
    ratio = (d_ds3231 - d_rtc) / d_ds3231
    # The ``ppm`` argument was consumed above when choosing ``factor``; the
    # name is reused here for the value that gets printed.
    ppm = ratio * 1_000_000
    verbose and print('DS3231 leads RTC by {:4.1f}ppm {:4.1f}mins/yr'.format(ppm, ppm*1.903))
    return ratio * factor
示例30
def getcal(self, minutes=5, cal=0, verbose=True):
    """Measure RTC drift against the DS3231 and return a calibration factor.

    When the module-level ``d_series`` flag is set the work is delegated to
    ``self._getcal_d``.  Otherwise the RTC calibration is set to ``cal``,
    the DS3231 is set from the RTC, and the offset between DS3231 and RTC
    transitions is measured before and after a wait of ``minutes``.

    Returns 0 when ``cal`` is nonzero (the error is only reported),
    otherwise ``int(-ppm / 0.954)``.  Blocks for at least ``minutes``.
    """
    if d_series:
        return self._getcal_d(minutes, cal, verbose)
    verbose and print('Pyboard 1.x. Waiting {} minutes for calibration factor.'.format(minutes))
    rtc.calibration(cal)  # Clear existing cal
    self.save_time()  # Set DS3231 from RTC
    self.await_transition()  # Wait for DS3231 to change: on a 1 second boundary
    tus = utime.ticks_us()
    st = rtc.datetime()[7]
    while rtc.datetime()[7] == st:  # Wait for RTC to change
        pass
    t1 = utime.ticks_diff(utime.ticks_us(), tus)  # t1 is duration (μs) between DS and RTC change (start)
    rtcstart = get_ms(rtc.datetime())  # RTC start time in mS
    dsstart = utime.mktime(self.convert())  # DS start time in secs as recorded by await_transition

    utime.sleep(minutes * 60)

    self.await_transition()  # DS second boundary
    tus = utime.ticks_us()
    st = rtc.datetime()[7]
    while rtc.datetime()[7] == st:
        pass
    t2 = utime.ticks_diff(utime.ticks_us(), tus)  # t2 is duration (μs) between DS and RTC change (end)
    rtcend = get_ms(rtc.datetime())
    dsend = utime.mktime(self.convert())
    dsdelta = (dsend - dsstart) * 1000000  # Duration (μs) between DS edges as measured by DS3231
    if rtcend < rtcstart:  # It's run past midnight. Assumption: run time < 1 day!
        rtcend += 24 * 3_600_000
    rtcdelta = (rtcend - rtcstart) * 1000 + t1 - t2  # Duration (μs) between DS edges as measured by RTC and corrected
    ppm = (1000000* (rtcdelta - dsdelta))/dsdelta
    if cal:
        verbose and print('Error {:4.1f}ppm {:4.1f}mins/year.'.format(ppm, ppm * 1.903))
        return 0
    cal = int(-ppm / 0.954)
    verbose and print('Error {:4.1f}ppm {:4.1f}mins/year. Cal factor {}'.format(ppm, ppm * 1.903, cal))
    return cal
# Version for Pyboard D. This has μs resolution.