486 lines
15 KiB
Python
486 lines
15 KiB
Python
# ----------------------------------------------------------------------------
|
|
# pyglet
|
|
# Copyright (c) 2006-2008 Alex Holkner
|
|
# Copyright (c) 2008-2021 pyglet contributors
|
|
# All rights reserved.
|
|
#
|
|
# Redistribution and use in source and binary forms, with or without
|
|
# modification, are permitted provided that the following conditions
|
|
# are met:
|
|
#
|
|
# * Redistributions of source code must retain the above copyright
|
|
# notice, this list of conditions and the following disclaimer.
|
|
# * Redistributions in binary form must reproduce the above copyright
|
|
# notice, this list of conditions and the following disclaimer in
|
|
# the documentation and/or other materials provided with the
|
|
# distribution.
|
|
# * Neither the name of pyglet nor the names of its
|
|
# contributors may be used to endorse or promote products
|
|
# derived from this software without specific prior written
|
|
# permission.
|
|
#
|
|
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
|
# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
|
# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
|
|
# FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
|
|
# COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
|
|
# INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
|
|
# BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
|
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
|
|
# CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
|
|
# LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
|
|
# ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
|
|
# POSSIBILITY OF SUCH DAMAGE.
|
|
# ----------------------------------------------------------------------------
|
|
|
|
import os
|
|
import errno
|
|
import fcntl
|
|
import struct
|
|
import ctypes
|
|
|
|
import pyglet
|
|
|
|
from pyglet.app.xlib import XlibSelectDevice
|
|
from .base import Device, RelativeAxis, AbsoluteAxis, Button, Joystick, GameController
|
|
from .base import DeviceOpenException
|
|
from .evdev_constants import *
|
|
from .gamecontroller import is_game_controller
|
|
|
|
# Handle to the C library.  The code in this module calls its ioctl() and
# read() functions and looks up its `errno` symbol through this object.
c = pyglet.lib.load_library('c')
|
|
|
|
# Bit widths of the four fields packed into an ioctl request number by _IOC().
_IOC_NRBITS = 8
_IOC_TYPEBITS = 8
_IOC_SIZEBITS = 14
_IOC_DIRBITS = 2

# Masks selecting each field once it has been shifted down to bit 0.
_IOC_NRMASK = (1 << _IOC_NRBITS) - 1
_IOC_TYPEMASK = (1 << _IOC_TYPEBITS) - 1
_IOC_SIZEMASK = (1 << _IOC_SIZEBITS) - 1
_IOC_DIRMASK = (1 << _IOC_DIRBITS) - 1

# Bit offsets of each field; the fields are laid out back to back in the
# order nr, type, size, dir starting from the least significant bit.
_IOC_NRSHIFT = 0
_IOC_TYPESHIFT = _IOC_NRSHIFT + _IOC_NRBITS
_IOC_SIZESHIFT = _IOC_TYPESHIFT + _IOC_TYPEBITS
_IOC_DIRSHIFT = _IOC_SIZESHIFT + _IOC_SIZEBITS

# Values for the direction field.
_IOC_NONE = 0
_IOC_WRITE = 1
_IOC_READ = 2
|
|
|
|
|
|
def _IOC(dir, type, nr, size):
    """Pack the four ioctl request fields into one request number.

    Each argument is shifted left by its matching ``_IOC_*SHIFT`` constant
    and the results are OR-ed together.  No masking is applied, so callers
    are expected to pass values that fit inside their field.

    :param dir: direction field value (e.g. ``_IOC_READ``).
    :param type: ioctl type as an integer.
    :param nr: ioctl number.
    :param size: size in bytes of the argument buffer.
    :return: the combined request number.
    """
    placements = (
        (dir, _IOC_DIRSHIFT),
        (type, _IOC_TYPESHIFT),
        (nr, _IOC_NRSHIFT),
        (size, _IOC_SIZESHIFT),
    )
    request = 0
    for field_value, shift in placements:
        request |= field_value << shift
    return request
