Python源码示例:utime.sleep_ms()
示例1
def clock(x):
print('Clock test.')
refresh(ssd, True) # Clear any prior image
lbl = Label(wri, 5, 85, 'Clock')
dial = Dial(wri, 5, 5, height = 75, ticks = 12, bdcolor=None, label=50) # Border in fg color
hrs = Pointer(dial)
mins = Pointer(dial)
hrs.value(0 + 0.7j, RED)
mins.value(0 + 0.9j, YELLOW)
dm = cmath.rect(1, -cmath.pi/30) # Rotate by 1 minute (CW)
dh = cmath.rect(1, -cmath.pi/1800) # Rotate hours by 1 minute
for n in range(x):
refresh(ssd)
utime.sleep_ms(200)
mins.value(mins.value() * dm, YELLOW)
hrs.value(hrs.value() * dh, RED)
dial.text('ticks: {}'.format(n))
lbl.value('Done')
示例2
def __init__(self, spi, pincs, pindc, pinrs, height=64, width=96):
self.spi = spi
self.rate = 6660000 # Data sheet: 150ns min clock period
self.pincs = pincs
self.pindc = pindc # 1 = data 0 = cmd
self.height = height # Required by Writer class
self.width = width
# Save color mode for use by writer_gui (blit)
self.mode = framebuf.GS8 # Use 8bit greyscale for 8 bit color.
gc.collect()
self.buffer = bytearray(self.height * self.width)
super().__init__(self.buffer, self.width, self.height, self.mode)
pinrs(0) # Pulse the reset line
utime.sleep_ms(1)
pinrs(1)
utime.sleep_ms(1)
self._write(b'\xae\xa0\x32\xa1\x00\xa2\x00\xa4\xa8\x3f\xad\x8e\xb0'\
b'\x0b\xb1\x31\xb3\xf0\x8a\x64\x8b\x78\x8c\x64\xbb\x3a\xbe\x3e\x87'\
b'\x06\x81\x91\x82\x50\x83\x7d\xaf', 0)
gc.collect()
self.show()
示例3
def __init__(self, spi, pincs, pindc, pinrs, height=64, width=96):
self.spi = spi
self.rate = 6660000 # Data sheet: 150ns min clock period
self.pincs = pincs
self.pindc = pindc # 1 = data 0 = cmd
self.height = height # Required by Writer class
self.width = width
# Save color mode for use by writer_gui (blit)
self.mode = framebuf.RGB565
gc.collect()
self.buffer = bytearray(self.height * self.width * 2)
super().__init__(self.buffer, self.width, self.height, self.mode)
pinrs(0) # Pulse the reset line
utime.sleep_ms(1)
pinrs(1)
utime.sleep_ms(1)
self._write(b'\xae\xa0\x72\xa1\x00\xa2\x00\xa4\xa8\x3f\xad\x8e\xb0'\
b'\x0b\xb1\x31\xb3\xf0\x8a\x64\x8b\x78\x8c\x64\xbb\x3a\xbe\x3e\x87'\
b'\x06\x81\x91\x82\x50\x83\x7d\xaf', 0)
gc.collect()
self.show()
示例4
def main():
print('alevel test is running.')
CWriter.set_textpos(ssd, 0, 0) # In case previous tests have altered it
wri = CWriter(ssd, arial10, GREEN, BLACK, verbose=False)
wri.set_clip(True, True, False)
acc = pyb.Accel()
dial = Dial(wri, 5, 5, height = 75, ticks = 12, bdcolor=None,
label='Tilt Pyboard', style = Dial.COMPASS, pip=YELLOW) # Border in fg color
ptr = Pointer(dial)
scale = 1/40
while True:
x, y, z = acc.filtered_xyz()
# Depending on relative alignment of display and Pyboard this line may
# need changing: swap x and y or change signs so arrow points in direction
# board is tilted.
ptr.value(-y*scale + 1j*x*scale, YELLOW)
refresh(ssd)
utime.sleep_ms(200)
示例5
def rt_rect():
print('Simulate realtime data acquisition of discontinuous data.')
refresh(ssd, True) # Clear any prior image
g = CartesianGraph(wri, 2, 2, fgcolor=WHITE, gridcolor=LIGHTGREEN)
curve = Curve(g, RED)
x = -1
for _ in range(40):
y = 0.1/x if abs(x) > 0.05 else None # Discontinuity
curve.point(x, y)
utime.sleep_ms(100)
refresh(ssd)
x += 0.05
g.clear()
curve = Curve(g, YELLOW)
x = -1
for _ in range(40):
y = -0.1/x if abs(x) > 0.05 else None # Discontinuity
curve.point(x, y)
utime.sleep_ms(100)
refresh(ssd)
x += 0.05
示例6
def run_forever(self):
assert len(self.intervals)>0
while True:
output_things = self._get_tasks()
start_ts = utime.ticks_ms()
for pub in output_things:
pub._observe()
if not pub.__connections__:
self._remove_task(pub)
if len(self.intervals)==0:
break
end_ts = utime.ticks_ms()
if end_ts > start_ts:
self._advance_time(int(round((end_ts-start_ts)/10)))
sleep = self._get_next_sleep_interval()
utime.sleep_ms(sleep*10)
now = utime.ticks_ms()
self._advance_time(int(round((now-end_ts)/10)) if now>=end_ts else sleep)
示例7
def go_forward(self, speed_percent=None, delay_ms=None):
'''
小车前进
'''
if speed_percent is None:
speed_percent = self.speed_percent
else:
speed_percent = float(speed_percent)
if delay_ms is not None:
delay_ms = int(delay_ms)
self.left_motor.speed_percent = speed_percent
self.right_motor.speed_percent = speed_percent
if delay_ms is not None:
delay_ms = int(float(delay_ms))
utime.sleep_ms(delay_ms)
self.stop()
示例8
def go_backward(self, speed_percent=None, delay_ms=None):
'''
小车后退
'''
if speed_percent is None:
speed_percent = self.speed_percent
else:
speed_percent = float(speed_percent)
if delay_ms is not None:
delay_ms = int(delay_ms)
self.left_motor.speed_percent = -1*speed_percent
self.right_motor.speed_percent = -1*speed_percent
if delay_ms is not None:
delay_ms = int(float(delay_ms))
utime.sleep_ms(delay_ms)
self.stop()
示例9
def turn_left(self, speed_percent=None, delay_ms=None):
'''
小车左转
'''
if speed_percent is None:
speed_percent = self.speed_percent
else:
speed_percent = float(speed_percent)
self.left_motor.speed_percent = -1* speed_percent
self.right_motor.speed_percent = speed_percent
if delay_ms is not None:
delay_ms = int(float(delay_ms))
utime.sleep_ms(delay_ms)
self.stop()
示例10
def turn_right(self, speed_percent=None, delay_ms=None):
'''
小车右转
'''
if speed_percent is None:
speed_percent = self.speed_percent
else:
speed_percent = float(speed_percent)
self.left_motor.speed_percent = speed_percent
self.right_motor.speed_percent = -1* speed_percent
if delay_ms is not None:
delay_ms = int(float(delay_ms))
utime.sleep_ms(delay_ms)
self.stop()
示例11
def test(use_priority=True):
global after, after_ms, loop, lp_version
processing_delay = 2 # Processing time in low priority task (ms)
if use_priority and not lp_version:
print('To test priority mechanism you must use fast_io version of uasyncio.')
else:
ntasks = max(num_coros) + 10 #4
if use_priority:
loop = asyncio.get_event_loop(ntasks, ntasks, 0, ntasks)
after = asyncio.after
after_ms = asyncio.after_ms
else:
lp_version = False
after = asyncio.sleep
after_ms = asyncio.sleep_ms
loop = asyncio.get_event_loop(ntasks, ntasks)
s = 'Testing latency of priority task with coros blocking for {}ms.'
print(s.format(processing_delay))
if lp_version:
print('Using priority mechanism.')
else:
print('Not using priority mechanism.')
loop.create_task(run_test(processing_delay))
loop.run_until_complete(report())
示例12
def _gauge(self):
now = utime.ticks_ms()
if utime.ticks_diff(now, self._last_read_ts) > self._new_read_ms:
self._last_read_ts = now
r = self._t_os + (self._p_os << 3) + (1 << 6)
self._write(BMX280_REGISTER_CONTROL, r)
utime.sleep_ms(100) # TODO calc sleep
if self._chip_id == 0x58:
d = self._read(BMX280_REGISTER_DATA, 6) # read all data at once (as by spec)
self._p_raw = (d[0] << 12) + (d[1] << 4) + (d[2] >> 4)
self._t_raw = (d[3] << 12) + (d[4] << 4) + (d[5] >> 4)
else:
d = self._read(BMX280_REGISTER_DATA, 8) # read all data at once (as by spec)
self._p_raw = (d[0] << 12) + (d[1] << 4) + (d[2] >> 4)
self._t_raw = (d[3] << 12) + (d[4] << 4) + (d[5] >> 4)
self._h_raw = (d[6] << 8) + d[7]
self._t_fine = 0
self._t = 0
self._h = 0
self._p = 0
示例13
def irq_handler(self, irq_pin):
'''
外部中断的相应函数
'''
# 如果当前正在处理中断,则忽略
if not self.flag:
return
# 添加软件滤波
utime.sleep_ms(50)
if self.pin.value() == self.BUTTON_PRESS:
# 判断当前按键状态是不是按下,如果是,则执行回调函数
if self.flag and self.callback is not None:
self.flag = False
# 执行回调函数
self.callback(self.pin)
self.flag = True
示例14
def irq_handler(self, irq_pin):
'''
外部中断的相应函数
'''
# 如果当前正在处理中断,则忽略
if not self.flag:
return
# 添加软件滤波
utime.sleep_ms(50)
if self.pin.value() == self.BUTTON_PRESS:
# 判断当前按键状态是不是按下,如果是,则执行回调函数
if self.flag and self.callback is not None:
self.flag = False
# 执行回调函数
self.callback(self.pin)
self.flag = True
示例15
def connect(self, timeout=_CONNECT_TIMEOUT):
end_time = time.time() + timeout
while not self.connected():
if self._state == self.DISCONNECTED:
try:
self._get_socket()
self._authenticate()
self._set_heartbeat()
self._last_rcv_time = ticks_ms()
self.log('Registered events: {}\n'.format(list(self._events.keys())))
self.call_handler(self._CONNECT)
return True
except BlynkError as b_err:
self.disconnect(b_err)
sleep_ms(self.TASK_PERIOD_RES)
except RedirectError as r_err:
self.disconnect()
self.server = r_err.server
self.port = r_err.port
sleep_ms(self.TASK_PERIOD_RES)
if time.time() >= end_time:
return False
示例16
def wait(self, delay):
# Default wait implementation, to be overriden in subclasses
# with IO scheduling
if __debug__ and DEBUG:
log.debug("Sleeping for: %s", delay)
time.sleep_ms(delay)
示例17
def sleep(secs):
yield int(secs * 1000)
# Implementation of sleep_ms awaitable with zero heap memory usage
示例18
def main(i2c):
# Initialize Charlieplex matrix wing.
matrix = is31fl3731.CharlieWing(i2c)
matrix.fill(0)
# Initialize font renderer.
with bitmapfont.BitmapFont(DISPLAY_WIDTH, DISPLAY_HEIGHT, matrix.pixel) as bf:
# Global state:
pos = DISPLAY_WIDTH # X position of the message start.
message_width = bf.width(MESSAGE) # Message width in pixels.
frame = 0 # Currently displayed frame.
last = utime.ticks_ms() # Last frame millisecond tick time.
speed_ms = SPEED / 1000.0 # Scroll speed in pixels/ms.
while True:
# Compute the time delta in milliseconds since the last frame.
current = utime.ticks_ms()
delta_ms = utime.ticks_diff(last, current)
last = current
# Compute position using speed and time delta.
pos -= speed_ms*delta_ms
if pos < -message_width:
pos = DISPLAY_WIDTH
# Swap frames to start drawing on a non-visible frame (double buffering).
frame = (frame + 1) % 2
matrix.frame(frame, show=False)
# Clear the frame and draw the text at the current position.
matrix.fill(0)
bf.text(MESSAGE, int(pos), 0, INTENSITY)
# Swap to the new frame on the display.
matrix.frame(frame)
# Sleep a bit to give USB mass storage some processing time (quirk
# of SAMD21 firmware right now).
utime.sleep_ms(20)
示例19
def main(i2c):
# Initialize Charlieplex matrix wing.
matrix = is31fl3731.CharlieWing(i2c)
matrix.fill(0)
# Initialize font renderer.
with bitmapfont.BitmapFont(DISPLAY_WIDTH, DISPLAY_HEIGHT, matrix.pixel) as bf:
# Global state:
pos = DISPLAY_WIDTH # X position of the message start.
message_width = bf.width(MESSAGE) # Message width in pixels.
frame = 0 # Currently displayed frame.
last = utime.ticks_ms() # Last frame millisecond tick time.
speed_ms = SPEED / 1000.0 # Scroll speed in pixels/ms.
while True:
# Compute the time delta in milliseconds since the last frame.
current = utime.ticks_ms()
delta_ms = utime.ticks_diff(current, last)
last = current
# Compute position using speed and time delta.
pos -= speed_ms*delta_ms
if pos < -message_width:
pos = DISPLAY_WIDTH
# Swap frames to start drawing on a non-visible frame (double buffering).
frame = (frame + 1) % 2
matrix.frame(frame, show=False)
# Clear the frame and draw the text at the current position.
matrix.fill(0)
bf.text(MESSAGE, int(pos), 0, INTENSITY)
# Swap to the new frame on the display.
matrix.frame(frame)
# Sleep a bit to give USB mass storage some processing time (quirk
# of SAMD21 firmware right now).
utime.sleep_ms(20)
示例20
def main(i2c):
# Initialize LED matrix.
matrix = ht16k33_matrix.Matrix16x8(i2c)
# Initialize font renderer using a helper function to flip the Y axis
# when rendering so the origin is in the upper left.
def matrix_pixel(x, y):
matrix.pixel(x, DISPLAY_HEIGHT-1-y, 1)
with bitmapfont.BitmapFont(DISPLAY_WIDTH, DISPLAY_HEIGHT, matrix_pixel) as bf:
# Global state:
pos = DISPLAY_WIDTH # X position of the message start.
message_width = bf.width(MESSAGE) # Message width in pixels.
last = utime.ticks_ms() # Last frame millisecond tick time.
speed_ms = SPEED / 1000.0 # Scroll speed in pixels/ms.
# Main loop:
while True:
# Compute the time delta in milliseconds since the last frame.
current = utime.ticks_ms()
delta_ms = utime.ticks_diff(current, last)
last = current
# Compute position using speed and time delta.
pos -= speed_ms*delta_ms
if pos < -message_width:
pos = DISPLAY_WIDTH
# Clear the matrix and draw the text at the current position.
matrix.fill(0)
bf.text(MESSAGE, int(pos), 0)
# Update the matrix LEDs.
matrix.show()
# Sleep a bit to give USB mass storage some processing time (quirk
# of SAMD21 firmware right now).
utime.sleep_ms(20)
示例21
def main(i2c):
# Initialize LED matrix.
matrix = ht16k33_matrix.Matrix16x8(i2c)
# Initialize font renderer using a helper function to flip the Y axis
# when rendering so the origin is in the upper left.
def matrix_pixel(x, y):
matrix.pixel(x, DISPLAY_HEIGHT-1-y, 1)
with bitmapfont.BitmapFont(DISPLAY_WIDTH, DISPLAY_HEIGHT, matrix_pixel) as bf:
# Global state:
pos = DISPLAY_WIDTH # X position of the message start.
message_width = bf.width(MESSAGE) # Message width in pixels.
last = utime.ticks_ms() # Last frame millisecond tick time.
speed_ms = SPEED / 1000.0 # Scroll speed in pixels/ms.
# Main loop:
while True:
# Compute the time delta in milliseconds since the last frame.
current = utime.ticks_ms()
delta_ms = utime.ticks_diff(last, current)
last = current
# Compute position using speed and time delta.
pos -= speed_ms*delta_ms
if pos < -message_width:
pos = DISPLAY_WIDTH
# Clear the matrix and draw the text at the current position.
matrix.fill(0)
bf.text(MESSAGE, int(pos), 0)
# Update the matrix LEDs.
matrix.show()
# Sleep a bit to give USB mass storage some processing time (quirk
# of SAMD21 firmware right now).
utime.sleep_ms(20)
示例22
def compass(x):
print('Compass test.')
refresh(ssd, True) # Clear any prior image
dial = Dial(wri, 5, 5, height = 75, bdcolor=None, label=50, style = Dial.COMPASS)
bearing = Pointer(dial)
bearing.value(0 + 1j, RED)
dh = cmath.rect(1, -cmath.pi/30) # Rotate by 6 degrees CW
for n in range(x):
utime.sleep_ms(200)
bearing.value(bearing.value() * dh, RED)
refresh(ssd)
示例23
def __init__(self, spi, pincs, pindc, pinrs, height=128, width=128):
if height not in (96, 128):
raise ValueError('Unsupported height {}'.format(height))
self.spi = spi
self.rate = 11000000 # See baudrate note above.
self.pincs = pincs
self.pindc = pindc # 1 = data 0 = cmd
self.height = height # Required by Writer class
self.width = width
# Save color mode for use by writer_gui (blit)
self.mode = framebuf.RGB565
gc.collect()
self.buffer = bytearray(self.height * self.width * 2)
super().__init__(self.buffer, self.width, self.height, self.mode)
self.mvb = memoryview(self.buffer)
pinrs(0) # Pulse the reset line
utime.sleep_ms(1)
pinrs(1)
utime.sleep_ms(1)
# See above comment to explain this allocation-saving gibberish.
self._write(b'\xfd\x12\xfd\xb1\xae\xb3\xf1\xca\x7f\xa0\x74'\
b'\x15\x00\x7f\x75\x00\x7f\xa1\x00\xa2\x00\xb5\x00\xab\x01'\
b'\xb1\x32\xbe\x05\xa6\xc1\xc8\x80\xc8\xc7\x0f'\
b'\xb4\xa0\xb5\x55\xb6\x01\xaf', 0)
self.show()
gc.collect()
示例24
def __init__(self, spi, pincs, pindc, pinrs, height=128, width=128):
if height not in (96, 128):
raise ValueError('Unsupported height {}'.format(height))
self.spi = spi
self.rate = 11000000 # See baudrate note above.
self.pincs = pincs
self.pindc = pindc # 1 = data 0 = cmd
self.height = height # Required by Writer class
self.width = width
# Save color mode for use by writer_gui (blit)
self.mode = framebuf.GS8 # Use 8bit greyscale for 8 bit color.
gc.collect()
self.buffer = bytearray(self.height * self.width)
super().__init__(self.buffer, self.width, self.height, self.mode)
self.linebuf = bytearray(self.width * 2)
pinrs(0) # Pulse the reset line
utime.sleep_ms(1)
pinrs(1)
utime.sleep_ms(1)
# See above comment to explain this allocation-saving gibberish.
self._write(b'\xfd\x12\xfd\xb1\xae\xb3\xf1\xca\x7f\xa0\x74'\
b'\x15\x00\x7f\x75\x00\x7f\xa1\x00\xa2\x00\xb5\x00\xab\x01'\
b'\xb1\x32\xbe\x05\xa6\xc1\xc8\x80\xc8\xc7\x0f'\
b'\xb4\xa0\xb5\x55\xb6\x01\xaf', 0)
self.show()
gc.collect()
示例25
def rt_polar():
print('Simulate realtime polar data acquisition.')
refresh(ssd, True) # Clear any prior image
g = PolarGraph(wri, 2, 2, fgcolor=WHITE, gridcolor=LIGHTGREEN)
curvey = PolarCurve(g, YELLOW)
curver = PolarCurve(g, RED)
for x in range(100):
curvey.point(cmath.rect(x/100, -x * cmath.pi/30))
curver.point(cmath.rect((100 - x)/100, -x * cmath.pi/30))
utime.sleep_ms(60)
refresh(ssd)
示例26
def seq():
print('Time sequence test - sine and cosine.')
refresh(ssd, True) # Clear any prior image
# y axis at t==now, no border
g = CartesianGraph(wri, 2, 2, xorigin = 10, fgcolor=WHITE,
gridcolor=LIGHTGREEN, bdcolor=False)
tsy = TSequence(g, YELLOW, 50)
tsr = TSequence(g, RED, 50)
for t in range(100):
g.clear()
tsy.add(0.9*math.sin(t/10))
tsr.add(0.4*math.cos(t/10))
refresh(ssd)
utime.sleep_ms(100)
示例27
def calibrate(self, count=256, delay=0):
ox, oy, oz = (0.0, 0.0, 0.0)
self._gyro_offset = (0.0, 0.0, 0.0)
n = float(count)
while count:
utime.sleep_ms(delay)
gx, gy, gz = self.gyro
ox += gx
oy += gy
oz += gz
count -= 1
self._gyro_offset = (ox / n, oy / n, oz / n)
return self._gyro_offset
示例28
def calibrate(self, count=256, delay=200):
self._offset = (0, 0, 0)
self._scale = (1, 1, 1)
reading = self.magnetic
minx = maxx = reading[0]
miny = maxy = reading[1]
minz = maxz = reading[2]
while count:
utime.sleep_ms(delay)
reading = self.magnetic
minx = min(minx, reading[0])
maxx = max(maxx, reading[0])
miny = min(miny, reading[1])
maxy = max(maxy, reading[1])
minz = min(minz, reading[2])
maxz = max(maxz, reading[2])
count -= 1
# Hard iron correction
offset_x = (maxx + minx) / 2
offset_y = (maxy + miny) / 2
offset_z = (maxz + minz) / 2
self._offset = (offset_x, offset_y, offset_z)
# Soft iron correction
avg_delta_x = (maxx - minx) / 2
avg_delta_y = (maxy - miny) / 2
avg_delta_z = (maxz - minz) / 2
avg_delta = (avg_delta_x + avg_delta_y + avg_delta_z) / 3
scale_x = avg_delta / avg_delta_x
scale_y = avg_delta / avg_delta_y
scale_z = avg_delta / avg_delta_z
self._scale = (scale_x, scale_y, scale_z)
return self._offset, self._scale
示例29
def update_thread(self):
while True:
time.sleep_ms(10)
for i in self.motor.keys():
self.motor[i].update_no_block()
示例30
def run_to(self, pos, speed):
error_last_1 = 0
pwm_last = [255, 255, 255, 255]
self.position_update()
_distance = pos
pwm_now = 0
kp = 0.85
kd = 7.0
tick_now = time.ticks_ms()
num = 0
while True:
time.sleep_ms(1)
if time.ticks_ms() - tick_now > 10:
tick_now = time.ticks_ms()
error = _distance - self.read_encoder()
pwm_now = kp*error + kd*(error-error_last_1)
pwm_now = constrain(pwm_now, -speed, speed)
pwm_now = dead_area(pwm_now, -100, -3, 100, 3)
pwm_now = -pwm_now
error_last_1 = error
pwm_last = pwm_last[1:4]
if -15 < error < 15:
num += 1
else:
num = 0
if num > 10:
self.set_pwm(0)
break
self.set_pwm(pwm_now)
pwm_last.append(pwm_now)
if pwm_last == [0, 0, 0, 0]:
break