# sumorobot-firmware/main.py
#
# Listing metadata from the code viewer this file was copied from:
# 180 lines, 5.4 KiB, Python
#

import _thread
import ubinascii
import ujson
import uwebsockets
import os
from debounce import DebouncedSwitch
# Program run by the control loop; an empty string means "nothing to run"
ast = ""
# True while the user program should be executed
executeCode = False
# Latest sensor/state snapshot, sent to the client on request
scope = {}
def buttoncallback(p=True):
    """Toggle program execution in response to the play/stop button.

    When the program is currently running, this requests termination and
    switches the power LED off; otherwise it clears the termination flag and
    switches the LED on.  The run flag itself is flipped after a 50 ms pause.

    p -- unused; absorbs the argument supplied by the caller (the debounced
         switch registered in step() passes "dummy").
    """
    global executeCode
    running = executeCode
    # Ask the robot to abort delays only when we are about to stop
    sumorobot.terminate = running
    sumorobot.led_power.value(not running)
    sleep_ms(50)
    # Re-read the flag after the pause, then flip it
    executeCode = not executeCode
    print(executeCode)
def writeCodeTofile(data):
    """Persist *data* as JSON in the file "code".

    The payload is first written to "code.part" and then renamed to "code",
    so a partially written file never sits under the final name.  A 50 ms
    pause follows both the write and the rename.
    """
    partial_name = "code.part"
    final_name = "code"
    with open(partial_name, "w") as out:
        serialized = ujson.dumps(data)
        out.write(serialized)
    sleep_ms(50)
    # Swap the finished file into place
    os.rename(partial_name, final_name)
    sleep_ms(50)
def step():
    """Run the robot's control loop forever.

    Each pass:
      1. rebuilds the module-level ``scope`` dict from the current sensor
         readings and the run flag,
      2. executes the user program held in ``ast`` while ``executeCode`` is
         set,
      3. (re)registers the debounced play/stop button handler,
      4. stops the motors if a stop was requested via ``sumorobot.terminate``,
      5. sleeps 50 ms.

    Never returns; it is started in its own thread at the bottom of the file.
    """
    global scope
    while True:
        # Update scope
        scope = dict(
            line_left=sumorobot.line_left,
            line_right=sumorobot.line_right,
            line_middle=sumorobot.line_middle,
            opponent=sumorobot.get_opponent_distance(),
            # Battery voltage is not measured here; a constant 0 is reported
            battery_voltage=0,
            state=executeCode,
        )
        # Execute code
        if executeCode:
            # NOTE: `ast` is code received over the WebSocket (or restored
            # from the "code" file) and is executed as-is, unsandboxed.
            try:
                exec(ast)
            except Exception as e:
                # A fault in the user program must not kill this thread,
                # otherwise scope updates and stop handling would cease.
                print("error in user code:", e)
        # NOTE(review): this constructs a new DebouncedSwitch on every pass
        # of the loop; it looks like it was meant to be set up once before
        # the loop -- confirm against the debounce module before moving it.
        sw = DebouncedSwitch(sumorobot.playStop, buttoncallback, "dummy")
        # When robot was stopped
        if sumorobot.terminate:
            # Disable forceful termination of delays in code
            sumorobot.terminate = False
            # Stop the robot
            sumorobot.move(STOP)
        # Leave time to process WebSocket commands
        sleep_ms(50)
