import ctypes
import warnings

from typing import List, Dict, Optional

from pyglet.libs.win32.constants import (
    WM_DEVICECHANGE,
    DBT_DEVICEARRIVAL,
    DBT_DEVICEREMOVECOMPLETE,
    DBT_DEVTYP_DEVICEINTERFACE,
    DEVICE_NOTIFY_WINDOW_HANDLE,
)

from pyglet.event import EventDispatcher

import pyglet
from pyglet.input import base
from pyglet.libs import win32
from pyglet.libs.win32 import dinput, _user32, DEV_BROADCAST_DEVICEINTERFACE, com, DEV_BROADCAST_HDR
from pyglet.libs.win32 import _kernel32
from pyglet.input.controller import get_mapping
from pyglet.input.base import ControllerManager
from pyglet.libs.win32.dinput import DIPROPHEADER

# These instance names are not defined anywhere, obtained by experiment.  The
# GUID names (which seem to be ideally what are needed) are wrong/missing for
# most of my devices.

_abs_instance_names = {
    0: 'x',
    1: 'y',
    2: 'z',
    3: 'rx',
    4: 'ry',
    5: 'rz',
}

_rel_instance_names = {
    0: 'x',
    1: 'y',
    2: 'wheel',
}

_btn_instance_names = {}


def _create_control(object_instance):
    """Build a pyglet control for one enumerated DirectInput device object.

    :param object_instance: object-instance structure handed to the
        ``EnumObjects`` callback; its ``tszName`` and ``dwType`` fields are read.
    :return: an ``AbsoluteAxis``, ``RelativeAxis`` or ``Button`` whose ``_type``
        attribute holds the original ``dwType``, or ``None`` when the object is
        none of the recognised kinds.
    """
    raw_name = object_instance.tszName
    ctrl_type = object_instance.dwType
    instance = dinput.DIDFT_GETINSTANCE(ctrl_type)

    if ctrl_type & dinput.DIDFT_ABSAXIS:
        name = _abs_instance_names.get(instance)
        control = base.AbsoluteAxis(name, 0, 0xffff, raw_name)
    elif ctrl_type & dinput.DIDFT_RELAXIS:
        name = _rel_instance_names.get(instance)
        control = base.RelativeAxis(name, raw_name)
    elif ctrl_type & dinput.DIDFT_BUTTON:
        name = _btn_instance_names.get(instance)
        control = base.Button(name, raw_name)
    elif ctrl_type & dinput.DIDFT_POV:
        control = base.AbsoluteAxis(base.AbsoluteAxis.HAT, 0, 0xffffffff, raw_name)
    else:
        return None

    # Remembered so _set_format() can describe this object in the data format.
    control._type = object_instance.dwType
    return control


