Files
Esp32-Bulk-Keyboard/Receiver/code.py
2025-11-13 21:51:57 +00:00

560 lines
23 KiB
Python

import wifi
import socketpool
import os
import time
import errno
import supervisor
# --- HID Imports ---
import usb_hid
from adafruit_hid.keyboard import Keyboard
from adafruit_hid.keyboard_layout_us import KeyboardLayoutUS
from adafruit_hid.keycode import Keycode
from adafruit_hid.mouse import Mouse
from adafruit_hid.consumer_control import ConsumerControl
from adafruit_hid.consumer_control_code import ConsumerControlCode
# --- Configuration ---
# Address of the controller's TCP server that connect_to_controller() dials.
CONTROLLER_IP = "192.168.4.1"
CONTROLLER_PORT = 5000
# Name reported to the controller in the HANDSHAKE line.
CLIENT_NAME = "ESP32-S2-Keyboard-Mouse"
# Seconds to sleep between Wi-Fi retries and after a failed TCP connect.
RECONNECT_DELAY = 5
# --- Global Settings ---
# Seconds queued as a 'delay' action after each key update while typing.
# Changed at runtime by the "//set_poll_delay <ms>" command.
global_poll_delay_sec = 0.14
# When True, an ENTER press is queued after a line that typed text.
# Toggled by "//auto_enter_on" / "//auto_enter_off".
auto_enter_enabled = False
# --- Command Queue & Flow Control ---
# Raw command strings appended by _queue_command() as lines arrive.
command_queue = []
# (action, data) tuples consumed one per tick by _process_hid_action_queue().
hid_action_queue = []
# True after "TRIGGER:BUSY:ON" was sent and until "TRIGGER:BUSY:OFF" is sent.
receiver_is_busy = False
# Combined length of both queues above which BUSY:ON is sent ...
BUSY_THRESHOLD = 50
# ... and below which BUSY:OFF is sent again.
RESUME_THRESHOLD = 10
# --- HID State Machine ---
# time.monotonic() value recorded when the last HID action was executed.
g_last_hid_action_time = 0
# Seconds that must pass after that moment before the next action may run.
g_hid_wait_delay = 0
# Keycodes the parser considers pressed once everything queued so far has run.
g_last_keys_pressed = set()
# --- Keycode Mapping ---
# Upper-case key name -> adafruit_hid Keycode. parse_keys() and
# _queue_simple_command() upper-case a name (and drop a leading "//") before
# looking it up here.
KEY_MAP = {
# Special Keys
"ENTER": Keycode.ENTER, "TAB": Keycode.TAB, "ESC": Keycode.ESCAPE,
"DEL": Keycode.DELETE, "BACKSPACE": Keycode.BACKSPACE, "INSERT": Keycode.INSERT,
"PRTSCR": Keycode.PRINT_SCREEN, "UP": Keycode.UP_ARROW, "DOWN": Keycode.DOWN_ARROW,
"LEFT": Keycode.LEFT_ARROW, "RIGHT": Keycode.RIGHT_ARROW,
"CAPS_LOCK": Keycode.CAPS_LOCK, "SCROLL_LOCK": Keycode.SCROLL_LOCK,
"NUM_LOCK": Keycode.KEYPAD_NUMLOCK, "F1": Keycode.F1, "F2": Keycode.F2,
"F3": Keycode.F3, "F4": Keycode.F4, "F5": Keycode.F5, "F6": Keycode.F6,
"F7": Keycode.F7, "F8": Keycode.F8, "F9": Keycode.F9, "F10": Keycode.F10,
"F11": Keycode.F11, "F12": Keycode.F12, "WIN": Keycode.GUI,
# Modifiers
"L_CTRL": Keycode.LEFT_CONTROL, "L_SHIFT": Keycode.LEFT_SHIFT,
"L_ALT": Keycode.LEFT_ALT, "L_WIN": Keycode.LEFT_GUI,
"R_CTRL": Keycode.RIGHT_CONTROL, "R_SHIFT": Keycode.RIGHT_SHIFT,
"R_ALT": Keycode.RIGHT_ALT, "R_WIN": Keycode.RIGHT_GUI,
# Aliases: the unprefixed modifier names map to the left-hand keys
"CTRL": Keycode.LEFT_CONTROL, "SHIFT": Keycode.LEFT_SHIFT, "ALT": Keycode.LEFT_ALT,
# Alphanumeric Keys
"A": Keycode.A, "B": Keycode.B, "C": Keycode.C, "D": Keycode.D, "E": Keycode.E,
"F": Keycode.F, "G": Keycode.G, "H": Keycode.H, "I": Keycode.I, "J": Keycode.J,
"K": Keycode.K, "L": Keycode.L, "M": Keycode.M, "N": Keycode.N, "O": Keycode.O,
"P": Keycode.P, "Q": Keycode.Q, "R": Keycode.R, "S": Keycode.S, "T": Keycode.T,
"U": Keycode.U, "V": Keycode.V, "W": Keycode.W, "X": Keycode.X, "Y": Keycode.Y,
"Z": Keycode.Z,
"1": Keycode.ONE, "2": Keycode.TWO, "3": Keycode.THREE, "4": Keycode.FOUR,
"5": Keycode.FIVE, "6": Keycode.SIX, "7": Keycode.SEVEN, "8": Keycode.EIGHT,
"9": Keycode.NINE, "0": Keycode.ZERO,
}
# Upper-case command name -> mouse button; queued as a 'mouse_click' action.
MOUSE_MAP = {
"L_CLICK": Mouse.LEFT_BUTTON, "R_CLICK": Mouse.RIGHT_BUTTON,
"M_CLICK": Mouse.MIDDLE_BUTTON, "BACK_CLICK": Mouse.BACK_BUTTON,
"FWD_CLICK": Mouse.FORWARD_BUTTON,
}
# Upper-case command name -> consumer-control code; queued as a 'cc_send' action.
# NOTE: a line whose first word is "//stop" is intercepted by _queue_command()
# as an emergency stop, so the "STOP" entry below is only reachable when
# "//stop" appears later in a line.
CC_MAP = {
"VOL_UP": ConsumerControlCode.VOLUME_INCREMENT,
"VOL_DOWN": ConsumerControlCode.VOLUME_DECREMENT,
"MUTE": ConsumerControlCode.MUTE,
"PLAY_PAUSE": ConsumerControlCode.PLAY_PAUSE,
"NEXT_TRACK": ConsumerControlCode.SCAN_NEXT_TRACK,
"PREV_TRACK": ConsumerControlCode.SCAN_PREVIOUS_TRACK,
"STOP": ConsumerControlCode.STOP,
}
# --- Setup (Unchanged) ---
# Create the keyboard (plus its US layout helper), mouse and consumer-control
# devices. If any of them cannot be created the script cannot do its job, so
# the failure is printed and the supervisor is asked to reload the program.
try:
    kbd = Keyboard(usb_hid.devices)
    kbd_layout = KeyboardLayoutUS(kbd)
    mouse = Mouse(usb_hid.devices)
    cc = ConsumerControl(usb_hid.devices)
    print("HID Keyboard, Mouse & ConsumerControl Initialized")