|
|
|
|
|
|
def _IOR(type, nr, struct):
    """Build a caller for a read ioctl whose result has a fixed ctypes type.

    The request number is computed once, from the size of ``struct``.

    :param type: single-character ioctl type; converted with ``ord``.
    :param nr: ioctl number.
    :param struct: ctypes type instantiated on every call to receive the
        result.
    :return: a function ``f(fileno)`` that performs the ioctl and returns
        the filled instance.  It raises ``OSError`` when the ioctl returns a
        negative value.
    """
    result_type = struct
    request = _IOC(_IOC_READ, ord(type), nr, ctypes.sizeof(result_type))

    def f(fileno):
        result = result_type()
        status = c.ioctl(fileno, request, ctypes.byref(result))
        if status >= 0:
            return result
        # The error number is looked up as the `errno` symbol of the C library.
        err = ctypes.c_int.in_dll(c, 'errno').value
        raise OSError(err, errno.errorcode[err])

    return f
|
|
|
|
|
|
def _IOR_len(type, nr):
    """Build a caller for a read ioctl that fills a caller-supplied buffer.

    Unlike ``_IOR`` the request number is computed on every call, because
    the size encoded in it is the size of the buffer passed in.

    :param type: single-character ioctl type; converted with ``ord``.
    :param nr: ioctl number.
    :return: a function ``f(fileno, buffer)`` that performs the ioctl and
        returns ``buffer``.  It raises ``OSError`` when the ioctl returns a
        negative value.
    """
    type_code = ord

    def f(fileno, buffer):
        request = _IOC(_IOC_READ, type_code(type), nr, ctypes.sizeof(buffer))
        status = c.ioctl(fileno, request, ctypes.byref(buffer))
        if status >= 0:
            return buffer
        # The error number is looked up as the `errno` symbol of the C library.
        err = ctypes.c_int.in_dll(c, 'errno').value
        raise OSError(err, errno.errorcode[err])

    return f
|
|
|
|
|
|
def _IOR_str(type, nr):
    """Build a caller for a read ioctl that returns a byte string.

    :param type: single-character ioctl type, passed on to ``_IOR_len``.
    :param nr: ioctl number, passed on to ``_IOR_len``.
    :return: a function ``f(fileno, len=256)`` that allocates a string
        buffer of ``len`` bytes, lets the ioctl fill it and returns the
        buffer's ``value`` (bytes).
    """
    read_into = _IOR_len(type, nr)

    def f(fileno, len=256):
        text_buffer = ctypes.create_string_buffer(len)
        filled = read_into(fileno, text_buffer)
        return filled.value

    return f
|
|
|
|
|
|
# ctypes aliases for the two field types of `timeval`.
time_t = ctypes.c_long
suseconds_t = ctypes.c_long


class timeval(ctypes.Structure):
    """ctypes structure holding a seconds / microseconds pair.

    Used as the ``time`` field of ``input_event``.
    """

    _fields_ = (
        ('tv_sec', time_t),
        ('tv_usec', suseconds_t),
    )
|
|
|
|
|
|
class input_event(ctypes.Structure):
    """ctypes layout of one event record.

    ``EvdevDevice.select`` reads an array of these from the device file
    descriptor and uses ``type`` and ``code`` to find the control whose
    value is set to ``value``.
    """

    _fields_ = (
        ('time', timeval),
        ('type', ctypes.c_uint16),
        ('code', ctypes.c_uint16),
        ('value', ctypes.c_int32),
    )
|
|
|
|
|
|
class input_id(ctypes.Structure):
    """ctypes layout of the device identity returned by ``EVIOCGID``.

    Four unsigned 16-bit fields: bus type, vendor, product and version.
    """

    _fields_ = (
        ('bustype', ctypes.c_uint16),
        ('vendor', ctypes.c_uint16),
        ('product', ctypes.c_uint16),
        ('version', ctypes.c_uint16),
    )
|
|
|
|
|
|
class input_absinfo(ctypes.Structure):
    """ctypes layout of the absolute-axis description filled by ``EVIOCGABS``.

    Five signed 32-bit fields.  ``_create_control`` reads ``value``,
    ``minimum`` and ``maximum`` from it; ``fuzz`` and ``flat`` are not used
    in this module.
    """

    _fields_ = (
        ('value', ctypes.c_int32),
        ('minimum', ctypes.c_int32),
        ('maximum', ctypes.c_int32),
        ('fuzz', ctypes.c_int32),
        ('flat', ctypes.c_int32),
    )