class DirectInputDevice(base.Device):
    """A single DirectInput device together with its enumerated controls."""

    def __init__(self, display, device, device_instance):
        """Wrap ``device`` and enumerate its controls.

        :param display: passed through to ``base.Device``.
        :param device: the DirectInput device interface to drive.
        :param device_instance: device-instance structure describing the
            device (name, product GUID and device type are read from it).
        """
        name = device_instance.tszInstanceName
        super().__init__(display, name)

        self._type = device_instance.dwDevType & 0xff
        self._subtype = device_instance.dwDevType & 0xff00

        self._device = device
        # Event handle created by open(); None while the device is not open so
        # that close() can be called safely at any time.
        self._wait_object = None

        self._init_controls()
        self._set_format()

        self.id_name = device_instance.tszProductName
        self.id_product_guid = format(device_instance.guidProduct.Data1, "08x")

    def __del__(self):
        # __init__ may have failed before _device was assigned.
        device = getattr(self, '_device', None)
        if device is not None:
            device.Release()

    def get_guid(self):
        """Generate an SDL2 style GUID from the product guid."""
        first = self.id_product_guid[6:8] + self.id_product_guid[4:6]
        second = self.id_product_guid[2:4] + self.id_product_guid[0:2]
        return f"03000000{first}0000{second}000000000000"

    def _init_controls(self):
        """Populate ``self.controls`` by enumerating every object on the device."""
        self.controls = []
        self._device.EnumObjects(dinput.LPDIENUMDEVICEOBJECTSCALLBACK(self._object_enum),
                                 None, dinput.DIDFT_ALL)

    def _object_enum(self, object_instance, arg):
        """``EnumObjects`` callback: keep the object if it maps to a control."""
        control = _create_control(object_instance.contents)
        if control:
            self.controls.append(control)
        return dinput.DIENUM_CONTINUE

    def _set_format(self):
        """Set the device data format (one 4-byte slot per control) and buffer size."""
        if not self.controls:
            return

        object_formats = (dinput.DIOBJECTDATAFORMAT * len(self.controls))()
        offset = 0
        for object_format, control in zip(object_formats, self.controls):
            object_format.dwOfs = offset
            object_format.dwType = control._type
            # _dispatch_events() recovers the control index as dwOfs // 4.
            offset += 4

        fmt = dinput.DIDATAFORMAT()
        fmt.dwSize = ctypes.sizeof(fmt)
        fmt.dwObjSize = ctypes.sizeof(dinput.DIOBJECTDATAFORMAT)
        fmt.dwFlags = 0
        fmt.dwDataSize = offset
        fmt.dwNumObjs = len(object_formats)
        fmt.rgodf = ctypes.cast(ctypes.pointer(object_formats), dinput.LPDIOBJECTDATAFORMAT)
        self._device.SetDataFormat(fmt)

        prop = dinput.DIPROPDWORD()
        prop.diph.dwSize = ctypes.sizeof(prop)
        prop.diph.dwHeaderSize = ctypes.sizeof(prop.diph)
        prop.diph.dwObj = 0
        prop.diph.dwHow = dinput.DIPH_DEVICE
        # NOTE(review): DIPROP_BUFFERSIZE appears to be an item count rather
        # than a byte count -- confirm against the DirectInput docs before
        # changing; the value is kept as it was.
        prop.dwData = 64 * ctypes.sizeof(dinput.DIDATAFORMAT)
        self._device.SetProperty(dinput.DIPROP_BUFFERSIZE, ctypes.byref(prop.diph))

    def open(self, window=None, exclusive=False):
        """Acquire the device and start receiving buffered input events.

        Does nothing when the device exposes no controls.

        :param window: window whose ``_hwnd`` is used for the cooperative
            level; when ``None`` any open window, or else the shadow window,
            is used.
        :param exclusive: request exclusive instead of non-exclusive access.
        """
        if not self.controls:
            return

        if window is None:
            # Pick any open window, or the shadow window if no windows
            # have been created yet.
            window = pyglet.gl._shadow_window
            for window in pyglet.app.windows:
                break

        flags = dinput.DISCL_BACKGROUND
        if exclusive:
            flags |= dinput.DISCL_EXCLUSIVE
        else:
            flags |= dinput.DISCL_NONEXCLUSIVE

        self._wait_object = _kernel32.CreateEventW(None, False, False, None)
        self._device.SetEventNotification(self._wait_object)
        pyglet.app.platform_event_loop.add_wait_object(self._wait_object, self._dispatch_events)

        self._device.SetCooperativeLevel(window._hwnd, flags)
        self._device.Acquire()

    def close(self):
        """Release the device and the event handle created by :meth:`open`.

        Safe to call when the device was never opened or is already closed.
        """
        if not self.controls:
            return

        wait_object = self._wait_object
        if wait_object is None:
            # Never opened, or already closed: there is nothing to tear down,
            # and closing a stale handle a second time must be avoided.
            return
        self._wait_object = None

        pyglet.app.platform_event_loop.remove_wait_object(wait_object)

        self._device.Unacquire()
        self._device.SetEventNotification(None)

        _kernel32.CloseHandle(wait_object)

    def get_controls(self):
        """Return the list of controls found on the device."""
        return self.controls

    def _dispatch_events(self):
        """Read buffered device data and push each value to its control."""
        if not self.controls:
            return

        events = (dinput.DIDEVICEOBJECTDATA * 64)()
        n_events = win32.DWORD(len(events))
        try:
            self._device.GetDeviceData(ctypes.sizeof(dinput.DIDEVICEOBJECTDATA),
                                       ctypes.cast(ctypes.pointer(events),
                                                   dinput.LPDIDEVICEOBJECTDATA),
                                       ctypes.byref(n_events),
                                       0)
        except OSError:
            return

        for event in events[:n_events.value]:
            # Each control occupies a 4-byte slot (see _set_format).
            index = event.dwOfs // 4
            self.controls[index].value = event.dwData

    def matches(self, guid_id, device_instance):
        """Return True if ``device_instance`` describes this device.

        :param guid_id: product GUID formatted like ``id_product_guid``.
        :param device_instance: pointer to a device-instance structure.
        """
        instance = device_instance.contents
        return (self.id_product_guid == guid_id
                and self.id_name == instance.tszProductName
                and self._type == instance.dwDevType & 0xff
                and self._subtype == instance.dwDevType & 0xff00)


