Split client out

This commit is contained in:
zvecr 2022-07-06 17:10:04 +01:00
parent bf6f88182a
commit 8b133897dc
2 changed files with 216 additions and 204 deletions

View file

@ -1,197 +1,17 @@
"""Interactions with compatible XAP devices
"""
import cmd
import json
import random
import gzip
import threading
import functools
from enum import IntFlag
from platform import platform
from milc import cli
from qmk.keyboard import render_layout
from qmk.xap.common import get_xap_keycodes
from .xap_client import XAPClient, XAPEventType, XAPSecureStatus
KEYCODE_MAP = get_xap_keycodes('latest')
def _u32toBCD(val): # noqa: N802
"""Create BCD string
"""
return f'{val>>24}.{val>>16 & 0xFF}.{val & 0xFFFF}'
class XAPFlags(IntFlag):
SUCCESS = 0x01
class XAPDevice:
def __init__(self, dev):
"""Constructor opens hid device and starts dependent services
"""
self.responses = {}
self.dev = hid.Device(path=dev['path'])
self.bg = threading.Thread(target=self._read_loop, daemon=True)
self.bg.start()
def _read_loop(self):
"""Background thread to signal waiting transactions
"""
while 1:
array_alpha = self.dev.read(64, 100)
if array_alpha:
token = str(array_alpha[:2])
event = self.responses.get(token)
if event:
event._ret = array_alpha
event.set()
def _query_device_info(self):
datalen = int.from_bytes(self.transaction(0x01, 0x05) or bytes(0), "little")
if not datalen:
return {}
data = []
offset = 0
while offset < datalen:
chunk = self.transaction(0x01, 0x06, offset)
data += chunk
offset += len(chunk)
str_data = gzip.decompress(bytearray(data[:datalen]))
return json.loads(str_data)
def listen(self):
"""Receive a "broadcast" message
"""
token = b"\xFF\xFF"
event = threading.Event()
self.responses[str(token)] = event
event.wait()
return event._ret
def transaction(self, sub, route, *args):
"""Request/Receive
"""
# token cannot start with zero or be FFFF
token = random.randrange(0x0100, 0xFFFE).to_bytes(2, byteorder='big')
# send with padding
# TODO: this code is total garbage
args_data = []
args_len = 2
if len(args) == 1:
if isinstance(args[0], (bytes, bytearray)):
args_len += len(args[0])
args_data = args[0]
else:
args_len += 2
args_data = args[0].to_bytes(2, byteorder='little')
padding_len = 64 - 3 - args_len
padding = b"\x00" * padding_len
if args_data:
padding = args_data + padding
buffer = token + args_len.to_bytes(1, byteorder='little') + sub.to_bytes(1, byteorder='little') + route.to_bytes(1, byteorder='little') + padding
# prepend 0 on windows because reasons...
if 'windows' in platform().lower():
buffer = b"\x00" + buffer
event = threading.Event()
self.responses[str(token)] = event
self.dev.write(buffer)
event.wait(timeout=1)
self.responses.pop(str(token), None)
if not hasattr(event, '_ret'):
return None
array_alpha = event._ret
if int(array_alpha[2]) != XAPFlags.SUCCESS:
return None
payload_len = int(array_alpha[3])
return array_alpha[4:4 + payload_len]
@functools.cache
def version(self):
ver = int.from_bytes(self.transaction(0x00, 0x00) or bytes(0), 'little')
return {'xap': _u32toBCD(ver)}
@functools.cache
def info(self):
data = self._query_device_info()
data['_id'] = self.transaction(0x01, 0x08)
data['xap'] = self.version()['xap']
return data
def unlock(self):
self.transaction(0x00, 0x04)
class XAPClient:
@staticmethod
def _lazy_imports():
# Lazy load to avoid missing dependency issues
global hid
import hid
@staticmethod
def list(search=None):
"""Find compatible XAP devices
"""
XAPClient._lazy_imports()
def _is_xap_usage(x):
return x['usage_page'] == 0xFF51 and x['usage'] == 0x0058
def _is_filtered_device(x):
name = "%04x:%04x" % (x['vendor_id'], x['product_id'])
return name.lower().startswith(search.lower())
devices = filter(_is_xap_usage, hid.enumerate())
if search:
devices = filter(_is_filtered_device, devices)
return list(devices)
def connect(self, dev):
"""Connect to a given XAP device
"""
XAPClient._lazy_imports()
return XAPDevice(dev)
# def _query_device_secure(device):
# secure = int.from_bytes(_xap_transaction(device, 0x00, 0x03), 'little')
# secure = 'unlocked' if secure == 2 else 'LOCKED'
# return {'secure': secure}
#
# def xap_dummy(device):
# # get layer count
# layers = _xap_transaction(device, 0x04, 0x02)
# layers = int.from_bytes(layers, "little")
# print(f'layers:{layers}')
# # get keycode [layer:0, row:0, col:0]
# # keycode = _xap_transaction(device, 0x04, 0x03, b"\x00\x00\x00")
# # get encoder [layer:0, index:0, clockwise:0]
# keycode = _xap_transaction(device, 0x04, 0x04, b"\x00\x00\x00")
# keycode = int.from_bytes(keycode, "little")
# print(f'keycode:{KEYCODE_MAP.get(keycode, "unknown")}[{keycode}]')
# # set encoder [layer:0, index:0, clockwise:0, keycode:KC_A]
# _xap_transaction(device, 0x05, 0x04, b"\x00\x00\x00\x04\00")
def print_dotted_output(kb_info_json, prefix=''):
"""Print the info.json in a plain text format with dot-joined keys.
"""
@ -227,10 +47,10 @@ def _list_devices():
device = XAPClient().connect(dev)
data = device.info()
cli.log.info(" %04x:%04x %s %s [API:%s]", dev['vendor_id'], dev['product_id'], dev['manufacturer_string'], dev['product_string'], data['xap'])
cli.log.info(' %04x:%04x %s %s [API:%s]', dev['vendor_id'], dev['product_id'], dev['manufacturer_string'], dev['product_string'], data['xap'])
if cli.config.general.verbose:
# TODO: better formatting like "lsusb -v"?
# TODO: better formatting like 'lsusb -v'?
print_dotted_output(data)
@ -254,32 +74,38 @@ class XAPShell(cmd.Cmd):
"""Initiate secure unlock
"""
self.device.unlock()
print("Done")
print('Unlock Requested...')
def do_listen(self, arg):
"""Log out XAP broadcast messages
"""
try:
cli.log.info("Listening for XAP broadcasts...")
cli.log.info('Listening for XAP broadcasts...')
while 1:
array_alpha = self.device.listen()
if array_alpha[2] == 1:
cli.log.info(" Broadcast: Secure[%02x]", array_alpha[4])
(event, data) = self.device.listen()
if event == XAPEventType.SECURE:
secure_status = XAPSecureStatus(data[0]).name
cli.log.info(' Secure[%s]', secure_status)
else:
cli.log.info(" Broadcast: type[%02x] data:[%02x]", array_alpha[2], array_alpha[4])
data_str = ' '.join(['{:02X}'.format(b) for b in data])
cli.log.info(' Broadcast: type[%02x] data:[%s]', event, data_str)
except KeyboardInterrupt:
cli.log.info("Stopping...")
cli.log.info('Stopping...')
def do_keycode(self, arg):
"""Prints out the keycode value of a certain layer, row, and column
"""
data = bytes(map(int, arg.split()))
if len(data) != 3:
cli.log.error("Invalid args")
cli.log.error('Invalid args')
return
keycode = self.device.transaction(0x04, 0x03, data)
keycode = int.from_bytes(keycode, "little")
keycode = self.device.transaction(b'\x04\x03', data)
keycode = int.from_bytes(keycode, 'little')
print(f'keycode:{self.keycodes.get(keycode, "unknown")}[{keycode}]')
def do_keymap(self, arg):
@ -287,7 +113,7 @@ class XAPShell(cmd.Cmd):
"""
data = bytes(map(int, arg.split()))
if len(data) != 1:
cli.log.error("Invalid args")
cli.log.error('Invalid args')
return
info = self.device.info()
@ -297,8 +123,8 @@ class XAPShell(cmd.Cmd):
for r in range(rows):
for c in range(cols):
q = data + r.to_bytes(1, byteorder='little') + c.to_bytes(1, byteorder='little')
keycode = self.device.transaction(0x04, 0x03, q)
keycode = int.from_bytes(keycode, "little")
keycode = self.device.transaction(b'\x04\x03', q)
keycode = int.from_bytes(keycode, 'little')
print(f'| {self.keycodes.get(keycode, "unknown").ljust(7)} ', end='', flush=True)
print('|')
@ -307,7 +133,7 @@ class XAPShell(cmd.Cmd):
"""
data = bytes(map(int, arg.split()))
if len(data) != 1:
cli.log.error("Invalid args")
cli.log.error('Invalid args')
return
info = self.device.info()
@ -319,9 +145,9 @@ class XAPShell(cmd.Cmd):
keycodes = []
for item in layout:
q = data + bytes(item['matrix'])
keycode = self.device.transaction(0x04, 0x03, q)
keycode = int.from_bytes(keycode, "little")
keycodes.append(self.keycodes.get(keycode, "???"))
keycode = self.device.transaction(b'\x04\x03', q)
keycode = int.from_bytes(keycode, 'little')
keycodes.append(self.keycodes.get(keycode, '???'))
print(render_layout(layout, False, keycodes))
@ -360,11 +186,11 @@ def xap(cli):
# Connect to first available device
devices = XAPClient.list()
if not devices:
cli.log.error("No devices found!")
cli.log.error('No devices found!')
return False
dev = devices[0]
cli.log.info("Connecting to:%04x:%04x %s %s", dev['vendor_id'], dev['product_id'], dev['manufacturer_string'], dev['product_string'])
cli.log.info('Connecting to:%04x:%04x %s %s', dev['vendor_id'], dev['product_id'], dev['manufacturer_string'], dev['product_string'])
device = XAPClient().connect(dev)
# shell?
@ -372,4 +198,4 @@ def xap(cli):
XAPShell(device).loop()
return True
XAPShell(device).onecmd(" ".join(cli.args.action))
XAPShell(device).onecmd(' '.join(cli.args.action))

