Compare commits

..

No commits in common. "udp-remote-controllable" and "master" have entirely different histories.

8 changed files with 577 additions and 101 deletions

26
.drone.yml Normal file
View File

@ -0,0 +1,26 @@
---
kind: pipeline
type: kubernetes
name: default
steps:
- name: build
image: harbor.k-space.ee/k-space/micropython-esp32
settings:
mtu: 1300
commands:
- cp /drone/src/*.py /src/ports/esp32/modules/
- cd /src/ports/esp32
- bash -c "source /opt/esp/idf/export.sh && make"
- cp build-GENERIC/firmware.bin /drone/src/
- name: gitea_release
image: plugins/gitea-release
settings:
app_key: xxx
base_url: https://git.k-space.ee
files: firmware.bin
when:
event: tag
image_pull_secrets:
- dockerconfig

21
LICENSE Executable file
View File

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2021 RoboKoding LTD
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

9
Makefile Normal file → Executable file
View File

@ -1,7 +1,7 @@
#!/bin/bash
# When this baud does not work, try 115200
FLASH_BAUD := 500000
FLASH_BAUD := 230400
# Image to flash
FLASH_IMAGE := sumofirmware.bin
@ -9,7 +9,7 @@ FLASH_IMAGE := sumofirmware.bin
# Try to automatically find the serialport
SERIAL_PORT := $(shell find /dev -iname "tty*usb*")
all: erase flash delay update reset
all: erase flash delay config update reset
delay:
sleep 3
@ -18,7 +18,12 @@ reset:
esptool.py -p $(SERIAL_PORT) --after hard_reset read_mac
update:
ampy -d 0.5 -p $(SERIAL_PORT) put hal.py
ampy -d 0.5 -p $(SERIAL_PORT) put main.py
ampy -d 0.5 -p $(SERIAL_PORT) put boot.py
config:
ampy -d 0.5 -p $(SERIAL_PORT) put config.json
erase:
esptool.py -p $(SERIAL_PORT) -b $(FLASH_BAUD) erase_flash

26
README.md Normal file → Executable file
View File

@ -1,14 +1,22 @@
# Special SumoRobot firmware for usage with the remote
Uses UDP packets from remote to the robot instead of BLE or WebSocket
# sumorobot-firmware
MicroPython is ESP32 Generic
The software that is running on the SumoRobots
* https://micropython.org/download/ESP32_GENERIC/
<img alt="Code" src="https://www.robokoding.com/assets/img/sumorobot_firmware.png" width="50%">
# Usage
# Instructions
* Change the SERIAL_PORT in the Makefile
* Install [Python](https://www.python.org/downloads/)
* Install [esptool](https://github.com/espressif/esptool) (to flash MicroPython to the ESP32)
* Install [ampy](https://github.com/adafruit/ampy) (for uploading files)
* Download [the MicroPython binary](http://micropython.org/download#esp32) to this directory
* Upload the MicroPython binary and the SumoRobot firmware to your ESP32 (open a terminal and type: make all)
1. Get latest MicroPython and name it `sumofirmware.bin`
2. Run `make` to flash the firmware
# Support
If you find our work useful, please consider donating : )
[![Donate using Liberapay](https://liberapay.com/assets/widgets/donate.svg)](https://liberapay.com/robokoding/donate)
The sumo robot will connect to `sumo` wifi and during DHCP request it also provides its hostname
as `sumo-<ID>`. It will be later used by the Remote to do a DNS lookup from dnsmasq DNS server.
# Credits
* [Lauri Võsandi](https://lauri.xn--vsandi-pxa.com/)
* [K-SPACE MTÜ](https://k-space.ee/)

34
boot.py Executable file
View File

@ -0,0 +1,34 @@
import os
import utime
import machine
# Give time to cancel this boot script
print("Press Ctrl-C to stop new boot script...")
utime.sleep_ms(1000)
root_files = os.listdir()
update_files = ['boot.py.new', 'main.py.new', 'hal.py.new']
files_to_update = []
# Check for FW updates and verify new FW files
for file in update_files:
if file in root_files:
print("boot.py: starting to update:", file)
# Try to load the user code
try:
with open(file, 'r') as code:
compile(code.read(), "snippet", 'exec')
files_to_update.append(file)
except Exception as error:
print("boot.py:", file, "compilation failed:", error)
files_to_update.clear()
break
# If valid updates replace with new FW
for file in files_to_update:
os.rename(file, file.replace('.new', ''))
# If updates, reboot to load new FW
if len(files_to_update) != 0:
machine.reset()

15
config.json Executable file
View File

@ -0,0 +1,15 @@
{
"status_led_pin": 5,
"battery_coeff": 2.25,
"sumorobot_name": "SumoRobot",
"firmware_timestamp": "2021.06.04 22:23",
"firmware_version": "1.1",
"left_servo_calib": [37, 73, 81, 116],
"right_servo_calib": [37, 73, 81, 116],
"sonar_threshold": 40,
"boot_code": "code.py",
"left_line_value": 1000,
"right_line_value": 1000,
"left_line_threshold": 1000,
"right_line_threshold": 1000
}

282
hal.py Executable file
View File

@ -0,0 +1,282 @@
from utime import sleep_us, sleep_ms
from machine import Pin, PWM, ADC, time_pulse_us
# LEDs
STATUS = 0
SONAR = 1
LEFT_LINE = 2
RIGHT_LINE = 3
# Directions
STOP = 0
LEFT = 1
RIGHT = 2
SEARCH = 3
FORWARD = 4
BACKWARD = 5
class Sumorobot(object):
# Constructor
def __init__(self, config = None):
# Config file
self.config = config
# Sonar distance sensor
self.echo = Pin(14, Pin.IN)
self.trigger = Pin(27, Pin.OUT)
# Servo PWM-s
self.pwm = {
LEFT: PWM(Pin(15), freq=50, duty=0),
RIGHT: PWM(Pin(4), freq=50, duty=0)
}
# LED sensor feedback
self.sensor_feedback = True
# Bottom status LED
self.status_led = Pin(self.config['status_led_pin'], Pin.OUT)
# Bottom status LED is in reverse polarity
self.status_led.value(1)
# Sensor LEDs
self.sonar_led = Pin(16, Pin.OUT)
self.left_line_led = Pin(17, Pin.OUT)
self.right_line_led = Pin(12, Pin.OUT)
# Battery level in %
self.battery_level = 0
# Battery gauge
self.bat_status = 4.3
self.move_counter = 0
self.adc_battery = ADC(Pin(32))
self.bat_charge = Pin(25, Pin.IN)
# The pullups for the phototransistors
Pin(19, Pin.IN, Pin.PULL_UP)
Pin(23, Pin.IN, Pin.PULL_UP)
# The phototransistors
self.last_line = LEFT
self.adc_line_left = ADC(Pin(34))
self.adc_line_right = ADC(Pin(33))
# Set reference voltage to 3.3V
self.adc_battery.atten(ADC.ATTN_11DB)
self.adc_line_left.atten(ADC.ATTN_11DB)
self.adc_line_right.atten(ADC.ATTN_11DB)
# To smooth out sonar sensor value
self.sonar_score = 0
# For terminating sleep
self.terminate = False
# For search mode
self.search = False
self.search_counter = 0
# Memorise previous servo speeds
self.prev_speed = {LEFT: 0, RIGHT: 0}
# Function to set LED states
def set_led(self, led, value):
# Turn the given LED on or off
if led == STATUS:
# Status LED is reverse polarity
self.status_led.value(0 if value else 1)
elif led == SONAR:
self.sonar_led.value(value)
elif led == LEFT_LINE:
self.left_line_led.value(value)
elif led == RIGHT_LINE:
self.right_line_led.value(value)
# Function to get battery level in percentage
def get_battery_level(self):
# When the SumoRobot is not moving
if self.prev_speed[LEFT] == 0 and self.prev_speed[RIGHT] == 0:
# Calculate battery voltage
battery_voltage = round(self.config['battery_coeff'] * (self.adc_battery.read() * 3.3 / 4096), 2)
# Map battery voltage to percentage
temp_battery_level = 0.0 + ((100.0 - 0.0) / (4.2 - 3.2)) * (battery_voltage - 3.2)
# When battery level changed more than 5 percent
if abs(self.battery_level - temp_battery_level) > 5:
# Update battery level
self.battery_level = round(temp_battery_level)
# Return the battery level in percentage
return min(100, max(0, self.battery_level))
# Function to get distance (cm) from the object in front of the SumoRobot
def get_sonar_value(self):
# Send a pulse
self.trigger.value(0)
sleep_us(5)
self.trigger.value(1)
sleep_us(10)
self.trigger.value(0)
# Wait for the pulse and calculate the distance
return round((time_pulse_us(self.echo, 1, 30000) / 2) / 29.1)
# Function to get boolean if there is something in front of the SumoRobot
def is_sonar(self):
# Get the sonar value
self.sonar_value = self.get_sonar_value()
# When the sonar value is small and the ping actually returned
if self.sonar_value < self.config['sonar_threshold'] and self.sonar_value > 0:
# When not maximum score
if self.sonar_score < 5:
# Increase the sonar score
self.sonar_score += 1
# When no sonar ping was returned
else:
# When not lowest score
if self.sonar_score > 0:
# Decrease the sonar score
self.sonar_score -= 1
# When the sensor saw something more than 2 times
value = True if self.sonar_score > 2 else False
# Trigger sonar LED
self.set_led(SONAR, value)
return value
# Function to update the config file
def update_config_file(self):
# Update the config file
with open('config.part', 'w') as config_file:
config_file.write(ujson.dumps(self.config))
os.rename('config.part', 'config.json')
# Function to update line calibration and write it to the config file
def calibrate_line_values(self):
# Read the line sensor values
self.config['left_line_value'] = self.adc_line_left.read()
self.config['right_line_value'] = self.adc_line_right.read()
# Function to get light inensity from the phototransistors
def get_line(self, line):
# Check if the direction is valid
assert line in (LEFT, RIGHT)
# Return the given line sensor value
if line == LEFT:
return self.adc_line_left.read()
elif line == RIGHT:
return self.adc_line_right.read()
def is_line(self, line):
# Check if the direction is valid
assert line in (LEFT, RIGHT)
# Define feedback LED
led = LEFT_LINE if line == LEFT else RIGHT_LINE
# Define config prefix
prefix = 'left' if line == LEFT else 'right'
# Check for line
value = abs(self.get_line(line) - self.config[prefix + '_line_value']) > self.config[prefix + '_line_threshold']
# Show LED feedback
self.set_led(led, value)
# Update last line direction if line was detected
self.last_line = value if value else self.last_line
# Return the given line sensor value
return value
def set_servo(self, servo, speed):
# Check if the direction is valid
assert servo in (LEFT, RIGHT)
# Check if the speed is valid
assert speed <= 100 and speed >= -100
# When the speed didn't change
if speed == self.prev_speed[servo]:
return
# Save the new speed
self.prev_speed[servo] = speed
# Set the given servo speed
if speed == 0:
self.pwm[servo].duty(0)
else:
# Define config prefix
prefix = 'left' if servo == LEFT else 'right'
# -100 ... 100 to min_tuning .. max_tuning
min_tuning = self.config[prefix + '_servo_min_tuning']
max_tuning = self.config[prefix + '_servo_max_tuning']
self.pwm[servo].duty(int((speed + 100) / 200 * (max_tuning - min_tuning) + min_tuning))
def move(self, dir):
# Check if the direction is valid
assert dir in (SEARCH, STOP, RIGHT, LEFT, BACKWARD, FORWARD)
# Go to the given direction
if dir == STOP:
self.set_servo(LEFT, 0)
self.set_servo(RIGHT, 0)
elif dir == LEFT:
self.set_servo(LEFT, -100)
self.set_servo(RIGHT, -100)
elif dir == RIGHT:
self.set_servo(LEFT, 100)
self.set_servo(RIGHT, 100)
elif dir == SEARCH:
# Change search mode after X seconds
if self.search_counter == 50:
self.search = not self.search
self.search_counter = 0
# When in search mode
if self.search:
self.move(FORWARD)
elif self.last_line == RIGHT:
self.move(LEFT)
else:
self.move(RIGHT)
# Increase search counter
self.search_counter += 1
elif dir == FORWARD:
self.set_servo(LEFT, 100)
self.set_servo(RIGHT, -100)
elif dir == BACKWARD:
self.set_servo(LEFT, -100)
self.set_servo(RIGHT, 100)
def update_sensor_feedback(self):
if self.sensor_feedback:
# Execute to see LED feedback for sensors
self.is_sonar()
self.is_line(LEFT)
self.is_line(RIGHT)
def get_sensor_scope(self):
# TODO: implement sensor value caching
return str(self.get_sonar_value()) + ',' \
+ str(self.get_line(LEFT)) + ',' \
+ str(self.get_line(RIGHT)) + ',' \
+ str(self.bat_charge.value()) + ',' \
+ str(self.get_battery_level())
def get_configuration_scope(self):
return str(self.config['sumorobot_name']) + ',' \
+ str(self.config['firmware_version']) + ',' \
+ str(self.config['left_line_value']) + ',' \
+ str(self.config['right_line_value']) + ',' \
+ str(self.config['left_line_threshold']) + ',' \
+ str(self.config['right_line_threshold']) + ',' \
+ str(self.config['sonar_threshold'])
def sleep(self, delay):
# Check for valid delay
assert delay > 0
# Split the delay into 50ms chunks
while delay:
# Check for forceful termination
if self.terminate:
# Terminate the delay
return
else:
sleep_ms(50)
delay -= 50

259
main.py Normal file → Executable file
View File

@ -1,101 +1,186 @@
import ubinascii
import utime
import _thread
import os
import network
from utime import sleep_ms
from machine import Timer, Pin, PWM
import ubluetooth
import micropython
print("Press Ctrl-C to stop boot script...")
sleep_ms(200)
led_power = Pin(25, Pin.OUT)
led_power.value(1)
pwm_left = PWM(Pin(15), freq=50, duty=0)
pwm_right = PWM(Pin(13), freq=50, duty=0)
HOSTNAME = "sumo-%s" % \
ubinascii.hexlify(network.WLAN().config('mac'),':').decode().replace(":","")[6:]
url = "ws://sumo.koodur.com:80/p2p/%s/browser/" % HOSTNAME
ap_if = network.WLAN(network.AP_IF)
ap_if.active(False)
wlan = network.WLAN(network.STA_IF)
wlan.active(True)
wlan.config(dhcp_hostname=HOSTNAME)
wlan.connect("sumo", "salakala")
#wlan.connect("k-space.ee legacy","")
while not wlan.isconnected():
print("Connecting to wifi...")
sleep_ms(100)
print("Connected to wifi!", wlan.ifconfig())
print("Hi my name is:")
print(HOSTNAME)
def map_vals(value, leftMin, leftMax, rightMin, rightMax):
#http://stackoverflow.com/questions/1969240/mapping-a-range-of-values-to-another
# Figure out how 'wide' each range is
leftSpan = leftMax - leftMin
rightSpan = rightMax - rightMin
# Convert the left range into a 0-1 range (float)
valueScaled = float(value - leftMin) / float(leftSpan)
# Convert the 0-1 range into a value in the right range.
return int(rightMin + (valueScaled * rightSpan))
from hal import *
# Loading libraries takes ca 400ms
# BLE events
_IRQ_CENTRAL_CONNECT = micropython.const(1)
_IRQ_CENTRAL_DISCONNECT = micropython.const(2)
_IRQ_GATTS_WRITE = micropython.const(3)
import socket
import time
from time import sleep_ms
# SumoRobot functionality
sumorobot = Sumorobot()
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
sock.settimeout(0.3)
sock.bind(("", 44444))
def advertise_ble_name(name):
payload = b'\x02\x01\x02' + bytes([len(name) + 1])
payload += b'\x09' + name.encode()
ble.gap_advertise(100, payload)
def update_battery_level(timer):
if conn_handle is not None:
battery_level = sumorobot.get_battery_level()
ble.gatts_notify(conn_handle, battery, bytes([battery_level]))
def sensor_feedback_thread():
while True:
try:
buf = sock.recv(20)
except OSError: # timed out
pwm_left.duty(0)
pwm_right.duty(0)
# Leave time to process other threads
utime.sleep_ms(50)
# Execute to see LED feedback for sensors
sumorobot.update_sensor_feedback()
def code_process_thread():
global prev_bat_level, python_code
while True:
# Leave time to process other threads
utime.sleep_ms(50)
# When no code to execute
if python_code == b'':
continue
# Try to execute the Python code
try:
value, forward, backward = buf.decode("ascii").split(":")
except ValueError:
continue
forward, backward = int(forward), int(backward)
exec(compile(python_code, "snippet", 'exec'))
except Exception as error:
print("main.py: the python code had errors:", error)
finally:
print("main.py: finized python code execution")
# Erase the code
python_code = b''
# Stop the robot
sumorobot.move(STOP)
# Cancel code termination
sumorobot.terminate = False
if forward:
left, right = 10, -10
elif backward:
left, right = -10, 10
else:
left, right = 0, 0
mapped = map_vals(int(value),-400,400,-10,10)
if abs(mapped) > 5:
left -= mapped
right -= mapped
# The BLE handler thread
def ble_handler(event, data):
global conn_handle, python_code, temp_python_code
if left < -10:
left = -10
if left > 10:
left = 10
if right < -10:
right = -10
if right > 10:
right = 10
if abs(left) > 2:
pwm_left.duty(78 + left)
if event is _IRQ_CENTRAL_CONNECT:
conn_handle, _, _, = data
# Turn ON the status LED
sumorobot.set_led(STATUS, True)
update_battery_level(None)
advertise_ble_name(sumorobot.config['sumorobot_name'])
elif event is _IRQ_CENTRAL_DISCONNECT:
conn_handle = None
# Turn OFF status LED
sumorobot.set_led(STATUS, False)
# Advertise with name
advertise_ble_name(sumorobot.config['sumorobot_name'])
elif event is _IRQ_GATTS_WRITE:
# Read the command
cmd = ble.gatts_read(rx)
print(cmd)
if b'<stop>' in cmd:
python_code = b''
sumorobot.move(STOP)
sumorobot.terminate = True
elif b'<forward>' in cmd:
python_code = b''
sumorobot.move(FORWARD)
elif b'<backward>' in cmd:
python_code = b''
sumorobot.move(BACKWARD)
elif b'<left>' in cmd:
python_code = b''
sumorobot.move(LEFT)
elif b'<right>' in cmd:
python_code = b''
sumorobot.move(RIGHT)
elif b'<sensors>' in cmd:
ble.gatts_notify(conn_handle, tx, sumorobot.get_sensor_scope())
elif b'<config>' in cmd:
ble.gatts_notify(conn_handle, tx, sumorobot.get_configuration_scope())
elif b'<pwm>' in cmd:
servo, speed = cmd[5:].decode().split(',')
servo = LEFT if servo == 'LEFT' else RIGHT
sumorobot.pwm[servo].duty(int(speed))
elif b'<code>' in cmd:
temp_python_code = b'\n'
elif b'<code/>' in cmd:
python_code = temp_python_code
temp_python_code = b''
elif temp_python_code != b'':
temp_python_code += cmd
else:
pwm_left.duty(0)
if abs(right) > 2:
pwm_right.duty(78 + right)
else:
pwm_right.duty(0)
sleep_ms(20)
temp_python_code = b''
print("main.py: unknown cmd=", cmd)
conn_handle = None
temp_python_code = b''
python_code = b''
# When boot code exists
if sumorobot.config['boot_code'] in root_files:
print("main.py: trying to load", sumorobot.config['boot_code'])
# Try to load and compile the boot code
try:
with open(sumorobot.config['boot_code'], 'r') as file:
boot_code = file.read()
compile(boot_code, "snippet", 'exec')
python_code = boot_code
except Exception as error:
print("main.py:", sumorobot.config['boot_code'], "compilation failed:", error)
# Start BLE
ble = ubluetooth.BLE()
ble.config(gap_name=sumorobot.config['sumorobot_name'])
ble.active(True)
# Register the BLE hander
ble.irq(ble_handler)
# BLE info serivce
INFO_SERVICE_UUID = ubluetooth.UUID(0x180a)
MODEL_CHARACTERISTIC = (ubluetooth.UUID(0x2a24), ubluetooth.FLAG_READ,)
FIRMWARE_CHARACTERISTIC = (ubluetooth.UUID(0x2a26), ubluetooth.FLAG_READ,)
MANUFACTURER_CHARACTERISTIC = (ubluetooth.UUID(0x2a29), ubluetooth.FLAG_READ,)
INFO_SERVICE = (INFO_SERVICE_UUID, (MODEL_CHARACTERISTIC, FIRMWARE_CHARACTERISTIC, MANUFACTURER_CHARACTERISTIC,),)
# BLE battery service
BATTERY_SERVICE_UUID = ubluetooth.UUID(0x180f)
BATTERY_CHARACTERISTIC = (ubluetooth.UUID(0x2a19), ubluetooth.FLAG_READ | ubluetooth.FLAG_NOTIFY,)
BATTERY_SERVICE = (BATTERY_SERVICE_UUID, (BATTERY_CHARACTERISTIC,),)
# BLE UART service
UART_SERVICE_UUID = ubluetooth.UUID('6E400001-B5A3-F393-E0A9-E50E24DCCA9E')
RX_CHARACTERISTIC = (ubluetooth.UUID('6E400002-B5A3-F393-E0A9-E50E24DCCA9E'), ubluetooth.FLAG_WRITE,)
TX_CHARACTERISTIC = (ubluetooth.UUID('6E400003-B5A3-F393-E0A9-E50E24DCCA9E'), ubluetooth.FLAG_READ | ubluetooth.FLAG_NOTIFY,)
UART_SERVICE = (UART_SERVICE_UUID, (TX_CHARACTERISTIC, RX_CHARACTERISTIC,),)
# Register BLE services
SERVICES = (INFO_SERVICE, BATTERY_SERVICE, UART_SERVICE,)
((model, firmware, manufacturer,), (battery,), (tx, rx,),) = ble.gatts_register_services(SERVICES)
# Set BLE info service values
ble.gatts_write(model, "SumoRobot")
ble.gatts_write(manufacturer, "RoboKoding LTD")
ble.gatts_write(firmware, sumorobot.config['firmware_version'])
# Start BLE advertising with name
advertise_ble_name(sumorobot.config['sumorobot_name'])
# Start the threads
_thread.start_new_thread(code_process_thread, ())
_thread.start_new_thread(sensor_feedback_thread, ())
# Start BLE battery percentage update timer
battery_timer = machine.Timer(machine.Timer.PERIODIC)
battery_timer.init(period=3000, callback=update_battery_level)
# Clean up
import gc
gc.collect()