|
|
|
|
|
|
# ioctl callers for querying an evdev file descriptor.
# Those built with _IOR take a fileno and return a filled ctypes instance.
# Those built with _IOR_str take a fileno (plus an optional buffer length,
# default 256) and return the bytes value of the filled string buffer.
EVIOCGVERSION = _IOR('E', 0x01, ctypes.c_int)
EVIOCGID = _IOR('E', 0x02, input_id)
EVIOCGNAME = _IOR_str('E', 0x06)
EVIOCGPHYS = _IOR_str('E', 0x07)
EVIOCGUNIQ = _IOR_str('E', 0x08)
|
|
|
|
|
|
def EVIOCGBIT(fileno, ev, buffer):
    """Fill ``buffer`` via the read ioctl numbered ``0x20 + ev``.

    :param fileno: open file descriptor of the device.
    :param ev: offset added to ``0x20`` to form the ioctl number; callers in
        this module pass 0 or an event type.
    :param buffer: ctypes buffer to fill; its size is encoded in the request.
    :return: ``buffer``.
    :raises OSError: when the ioctl fails.
    """
    query = _IOR_len('E', 0x20 + ev)
    return query(fileno, buffer)
|
|
|
|
|
|
def EVIOCGABS(fileno, abs):
    """Return an ``input_absinfo`` filled by the read ioctl ``0x40 + abs``.

    :param fileno: open file descriptor of the device.
    :param abs: absolute-axis code added to ``0x40`` to form the ioctl number.
    :return: a new, filled ``input_absinfo`` instance.
    :raises OSError: when the ioctl fails.
    """
    query = _IOR_len('E', 0x40 + abs)
    return query(fileno, input_absinfo())
|
|
|
|
|
|
def get_set_bits(bytes):
    """Return the positions of all set bits in a sequence of byte values.

    Bit 0 is the least significant bit of the first item, bit 8 the least
    significant bit of the second item, and so on.  Only the low eight bits
    of each item are examined, so signed byte values (as yielded by a
    ``ctypes.c_byte`` array) give the same result as their unsigned form.

    :param bytes: iterable of integers.
    :return: set of integer bit positions.
    """
    found = set()
    for byte_index, byte in enumerate(bytes):
        base = byte_index * 8
        found.update(base + bit for bit in range(8) if (byte >> bit) & 1)
    return found
|
|
|
|
|
|
# Maps absolute-axis event codes to the AbsoluteAxis name constants.
# _create_control looks codes up with .get(), so a code that is missing
# here produces an axis whose name is None.
_abs_names = {
    ABS_X: AbsoluteAxis.X,
    ABS_Y: AbsoluteAxis.Y,
    ABS_Z: AbsoluteAxis.Z,
    ABS_RX: AbsoluteAxis.RX,
    ABS_RY: AbsoluteAxis.RY,
    ABS_RZ: AbsoluteAxis.RZ,
    ABS_HAT0X: AbsoluteAxis.HAT_X,
    ABS_HAT0Y: AbsoluteAxis.HAT_Y,
}
|
|
|
|
# Maps relative-axis event codes to the RelativeAxis name constants.
# _create_control looks codes up with .get(), so a code that is missing
# here produces an axis whose name is None.
_rel_names = {
    REL_X: RelativeAxis.X,
    REL_Y: RelativeAxis.Y,
    REL_Z: RelativeAxis.Z,
    REL_RX: RelativeAxis.RX,
    REL_RY: RelativeAxis.RY,
    REL_RZ: RelativeAxis.RZ,
    REL_WHEEL: RelativeAxis.WHEEL,
}
|
|
|
|
|
|
def _create_control(fileno, event_type, event_code):
    """Create the control object for one (event type, event code) pair.

    :param fileno: open file descriptor of the device; only used to query
        the axis range of absolute axes.
    :param event_type: ``EV_ABS``, ``EV_REL`` or ``EV_KEY``; any other type
        yields ``None``.
    :param event_code: code of the control within its event type.
    :return: an ``AbsoluteAxis``, ``RelativeAxis`` or ``Button`` tagged with
        ``_event_type`` and ``_event_code``, or ``None`` for an unsupported
        event type.
    :raises OSError: when querying an absolute axis fails.
    """
    if event_type == EV_ABS:
        raw_name = abs_raw_names.get(event_code, 'EV_ABS(%x)' % event_code)
        name = _abs_names.get(event_code)
        info = EVIOCGABS(fileno, event_code)
        control = AbsoluteAxis(name, info.minimum, info.maximum, raw_name)
        # Start from the value the device reported when it was queried.
        control.value = info.value
        if name == 'hat_y':
            control.inverted = True
    elif event_type == EV_REL:
        raw_name = rel_raw_names.get(event_code, 'EV_REL(%x)' % event_code)
        # TODO min/max?
        control = RelativeAxis(_rel_names.get(event_code), raw_name)
    elif event_type == EV_KEY:
        raw_name = key_raw_names.get(event_code, 'EV_KEY(%x)' % event_code)
        # Buttons get no friendly name, only the raw one.
        control = Button(None, raw_name)
    else:
        # TODO: other event types are not turned into controls.
        return None

    # Remember where the control came from so events can be routed to it.
    control._event_type = event_type
    control._event_code = event_code
    return control
