Files
Esp32-Bulk-Keyboard/Shell_Transmitter/code.py

631 lines
26 KiB
Python
Raw Normal View History

2025-11-13 21:48:14 +00:00
import wifi
import time
import supervisor
import sys
import os
import socketpool
import errno
import controller_config as config
import controller_helpers as helpers
# --- 1. SET UP Wi-Fi HOTSPOT ---
# Bring up the access point through the project helper, then build the
# socket pool on the same radio. The server socket itself is created later
# by start_server().
helpers.setup_wifi_ap(config.WIFI_SSID, config.WIFI_PASSWORD, config.WIFI_CHANNEL)
pool = socketpool.SocketPool(wifi.radio)
server = None # Server socket
# --- 2. CLIENT & STATE MANAGEMENT ---
# Characters typed on the serial console since the last newline.
serial_input_buffer = ""
# Counter stamped into every outgoing "SEQ:<n>:..." packet; wraps at 10000.
seq_num = 0
# monotonic() timestamp of the last heartbeat broadcast.
last_heartbeat_send_time = time.monotonic()
# socket -> info dict for every accepted connection (handshaked or not).
all_clients = {}
# socket -> info dict for the subset of clients that receive commands.
paired_clients = {}
# MACs that are paired automatically again when they reconnect.
known_paired_macs = set()
# Reusable receive buffer shared by all client reads.
client_read_buffer = bytearray(256)
# --- NEW: Command Queue & State ---
command_send_buffer = [] # Master list of commands to be sent
g_system_halted = False # Pauses all sending on heartbeat fail
# --- END NEW ---
# Receiver settings remembered by the controller; only values that differ
# from these defaults are pushed to a client by send_settings_to_client().
global_receiver_settings = {
"AUTO_ENTER": "OFF",
"POLL_DELAY_MS": 10
}
def start_server():
    """Create the non-blocking TCP listener and store it in ``server``.

    The socket is bound to the access point's IPv4 address on
    ``config.LISTEN_PORT`` and left open. If binding or listening fails the
    error is printed and, after five seconds, the program is reloaded via
    ``supervisor.reload()``.
    """
    global server
    print(f"Starting TCP Server on {wifi.radio.ipv4_address_ap}:{config.LISTEN_PORT}...")
    listener = pool.socket(pool.AF_INET, pool.SOCK_STREAM)
    server = listener
    listener.setblocking(False)
    try:
        bind_address = (str(wifi.radio.ipv4_address_ap), config.LISTEN_PORT)
        listener.bind(bind_address)
        listener.listen(4)
        print("--- 📡 Controller Ready ---")
        print("Server is open for clients to connect at any time.")
        print("Type '//pair' to pair newly connected devices.")
    except Exception as e:
        # Without a listener the controller is useless: report and restart.
        print(f"FATAL: Could not bind/listen: {e}")
        print("Rebooting in 5s...")
        time.sleep(5)
        supervisor.reload()
def broadcast_to_single_client(client_sock, command_string):
    """Send one sequenced command to one client (used for pushing settings).

    Does nothing when ``client_sock`` is falsy. Otherwise the global sequence
    counter is advanced (wrapping at 10000) and the packet
    ``SEQ:<n>:<command>\\n`` is sent. Send errors are ignored on purpose.
    """
    global seq_num
    if client_sock:
        seq_num = (seq_num + 1) % 10000
        packet = f"SEQ:{seq_num}:{command_string}\n".encode('utf-8')
        try:
            client_sock.sendall(packet)
        except Exception:
            # Best effort only: a dead socket is detected and cleaned up by
            # the heartbeat loop.
            pass