def _init_directinput():
    """Create and return a new IDirectInput8 interface for this process."""
    _i_dinput = dinput.IDirectInput8()
    module_handle = _kernel32.GetModuleHandleW(None)
    dinput.DirectInput8Create(module_handle,
                              dinput.DIRECTINPUT_VERSION,
                              dinput.IID_IDirectInput8W,
                              ctypes.byref(_i_dinput),
                              None)

    return _i_dinput


# Shared DirectInput interface used by every enumeration in this module.
_i_dinput = _init_directinput()

GUID_DEVINTERFACE_HID = com.GUID(0x4D1E55B2, 0xF16F, 0x11CF, 0x88, 0xCB, 0x00, 0x11, 0x11, 0x00, 0x00, 0x30)


class DIManager(EventDispatcher):
    """Tracks attached DirectInput devices.

    Dispatches ``on_connect`` / ``on_disconnect`` with the affected
    :class:`DirectInputDevice` when a device-change message is handled.
    """

    def __init__(self):
        # Pick any open window, or the shadow window if no windows have been created yet.
        window = pyglet.gl._shadow_window
        for window in pyglet.app.windows:
            break

        self.window = window

        dbi = DEV_BROADCAST_DEVICEINTERFACE()
        dbi.dbcc_size = ctypes.sizeof(dbi)
        dbi.dbcc_devicetype = DBT_DEVTYP_DEVICEINTERFACE
        dbi.dbcc_classguid = GUID_DEVINTERFACE_HID

        # Register we look for HID device unplug/plug.
        _user32.RegisterDeviceNotificationW(window._hwnd, ctypes.byref(dbi), DEVICE_NOTIFY_WINDOW_HANDLE)

        window._event_handlers[WM_DEVICECHANGE] = self._event_devicechange

        # All devices.
        self.devices: List[DirectInputDevice] = []

        new_devices, _ = self._get_devices()
        self.devices.extend(new_devices)

    def __del__(self):
        # Remove handler. Guarded: __init__ may have failed before the handler
        # was installed, or the handler may already have been removed.
        window = getattr(self, 'window', None)
        handlers = getattr(window, '_event_handlers', None)
        if handlers is not None:
            handlers.pop(WM_DEVICECHANGE, None)

    def _get_devices(self, display=None):
        """Enumerate all the devices on the system.

        Returns two values: new devices, missing devices"""
        _missing_devices = list(self.devices)
        _new_devices = []
        _xinput_devices = []

        if not pyglet.options["win32_disable_xinput"]:
            try:
                from pyglet.input.win32.xinput import get_xinput_guids
                _xinput_devices = get_xinput_guids()
            except ImportError:
                pass

        def _device_enum(device_instance, arg):  # DIDEVICEINSTANCE
            guid_id = format(device_instance.contents.guidProduct.Data1, "08x")
            # Only XInput should handle XInput compatible devices if enabled. Filter them out.
            if guid_id in _xinput_devices:
                return dinput.DIENUM_CONTINUE

            # Check if device already exists.
            for dev in list(_missing_devices):
                if dev.matches(guid_id, device_instance):
                    # Still attached, so it is neither new nor missing.
                    _missing_devices.remove(dev)
                    return dinput.DIENUM_CONTINUE

            device = dinput.IDirectInputDevice8()
            _i_dinput.CreateDevice(device_instance.contents.guidInstance, ctypes.byref(device), None)
            di_dev = DirectInputDevice(display, device, device_instance.contents)

            _new_devices.append(di_dev)
            return dinput.DIENUM_CONTINUE

        _i_dinput.EnumDevices(dinput.DI8DEVCLASS_ALL,
                              dinput.LPDIENUMDEVICESCALLBACK(_device_enum),
                              None,
                              dinput.DIEDFL_ATTACHEDONLY)
        return _new_devices, _missing_devices

    def _recheck_devices(self):
        """Re-enumerate devices, update ``self.devices`` and dispatch events."""
        new_devices, missing_devices = self._get_devices()
        if new_devices:
            self.devices.extend(new_devices)
            for device in new_devices:
                self.dispatch_event('on_connect', device)

        if missing_devices:
            for device in missing_devices:
                self.devices.remove(device)
                self.dispatch_event('on_disconnect', device)

    def _event_devicechange(self, msg, wParam, lParam):
        """WM_DEVICECHANGE handler: recheck devices on interface arrival/removal."""
        if lParam == 0:
            return

        if wParam == DBT_DEVICEARRIVAL or wParam == DBT_DEVICEREMOVECOMPLETE:
            hdr_ptr = ctypes.cast(lParam, ctypes.POINTER(DEV_BROADCAST_HDR))
            if hdr_ptr.contents.dbch_devicetype == DBT_DEVTYP_DEVICEINTERFACE:
                self._recheck_devices()


