From ad4e7532321b141de6c7f3186c5bfe766e4547b8 Mon Sep 17 00:00:00 2001 From: Silver Kuusik Date: Thu, 11 Jan 2018 19:35:09 +0100 Subject: [PATCH] add Sumorobot class --- hal.py | 277 ++++++++++++++++++++++++++++++++++++-------------------- main.py | 158 ++++++-------------------------- 2 files changed, 209 insertions(+), 226 deletions(-) diff --git a/hal.py b/hal.py index 89ee6c1..3080ec1 100644 --- a/hal.py +++ b/hal.py @@ -1,7 +1,7 @@ -from utime import sleep_us +from utime import sleep_us, sleep_ms from machine import Pin, PWM, ADC, time_pulse_us -#LEDs +# LEDs ENEMY = 0 STATUS = 1 LEFT_LINE = 2 @@ -14,6 +14,9 @@ RIGHT = 2 FORWARD = 3 BACKWARD = 4 +# blockl highlight +BLOCK_HIGHLIGHT = True + # Battery resistor ladder ratio BATTERY_COEFF = 2.25 @@ -24,113 +27,191 @@ ULTRASONIC_OFFSET = 800 MOTOR_LEFT_TUNING = 33 MOTOR_RIGHT_TUNING = 33 -# Ultrasonic distance sensor -echo = Pin(14, Pin.IN) -trigger = Pin(27, Pin.OUT) - -# Motor PWM-s -pwm_left = PWM(Pin(15), freq=50, duty=0) -pwm_right = PWM(Pin(4), freq=50, duty=0) - -# bottom LED -bottom_led = Pin(5, Pin.OUT) -# bottom LED is in reverse polarity -bottom_led.value(1) -# sensor LEDs -enemy_led = Pin(16, Pin.OUT) -left_line_led = Pin(17, Pin.OUT) -right_line_led = Pin(12, Pin.OUT) - -# Battery gauge -adc_battery = ADC(Pin(32)) - -# Optek sensors -adc_line_left = ADC(Pin(34)) -adc_line_right = ADC(Pin(33)) - -# Set reference voltage to 3.3V -adc_battery.atten(ADC.ATTN_11DB) -adc_line_left.atten(ADC.ATTN_11DB) -adc_line_right.atten(ADC.ATTN_11DB) - # Calibrate line sensors -LINE_LEFT_THRESHOLD = adc_line_left.read() -LINE_RIGHT_THRESHOLD = adc_line_right.read() +LINE_LEFT_THRESHOLD = 1000 +LINE_RIGHT_THRESHOLD = 1000 -def set_led(led, state): - if led == STATUS: - bottom_led.value(0 if state else 1) - elif led == ENEMY: - enemy_led.value(state) - elif led == LEFT_LINE: - left_line_led.value(state) - elif led == RIGHT_LINE: - right_line_led.value(state) +class Sumorobot(object): -def get_battery_voltage(): - return round(BATTERY_COEFF * (adc_battery.read() * 3.3 / 4096), 2) + # Ultrasonic distance sensor + echo = Pin(14, Pin.IN) + trigger = Pin(27, Pin.OUT) -enemy_score = 0 -def enemy_distance(): - global enemy_score + # Motor PWM-s + pwm_left = PWM(Pin(15), freq=50, duty=0) + pwm_right = PWM(Pin(4), freq=50, duty=0) - trigger.value(0) - sleep_us(5) - trigger.value(1) - sleep_us(10) - trigger.value(0) - # wait for the pulse and calculate the distance - enemy_distance = (time_pulse_us(echo, 1, 30000) / 2) / 29.1 + # bottom LED + bottom_led = Pin(5, Pin.OUT) + # bottom LED is in reverse polarity + bottom_led.value(1) + # sensor LEDs + enemy_led = Pin(16, Pin.OUT) + left_line_led = Pin(17, Pin.OUT) + right_line_led = Pin(12, Pin.OUT) - if enemy_distance < 60 and enemy_distance > 0: - if enemy_score < 5: - enemy_score += 1 - else: - if enemy_score > 0: - enemy_score -= 1 + # Battery gauge + adc_battery = ADC(Pin(32)) - # indicate enemy LED - set_led(ENEMY, True if enemy_score > 2 else False) + # Optek sensors + adc_line_left = ADC(Pin(34)) + adc_line_right = ADC(Pin(33)) - return True if enemy_score > 2 else False + # Set reference voltage to 3.3V + adc_battery.atten(ADC.ATTN_11DB) + adc_line_left.atten(ADC.ATTN_11DB) + adc_line_right.atten(ADC.ATTN_11DB) -def line_left(): - line = abs(adc_line_left.read() - LINE_LEFT_THRESHOLD) > 1000 - set_led(LEFT_LINE, line) - return line + # for highlighting blocks + highlight_block = None -def line_right(): - line = abs(adc_line_right.read() - LINE_RIGHT_THRESHOLD) > 1000 - set_led(RIGHT_LINE, line) - return line + # for terminating sleep + terminate = False -def detach_servos(): - motor_left(0) - motor_right(0) + def __init__(self, highlight_block): + self.highlight_block = highlight_block -prev_left_speed = 0 -def motor_left(speed): - global prev_left_speed - if speed == prev_left_speed: - return - prev_left_speed = speed - assert speed >= -100 - assert speed <= 100 - pwm_left.duty(int(33 + MOTOR_LEFT_TUNING + speed * 33 / 100)) # -100 ... 100 to 33 .. 102 - if speed == 0: - pwm_left.duty(0) + def set_led(self, led, state): + if led == STATUS: + self.bottom_led.value(0 if state else 1) + elif led == ENEMY: + self.enemy_led.value(state) + elif led == LEFT_LINE: + self.left_line_led.value(state) + elif led == RIGHT_LINE: + self.right_line_led.value(state) -prev_right_speed = 0 -def motor_right(speed): - global prev_right_speed - if speed == prev_right_speed: - return - prev_right_speed = speed - assert speed >= -100 - assert speed <= 100 - pwm_right.duty(int(33 + MOTOR_RIGHT_TUNING + speed * 33 / 100)) # -100 ... 100 to 33 .. 102 - if speed == 0: - pwm_right.duty(0) + def get_battery_voltage(self): + return round(BATTERY_COEFF * (self.adc_battery.read() * 3.3 / 4096), 2) -print("Battery voltage: %.2fV" % get_battery_voltage()) -print("Line sensor thresholds:", LINE_LEFT_THRESHOLD, LINE_RIGHT_THRESHOLD) + def get_enemy_distance(self): + # send a pulse + self.trigger.value(0) + sleep_us(5) + self.trigger.value(1) + sleep_us(10) + self.trigger.value(0) + # wait for the pulse and calculate the distance + return (time_pulse_us(self.echo, 1, 30000) / 2) / 29.1 + + enemy_score = 0 + def is_enemy(self, block_id = None): + if block_id and BLOCK_HIGHLIGHT: + self.highlight_block(block_id) + + # get the enemy distance + self.enemy_distance = self.get_enemy_distance() + # if the enemy is close and the ping actually returned + if self.enemy_distance < 60 and self.enemy_distance > 0: + # if not maximum score + if self.enemy_score < 5: + # increase the enemy score + self.enemy_score += 1 + # if no enemy was detected + else: + # if not lowest score + if self.enemy_score > 0: + # decrease the enemy score + self.enemy_score -= 1 + + # if the sensor saw something more than 2 times + enemy = True if self.enemy_score > 2 else False + + # trigger enemy LED + self.set_led(ENEMY, enemy) + + return enemy + + def get_line(self, dir): + # check for valid direction + assert dir == LEFT or dir == RIGHT + + if dir == LEFT: + return self.adc_line_left.read() + elif dir == RIGHT: + return self.adc_line_right.read() + + def is_line(self, dir, block_id = None): + # check for valid direction + assert dir == LEFT or dir == RIGHT + + # if block_id given and BLOCK_HIGHLIGHT + if block_id and BLOCK_HIGHLIGHT: + self.highlight_block(block_id) + + if dir == LEFT: + line = abs(self.adc_line_left.read() - LINE_LEFT_THRESHOLD) > 1000 + self.set_led(LEFT_LINE, line) + return line + elif dir == RIGHT: + line = abs(self.adc_line_right.read() - LINE_RIGHT_THRESHOLD) > 1000 + self.set_led(RIGHT_LINE, line) + return line + + def detach_servos(self): + self.set_servo(LEFT, 0) + self.set_servo(RIGHT, 0) + + prev_speed = {LEFT: 0, RIGHT: 0} + def set_servo(self, dir, speed): + # check for valid direction + assert dir == LEFT or dir == RIGHT + # check for valid speed + assert speed <= 100 and speed >= -100 + + # when the speed didn't change + if speed == self.prev_speed[dir]: + return + + # record the new speed + self.prev_speed[dir] = speed + + if dir == LEFT: + if speed == 0: + self.pwm_left.duty(0) + else: + self.pwm_left.duty(int(33 + MOTOR_LEFT_TUNING + speed * 33 / 100)) # -100 ... 100 to 33 .. 102 + elif dir == RIGHT: + if speed == 0: + self.pwm_right.duty(0) + else: + self.pwm_right.duty(int(33 + MOTOR_RIGHT_TUNING + speed * 33 / 100)) # -100 ... 100 to 33 .. 102 + + + def move(self, dir, block_id = None): + # check for valid direction + assert dir == STOP or dir == RIGHT or dir == LEFT or dir == BACKWARD or dir == FORWARD + + # if block_id given and BLOCK_HIGHLIGHT + if block_id and BLOCK_HIGHLIGHT: + self.highlight_block(block_id) + + if dir == STOP: + self.set_servo(LEFT, 0) + self.set_servo(RIGHT, 0) + elif dir == LEFT: + self.set_servo(LEFT, -100) + self.set_servo(RIGHT, -100) + elif dir == RIGHT: + self.set_servo(LEFT, 100) + self.set_servo(RIGHT, 100) + elif dir == FORWARD: + self.set_servo(LEFT, 100) + self.set_servo(RIGHT, -100) + elif dir == BACKWARD: + self.set_servo(LEFT, -100) + self.set_servo(RIGHT, 100) + + def sleep(self, delay, block_id = None): + # check for valid delay + assert delay > 0 + + # if block_id given and BLOCK_HIGHLIGHT + if block_id and BLOCK_HIGHLIGHT: + self.highlight_block(block_id) + + for j in range(0, delay, 50): + if self.terminate: + return # TODO: raise exception + else: + sleep_ms(50) diff --git a/main.py b/main.py index 43eb3fe..f046f1a 100644 --- a/main.py +++ b/main.py @@ -1,7 +1,6 @@ import _thread import ubinascii -from utime import sleep_ms -from machine import Pin, PWM, Timer, UART +from machine import Pin, PWM, Timer conn = None conn_highlight = None @@ -16,169 +15,69 @@ url_highlight = "ws://iot.koodur.com:80/p2p/" + name + "-highlight/browser/" #url_highlight = "ws://10.42.0.1:80/p2p/" + name + "-highlight/browser/" scope = dict() -# send blocks IDs for highlighting -no_delay = False -highlight = True -ast = "move(STOP, '')" - -def is_enemy(block_id): - if block_id != "" and highlight: - conn_highlight.send(block_id) - return enemy_distance() - -def is_line(dir, block_id): - if block_id != "" and highlight: - conn_highlight.send(block_id) - if dir == LEFT: - return line_left() - elif dir == RIGHT: - return line_right() - return False - -def move(dir, block_id): - if block_id != "" and highlight: - conn_highlight.send(block_id) - if dir == STOP: - motor_left(0) - motor_right(0) - elif dir == LEFT: - motor_left(-100) - motor_right(-100) - elif dir == RIGHT: - motor_left(100) - motor_right(100) - elif dir == FORWARD: - motor_left(100) - motor_right(-100) - elif dir == BACKWARD: - motor_left(-100) - motor_right(100) - -def move2(dir, block_id): - if block_id != "" and highlight: - conn_highlight.send(block_id) - if dir == STOP: - motor_left(0) - motor_right(0) - elif dir == RIGHT: - motor_left(-100) - motor_right(-100) - elif dir == LEFT: - motor_left(100) - motor_right(100) - elif dir == BACKWARD: - motor_left(100) - motor_right(-100) - elif dir == FORWARD: - motor_left(-100) - motor_right(100) - -def sleep_wrapper(delay, block_id): - if block_id != "" and highlight: - conn_highlight.send(block_id) - for j in range(0, delay, 10): - if ast == "move(STOP, '')": - return # TODO: raise exception - else: - sleep_ms(10) - -exported_functions = dict( - STOP = STOP, - LEFT = LEFT, - RIGHT = RIGHT, - FORWARD = FORWARD, - BACKWARD = BACKWARD, - move = move, - is_line = is_line, - is_enemy = is_enemy, - sleep = sleep_wrapper, - motor_left = motor_left, - motor_right = motor_right -) - -exported_functions2 = dict( - STOP = STOP, - LEFT = LEFT, - RIGHT = RIGHT, - FORWARD = FORWARD, - BACKWARD = BACKWARD, - move = move2, - is_line = is_line, - is_enemy = is_enemy, - sleep = sleep_wrapper, - motor_left = motor_left, - motor_right = motor_right -) +sumorobot = None +ast = "sumorobot.move(STOP)" def step(): global scope - shuffle = False while True: + # update scope scope = dict( - enemy = is_enemy(""), - line_left = is_line(LEFT, ""), - line_right = is_line(RIGHT, ""), - battery_voltage = get_battery_voltage(), + enemy = sumorobot.is_enemy(), + line_left = sumorobot.is_line(LEFT), + line_right = sumorobot.is_line(RIGHT), + battery_voltage = sumorobot.get_battery_voltage(), ) - if no_delay: - if shuffle: - current = exported_functions.copy() - else: - current = exported_functions2.copy() - shuffle = not shuffle - else: - current = exported_functions.copy() - current.update(scope) - exec(ast, current) + # execute code + exec(ast) + if sumorobot.terminate: + # disable forceful termination + sumorobot.terminate = False + sumorobot.move(STOP) # leave time to process WebSocket commands sleep_ms(50) def ws_handler(): global ast global conn - global scope global no_delay - uart = UART(2, baudrate=115200, rx=34, tx=35, timeout=1) - uart.write("uart works, yee!\n") while True: try: fin, opcode, data = conn.read_frame() - cmd = uart.readline() except: # urror print("Exception while reading from socket, attempting reconnect") conn = uwebsockets.connect(url) continue - if cmd: - print("command:", cmd) - if data == b"forward": #print("Going forward") - ast = "move(FORWARD, '')" + ast = "" + sumorobot.move(FORWARD) elif data == b"backward": #print("Going backward") - ast = "move(BACKWARD, '')" + ast = "" + sumorobot.move(BACKWARD) elif data == b"right": #print("Going right") - ast = "move(RIGHT, '')" + ast = "" + sumorobot.move(RIGHT) elif data == b"left": #print("Going left") - ast = "move(LEFT, '')" + ast = "" + sumorobot.move(LEFT) elif data == b"kick": conn.send(repr(scope)) elif data == b"ping": conn.send(repr(scope)) elif data.startswith("start:"): - print("Got code:")#, data[6:]) - if b"sleep" in data or b"if" in data or len(data.splitlines()) == 1: - no_delay = False - else: - no_delay = True + print("Got code")#, data[6:]) ast = compile(data[6:], "snippet", "exec") elif data == b"stop": - ast = "move(STOP, '')" + ast = "" + sumorobot.move(STOP) + sumorobot.terminate = True print("Got stop") elif b"Gone" in data: print("Server said 410 Gone, attempting to reconnect...") @@ -202,8 +101,11 @@ conn.send("{'ip': '" + wlan.ifconfig()[0] + "'}") # connect to the block highlight websocket conn_highlight = uwebsockets.connect(url_highlight) +# initialize SumoRobot object +sumorobot = Sumorobot(conn_highlight.send) + # indicate that the WebSocket is connected -set_led(STATUS, True) +sumorobot.set_led(STATUS, True) print("Starting WebSocket and code loop") # start the code processing thread