except Exception as hid_error:
    print(f"Error initializing HID: {hid_error}")
    supervisor.reload()

# Network state shared by the functions below.
sock = None                                # current controller socket, if any
pool = socketpool.SocketPool(wifi.radio)   # factory for new sockets
line_buffer = ""                           # received text not yet ended by "\n"
read_buffer = bytearray(256)               # reusable buffer for recv_into()

# Baseline for check_led_status(), which reports changes relative to this.
last_led_status = kbd.led_status
# --- Helper Functions (Unchanged) ---
def send_to_controller(message):
    """Send ``message`` followed by a newline to the controller, best effort.

    Nothing is sent when there is no socket. Any exception raised while
    encoding or sending is swallowed on purpose and not logged, because this
    is called from places where a dead link would otherwise flood the console.
    """
    if not sock:
        return
    try:
        sock.send(f"{message}\n".encode('utf-8'))
    except Exception:
        # Deliberately silent; see docstring.
        pass
def connect_to_controller():
    """Join the configured Wi-Fi network and open the controller connection.

    The SSID and password are read from the ``CIRCUITPY_WIFI_SSID`` and
    ``CIRCUITPY_WIFI_PASSWORD`` settings. The Wi-Fi join is retried forever,
    sleeping ``RECONNECT_DELAY`` seconds after each ``ConnectionError``. A
    single TCP connection attempt is then made, the handshake line is sent
    and the socket is switched to non-blocking mode.

    Returns:
        The connected, non-blocking socket, or ``None`` when the SSID is not
        configured or the TCP connection / handshake fails.
    """
    ssid = os.getenv("CIRCUITPY_WIFI_SSID")
    password = os.getenv("CIRCUITPY_WIFI_PASSWORD")
    if not ssid:
        print("ERROR: CIRCUITPY_WIFI_SSID not set in settings.toml")
        return None

    print(f"Connecting to Wi-Fi AP: {ssid}...")
    while not wifi.radio.connected:
        try:
            wifi.radio.connect(ssid, password)
        except ConnectionError as e:
            print(f"Failed to connect to Wi-Fi, retrying... ({e})")
            time.sleep(RECONNECT_DELAY)
    print(f"Wi-Fi Connected! IP: {wifi.radio.ipv4_address}")

    print(f"Connecting to server at {CONTROLLER_IP}:{CONTROLLER_PORT}...")
    # Bound before the try so the handler below can tell "socket was never
    # created" apart from "socket exists and must be closed".
    new_sock = None
    try:
        new_sock = pool.socket(pool.AF_INET, pool.SOCK_STREAM)
        new_sock.connect((CONTROLLER_IP, CONTROLLER_PORT))
        mac_addr = ":".join([f"{b:02X}" for b in wifi.radio.mac_address])
        handshake = f"HANDSHAKE:NAME={CLIENT_NAME}:MAC={mac_addr}\n"
        new_sock.send(handshake.encode('utf-8'))
        new_sock.setblocking(False)
        g_last_keys_pressed.clear()  # start the new session with no keys tracked
        print("TCP Connected and Handshake Sent!")
        return new_sock
    except Exception as e:
        print(f"Failed to connect to TCP server: {e}")
        if new_sock is not None:
            try:
                new_sock.close()
            except Exception as close_error:
                # A failed close must not mask the "return None" contract.
                print(f"Error closing socket: {close_error}")
        return None