def broadcast_to_paired(command_string, no_log=False):
    """Send a command immediately to every paired client.

    This path ignores the command queue and the per-client busy flags; it is
    used for system commands such as //stop, //ping and //exit.

    With ``no_log`` set, the command text is masked in the console output and
    the "no paired devices" warning is suppressed. Clients whose send fails
    are closed and removed from both client tables.
    """
    global seq_num, all_clients, paired_clients

    if not paired_clients:
        if not no_log:
            print("[!] No paired devices. Command not sent.")
        return

    seq_num = (seq_num + 1) % 10000
    full_packet = f"SEQ:{seq_num}:{command_string}\n".encode('utf-8')

    if not no_log:
        print(f"Sending (SYSTEM) to {len(paired_clients)} device(s): {command_string}")
    else:
        print(f"Sending (SYSTEM) to {len(paired_clients)} device(s): **** (len: {len(command_string)})")

    failed_socks = []
    for client_sock, info in paired_clients.items():
        try:
            client_sock.sendall(full_packet)
        except Exception as ex:
            print(f"\n[!] Paired client '{info['name']}' ({info['mac']}) disconnected (send fail): {ex}")
            client_sock.close()
            failed_socks.append(client_sock)

    # Removal is deferred so the dict is not modified while being iterated.
    for dead_sock in failed_socks:
        all_clients.pop(dead_sock, None)
        paired_clients.pop(dead_sock, None)
def send_settings_to_client(client_sock, info):
    """Push every setting that differs from its default to one client.

    Auto-Enter is sent only when it is "ON"; the poll delay is sent only when
    it is not 10 ms. Each command goes out via broadcast_to_single_client().
    """
    print(f"Sending current settings to '{info['name']}'...")

    commands = []
    if global_receiver_settings["AUTO_ENTER"] == "ON":
        commands.append("//auto_enter_on")
    delay_ms = global_receiver_settings["POLL_DELAY_MS"]
    if delay_ms != 10:
        commands.append(f"//set_poll_delay {delay_ms}")

    for cmd in commands:
        print(f" -> {cmd}")
        broadcast_to_single_client(client_sock, cmd)
# --- NEW: Process the command buffer ---
def process_command_buffer():
    """Take the next queued command and hand it to send_buffered_command().

    Nothing happens while the queue is empty, while the system is halted, or
    while there is at least one paired client and every one of them is busy.
    """
    global command_send_buffer, paired_clients, g_system_halted

    if g_system_halted or not command_send_buffer:
        return

    # With no paired clients this is False, so the command is still popped
    # and passed on to send_buffered_command().
    everyone_busy = bool(paired_clients) and all(
        info.get('busy', False) for info in paired_clients.values()
    )
    if everyone_busy:
        return

    next_command = command_send_buffer.pop(0)
    send_buffered_command(next_command)
def send_buffered_command(command_string):
    """Send one queued command to every paired client that is not busy.

    - No paired clients: the whole command queue is cleared.
    - Every paired client busy: the command is put back at the front of the
      queue. No sequence number is consumed and no "Sending" line is printed,
      because nothing was sent.
    - Otherwise: the command is sent once, under a single sequence number, to
      the clients that are ready. Clients that are busy at this moment do NOT
      receive this command later.

    Clients whose send fails are closed and removed from both client tables.
    """
    global seq_num, all_clients, paired_clients, command_send_buffer

    if not paired_clients:
        print("[!] No paired devices. Clearing command buffer.")
        command_send_buffer.clear()
        return

    ready_clients = [
        (client_sock, info)
        for client_sock, info in paired_clients.items()
        if not info.get('busy', False)
    ]
    if not ready_clients:
        print(" (All clients are busy. Command requeued.)")
        command_send_buffer.insert(0, command_string)  # Put it back
        return

    # Only now is a packet actually going out, so only now advance SEQ.
    seq_num = (seq_num + 1) % 10000
    full_packet = f"SEQ:{seq_num}:{command_string}\n".encode('utf-8')
    print(f"Sending (Q) to non-busy devices: {command_string}")

    disconnected_clients = []
    for client_sock, info in ready_clients:
        try:
            client_sock.sendall(full_packet)
        except Exception as ex:
            print(f"\n[!] Paired client '{info['name']}' ({info['mac']}) disconnected (send fail): {ex}")
            client_sock.close()
            disconnected_clients.append(client_sock)

    for client in disconnected_clients:
        all_clients.pop(client, None)
        paired_clients.pop(client, None)
