Python源码示例:utime.ticks_diff()

示例1
def test():
    """Bouncing sprite."""
    try:
        # Baud rate of 14500000 seems about the max
        spi = SPI(2, baudrate=14500000, sck=Pin(18), mosi=Pin(23))
        display = Display(spi, dc=Pin(17), cs=Pin(5), rst=Pin(16))
        display.clear()

        # Load sprite
        logo = BouncingSprite('images/Python41x49.raw',
                              41, 49, 128, 128, 1, display)

        while True:
            timer = ticks_us()
            logo.update_pos()
            logo.draw()
            # Attempt to set framerate to 30 FPS
            timer_dif = 33333 - ticks_diff(ticks_us(), timer)
            if timer_dif > 0:
                sleep_us(timer_dif)

    except KeyboardInterrupt:
        display.cleanup() 
示例2
def multi_fields(t):
    print('Dynamic labels.')
    refresh(ssd, True)  # Clear any prior image
    nfields = []
    dy = wri.height + 6
    y = 2
    col = 15
    width = wri.stringlen('99.99')
    for txt in ('X:', 'Y:', 'Z:'):
        Label(wri, y, 0, txt)  # Use wri default colors
        nfields.append(Label(wri, y, col, width, bdcolor=None))  # Specify a border, color TBD
        y += dy

    end = utime.ticks_add(utime.ticks_ms(), t * 1000)
    while utime.ticks_diff(end, utime.ticks_ms()) > 0:
        for field in nfields:
            value = int.from_bytes(uos.urandom(3),'little')/167772
            overrange =  None if value < 70 else YELLOW if value < 90 else RED
            field.value('{:5.2f}'.format(value), fgcolor = overrange, bdcolor = overrange)
        refresh(ssd)
        utime.sleep(1)
    Label(wri, 0, 64, ' OK ', True, fgcolor = RED)
    refresh(ssd)
    utime.sleep(1) 
示例3
def multi_fields(t):
    print('multi_fields')
    refresh(ssd, True)  # Clear any prior image
    nfields = []
    dy = wri.height + 6
    y = 2
    col = 15
    width = wri.stringlen('99.99')
    for txt in ('X:', 'Y:', 'Z:'):
        Label(wri, y, 0, txt)  # Use wri default colors
        nfields.append(Label(wri, y, col, width, bdcolor=None))  # Specify a border, color TBD
        y += dy

    end = utime.ticks_add(utime.ticks_ms(), t * 1000)
    while utime.ticks_diff(end, utime.ticks_ms()) > 0:
        for field in nfields:
            value = int.from_bytes(uos.urandom(3),'little')/167772
            overrange =  None if value < 70 else YELLOW if value < 90 else RED
            field.value('{:5.2f}'.format(value), fgcolor = overrange, bdcolor = overrange)
        refresh(ssd)
        utime.sleep(1)
    Label(wri, 0, 64, ' OK ', True, fgcolor = RED)
    refresh(ssd)
    utime.sleep(1) 
示例4
def broker_up(self):  # Test broker connectivity
        if not self.isconnected():
            return False
        tlast = self.last_rx
        if ticks_diff(ticks_ms(), tlast) < 1000:
            return True
        try:
            await self._ping()
        except OSError:
            return False
        t = ticks_ms()
        while not self._timeout(t):
            await asyncio.sleep_ms(100)
            if ticks_diff(self.last_rx, tlast) > 0:  # Response received
                return True
        return False 