def parse_keys(key_string_list):
    """Translate key-name strings into keycodes using ``KEY_MAP``.

    Names are matched case-insensitively and may carry a leading ``//``.
    A name that is not in ``KEY_MAP`` is reported with a printed warning and
    skipped.

    Returns:
        A list of keycodes, in the order the known names were given.
    """
    resolved = []
    for raw_name in key_string_list:
        name = raw_name.upper()
        if name.startswith("//"):
            name = name[2:]
        if name not in KEY_MAP:
            print(f"Warning: Unknown key '{raw_name}'")
            continue
        resolved.append(KEY_MAP[name])
    return resolved
# --- NEW: `_parse_and_queue_hid_actions` ---
# This function is now the heart of the receiver.
# It's a "fast parser" that translates one command string
# into a series of *atomic* HID actions, optimized for speed.
def _queue_release_held():
    """Queue the release of every tracked key, followed by one poll delay."""
    if g_last_keys_pressed:
        hid_action_queue.append(('update_keys', ((), tuple(g_last_keys_pressed))))
        hid_action_queue.append(('delay', global_poll_delay_sec))
        g_last_keys_pressed.clear()


def _queue_typed_text(text):
    """Queue the key updates that type ``text`` one character at a time.

    Consecutive characters are chained: only the difference between the
    previous and the next key set is pressed/released, so the last character's
    keys are left tracked as pressed when this returns. Characters the layout
    cannot produce are skipped.
    """
    for char in text:
        try:
            keycodes = set(kbd_layout.keycodes(char))
        except ValueError:
            continue  # the layout has no key for this character
        # A key that is already down cannot be pressed again. If this
        # character re-uses a non-modifier key of the previous one ("ll",
        # but also "aA" or "1!"), everything has to be released first or the
        # host never sees a second key press. A shared modifier alone (the
        # SHIFT in "AB") may simply stay down.
        if any(not Keycode.modifier_bit(k) for k in keycodes & g_last_keys_pressed):
            _queue_release_held()
        keys_to_press = tuple(keycodes - g_last_keys_pressed)
        keys_to_release = tuple(g_last_keys_pressed - keycodes)
        hid_action_queue.append(('update_keys', (keys_to_press, keys_to_release)))
        hid_action_queue.append(('delay', global_poll_delay_sec))
        g_last_keys_pressed.clear()
        g_last_keys_pressed.update(keycodes)


