Difficult-Rocket/libs/pyglet/clock.py
2022-07-16 20:20:23 +08:00

736 lines
25 KiB
Python

# ----------------------------------------------------------------------------
# pyglet
# Copyright (c) 2006-2008 Alex Holkner
# Copyright (c) 2008-2022 pyglet contributors
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# * Neither the name of pyglet nor the names of its
# contributors may be used to endorse or promote products
# derived from this software without specific prior written
# permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# ----------------------------------------------------------------------------
"""Precise framerate calculation function scheduling.
Measuring time
==============
The `tick` and `get_frequency` functions can be used in conjunction to fulfil most
games' basic requirements::
from pyglet import clock
while True:
dt = clock.tick()
# ... update and render ...
print(f"FPS is {clock.get_frequency()}")
The ``dt`` value returned gives the number of seconds (as a float) since the
last "tick".
The `get_frequency` function averages the framerate over a sliding window of
approximately 1 second. (You can calculate the instantaneous framerate by
taking the reciprocal of ``dt``).
Always remember to `tick` the clock!
Scheduling
==========
You can schedule a function to be called every time the clock is ticked::
def callback(dt):
print(f"{dt} seconds since last callback")
clock.schedule(callback)
The `schedule_interval` method causes a function to be called every "n"
seconds::
clock.schedule_interval(callback, .5) # called twice a second
The `schedule_once` method causes a function to be called once "n" seconds
in the future::
clock.schedule_once(callback, 5) # called in 5 seconds
All of the `schedule` methods will pass on any additional args or keyword args
you specify to the callback function::
def move(dt, velocity, sprite):
sprite.position += dt * velocity
clock.schedule(move, velocity=5.0, sprite=alien)
You can cancel a function scheduled with any of these methods using
`unschedule`::
clock.unschedule(move)
Using multiple clocks
=====================
The clock functions are all relayed to an instance of
:py:class:`~pyglet.clock.Clock` which is initialised with the module. You can
get this instance to use directly::
clk = clock.get_default()
You can also replace the default clock with your own:
myclk = clock.Clock()
clock.set_default(myclk)
Each clock maintains its own set of scheduled functions and FPS
measurement. Each clock must be "ticked" separately.
Multiple and derived clocks potentially allow you to separate "game-time" and
"wall-time", or to synchronise your clock to an audio or video stream instead
of the system clock.
"""
import time as _time
from heapq import heappop as _heappop
from heapq import heappush as _heappush
from heapq import heappushpop as _heappushpop
from operator import attrgetter as _attrgetter
from collections import deque as _deque
class _ScheduledItem:
__slots__ = ['func', 'args', 'kwargs']
def __init__(self, func, args, kwargs):
self.func = func
self.args = args
self.kwargs = kwargs
class _ScheduledIntervalItem:
__slots__ = ['func', 'interval', 'last_ts', 'next_ts', 'args', 'kwargs']
def __init__(self, func, interval, last_ts, next_ts, args, kwargs):
self.func = func
self.interval = interval
self.last_ts = last_ts
self.next_ts = next_ts
self.args = args
self.kwargs = kwargs
def __lt__(self, other):
try:
return self.next_ts < other.next_ts
except AttributeError:
return self.next_ts < other
class Clock:
"""Class for calculating and limiting framerate.
It is also used for calling scheduled functions.
"""
# List of functions to call every tick.
_schedule_items = None
# List of schedule interval items kept in sort order.
_schedule_interval_items = None
# If True, a sleep(0) is inserted on every tick.
_force_sleep = False
def __init__(self, time_function=_time.perf_counter):
"""Initialise a Clock, with optional custom time function.
:Parameters:
`time_function` : function
Function to return the elapsed time of the application,
in seconds. Defaults to time.time, but can be replaced
to allow for easy time dilation effects or game pausing.
"""
super(Clock, self).__init__()
self.time = time_function
self.next_ts = self.time()
self.last_ts = None
# Used by self.get_frequency to show update frequency
self.times = _deque()
self.cumulative_time = 0
self.window_size = 60
self._schedule_items = []
self._schedule_interval_items = []
self._current_interval_item = None
@staticmethod
def sleep(microseconds):
_time.sleep(microseconds * 1e-6)
def update_time(self):
"""Get the elapsed time since the last call to `update_time`.
This updates the clock's internal measure of time and returns
the difference since the last update (or since the clock was created).
.. versionadded:: 1.2
:rtype: float
:return: The number of seconds since the last `update_time`, or 0
if this was the first time it was called.
"""
ts = self.time()
if self.last_ts is None:
delta_t = 0
else:
delta_t = ts - self.last_ts
self.times.appendleft(delta_t)
if len(self.times) > self.window_size:
self.cumulative_time -= self.times.pop()
self.cumulative_time += delta_t
self.last_ts = ts
return delta_t
def call_scheduled_functions(self, dt):
"""Call scheduled functions that elapsed on the last `update_time`.
.. versionadded:: 1.2
:Parameters:
dt : float
The elapsed time since the last update to pass to each
scheduled function. This is *not* used to calculate which
functions have elapsed.
:rtype: bool
:return: True if any functions were called, otherwise False.
"""
now = self.last_ts
result = False # flag indicates if any function was called
# handle items scheduled for every tick
if self._schedule_items:
result = True
# duplicate list in case event unschedules itself
for item in list(self._schedule_items):
item.func(dt, *item.args, **item.kwargs)
# check the next scheduled item that is not called each tick
# if it is scheduled in the future, then exit
interval_items = self._schedule_interval_items
try:
if interval_items[0].next_ts > now:
return result
# raised when the interval_items list is empty
except IndexError:
return result
# NOTE: there is no special handling required to manage things
# that are scheduled during this loop, due to the heap
self._current_interval_item = item = None
get_soft_next_ts = self._get_soft_next_ts
while interval_items:
# the scheduler will hold onto a reference to an item in
# case it needs to be rescheduled. it is more efficient
# to push and pop the heap at once rather than two operations
if item is None:
item = _heappop(interval_items)
else:
item = _heappushpop(interval_items, item)
# a scheduled function may try and unschedule itself
# so we need to keep a reference to the current
# item no longer on heap to be able to check
self._current_interval_item = item
# if next item is scheduled in the future then break
if item.next_ts > now:
break
# execute the callback
try:
item.func(now - item.last_ts, *item.args, **item.kwargs)
except ReferenceError:
pass # weakly-referenced object no longer exists.
if item.interval:
# Try to keep timing regular, even if overslept this time;
# but don't schedule in the past (which could lead to
# infinitely-worsening error).
item.next_ts = item.last_ts + item.interval
item.last_ts = now
# test the schedule for the next execution
if item.next_ts <= now:
# the scheduled time of this item has already passed
# so it must be rescheduled
if now - item.next_ts < 0.05:
# missed execution time by 'reasonable' amount, so
# reschedule at normal interval
item.next_ts = now + item.interval
else:
# missed by significant amount, now many events have
# likely missed execution. do a soft reschedule to
# avoid lumping many events together.
# in this case, the next dt will not be accurate
item.next_ts = get_soft_next_ts(now, item.interval)
item.last_ts = item.next_ts - item.interval
else:
# not an interval, so this item will not be rescheduled
self._current_interval_item = item = None
if item is not None:
_heappush(interval_items, item)
return True
def tick(self, poll=False):
"""Signify that one frame has passed.
This will call any scheduled functions that have elapsed.
:Parameters:
`poll` : bool
If True, the function will call any scheduled functions
but will not sleep or busy-wait for any reason. Recommended
for advanced applications managing their own sleep timers
only.
Since pyglet 1.1.
:rtype: float
:return: The number of seconds since the last "tick", or 0 if this was
the first frame.
"""
if not poll and self._force_sleep:
self.sleep(0)
delta_t = self.update_time()
self.call_scheduled_functions(delta_t)
return delta_t
def get_sleep_time(self, sleep_idle):
"""Get the time until the next item is scheduled.
Applications can choose to continue receiving updates at the
maximum framerate during idle time (when no functions are scheduled),
or they can sleep through their idle time and allow the CPU to
switch to other processes or run in low-power mode.
If `sleep_idle` is ``True`` the latter behaviour is selected, and
``None`` will be returned if there are no scheduled items.
Otherwise, if `sleep_idle` is ``False``, or if any scheduled items
exist, a value of 0 is returned.
:Parameters:
`sleep_idle` : bool
If True, the application intends to sleep through its idle
time; otherwise it will continue ticking at the maximum
frame rate allowed.
:rtype: float
:return: Time until the next scheduled event in seconds, or ``None``
if there is no event scheduled.
.. versionadded:: 1.1
"""
if self._schedule_items or not sleep_idle:
return 0.0
if self._schedule_interval_items:
return max(self._schedule_interval_items[0].next_ts - self.time(), 0.0)
return None
def get_frequency(self):
"""Get the average clock update frequency of recent history.
The result is the average of a sliding window of the last "n" updates,
where "n" is some number designed to cover approximately 1 second.
This is **not** the Window redraw rate.
:rtype: float
:return: The measured updates per second.
"""
if not self.cumulative_time:
return 0
return len(self.times) / self.cumulative_time
def _get_nearest_ts(self):
"""Get the nearest timestamp.
Schedule from now, unless now is sufficiently close to last_ts, in
which case use last_ts. This clusters together scheduled items that
probably want to be scheduled together. The old (pre 1.1.1)
behaviour was to always use self.last_ts, and not look at ts. The
new behaviour is needed because clock ticks can now be quite
irregular, and span several seconds.
"""
last_ts = self.last_ts or self.next_ts
ts = self.time()
if ts - last_ts > 0.2:
return ts
return last_ts
def _get_soft_next_ts(self, last_ts, interval):
def taken(ts, e):
"""Check if `ts` has already got an item scheduled nearby."""
# TODO this function is slow and called very often.
# Optimise it, maybe?
for item in self._schedule_interval_items:
if abs(item.next_ts - ts) <= e:
return True
elif item.next_ts > ts + e:
return False
return False
# sorted list is required required to produce expected results
# taken() will iterate through the heap, expecting it to be sorted
# and will not always catch smallest value, so sort here.
# do not remove the sort key...it is faster than relaying comparisons
# NOTE: do not rewrite as popping from heap, as that is super slow!
self._schedule_interval_items.sort(key=_attrgetter('next_ts'))
# Binary division over interval:
#
# 0 interval
# |--------------------------|
# 5 3 6 2 7 4 8 1 Order of search
#
# i.e., first scheduled at interval,
# then at interval/2
# then at interval/4
# then at interval*3/4
# then at ...
#
# Schedule is hopefully then evenly distributed for any interval,
# and any number of scheduled functions.
next_ts = last_ts + interval
if not taken(next_ts, interval / 4):
return next_ts
dt = interval
divs = 1
while True:
next_ts = last_ts
for i in range(divs - 1):
next_ts += dt
if not taken(next_ts, dt / 4):
return next_ts
dt /= 2
divs *= 2
# Avoid infinite loop in pathological case
if divs > 16:
return next_ts
def schedule(self, func, *args, **kwargs):
"""Schedule a function to be called every frame.
The function should have a prototype that includes ``dt`` as the
first argument, which gives the elapsed time, in seconds, since the
last clock tick. Any additional arguments given to this function
are passed on to the callback::
def callback(dt, *args, **kwargs):
pass
:Parameters:
`func` : callable
The function to call each frame.
"""
item = _ScheduledItem(func, args, kwargs)
self._schedule_items.append(item)
def schedule_once(self, func, delay, *args, **kwargs):
"""Schedule a function to be called once after `delay` seconds.
The callback function prototype is the same as for `schedule`.
:Parameters:
`func` : callable
The function to call when the timer lapses.
`delay` : float
The number of seconds to wait before the timer lapses.
"""
last_ts = self._get_nearest_ts()
next_ts = last_ts + delay
item = _ScheduledIntervalItem(func, 0, last_ts, next_ts, args, kwargs)
_heappush(self._schedule_interval_items, item)
def schedule_interval(self, func, interval, *args, **kwargs):
"""Schedule a function to be called every `interval` seconds.
Specifying an interval of 0 prevents the function from being
called again (see `schedule` to call a function as often as possible).
The callback function prototype is the same as for `schedule`.
:Parameters:
`func` : callable
The function to call when the timer lapses.
`interval` : float
The number of seconds to wait between each call.
"""
last_ts = self._get_nearest_ts()
next_ts = last_ts + interval
item = _ScheduledIntervalItem(func, interval, last_ts, next_ts, args, kwargs)
_heappush(self._schedule_interval_items, item)
def schedule_interval_soft(self, func, interval, *args, **kwargs):
"""Schedule a function to be called every ``interval`` seconds.
This method is similar to `schedule_interval`, except that the
clock will move the interval out of phase with other scheduled
functions so as to distribute CPU more load evenly over time.
This is useful for functions that need to be called regularly,
but not relative to the initial start time. :py:mod:`pyglet.media`
does this for scheduling audio buffer updates, which need to occur
regularly -- if all audio updates are scheduled at the same time
(for example, mixing several tracks of a music score, or playing
multiple videos back simultaneously), the resulting load on the
CPU is excessive for those intervals but idle outside. Using
the soft interval scheduling, the load is more evenly distributed.
Soft interval scheduling can also be used as an easy way to schedule
graphics animations out of phase; for example, multiple flags
waving in the wind.
.. versionadded:: 1.1
:Parameters:
`func` : callable
The function to call when the timer lapses.
`interval` : float
The number of seconds to wait between each call.
"""
next_ts = self._get_soft_next_ts(self._get_nearest_ts(), interval)
last_ts = next_ts - interval
item = _ScheduledIntervalItem(func, interval, last_ts, next_ts, args, kwargs)
_heappush(self._schedule_interval_items, item)
def unschedule(self, func):
"""Remove a function from the schedule.
If the function appears in the schedule more than once, all occurrences
are removed. If the function was not scheduled, no error is raised.
:Parameters:
`func` : callable
The function to remove from the schedule.
"""
# clever remove item without disturbing the heap:
# 1. set function to an empty lambda -- original function is not called
# 2. set interval to 0 -- item will be removed from heap eventually
valid_items = set(item for item in self._schedule_interval_items if item.func == func)
if self._current_interval_item:
if self._current_interval_item.func == func:
valid_items.add(self._current_interval_item)
for item in valid_items:
item.interval = 0
item.func = lambda x, *args, **kwargs: x
self._schedule_items = [i for i in self._schedule_items if i.func != func]
# Default clock.
_default = Clock()
def set_default(default):
"""Set the default clock to use for all module-level functions.
By default an instance of :py:class:`~pyglet.clock.Clock` is used.
:Parameters:
`default` : `Clock`
The default clock to use.
"""
global _default
_default = default
def get_default():
"""Get the pyglet default Clock.
Return the :py:class:`~pyglet.clock.Clock` instance that is used by all
module-level clock functions.
:rtype: `Clock`
:return: The default clock.
"""
return _default
def tick(poll=False):
"""Signify that one frame has passed on the default clock.
This will call any scheduled functions that have elapsed.
:Parameters:
`poll` : bool
If True, the function will call any scheduled functions
but will not sleep or busy-wait for any reason. Recommended
for advanced applications managing their own sleep timers
only.
Since pyglet 1.1.
:rtype: float
:return: The number of seconds since the last "tick", or 0 if this was the
first frame.
"""
return _default.tick(poll)
def get_sleep_time(sleep_idle):
"""Get the time until the next item is scheduled on the default clock.
See `Clock.get_sleep_time` for details.
:Parameters:
`sleep_idle` : bool
If True, the application intends to sleep through its idle
time; otherwise it will continue ticking at the maximum
frame rate allowed.
:rtype: float
:return: Time until the next scheduled event in seconds, or ``None``
if there is no event scheduled.
.. versionadded:: 1.1
"""
return _default.get_sleep_time(sleep_idle)
def get_frequency():
"""Get the average clock update frequency.
The result is the sliding average of the last "n" updates,
where "n" is some number designed to cover approximately 1
second. This is the internal clock update rate, **not** the
Window redraw rate. Platform events, such as moving the
mouse rapidly, will cause the clock to refresh more often.
:rtype: float
:return: The measured updates per second.
"""
return _default.get_frequency()
def schedule(func, *args, **kwargs):
"""Schedule 'func' to be called every frame on the default clock.
The arguments passed to func are ``dt``, followed by any ``*args`` and
``**kwargs`` given here.
:Parameters:
`func` : callable
The function to call each frame.
"""
_default.schedule(func, *args, **kwargs)
def schedule_interval(func, interval, *args, **kwargs):
"""Schedule ``func`` on the default clock every interval seconds.
The arguments passed to ``func`` are ``dt`` (time since last function
call), followed by any ``*args`` and ``**kwargs`` given here.
:Parameters:
`func` : callable
The function to call when the timer lapses.
`interval` : float
The number of seconds to wait between each call.
"""
_default.schedule_interval(func, interval, *args, **kwargs)
def schedule_interval_soft(func, interval, *args, **kwargs):
"""Schedule ``func`` on the default clock every interval seconds.
The clock will move the interval out of phase with other scheduled
functions so as to distribute CPU more load evenly over time.
The arguments passed to ``func`` are ``dt`` (time since last function
call), followed by any ``*args`` and ``**kwargs`` given here.
:see: `Clock.schedule_interval_soft`
.. versionadded:: 1.1
:Parameters:
`func` : callable
The function to call when the timer lapses.
`interval` : float
The number of seconds to wait between each call.
"""
_default.schedule_interval_soft(func, interval, *args, **kwargs)
def schedule_once(func, delay, *args, **kwargs):
"""Schedule ``func`` to be called once after ``delay`` seconds.
This function uses the default clock. ``delay`` can be a float. The
arguments passed to ``func`` are ``dt`` (time since last function call),
followed by any ``*args`` and ``**kwargs`` given here.
If no default clock is set, the func is queued and will be scheduled
on the default clock as soon as it is created.
:Parameters:
`func` : callable
The function to call when the timer lapses.
`delay` : float
The number of seconds to wait before the timer lapses.
"""
_default.schedule_once(func, delay, *args, **kwargs)
def unschedule(func):
"""Remove ``func`` from the default clock's schedule.
No error is raised if the ``func`` was never scheduled.
:Parameters:
`func` : callable
The function to remove from the schedule.
"""
_default.unschedule(func)