add Sumorobot class
This commit is contained in:
parent
62287bc428
commit
ad4e753232
259
hal.py
259
hal.py
@ -1,7 +1,7 @@
|
||||
from utime import sleep_us
|
||||
from utime import sleep_us, sleep_ms
|
||||
from machine import Pin, PWM, ADC, time_pulse_us
|
||||
|
||||
#LEDs
|
||||
# LEDs
|
||||
ENEMY = 0
|
||||
STATUS = 1
|
||||
LEFT_LINE = 2
|
||||
@ -14,6 +14,9 @@ RIGHT = 2
|
||||
FORWARD = 3
|
||||
BACKWARD = 4
|
||||
|
||||
# blockl highlight
|
||||
BLOCK_HIGHLIGHT = True
|
||||
|
||||
# Battery resistor ladder ratio
|
||||
BATTERY_COEFF = 2.25
|
||||
|
||||
@ -24,113 +27,191 @@ ULTRASONIC_OFFSET = 800
|
||||
MOTOR_LEFT_TUNING = 33
|
||||
MOTOR_RIGHT_TUNING = 33
|
||||
|
||||
# Ultrasonic distance sensor
|
||||
echo = Pin(14, Pin.IN)
|
||||
trigger = Pin(27, Pin.OUT)
|
||||
|
||||
# Motor PWM-s
|
||||
pwm_left = PWM(Pin(15), freq=50, duty=0)
|
||||
pwm_right = PWM(Pin(4), freq=50, duty=0)
|
||||
|
||||
# bottom LED
|
||||
bottom_led = Pin(5, Pin.OUT)
|
||||
# bottom LED is in reverse polarity
|
||||
bottom_led.value(1)
|
||||
# sensor LEDs
|
||||
enemy_led = Pin(16, Pin.OUT)
|
||||
left_line_led = Pin(17, Pin.OUT)
|
||||
right_line_led = Pin(12, Pin.OUT)
|
||||
|
||||
# Battery gauge
|
||||
adc_battery = ADC(Pin(32))
|
||||
|
||||
# Optek sensors
|
||||
adc_line_left = ADC(Pin(34))
|
||||
adc_line_right = ADC(Pin(33))
|
||||
|
||||
# Set reference voltage to 3.3V
|
||||
adc_battery.atten(ADC.ATTN_11DB)
|
||||
adc_line_left.atten(ADC.ATTN_11DB)
|
||||
adc_line_right.atten(ADC.ATTN_11DB)
|
||||
|
||||
# Calibrate line sensors
|
||||
LINE_LEFT_THRESHOLD = adc_line_left.read()
|
||||
LINE_RIGHT_THRESHOLD = adc_line_right.read()
|
||||
LINE_LEFT_THRESHOLD = 1000
|
||||
LINE_RIGHT_THRESHOLD = 1000
|
||||
|
||||
def set_led(led, state):
|
||||
class Sumorobot(object):
|
||||
|
||||
# Ultrasonic distance sensor
|
||||
echo = Pin(14, Pin.IN)
|
||||
trigger = Pin(27, Pin.OUT)
|
||||
|
||||
# Motor PWM-s
|
||||
pwm_left = PWM(Pin(15), freq=50, duty=0)
|
||||
pwm_right = PWM(Pin(4), freq=50, duty=0)
|
||||
|
||||
# bottom LED
|
||||
bottom_led = Pin(5, Pin.OUT)
|
||||
# bottom LED is in reverse polarity
|
||||
bottom_led.value(1)
|
||||
# sensor LEDs
|
||||
enemy_led = Pin(16, Pin.OUT)
|
||||
left_line_led = Pin(17, Pin.OUT)
|
||||
right_line_led = Pin(12, Pin.OUT)
|
||||
|
||||
# Battery gauge
|
||||
adc_battery = ADC(Pin(32))
|
||||
|
||||
# Optek sensors
|
||||
adc_line_left = ADC(Pin(34))
|
||||
adc_line_right = ADC(Pin(33))
|
||||
|
||||
# Set reference voltage to 3.3V
|
||||
adc_battery.atten(ADC.ATTN_11DB)
|
||||
adc_line_left.atten(ADC.ATTN_11DB)
|
||||
adc_line_right.atten(ADC.ATTN_11DB)
|
||||
|
||||
# for highlighting blocks
|
||||
highlight_block = None
|
||||
|
||||
# for terminating sleep
|
||||
terminate = False
|
||||
|
||||
def __init__(self, highlight_block):
|
||||
self.highlight_block = highlight_block
|
||||
|
||||
def set_led(self, led, state):
|
||||
if led == STATUS:
|
||||
bottom_led.value(0 if state else 1)
|
||||
self.bottom_led.value(0 if state else 1)
|
||||
elif led == ENEMY:
|
||||
enemy_led.value(state)
|
||||
self.enemy_led.value(state)
|
||||
elif led == LEFT_LINE:
|
||||
left_line_led.value(state)
|
||||
self.left_line_led.value(state)
|
||||
elif led == RIGHT_LINE:
|
||||
right_line_led.value(state)
|
||||
self.right_line_led.value(state)
|
||||
|
||||
def get_battery_voltage():
|
||||
return round(BATTERY_COEFF * (adc_battery.read() * 3.3 / 4096), 2)
|
||||
def get_battery_voltage(self):
|
||||
return round(BATTERY_COEFF * (self.adc_battery.read() * 3.3 / 4096), 2)
|
||||
|
||||
enemy_score = 0
|
||||
def enemy_distance():
|
||||
global enemy_score
|
||||
|
||||
trigger.value(0)
|
||||
def get_enemy_distance(self):
|
||||
# send a pulse
|
||||
self.trigger.value(0)
|
||||
sleep_us(5)
|
||||
trigger.value(1)
|
||||
self.trigger.value(1)
|
||||
sleep_us(10)
|
||||
trigger.value(0)
|
||||
self.trigger.value(0)
|
||||
# wait for the pulse and calculate the distance
|
||||
enemy_distance = (time_pulse_us(echo, 1, 30000) / 2) / 29.1
|
||||
return (time_pulse_us(self.echo, 1, 30000) / 2) / 29.1
|
||||
|
||||
if enemy_distance < 60 and enemy_distance > 0:
|
||||
if enemy_score < 5:
|
||||
enemy_score += 1
|
||||
enemy_score = 0
|
||||
def is_enemy(self, block_id = None):
|
||||
if block_id and BLOCK_HIGHLIGHT:
|
||||
self.highlight_block(block_id)
|
||||
|
||||
# get the enemy distance
|
||||
self.enemy_distance = self.get_enemy_distance()
|
||||
# if the enemy is close and the ping actually returned
|
||||
if self.enemy_distance < 60 and self.enemy_distance > 0:
|
||||
# if not maximum score
|
||||
if self.enemy_score < 5:
|
||||
# increase the enemy score
|
||||
self.enemy_score += 1
|
||||
# if no enemy was detected
|
||||
else:
|
||||
if enemy_score > 0:
|
||||
enemy_score -= 1
|
||||
# if not lowest score
|
||||
if self.enemy_score > 0:
|
||||
# decrease the enemy score
|
||||
self.enemy_score -= 1
|
||||
|
||||
# indicate enemy LED
|
||||
set_led(ENEMY, True if enemy_score > 2 else False)
|
||||
# if the sensor saw something more than 2 times
|
||||
enemy = True if self.enemy_score > 2 else False
|
||||
|
||||
return True if enemy_score > 2 else False
|
||||
# trigger enemy LED
|
||||
self.set_led(ENEMY, enemy)
|
||||
|
||||
def line_left():
|
||||
line = abs(adc_line_left.read() - LINE_LEFT_THRESHOLD) > 1000
|
||||
set_led(LEFT_LINE, line)
|
||||
return enemy
|
||||
|
||||
def get_line(self, dir):
|
||||
# check for valid direction
|
||||
assert dir == LEFT or dir == RIGHT
|
||||
|
||||
if dir == LEFT:
|
||||
return self.adc_line_left.read()
|
||||
elif dir == RIGHT:
|
||||
return self.adc_line_right.read()
|
||||
|
||||
def is_line(self, dir, block_id = None):
|
||||
# check for valid direction
|
||||
assert dir == LEFT or dir == RIGHT
|
||||
|
||||
# if block_id given and BLOCK_HIGHLIGHT
|
||||
if block_id and BLOCK_HIGHLIGHT:
|
||||
self.highlight_block(block_id)
|
||||
|
||||
if dir == LEFT:
|
||||
line = abs(self.adc_line_left.read() - LINE_LEFT_THRESHOLD) > 1000
|
||||
self.set_led(LEFT_LINE, line)
|
||||
return line
|
||||
elif dir == RIGHT:
|
||||
line = abs(self.adc_line_right.read() - LINE_RIGHT_THRESHOLD) > 1000
|
||||
self.set_led(RIGHT_LINE, line)
|
||||
return line
|
||||
|
||||
def line_right():
|
||||
line = abs(adc_line_right.read() - LINE_RIGHT_THRESHOLD) > 1000
|
||||
set_led(RIGHT_LINE, line)
|
||||
return line
|
||||
def detach_servos(self):
|
||||
self.set_servo(LEFT, 0)
|
||||
self.set_servo(RIGHT, 0)
|
||||
|
||||
def detach_servos():
|
||||
motor_left(0)
|
||||
motor_right(0)
|
||||
prev_speed = {LEFT: 0, RIGHT: 0}
|
||||
def set_servo(self, dir, speed):
|
||||
# check for valid direction
|
||||
assert dir == LEFT or dir == RIGHT
|
||||
# check for valid speed
|
||||
assert speed <= 100 and speed >= -100
|
||||
|
||||
prev_left_speed = 0
|
||||
def motor_left(speed):
|
||||
global prev_left_speed
|
||||
if speed == prev_left_speed:
|
||||
# when the speed didn't change
|
||||
if speed == self.prev_speed[dir]:
|
||||
return
|
||||
prev_left_speed = speed
|
||||
assert speed >= -100
|
||||
assert speed <= 100
|
||||
pwm_left.duty(int(33 + MOTOR_LEFT_TUNING + speed * 33 / 100)) # -100 ... 100 to 33 .. 102
|
||||
if speed == 0:
|
||||
pwm_left.duty(0)
|
||||
|
||||
prev_right_speed = 0
|
||||
def motor_right(speed):
|
||||
global prev_right_speed
|
||||
if speed == prev_right_speed:
|
||||
return
|
||||
prev_right_speed = speed
|
||||
assert speed >= -100
|
||||
assert speed <= 100
|
||||
pwm_right.duty(int(33 + MOTOR_RIGHT_TUNING + speed * 33 / 100)) # -100 ... 100 to 33 .. 102
|
||||
if speed == 0:
|
||||
pwm_right.duty(0)
|
||||
# record the new speed
|
||||
self.prev_speed[dir] = speed
|
||||
|
||||
print("Battery voltage: %.2fV" % get_battery_voltage())
|
||||
print("Line sensor thresholds:", LINE_LEFT_THRESHOLD, LINE_RIGHT_THRESHOLD)
|
||||
if dir == LEFT:
|
||||
if speed == 0:
|
||||
self.pwm_left.duty(0)
|
||||
else:
|
||||
self.pwm_left.duty(int(33 + MOTOR_LEFT_TUNING + speed * 33 / 100)) # -100 ... 100 to 33 .. 102
|
||||
elif dir == RIGHT:
|
||||
if speed == 0:
|
||||
self.pwm_right.duty(0)
|
||||
else:
|
||||
self.pwm_right.duty(int(33 + MOTOR_RIGHT_TUNING + speed * 33 / 100)) # -100 ... 100 to 33 .. 102
|
||||
|
||||
|
||||
def move(self, dir, block_id = None):
|
||||
# check for valid direction
|
||||
assert dir == STOP or dir == RIGHT or dir == LEFT or dir == BACKWARD or dir == FORWARD
|
||||
|
||||
# if block_id given and BLOCK_HIGHLIGHT
|
||||
if block_id and BLOCK_HIGHLIGHT:
|
||||
self.highlight_block(block_id)
|
||||
|
||||
if dir == STOP:
|
||||
self.set_servo(LEFT, 0)
|
||||
self.set_servo(RIGHT, 0)
|
||||
elif dir == LEFT:
|
||||
self.set_servo(LEFT, -100)
|
||||
self.set_servo(RIGHT, -100)
|
||||
elif dir == RIGHT:
|
||||
self.set_servo(LEFT, 100)
|
||||
self.set_servo(RIGHT, 100)
|
||||
elif dir == FORWARD:
|
||||
self.set_servo(LEFT, 100)
|
||||
self.set_servo(RIGHT, -100)
|
||||
elif dir == BACKWARD:
|
||||
self.set_servo(LEFT, -100)
|
||||
self.set_servo(RIGHT, 100)
|
||||
|
||||
def sleep(self, delay, block_id = None):
|
||||
# check for valid delay
|
||||
assert delay > 0
|
||||
|
||||
# if block_id given and BLOCK_HIGHLIGHT
|
||||
if block_id and BLOCK_HIGHLIGHT:
|
||||
self.highlight_block(block_id)
|
||||
|
||||
for j in range(0, delay, 50):
|
||||
if self.terminate:
|
||||
return # TODO: raise exception
|
||||
else:
|
||||
sleep_ms(50)
|
||||
|
158
main.py
158
main.py
@ -1,7 +1,6 @@
|
||||
import _thread
|
||||
import ubinascii
|
||||
from utime import sleep_ms
|
||||
from machine import Pin, PWM, Timer, UART
|
||||
from machine import Pin, PWM, Timer
|
||||
|
||||
conn = None
|
||||
conn_highlight = None
|
||||
@ -16,169 +15,69 @@ url_highlight = "ws://iot.koodur.com:80/p2p/" + name + "-highlight/browser/"
|
||||
#url_highlight = "ws://10.42.0.1:80/p2p/" + name + "-highlight/browser/"
|
||||
|
||||
scope = dict()
|
||||
# send blocks IDs for highlighting
|
||||
no_delay = False
|
||||
highlight = True
|
||||
ast = "move(STOP, '')"
|
||||
|
||||
def is_enemy(block_id):
|
||||
if block_id != "" and highlight:
|
||||
conn_highlight.send(block_id)
|
||||
return enemy_distance()
|
||||
|
||||
def is_line(dir, block_id):
|
||||
if block_id != "" and highlight:
|
||||
conn_highlight.send(block_id)
|
||||
if dir == LEFT:
|
||||
return line_left()
|
||||
elif dir == RIGHT:
|
||||
return line_right()
|
||||
return False
|
||||
|
||||
def move(dir, block_id):
|
||||
if block_id != "" and highlight:
|
||||
conn_highlight.send(block_id)
|
||||
if dir == STOP:
|
||||
motor_left(0)
|
||||
motor_right(0)
|
||||
elif dir == LEFT:
|
||||
motor_left(-100)
|
||||
motor_right(-100)
|
||||
elif dir == RIGHT:
|
||||
motor_left(100)
|
||||
motor_right(100)
|
||||
elif dir == FORWARD:
|
||||
motor_left(100)
|
||||
motor_right(-100)
|
||||
elif dir == BACKWARD:
|
||||
motor_left(-100)
|
||||
motor_right(100)
|
||||
|
||||
def move2(dir, block_id):
|
||||
if block_id != "" and highlight:
|
||||
conn_highlight.send(block_id)
|
||||
if dir == STOP:
|
||||
motor_left(0)
|
||||
motor_right(0)
|
||||
elif dir == RIGHT:
|
||||
motor_left(-100)
|
||||
motor_right(-100)
|
||||
elif dir == LEFT:
|
||||
motor_left(100)
|
||||
motor_right(100)
|
||||
elif dir == BACKWARD:
|
||||
motor_left(100)
|
||||
motor_right(-100)
|
||||
elif dir == FORWARD:
|
||||
motor_left(-100)
|
||||
motor_right(100)
|
||||
|
||||
def sleep_wrapper(delay, block_id):
|
||||
if block_id != "" and highlight:
|
||||
conn_highlight.send(block_id)
|
||||
for j in range(0, delay, 10):
|
||||
if ast == "move(STOP, '')":
|
||||
return # TODO: raise exception
|
||||
else:
|
||||
sleep_ms(10)
|
||||
|
||||
exported_functions = dict(
|
||||
STOP = STOP,
|
||||
LEFT = LEFT,
|
||||
RIGHT = RIGHT,
|
||||
FORWARD = FORWARD,
|
||||
BACKWARD = BACKWARD,
|
||||
move = move,
|
||||
is_line = is_line,
|
||||
is_enemy = is_enemy,
|
||||
sleep = sleep_wrapper,
|
||||
motor_left = motor_left,
|
||||
motor_right = motor_right
|
||||
)
|
||||
|
||||
exported_functions2 = dict(
|
||||
STOP = STOP,
|
||||
LEFT = LEFT,
|
||||
RIGHT = RIGHT,
|
||||
FORWARD = FORWARD,
|
||||
BACKWARD = BACKWARD,
|
||||
move = move2,
|
||||
is_line = is_line,
|
||||
is_enemy = is_enemy,
|
||||
sleep = sleep_wrapper,
|
||||
motor_left = motor_left,
|
||||
motor_right = motor_right
|
||||
)
|
||||
sumorobot = None
|
||||
ast = "sumorobot.move(STOP)"
|
||||
|
||||
def step():
|
||||
global scope
|
||||
|
||||
shuffle = False
|
||||
while True:
|
||||
# update scope
|
||||
scope = dict(
|
||||
enemy = is_enemy(""),
|
||||
line_left = is_line(LEFT, ""),
|
||||
line_right = is_line(RIGHT, ""),
|
||||
battery_voltage = get_battery_voltage(),
|
||||
enemy = sumorobot.is_enemy(),
|
||||
line_left = sumorobot.is_line(LEFT),
|
||||
line_right = sumorobot.is_line(RIGHT),
|
||||
battery_voltage = sumorobot.get_battery_voltage(),
|
||||
)
|
||||
if no_delay:
|
||||
if shuffle:
|
||||
current = exported_functions.copy()
|
||||
else:
|
||||
current = exported_functions2.copy()
|
||||
shuffle = not shuffle
|
||||
else:
|
||||
current = exported_functions.copy()
|
||||
current.update(scope)
|
||||
exec(ast, current)
|
||||
# execute code
|
||||
exec(ast)
|
||||
if sumorobot.terminate:
|
||||
# disable forceful termination
|
||||
sumorobot.terminate = False
|
||||
sumorobot.move(STOP)
|
||||
# leave time to process WebSocket commands
|
||||
sleep_ms(50)
|
||||
|
||||
def ws_handler():
|
||||
global ast
|
||||
global conn
|
||||
global scope
|
||||
global no_delay
|
||||
|
||||
uart = UART(2, baudrate=115200, rx=34, tx=35, timeout=1)
|
||||
uart.write("uart works, yee!\n")
|
||||
while True:
|
||||
try:
|
||||
fin, opcode, data = conn.read_frame()
|
||||
cmd = uart.readline()
|
||||
except: # urror
|
||||
print("Exception while reading from socket, attempting reconnect")
|
||||
conn = uwebsockets.connect(url)
|
||||
continue
|
||||
|
||||
if cmd:
|
||||
print("command:", cmd)
|
||||
|
||||
if data == b"forward":
|
||||
#print("Going forward")
|
||||
ast = "move(FORWARD, '')"
|
||||
ast = ""
|
||||
sumorobot.move(FORWARD)
|
||||
elif data == b"backward":
|
||||
#print("Going backward")
|
||||
ast = "move(BACKWARD, '')"
|
||||
ast = ""
|
||||
sumorobot.move(BACKWARD)
|
||||
elif data == b"right":
|
||||
#print("Going right")
|
||||
ast = "move(RIGHT, '')"
|
||||
ast = ""
|
||||
sumorobot.move(RIGHT)
|
||||
elif data == b"left":
|
||||
#print("Going left")
|
||||
ast = "move(LEFT, '')"
|
||||
ast = ""
|
||||
sumorobot.move(LEFT)
|
||||
elif data == b"kick":
|
||||
conn.send(repr(scope))
|
||||
elif data == b"ping":
|
||||
conn.send(repr(scope))
|
||||
elif data.startswith("start:"):
|
||||
print("Got code:")#, data[6:])
|
||||
if b"sleep" in data or b"if" in data or len(data.splitlines()) == 1:
|
||||
no_delay = False
|
||||
else:
|
||||
no_delay = True
|
||||
print("Got code")#, data[6:])
|
||||
ast = compile(data[6:], "snippet", "exec")
|
||||
elif data == b"stop":
|
||||
ast = "move(STOP, '')"
|
||||
ast = ""
|
||||
sumorobot.move(STOP)
|
||||
sumorobot.terminate = True
|
||||
print("Got stop")
|
||||
elif b"Gone" in data:
|
||||
print("Server said 410 Gone, attempting to reconnect...")
|
||||
@ -202,8 +101,11 @@ conn.send("{'ip': '" + wlan.ifconfig()[0] + "'}")
|
||||
# connect to the block highlight websocket
|
||||
conn_highlight = uwebsockets.connect(url_highlight)
|
||||
|
||||
# initialize SumoRobot object
|
||||
sumorobot = Sumorobot(conn_highlight.send)
|
||||
|
||||
# indicate that the WebSocket is connected
|
||||
set_led(STATUS, True)
|
||||
sumorobot.set_led(STATUS, True)
|
||||
|
||||
print("Starting WebSocket and code loop")
|
||||
# start the code processing thread
|
||||
|
Loading…
Reference in New Issue
Block a user