def _queue_inline_text_and_commands(command):
    """Queue a line made of literal text mixed with ``//NAME`` commands.

    Text between commands is typed. A ``//NAME`` word whose name is in
    ``KEY_MAP``, ``MOUSE_MAP`` or ``CC_MAP`` is queued as a simple command and
    the single space following it is consumed; any other ``//word`` is typed
    literally.

    Returns:
        True if the line contained any text to type, else False.
    """
    typed_text = False
    pos = 0
    while pos < len(command):
        cmd_start = command.find("//", pos)

        # Part 1: literal text in front of the next command (or to the end).
        if cmd_start == -1:
            text_chunk = command[pos:]
            pos = len(command)
        elif cmd_start > pos:
            text_chunk = command[pos:cmd_start]
            pos = cmd_start
        else:
            text_chunk = ""
        if text_chunk:
            typed_text = True
            _queue_typed_text(text_chunk)

        # Part 2: the "//word" itself, if there is one.
        if cmd_start != -1:
            cmd_end = command.find(" ", cmd_start)
            if cmd_end == -1:
                cmd_end = len(command)
            cmd_word = command[cmd_start:cmd_end]
            cmd_upper = cmd_word[2:].upper().strip()
            if cmd_upper in KEY_MAP or cmd_upper in MOUSE_MAP or cmd_upper in CC_MAP:
                # _queue_simple_command() releases any tracked keys itself.
                _queue_simple_command(cmd_upper)
            else:
                # Unknown command: type it literally, slashes included.
                typed_text = True
                _queue_typed_text(cmd_word)
            pos = cmd_end
            if pos < len(command) and command[pos] == " ":
                pos += 1
    return typed_text