# --- END NEW ---
def _has_handshake(info):
    """Return True once a client's info dict holds the name and MAC from its handshake."""
    return bool(info) and 'name' in info and 'mac' in info


def _matches_target(info, target):
    """Return True if ``target`` (a MAC when it contains ':', else a name) identifies this client."""
    if ":" in target:
        return info['mac'].upper() == target.upper()
    return info['name'] == target


def handle_controller_command(line):
    """Execute one serial-console command that starts with ``//``.

    ``line`` is the stripped console line. Known commands (//stop, //pair,
    //unpair, //list, //help, //ping, //pause, //prompt, //script, //exit,
    //auto_enter_on, //auto_enter_off, //set_delay) are handled locally; any
    other ``//`` command is forwarded unchanged to all paired clients.

    Clients that have connected but not yet sent their handshake have no
    'name'/'mac' entries; they are never paired and are shown as PENDING.
    """
    global all_clients, paired_clients, known_paired_macs, seq_num, global_receiver_settings, command_send_buffer
    parts = line.split()
    if not parts:
        return
    command = parts[0].lower()

    # --- E-Stop: drop everything queued, then tell receivers to stop ---
    if command == "//stop" or command == "//s":
        print("[!] EMERGENCY STOP")
        command_send_buffer.clear()
        broadcast_to_paired("//stop")  # Send high-priority stop
        return

    # --- //pair ---
    if command == "//pair":
        if len(parts) >= 3 and parts[1].lower() == "/d":
            target = " ".join(parts[2:])
            found = False
            for sock, info in all_clients.items():
                # Skip connections still waiting for their handshake.
                if not _has_handshake(info):
                    continue
                if not _matches_target(info, target):
                    continue
                if sock not in paired_clients:
                    paired_clients[sock] = info
                    known_paired_macs.add(info['mac'])
                    print(f"[+] Paired device: {info['name']} ({info['mac']})")
                    send_settings_to_client(sock, info)
                else:
                    print(f"[!] Device '{info['name']}' is already paired.")
                found = True
                break
            if not found:
                print(f"[!] Device '{target}' not found or not connected.")
        else:
            paired_count = 0
            for sock, info in all_clients.items():
                if _has_handshake(info) and sock not in paired_clients:
                    paired_clients[sock] = info
                    known_paired_macs.add(info['mac'])
                    print(f"[+] Paired device: {info['name']} ({info['mac']})")
                    send_settings_to_client(sock, info)
                    paired_count += 1
            if paired_count == 0:
                print("No new devices to pair.")
        print(f"Total paired devices: {len(paired_clients)}")

    # --- //unpair ---
    elif command == "//unpair":
        if len(parts) >= 3 and parts[1].lower() == "/d":
            target = " ".join(parts[2:])
            sock_to_remove = None
            info_to_remove = None
            for sock, info in paired_clients.items():
                if _has_handshake(info) and _matches_target(info, target):
                    sock_to_remove = sock
                    info_to_remove = info
                    break
            if sock_to_remove:
                del paired_clients[sock_to_remove]
                # Forget the MAC too, so the device is not re-paired on reconnect.
                known_paired_macs.discard(info_to_remove['mac'])
                print(f"[-] Unpaired device: {info_to_remove['name']} ({info_to_remove['mac']})")
                print(f" Device will NOT auto-re-pair.")
            else:
                print(f"[!] Paired device '{target}' not found.")
        elif len(parts) == 2 and parts[1].lower() == "all":
            print(f"Unpairing all {len(paired_clients)} devices...")
            paired_clients.clear()
            known_paired_macs.clear()
            print("All devices unpaired and will NOT auto-re-pair.")
        else:
            print("Usage: //unpair /d <name_or_mac> OR //unpair all")

    # --- //list ---
    elif command == "//list":
        print(f"--- {len(all_clients)} Total Client(s) ---")
        if not all_clients:
            print(" (No clients connected)")
        else:
            for sock, info in all_clients.items():
                if _has_handshake(info):
                    status = "PAIRED" if sock in paired_clients else "UNPAIRED"
                    busy_str = " (BUSY)" if info.get('busy', False) else ""
                    print(f" [{status:^8}] - {info['name']} (MAC: {info['mac']}){busy_str}")
                else:
                    # Not every socket implementation offers getpeername();
                    # fall back to a placeholder instead of crashing //list.
                    try:
                        peer = sock.getpeername()
                    except (AttributeError, OSError):
                        peer = None
                    print(f" [PENDING] - {peer[0] if peer else 'Unknown IP'} (Waiting for handshake)")
        print("[-------------------------]")
        print(f"Known MACs (auto-re-pair): {list(known_paired_macs)}")
        print(f"Current Settings: Auto-Enter={global_receiver_settings['AUTO_ENTER']}, Delay={global_receiver_settings['POLL_DELAY_MS']}ms")
        print(f"Send Buffer Queue: {len(command_send_buffer)} commands")

    # --- //help ---
    elif command == "//help":
        helpers.print_help_file()

    # --- //ping ---
    elif command == "//ping":
        if not paired_clients:
            print("[!] No paired devices to ping.")
        else:
            print(f"Pinging {len(paired_clients)} paired device(s)...")
            seq_num = (seq_num + 1) % 10000
            ping_packet = f"SEQ:{seq_num}://ping\n".encode('utf-8')
            for client_sock, info in paired_clients.items():
                try:
                    # Record the start time before sending so the PONG
                    # handler can compute the round-trip latency.
                    current_time = time.monotonic()
                    info['ping_start_time'] = current_time
                    if client_sock in all_clients:
                        all_clients[client_sock]['ping_start_time'] = current_time
                    client_sock.sendall(ping_packet)
                except Exception as ex:
                    print(f"[!] Error sending ping to {info['name']}: {ex}")

    # --- //pause ---
    elif command == "//pause":
        print("Controller paused. Press [Enter] to resume.")
        # Blocks the whole controller (no network servicing) until a newline
        # arrives on the serial console.
        while True:
            if supervisor.runtime.serial_bytes_available:
                if sys.stdin.read(1) == '\n':
                    break
            time.sleep(0.1)
        print("...Resumed.")

    # --- //prompt ---
    elif command == "//prompt":
        prompt_text = "Input: "
        if len(parts) > 2 and parts[1].lower() == "/p":
            prompt_text = " ".join(parts[2:]) + ": "
        print(prompt_text, end="")
        user_input = sys.stdin.readline().strip()
        command_send_buffer.append(user_input)  # Add to queue
        # The entered text is masked in the log; only its length is shown.
        print(f"Queued: **** (len: {len(user_input)})")

    # --- //script ---
    elif command == "//script":
        if len(parts) < 2:
            print(" (Error: Usage: //script <filename>)")
        else:
            filename = parts[1]
            try:
                with open(filename, "r") as f:
                    count = 0
                    for script_line in f:
                        command_send_buffer.append(script_line.strip())
                        count += 1
                print(f"Queued {count} lines from {filename}.")
            except OSError:
                print(f" (Error: File not found: {filename})")
            except Exception as ex:
                print(f" (Error running script: {ex})")

    # --- //exit ---
    elif command == "//exit":
        print("Sending disconnect to clients and halting AP...")
        broadcast_to_paired("//exit")
        time.sleep(1)
        wifi.radio.stop_ap()
        print("AP stopped. Controller is halting.")
        sys.exit()

    # --- Setting Commands ---
    elif command == "//auto_enter_on":
        print("Setting Auto-Enter to ON for all paired devices.")
        global_receiver_settings["AUTO_ENTER"] = "ON"
        broadcast_to_paired("//auto_enter_on")  # Send high-priority
    elif command == "//auto_enter_off":
        print("Setting Auto-Enter to OFF for all paired devices.")
        global_receiver_settings["AUTO_ENTER"] = "OFF"
        broadcast_to_paired("//auto_enter_off")  # Send high-priority
    elif command == "//set_delay":
        if len(parts) < 2:
            print("Usage: //set_delay <ms> (e.g., //set_delay 20)")
            return
        try:
            delay_ms = int(parts[1])
            if delay_ms < 1:
                delay_ms = 1
            global_receiver_settings["POLL_DELAY_MS"] = delay_ms
            print(f"Setting Poll Delay to {delay_ms}ms for all paired devices.")
            broadcast_to_paired(f"//set_poll_delay {delay_ms}")  # Send high-priority
        except ValueError:
            print("Invalid delay. Must be a number (in ms).")
    else:
        # Not a command the controller knows: forward it unchanged,
        # bypassing the queue, to all paired clients.
        broadcast_to_paired(line)