|
|
|
|
|
|
# Maps each event type that EvdevDevice.__init__ enumerates to the maximum
# code for that type.  The maximum sizes the bitmask buffer used to query
# which codes the device has (max_code // 8 + 1 bytes).  Event types that
# are not listed here are skipped during enumeration.
event_types = {
    EV_KEY: KEY_MAX,
    EV_REL: REL_MAX,
    EV_ABS: ABS_MAX,
    EV_MSC: MSC_MAX,
    EV_LED: LED_MAX,
    EV_SND: SND_MAX,
}
|
|
|
|
|
|
class EvdevDevice(XlibSelectDevice, Device):
    """Input device backed by an evdev device file.

    Constructing an instance opens the file briefly to read the device
    identity, name and the list of controls, then closes it again.  The file
    is only kept open between ``open()`` and ``close()``, during which the
    device is registered with the platform event loop's ``select_devices``.
    """

    # File descriptor used for event reading and force feedback; None while
    # the device is not open.
    _fileno = None

    def __init__(self, display, filename):
        """Probe the device file and collect its controls.

        :param display: passed through to the base class initialiser.
        :param filename: path of the evdev device file.
        :raises OSError: when the file cannot be opened or one of the
            mandatory queries (id, name, capability bits, axis info) fails.
        """
        self._filename = filename

        fileno = os.open(filename, os.O_RDONLY)
        # Every query below can raise OSError; the finally clause makes sure
        # the probing descriptor is not leaked when that happens.
        try:
            self._id = EVIOCGID(fileno)
            self.id_bustype = self._id.bustype
            self.id_vendor = hex(self._id.vendor)
            self.id_product = hex(self._id.product)
            self.id_version = self._id.version

            name = EVIOCGNAME(fileno)
            try:
                name = name.decode('utf-8')
            except UnicodeDecodeError:
                # latin-1 maps every byte value, so this fallback cannot fail.
                name = name.decode('latin-1')

            # phys and uniq are optional: a failed query yields ''.
            try:
                self.phys = EVIOCGPHYS(fileno)
            except OSError:
                self.phys = ''
            try:
                self.uniq = EVIOCGUNIQ(fileno)
            except OSError:
                self.uniq = ''

            self.controls = []
            self.control_map = {}

            # First ask which event types the device has, then, for each
            # type listed in `event_types`, which codes of that type.
            event_types_bits = (ctypes.c_byte * 4)()
            EVIOCGBIT(fileno, 0, event_types_bits)
            for event_type in get_set_bits(event_types_bits):
                if event_type not in event_types:
                    continue
                max_code = event_types[event_type]
                nbytes = max_code // 8 + 1
                event_codes_bits = (ctypes.c_byte * nbytes)()
                EVIOCGBIT(fileno, event_type, event_codes_bits)
                for event_code in get_set_bits(event_codes_bits):
                    control = _create_control(fileno, event_type, event_code)
                    if control:
                        self.control_map[(event_type, event_code)] = control
                        self.controls.append(control)
        finally:
            os.close(fileno)

        super().__init__(display, name)

    def get_guid(self):
        """Generate an SDL2 style GUID from the device ID.

        Each of bustype, vendor, product and version contributes its low
        byte, then its high byte, as two hex digits each, followed by the
        literal ``0000``.
        """
        id_fields = (self._id.bustype, self._id.vendor,
                     self._id.product, self._id.version)
        return ''.join('{:02x}{:02x}0000'.format(value & 0xFF, value >> 8)
                       for value in id_fields)

    def open(self, window=None, exclusive=False):
        """Open the device file and register with the platform event loop.

        :raises DeviceOpenException: when the device file cannot be opened
            for reading and writing.
        """
        super().open(window, exclusive)

        try:
            self._fileno = os.open(self._filename, os.O_RDWR | os.O_NONBLOCK)
        except OSError as e:
            raise DeviceOpenException(e) from e

        pyglet.app.platform_event_loop.select_devices.add(self)

    def close(self):
        """Unregister from the event loop and close the device file.

        Does nothing beyond the base class ``close()`` when the device file
        is not open.
        """
        super().close()

        # Compare against None: 0 is a valid file descriptor.
        if self._fileno is None:
            return

        pyglet.app.platform_event_loop.select_devices.remove(self)
        os.close(self._fileno)
        self._fileno = None

    def get_controls(self):
        """Return the list of controls found when the device was probed."""
        return self.controls

    # Force Feedback methods

    def supports_ff(self):
        """Return True when a rumble effect can be created on the device.

        The device file is opened on a temporary descriptor for the probe.
        ``self._fileno`` is pointed at it only while the probe runs and is
        restored afterwards, so an already open device keeps its descriptor
        and a closed device is left with ``_fileno`` set to None.
        """
        try:
            probe_fileno = os.open(self._filename, os.O_RDWR | os.O_NONBLOCK)
        except OSError:
            # Nothing was opened, so there is nothing to close.
            return False

        previous_fileno = self._fileno
        # ff_create_effect talks to self._fileno.
        self._fileno = probe_fileno
        try:
            self.ff_create_effect(0, 0, 0)
            return True
        except OSError:
            return False
        finally:
            self._fileno = previous_fileno
            os.close(probe_fileno)

    def ff_create_effect(self, weak, strong, duration, effect=-1):
        """Upload a rumble effect to the device and return its id.

        :param weak: weak motor strength; clamped to 0..1.
        :param strong: strong motor strength; clamped to 0..1.
        :param duration: effect length; multiplied by 1000 before upload
            (presumably seconds to milliseconds -- TODO confirm).
        :param effect: id placed in the uploaded structure.  The default of
            -1 presumably asks for a new effect to be allocated, while an
            existing id updates that effect -- TODO confirm.
        :return: the id field of the structure after the ioctl.
        :raises OSError: when the ioctl fails.
        """
        weak = int(max(min(1, weak), 0) * 0xFFFF)       # Clamp range from 0-1, convert to 16bit
        strong = int(max(min(1, strong), 0) * 0xFFFF)   # Clamp range from 0-1, convert to 16bit
        duration = int(duration * 1000)
        effect = bytearray(struct.pack('HhHHHHHxHH', FF_RUMBLE, effect, 0, 0, 0, duration, 0, strong, weak))
        view = memoryview(effect).cast('h')

        # The buffer is passed as mutable so the ioctl can write the id back.
        fcntl.ioctl(self._fileno, 0x40304580, view, True)
        return view[1]  # effect ID

    def ff_play(self, effect):
        """Start the effect by writing an EV_FF event with value 1."""
        ev_play = struct.pack('LLHHi', 0, 0, EV_FF, effect, 1)
        os.write(self._fileno, ev_play)

    def ff_stop(self, effect):
        """Stop the effect by writing an EV_FF event with value 0."""
        ev_stop = struct.pack('LLHHi', 0, 0, EV_FF, effect, 0)
        os.write(self._fileno, ev_stop)

    # XlibSelectDevice interface

    def fileno(self):
        """Return the open file descriptor, or None when closed."""
        return self._fileno

    def poll(self):
        """Always report that the device should be serviced."""
        return True

    def select(self):
        """Read pending events and update the matching controls.

        Up to 64 events are read per call.  Events for (type, code) pairs
        without a control are ignored, as is a failed read.
        """
        # Compare against None: 0 is a valid file descriptor.
        if self._fileno is None:
            return

        events = (input_event * 64)()
        bytes_read = c.read(self._fileno, events, ctypes.sizeof(events))
        if bytes_read < 0:
            return

        n_events = bytes_read // ctypes.sizeof(input_event)
        for event in events[:n_events]:
            try:
                control = self.control_map[(event.type, event.code)]
                control.value = event.value
            except KeyError:
                pass
