plover-machine-hid/plover_machine_hid.py

171 lines
5.1 KiB
Python

'''
A plover machine plugin for supporting the RKB1 HID protcol
This protocol is a simple HID-based protocol that sends the current state
of the steno machine every time that state changes.
See the README for more details on the protocol.
The order of the buttons (from left to right) is the same as in `KEYS_LAYOUT`.
Most buttons have the same names as in GeminiPR, except for the extra buttons
which are called X1-X26.
'''
from plover.engine import StenoEngine
from plover.machine.base import ThreadedStenotypeBase
from plover import log
from bitstring import BitArray
import hid
import platform
import threading
import time
from typing import Optional
# This is a hack to not open the hid device in exclusive mode on
# darwin, if the version of hidapi installed is current enough
if platform.system() == "Darwin":
import ctypes
try:
hid.hidapi.hid_darwin_set_open_exclusive.argtypes = (ctypes.c_int, )
hid.hidapi.hid_darwin_set_open_exclusive.restype = None
hid.hidapi.hid_darwin_set_open_exclusive(0)
except AttributeError:
log.error("hidapi < 0.12 in use, plover-hid will not work correctly")
USAGE_PAGE: int = 0xFF60
USAGE: int = 0x61
N_LEVERS: int = 80
# A simple report contains the report id 1 and one bit
# for each of the 64 buttons in the report.
SIMPLE_REPORT_TYPE: int = 0x62
SIMPLE_REPORT_LEN: int = 32
class InvalidReport(Exception):
pass
STENO_KEY_CHART = tuple(f"X{i}" for i in range(1, N_LEVERS + 1))
print('steno key chart', len(STENO_KEY_CHART))
class UnicodeSender:
def __init__(self, engine: StenoEngine):
self._engine = engine
def start(self):
global unicode_sender
unicode_sender = self
def stop(self):
global unicode_sender
unicode_sender = None
def send_string(self, string: str):
self._engine._keyboard_emulation.send_string(string)
unicode_sender: Optional[UnicodeSender] = None
class HidMachine(ThreadedStenotypeBase):
KEYS_LAYOUT: str = '''
X1 X2 X3 X4 X5 X6 X7 X8 X9 X10
X11 X12 X13 X14 X15 X16 X17 X18 X19 X20
X21 X22 X23 X24 X25 X26 X27 X28 X29 X30
X31 X32 X33 X34 X35 X36 X37 X38 X39 X40
X41 X42 X43 X44 X45 X46 X47 X48 X49 X50
X51 X52 X53 X54 X55 X56 X57 X58 X59 X60
X61 X62 X63 X64 X65 X66 X67 X68 X69 X70
X71 X72 X73 X74 X75 X76 X77 X78 X79 X80
'''
def __init__(self, params):
super().__init__()
self._params = params
self._hid = None
def _parse(self, report: bytes) -> Optional[BitArray]:
if len(report) != SIMPLE_REPORT_LEN:
raise InvalidReport()
if report[:3] == b"STN":
return BitArray(bytes=report[3:13])
elif report[:3] == b"UNI":
try:
s = report[3:].decode("UTF-8").strip('\0')
if unicode_sender is not None:
unicode_sender.send_string(s)
except UnicodeDecodeError:
raise InvalidReport()
else:
raise InvalidReport()
def _ping_thread(self):
while not self.finished.wait(0):
self._hid.write(b"STN")
time.sleep(10)
def run(self):
self._ready()
keystate = BitArray(N_LEVERS)
thread = threading.Thread(target=self._ping_thread)
thread.start()
while not self.finished.wait(0):
try:
report = self._hid.read(65536, timeout=1000)
except hid.HIDException:
self._error()
return
if not report:
continue
try:
report = self._parse(report)
if report is None:
continue
except InvalidReport:
continue
keystate |= report
if not report:
steno_actions = self.keymap.keys_to_actions(
[STENO_KEY_CHART[i] for (i, x) in enumerate(keystate) if x]
)
if steno_actions:
self._notify(steno_actions)
keystate = BitArray(N_LEVERS)
def start_capture(self):
self.finished.clear()
self._initializing()
# Enumerate all hid devices on the machine and if we find one with our
# usage page and usage we try to connect to it.
try:
devices = [
device["path"]
for device in hid.enumerate()
if device["usage_page"] == USAGE_PAGE
and device["usage"] == USAGE
]
if not devices:
self._error()
return
# FIXME: if multiple compatible devices are found we should either
# let the end user configure which one they want, or support
# reading from all connected plover hid devices at the same time.
self._hid = hid.Device(path=devices[0])
except hid.HIDException:
self._error()
return
self.start()
def stop_capture(self):
super().stop_capture()
if self._hid:
self._hid.close()
self._hid = None
@classmethod
def get_option_info(cls):
return {}