import os import time import fcntl import ctypes import warnings from ctypes import c_uint16 as _u16 from ctypes import c_int16 as _s16 from ctypes import c_uint32 as _u32 from ctypes import c_int32 as _s32 from ctypes import c_int64 as _s64 from concurrent.futures import ThreadPoolExecutor from typing import List import pyglet from .evdev_constants import * from pyglet.app.xlib import XlibSelectDevice from pyglet.input.base import Device, RelativeAxis, AbsoluteAxis, Button, Joystick, Controller from pyglet.input.base import DeviceOpenException, ControllerManager from pyglet.input.controller import get_mapping, Relation, create_guid _IOC_NRBITS = 8 _IOC_TYPEBITS = 8 _IOC_SIZEBITS = 14 _IOC_DIRBITS = 2 _IOC_NRSHIFT = 0 _IOC_TYPESHIFT = (_IOC_NRSHIFT + _IOC_NRBITS) _IOC_SIZESHIFT = (_IOC_TYPESHIFT + _IOC_TYPEBITS) _IOC_DIRSHIFT = (_IOC_SIZESHIFT + _IOC_SIZEBITS) _IOC_NONE = 0 _IOC_WRITE = 1 _IOC_READ = 2 def _IOC(dir, type, nr, size): return ((dir << _IOC_DIRSHIFT) | (type << _IOC_TYPESHIFT) | (nr << _IOC_NRSHIFT) | (size << _IOC_SIZESHIFT)) def _IOR(type, nr, struct): request = _IOC(_IOC_READ, ord(type), nr, ctypes.sizeof(struct)) def f(fileno): buffer = struct() fcntl.ioctl(fileno, request, buffer) return buffer return f def _IOR_len(type, nr): def f(fileno, buffer): request = _IOC(_IOC_READ, ord(type), nr, ctypes.sizeof(buffer)) fcntl.ioctl(fileno, request, buffer) return buffer return f def _IOR_str(type, nr): g = _IOR_len(type, nr) def f(fileno, length=256): return g(fileno, ctypes.create_string_buffer(length)).value return f def _IOW(type, nr): def f(fileno, buffer): request = _IOC(_IOC_WRITE, ord(type), nr, ctypes.sizeof(buffer)) fcntl.ioctl(fileno, request, buffer) return f # Structures from /linux/blob/master/include/uapi/linux/input.h class Timeval(ctypes.Structure): _fields_ = ( ('tv_sec', _s64), ('tv_usec', _s64) ) class InputEvent(ctypes.Structure): _fields_ = ( ('time', Timeval), ('type', _u16), ('code', _u16), ('value', _s32) ) class InputID(ctypes.Structure): _fields_ = ( ('bustype', _u16), ('vendor', _u16), ('product', _u16), ('version', _u16), ) class InputABSInfo(ctypes.Structure): _fields_ = ( ('value', _s32), ('minimum', _s32), ('maximum', _s32), ('fuzz', _s32), ('flat', _s32), ) class FFReplay(ctypes.Structure): _fields_ = ( ('length', _u16), ('delay', _u16) ) class FFTrigger(ctypes.Structure): _fields_ = ( ('button', _u16), ('interval', _u16) ) class FFEnvelope(ctypes.Structure): _fields_ = [ ('attack_length', _u16), ('attack_level', _u16), ('fade_length', _u16), ('fade_level', _u16), ] class FFConstantEffect(ctypes.Structure): _fields_ = [ ('level', _s16), ('ff_envelope', FFEnvelope), ] class FFRampEffect(ctypes.Structure): _fields_ = [ ('start_level', _s16), ('end_level', _s16), ('ff_envelope', FFEnvelope), ] class FFConditionEffect(ctypes.Structure): _fields_ = [ ('right_saturation', _u16), ('left_saturation', _u16), ('right_coeff', _s16), ('left_coeff', _s16), ('deadband', _u16), ('center', _s16), ] class FFPeriodicEffect(ctypes.Structure): _fields_ = [ ('waveform', _u16), ('period', _u16), ('magnitude', _s16), ('offset', _s16), ('phase', _u16), ('envelope', FFEnvelope), ('custom_len', _u32), ('custom_data', ctypes.POINTER(_s16)), ] class FFRumbleEffect(ctypes.Structure): _fields_ = ( ('strong_magnitude', _u16), ('weak_magnitude', _u16) ) class FFEffectType(ctypes.Union): _fields_ = ( ('ff_constant_effect', FFConstantEffect), ('ff_ramp_effect', FFRampEffect), ('ff_periodic_effect', FFPeriodicEffect), ('ff_condition_effect', FFConditionEffect * 2), ('ff_rumble_effect', FFRumbleEffect), ) class FFEvent(ctypes.Structure): _fields_ = ( ('type', _u16), ('id', _s16), ('direction', _u16), ('ff_trigger', FFTrigger), ('ff_replay', FFReplay), ('u', FFEffectType) ) EVIOCGVERSION = _IOR('E', 0x01, ctypes.c_int) EVIOCGID = _IOR('E', 0x02, InputID) EVIOCGNAME = _IOR_str('E', 0x06) EVIOCGPHYS = _IOR_str('E', 0x07) EVIOCGUNIQ = _IOR_str('E', 0x08) EVIOCSFF = _IOW('E', 0x80) def EVIOCGBIT(fileno, ev, buffer): return _IOR_len('E', 0x20 + ev)(fileno, buffer) def EVIOCGABS(fileno, abs): buffer = InputABSInfo() return _IOR_len('E', 0x40 + abs)(fileno, buffer) def get_set_bits(bytestring): bits = set() j = 0 for byte in bytestring: for i in range(8): if byte & 1: bits.add(j + i) byte >>= 1 j += 8 return bits _abs_names = { ABS_X: AbsoluteAxis.X, ABS_Y: AbsoluteAxis.Y, ABS_Z: AbsoluteAxis.Z, ABS_RX: AbsoluteAxis.RX, ABS_RY: AbsoluteAxis.RY, ABS_RZ: AbsoluteAxis.RZ, ABS_HAT0X: AbsoluteAxis.HAT_X, ABS_HAT0Y: AbsoluteAxis.HAT_Y, } _rel_names = { REL_X: RelativeAxis.X, REL_Y: RelativeAxis.Y, REL_Z: RelativeAxis.Z, REL_RX: RelativeAxis.RX, REL_RY: RelativeAxis.RY, REL_RZ: RelativeAxis.RZ, REL_WHEEL: RelativeAxis.WHEEL, } def _create_control(fileno, event_type, event_code): if event_type == EV_ABS: raw_name = abs_raw_names.get(event_code, 'EV_ABS(%x)' % event_code) name = _abs_names.get(event_code) absinfo = EVIOCGABS(fileno, event_code) value = absinfo.value minimum = absinfo.minimum maximum = absinfo.maximum control = AbsoluteAxis(name, minimum, maximum, raw_name) control.value = value if name == 'hat_y': control.inverted = True elif event_type == EV_REL: raw_name = rel_raw_names.get(event_code, 'EV_REL(%x)' % event_code) name = _rel_names.get(event_code) # TODO min/max? control = RelativeAxis(name, raw_name) elif event_type == EV_KEY: raw_name = key_raw_names.get(event_code, 'EV_KEY(%x)' % event_code) name = None control = Button(name, raw_name) else: value = minimum = maximum = 0 # TODO return None control._event_type = event_type control._event_code = event_code return control event_types = { EV_KEY: KEY_MAX, EV_REL: REL_MAX, EV_ABS: ABS_MAX, EV_MSC: MSC_MAX, EV_LED: LED_MAX, EV_SND: SND_MAX, EV_FF: FF_MAX, } class EvdevDevice(XlibSelectDevice, Device): _fileno = None def __init__(self, display, filename): self._filename = filename fileno = os.open(filename, os.O_RDONLY) # event_version = EVIOCGVERSION(fileno).value self._id = EVIOCGID(fileno) self.id_bustype = self._id.bustype self.id_vendor = hex(self._id.vendor) self.id_product = hex(self._id.product) self.id_version = self._id.version name = EVIOCGNAME(fileno) try: name = name.decode('utf-8') except UnicodeDecodeError: try: name = name.decode('latin-1') except UnicodeDecodeError: pass try: self.phys = EVIOCGPHYS(fileno) except OSError: self.phys = '' try: self.uniq = EVIOCGUNIQ(fileno) except OSError: self.uniq = '' self.controls = [] self.control_map = {} self.ff_types = [] event_types_bits = (ctypes.c_byte * 4)() EVIOCGBIT(fileno, 0, event_types_bits) for event_type in get_set_bits(event_types_bits): if event_type not in event_types: continue max_code = event_types[event_type] nbytes = max_code // 8 + 1 event_codes_bits = (ctypes.c_byte * nbytes)() EVIOCGBIT(fileno, event_type, event_codes_bits) if event_type == EV_FF: self.ff_types.extend(get_set_bits(event_codes_bits)) else: for event_code in get_set_bits(event_codes_bits): control = _create_control(fileno, event_type, event_code) if control: self.control_map[(event_type, event_code)] = control self.controls.append(control) self.controls.sort(key=lambda c: c._event_code) os.close(fileno) super().__init__(display, name) def get_guid(self): """Get the device's SDL2 style GUID string""" _id = self._id return create_guid(_id.bustype, _id.vendor, _id.product, _id.version, self.name, 0, 0) def open(self, window=None, exclusive=False): super().open(window, exclusive) try: self._fileno = os.open(self._filename, os.O_RDWR | os.O_NONBLOCK) except OSError as e: raise DeviceOpenException(e) pyglet.app.platform_event_loop.select_devices.add(self) def close(self): super().close() if not self._fileno: return pyglet.app.platform_event_loop.select_devices.remove(self) os.close(self._fileno) self._fileno = None def get_controls(self): return self.controls # Force Feedback methods def ff_upload_effect(self, structure): os.write(self._fileno, structure) # XlibSelectDevice interface def fileno(self): return self._fileno def poll(self): return False def select(self): if not self._fileno: return try: events = (InputEvent * 64)() bytes_read = os.readv(self._fileno, events) except OSError: self.close() return n_events = bytes_read // ctypes.sizeof(InputEvent) for event in events[:n_events]: try: control = self.control_map[(event.type, event.code)] control.value = event.value except KeyError: pass class FFController(Controller): """Controller that supports force-feedback""" _fileno = None _weak_effect = None _play_weak_event = None _stop_weak_event = None _strong_effect = None _play_strong_event = None _stop_strong_event = None def open(self, window=None, exclusive=False): super().open(window, exclusive) self._fileno = self.device.fileno() # Create Force Feedback effects & events when opened: # https://www.kernel.org/doc/html/latest/input/ff.html self._weak_effect = FFEvent(FF_RUMBLE, -1) EVIOCSFF(self._fileno, self._weak_effect) self._play_weak_event = InputEvent(Timeval(), EV_FF, self._weak_effect.id, 1) self._stop_weak_event = InputEvent(Timeval(), EV_FF, self._weak_effect.id, 0) self._strong_effect = FFEvent(FF_RUMBLE, -1) EVIOCSFF(self._fileno, self._strong_effect) self._play_strong_event = InputEvent(Timeval(), EV_FF, self._strong_effect.id, 1) self._stop_strong_event = InputEvent(Timeval(), EV_FF, self._strong_effect.id, 0) def rumble_play_weak(self, strength=1.0, duration=0.5): effect = self._weak_effect effect.u.ff_rumble_effect.weak_magnitude = int(max(min(1.0, strength), 0) * 0xFFFF) effect.ff_replay.length = int(duration * 1000) EVIOCSFF(self._fileno, effect) self.device.ff_upload_effect(self._play_weak_event) def rumble_play_strong(self, strength=1.0, duration=0.5): effect = self._strong_effect effect.u.ff_rumble_effect.strong_magnitude = int(max(min(1.0, strength), 0) * 0xFFFF) effect.ff_replay.length = int(duration * 1000) EVIOCSFF(self._fileno, effect) self.device.ff_upload_effect(self._play_strong_event) def rumble_stop_weak(self): self.device.ff_upload_effect(self._stop_weak_event) def rumble_stop_strong(self): self.device.ff_upload_effect(self._stop_strong_event) class EvdevControllerManager(ControllerManager, XlibSelectDevice): def __init__(self, display=None): super().__init__() self._display = display self._devices_file = open('/proc/bus/input/devices') self._device_names = self._get_device_names() self._controllers = {} self._thread_pool = ThreadPoolExecutor(max_workers=1) for name in self._device_names: path = os.path.join('/dev/input', name) try: device = EvdevDevice(self._display, path) except OSError: continue controller = _create_controller(device) if controller: self._controllers[name] = controller pyglet.app.platform_event_loop.select_devices.add(self) def __del__(self): self._devices_file.close() def fileno(self): """Allow this class to be Selectable""" return self._devices_file.fileno() @staticmethod def _get_device_names(): return {name for name in os.listdir('/dev/input') if name.startswith('event')} def _make_device_callback(self, future): name, device = future.result() if not device: return if name in self._controllers: controller = self._controllers.get(name) else: controller = _create_controller(device) self._controllers[name] = controller if controller: # Dispatch event in main thread: pyglet.app.platform_event_loop.post_event(self, 'on_connect', controller) def _make_device(self, name, count=1): path = os.path.join('/dev/input', name) while count > 0: try: return name, EvdevDevice(self._display, path) except OSError: if count > 0: time.sleep(0.1) count -= 1 return None, None def select(self): """Triggered whenever the devices_file changes.""" new_device_files = self._get_device_names() appeared = new_device_files - self._device_names disappeared = self._device_names - new_device_files self._device_names = new_device_files for name in appeared: future = self._thread_pool.submit(self._make_device, name, count=10) future.add_done_callback(self._make_device_callback) for name in disappeared: controller = self._controllers.get(name) if controller: self.dispatch_event('on_disconnect', controller) def get_controllers(self) -> List[Controller]: return list(self._controllers.values()) def get_devices(display=None): _devices = {} base = '/dev/input' for filename in os.listdir(base): if filename.startswith('event'): path = os.path.join(base, filename) if path in _devices: continue try: _devices[path] = EvdevDevice(display, path) except OSError: pass return list(_devices.values()) def _create_joystick(device): # Look for something with an ABS X and ABS Y axis, and a joystick 0 button have_x = False have_y = False have_button = False for control in device.controls: if control._event_type == EV_ABS and control._event_code == ABS_X: have_x = True elif control._event_type == EV_ABS and control._event_code == ABS_Y: have_y = True elif control._event_type == EV_KEY and control._event_code in (BTN_JOYSTICK, BTN_GAMEPAD): have_button = True if not (have_x and have_y and have_button): return return Joystick(device) def get_joysticks(display=None): return [joystick for joystick in [_create_joystick(device) for device in get_devices(display)] if joystick is not None] def _detect_controller_mapping(device): # If no explicit mapping is available, we can # detect it from the Linux gamepad specification: # https://www.kernel.org/doc/html/v4.13/input/gamepad.html # Note: legacy device drivers don't always adhere to this. mapping = dict(guid=device.get_guid(), name=device.name) _aliases = {BTN_MODE: 'guide', BTN_SELECT: 'back', BTN_START: 'start', BTN_SOUTH: 'a', BTN_EAST: 'b', BTN_WEST: 'x', BTN_NORTH: 'y', BTN_TL: 'leftshoulder', BTN_TR: 'rightshoulder', BTN_TL2: 'lefttrigger', BTN_TR2: 'righttrigger', BTN_THUMBL: 'leftstick', BTN_THUMBR: 'rightstick', BTN_DPAD_UP: 'dpup', BTN_DPAD_DOWN: 'dpdown', BTN_DPAD_LEFT: 'dpleft', BTN_DPAD_RIGHT: 'dpright', ABS_HAT0X: 'dpleft', # 'dpright', ABS_HAT0Y: 'dpup', # 'dpdown', ABS_Z: 'lefttrigger', ABS_RZ: 'righttrigger', ABS_X: 'leftx', ABS_Y: 'lefty', ABS_RX: 'rightx', ABS_RY: 'righty'} button_controls = [control for control in device.controls if isinstance(control, Button)] axis_controls = [control for control in device.controls if isinstance(control, AbsoluteAxis)] hat_controls = [control for control in device.controls if control.name in ('hat_x', 'hat_y')] for i, control in enumerate(button_controls): name = _aliases.get(control._event_code) if name: mapping[name] = Relation('button', i) for i, control in enumerate(axis_controls): name = _aliases.get(control._event_code) if name: mapping[name] = Relation('axis', i) for i, control in enumerate(hat_controls): name = _aliases.get(control._event_code) if name: index = 1 + i << 1 mapping[name] = Relation('hat0', index) return mapping def _create_controller(device): for control in device.controls: if control._event_type == EV_KEY and control._event_code == BTN_GAMEPAD: break else: return None # Game Controllers must have a BTN_GAMEPAD mapping = get_mapping(device.get_guid()) if not mapping: warnings.warn(f"Warning: {device} (GUID: {device.get_guid()}) " f"has no controller mappings. Update the mappings in the Controller DB.\n" f"Auto-detecting as defined by the 'Linux gamepad specification'") mapping = _detect_controller_mapping(device) if FF_RUMBLE in device.ff_types: return FFController(device, mapping) else: return Controller(device, mapping) def get_controllers(display=None): return [controller for controller in [_create_controller(device) for device in get_devices(display)] if controller is not None]