def ws_handler():
    """Receive WebSocket frames forever and act on the commands they carry.

    Tracks WiFi connectivity in ``has_wifi_connection`` and reads frames from
    the module-level ``conn``.  A frame is dispatched by the first of these
    byte strings it contains, tested in this order:

      forward / backward / right / left -- clear the program and drive
      ping            -- reply with the current ``scope`` as "sensors"
      code            -- stop, validate and store the uploaded program,
                         reply with a "code_upload" status
      start           -- validate and store the program, then run it
      stop            -- stop the program and the motors
      calibrate_line  -- run line-sensor calibration
      Gone            -- log that the server reported 410 Gone

    NOTE(review): matching is by substring over the whole frame, so the
    order above matters, and a "code"/"start" frame whose payload happens to
    contain e.g. ``left`` would be taken by an earlier branch -- confirm the
    frame format before relying on this.

    Never returns; it is started in its own thread at the bottom of the file.
    """
    global executeCode
    global ast
    global has_wifi_connection
    while True:
        # When WiFi has just been reconnected
        if wlan.isconnected() and not has_wifi_connection:
            has_wifi_connection = True
        # When WiFi has just been disconnected
        elif not wlan.isconnected() and has_wifi_connection:
            has_wifi_connection = False
        elif not wlan.isconnected():
            # Keep waiting for WiFi, but yield the CPU instead of spinning
            sleep_ms(100)
            continue
        try:  # Try to read from the WebSocket
            data = conn.recv()
        except Exception:  # Socket timeout, no data received
            # Continue to try to read data
            continue
        # When an empty frame was received
        if not data:
            # Continue to receive data
            continue
        elif b'forward' in data:
            ast = ""
            sumorobot.move(FORWARD)
        elif b'backward' in data:
            ast = ""
            sumorobot.move(BACKWARD)
        elif b'right' in data:
            ast = ""
            sumorobot.move(RIGHT)
        elif b'left' in data:
            ast = ""
            sumorobot.move(LEFT)
        elif b'ping' in data:
            # scope is turned into JSON text via its repr (quotes and
            # booleans rewritten), parsed back, and embedded in the reply
            conn.send(ujson.dumps({"cmd":"sensors", "data":ujson.loads(repr(scope).replace("'", '"').replace("False","false").replace("True","true"))}))
        elif b'code' in data:
            executeCode = False
            try:
                data = ujson.loads(data)
                # ";;" is the line separator used in uploaded programs
                source = data['val'].replace(";;", "\n")
                print(source)
                # Compile before persisting so an invalid program is never
                # stored and never takes this thread down
                compiled = compile(source, "snippet", "exec")
                writeCodeTofile(data['val'])
                conn.send(ujson.dumps({"cmd":"code_upload", "status":True}))
            except Exception as e:
                print("code upload failed:", e)
                conn.send(ujson.dumps({"cmd":"code_upload", "status":False}))
                continue
            ast = compiled
        elif b'start' in data:
            try:
                data = ujson.loads(data)
                compiled = compile(data['val'].replace(";;", "\n"), "snippet", "exec")
                writeCodeTofile(data['val'])
            except Exception as e:
                # Malformed frame or invalid program: keep the current state
                print("start failed:", e)
                continue
            # Install the new program before enabling execution so the
            # control loop never runs the previous one in the meantime
            ast = compiled
            executeCode = True
            sumorobot.led_power.value(1)
            sleep_ms(50)
        elif b'stop' in data:
            sumorobot.led_power.value(0)
            executeCode = False
            sumorobot.move(STOP)
            # for terminating delays in code
            sumorobot.terminate = True
        elif b'calibrate_line' in data:
            sumorobot.led_power.value(1)
            sleep_ms(50)
            sumorobot.calibrate_line()
            sumorobot.led_power.value(0)
        elif b'Gone' in data:
            # Only logged: no reconnect is actually performed here
            print("server said 410 Gone, attempting to reconnect...")
        else:
            print("unknown cmd:", data)
# Block until WiFi is connected
while not wlan.isconnected():
    sleep_ms(100)
# Connect to the websocket
url = "ws://%s/p2p/sumo-%s/browser/" % (config['sumo_server'], config['sumo_id'])
conn = uwebsockets.connect(url)
# Socket reads time out after 3 seconds
conn.settimeout(3)
# WiFi is connected
has_wifi_connection = True
# Restore the program stored by a previous upload, if there is one
if 'code' in os.listdir():
    try:
        with open("code", "r") as code_file:
            # ";;" is the line separator used in stored programs
            data = ujson.load(code_file).replace(";;", "\n")
        ast = compile(data, "snippet", "exec")
    except Exception as e:
        # A corrupt or invalid stored program must not abort startup before
        # the worker threads exist; fall back to an empty program instead.
        print("ignoring stored code:", e)
        ast = ""
# Start the code processing thread
_thread.start_new_thread(step, ())
# Start the Websocket processing thread
_thread.start_new_thread(ws_handler, ())