def _parse_and_queue_hid_actions(command):
    """Translate one command string into atomic actions on ``hid_action_queue``.

    The first word selects the handling:

    * ``//hold K...`` / ``//release K...`` / ``//release all`` change the set
      of held keys.
    * ``//delay MS``, ``//relx N``, ``//rely N``, ``//scroll N`` queue a delay
      or a mouse movement.
    * ``//auto_enter_on``, ``//auto_enter_off``, ``//set_poll_delay MS`` change
      settings and report the change to the controller.
    * Anything else is typed as text, with inline ``//NAME`` commands
      executed in place.

    An empty or whitespace-only command releases the tracked keys and types
    the whitespace. When auto-enter is on, ENTER is queued after an empty
    command and after any line that typed text.

    Any exception is caught and printed; a ``release_all`` action is then
    queued and the tracked key state is reset to match it.
    """
    global auto_enter_enabled, global_poll_delay_sec
    try:
        command_stripped = command.lstrip()
        if not command_stripped:
            _queue_release_held()
            _queue_typed_text(command)
            if auto_enter_enabled:
                print("Auto-ENTER: Queuing ENTER")
                _queue_simple_command("ENTER")
            return

        cmd_parts = command_stripped.split()
        cmd_first_word = cmd_parts[0].lower()

        # --- 1. Multi-part HID commands ---
        if cmd_first_word == "//hold":
            keys_to_hold = set(parse_keys(cmd_parts[1:]))
            keys_to_press = tuple(keys_to_hold - g_last_keys_pressed)
            if keys_to_press:
                print(f"Queuing Hold: {keys_to_press}")
                hid_action_queue.append(('update_keys', (keys_to_press, ())))
                g_last_keys_pressed.update(keys_to_press)
        elif cmd_first_word == "//release":
            if len(cmd_parts) > 1 and cmd_parts[1].lower() == "all":
                print("Queuing Release All")
                if g_last_keys_pressed:
                    hid_action_queue.append(('update_keys', ((), tuple(g_last_keys_pressed))))
                    g_last_keys_pressed.clear()
            else:
                keys_to_release = set(parse_keys(cmd_parts[1:]))
                releasable = tuple(keys_to_release & g_last_keys_pressed)
                if releasable:
                    print(f"Queuing Release: {releasable}")
                    hid_action_queue.append(('update_keys', ((), releasable)))
                    g_last_keys_pressed.difference_update(releasable)
        elif cmd_first_word == "//delay":
            if len(cmd_parts) > 1:
                hid_action_queue.append(('delay', int(cmd_parts[1]) / 1000.0))
        elif cmd_first_word == "//relx":
            if len(cmd_parts) > 1:
                hid_action_queue.append(('mouse_move', (int(cmd_parts[1]), 0, 0)))
        elif cmd_first_word == "//rely":
            if len(cmd_parts) > 1:
                hid_action_queue.append(('mouse_move', (0, int(cmd_parts[1]), 0)))
        elif cmd_first_word == "//scroll":
            if len(cmd_parts) > 1:
                hid_action_queue.append(('mouse_move', (0, 0, int(cmd_parts[1]))))

        # --- 2. Setting commands ---
        elif cmd_first_word == "//auto_enter_on":
            auto_enter_enabled = True
            print("Auto-ENTER mode: ENABLED")
            send_to_controller("TRIGGER:Auto-ENTER ENABLED")
        elif cmd_first_word == "//auto_enter_off":
            auto_enter_enabled = False
            print("Auto-ENTER mode: DISABLED")
            send_to_controller("TRIGGER:Auto-ENTER DISABLED")
        elif cmd_first_word == "//set_poll_delay":
            if len(cmd_parts) > 1:
                try:
                    delay_ms = int(cmd_parts[1])
                    if delay_ms < 1:
                        delay_ms = 1
                    global_poll_delay_sec = delay_ms / 1000.0
                    print(f"Setting POLL_DELAY to {delay_ms}ms")
                    send_to_controller(f"TRIGGER:POLL_DELAY set to {delay_ms}ms")
                except ValueError:
                    print("Invalid delay value")

        # --- 3. Text with in-line commands ---
        else:
            print(f"Parsing: {command}")
            typed_text = _queue_inline_text_and_commands(command)
            if auto_enter_enabled and typed_text:
                print("Auto-ENTER: Queuing ENTER")
                _queue_simple_command("ENTER")
    except Exception as e:
        print(f"Error parsing command '{command}': {e}")
        hid_action_queue.append(('release_all', None))
        # release_all leaves nothing pressed, so the tracked state must be
        # emptied too; otherwise a later "//hold" of the same key is skipped.
        g_last_keys_pressed.clear()
# --- NEW: Helper function, formerly merged ---
def _queue_simple_command(cmd_upper_no_slash):
    """Queue the HID actions for one named key, mouse-button or media command.

    ``cmd_upper_no_slash`` is the upper-case command name without the leading
    ``//``. Any keys currently tracked as pressed are released first. A name
    from ``KEY_MAP`` becomes a press / delay / release / delay sequence, a name
    from ``MOUSE_MAP`` a single click and a name from ``CC_MAP`` a single
    consumer-control send. Names found in none of the maps queue nothing
    beyond that initial release.
    """
    queue = hid_action_queue
    pause = ('delay', global_poll_delay_sec)

    # Start from a clean keyboard: drop whatever is still tracked as pressed.
    if g_last_keys_pressed:
        queue.append(('update_keys', ((), tuple(g_last_keys_pressed))))
        queue.append(pause)
        g_last_keys_pressed.clear()

    if cmd_upper_no_slash in KEY_MAP:
        key = KEY_MAP[cmd_upper_no_slash]
        queue.extend([
            ('update_keys', ((key,), ())),
            pause,
            ('update_keys', ((), (key,))),
            pause,
        ])
    elif cmd_upper_no_slash in MOUSE_MAP:
        queue.append(('mouse_click', MOUSE_MAP[cmd_upper_no_slash]))
    elif cmd_upper_no_slash in CC_MAP:
        queue.append(('cc_send', CC_MAP[cmd_upper_no_slash]))
