Difficult-Rocket/libs/pyglet/input/evdev.py

577 lines
16 KiB
Python
Raw Normal View History

2021-04-16 23:21:06 +08:00
# ----------------------------------------------------------------------------
# pyglet
# Copyright (c) 2006-2008 Alex Holkner
2021-04-17 01:14:38 +08:00
# Copyright (c) 2008-2021 pyglet contributors
2021-04-16 23:21:06 +08:00
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
#
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in
# the documentation and/or other materials provided with the
# distribution.
# * Neither the name of pyglet nor the names of its
# contributors may be used to endorse or promote products
# derived from this software without specific prior written
# permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
# ----------------------------------------------------------------------------
import os
import fcntl
2021-04-16 23:21:06 +08:00
import ctypes
2022-03-22 23:20:07 +08:00
from ctypes import c_uint16 as _u16
from ctypes import c_int16 as _s16
from ctypes import c_uint32 as _u32
from ctypes import c_int32 as _s32
from ctypes import c_int64 as _s64
2021-04-16 23:21:06 +08:00
import pyglet
from pyglet.app.xlib import XlibSelectDevice
2022-03-05 23:10:18 +08:00
from .base import Device, RelativeAxis, AbsoluteAxis, Button, Joystick, Controller
2021-04-16 23:21:06 +08:00
from .base import DeviceOpenException
from .evdev_constants import *
2022-03-05 23:10:18 +08:00
from .controller import get_mapping
2021-04-16 23:21:06 +08:00
_IOC_NRBITS = 8
_IOC_TYPEBITS = 8
_IOC_SIZEBITS = 14
_IOC_DIRBITS = 2
_IOC_NRSHIFT = 0
_IOC_TYPESHIFT = (_IOC_NRSHIFT + _IOC_NRBITS)
_IOC_SIZESHIFT = (_IOC_TYPESHIFT + _IOC_TYPEBITS)
_IOC_DIRSHIFT = (_IOC_SIZESHIFT + _IOC_SIZEBITS)
_IOC_NONE = 0
_IOC_WRITE = 1
_IOC_READ = 2
def _IOC(dir, type, nr, size):
return ((dir << _IOC_DIRSHIFT) |
(type << _IOC_TYPESHIFT) |
(nr << _IOC_NRSHIFT) |
(size << _IOC_SIZESHIFT))
def _IOR(type, nr, struct):
request = _IOC(_IOC_READ, ord(type), nr, ctypes.sizeof(struct))
def f(fileno):
buffer = struct()
2022-03-22 23:20:07 +08:00
fcntl.ioctl(fileno, request, buffer)
2021-04-16 23:21:06 +08:00
return buffer
return f
def _IOR_len(type, nr):
def f(fileno, buffer):
request = _IOC(_IOC_READ, ord(type), nr, ctypes.sizeof(buffer))
2022-03-22 23:20:07 +08:00
fcntl.ioctl(fileno, request, buffer)
2021-04-16 23:21:06 +08:00
return buffer
return f
def _IOR_str(type, nr):
g = _IOR_len(type, nr)
2022-03-22 23:20:07 +08:00
def f(fileno, length=256):
return g(fileno, ctypes.create_string_buffer(length)).value
2021-04-16 23:21:06 +08:00
return f
2022-03-22 23:20:07 +08:00
def _IOW(type, nr):
def f(fileno, buffer):
request = _IOC(_IOC_WRITE, ord(type), nr, ctypes.sizeof(buffer))
fcntl.ioctl(fileno, request, buffer)
return f
# Structures from /linux/blob/master/include/uapi/linux/input.h
class Timeval(ctypes.Structure):
_fields_ = (
('tv_sec', _s64),
('tv_usec', _s64)
)
class InputEvent(ctypes.Structure):
_fields_ = (
('time', Timeval),
('type', _u16),
('code', _u16),
('value', _s32)
)
2021-04-16 23:21:06 +08:00
2022-03-22 23:20:07 +08:00
class InputID(ctypes.Structure):
2021-04-16 23:21:06 +08:00
_fields_ = (
2022-03-22 23:20:07 +08:00
('bustype', _u16),
('vendor', _u16),
('product', _u16),
('version', _u16),
2021-04-16 23:21:06 +08:00
)
2022-03-22 23:20:07 +08:00
class InputABSInfo(ctypes.Structure):
2021-04-16 23:21:06 +08:00
_fields_ = (
2022-03-22 23:20:07 +08:00
('value', _s32),
('minimum', _s32),
('maximum', _s32),
('fuzz', _s32),
('flat', _s32),
2021-04-16 23:21:06 +08:00
)
2022-03-22 23:20:07 +08:00
class FFReplay(ctypes.Structure):
2021-04-16 23:21:06 +08:00
_fields_ = (
2022-03-22 23:20:07 +08:00
('length', _u16),
('delay', _u16)
2021-04-16 23:21:06 +08:00
)
2022-03-22 23:20:07 +08:00
class FFTrigger(ctypes.Structure):
2021-04-16 23:21:06 +08:00
_fields_ = (
2022-03-22 23:20:07 +08:00
('button', _u16),
('interval', _u16)
)
class FFEnvelope(ctypes.Structure):
_fields_ = [
('attack_length', _u16),
('attack_level', _u16),
('fade_length', _u16),
('fade_level', _u16),
]
class FFConstantEffect(ctypes.Structure):
_fields_ = [
('level', _s16),
('ff_envelope', FFEnvelope),
]
class FFRampEffect(ctypes.Structure):
_fields_ = [
('start_level', _s16),
('end_level', _s16),
('ff_envelope', FFEnvelope),
]
class FFConditionEffect(ctypes.Structure):
_fields_ = [
('right_saturation', _u16),
('left_saturation', _u16),
('right_coeff', _s16),
('left_coeff', _s16),
('deadband', _u16),
('center', _s16),
]
class FFPeriodicEffect(ctypes.Structure):
_fields_ = [
('waveform', _u16),
('period', _u16),
('magnitude', _s16),
('offset', _s16),
('phase', _u16),
('envelope', FFEnvelope),
('custom_len', _u32),
('custom_data', ctypes.POINTER(_s16)),
]
class FFRumbleEffect(ctypes.Structure):
_fields_ = (
('strong_magnitude', _u16),
('weak_magnitude', _u16)
)
class FFEffectType(ctypes.Union):
_fields_ = (
('ff_constant_effect', FFConstantEffect),
('ff_ramp_effect', FFRampEffect),
('ff_periodic_effect', FFPeriodicEffect),
('ff_condition_effect', FFConditionEffect * 2),
('ff_rumble_effect', FFRumbleEffect),
)
class FFEvent(ctypes.Structure):
_fields_ = (
('type', _u16),
('id', _s16),
('direction', _u16),
('ff_trigger', FFTrigger),
('ff_replay', FFReplay),
('u', FFEffectType)
2021-04-16 23:21:06 +08:00
)
EVIOCGVERSION = _IOR('E', 0x01, ctypes.c_int)
2022-03-22 23:20:07 +08:00
EVIOCGID = _IOR('E', 0x02, InputID)
2021-04-16 23:21:06 +08:00
EVIOCGNAME = _IOR_str('E', 0x06)
EVIOCGPHYS = _IOR_str('E', 0x07)
EVIOCGUNIQ = _IOR_str('E', 0x08)
2022-03-22 23:20:07 +08:00
EVIOCSFF = _IOW('E', 0x80)
2021-04-16 23:21:06 +08:00
def EVIOCGBIT(fileno, ev, buffer):
return _IOR_len('E', 0x20 + ev)(fileno, buffer)
def EVIOCGABS(fileno, abs):
2022-03-22 23:20:07 +08:00
buffer = InputABSInfo()
2021-04-16 23:21:06 +08:00
return _IOR_len('E', 0x40 + abs)(fileno, buffer)
2022-03-22 23:20:07 +08:00
def get_set_bits(bytestring):
2021-04-16 23:21:06 +08:00
bits = set()
j = 0
2022-03-22 23:20:07 +08:00
for byte in bytestring:
2021-04-16 23:21:06 +08:00
for i in range(8):
if byte & 1:
bits.add(j + i)
byte >>= 1
j += 8
return bits
_abs_names = {
ABS_X: AbsoluteAxis.X,
ABS_Y: AbsoluteAxis.Y,
ABS_Z: AbsoluteAxis.Z,
ABS_RX: AbsoluteAxis.RX,
ABS_RY: AbsoluteAxis.RY,
ABS_RZ: AbsoluteAxis.RZ,
ABS_HAT0X: AbsoluteAxis.HAT_X,
ABS_HAT0Y: AbsoluteAxis.HAT_Y,
}
_rel_names = {
REL_X: RelativeAxis.X,
REL_Y: RelativeAxis.Y,
REL_Z: RelativeAxis.Z,
REL_RX: RelativeAxis.RX,
REL_RY: RelativeAxis.RY,
REL_RZ: RelativeAxis.RZ,
REL_WHEEL: RelativeAxis.WHEEL,
}
def _create_control(fileno, event_type, event_code):
if event_type == EV_ABS:
raw_name = abs_raw_names.get(event_code, 'EV_ABS(%x)' % event_code)
name = _abs_names.get(event_code)
absinfo = EVIOCGABS(fileno, event_code)
value = absinfo.value
minimum = absinfo.minimum
maximum = absinfo.maximum
control = AbsoluteAxis(name, minimum, maximum, raw_name)
2021-04-16 23:21:06 +08:00
control.value = value
if name == 'hat_y':
control.inverted = True
elif event_type == EV_REL:
raw_name = rel_raw_names.get(event_code, 'EV_REL(%x)' % event_code)
name = _rel_names.get(event_code)
# TODO min/max?
control = RelativeAxis(name, raw_name)
elif event_type == EV_KEY:
raw_name = key_raw_names.get(event_code, 'EV_KEY(%x)' % event_code)
name = None
control = Button(name, raw_name)
else:
value = minimum = maximum = 0 # TODO
2021-04-16 23:21:06 +08:00
return None
control._event_type = event_type
control._event_code = event_code
return control
event_types = {
EV_KEY: KEY_MAX,
EV_REL: REL_MAX,
EV_ABS: ABS_MAX,
EV_MSC: MSC_MAX,
EV_LED: LED_MAX,
EV_SND: SND_MAX,
2022-03-22 23:20:07 +08:00
EV_FF: FF_MAX,
2021-04-16 23:21:06 +08:00
}
class EvdevDevice(XlibSelectDevice, Device):
_fileno = None
def __init__(self, display, filename):
self._filename = filename
fileno = os.open(filename, os.O_RDONLY)
# event_version = EVIOCGVERSION(fileno).value
self._id = EVIOCGID(fileno)
self.id_bustype = self._id.bustype
self.id_vendor = hex(self._id.vendor)
self.id_product = hex(self._id.product)
self.id_version = self._id.version
2021-04-16 23:21:06 +08:00
name = EVIOCGNAME(fileno)
try:
name = name.decode('utf-8')
except UnicodeDecodeError:
try:
name = name.decode('latin-1')
except UnicodeDecodeError:
pass
try:
self.phys = EVIOCGPHYS(fileno)
except OSError:
self.phys = ''
try:
self.uniq = EVIOCGUNIQ(fileno)
except OSError:
self.uniq = ''
self.controls = []
self.control_map = {}
2022-03-22 23:20:07 +08:00
self.ff_types = []
2021-04-16 23:21:06 +08:00
event_types_bits = (ctypes.c_byte * 4)()
EVIOCGBIT(fileno, 0, event_types_bits)
for event_type in get_set_bits(event_types_bits):
if event_type not in event_types:
continue
max_code = event_types[event_type]
nbytes = max_code // 8 + 1
event_codes_bits = (ctypes.c_byte * nbytes)()
EVIOCGBIT(fileno, event_type, event_codes_bits)
2022-03-22 23:20:07 +08:00
if event_type == EV_FF:
self.ff_types.extend(get_set_bits(event_codes_bits))
else:
for event_code in get_set_bits(event_codes_bits):
control = _create_control(fileno, event_type, event_code)
if control:
self.control_map[(event_type, event_code)] = control
self.controls.append(control)
2021-04-16 23:21:06 +08:00
os.close(fileno)
super().__init__(display, name)
def get_guid(self):
"""Generate an SDL2 style GUID from the device ID"""
hex_bustype = format(self._id.bustype & 0xFF, '02x')
hex_vendor = format(self._id.vendor & 0xFF, '02x')
hex_product = format(self._id.product & 0xFF, '02x')
hex_version = format(self._id.version & 0xFF, '02x')
shifted_bustype = format(self._id.bustype >> 8, '02x')
shifted_vendor = format(self._id.vendor >> 8, '02x')
shifted_product = format(self._id.product >> 8, '02x')
shifted_version = format(self._id.version >> 8, '02x')
slug = "{:0>2}{:0>2}0000{:0>2}{:0>2}0000{:0>2}{:0>2}0000{:0>2}{:0>2}0000"
return slug.format(hex_bustype, shifted_bustype, hex_vendor, shifted_vendor,
hex_product, shifted_product, hex_version, shifted_version)
2021-04-16 23:21:06 +08:00
def open(self, window=None, exclusive=False):
2022-03-22 23:20:07 +08:00
super().open(window, exclusive)
2021-04-16 23:21:06 +08:00
try:
self._fileno = os.open(self._filename, os.O_RDWR | os.O_NONBLOCK)
2021-04-16 23:21:06 +08:00
except OSError as e:
raise DeviceOpenException(e)
pyglet.app.platform_event_loop.select_devices.add(self)
2021-04-16 23:21:06 +08:00
def close(self):
2022-03-22 23:20:07 +08:00
super().close()
2021-04-16 23:21:06 +08:00
if not self._fileno:
return
pyglet.app.platform_event_loop.select_devices.remove(self)
2021-04-16 23:21:06 +08:00
os.close(self._fileno)
self._fileno = None
def get_controls(self):
return self.controls
# Force Feedback methods
2022-03-22 23:20:07 +08:00
def ff_upload_effect(self, structure):
os.write(self._fileno, structure)
2021-04-16 23:21:06 +08:00
# XlibSelectDevice interface
def fileno(self):
return self._fileno
def poll(self):
2022-03-05 23:10:18 +08:00
return False
2021-04-16 23:21:06 +08:00
def select(self):
if not self._fileno:
return
2022-03-22 23:20:07 +08:00
try:
events = (InputEvent * 64)()
bytes_read = os.readv(self._fileno, events)
except OSError:
self.close()
2021-04-16 23:21:06 +08:00
return
2022-03-22 23:20:07 +08:00
n_events = bytes_read // ctypes.sizeof(InputEvent)
2021-04-16 23:21:06 +08:00
for event in events[:n_events]:
try:
control = self.control_map[(event.type, event.code)]
control.value = event.value
except KeyError:
pass
2022-03-22 23:20:07 +08:00
class FFController(Controller):
"""Controller that supports force-feedback"""
_fileno = None
_weak_effect = None
_play_weak_event = None
_stop_weak_event = None
_strong_effect = None
_play_strong_event = None
_stop_strong_event = None
def open(self, window=None, exclusive=False):
super().open(window, exclusive)
2022-03-22 23:20:07 +08:00
self._fileno = self.device.fileno()
# Create Force Feedback effects & events when opened:
# https://www.kernel.org/doc/html/latest/input/ff.html
self._weak_effect = self._create_effect()
self._play_weak_event = InputEvent(Timeval(), EV_FF, self._weak_effect.id, 1)
self._stop_weak_event = InputEvent(Timeval(), EV_FF, self._weak_effect.id, 0)
self._strong_effect = self._create_effect()
self._play_strong_event = InputEvent(Timeval(), EV_FF, self._strong_effect.id, 1)
self._stop_strong_event = InputEvent(Timeval(), EV_FF, self._strong_effect.id, 0)
def _create_effect(self):
event = FFEvent(FF_RUMBLE, -1)
EVIOCSFF(self._fileno, event)
return event
def rumble_play_weak(self, strength=1.0, duration=0.5):
2022-03-22 23:20:07 +08:00
effect = self._weak_effect
effect.u.ff_rumble_effect.weak_magnitude = int(max(min(1.0, strength), 0) * 0xFFFF)
effect.ff_replay.length = int(duration * 1000)
EVIOCSFF(self._fileno, effect)
self.device.ff_upload_effect(self._play_weak_event)
def rumble_play_strong(self, strength=1.0, duration=0.5):
2022-03-22 23:20:07 +08:00
effect = self._strong_effect
effect.u.ff_rumble_effect.strong_magnitude = int(max(min(1.0, strength), 0) * 0xFFFF)
effect.ff_replay.length = int(duration * 1000)
EVIOCSFF(self._fileno, effect)
self.device.ff_upload_effect(self._play_strong_event)
def rumble_stop_weak(self):
"""Stop playing rumble effects on the weak motor."""
2022-03-22 23:20:07 +08:00
self.device.ff_upload_effect(self._stop_weak_event)
def rumble_stop_strong(self):
"""Stop playing rumble effects on the strong motor."""
2022-03-22 23:20:07 +08:00
self.device.ff_upload_effect(self._stop_strong_event)
2021-04-16 23:21:06 +08:00
def get_devices(display=None):
_devices = {}
2021-04-16 23:21:06 +08:00
base = '/dev/input'
for filename in os.listdir(base):
if filename.startswith('event'):
path = os.path.join(base, filename)
if path in _devices:
continue
try:
_devices[path] = EvdevDevice(display, path)
except OSError:
pass
return list(_devices.values())
def _create_joystick(device):
# Look for something with an ABS X and ABS Y axis, and a joystick 0 button
have_x = False
have_y = False
have_button = False
for control in device.controls:
if control._event_type == EV_ABS and control._event_code == ABS_X:
have_x = True
elif control._event_type == EV_ABS and control._event_code == ABS_Y:
have_y = True
elif control._event_type == EV_KEY and control._event_code in (BTN_JOYSTICK, BTN_GAMEPAD):
have_button = True
if not (have_x and have_y and have_button):
return
return Joystick(device)
2021-04-16 23:21:06 +08:00
def get_joysticks(display=None):
return [joystick for joystick in
[_create_joystick(device) for device in get_devices(display)]
2021-04-16 23:21:06 +08:00
if joystick is not None]
2022-03-05 23:10:18 +08:00
def _create_controller(device):
# Look for something with an ABS X and ABS Y axis, and BTN_GAMEPAD
have_button = False
2022-03-05 23:10:18 +08:00
device.controls.sort(key=lambda ctrl: ctrl._event_code)
for control in device.controls:
2022-03-05 23:10:18 +08:00
if control._event_type == EV_KEY and control._event_code == BTN_GAMEPAD:
have_button = True
2022-03-05 23:10:18 +08:00
mapping = get_mapping(device.get_guid())
# TODO: detect via device types, and use default mapping.
if have_button is False or mapping is None:
return
2022-03-22 23:20:07 +08:00
if FF_RUMBLE in device.ff_types:
return FFController(device, mapping)
else:
2022-03-05 23:10:18 +08:00
return Controller(device, mapping)
2022-03-05 23:10:18 +08:00
def get_controllers(display=None):
return [controller for controller in
2022-03-05 23:10:18 +08:00
[_create_controller(device) for device in get_devices(display)]
if controller is not None]