From ac1b57a98a3df8c0e04ece0011bc1f92db7a01af Mon Sep 17 00:00:00 2001 From: Luke Date: Thu, 13 Nov 2025 22:01:10 +0000 Subject: [PATCH] Delete Reciver/code.py --- Reciver/code.py | 560 ------------------------------------------------ 1 file changed, 560 deletions(-) delete mode 100644 Reciver/code.py diff --git a/Reciver/code.py b/Reciver/code.py deleted file mode 100644 index 850741b..0000000 --- a/Reciver/code.py +++ /dev/null @@ -1,560 +0,0 @@ -import wifi -import socketpool -import os -import time -import errno -import supervisor - -# --- HID Imports --- -import usb_hid -from adafruit_hid.keyboard import Keyboard -from adafruit_hid.keyboard_layout_us import KeyboardLayoutUS -from adafruit_hid.keycode import Keycode -from adafruit_hid.mouse import Mouse -from adafruit_hid.consumer_control import ConsumerControl -from adafruit_hid.consumer_control_code import ConsumerControlCode - -# --- Configuration --- -CONTROLLER_IP = "192.168.4.1" # Corrected IP -CONTROLLER_PORT = 5000 -CLIENT_NAME = "ESP32-S2-Keyboard-Mouse" -RECONNECT_DELAY = 5 - -# --- Global Settings --- -global_poll_delay_sec = 0.14 -auto_enter_enabled = False - -# --- Command Queue & Flow Control --- -command_queue = [] # (Fast) From network, holds raw strings -hid_action_queue = [] # (Slow) To USB, holds atomic HID actions -receiver_is_busy = False -BUSY_THRESHOLD = 50 -RESUME_THRESHOLD = 10 - -# --- HID State Machine --- -g_last_hid_action_time = 0 -g_hid_wait_delay = 0 -g_last_keys_pressed = set() # NEW: Tracks the "physical" keyboard state - -# --- Keycode Mapping (Unchanged) --- -KEY_MAP = { - # Special Keys - "ENTER": Keycode.ENTER, "TAB": Keycode.TAB, "ESC": Keycode.ESCAPE, - "DEL": Keycode.DELETE, "BACKSPACE": Keycode.BACKSPACE, "INSERT": Keycode.INSERT, - "PRTSCR": Keycode.PRINT_SCREEN, "UP": Keycode.UP_ARROW, "DOWN": Keycode.DOWN_ARROW, - "LEFT": Keycode.LEFT_ARROW, "RIGHT": Keycode.RIGHT_ARROW, - "CAPS_LOCK": Keycode.CAPS_LOCK, "SCROLL_LOCK": Keycode.SCROLL_LOCK, - "NUM_LOCK": Keycode.KEYPAD_NUMLOCK, "F1": Keycode.F1, "F2": Keycode.F2, - "F3": Keycode.F3, "F4": Keycode.F4, "F5": Keycode.F5, "F6": Keycode.F6, - "F7": Keycode.F7, "F8": Keycode.F8, "F9": Keycode.F9, "F10": Keycode.F10, - "F11": Keycode.F11, "F12": Keycode.F12, "WIN": Keycode.GUI, - # Modifiers - "L_CTRL": Keycode.LEFT_CONTROL, "L_SHIFT": Keycode.LEFT_SHIFT, - "L_ALT": Keycode.LEFT_ALT, "L_WIN": Keycode.LEFT_GUI, - "R_CTRL": Keycode.RIGHT_CONTROL, "R_SHIFT": Keycode.RIGHT_SHIFT, - "R_ALT": Keycode.RIGHT_ALT, "R_WIN": Keycode.RIGHT_GUI, - # Aliases - "CTRL": Keycode.LEFT_CONTROL, "SHIFT": Keycode.LEFT_SHIFT, "ALT": Keycode.LEFT_ALT, - # Alphanumeric Keys - "A": Keycode.A, "B": Keycode.B, "C": Keycode.C, "D": Keycode.D, "E": Keycode.E, - "F": Keycode.F, "G": Keycode.G, "H": Keycode.H, "I": Keycode.I, "J": Keycode.J, - "K": Keycode.K, "L": Keycode.L, "M": Keycode.M, "N": Keycode.N, "O": Keycode.O, - "P": Keycode.P, "Q": Keycode.Q, "R": Keycode.R, "S": Keycode.S, "T": Keycode.T, - "U": Keycode.U, "V": Keycode.V, "W": Keycode.W, "X": Keycode.X, "Y": Keycode.Y, - "Z": Keycode.Z, - "1": Keycode.ONE, "2": Keycode.TWO, "3": Keycode.THREE, "4": Keycode.FOUR, - "5": Keycode.FIVE, "6": Keycode.SIX, "7": Keycode.SEVEN, "8": Keycode.EIGHT, - "9": Keycode.NINE, "0": Keycode.ZERO, -} -MOUSE_MAP = { - "L_CLICK": Mouse.LEFT_BUTTON, "R_CLICK": Mouse.RIGHT_BUTTON, - "M_CLICK": Mouse.MIDDLE_BUTTON, "BACK_CLICK": Mouse.BACK_BUTTON, - "FWD_CLICK": Mouse.FORWARD_BUTTON, -} -CC_MAP = { - "VOL_UP": ConsumerControlCode.VOLUME_INCREMENT, - "VOL_DOWN": ConsumerControlCode.VOLUME_DECREMENT, - "MUTE": ConsumerControlCode.MUTE, - "PLAY_PAUSE": ConsumerControlCode.PLAY_PAUSE, - "NEXT_TRACK": ConsumerControlCode.SCAN_NEXT_TRACK, - "PREV_TRACK": ConsumerControlCode.SCAN_PREVIOUS_TRACK, - "STOP": ConsumerControlCode.STOP, -} - -# --- Setup (Unchanged) --- -try: - kbd = Keyboard(usb_hid.devices) - kbd_layout = KeyboardLayoutUS(kbd) - mouse = Mouse(usb_hid.devices) - cc = ConsumerControl(usb_hid.devices) - print("HID Keyboard, Mouse & ConsumerControl Initialized") -except Exception as e: - print(f"Error initializing HID: {e}") - supervisor.reload() - -pool = socketpool.SocketPool(wifi.radio) -sock = None -read_buffer = bytearray(256) -line_buffer = "" -last_led_status = kbd.led_status - -# --- Helper Functions (Unchanged) --- -def send_to_controller(message): - global sock - if sock: - try: - sock.send(f"{message}\n".encode('utf-8')) - except Exception as e: - pass # Don't print, this can get spammy - -def connect_to_controller(): - global g_last_keys_pressed - ssid = os.getenv("CIRCUITPY_WIFI_SSID") - password = os.getenv("CIRCUITPY_WIFI_PASSWORD") - if not ssid: - print("ERROR: CIRCUITPY_WIFI_SSID not set in settings.toml") - return None - print(f"Connecting to Wi-Fi AP: {ssid}...") - while not wifi.radio.connected: - try: - wifi.radio.connect(ssid, password) - except ConnectionError as e: - print(f"Failed to connect to Wi-Fi, retrying... ({e})") - time.sleep(RECONNECT_DELAY) - print(f"Wi-Fi Connected! IP: {wifi.radio.ipv4_address}") - - # --- FIXED IP TYPO --- - print(f"Connecting to server at {CONTROLLER_IP}:{CONTROLLER_PORT}...") - try: - new_sock = pool.socket(pool.AF_INET, pool.SOCK_STREAM) - new_sock.connect((CONTROLLER_IP, CONTROLLER_PORT)) - mac_addr = ":".join([f"{b:02X}" for b in wifi.radio.mac_address]) - handshake = f"HANDSHAKE:NAME={CLIENT_NAME}:MAC={mac_addr}\n" - new_sock.send(handshake.encode('utf-8')) - new_sock.setblocking(False) - g_last_keys_pressed.clear() # Reset key state on connect - print("TCP Connected and Handshake Sent!") - return new_sock - except Exception as e: - print(f"Failed to connect to TCP server: {e}") - if new_sock: new_sock.close() - return None - -def parse_keys(key_string_list): - keycodes = [] - for key_str in key_string_list: - key_upper = key_str.upper() - if key_upper.startswith("//"): - key_name = key_upper[2:] - else: - key_name = key_upper - if key_name in KEY_MAP: - keycodes.append(KEY_MAP[key_name]) - else: - print(f"Warning: Unknown key '{key_str}'") - return keycodes - -# --- NEW: `_parse_and_queue_hid_actions` --- -# This function is now the heart of the receiver. -# It's a "fast parser" that translates one command string -# into a series of *atomic* HID actions, optimized for speed. -def _parse_and_queue_hid_actions(command): - global auto_enter_enabled, global_poll_delay_sec, hid_action_queue, g_last_keys_pressed, kbd_layout - - try: - command_stripped = command.lstrip() - - if not command_stripped: - # It's just whitespace. We must release any pressed keys. - if g_last_keys_pressed: - hid_action_queue.append( ('update_keys', ((), tuple(g_last_keys_pressed))) ) - hid_action_queue.append( ('delay', global_poll_delay_sec) ) - g_last_keys_pressed.clear() - - _queue_string_reliable(command) # Type the whitespace - - if auto_enter_enabled: - print("Auto-ENTER: Queuing ENTER") - _queue_simple_command("ENTER") - return - - cmd_parts = command_stripped.split() - cmd_first_word = cmd_parts[0].lower() - - # --- 1. Handle Multi-part HID Commands --- - if cmd_first_word == "//hold": - keys_to_hold = set(parse_keys(cmd_parts[1:])) - keys_to_press = tuple(keys_to_hold - g_last_keys_pressed) - if keys_to_press: - print(f"Queuing Hold: {keys_to_press}") - hid_action_queue.append( ('update_keys', (keys_to_press, ())) ) - g_last_keys_pressed.update(keys_to_press) - - elif cmd_first_word == "//release": - if len(cmd_parts) > 1 and cmd_parts[1].lower() == "all": - print("Queuing Release All") - if g_last_keys_pressed: - hid_action_queue.append( ('update_keys', ((), tuple(g_last_keys_pressed))) ) - g_last_keys_pressed.clear() - else: - keys_to_release = set(parse_keys(cmd_parts[1:])) - keys_that_can_be_released = tuple(keys_to_release & g_last_keys_pressed) - if keys_that_can_be_released: - print(f"Queuing Release: {keys_that_can_be_released}") - hid_action_queue.append( ('update_keys', ((), keys_that_can_be_released)) ) - g_last_keys_pressed.difference_update(keys_that_can_be_released) - - elif cmd_first_word == "//delay": - if len(cmd_parts) > 1: - hid_action_queue.append( ('delay', int(cmd_parts[1]) / 1000.0) ) - elif cmd_first_word == "//relx": - if len(cmd_parts) > 1: hid_action_queue.append( ('mouse_move', (int(cmd_parts[1]), 0, 0)) ) - elif cmd_first_word == "//rely": - if len(cmd_parts) > 1: hid_action_queue.append( ('mouse_move', (0, int(cmd_parts[1]), 0)) ) - elif cmd_first_word == "//scroll": - if len(cmd_parts) > 1: hid_action_queue.append( ('mouse_move', (0, 0, int(cmd_parts[1]))) ) - - # --- 2. Setting Commands --- - elif cmd_first_word == "//auto_enter_on": - auto_enter_enabled = True - print("Auto-ENTER mode: ENABLED") - send_to_controller("TRIGGER:Auto-ENTER ENABLED") - - elif cmd_first_word == "//auto_enter_off": - auto_enter_enabled = False - print("Auto-ENTER mode: DISABLED") - send_to_controller("TRIGGER:Auto-ENTER DISABLED") - - elif cmd_first_word == "//set_poll_delay": - if len(cmd_parts) > 1: - try: - delay_ms = int(cmd_parts[1]) - if delay_ms < 1: delay_ms = 1 - global_poll_delay_sec = delay_ms / 1000.0 - print(f"Setting POLL_DELAY to {delay_ms}ms") - send_to_controller(f"TRIGGER:POLL_DELAY set to {delay_ms}ms") - except ValueError: - print("Invalid delay value") - - # --- 3. Handle In-line Commands and Text --- - else: - print(f"Parsing: {command}") - typed_text = False - current_pos = 0 - - while current_pos < len(command): - next_cmd_pos = command.find("//", current_pos) - - # --- Part 1: Handle text (if any) --- - text_chunk = "" - if next_cmd_pos == -1: # No more commands, type rest of string - text_chunk = command[current_pos:] - current_pos = len(command) - elif next_cmd_pos > current_pos: # Text exists before the command - text_chunk = command[current_pos:next_cmd_pos] - current_pos = next_cmd_pos - - if text_chunk: - typed_text = True - # This is the new fast typing loop - for char in text_chunk: - try: - keycodes = set(kbd_layout.keycodes(char)) - except Exception: - continue # Can't type this char - - # Handle "hello" (same key twice) - if keycodes == g_last_keys_pressed: - # Release, wait - hid_action_queue.append( ('update_keys', ((), tuple(g_last_keys_pressed))) ) - hid_action_queue.append( ('delay', global_poll_delay_sec) ) - g_last_keys_pressed.clear() - - keys_to_press = tuple(keycodes - g_last_keys_pressed) - keys_to_release = tuple(g_last_keys_pressed - keycodes) - - hid_action_queue.append( ('update_keys', (keys_to_press, keys_to_release)) ) - hid_action_queue.append( ('delay', global_poll_delay_sec) ) - g_last_keys_pressed = keycodes - - # --- Part 2: Handle command (if any) --- - if next_cmd_pos != -1: # We found a "//" - end_cmd_pos = command.find(" ", next_cmd_pos) - if end_cmd_pos == -1: end_cmd_pos = len(command) - - cmd_word = command[next_cmd_pos:end_cmd_pos] - cmd_upper = cmd_word[2:].upper().strip() - - if cmd_upper in KEY_MAP or cmd_upper in MOUSE_MAP or cmd_upper in CC_MAP: - # Release any typed chars before running a simple command - if g_last_keys_pressed: - hid_action_queue.append( ('update_keys', ((), tuple(g_last_keys_pressed))) ) - hid_action_queue.append( ('delay', global_poll_delay_sec) ) - g_last_keys_pressed.clear() - - _queue_simple_command(cmd_upper) - else: - # Unknown command, type it literally - typed_text = True - for char in cmd_word: # Type "//unknown" - try: - keycodes = set(kbd_layout.keycodes(char)) - except Exception: continue - - if keycodes == g_last_keys_pressed: - hid_action_queue.append( ('update_keys', ((), tuple(g_last_keys_pressed))) ) - hid_action_queue.append( ('delay', global_poll_delay_sec) ) - g_last_keys_pressed.clear() - - keys_to_press = tuple(keycodes - g_last_keys_pressed) - keys_to_release = tuple(g_last_keys_pressed - keycodes) - - hid_action_queue.append( ('update_keys', (keys_to_press, keys_to_release)) ) - hid_action_queue.append( ('delay', global_poll_delay_sec) ) - g_last_keys_pressed = keycodes - - current_pos = end_cmd_pos - if current_pos < len(command) and command[current_pos] == " ": - current_pos += 1 - - # After line is done, check for auto-enter - if auto_enter_enabled and typed_text: - print("Auto-ENTER: Queuing ENTER") - # Release any lingering keys from typing - if g_last_keys_pressed: - hid_action_queue.append( ('update_keys', ((), tuple(g_last_keys_pressed))) ) - hid_action_queue.append( ('delay', global_poll_delay_sec) ) - g_last_keys_pressed.clear() - _queue_simple_command("ENTER") - - except Exception as e: - print(f"Error parsing command '{command}': {e}") - hid_action_queue.append( ('release_all', None) ) - -# --- NEW: Helper function, formerly merged --- -def _queue_simple_command(cmd_upper_no_slash): - global hid_action_queue, global_poll_delay_sec, g_last_keys_pressed - - # Simple commands must release any held keys first - if g_last_keys_pressed: - hid_action_queue.append( ('update_keys', ((), tuple(g_last_keys_pressed))) ) - hid_action_queue.append( ('delay', global_poll_delay_sec) ) - g_last_keys_pressed.clear() - - if cmd_upper_no_slash in KEY_MAP: - key = KEY_MAP[cmd_upper_no_slash] - hid_action_queue.append( ('update_keys', ((key,), ())) ) - hid_action_queue.append( ('delay', global_poll_delay_sec) ) - hid_action_queue.append( ('update_keys', ((), (key,))) ) - hid_action_queue.append( ('delay', global_poll_delay_sec) ) - - elif cmd_upper_no_slash in MOUSE_MAP: - hid_action_queue.append( ('mouse_click', MOUSE_MAP[cmd_upper_no_slash]) ) - - elif cmd_upper_no_slash in CC_MAP: - hid_action_queue.append( ('cc_send', CC_MAP[cmd_upper_no_slash]) ) - -# --- NEW: Fast Network Packet Handler (Modified) --- -def _queue_command(line): - global command_queue, kbd, mouse, g_last_keys_pressed - - line = line.rstrip() - if not line: return - - try: - parts = line.split(':', 2) - if len(parts) < 3 or parts[0] != "SEQ": - print(f"Received malformed line: {line}") - return - - command = parts[2] - cmd_word = command.lstrip().split(" ", 1)[0].lower() - - if cmd_word == "//stop" or cmd_word == "//s": - print("[!] E-STOP received. Clearing queues and releasing keys.") - command_queue.clear() - hid_action_queue.clear() - g_last_keys_pressed.clear() - kbd.release_all() - mouse.release_all() - - elif cmd_word == "//ping": - send_to_controller("PONG:OK") - - elif cmd_word == "//heartbeat": - pass - - elif cmd_word == "//exit": - print("Controller commanded disconnect. Closing socket.") - sock.close() - - else: - command_queue.append(command) - - except Exception as e: - print(f"Error queuing command '{line}': {e}") - -# --- NEW: The "Slow" HID Executor (Modified) --- -def _process_hid_action_queue(): - global hid_action_queue, kbd, mouse, cc - global g_last_hid_action_time, g_hid_wait_delay - - now = time.monotonic() - if (now - g_last_hid_action_time) < g_hid_wait_delay: - return - - if not hid_action_queue: - # After all actions, release any dangling keys - global g_last_keys_pressed - if g_last_keys_pressed: - print("Queue empty, releasing straggler keys.") - kbd.release(*g_last_keys_pressed) - g_last_keys_pressed.clear() - return - - action, data = hid_action_queue.pop(0) - g_last_hid_action_time = now - g_hid_wait_delay = 0 - - try: - # --- NEW: Handle the 'update_keys' action --- - if action == 'update_keys': - press_keys, release_keys = data - if release_keys: - kbd.release(*release_keys) - if press_keys: - kbd.press(*press_keys) - # --- End New --- - elif action == 'release_all': - kbd.release_all() - elif action == 'mouse_click': - mouse.click(data) - elif action == 'mouse_move': - mouse.move(x=data[0], y=data[1], wheel=data[2]) - elif action == 'cc_send': - cc.send(data) - elif action == 'delay': - g_hid_wait_delay = data - - except Exception as e: - print(f"Error executing HID action '{action}': {e}") - kbd.release_all() - mouse.release_all() - -# --- Network Loop (Unchanged) --- -def _handle_network(): - global sock, line_buffer - if sock is None: - return - - try: - bytes_recvd = sock.recv_into(read_buffer) - if bytes_recvd > 0: - data = read_buffer[:bytes_recvd].decode('utf-8') - line_buffer += data - while '\n' in line_buffer: - line, line_buffer = line_buffer.split('\n', 1) - _queue_command(line) - elif bytes_recvd == 0: - print("Server closed connection.") - sock.close() - sock = None - - except OSError as ex: - if ex.errno == errno.EAGAIN: - pass - else: - print(f"Socket Error: {ex}") - sock.close() - sock = None - -# --- Flow Control Loop (Unchanged) --- -def _handle_flow_control(): - global receiver_is_busy, command_queue, hid_action_queue - - total_queue_len = len(command_queue) + len(hid_action_queue) - - if not receiver_is_busy and total_queue_len > BUSY_THRESHOLD: - receiver_is_busy = True - send_to_controller("TRIGGER:BUSY:ON") - print(f"Queue > {BUSY_THRESHOLD}, sending BUSY ON") - elif receiver_is_busy and total_queue_len < RESUME_THRESHOLD: - receiver_is_busy = False - send_to_controller("TRIGGER:BUSY:OFF") - print(f"Queue < {RESUME_THRESHOLD}, sending BUSY OFF") - -# --- check_led_status (Unchanged) --- -def check_led_status(): - global last_led_status, kbd - - current_led_status = kbd.led_status - if current_led_status != last_led_status: - caps_changed = (current_led_status & Keyboard.LED_CAPS_LOCK) != (last_led_status & Keyboard.LED_CAPS_LOCK) - num_changed = (current_led_status & Keyboard.LED_NUM_LOCK) != (last_led_status & Keyboard.LED_NUM_LOCK) - scroll_changed = (current_led_status & Keyboard.LED_SCROLL_LOCK) != (last_led_status & Keyboard.LED_SCROLL_LOCK) - - caps_on = (current_led_status & Keyboard.LED_CAPS_LOCK) > 0 - num_on = (current_led_status & Keyboard.LED_NUM_LOCK) > 0 - scroll_on = (current_led_status & Keyboard.LED_SCROLL_LOCK) > 0 - - trigger_msg = None - if caps_changed: - trigger_msg = f"TRIGGER:LED_STATUS CapsLock:{'On' if caps_on else 'Off'}" - elif num_changed: - trigger_msg = f"TRIGGER:LED_STATUS NumLock:{'On' if num_on else 'Off'}" - elif scroll_changed: - trigger_msg = f"TRIGGER:LED_STATUS ScrollLock:{'On' if scroll_on else 'Off'}" - - if trigger_msg: - print(trigger_msg) - send_to_controller(trigger_msg) - - last_led_status = current_led_status - - -# --- Main Loop (RE-ARCHITECTED) --- -try: - while True: - # --- 1. Handle Connection --- - if sock is None: - sock = connect_to_controller() - if sock is None: - time.sleep(RECONNECT_DELAY) - continue - line_buffer = "" - last_led_status = kbd.led_status - command_queue.clear() - hid_action_queue.clear() - g_last_keys_pressed.clear() # Reset state - receiver_is_busy = False - g_last_hid_action_time = 0 - g_hid_wait_delay = 0 - - # --- 2. Network Read (Fast) --- - _handle_network() - - # --- 3. HID & LED Check (Fast) --- - check_led_status() - - # --- 4. Flow Control Check (Fast) --- - _handle_flow_control() - - # --- 5. Parse ONE Command (Fast, if HID queue is empty) --- - if command_queue and not hid_action_queue: - command_to_parse = command_queue.pop(0) - _parse_and_queue_hid_actions(command_to_parse) - - # --- 6. Process ONE HID Action (Can be slow, but is non-blocking) --- - _process_hid_action_queue() - - # --- 7. Sleep (NEW: 1ms tick) --- - time.sleep(0.001) - -except Exception as e: - print("\n" * 5) - print("=" * 40) - print(f"CRITICAL ERROR in Receiver main loop: {e}") - print("=" * 40) - print("Rebooting in 10 seconds...") - time.sleep(10) - supervisor.reload() \ No newline at end of file