Python源码示例:utime.ticks_us()
示例1
def test():
"""Bouncing sprite."""
try:
# Baud rate of 14500000 seems about the max
spi = SPI(2, baudrate=14500000, sck=Pin(18), mosi=Pin(23))
display = Display(spi, dc=Pin(17), cs=Pin(5), rst=Pin(16))
display.clear()
# Load sprite
logo = BouncingSprite('images/Python41x49.raw',
41, 49, 128, 128, 1, display)
while True:
timer = ticks_us()
logo.update_pos()
logo.draw()
# Attempt to set framerate to 30 FPS
timer_dif = 33333 - ticks_diff(ticks_us(), timer)
if timer_dif > 0:
sleep_us(timer_dif)
except KeyboardInterrupt:
display.cleanup()
示例2
def update(self):
if time.ticks_us() >= self.interval:
self.interval = time.ticks_us() + 10000
if not self.angle_point == None:
self.input_value = self.encoder_read() - self.enc_zero
output_value = self.angle_pid.get_pid(self.input_value, self.angle_point)
self.set_pwm(output_value)
if not self.speed_point == None:
# Encoder filter
self.input_value *= 0.75
self.input_value += self.encoder_incr() * 0.25
# self.input_value = constrain(self.input_value, -30, 30)
# Output pwm
output_value = self.speed_pid.get_pid(self.input_value, self.speed_point)
self.set_pwm(output_value)
# print("in:%0.2f, out:%0.2f\r\n" % (self.input_value, output_value))
示例3
def run(self, conversion): # Uses assembler for speed
if self.popfunc is not None:
self.popfunc(self) # Populate the data (for fwd transfers, just the real data)
if conversion != REVERSE: # Forward transform: real data assumed
setarray(self.im, 0, self._length)# Fast zero imaginary data
if self.windata is not None: # Fast apply the window function
winapply(self.re, self.windata, self._length)
start = utime.ticks_us()
fft(self.ctrl, conversion)
delta = utime.ticks_diff(utime.ticks_us(), start)
if (conversion & POLAR) == POLAR: # Ignore complex conjugates, convert 1st half of arrays
topolar(self.re, self.im, self._length//2) # Fast
if conversion == DB: # Ignore conjugates: convert 1st half only
for idx, val in enumerate(self.re[0:self._length//2]):
self.re[idx] = -80.0 if val <= 0.0 else 20*math.log10(val) - self.dboffset
return delta
# Subclass for acquiring data from Pyboard ADC using read_timed() method.
示例4
def __call__(self, ts):
if self.expect_ts:
if ts is None:
raise ValueError('Timestamp expected but not supplied.')
else:
if is_micropython:
ts = time.ticks_us()
else:
raise RuntimeError('Not MicroPython: provide timestamps and a timediff function')
# ts is now valid
if self.start_time is None: # 1st call: self.start_time is invalid
self.start_time = ts
return 0.0001 # 100μs notional delay. 1st reading is invalid in any case
dt = self.timediff(ts, self.start_time)
self.start_time = ts
return dt
示例5
def timed_function(f, *args, **kwargs):
#myname = str(f).split(' ')[1]
def new_func(*args, **kwargs):
#t = utime.ticks_us()
result = f(*args, **kwargs)
#if myname not in profile_stats_time_including_children:
# profile_stats_time_including_children[myname] = 0
# profile_stats_calls[myname] = 0
#profile_stats_time_including_children[myname] += utime.ticks_diff(utime.ticks_us(), t)
#profile_stats_calls[myname] += 1
return result
return new_func
示例6
def get_t_split(self):
state = machine.disable_irq()
t = self.t_ms
acquired = self.acquired
machine.enable_irq(state)
isecs, ims = divmod(t, 1000) # Get integer secs and ms
x, secs = divmod(isecs, 60)
hrs, mins = divmod(x, 60)
dt = utime.ticks_diff(utime.ticks_us(), acquired) # μs to time now
ds, us = divmod(dt, 1000000)
# If dt > 1e6 can add to secs without risk of rollover: see above.
self._time[0] = hrs
self._time[1] = mins
self._time[2] = secs + ds
self._time[3] = us + ims*1000
return self._time
示例7
def get_t_split(self):
state = machine.disable_irq()
t = self.t_ms
acquired = self.acquired
machine.enable_irq(state)
isecs, ims = divmod(t, 1000) # Get integer secs and ms
x, secs = divmod(isecs, 60)
hrs, mins = divmod(x, 60)
dt = utime.ticks_diff(utime.ticks_us(), acquired) # μs to time now
ds, us = divmod(dt, 1000000)
# If dt > 1e6 can add to secs without risk of rollover: see above.
self._time[0] = hrs
self._time[1] = mins
self._time[2] = secs + ds
self._time[3] = us + ims*1000
return self._time
示例8
def __init__(self, i2c=None):
if i2c is None:
self.i2c = i2c_bus.get(i2c_bus.M_BUS)
else:
self.i2c = i2c
self.imu = MPU6050(self.i2c)
if self.i2c.is_ready(M5GO_WHEEL_ADDR) or self.i2c.is_ready(M5GO_WHEEL_ADDR):
pass
else:
raise ImportError("Bala Motor not connect")
self.id = self.imu.whoami
# self.set_motor(0, 0)
self.imu.setGyroOffsets(-2.71, -0.01, -0.04)
self.loop_interval = time.ticks_us()
self.dt = time.ticks_us()
self.angleX = 0
self.angleX_offset = 0
self.last_angle = 0.0
self.last_wheel = 0.0
self.in_speed0 = 0
self.in_speed1 = 0
self.left = 0
self.right = 0
self.K1 = 40
self.K2 = 40
self.K3 = 6.5
self.K4 = 5.5
self.K5 = 0
self.enc_filter = 0.90
示例9
def ypr(self):
"""
yaw, pitch, roll as floats.
"""
accX, accY, accZ = self.acceleration
angleAccX = math.atan2(accY, accZ + abs(accX)) * SF_RAD_S
angleAccY = math.atan2(accX, accZ + abs(accY)) * (-SF_RAD_S);
gyroX, gyroY, gyroZ = self.gyro
gyroX -= self.gyroXoffset
gyroY -= self.gyroYoffset
gyroZ -= self.gyroZoffset
interval = (time.ticks_us() - self.preInterval) / 1000000
self.preInterval = time.ticks_us()
self.angleGyroX += gyroX * interval
self.angleGyroY += gyroY * interval
self.angleGyroZ += gyroZ * interval
self.angleX = (self.gyroCoef * (self.angleX + gyroX * interval)) + (self.accCoef * angleAccX);
self.angleY = (self.gyroCoef * (self.angleY + gyroY * interval)) + (self.accCoef * angleAccY);
self.angleZ = self.angleGyroZ
return tuple([round(self.angleZ, 3), round(self.angleX, 3), round(self.angleY, 3)])
示例10
def _run(self):
while True:
# If hardware link exists reboot Responder
await self.reboot()
self.txbyt = b''
self.rxbyt = b''
await self._sync()
await asyncio.sleep(1) # Ensure Responder is ready
if self.cr_go:
asyncio.create_task(self.cr_go(*self.go_args))
while True:
gc.collect()
try:
tstart = utime.ticks_us()
self._sendrx()
t = utime.ticks_diff(utime.ticks_us(), tstart)
except OSError: # Reboot remote.
break
await asyncio.sleep_ms(Initiator.t_poll)
self.block_max = max(self.block_max, t) # self measurement
self.block_cnt += 1
self.block_sum += t
self.nboots += 1
if self.cr_fail:
await self.cr_fail(*self.f_args)
if self.reset is None: # No means of recovery
raise OSError('Responder fail.')
示例11
def test():
"""Bouncing box."""
try:
# Baud rate of 14500000 seems about the max
spi = SPI(2, baudrate=14500000, sck=Pin(18), mosi=Pin(23))
display = Display(spi, dc=Pin(17), cs=Pin(5), rst=Pin(16))
display.clear()
colors = [color565(255, 0, 0),
color565(0, 255, 0),
color565(0, 0, 255),
color565(255, 255, 0),
color565(0, 255, 255),
color565(255, 0, 255)]
sizes = [12, 11, 10, 9, 8, 7]
boxes = [Box(128, 128, sizes[i], display,
colors[i]) for i in range(6)]
while True:
timer = ticks_us()
for b in boxes:
b.update_pos()
b.draw()
# Attempt to set framerate to 30 FPS
timer_dif = 33333 - ticks_diff(ticks_us(), timer)
if timer_dif > 0:
sleep_us(timer_dif)
except KeyboardInterrupt:
display.cleanup()
示例12
def run(self, conversion, duration):
tim = self.timer
tim.deinit()
tim.init(freq = int(self._length/duration))
self.adc.read_timed(self.buff, tim) # Note: blocks for duration
start = utime.ticks_us()
icopy(self.buff, self.re, self._length) # Fast copy integer array into real
super().run(conversion)
return utime.ticks_diff(utime.ticks_us(), start)
示例13
def getcal(self, minutes=5, cal=0, verbose=True):
if d_series:
return self._getcal_d(minutes, cal, verbose)
verbose and print('Pyboard 1.x. Waiting {} minutes for calibration factor.'.format(minutes))
rtc.calibration(cal) # Clear existing cal
self.save_time() # Set DS3231 from RTC
self.await_transition() # Wait for DS3231 to change: on a 1 second boundary
tus = utime.ticks_us()
st = rtc.datetime()[7]
while rtc.datetime()[7] == st: # Wait for RTC to change
pass
t1 = utime.ticks_diff(utime.ticks_us(), tus) # t1 is duration (μs) between DS and RTC change (start)
rtcstart = get_ms(rtc.datetime()) # RTC start time in mS
dsstart = utime.mktime(self.convert()) # DS start time in secs as recorded by await_transition
utime.sleep(minutes * 60)
self.await_transition() # DS second boundary
tus = utime.ticks_us()
st = rtc.datetime()[7]
while rtc.datetime()[7] == st:
pass
t2 = utime.ticks_diff(utime.ticks_us(), tus) # t2 is duration (μs) between DS and RTC change (end)
rtcend = get_ms(rtc.datetime())
dsend = utime.mktime(self.convert())
dsdelta = (dsend - dsstart) * 1000000 # Duration (μs) between DS edges as measured by DS3231
if rtcend < rtcstart: # It's run past midnight. Assumption: run time < 1 day!
rtcend += 24 * 3_600_000
rtcdelta = (rtcend - rtcstart) * 1000 + t1 - t2 # Duration (μs) between DS edges as measured by RTC and corrected
ppm = (1000000* (rtcdelta - dsdelta))/dsdelta
if cal:
verbose and print('Error {:4.1f}ppm {:4.1f}mins/year.'.format(ppm, ppm * 1.903))
return 0
cal = int(-ppm / 0.954)
verbose and print('Error {:4.1f}ppm {:4.1f}mins/year. Cal factor {}'.format(ppm, ppm * 1.903, cal))
return cal
# Version for Pyboard D. This has μs resolution.
示例14
def x_callback(self, line):
self.forward = self.pin_x.value() ^ self.pin_y.value() ^ self.reverse
self._pos += 1 if self.forward else -1
self.tprev = self.tlast
self.tlast = utime.ticks_us()
示例15
def y_callback(self, line):
self.forward = self.pin_x.value() ^ self.pin_y.value() ^ self.reverse ^ 1
self._pos += 1 if self.forward else -1
self.tprev = self.tlast
self.tlast = utime.ticks_us()
示例16
def rate(self): # Return rate in edges per second
self.x_interrupt.disable()
self.y_interrupt.disable()
if ticksdiff(self.tlast, utime.ticks_us) > 2000000: # It's stopped
result = 0.0
else:
result = 1000000.0/(ticksdiff(self.tprev, self.tlast))
self.x_interrupt.enable()
self.y_interrupt.enable()
result *= self.scale
return result if self.forward else -result
示例17
def timed_function(f, *args, **kwargs):
def new_func(*args, **kwargs):
t = utime.ticks_us()
result = f(*args, **kwargs)
delta = utime.ticks_diff(utime.ticks_us(), t)
print('Function {} Time = {:6.3f}ms'.format(f.__name__, delta/1000))
return result
return new_func
示例18
def read_coro():
imu.mag_trigger()
await asyncio.sleep_ms(20) # Plenty of time for mag to be ready
f.write(ujson.dumps([imu.accel.xyz, imu.gyro.xyz, imu.mag_nonblocking.xyz, time.ticks_us()]))
f.write('\n')
return imu.accel.xyz, imu.gyro.xyz, imu.mag_nonblocking.xyz
示例19
def timed_function(f, *args, **kwargs):
myname = str(f).split(' ')[1]
def new_func(*args, **kwargs):
t0 = utime.ticks_us()
timed_function_stack.append(myname)
result = f(*args, **kwargs)
if myname not in profile_stats_time_including_children:
profile_stats_time_including_children[myname] = 0
profile_stats_calls[myname] = 0
t1 = utime.ticks_us()
delta_us = utime.ticks_diff(t1, t0)
profile_stats_time_including_children[myname] += delta_us
profile_stats_calls[myname] += 1
if len(timed_function_stack) >= 2:
stack_last_two = tuple(timed_function_stack[-2:])
if stack_last_two not in stack_history:
stack_history[stack_last_two] = 0
stack_history[stack_last_two] += delta_us
timed_function_stack.pop()
return result
return new_func
示例20
def __init__(self, sleep_ms, verbose=False) :
self.sleep_ms = sleep_ms
self._verbose = verbose
self.state = TaskBase.INIT
self.disabled = False
self.num_calls = 0
self.num_failures = 0
self.last_retry_ms = None
self.ticks_us = 0
示例21
def stats(self) :
return {
'num_calls': self.num_calls,
'num_failures': self.num_failures,
'ticks_us': self.ticks_us
}
示例22
def loop(self) :
if self._verbose :
logging.info("TaskBase: loop starting.")
while self.isRunning() :
if not self.disabled :
try :
self.num_calls += 1
start_ticks_us = utime.ticks_us()
result = self.perform()
self.ticks_us += utime.ticks_diff(utime.ticks_us(), start_ticks_us)
if not result:
return
self.last_retry_ms = None
except Exception as e :
if not self.last_retry_ms :
self.last_retry_ms = 500
else :
self.last_retry_ms = min(self.sleep_ms, self.last_retry_ms * 2)
self.num_failures += 1
logging.info("An error occurred performing {}: {}".format(self, e))
sys.print_exception(e)
await uasyncio.sleep_ms(self.sleep_ms if not self.last_retry_ms else self.last_retry_ms)
else :
await uasyncio.sleep_ms(914)
self.state = TaskBase.STOPPED
if self._verbose :
logging.info("TaskBase: loop terminated.")
return
示例23
def _cb_pin(self, line):
t = ticks_us()
# On overrun ignore pulses until software timer times out
if self._edge <= _EDGECOUNT: # Allow 1 extra pulse to record overrun
if not self._ev_start.is_set(): # First edge received
loop = asyncio.get_event_loop()
self._ev_start.set(loop.time()) # asyncio latency compensation
self._times[self._edge] = t
self._edge += 1
示例24
def _run(self):
while True:
# If hardware link exists reboot Responder
await self.reboot()
self.txbyt = b''
self.rxbyt = b''
await self._sync()
await asyncio.sleep(1) # Ensure Responder is ready
if self.cr_go:
self.loop.create_task(self.cr_go(*self.go_args))
while True:
gc.collect()
try:
tstart = utime.ticks_us()
self._sendrx()
t = utime.ticks_diff(utime.ticks_us(), tstart)
except OSError:
break
await asyncio.sleep_ms(Initiator.t_poll)
self.block_max = max(self.block_max, t) # self measurement
self.block_cnt += 1
self.block_sum += t
self.nboots += 1
if self.cr_fail:
await self.cr_fail(*self.f_args)
if self.reset is None: # No means of recovery
raise OSError('Responder fail.')
# Send payload length (may be 0) then payload (if any)
示例25
def _cb_pin(self, line):
t = ticks_us()
# On overrun ignore pulses until software timer times out
if self._edge <= _EDGECOUNT: # Allow 1 extra pulse to record overrun
if not self._ev_start.is_set(): # First edge received
self._ev_start.set(ticks_ms()) # asyncio latency compensation
self._times[self._edge] = t
self._edge += 1
示例26
def _run(self):
while True:
# If hardware link exists reboot Responder
await self.reboot()
self.txbyt = b''
self.rxbyt = b''
await self._sync()
await asyncio.sleep(1) # Ensure Responder is ready
if self.cr_go:
self.loop.create_task(self.cr_go(*self.go_args))
while True:
gc.collect()
try:
tstart = utime.ticks_us()
self._sendrx()
t = utime.ticks_diff(utime.ticks_us(), tstart)
except OSError: # Reboot remote.
break
await asyncio.sleep_ms(Initiator.t_poll)
self.block_max = max(self.block_max, t) # self measurement
self.block_cnt += 1
self.block_sum += t
self.nboots += 1
if self.cr_fail:
await self.cr_fail(*self.f_args)
if self.reset is None: # No means of recovery
raise OSError('Responder fail.')
示例27
def us_cb(my_gps, tick, led):
global us_acquired # Time of previous PPS edge in ticks_us()
if us_acquired is not None:
# Trigger Message. Pass time between PPS measured by utime.ticks_us()
tick.set(utime.ticks_diff(my_gps.acquired, us_acquired))
us_acquired = my_gps.acquired
led.toggle()
# Setup initialises with above callback
示例28
def _isr(self, _):
acquired = utime.ticks_us() # Save time of PPS
# Time in last NMEA sentence was time of last PPS.
# Reduce to integer secs since midnight local time.
isecs = (self.epoch_time + int(3600*self.local_offset)) % 86400
# ms since midnight (28 bits). Add in any ms in RMC data
msecs = isecs * 1000 + self.msecs
# This PPS is presumed to be one update later
msecs += self._update_ms
if msecs >= 86400000: # Next PPS will deal with rollover
return
if self.t_ms == msecs: # No RMC message has arrived: nothing to do
return
self.t_ms = msecs # Current time in ms past midnight
self.acquired = acquired
# Set RTC if required and if last RMC indicated a 1 second boundary
if self._rtc_set:
# Time as int(seconds) in last NMEA sentence. Earlier test ensures
# no rollover when we add 1.
self._rtcbuf[6] = (isecs + 1) % 60
rtc.datetime(self._rtcbuf)
self._rtc_set = False
# Could be an outage here, so PPS arrives many secs after last sentence
# Is this right? Does PPS continue during outage?
self._pps_cb(self, *self._pps_cb_args)
# Called when base class updates the epoch_time.
# Need local time for setting Pyboard RTC in interrupt context
示例29
def _await_pps(self):
t0 = self.acquired
while self.acquired == t0: # Busy-wait on PPS interrupt: not time-critical
await asyncio.sleep_ms(0) # because acquisition time stored in ISR.
gc.collect() # Time-critical code follows
st = rtc.datetime()[7]
while rtc.datetime()[7] == st: # Wait for RTC to change (4ms max)
pass
dt = utime.ticks_diff(utime.ticks_us(), self.acquired)
trtc = self._get_rtc_usecs() - dt # Read RTC now and adjust for PPS edge
tgps = 1000000 * (self.epoch_time + 3600*self.local_offset + 1)
return trtc, tgps
# Non-realtime calculation of calibration factor. times are in μs
示例30
def get_ms(self):
state = machine.disable_irq()
t = self.t_ms
acquired = self.acquired
machine.enable_irq(state)
return t + utime.ticks_diff(utime.ticks_us(), acquired) // 1000
# Return accurate GPS time of day (hrs: int, mins: int, secs: int, μs: int)
# The ISR can skip an update of .secs if a day rollover would occur. Next
# RMC handles this, so if updates are at 1s intervals the subsequent ISR
# will see hms = 0, 0, 1 and a value of .acquired > 1000000.
# Even at the slowest update rate of 10s this can't overflow into minutes.