# --- NEW: Fast Network Packet Handler (Modified) ---
def _queue_command(line):
    """Handle one received line of the form ``SEQ:<n>:<command>``.

    Trailing whitespace is stripped and empty lines are ignored. Lines that do
    not have three ``:``-separated parts starting with ``SEQ`` are reported
    and dropped. Control commands are acted on immediately:

    * ``//stop`` / ``//s``: emergency stop. Both queues and the tracked key
      state are cleared, any pending delay is cancelled, and all keys and
      mouse buttons are released.
    * ``//ping``: answers ``PONG:OK``.
    * ``//heartbeat``: ignored.
    * ``//exit``: closes the controller socket and sets ``sock`` to ``None``.

    Every other command is appended to ``command_queue``. Exceptions are
    caught and printed.
    """
    global sock, g_hid_wait_delay
    line = line.rstrip()
    if not line:
        return
    try:
        parts = line.split(':', 2)
        if len(parts) < 3 or parts[0] != "SEQ":
            print(f"Received malformed line: {line}")
            return
        command = parts[2]
        cmd_word = command.lstrip().split(" ", 1)[0].lower()
        if cmd_word == "//stop" or cmd_word == "//s":
            print("[!] E-STOP received. Clearing queues and releasing keys.")
            command_queue.clear()
            hid_action_queue.clear()
            g_last_keys_pressed.clear()
            # Cancel a delay that is still counting down (e.g. from a long
            # "//delay"), so the first command after the stop is not held up.
            g_hid_wait_delay = 0
            kbd.release_all()
            mouse.release_all()
        elif cmd_word == "//ping":
            send_to_controller("PONG:OK")
        elif cmd_word == "//heartbeat":
            pass
        elif cmd_word == "//exit":
            print("Controller commanded disconnect. Closing socket.")
            # Forget the socket before closing it, so nothing reads from or
            # writes to a closed socket even if close() itself fails.
            closing, sock = sock, None
            if closing is not None:
                closing.close()
        else:
            command_queue.append(command)
    except Exception as e:
        print(f"Error queuing command '{line}': {e}")
# --- NEW: The "Slow" HID Executor (Modified) ---
def _process_hid_action_queue():
    """Execute at most one queued HID action, honouring the pending delay.

    Returns immediately while fewer than ``g_hid_wait_delay`` seconds have
    passed since the last executed action. When the queue is empty, any keys
    still tracked in ``g_last_keys_pressed`` are released. Otherwise the
    oldest action is removed and run; a ``'delay'`` action only sets the wait
    before the next one. If running an action raises, the error is printed and
    all keys and mouse buttons are released.

    NOTE: held and typed keys share ``g_last_keys_pressed``, so keys pressed
    by ``//hold`` are also released as soon as the queue is found empty.
    """
    global g_last_hid_action_time, g_hid_wait_delay

    now = time.monotonic()
    if (now - g_last_hid_action_time) < g_hid_wait_delay:
        return  # still waiting out the previous delay

    if not hid_action_queue:
        # Nothing left to do: let go of whatever the last action left pressed.
        if g_last_keys_pressed:
            print("Queue empty, releasing straggler keys.")
            kbd.release(*g_last_keys_pressed)
            g_last_keys_pressed.clear()
        return

    action, data = hid_action_queue.pop(0)
    g_last_hid_action_time = now
    g_hid_wait_delay = 0

    try:
        if action == 'delay':
            g_hid_wait_delay = data
        elif action == 'update_keys':
            to_press, to_release = data
            # Release before press, so a key leaving the set is up before
            # the new keys go down.
            if to_release:
                kbd.release(*to_release)
            if to_press:
                kbd.press(*to_press)
        elif action == 'release_all':
            kbd.release_all()
        elif action == 'mouse_click':
            mouse.click(data)
        elif action == 'mouse_move':
            dx, dy, wheel = data[0], data[1], data[2]
            mouse.move(x=dx, y=dy, wheel=wheel)
        elif action == 'cc_send':
            cc.send(data)
    except Exception as e:
        print(f"Error executing HID action '{action}': {e}")
        kbd.release_all()
        mouse.release_all()