def check_for_new_clients():
    """Accept at most one waiting connection on the listening socket.

    An accepted socket is switched to non-blocking mode and registered in
    ``all_clients`` with a provisional info dict; name and MAC are added
    later when the client's handshake is read. EAGAIN from ``accept()``
    means nobody is waiting and is ignored; other OSErrors are printed.
    """
    global server
    try:
        new_sock, peer_addr = server.accept()
        new_sock.setblocking(False)
        all_clients[new_sock] = {'read_buffer': "", 'busy': False}  # Temp dict
        print(f"\n[+] New connection from: {peer_addr[0]}. Waiting for handshake...")
    except OSError as ex:
        if ex.errno != errno.EAGAIN:
            print(f"Discovery accept error: {ex}")
def _parse_handshake(line):
    """Extract (name, mac) from a ``HANDSHAKE:<k>=<name>:<k>=<mac>`` line.

    The line is split on ':' but a MAC may itself contain colons, so the
    pieces after the MAC field are joined back together until a piece that
    contains '=' (the start of a further KEY=value field) is reached.
    Raises IndexError when the line has too few fields.
    """
    fields = line.split(':')
    client_name = fields[1].split('=', 1)[1]
    mac_pieces = [fields[2].split('=', 1)[1]]
    for piece in fields[3:]:
        if '=' in piece:
            break
        mac_pieces.append(piece)
    return client_name, ":".join(mac_pieces)