示例5
def run(self, conversion):          # Uses assembler for speed
        if self.popfunc is not None:
            self.popfunc(self)          # Populate the data (for fwd transfers, just the real data)
        if conversion != REVERSE:       # Forward transform: real data assumed
            setarray(self.im, 0, self._length)# Fast zero imaginary data
            if self.windata is not None:  # Fast apply the window function
                winapply(self.re, self.windata, self._length)
        start = utime.ticks_us()
        fft(self.ctrl, conversion)
        delta = utime.ticks_diff(utime.ticks_us(), start)
        if (conversion & POLAR) == POLAR: # Ignore complex conjugates, convert 1st half of arrays
            topolar(self.re, self.im, self._length//2) # Fast
            if conversion == DB:        # Ignore conjugates: convert 1st half only
                for idx, val in enumerate(self.re[0:self._length//2]):
                    self.re[idx] = -80.0 if val <= 0.0 else 20*math.log10(val) - self.dboffset
        return delta
# Subclass for acquiring data from Pyboard ADC using read_timed() method. 
示例6
def readline(s, timeout):
    line = b''
    start = utime.ticks_ms()
    while True:
        if line.endswith(b'\n'):
            if len(line) > 1:
                return line
            line = b''
            start = utime.ticks_ms()  # A blank line is just a  keepalive
        await asyncio.sleep_ms(100)  # See note above
        d = s.readline()
        if d == b'':
            raise OSError
        if d is not None:
            line = b''.join((line, d))
        if utime.ticks_diff(utime.ticks_ms(), start) > timeout:
            raise OSError 
示例7
def readline(self):
        line = b''
        start = utime.ticks_ms()
        while True:
            if line.endswith(b'\n'):
                if len(line) > 1:
                    return line
                line = b''
                start = utime.ticks_ms()  # Blank line is keepalive
                self.led(not self.led())
            await asyncio.sleep_ms(100)  # nonzero wait seems empirically necessary
            d = self.sock.readline()
            if d == b'':
                raise OSError
            if d is not None:
                line = b''.join((line, d))
            if utime.ticks_diff(utime.ticks_ms(), start) > self.timeout:
                raise OSError 
示例8
def timed_function(f, *args, **kwargs):
        #myname = str(f).split(' ')[1]

        def new_func(*args, **kwargs):
            #t = utime.ticks_us()
            result = f(*args, **kwargs)

            #if myname not in profile_stats_time_including_children:
            #    profile_stats_time_including_children[myname] = 0
            #    profile_stats_calls[myname] = 0

            #profile_stats_time_including_children[myname] += utime.ticks_diff(utime.ticks_us(), t)
            #profile_stats_calls[myname] += 1

            return result

        return new_func 
示例9
def perform(self) :
        current_tick = utime.ticks_ms()
        if not self.initial_tick :
            self.initial_tick = current_tick
        else :
            x = utime.ticks_diff(current_tick, self.initial_tick)
            if x == 0 :
                pass
            elif 0 < x :
                rgb = self.get_color(x)
                if rgb != self.prev_rgb :
                    if self.verbose :
                        logging.info("Lamp: setting color to {} from x={}", rgb, x)
                    self.fill_pixels(rgb)
                    self.prev_rgb = rgb
            else : # wrap around; start over
                logging.info("Lamp: tick wrap")
                self.initial_tick = current_tick
        return True 
示例10
def adcread(chan): # 16 temp 17 vbat 18 vref
    assert chan >= 16 and chan <= 18, 'Invalid ADC channel'
    start = utime.ticks_ms()
    timeout = 100
    stm.mem32[stm.RCC + stm.RCC_APB2ENR] |= 0x100 # enable ADC1 clock.0x4100
    stm.mem32[stm.ADC1 + stm.ADC_CR2] = 1 # Turn on ADC
    stm.mem32[stm.ADC1 + stm.ADC_CR1] = 0 # 12 bit
    if chan == 17:
        stm.mem32[stm.ADC1 + stm.ADC_SMPR1] = 0x200000 # 15 cycles
        stm.mem32[stm.ADC + 4] = 1 << 23
    elif chan == 18:
        stm.mem32[stm.ADC1 + stm.ADC_SMPR1] = 0x1000000
        stm.mem32[stm.ADC + 4] = 0xc00000
    else:
        stm.mem32[stm.ADC1 + stm.ADC_SMPR1] = 0x40000
        stm.mem32[stm.ADC + 4] = 1 << 23
    stm.mem32[stm.ADC1 + stm.ADC_SQR3] = chan
    stm.mem32[stm.ADC1 + stm.ADC_CR2] = 1 | (1 << 30) | (1 << 10) # start conversion
    while not stm.mem32[stm.ADC1 + stm.ADC_SR] & 2: # wait for EOC
        if utime.ticks_diff(utime.ticks_ms(), start) > timeout:
            raise OSError('ADC timout')
    data = stm.mem32[stm.ADC1 + stm.ADC_DR] # clear down EOC
    stm.mem32[stm.ADC1 + stm.ADC_CR2] = 0 # Turn off ADC
    return data 
示例11
def get_t_split(self):
        state = machine.disable_irq()
        t = self.t_ms
        acquired = self.acquired
        machine.enable_irq(state)
        isecs, ims = divmod(t, 1000)  # Get integer secs and ms
        x, secs = divmod(isecs, 60)
        hrs, mins = divmod(x, 60)
        dt = utime.ticks_diff(utime.ticks_us(), acquired)  # μs to time now
        ds, us = divmod(dt, 1000000)
        # If dt > 1e6 can add to secs without risk of rollover: see above.
        self._time[0] = hrs
        self._time[1] = mins
        self._time[2] = secs + ds
        self._time[3] = us + ims*1000
        return self._time 
示例12
def run_cancel_test6(loop):
    for name in ('complete', 'cancel me'):
        loop.create_task(asyn.NamedTask(name, cant60, name)())
    loop.create_task(asyn.Cancellable(cant61)())
    await asyncio.sleep(4.5)
    print('Cancelling task \"{}\". 1.5 secs latency.'.format(name))
    await asyn.NamedTask.cancel(name)
    await asyncio.sleep(7)
    name = 'cancel wait'
    loop.create_task(asyn.NamedTask(name, cant60, name)())
    await asyncio.sleep(0.5)
    print('Cancelling task \"{}\". 1.5 secs latency.'.format(name))
    t = time.ticks_ms()
    await asyn.NamedTask.cancel('cancel wait', nowait=False)
    print('Was cancelled in {} ms'.format(time.ticks_diff(time.ticks_ms(), t)))
    print('Cancelling cant61')
    await asyn.Cancellable.cancel_all()
    print('Done') 
示例13
def main(i2c):
    # Initialize Charlieplex matrix wing.
    matrix = is31fl3731.CharlieWing(i2c)
    matrix.fill(0)
    # Initialize font renderer.
    with bitmapfont.BitmapFont(DISPLAY_WIDTH, DISPLAY_HEIGHT, matrix.pixel) as bf:
        # Global state:
        pos = DISPLAY_WIDTH                 # X position of the message start.
        message_width = bf.width(MESSAGE)   # Message width in pixels.
        frame = 0                           # Currently displayed frame.
        last = utime.ticks_ms()             # Last frame millisecond tick time.
        speed_ms = SPEED / 1000.0           # Scroll speed in pixels/ms.
        while True:
            # Compute the time delta in milliseconds since the last frame.
            current = utime.ticks_ms()
            delta_ms = utime.ticks_diff(last, current)
            last = current
            # Compute position using speed and time delta.
            pos -= speed_ms*delta_ms
            if pos < -message_width:
                pos = DISPLAY_WIDTH
            # Swap frames to start drawing on a non-visible frame (double buffering).
            frame = (frame + 1) % 2
            matrix.frame(frame, show=False)
            # Clear the frame and draw the text at the current position.
            matrix.fill(0)
            bf.text(MESSAGE, int(pos), 0, INTENSITY)
            # Swap to the new frame on the display.
            matrix.frame(frame)
            # Sleep a bit to give USB mass storage some processing time (quirk
            # of SAMD21 firmware right now).
            utime.sleep_ms(20) 
示例14
def main(i2c):
    # Initialize Charlieplex matrix wing.
    matrix = is31fl3731.CharlieWing(i2c)
    matrix.fill(0)
    # Initialize font renderer.
    with bitmapfont.BitmapFont(DISPLAY_WIDTH, DISPLAY_HEIGHT, matrix.pixel) as bf:
        # Global state:
        pos = DISPLAY_WIDTH                 # X position of the message start.
        message_width = bf.width(MESSAGE)   # Message width in pixels.
        frame = 0                           # Currently displayed frame.
        last = utime.ticks_ms()             # Last frame millisecond tick time.
        speed_ms = SPEED / 1000.0           # Scroll speed in pixels/ms.
        while True:
            # Compute the time delta in milliseconds since the last frame.
            current = utime.ticks_ms()
            delta_ms = utime.ticks_diff(current, last)
            last = current
            # Compute position using speed and time delta.
            pos -= speed_ms*delta_ms
            if pos < -message_width:
                pos = DISPLAY_WIDTH
            # Swap frames to start drawing on a non-visible frame (double buffering).
            frame = (frame + 1) % 2
            matrix.frame(frame, show=False)
            # Clear the frame and draw the text at the current position.
            matrix.fill(0)
            bf.text(MESSAGE, int(pos), 0, INTENSITY)
            # Swap to the new frame on the display.
            matrix.frame(frame)
            # Sleep a bit to give USB mass storage some processing time (quirk
            # of SAMD21 firmware right now).
            utime.sleep_ms(20) 
示例15
def main(i2c):
    # Initialize LED matrix.
    matrix = ht16k33_matrix.Matrix16x8(i2c)
    # Initialize font renderer using a helper function to flip the Y axis
    # when rendering so the origin is in the upper left.
    def matrix_pixel(x, y):
        matrix.pixel(x, DISPLAY_HEIGHT-1-y, 1)
    with bitmapfont.BitmapFont(DISPLAY_WIDTH, DISPLAY_HEIGHT, matrix_pixel) as bf:
        # Global state:
        pos = DISPLAY_WIDTH                 # X position of the message start.
        message_width = bf.width(MESSAGE)   # Message width in pixels.
        last = utime.ticks_ms()             # Last frame millisecond tick time.
        speed_ms = SPEED / 1000.0           # Scroll speed in pixels/ms.
        # Main loop:
        while True:
            # Compute the time delta in milliseconds since the last frame.
            current = utime.ticks_ms()
            delta_ms = utime.ticks_diff(current, last)
            last = current
            # Compute position using speed and time delta.
            pos -= speed_ms*delta_ms
            if pos < -message_width:
                pos = DISPLAY_WIDTH
            # Clear the matrix and draw the text at the current position.
            matrix.fill(0)
            bf.text(MESSAGE, int(pos), 0)
            # Update the matrix LEDs.
            matrix.show()
            # Sleep a bit to give USB mass storage some processing time (quirk
            # of SAMD21 firmware right now).
            utime.sleep_ms(20) 
示例16
def main(i2c):
    # Initialize LED matrix.
    matrix = ht16k33_matrix.Matrix16x8(i2c)
    # Initialize font renderer using a helper function to flip the Y axis
    # when rendering so the origin is in the upper left.
    def matrix_pixel(x, y):
        matrix.pixel(x, DISPLAY_HEIGHT-1-y, 1)
    with bitmapfont.BitmapFont(DISPLAY_WIDTH, DISPLAY_HEIGHT, matrix_pixel) as bf:
        # Global state:
        pos = DISPLAY_WIDTH                 # X position of the message start.
        message_width = bf.width(MESSAGE)   # Message width in pixels.
        last = utime.ticks_ms()             # Last frame millisecond tick time.
        speed_ms = SPEED / 1000.0           # Scroll speed in pixels/ms.
        # Main loop:
        while True:
            # Compute the time delta in milliseconds since the last frame.
            current = utime.ticks_ms()
            delta_ms = utime.ticks_diff(last, current)
            last = current
            # Compute position using speed and time delta.
            pos -= speed_ms*delta_ms
            if pos < -message_width:
                pos = DISPLAY_WIDTH
            # Clear the matrix and draw the text at the current position.
            matrix.fill(0)
            bf.text(MESSAGE, int(pos), 0)
            # Update the matrix LEDs.
            matrix.show()
            # Sleep a bit to give USB mass storage some processing time (quirk
            # of SAMD21 firmware right now).
            utime.sleep_ms(20) 
示例17
def time_since_fix(self):
        """Returns number of millisecond since the last sentence with a valid fix was parsed. Returns 0 if
        no fix has been found"""

        # Test if a Fix has been found
        if self.fix_time == 0:
            return -1

        # Try calculating fix time using utime; if not running MicroPython
        # time.time() returns a floating point value in secs
        try:
            current = utime.ticks_diff(utime.ticks_ms(), self.fix_time)
        except NameError:
            current = (time.time() - self.fix_time) * 1000  # ms

        return current 
示例18
def _timeout(self, t):
        return ticks_diff(ticks_ms(), t) > self._response_time 
示例19
def _keep_alive(self):
        while self.isconnected():
            pings_due = ticks_diff(ticks_ms(), self.last_rx) // self._ping_interval
            if pings_due >= 4:
                self.dprint('Reconnect: broker fail.')
                break
            elif pings_due >= 1:
                try:
                    await self._ping()
                except OSError:
                    break
            await asyncio.sleep(1)
        self._reconnect()  # Broker or WiFi fail.

    # DEBUG: show RAM messages. 
示例20
def _synchronise(self):   # wait for clock
        t = ticks_ms()
        while self.ckin() == self.phase ^ self.passive ^ 1:
            # Other tasks can clear self._running by calling stop()
            if (self._timeout and ticks_diff(ticks_ms(), t) > self._timeout) or not self._running:
                raise SynComError
            await asyncio.sleep_ms(0)
        self.indata = (self.indata | (self.din() << _BITS_SYN)) >> 1
        odata = self.odata
        self.dout(odata & 1)
        self.odata = odata >> 1
        self.phase ^= 1
        self.ckout(self.phase)      # set clock 
示例21
def _get_bit(self, dest):
        t = ticks_ms()
        while self.ckin() == self.phase ^ self.passive ^ 1:
            if (self._timeout and ticks_diff(ticks_ms(), t) > self._timeout) or not self._running:
                raise SynComError
            yield  # Faster than await asyncio.sleep_ms()
        dest = (dest | (self.din() << _BITS_PER_CH)) >> 1
        obyte = self.odata
        self.dout(obyte & 1)
        self.odata = obyte >> 1
        self.phase ^= 1
        self.ckout(self.phase)
        return dest 
示例22
def waitfor(self, val):  # Wait for response for 1 sec
        tim = utime.ticks_ms()
        while not self.rem() == val:
            if utime.ticks_diff(utime.ticks_ms(), tim) > 1000:
                raise OSError 
示例23
def _run(self):
        while True:
            # If hardware link exists reboot Responder
            await self.reboot()
            self.txbyt = b''
            self.rxbyt = b''
            await self._sync()
            await asyncio.sleep(1)  # Ensure Responder is ready
            if self.cr_go:
                asyncio.create_task(self.cr_go(*self.go_args))
            while True:
                gc.collect()
                try:
                    tstart = utime.ticks_us()
                    self._sendrx()
                    t = utime.ticks_diff(utime.ticks_us(), tstart)
                except OSError:  # Reboot remote.
                    break
                await asyncio.sleep_ms(Initiator.t_poll)
                self.block_max = max(self.block_max, t)  # self measurement
                self.block_cnt += 1
                self.block_sum += t
            self.nboots += 1
            if self.cr_fail:
                await self.cr_fail(*self.f_args)
            if self.reset is None:  # No means of recovery
                raise OSError('Responder fail.') 
示例24
def _do_qos(self, mid, line):
        while True:
            # Wait for any outage to clear
            await self
            # Wait for the matching ACK.
            tstart = utime.ticks_ms()
            while utime.ticks_diff(utime.ticks_ms(), tstart) < self._to:
                await asyncio.sleep_ms(self._tim_short)
                if mid not in self._acks_pend:
                    return  # ACK was received
            # ACK was not received. Re-send.
            await self._write(line)
            self._verbose and print('Repeat', line, 'to server app')

    # Make an attempt to connect to WiFi. May not succeed. 
示例25
def _keepalive(self):
        while True:
            due = self._tim_ka - utime.ticks_diff(utime.ticks_ms(), self._last_wr)
            if due <= 0:
                # error sets ._evfail, .run cancels this coro
                await self._send(b'\n')
            else:
                await asyncio.sleep_ms(due)

    # Read a line from nonblocking socket: reads can return partial data which
    # are joined into a line. Blank lines are keepalive packets which reset
    # the timeout: _readline() pauses until a complete line has been received. 
示例26
def _readline(self, to):
        led = self._led
        line = b''
        start = utime.ticks_ms()
        while True:
            if line.endswith(b'\n'):
                self._ok = True  # Got at least 1 packet after an outage.
                if len(line) > 1:
                    return line
                # Got a keepalive: discard, reset timers, toggle LED.
                self._feed(0)
                line = b''
                if led is not None:
                    if isinstance(led, machine.Pin):
                        led(not led())
                    else:  # On Pyboard D
                        led.toggle()
            try:
                d = self._sock.readline()
            except Exception as e:
                self._verbose and print('_readline exception', d)
                raise
            if d == b'':
                self._verbose and print('_readline peer disconnect')
                raise OSError
            if d is None:  # Nothing received: wait on server
                if utime.ticks_diff(utime.ticks_ms(), start) > to:
                    self._verbose and print('_readline timeout')
                    raise OSError
                await asyncio.sleep_ms(0)
            else:  # Something received: reset timer
                start = utime.ticks_ms()
                line = b''.join((line, d)) if line else d 
示例27
def test():
    """Bouncing box."""
    try:
        # Baud rate of 14500000 seems about the max
        spi = SPI(2, baudrate=14500000, sck=Pin(18), mosi=Pin(23))
        display = Display(spi, dc=Pin(17), cs=Pin(5), rst=Pin(16))
        display.clear()

        colors = [color565(255, 0, 0),
                  color565(0, 255, 0),
                  color565(0, 0, 255),
                  color565(255, 255, 0),
                  color565(0, 255, 255),
                  color565(255, 0, 255)]
        sizes = [12, 11, 10, 9, 8, 7]
        boxes = [Box(128, 128, sizes[i], display,
                 colors[i]) for i in range(6)]

        while True:
            timer = ticks_us()
            for b in boxes:
                b.update_pos()
                b.draw()
            # Attempt to set framerate to 30 FPS
            timer_dif = 33333 - ticks_diff(ticks_us(), timer)
            if timer_dif > 0:
                sleep_us(timer_dif)

    except KeyboardInterrupt:
        display.cleanup() 
示例28
def run(self, conversion, duration):
        tim = self.timer
        tim.deinit()
        tim.init(freq = int(self._length/duration))
        self.adc.read_timed(self.buff, tim) # Note: blocks for duration
        start = utime.ticks_us()
        icopy(self.buff, self.re, self._length) # Fast copy integer array into real
        super().run(conversion)
        return utime.ticks_diff(utime.ticks_us(), start) 
示例29
def rtc_test(self, runtime=600, ppm=False, verbose=True):
        if rtc is None:
            raise RuntimeError('machine.RTC does not exist')
        verbose and print('Waiting {} minutes for result'.format(runtime//60))
        factor = 1_000_000 if ppm else 114_155_200  # seconds per year

        self.await_transition()  # Start on transition of DS3231. Record time in .timebuf
        t = utime.ticks_ms()  # Get system time now
        ss = rtc.datetime()[6]  # Seconds from system RTC
        while ss == rtc.datetime()[6]:
            pass
        ds = utime.ticks_diff(utime.ticks_ms(), t)  # ms to transition of RTC
        ds3231_start = utime.mktime(self.convert())  # Time when transition occurred
        t = rtc.datetime()
        rtc_start = utime.mktime((t[0], t[1], t[2], t[4], t[5], t[6], t[3] - 1, 0))  # y m d h m s wday 0

        utime.sleep(runtime)  # Wait a while (precision doesn't matter)

        self.await_transition()  # of DS3231 and record the time
        t = utime.ticks_ms()  # and get system time now
        ss = rtc.datetime()[6]  # Seconds from system RTC
        while ss == rtc.datetime()[6]:
            pass
        de = utime.ticks_diff(utime.ticks_ms(), t)  # ms to transition of RTC
        ds3231_end = utime.mktime(self.convert())  # Time when transition occurred
        t = rtc.datetime()
        rtc_end = utime.mktime((t[0], t[1], t[2], t[4], t[5], t[6], t[3] - 1, 0))  # y m d h m s wday 0

        d_rtc = 1000 * (rtc_end - rtc_start) + de - ds  # ms recorded by RTC
        d_ds3231 = 1000 * (ds3231_end - ds3231_start)  # ms recorded by DS3231
        ratio = (d_ds3231 - d_rtc) / d_ds3231
        ppm = ratio * 1_000_000
        verbose and print('DS3231 leads RTC by {:4.1f}ppm {:4.1f}mins/yr'.format(ppm, ppm*1.903))
        return ratio * factor 
示例30
def getcal(self, minutes=5, cal=0, verbose=True):
        if d_series:
            return self._getcal_d(minutes, cal, verbose)
        verbose and print('Pyboard 1.x. Waiting {} minutes for calibration factor.'.format(minutes))
        rtc.calibration(cal)  # Clear existing cal
        self.save_time()  # Set DS3231 from RTC
        self.await_transition()  # Wait for DS3231 to change: on a 1 second boundary
        tus = utime.ticks_us()
        st = rtc.datetime()[7]
        while rtc.datetime()[7] == st:  # Wait for RTC to change
            pass
        t1 = utime.ticks_diff(utime.ticks_us(), tus)  # t1 is duration (μs) between DS and RTC change (start)
        rtcstart = get_ms(rtc.datetime())  # RTC start time in mS
        dsstart = utime.mktime(self.convert())  # DS start time in secs as recorded by await_transition

        utime.sleep(minutes * 60)

        self.await_transition()  # DS second boundary
        tus = utime.ticks_us()
        st = rtc.datetime()[7]
        while rtc.datetime()[7] == st:
            pass
        t2 = utime.ticks_diff(utime.ticks_us(), tus)  # t2 is duration (μs) between DS and RTC change (end)
        rtcend = get_ms(rtc.datetime())
        dsend = utime.mktime(self.convert())
        dsdelta = (dsend - dsstart) * 1000000  # Duration (μs) between DS edges as measured by DS3231
        if rtcend < rtcstart:  # It's run past midnight. Assumption: run time < 1 day!
            rtcend += 24 * 3_600_000
        rtcdelta = (rtcend - rtcstart) * 1000 + t1 - t2  # Duration (μs) between DS edges as measured by RTC and corrected
        ppm = (1000000* (rtcdelta - dsdelta))/dsdelta
        if cal:
            verbose and print('Error {:4.1f}ppm {:4.1f}mins/year.'.format(ppm, ppm * 1.903))
            return 0
        cal = int(-ppm / 0.954)
        verbose and print('Error {:4.1f}ppm {:4.1f}mins/year. Cal factor {}'.format(ppm, ppm * 1.903, cal))
        return cal

    # Version for Pyboard D. This has μs resolution.