# --- Network Loop (Unchanged) ---
# Trailing bytes of a UTF-8 character that was cut in half by the end of the
# previous read, and the socket those bytes were read from.
_rx_partial = b""
_rx_partial_sock = None


def _split_incomplete_utf8(data):
    """Split ``data`` into (complete, tail) at an unfinished final character.

    ``tail`` holds the bytes of a multi-byte UTF-8 character whose remaining
    bytes have not arrived yet; it is empty when ``data`` ends on a character
    boundary. Only the last three bytes are inspected.
    """
    size = len(data)
    for back in range(1, min(3, size) + 1):
        byte = data[size - back]
        if byte & 0xC0 == 0x80:
            continue  # continuation byte: keep looking for its lead byte
        if byte >= 0xF0:
            needed = 4
        elif byte >= 0xE0:
            needed = 3
        elif byte >= 0xC0:
            needed = 2
        else:
            needed = 1
        if needed > back:
            return data[:size - back], data[size - back:]
        break
    return data, b""


def _handle_network():
    """Read whatever the controller sent and dispatch every complete line.

    Does nothing without a socket. Received bytes are decoded as UTF-8 and
    appended to ``line_buffer``; each ``\\n``-terminated line is passed to
    ``_queue_command``. A multi-byte character split across two reads is held
    back until its remaining bytes arrive, and bytes that are not valid UTF-8
    are discarded with a message instead of raising. A zero-length read or
    any ``OSError`` other than ``EAGAIN`` closes the socket and sets ``sock``
    to ``None``.
    """
    global sock, line_buffer, _rx_partial, _rx_partial_sock
    if sock is None:
        return
    if _rx_partial_sock is not sock:
        # New connection: bytes held back from the previous one are stale.
        _rx_partial = b""
        _rx_partial_sock = sock
    try:
        bytes_recvd = sock.recv_into(read_buffer)
        if bytes_recvd > 0:
            received = _rx_partial + bytes(read_buffer[:bytes_recvd])
            complete, _rx_partial = _split_incomplete_utf8(received)
            try:
                line_buffer += complete.decode('utf-8')
            except UnicodeError:
                print(f"Discarding {len(complete)} undecodable bytes from controller")
            while '\n' in line_buffer:
                line, line_buffer = line_buffer.split('\n', 1)
                _queue_command(line)
        elif bytes_recvd == 0:
            print("Server closed connection.")
            sock.close()
            sock = None
    except OSError as ex:
        if ex.errno == errno.EAGAIN:
            pass  # non-blocking socket with nothing to read right now
        else:
            print(f"Socket Error: {ex}")
            if sock is not None:
                sock.close()
            sock = None
# --- Flow Control Loop (Unchanged) ---
def _handle_flow_control():
    """Tell the controller to pause or resume based on the queue backlog.

    The backlog is the combined length of ``command_queue`` and
    ``hid_action_queue``. When it rises above ``BUSY_THRESHOLD`` a
    ``TRIGGER:BUSY:ON`` is sent; once busy, ``TRIGGER:BUSY:OFF`` is sent when
    it falls below ``RESUME_THRESHOLD``. ``receiver_is_busy`` remembers which
    of the two was sent last, so each is sent once per transition.
    """
    global receiver_is_busy
    backlog = len(command_queue) + len(hid_action_queue)

    if receiver_is_busy:
        if backlog < RESUME_THRESHOLD:
            receiver_is_busy = False
            send_to_controller("TRIGGER:BUSY:OFF")
            print(f"Queue < {RESUME_THRESHOLD}, sending BUSY OFF")
    elif backlog > BUSY_THRESHOLD:
        receiver_is_busy = True
        send_to_controller("TRIGGER:BUSY:ON")
        print(f"Queue > {BUSY_THRESHOLD}, sending BUSY ON")