|
|
|
|
|
|
class EvdevGameController(GameController):
    """GameController whose rumble methods use the device's force feedback.

    One effect is kept per motor.  Playing a rumble re-uploads that effect
    with the requested strength and duration and then starts it.
    """

    # Effect ids for the two motors; -1 until open() has created the effects.
    _rumble_weak = -1
    _rumble_strong = -1

    def open(self, window=None, exclusive=False):
        """Open the controller and create one rumble effect per motor."""
        super().open(window, exclusive)
        # Create Force Feedback effects when the device is opened:
        device = self.device
        self._rumble_weak = device.ff_create_effect(0, 0, 0)
        self._rumble_strong = device.ff_create_effect(0, 0, 0)

    def rumble_play_weak(self, strength=1.0, duration=0.5):
        """Play a rumble effect on the weak motor.

        :param strength: passed as the weak strength of the effect.
        :param duration: passed as the duration of the effect.
        """
        effect_id = self.device.ff_create_effect(strength, 0, duration, self._rumble_weak)
        self.device.ff_play(effect_id)

    def rumble_play_strong(self, strength=1.0, duration=0.5):
        """Play a rumble effect on the strong motor.

        :param strength: passed as the strong strength of the effect.
        :param duration: passed as the duration of the effect.
        """
        effect_id = self.device.ff_create_effect(0, strength, duration, self._rumble_strong)
        self.device.ff_play(effect_id)

    def rumble_stop_weak(self):
        """Stop playing rumble effects on the weak motor."""
        self.device.ff_stop(self._rumble_weak)

    def rumble_stop_strong(self):
        """Stop playing rumble effects on the strong motor."""
        self.device.ff_stop(self._rumble_strong)