def _handle_handshake(client_sock, current_info, line):
    """Record a client's name/MAC, replace a stale socket for the same MAC, and auto-pair known MACs."""
    try:
        client_name, client_mac = _parse_handshake(line)
        stale_sock = None
        for sock, info in all_clients.items():
            if sock != client_sock and info and info.get('mac') == client_mac:
                stale_sock = sock
                break
        if stale_sock is not None:
            print(f"[!] Device '{client_name}' reconnected, pruning old socket.")
            old_info = all_clients.pop(stale_sock, {})
            paired_clients.pop(stale_sock, None)
            stale_sock.close()
            # Carry over busy status
            current_info['busy'] = old_info.get('busy', False)
        current_info['name'] = client_name
        current_info['mac'] = client_mac
        if client_mac in known_paired_macs:
            paired_clients[client_sock] = current_info
            print(f"\n[+] Client '{client_name}' ({client_mac}) reconnected and auto-paired.")
            send_settings_to_client(client_sock, current_info)
        else:
            print(f"\n[+] New device, '{client_name}' ({client_mac}) connected, not paired.")
            print(f" Type '//pair' or '//pair /d {client_name}' to pair.")
    except Exception as e:
        print(f"Malformed handshake: {line} ({e})")


def _handle_client_line(client_sock, current_info, line):
    """Dispatch one complete, non-empty line received from a client."""
    if line.startswith("HANDSHAKE:"):
        _handle_handshake(client_sock, current_info, line)
    # --- Flow control: the busy flag gates queued sends to this client ---
    elif line.startswith("TRIGGER:BUSY:ON"):
        if current_info and 'name' in current_info:
            print(f"[!] Client '{current_info['name']}' is busy. Pausing sends to it.")
            current_info['busy'] = True
    elif line.startswith("TRIGGER:BUSY:OFF"):
        if current_info and 'name' in current_info:
            print(f"[+] Client '{current_info['name']}' is ready. Resuming sends.")
            current_info['busy'] = False
    elif line.startswith("TRIGGER:"):
        trigger_msg = line.split(':', 1)[1]
        if current_info and 'name' in current_info:
            print(f"\n[TRIGGER from {current_info['name']}]: {trigger_msg}")
    elif line.startswith("PONG:"):
        pong_msg = line.split(':', 1)[1]
        name = current_info.get('name', "Unknown")
        if current_info and 'ping_start_time' in current_info:
            end_time = time.monotonic()
            start_time = current_info.pop('ping_start_time')
            latency_ms = (end_time - start_time) * 1000
            print(f"\n[PONG from {name}]: {pong_msg} (Latency: {latency_ms:.0f} ms)")
            if client_sock in paired_clients:
                paired_clients[client_sock].pop('ping_start_time', None)
        else:
            print(f"\n[PONG from {name}]: {pong_msg}")