View file

@ -0,0 +1,186 @@
"""Dummy XAP Client
"""
import json
import random
import gzip
import threading
import functools
from struct import Struct, pack, unpack
from collections import namedtuple
from enum import IntFlag, IntEnum
from platform import platform
RequestPacket = namedtuple('RequestPacket', 'token length data')
RequestStruct = Struct('<HB61s')
ResponsePacket = namedtuple('ResponsePacket', 'token flags length data')
ResponseStruct = Struct('<HBB60s')
def _gen_token():
"""Generate XAP token - cannot start with 00xx or be FFFF
"""
token = random.randrange(0x0100, 0xFFFE)
# swap endianness
return unpack('<H', pack('>H', token))[0]
def _u32toBCD(val): # noqa: N802
"""Create BCD string
"""
return f'{val>>24}.{val>>16 & 0xFF}.{val & 0xFFFF}'
class XAPSecureStatus(IntEnum):
LOCKED = 0x00
UNLOCKING = 0x01
UNLOCKED = 0x02
class XAPFlags(IntFlag):
FAILURE = 0
SUCCESS = 1 << 0
SECURE_FAILURE = 1 << 1
UNLOCK_IN_PROGRESS = 1 << 6
UNLOCKED = 1 << 7
class XAPEventType(IntEnum):
SECURE = 0x01
KEYBOARD = 0x02
USER = 0x03
class XAPDevice:
def __init__(self, dev):
"""Constructor opens hid device and starts dependent services
"""
self.responses = {}
self.dev = hid.Device(path=dev['path'])
self.bg = threading.Thread(target=self._read_loop, daemon=True)
self.bg.start()
def _read_loop(self):
"""Background thread to signal waiting transactions
"""
while 1:
array_alpha = self.dev.read(64, 100)
if array_alpha:
token = int.from_bytes(array_alpha[:2], 'little')
event = self.responses.get(token)
if event:
event._ret = array_alpha
event.set()
def _query_device_info(self):
datalen = int.from_bytes(self.transaction(b'\x01\x05') or bytes(0), 'little')
if not datalen:
return {}
data = []
offset = 0
while offset < datalen:
chunk = self.transaction(b'\x01\x06', offset)
data += chunk
offset += len(chunk)
str_data = gzip.decompress(bytearray(data[:datalen]))
return json.loads(str_data)
def listen(self):
"""Receive a 'broadcast' message
"""
token = 0xFFFF
event = threading.Event()
self.responses[token] = event
event.wait()
r = ResponsePacket._make(ResponseStruct.unpack(event._ret))
return (r.flags, r.data[:r.length])
def transaction(self, *args):
"""Request/Receive
"""
# convert args to array of bytes
data = bytes()
for arg in args:
if isinstance(arg, (bytes, bytearray)):
data += arg
if isinstance(arg, int): # TODO: remove terrible assumption of u16
data += arg.to_bytes(2, byteorder='little')
token = _gen_token()
p = RequestPacket(token, len(data), data)
buffer = RequestStruct.pack(*list(p))
event = threading.Event()
self.responses[token] = event
# prepend 0 on windows because reasons...
if 'windows' in platform().lower():
buffer = b'\x00' + buffer
self.dev.write(buffer)
event.wait(timeout=1)
self.responses.pop(token, None)
if not hasattr(event, '_ret'):
return None
r = ResponsePacket._make(ResponseStruct.unpack(event._ret))
if r.flags != XAPFlags.SUCCESS:
return None
return r.data[:r.length]
@functools.cache
def version(self):
ver = int.from_bytes(self.transaction(b'\x00\x00') or bytes(0), 'little')
return {'xap': _u32toBCD(ver)}
@functools.cache
def info(self):
data = self._query_device_info()
data['_id'] = self.transaction(b'\x01\x08')
data['xap'] = self.version()['xap']
return data
def unlock(self):
self.transaction(b'\x00\x04')
class XAPClient:
@staticmethod
def _lazy_imports():
# Lazy load to avoid missing dependency issues
global hid
import hid
@staticmethod
def list(search=None):
"""Find compatible XAP devices
"""
XAPClient._lazy_imports()
def _is_xap_usage(x):
return x['usage_page'] == 0xFF51 and x['usage'] == 0x0058
def _is_filtered_device(x):
name = '%04x:%04x' % (x['vendor_id'], x['product_id'])
return name.lower().startswith(search.lower())
devices = filter(_is_xap_usage, hid.enumerate())
if search:
devices = filter(_is_filtered_device, devices)
return list(devices)
def connect(self, dev):
"""Connect to a given XAP device
"""
XAPClient._lazy_imports()
return XAPDevice(dev)