|
|
|
|
|
|
def get_devices(display=None):
    """Return an ``EvdevDevice`` for every usable ``/dev/input/event*`` file.

    Discovery is best effort.  A device file that cannot be opened or
    queried is skipped, and when ``/dev/input`` itself is missing or
    unreadable an empty list is returned instead of raising.

    :param display: passed through to each ``EvdevDevice``.
    :return: list of devices, in directory listing order.
    """
    base = '/dev/input'
    try:
        filenames = os.listdir(base)
    except OSError:
        # No input directory (or no access to it) means no devices.
        return []

    devices = []
    for filename in filenames:
        if not filename.startswith('event'):
            continue
        try:
            devices.append(EvdevDevice(display, os.path.join(base, filename)))
        except OSError:
            # Typically a device this process may not open; skip it.
            pass
    return devices
|
|
|
|
|
|
def _create_joystick(device):
    """Wrap ``device`` in a ``Joystick`` when it looks like one.

    A device qualifies when it has both an ABS_X and an ABS_Y axis and a
    BTN_JOYSTICK or BTN_GAMEPAD button.

    :return: a ``Joystick``, or ``None`` when the device does not qualify.
    """
    present = {(control._event_type, control._event_code)
               for control in device.controls}

    has_x = (EV_ABS, ABS_X) in present
    has_y = (EV_ABS, ABS_Y) in present
    has_button = (EV_KEY, BTN_JOYSTICK) in present or (EV_KEY, BTN_GAMEPAD) in present

    if has_x and has_y and has_button:
        return Joystick(device)
    return None
|
|
|
|
|
|
def get_joysticks(display=None):
    """Return a ``Joystick`` for every device that qualifies as one.

    :param display: passed through to ``get_devices``.
    """
    candidates = (_create_joystick(device) for device in get_devices(display))
    return [joystick for joystick in candidates if joystick is not None]
|
|
|
|
|
|
def _create_game_controller(device):
    """Wrap ``device`` in a game controller object when it qualifies.

    The device must be accepted by ``is_game_controller`` and have both an
    ABS_X and an ABS_Y axis and a BTN_JOYSTICK or BTN_GAMEPAD button.

    :return: an ``EvdevGameController`` when the device supports force
        feedback, a plain ``GameController`` when it does not, or ``None``
        when the device does not qualify.
    """
    if not is_game_controller(device):
        return None

    # Sorted in place, so the device's own control list is reordered by
    # event code even when the checks below reject the device.
    device.controls.sort(key=lambda ctrl: ctrl._event_code)

    present = {(control._event_type, control._event_code)
               for control in device.controls}

    has_axes = (EV_ABS, ABS_X) in present and (EV_ABS, ABS_Y) in present
    has_button = (EV_KEY, BTN_JOYSTICK) in present or (EV_KEY, BTN_GAMEPAD) in present
    if not (has_axes and has_button):
        return None

    controller_class = EvdevGameController if device.supports_ff() else GameController
    return controller_class(device)
|
|
|
|
|
|
def get_game_controllers(display=None):
    """Return a controller object for every device that qualifies as one.

    :param display: passed through to ``get_devices``.
    """
    candidates = (_create_game_controller(device) for device in get_devices(display))
    return [controller for controller in candidates if controller is not None]
|