# --- check_led_status (Unchanged) ---
def _led_bits(status):
    """Return an LED status as an int bit mask.

    Accepts either an int or a bytes-like report whose first byte carries the
    LED bits; an empty or missing report counts as "all LEDs off".
    """
    if isinstance(status, int):
        return status
    return status[0] if status else 0


def check_led_status():
    """Report keyboard lock-LED changes to the controller.

    Compares ``kbd.led_status`` with ``last_led_status``. For every one of
    Caps Lock, Num Lock and Scroll Lock whose bit changed, a
    ``TRIGGER:LED_STATUS <Name>:<On|Off>`` message is printed and sent, in
    that order. ``last_led_status`` is then updated to the current status.
    """
    global last_led_status
    current_led_status = kbd.led_status
    if current_led_status == last_led_status:
        return

    # adafruit_hid documents led_status as bytes; normalise to an int so the
    # bit tests below work for either representation.
    current_bits = _led_bits(current_led_status)
    changed_bits = current_bits ^ _led_bits(last_led_status)

    leds = (
        (Keyboard.LED_CAPS_LOCK, "CapsLock"),
        (Keyboard.LED_NUM_LOCK, "NumLock"),
        (Keyboard.LED_SCROLL_LOCK, "ScrollLock"),
    )
    # Report every LED that changed, not only the first one, because the
    # baseline below is updated for all of them at once.
    for mask, label in leds:
        if changed_bits & mask:
            trigger_msg = f"TRIGGER:LED_STATUS {label}:{'On' if current_bits & mask else 'Off'}"
            print(trigger_msg)
            send_to_controller(trigger_msg)

    last_led_status = current_led_status
# --- Main Loop (RE-ARCHITECTED) ---
def _release_all_hid_inputs():
    """Best-effort release of every keyboard key and mouse button."""
    try:
        kbd.release_all()
        mouse.release_all()
    except Exception as release_error:
        print(f"Could not release HID inputs: {release_error}")


# Main loop: (re)connect when needed, then on every ~1 ms tick read the
# network, report LED changes, apply flow control, parse at most one command
# and execute at most one HID action. Any unhandled exception is reported and
# the program is reloaded after 10 seconds.
try:
    while True:
        # --- 1. Handle Connection ---
        if sock is None:
            # Without a controller nothing will ever release a key that was
            # down when the link dropped, and connecting can block for a long
            # time. Let go of everything before trying to reconnect.
            _release_all_hid_inputs()
            sock = connect_to_controller()
            if sock is None:
                time.sleep(RECONNECT_DELAY)
                continue
            # Fresh session: forget everything belonging to the old one.
            line_buffer = ""
            last_led_status = kbd.led_status
            command_queue.clear()
            hid_action_queue.clear()
            g_last_keys_pressed.clear()
            receiver_is_busy = False
            g_last_hid_action_time = 0
            g_hid_wait_delay = 0

        # --- 2. Network Read ---
        _handle_network()

        # --- 3. LED Check ---
        check_led_status()

        # --- 4. Flow Control Check ---
        _handle_flow_control()

        # --- 5. Parse ONE command, only once the HID queue has drained ---
        if command_queue and not hid_action_queue:
            command_to_parse = command_queue.pop(0)
            _parse_and_queue_hid_actions(command_to_parse)

        # --- 6. Process ONE HID action (returns early while a delay is pending) ---
        _process_hid_action_queue()

        # --- 7. 1 ms tick ---
        time.sleep(0.001)
except Exception as e:
    print("\n" * 5)
    print("=" * 40)
    print(f"CRITICAL ERROR in Receiver main loop: {e}")
    print("=" * 40)
    # Do not leave a key held down on the host during the wait below.
    _release_all_hid_inputs()
    print("Rebooting in 10 seconds...")
    time.sleep(10)
    supervisor.reload()