DIManager.register_event_type('on_connect')
DIManager.register_event_type('on_disconnect')

_di_manager = DIManager()


class DIControllerManager(ControllerManager):
    """Controller manager backed by the module-level :class:`DIManager`."""

    def __init__(self, display=None):
        self._display = display
        self._controllers: Dict[DirectInputDevice, base.Controller] = {}

        for device in _di_manager.devices:
            self._add_controller(device)

        @_di_manager.event
        def on_connect(di_device):
            if di_device not in self._controllers:
                if self._add_controller(di_device):
                    pyglet.app.platform_event_loop.post_event(self, 'on_connect', self._controllers[di_device])

        @_di_manager.event
        def on_disconnect(di_device):
            if di_device in self._controllers:
                _controller = self._controllers[di_device]
                del self._controllers[di_device]
                pyglet.app.platform_event_loop.post_event(self, 'on_disconnect', _controller)

    def _add_controller(self, device: DirectInputDevice) -> Optional[base.Controller]:
        """Create and register a controller for ``device``; None if not possible."""
        controller = _create_controller(device)
        if controller:
            self._controllers[device] = controller
            return controller

        return None

    def get_controllers(self):
        """Return the controllers currently known to this manager."""
        return list(self._controllers.values())


def get_devices(display=None):
    """Enumerate attached DirectInput devices as new ``DirectInputDevice`` objects.

    Devices whose product GUID is reported by the XInput backend are skipped
    unless the ``win32_disable_xinput`` option is set.
    """
    # The shared module-level ``_i_dinput`` interface is used below; creating
    # another interface per call would only produce an unused COM object.
    _devices = []
    _xinput_devices = []

    if not pyglet.options["win32_disable_xinput"]:
        try:
            from pyglet.input.win32.xinput import get_xinput_guids
            _xinput_devices = get_xinput_guids()
        except ImportError:
            pass

    def _device_enum(device_instance, arg):
        guid_id = format(device_instance.contents.guidProduct.Data1, "08x")
        # Only XInput should handle XInput compatible devices if enabled. Filter them out.
        if guid_id in _xinput_devices:
            # Log somewhere?
            return dinput.DIENUM_CONTINUE

        device = dinput.IDirectInputDevice8()
        _i_dinput.CreateDevice(device_instance.contents.guidInstance, ctypes.byref(device), None)
        _devices.append(DirectInputDevice(display, device, device_instance.contents))
        return dinput.DIENUM_CONTINUE

    _i_dinput.EnumDevices(dinput.DI8DEVCLASS_ALL,
                          dinput.LPDIENUMDEVICESCALLBACK(_device_enum),
                          None,
                          dinput.DIEDFL_ATTACHEDONLY)
    return _devices


def _create_controller(device):
    """Return a ``base.Controller`` for ``device``, or None.

    None is returned when the device is not a joystick, first-person or
    gamepad type, or when no mapping exists for its GUID (a warning is issued
    in that case).
    """
    if device._type not in (dinput.DI8DEVTYPE_JOYSTICK,
                            dinput.DI8DEVTYPE_1STPERSON,
                            dinput.DI8DEVTYPE_GAMEPAD):
        return None

    # Only look up a mapping once the device is known to be controller-like.
    guid = device.get_guid()
    mapping = get_mapping(guid)
    if mapping is not None:
        return base.Controller(device, mapping)

    warnings.warn(f"Warning: {device} (GUID: {guid}) "
                  f"has no controller mappings. Update the mappings in the Controller DB.")
    return None


def _create_joystick(device):
    """Return a ``base.Joystick`` for joystick-like device types, else None."""
    if device._type in (dinput.DI8DEVTYPE_JOYSTICK,
                        dinput.DI8DEVTYPE_1STPERSON,
                        dinput.DI8DEVTYPE_GAMEPAD,
                        dinput.DI8DEVTYPE_SUPPLEMENTAL):
        return base.Joystick(device)
    return None


def get_joysticks(display=None):
    """Return a joystick for every managed device that qualifies as one."""
    joysticks = []
    for device in _di_manager.devices:
        joystick = _create_joystick(device)
        if joystick is not None:
            joysticks.append(joystick)
    return joysticks


def get_controllers(display=None):
    """Return a controller for every managed device that has a mapping."""
    controllers = []
    for device in _di_manager.devices:
        controller = _create_controller(device)
        if controller is not None:
            controllers.append(controller)
    return controllers