def check_for_incoming_data():
    """Poll every connected client once and process any complete lines.

    Each socket is read without blocking into the shared receive buffer. The
    text is appended to that client's 'read_buffer' and every complete
    newline-terminated line is dispatched (handshake, busy flow control,
    trigger, pong). EAGAIN means "no data yet". A zero-length read or any
    other OSError is treated as a disconnect: the socket is closed and
    removed from both client tables.
    """
    global all_clients, paired_clients, known_paired_macs
    disconnected_clients = []
    for client_sock, current_info in list(all_clients.items()):
        # A handshake handled earlier in this pass may have pruned (and
        # closed) this socket; it is only still here because we iterate a
        # snapshot. Skip it instead of reading from a closed socket.
        if client_sock not in all_clients:
            continue
        try:
            read_len = client_sock.recv_into(client_read_buffer, 256)
            if read_len > 0:
                try:
                    data = client_read_buffer[:read_len].decode('utf-8')
                except UnicodeError:
                    # Garbage or a split multi-byte character must not take
                    # the whole controller down; drop this chunk only.
                    name = current_info.get('name', "Unknown")
                    print(f"\n[!] Dropping undecodable data from '{name}'.")
                    continue
                if 'read_buffer' not in current_info:
                    current_info['read_buffer'] = ""
                current_info['read_buffer'] += data
                while '\n' in current_info['read_buffer']:
                    line, current_info['read_buffer'] = current_info['read_buffer'].split('\n', 1)
                    line = line.strip()
                    if not line:
                        continue
                    current_info = all_clients[client_sock]
                    _handle_client_line(client_sock, current_info, line)
            elif read_len == 0:
                raise OSError(0, "Client closed connection")
        except OSError as ex:
            if ex.errno == errno.EAGAIN:
                pass  # No data, normal
            else:
                info = all_clients.get(client_sock)
                name = info.get('name', "Unknown") if info else "Unknown"
                print(f"\n[!] Client '{name}' disconnected (read fail): {ex}")
                client_sock.close()
                disconnected_clients.append(client_sock)
    for client in disconnected_clients:
        all_clients.pop(client, None)
        paired_clients.pop(client, None)
