Python源码示例:utime.ticks_ms()
示例1
def multi_fields(t):
print('Dynamic labels.')
refresh(ssd, True) # Clear any prior image
nfields = []
dy = wri.height + 6
y = 2
col = 15
width = wri.stringlen('99.99')
for txt in ('X:', 'Y:', 'Z:'):
Label(wri, y, 0, txt) # Use wri default colors
nfields.append(Label(wri, y, col, width, bdcolor=None)) # Specify a border, color TBD
y += dy
end = utime.ticks_add(utime.ticks_ms(), t * 1000)
while utime.ticks_diff(end, utime.ticks_ms()) > 0:
for field in nfields:
value = int.from_bytes(uos.urandom(3),'little')/167772
overrange = None if value < 70 else YELLOW if value < 90 else RED
field.value('{:5.2f}'.format(value), fgcolor = overrange, bdcolor = overrange)
refresh(ssd)
utime.sleep(1)
Label(wri, 0, 64, ' OK ', True, fgcolor = RED)
refresh(ssd)
utime.sleep(1)
示例2
def multi_fields(t):
print('multi_fields')
refresh(ssd, True) # Clear any prior image
nfields = []
dy = wri.height + 6
y = 2
col = 15
width = wri.stringlen('99.99')
for txt in ('X:', 'Y:', 'Z:'):
Label(wri, y, 0, txt) # Use wri default colors
nfields.append(Label(wri, y, col, width, bdcolor=None)) # Specify a border, color TBD
y += dy
end = utime.ticks_add(utime.ticks_ms(), t * 1000)
while utime.ticks_diff(end, utime.ticks_ms()) > 0:
for field in nfields:
value = int.from_bytes(uos.urandom(3),'little')/167772
overrange = None if value < 70 else YELLOW if value < 90 else RED
field.value('{:5.2f}'.format(value), fgcolor = overrange, bdcolor = overrange)
refresh(ssd)
utime.sleep(1)
Label(wri, 0, 64, ' OK ', True, fgcolor = RED)
refresh(ssd)
utime.sleep(1)
示例3
def _as_read(self, n, sock=None): # OSError caught by superclass
if sock is None:
sock = self._sock
data = b''
t = ticks_ms()
while len(data) < n:
if self._timeout(t) or not self.isconnected():
raise OSError(-1)
try:
msg = sock.read(n - len(data))
except OSError as e: # ESP32 issues weird 119 errors here
msg = None
if e.args[0] not in BUSY_ERRORS:
raise
if msg == b'': # Connection closed by host
raise OSError(-1)
if msg is not None: # data received
data = b''.join((data, msg))
t = ticks_ms()
self.last_rx = ticks_ms()
await asyncio.sleep_ms(_SOCKET_POLL_DELAY)
return data
示例4
def _as_write(self, bytes_wr, length=0, sock=None):
if sock is None:
sock = self._sock
if length:
bytes_wr = bytes_wr[:length]
t = ticks_ms()
while bytes_wr:
if self._timeout(t) or not self.isconnected():
raise OSError(-1)
try:
n = sock.write(bytes_wr)
except OSError as e: # ESP32 issues weird 119 errors here
n = 0
if e.args[0] not in BUSY_ERRORS:
raise
if n:
t = ticks_ms()
bytes_wr = bytes_wr[n:]
await asyncio.sleep_ms(_SOCKET_POLL_DELAY)
示例5
def broker_up(self): # Test broker connectivity
if not self.isconnected():
return False
tlast = self.last_rx
if ticks_diff(ticks_ms(), tlast) < 1000:
return True
try:
await self._ping()
except OSError:
return False
t = ticks_ms()
while not self._timeout(t):
await asyncio.sleep_ms(100)
if ticks_diff(self.last_rx, tlast) > 0: # Response received
return True
return False
示例6
def run_forever(self):
assert len(self.intervals)>0
while True:
output_things = self._get_tasks()
start_ts = utime.ticks_ms()
for pub in output_things:
pub._observe()
if not pub.__connections__:
self._remove_task(pub)
if len(self.intervals)==0:
break
end_ts = utime.ticks_ms()
if end_ts > start_ts:
self._advance_time(int(round((end_ts-start_ts)/10)))
sleep = self._get_next_sleep_interval()
utime.sleep_ms(sleep*10)
now = utime.ticks_ms()
self._advance_time(int(round((now-end_ts)/10)) if now>=end_ts else sleep)
示例7
def _send(self, d): # Write a line to socket.
async with self._s_lock:
start = utime.ticks_ms()
while d:
try:
ns = self._sock.send(d) # OSError if client closes socket
except OSError as e:
err = e.args[0]
if err == errno.EAGAIN: # Would block: await server read
await asyncio.sleep_ms(100)
else:
self._verbose and print('_send fail. Disconnect')
self._evfail.set()
return False # peer disconnect
else:
d = d[ns:]
if d: # Partial write: pause
await asyncio.sleep_ms(20)
if utime.ticks_diff(utime.ticks_ms(), start) > self._to:
self._verbose and print('_send fail. Timeout.')
self._evfail.set()
return False
self._last_wr = utime.ticks_ms()
return True
示例8
def readline(s, timeout):
line = b''
start = utime.ticks_ms()
while True:
if line.endswith(b'\n'):
if len(line) > 1:
return line
line = b''
start = utime.ticks_ms() # A blank line is just a keepalive
await asyncio.sleep_ms(100) # See note above
d = s.readline()
if d == b'':
raise OSError
if d is not None:
line = b''.join((line, d))
if utime.ticks_diff(utime.ticks_ms(), start) > timeout:
raise OSError
示例9
def readline(self):
line = b''
start = utime.ticks_ms()
while True:
if line.endswith(b'\n'):
if len(line) > 1:
return line
line = b''
start = utime.ticks_ms() # Blank line is keepalive
self.led(not self.led())
await asyncio.sleep_ms(100) # nonzero wait seems empirically necessary
d = self.sock.readline()
if d == b'':
raise OSError
if d is not None:
line = b''.join((line, d))
if utime.ticks_diff(utime.ticks_ms(), start) > self.timeout:
raise OSError
示例10
def perform(self) :
current_tick = utime.ticks_ms()
if not self.initial_tick :
self.initial_tick = current_tick
else :
x = utime.ticks_diff(current_tick, self.initial_tick)
if x == 0 :
pass
elif 0 < x :
rgb = self.get_color(x)
if rgb != self.prev_rgb :
if self.verbose :
logging.info("Lamp: setting color to {} from x={}", rgb, x)
self.fill_pixels(rgb)
self.prev_rgb = rgb
else : # wrap around; start over
logging.info("Lamp: tick wrap")
self.initial_tick = current_tick
return True
示例11
def adcread(chan): # 16 temp 17 vbat 18 vref
assert chan >= 16 and chan <= 18, 'Invalid ADC channel'
start = utime.ticks_ms()
timeout = 100
stm.mem32[stm.RCC + stm.RCC_APB2ENR] |= 0x100 # enable ADC1 clock.0x4100
stm.mem32[stm.ADC1 + stm.ADC_CR2] = 1 # Turn on ADC
stm.mem32[stm.ADC1 + stm.ADC_CR1] = 0 # 12 bit
if chan == 17:
stm.mem32[stm.ADC1 + stm.ADC_SMPR1] = 0x200000 # 15 cycles
stm.mem32[stm.ADC + 4] = 1 << 23
elif chan == 18:
stm.mem32[stm.ADC1 + stm.ADC_SMPR1] = 0x1000000
stm.mem32[stm.ADC + 4] = 0xc00000
else:
stm.mem32[stm.ADC1 + stm.ADC_SMPR1] = 0x40000
stm.mem32[stm.ADC + 4] = 1 << 23
stm.mem32[stm.ADC1 + stm.ADC_SQR3] = chan
stm.mem32[stm.ADC1 + stm.ADC_CR2] = 1 | (1 << 30) | (1 << 10) # start conversion
while not stm.mem32[stm.ADC1 + stm.ADC_SR] & 2: # wait for EOC
if utime.ticks_diff(utime.ticks_ms(), start) > timeout:
raise OSError('ADC timout')
data = stm.mem32[stm.ADC1 + stm.ADC_DR] # clear down EOC
stm.mem32[stm.ADC1 + stm.ADC_CR2] = 0 # Turn off ADC
return data
示例12
def _as_read(self, n):
sock = self._sock
data = b''
t = ticks_ms()
while len(data) < n:
esp32_pause() # Necessary on ESP32 or we can time out.
if self._timeout(t) or not self._sta_if.isconnected():
raise OSError(-1)
try:
msg = sock.read(n - len(data))
except OSError as e: # ESP32 issues weird 119 errors here
msg = None
if e.args[0] not in BUSY_ERRORS:
raise
if msg == b'': # Connection closed by host (?)
raise OSError(-1)
if msg is not None: # data received
data = b''.join((data, msg))
t = ticks_ms() # reset timeout
await asyncio.sleep_ms(_SOCKET_POLL_DELAY)
return data
# Write a buffer
示例13
def run_cancel_test6(loop):
for name in ('complete', 'cancel me'):
loop.create_task(asyn.NamedTask(name, cant60, name)())
loop.create_task(asyn.Cancellable(cant61)())
await asyncio.sleep(4.5)
print('Cancelling task \"{}\". 1.5 secs latency.'.format(name))
await asyn.NamedTask.cancel(name)
await asyncio.sleep(7)
name = 'cancel wait'
loop.create_task(asyn.NamedTask(name, cant60, name)())
await asyncio.sleep(0.5)
print('Cancelling task \"{}\". 1.5 secs latency.'.format(name))
t = time.ticks_ms()
await asyn.NamedTask.cancel('cancel wait', nowait=False)
print('Was cancelled in {} ms'.format(time.ticks_diff(time.ticks_ms(), t)))
print('Cancelling cant61')
await asyn.Cancellable.cancel_all()
print('Done')
示例14
def _gauge(self):
now = utime.ticks_ms()
if utime.ticks_diff(now, self._last_read_ts) > self._new_read_ms:
self._last_read_ts = now
r = self._t_os + (self._p_os << 3) + (1 << 6)
self._write(BMX280_REGISTER_CONTROL, r)
utime.sleep_ms(100) # TODO calc sleep
if self._chip_id == 0x58:
d = self._read(BMX280_REGISTER_DATA, 6) # read all data at once (as by spec)
self._p_raw = (d[0] << 12) + (d[1] << 4) + (d[2] >> 4)
self._t_raw = (d[3] << 12) + (d[4] << 4) + (d[5] >> 4)
else:
d = self._read(BMX280_REGISTER_DATA, 8) # read all data at once (as by spec)
self._p_raw = (d[0] << 12) + (d[1] << 4) + (d[2] >> 4)
self._t_raw = (d[3] << 12) + (d[4] << 4) + (d[5] >> 4)
self._h_raw = (d[6] << 8) + d[7]
self._t_fine = 0
self._t = 0
self._h = 0
self._p = 0
示例15
def time(self):
return time.ticks_ms()
示例16
def main(i2c):
# Initialize Charlieplex matrix wing.
matrix = is31fl3731.CharlieWing(i2c)
matrix.fill(0)
# Initialize font renderer.
with bitmapfont.BitmapFont(DISPLAY_WIDTH, DISPLAY_HEIGHT, matrix.pixel) as bf:
# Global state:
pos = DISPLAY_WIDTH # X position of the message start.
message_width = bf.width(MESSAGE) # Message width in pixels.
frame = 0 # Currently displayed frame.
last = utime.ticks_ms() # Last frame millisecond tick time.
speed_ms = SPEED / 1000.0 # Scroll speed in pixels/ms.
while True:
# Compute the time delta in milliseconds since the last frame.
current = utime.ticks_ms()
delta_ms = utime.ticks_diff(last, current)
last = current
# Compute position using speed and time delta.
pos -= speed_ms*delta_ms
if pos < -message_width:
pos = DISPLAY_WIDTH
# Swap frames to start drawing on a non-visible frame (double buffering).
frame = (frame + 1) % 2
matrix.frame(frame, show=False)
# Clear the frame and draw the text at the current position.
matrix.fill(0)
bf.text(MESSAGE, int(pos), 0, INTENSITY)
# Swap to the new frame on the display.
matrix.frame(frame)
# Sleep a bit to give USB mass storage some processing time (quirk
# of SAMD21 firmware right now).
utime.sleep_ms(20)
示例17
def main(i2c):
# Initialize Charlieplex matrix wing.
matrix = is31fl3731.CharlieWing(i2c)
matrix.fill(0)
# Initialize font renderer.
with bitmapfont.BitmapFont(DISPLAY_WIDTH, DISPLAY_HEIGHT, matrix.pixel) as bf:
# Global state:
pos = DISPLAY_WIDTH # X position of the message start.
message_width = bf.width(MESSAGE) # Message width in pixels.
frame = 0 # Currently displayed frame.
last = utime.ticks_ms() # Last frame millisecond tick time.
speed_ms = SPEED / 1000.0 # Scroll speed in pixels/ms.
while True:
# Compute the time delta in milliseconds since the last frame.
current = utime.ticks_ms()
delta_ms = utime.ticks_diff(current, last)
last = current
# Compute position using speed and time delta.
pos -= speed_ms*delta_ms
if pos < -message_width:
pos = DISPLAY_WIDTH
# Swap frames to start drawing on a non-visible frame (double buffering).
frame = (frame + 1) % 2
matrix.frame(frame, show=False)
# Clear the frame and draw the text at the current position.
matrix.fill(0)
bf.text(MESSAGE, int(pos), 0, INTENSITY)
# Swap to the new frame on the display.
matrix.frame(frame)
# Sleep a bit to give USB mass storage some processing time (quirk
# of SAMD21 firmware right now).
utime.sleep_ms(20)
示例18
def main(i2c):
# Initialize LED matrix.
matrix = ht16k33_matrix.Matrix16x8(i2c)
# Initialize font renderer using a helper function to flip the Y axis
# when rendering so the origin is in the upper left.
def matrix_pixel(x, y):
matrix.pixel(x, DISPLAY_HEIGHT-1-y, 1)
with bitmapfont.BitmapFont(DISPLAY_WIDTH, DISPLAY_HEIGHT, matrix_pixel) as bf:
# Global state:
pos = DISPLAY_WIDTH # X position of the message start.
message_width = bf.width(MESSAGE) # Message width in pixels.
last = utime.ticks_ms() # Last frame millisecond tick time.
speed_ms = SPEED / 1000.0 # Scroll speed in pixels/ms.
# Main loop:
while True:
# Compute the time delta in milliseconds since the last frame.
current = utime.ticks_ms()
delta_ms = utime.ticks_diff(current, last)
last = current
# Compute position using speed and time delta.
pos -= speed_ms*delta_ms
if pos < -message_width:
pos = DISPLAY_WIDTH
# Clear the matrix and draw the text at the current position.
matrix.fill(0)
bf.text(MESSAGE, int(pos), 0)
# Update the matrix LEDs.
matrix.show()
# Sleep a bit to give USB mass storage some processing time (quirk
# of SAMD21 firmware right now).
utime.sleep_ms(20)
示例19
def main(i2c):
# Initialize LED matrix.
matrix = ht16k33_matrix.Matrix16x8(i2c)
# Initialize font renderer using a helper function to flip the Y axis
# when rendering so the origin is in the upper left.
def matrix_pixel(x, y):
matrix.pixel(x, DISPLAY_HEIGHT-1-y, 1)
with bitmapfont.BitmapFont(DISPLAY_WIDTH, DISPLAY_HEIGHT, matrix_pixel) as bf:
# Global state:
pos = DISPLAY_WIDTH # X position of the message start.
message_width = bf.width(MESSAGE) # Message width in pixels.
last = utime.ticks_ms() # Last frame millisecond tick time.
speed_ms = SPEED / 1000.0 # Scroll speed in pixels/ms.
# Main loop:
while True:
# Compute the time delta in milliseconds since the last frame.
current = utime.ticks_ms()
delta_ms = utime.ticks_diff(last, current)
last = current
# Compute position using speed and time delta.
pos -= speed_ms*delta_ms
if pos < -message_width:
pos = DISPLAY_WIDTH
# Clear the matrix and draw the text at the current position.
matrix.fill(0)
bf.text(MESSAGE, int(pos), 0)
# Update the matrix LEDs.
matrix.show()
# Sleep a bit to give USB mass storage some processing time (quirk
# of SAMD21 firmware right now).
utime.sleep_ms(20)
示例20
def new_fix_time(self):
"""Updates a high resolution counter with current time when fix is updated. Currently only triggered from
GGA, GSA and RMC sentences"""
try:
self.fix_time = utime.ticks_ms()
except NameError:
self.fix_time = time.time()
#########################################
# User Helper Functions
# These functions make working with the GPS object data easier
#########################################
示例21
def time_since_fix(self):
"""Returns number of millisecond since the last sentence with a valid fix was parsed. Returns 0 if
no fix has been found"""
# Test if a Fix has been found
if self.fix_time == 0:
return -1
# Try calculating fix time using utime; if not running MicroPython
# time.time() returns a floating point value in secs
try:
current = utime.ticks_diff(utime.ticks_ms(), self.fix_time)
except NameError:
current = (time.time() - self.fix_time) * 1000 # ms
return current
示例22
def __init__(self, config):
# MQTT config
self._client_id = config['client_id']
self._user = config['user']
self._pswd = config['password']
self._keepalive = config['keepalive']
if self._keepalive >= 65536:
raise ValueError('invalid keepalive time')
self._response_time = config['response_time'] * 1000 # Repub if no PUBACK received (ms).
self._max_repubs = config['max_repubs']
self._clean_init = config['clean_init'] # clean_session state on first connection
self._clean = config['clean'] # clean_session state on reconnect
will = config['will']
if will is None:
self._lw_topic = False
else:
self._set_last_will(*will)
# WiFi config
self._ssid = config['ssid'] # Required for ESP32 / Pyboard D. Optional ESP8266
self._wifi_pw = config['wifi_pw']
self._ssl = config['ssl']
self._ssl_params = config['ssl_params']
# Callbacks and coros
self._cb = config['subs_cb']
self._wifi_handler = config['wifi_coro']
self._connect_handler = config['connect_coro']
# Network
self.port = config['port']
if self.port == 0:
self.port = 8883 if self._ssl else 1883
self.server = config['server']
if self.server is None:
raise ValueError('no server specified.')
self._sock = None
self._sta_if = network.WLAN(network.STA_IF)
self._sta_if.active(True)
self.newpid = pid_gen()
self.rcv_pids = set() # PUBACK and SUBACK pids awaiting ACK response
self.last_rx = ticks_ms() # Time of last communication from broker
self.lock = asyncio.Lock()
示例23
def _timeout(self, t):
return ticks_diff(ticks_ms(), t) > self._response_time
示例24
def _keep_alive(self):
while self.isconnected():
pings_due = ticks_diff(ticks_ms(), self.last_rx) // self._ping_interval
if pings_due >= 4:
self.dprint('Reconnect: broker fail.')
break
elif pings_due >= 1:
try:
await self._ping()
except OSError:
break
await asyncio.sleep(1)
self._reconnect() # Broker or WiFi fail.
# DEBUG: show RAM messages.
示例25
def _synchronise(self): # wait for clock
t = ticks_ms()
while self.ckin() == self.phase ^ self.passive ^ 1:
# Other tasks can clear self._running by calling stop()
if (self._timeout and ticks_diff(ticks_ms(), t) > self._timeout) or not self._running:
raise SynComError
await asyncio.sleep_ms(0)
self.indata = (self.indata | (self.din() << _BITS_SYN)) >> 1
odata = self.odata
self.dout(odata & 1)
self.odata = odata >> 1
self.phase ^= 1
self.ckout(self.phase) # set clock
示例26
def _get_bit(self, dest):
t = ticks_ms()
while self.ckin() == self.phase ^ self.passive ^ 1:
if (self._timeout and ticks_diff(ticks_ms(), t) > self._timeout) or not self._running:
raise SynComError
yield # Faster than await asyncio.sleep_ms()
dest = (dest | (self.din() << _BITS_PER_CH)) >> 1
obyte = self.odata
self.dout(obyte & 1)
self.odata = obyte >> 1
self.phase ^= 1
self.ckout(self.phase)
return dest
示例27
def position_update(self):
if time.ticks_ms() - self.updateTime > 20:
self.updateTime = time.ticks_ms()
self._position = self._position + self._read_encoder()
示例28
def run_to(self, pos, speed):
error_last_1 = 0
pwm_last = [255, 255, 255, 255]
self.position_update()
_distance = pos
pwm_now = 0
kp = 0.85
kd = 7.0
tick_now = time.ticks_ms()
num = 0
while True:
time.sleep_ms(1)
if time.ticks_ms() - tick_now > 10:
tick_now = time.ticks_ms()
error = _distance - self.read_encoder()
pwm_now = kp*error + kd*(error-error_last_1)
pwm_now = constrain(pwm_now, -speed, speed)
pwm_now = dead_area(pwm_now, -100, -3, 100, 3)
pwm_now = -pwm_now
error_last_1 = error
pwm_last = pwm_last[1:4]
if -15 < error < 15:
num += 1
else:
num = 0
if num > 10:
self.set_pwm(0)
break
self.set_pwm(pwm_now)
pwm_last.append(pwm_now)
if pwm_last == [0, 0, 0, 0]:
break
示例29
def run_distance(self, distance=500, speed=255):
error_last_1 = 0
pwm_last = [255, 255, 255, 255]
_distance = self.read_encoder() + distance
pwm_now = 0
kp = 0.85
kd = 7.0
tick_now = time.ticks_ms()
num = 0
while True:
time.sleep_ms(1)
if time.ticks_ms() - tick_now > 10:
tick_now = time.ticks_ms()
self.position_update()
error = _distance - self.read_encoder()
pwm_now = kp*error + kd*(error-error_last_1)
pwm_now = constrain(pwm_now, -speed, speed)
pwm_now = dead_area(pwm_now, -100, -3, 100, 3)
pwm_now = -pwm_now
error_last_1 = error
pwm_last = pwm_last[1:4]
if -15 < error < 15:
num += 1
else:
num = 0
if num > 10:
self.set_pwm(0)
break
self.set_pwm(pwm_now)
pwm_last.append(pwm_now)
if pwm_last == [0, 0, 0, 0]:
break
示例30
def _waitTimeout(self, t):
t1 = time.ticks_ms()
while True:
time.sleep(0.01)
if self.uart.any():
return 0
if time.ticks_ms() - t1 > t:
return 1