# --- 3. MAIN LOOP (Modified) ---
# Start the listener once, then run the cooperative main loop: accept
# clients, read their data, drain the command queue, read the serial console
# and send periodic heartbeats.
start_server()
try:
    while True:
        check_for_new_clients()
        check_for_incoming_data()

        # Queued commands are only sent while the system is not halted.
        if not g_system_halted:
            process_command_buffer()

        # --- Serial console input (line editing with local echo) ---
        num_bytes_available = supervisor.runtime.serial_bytes_available
        if num_bytes_available and num_bytes_available > 0:
            data_chunk = sys.stdin.read(num_bytes_available)
            if data_chunk:
                for char in data_chunk:
                    if char == '\n':
                        sys.stdout.write('\n')
                        # While halted after a heartbeat failure only
                        # 'continue' and 'stop' are accepted.
                        if g_system_halted:
                            # Strip first so a trailing '\r' from a CRLF
                            # terminal does not defeat the comparison.
                            halt_reply = serial_input_buffer.strip().lower()
                            if halt_reply == 'continue':
                                print("...Resuming sends.")
                                g_system_halted = False
                            elif halt_reply == 'stop':
                                print("...Clearing buffer and resuming.")
                                command_send_buffer.clear()
                                g_system_halted = False
                            else:
                                print("[!] Heartbeat fail. Type 'continue' to resume or 'stop' to clear queue.")
                            serial_input_buffer = ""
                            continue
                        line_to_check = serial_input_buffer.strip()
                        if line_to_check.startswith("//"):
                            handle_controller_command(line_to_check)
                        elif serial_input_buffer:
                            # Add text to the queue
                            command_send_buffer.append(serial_input_buffer)
                            print(f"Queued: {serial_input_buffer}")
                        else:
                            # Empty line, queue an //enter
                            command_send_buffer.append("//enter")
                            print("Queued: //enter")
                        serial_input_buffer = ""
                    elif char in ('\b', '\x7f'):
                        # Backspace/delete: drop the last character and erase
                        # it on the terminal.
                        if len(serial_input_buffer) > 0:
                            serial_input_buffer = serial_input_buffer[:-1]
                            sys.stdout.write('\b \b')
                    elif char:
                        # Input line length is capped at 5120 characters.
                        if len(serial_input_buffer) < 5120:
                            serial_input_buffer += char
                            sys.stdout.write(char)

        # --- Heartbeat to every connected client (paired or not) ---
        if (time.monotonic() - last_heartbeat_send_time) > config.HEARTBEAT_INTERVAL:
            last_heartbeat_send_time = time.monotonic()
            seq_num = (seq_num + 1) % 10000
            heartbeat_cmd = f"SEQ:{seq_num}://heartbeat\n".encode('utf-8')
            disconnected_clients = []
            for client_sock, info in list(all_clients.items()):
                try:
                    client_sock.sendall(heartbeat_cmd)
                    # Expire pings that have gone unanswered for over 5 s.
                    if info and 'ping_start_time' in info:
                        ping_age = time.monotonic() - info['ping_start_time']
                        if ping_age > 5.0:
                            name = info.get('name', "Unknown")
                            print(f"\n[!] Ping timeout for {name}")
                            info.pop('ping_start_time')
                            if client_sock in paired_clients:
                                paired_clients[client_sock].pop('ping_start_time', None)
                except Exception as ex:
                    # A failed heartbeat drops the client and halts queued
                    # sends until the operator answers on the console.
                    name = info.get('name', "Unknown") if info else "Unknown"
                    print(f"\n[!] Heartbeat fail for {name}. Pausing all sends.")
                    print("[!] Type 'continue' to resume or 'stop' to clear queue.")
                    g_system_halted = True
                    client_sock.close()
                    disconnected_clients.append(client_sock)
            for client in disconnected_clients:
                all_clients.pop(client, None)
                paired_clients.pop(client, None)

        time.sleep(0.01)
# --- Critical Error Fallback ---
except Exception as main_exception:
    print("\n" * 5)
    print("=" * 40)
    print(f"CRITICAL ERROR in Controller main loop: {main_exception}")
    print("=" * 40)
    if all_clients:
        print("Attempting to send emergency disconnect to all peers...")
        disconnect_msg = "SEQ:9999://exit\n".encode('utf-8')
        for client_sock in all_clients:
            try:
                client_sock.sendall(disconnect_msg)
            except Exception:
                # Best effort: the peer may already be gone, and the
                # controller is about to reboot anyway.
                pass
    print("Controller is rebooting in 10s...")
    time.sleep(10)
    supervisor.reload()