7 changed files with 616 additions and 2 deletions
@ -0,0 +1,19 @@
@@ -0,0 +1,19 @@
|
||||
# Makefile for the board
|
||||
|
||||
NAME=esp32-20180222-v1.9.3-347-g6e675c1b.bin
|
||||
|
||||
all: flash console |
||||
|
||||
flash: |
||||
esptool.py -p /dev/ttyUSB0 -b 921600 erase_flash
|
||||
esptool.py -p /dev/ttyUSB0 -b 921600 write_flash --flash_mode dio 0x1000 bin/${NAME}
|
||||
sleep 5
|
||||
ampy -p /dev/ttyUSB0 put ssd1306.py
|
||||
|
||||
console: |
||||
echo "Ctrl-A + Ctrl-Q to close Picocom"
|
||||
picocom -b115200 /dev/ttyUSB0
|
||||
|
||||
dep: |
||||
sudo apt install python3-pip
|
||||
sudo pip3 install adafruit-ampy
|
@ -1,3 +1,193 @@
@@ -1,3 +1,193 @@
|
||||
# micropython-skeleton |
||||
# Hello MicroPython |
||||
|
||||
MicroPython project skeleton |
||||
## Getting started |
||||
|
||||
MicroPython project skeleton |
||||
|
||||
``` |
||||
git clone http://git.k-space.ee/lauri/micropython-skeleton |
||||
cd micropython-skeleton |
||||
make |
||||
``` |
||||
|
||||
First let's some LED-s blinking. |
||||
Press Ctrl-E for paste mode, otherwise spaces get mangled. |
||||
Press Ctrl-Shift-V for pasting. |
||||
Press Ctrl-D to exit paste mode and evaluate the code. |
||||
|
||||
``` |
||||
from time import sleep |
||||
from machine import Pin |
||||
|
||||
# RGB LED is connected to programmable pins 12, 13, 15 |
||||
led_red = Pin(12, Pin.OUT) |
||||
led_green = Pin(13, Pin.OUT) |
||||
led_blue = Pin(15, Pin.OUT) |
||||
|
||||
# The values are inverted because 3.3v is common pin |
||||
led_red.value(1) |
||||
led_green.value(1) |
||||
led_blue.value(1) |
||||
|
||||
for j in range(0, 5): |
||||
led_red.value(0) |
||||
sleep(1) |
||||
led_red.value(1) |
||||
led_green.value(0) |
||||
sleep(1) |
||||
led_green.value(1) |
||||
led_blue.value(0) |
||||
sleep(1) |
||||
led_blue.value(1) |
||||
``` |
||||
|
||||
Tasks: |
||||
|
||||
1. Modify the code so yellow, cyan, magenda and white would be included. |
||||
|
||||
|
||||
|
||||
# Button presses |
||||
|
||||
On the board there is button labelled "Boot", this is hooked up to pin 2. |
||||
By default there is a resistor which pulls the voltage on the pin to 3.3v, but when button is pressed the pin is shorted to ground so the voltage goes to 0v. |
||||
Most modern solutions use interrupts to detect voltage change on the pin: |
||||
|
||||
``` |
||||
from machine import Pin |
||||
from time import sleep |
||||
|
||||
Pin(12, Pin.OUT).value(1) |
||||
Pin(13, Pin.OUT).value(1) |
||||
led_blue = Pin(15, Pin.OUT) |
||||
button = Pin(0) |
||||
|
||||
turned_off = False |
||||
|
||||
def callback(p): |
||||
global turned_off |
||||
turned_off = not turned_off |
||||
led_blue.value(turned_off) |
||||
|
||||
# Execute function 'callback' when voltage goes from 3.3v to 0v on pin 0 |
||||
button.irq(trigger=Pin.IRQ_FALLING, handler=callback) |
||||
``` |
||||
|
||||
Tasks: |
||||
|
||||
1. Modify the code so pressing button shuffles between off, red, green, blue, yellow, cyan, magenta and white |
||||
|
||||
|
||||
|
||||
# Driving OLED screens |
||||
|
||||
Let's get some pixels on the screen. |
||||
There's 128x64 pixels monochrome OLED screen connected via I2C bus on the pins 4 and 5. |
||||
|
||||
``` |
||||
from machine import Pin, I2C |
||||
from ssd1306 import SSD1306_I2C |
||||
|
||||
i2c = I2C(-1, Pin(4),Pin(5), freq=400000) # Bitbanged I2C bus |
||||
oled = SSD1306_I2C(128, 64, i2c) |
||||
oled.invert(0) # White text on black background |
||||
oled.contrast(255) # Maximum contrast |
||||
|
||||
oled.fill(0) |
||||
name = "Lauri" |
||||
oled.text("Hi %s" % name, 10, 10) |
||||
oled.show() |
||||
``` |
||||
|
||||
Tasks: |
||||
|
||||
1. When button is pressed show a corresponding message on the screen - lights turned on/off or the name of the color shown |
||||
|
||||
## Temperature & humidity |
||||
|
||||
Next let's hook up DHT11 sensor to the board and measure the temperature. |
||||
|
||||
``` |
||||
from time import sleep |
||||
from machine import Pin |
||||
from dht import DHT11 |
||||
|
||||
d = DHT11(Pin(4)) |
||||
|
||||
try: |
||||
d.measure() |
||||
except OSError: |
||||
print("Sensor not connected") |
||||
else: |
||||
print("Temperature %sC" % d.temperature()) |
||||
print("Humidity %s%%" % d.humidity()) |
||||
finally: |
||||
sleep(1) |
||||
``` |
||||
|
||||
Tasks: |
||||
|
||||
1. Get temperature and humidity displayed on the screen |
||||
|
||||
## Connecting to internet |
||||
|
||||
Exit the serial console by pressing Ctrl-A and then Ctrl-Q. |
||||
Upload module to handle WebSockets and return to Python prompt: |
||||
|
||||
``` |
||||
ampy -p /dev/ttyUSB0 put uwebsockets.py |
||||
ampy -p /dev/ttyUSB0 put boot.py # Script that connects to itcollege network |
||||
make console |
||||
``` |
||||
|
||||
Press EN button on the board to reset the board. |
||||
|
||||
Paste following: |
||||
|
||||
``` |
||||
import sys |
||||
import uwebsockets |
||||
from machine import Pin |
||||
|
||||
Pin(12, Pin.OUT).value(1) |
||||
Pin(13, Pin.OUT).value(1) |
||||
led_blue = Pin(15, Pin.OUT) |
||||
|
||||
channel = "living-room-of-lauri" |
||||
uri = "ws://iot.koodur.com:80/ws/" + channel |
||||
print("Connecting to:", uri) |
||||
conn = uwebsockets.connect(uri) |
||||
conn.send("alive") |
||||
|
||||
turned_off = False |
||||
|
||||
while True: |
||||
print("Reading message...") |
||||
fin, opcode, data = conn.read_frame() |
||||
if data == "toggle": |
||||
turned_off = not turned_off |
||||
led_blue.value(turned_off) |
||||
else: |
||||
print("Got unknown command:", data) |
||||
``` |
||||
|
||||
Using web browser navigate [here](http://iot.koodur.com/demo2.html#living-room-of-lauri) |
||||
|
||||
1. Move to another channel to prevent flipping lights in my living room |
||||
2. Improve the code so the "Boot" button and button in the web interface both work simultaneously |
||||
3. Download the HTML file and add buttons to select different colors, adjust Python code to handle new commands |
||||
|
||||
|
||||
# Summary |
||||
|
||||
ESP32 microcontroller with MicroPython is a really cheap way to get started with the IoT stuff. See more detailed information [here](https://lauri.xn--vsandi-pxa.com/2017/06/espressif.html). |
||||
|
||||
Some more tricks to try: |
||||
|
||||
* Add dimming of LED-s with PWM |
||||
* Add [colorpicker](https://developer.mozilla.org/en-US/docs/Web/HTML/Element/input/color) |
||||
|
||||
Other interesting projects with ESP8266 and ESP32 microcontrollers: |
||||
|
||||
* [Nixie clock](https://github.com/k-space-ee/nixiesp12) with ESP8266 |
||||
* [Sumorobot](http://robot.itcollege.ee/sumorobot/2017/08/25/sumesp-prototype/) with ESP32 |
Binary file not shown.
@ -0,0 +1,5 @@
@@ -0,0 +1,5 @@
|
||||
# Connect to wireless network as client |
||||
import network |
||||
wlan = network.WLAN(network.STA_IF) |
||||
wlan.active(True) |
||||
wlan.connect("itcollege") |
@ -0,0 +1,18 @@
@@ -0,0 +1,18 @@
|
||||
from time import sleep_ms |
||||
from machine import Pin, I2C |
||||
from ssd1306 import SSD1306_I2C |
||||
|
||||
i2c = I2C(-1, Pin(4),Pin(5),freq=400000) # Bitbanged I2C bus |
||||
assert 60 in i2c.scan(), "No OLED display detected!" |
||||
oled = SSD1306_I2C(128, 64, i2c) |
||||
buf = "wubba lubba dub dub " |
||||
oled.invert(0) # White text on black background |
||||
oled.contrast(255) # Maximum contrast |
||||
j = 0 |
||||
|
||||
while True: |
||||
oled.fill(0) |
||||
oled.text(buf[j%len(buf):]+buf, 10, 10) |
||||
oled.show() |
||||
sleep_ms(20) |
||||
j += 1 |
@ -0,0 +1,147 @@
@@ -0,0 +1,147 @@
|
||||
# MicroPython SSD1306 OLED driver, I2C and SPI interfaces |
||||
|
||||
from micropython import const |
||||
import framebuf |
||||
|
||||
|
||||
# register definitions |
||||
SET_CONTRAST = const(0x81) |
||||
SET_ENTIRE_ON = const(0xa4) |
||||
SET_NORM_INV = const(0xa6) |
||||
SET_DISP = const(0xae) |
||||
SET_MEM_ADDR = const(0x20) |
||||
SET_COL_ADDR = const(0x21) |
||||
SET_PAGE_ADDR = const(0x22) |
||||
SET_DISP_START_LINE = const(0x40) |
||||
SET_SEG_REMAP = const(0xa0) |
||||
SET_MUX_RATIO = const(0xa8) |
||||
SET_COM_OUT_DIR = const(0xc0) |
||||
SET_DISP_OFFSET = const(0xd3) |
||||
SET_COM_PIN_CFG = const(0xda) |
||||
SET_DISP_CLK_DIV = const(0xd5) |
||||
SET_PRECHARGE = const(0xd9) |
||||
SET_VCOM_DESEL = const(0xdb) |
||||
SET_CHARGE_PUMP = const(0x8d) |
||||
|
||||
# Subclassing FrameBuffer provides support for graphics primitives |
||||
# http://docs.micropython.org/en/latest/pyboard/library/framebuf.html |
||||
class SSD1306(framebuf.FrameBuffer): |
||||
def __init__(self, width, height, external_vcc): |
||||
self.width = width |
||||
self.height = height |
||||
self.external_vcc = external_vcc |
||||
self.pages = self.height // 8 |
||||
self.buffer = bytearray(self.pages * self.width) |
||||
super().__init__(self.buffer, self.width, self.height, framebuf.MONO_VLSB) |
||||
self.init_display() |
||||
|
||||
def init_display(self): |
||||
for cmd in ( |
||||
SET_DISP | 0x00, # off |
||||
# address setting |
||||
SET_MEM_ADDR, 0x00, # horizontal |
||||
# resolution and layout |
||||
SET_DISP_START_LINE | 0x00, |
||||
SET_SEG_REMAP | 0x01, # column addr 127 mapped to SEG0 |
||||
SET_MUX_RATIO, self.height - 1, |
||||
SET_COM_OUT_DIR | 0x08, # scan from COM[N] to COM0 |
||||
SET_DISP_OFFSET, 0x00, |
||||
SET_COM_PIN_CFG, 0x02 if self.height == 32 else 0x12, |
||||
# timing and driving scheme |
||||
SET_DISP_CLK_DIV, 0x80, |
||||
SET_PRECHARGE, 0x22 if self.external_vcc else 0xf1, |
||||
SET_VCOM_DESEL, 0x30, # 0.83*Vcc |
||||
# display |
||||
SET_CONTRAST, 0xff, # maximum |
||||
SET_ENTIRE_ON, # output follows RAM contents |
||||
SET_NORM_INV, # not inverted |
||||
# charge pump |
||||
SET_CHARGE_PUMP, 0x10 if self.external_vcc else 0x14, |
||||
SET_DISP | 0x01): # on |
||||
self.write_cmd(cmd) |
||||
self.fill(0) |
||||
self.show() |
||||
|
||||
def poweroff(self): |
||||
self.write_cmd(SET_DISP | 0x00) |
||||
|
||||
def poweron(self): |
||||
self.write_cmd(SET_DISP | 0x01) |
||||
|
||||
def contrast(self, contrast): |
||||
self.write_cmd(SET_CONTRAST) |
||||
self.write_cmd(contrast) |
||||
|
||||
def invert(self, invert): |
||||
self.write_cmd(SET_NORM_INV | (invert & 1)) |
||||
|
||||
def show(self): |
||||
x0 = 0 |
||||
x1 = self.width - 1 |
||||
if self.width == 64: |
||||
# displays with width of 64 pixels are shifted by 32 |
||||
x0 += 32 |
||||
x1 += 32 |
||||
self.write_cmd(SET_COL_ADDR) |
||||
self.write_cmd(x0) |
||||
self.write_cmd(x1) |
||||
self.write_cmd(SET_PAGE_ADDR) |
||||
self.write_cmd(0) |
||||
self.write_cmd(self.pages - 1) |
||||
self.write_data(self.buffer) |
||||
|
||||
|
||||
class SSD1306_I2C(SSD1306): |
||||
def __init__(self, width, height, i2c, addr=0x3c, external_vcc=False): |
||||
self.i2c = i2c |
||||
self.addr = addr |
||||
self.temp = bytearray(2) |
||||
super().__init__(width, height, external_vcc) |
||||
|
||||
def write_cmd(self, cmd): |
||||
self.temp[0] = 0x80 # Co=1, D/C#=0 |
||||
self.temp[1] = cmd |
||||
self.i2c.writeto(self.addr, self.temp) |
||||
|
||||
def write_data(self, buf): |
||||
self.temp[0] = self.addr << 1 |
||||
self.temp[1] = 0x40 # Co=0, D/C#=1 |
||||
self.i2c.start() |
||||
self.i2c.write(self.temp) |
||||
self.i2c.write(buf) |
||||
self.i2c.stop() |
||||
|
||||
|
||||
class SSD1306_SPI(SSD1306): |
||||
def __init__(self, width, height, spi, dc, res, cs, external_vcc=False): |
||||
self.rate = 10 * 1024 * 1024 |
||||
dc.init(dc.OUT, value=0) |
||||
res.init(res.OUT, value=0) |
||||
cs.init(cs.OUT, value=1) |
||||
self.spi = spi |
||||
self.dc = dc |
||||
self.res = res |
||||
self.cs = cs |
||||
import time |
||||
self.res(1) |
||||
time.sleep_ms(1) |
||||
self.res(0) |
||||
time.sleep_ms(10) |
||||
self.res(1) |
||||
super().__init__(width, height, external_vcc) |
||||
|
||||
def write_cmd(self, cmd): |
||||
self.spi.init(baudrate=self.rate, polarity=0, phase=0) |
||||
self.cs(1) |
||||
self.dc(0) |
||||
self.cs(0) |
||||
self.spi.write(bytearray([cmd])) |
||||
self.cs(1) |
||||
|
||||
def write_data(self, buf): |
||||
self.spi.init(baudrate=self.rate, polarity=0, phase=0) |
||||
self.cs(1) |
||||
self.dc(1) |
||||
self.cs(0) |
||||
self.spi.write(buf) |
||||
self.cs(1) |
@ -0,0 +1,235 @@
@@ -0,0 +1,235 @@
|
||||
""" |
||||
Websockets client for micropython |
||||
|
||||
Based very heavily on |
||||
https://github.com/aaugustin/websockets/blob/master/websockets/client.py |
||||
""" |
||||
|
||||
import ubinascii as binascii |
||||
import urandom as random |
||||
import ure as re |
||||
import ustruct as struct |
||||
import usocket as socket |
||||
from ucollections import namedtuple |
||||
|
||||
# Opcodes |
||||
OP_CONT = const(0x0) |
||||
OP_TEXT = const(0x1) |
||||
OP_BYTES = const(0x2) |
||||
OP_CLOSE = const(0x8) |
||||
OP_PING = const(0x9) |
||||
OP_PONG = const(0xa) |
||||
|
||||
# Close codes |
||||
CLOSE_OK = const(1000) |
||||
CLOSE_GOING_AWAY = const(1001) |
||||
CLOSE_PROTOCOL_ERROR = const(1002) |
||||
CLOSE_DATA_NOT_SUPPORTED = const(1003) |
||||
CLOSE_BAD_DATA = const(1007) |
||||
CLOSE_POLICY_VIOLATION = const(1008) |
||||
CLOSE_TOO_BIG = const(1009) |
||||
CLOSE_MISSING_EXTN = const(1010) |
||||
CLOSE_BAD_CONDITION = const(1011) |
||||
|
||||
URL_RE = re.compile(r'ws://([A-Za-z0-9\-\.]+)(?:\:([0-9]+))?(/.+)?') |
||||
URI = namedtuple('URI', ('hostname', 'port', 'path')) |
||||
|
||||
def urlparse(uri): |
||||
match = URL_RE.match(uri) |
||||
if match: |
||||
return URI(match.group(1), int(match.group(2)), match.group(3)) |
||||
else: |
||||
raise ValueError("Invalid URL: %s" % uri) |
||||
|
||||
class Websocket: |
||||
is_client = False |
||||
|
||||
def __init__(self, sock): |
||||
self._sock = sock |
||||
self.open = True |
||||
|
||||
def __enter__(self): |
||||
return self |
||||
|
||||
def __exit__(self, exc_type, exc, tb): |
||||
self.close() |
||||
|
||||
def settimeout(self, timeout): |
||||
self._sock.settimeout(timeout) |
||||
|
||||
def read_frame(self, max_size=None): |
||||
|
||||
# Frame header |
||||
byte1, byte2 = struct.unpack('!BB', self._sock.read(2)) |
||||
|
||||
# Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4) |
||||
fin = bool(byte1 & 0x80) |
||||
opcode = byte1 & 0x0f |
||||
|
||||
# Byte 2: MASK(1) LENGTH(7) |
||||
mask = bool(byte2 & (1 << 7)) |
||||
length = byte2 & 0x7f |
||||
|
||||
if length == 126: # Magic number, length header is 2 bytes |
||||
length, = struct.unpack('!H', self._sock.read(2)) |
||||
elif length == 127: # Magic number, length header is 8 bytes |
||||
length, = struct.unpack('!Q', self._sock.read(8)) |
||||
|
||||
if mask: # Mask is 4 bytes |
||||
mask_bits = self._sock.read(4) |
||||
|
||||
try: |
||||
data = self._sock.read(length) |
||||
except MemoryError: |
||||
# We can't receive this many bytes, close the socket |
||||
self.close(code=CLOSE_TOO_BIG) |
||||
return True, OP_CLOSE, None |
||||
|
||||
if mask: |
||||
data = bytes(b ^ mask_bits[i % 4] |
||||
for i, b in enumerate(data)) |
||||
|
||||
return fin, opcode, data.decode("utf-8") |
||||
|
||||
def write_frame(self, opcode, data=b''): |
||||
|
||||
fin = True |
||||
mask = self.is_client # messages sent by client are masked |
||||
|
||||
length = len(data) |
||||
|
||||
# Frame header |
||||
# Byte 1: FIN(1) _(1) _(1) _(1) OPCODE(4) |
||||
byte1 = 0x80 if fin else 0 |
||||
byte1 |= opcode |
||||
|
||||
# Byte 2: MASK(1) LENGTH(7) |
||||
byte2 = 0x80 if mask else 0 |
||||
|
||||
if length < 126: # 126 is magic value to use 2-byte length header |
||||
byte2 |= length |
||||
self._sock.write(struct.pack('!BB', byte1, byte2)) |
||||
|
||||
elif length < (1 << 16): # Length fits in 2-bytes |
||||
byte2 |= 126 # Magic code |
||||
self._sock.write(struct.pack('!BBH', byte1, byte2, length)) |
||||
|
||||
elif length < (1 << 64): |
||||
byte2 |= 127 # Magic code |
||||
self._sock.write(struct.pack('!BBQ', byte1, byte2, length)) |
||||
|
||||
else: |
||||
raise ValueError() |
||||
|
||||
if mask: # Mask is 4 bytes |
||||
mask_bits = struct.pack('!I', random.getrandbits(32)) |
||||
self._sock.write(mask_bits) |
||||
|
||||
data = bytes(b ^ mask_bits[i % 4] |
||||
for i, b in enumerate(data)) |
||||
|
||||
self._sock.write(data) |
||||
|
||||
def recv(self): |
||||
assert self.open |
||||
|
||||
while self.open: |
||||
try: |
||||
fin, opcode, data = self.read_frame() |
||||
except ValueError: |
||||
self._close() |
||||
return |
||||
|
||||
if not fin: |
||||
raise NotImplementedError() |
||||
|
||||
if opcode == OP_TEXT: |
||||
return data.decode('utf-8') |
||||
elif opcode == OP_BYTES: |
||||
return data |
||||
elif opcode == OP_CLOSE: |
||||
self._close() |
||||
return |
||||
elif opcode == OP_PONG: |
||||
# Ignore this frame, keep waiting for a data frame |
||||
continue |
||||
elif opcode == OP_PING: |
||||
# We need to send a pong frame |
||||
self.write_frame(OP_PONG, data) |
||||
# And then wait to receive |
||||
continue |
||||
elif opcode == OP_CONT: |
||||
# This is a continuation of a previous frame |
||||
raise NotImplementedError(opcode) |
||||
else: |
||||
raise ValueError(opcode) |
||||
|
||||
def send(self, buf): |
||||
|
||||
assert self.open |
||||
|
||||
if isinstance(buf, str): |
||||
opcode = OP_TEXT |
||||
buf = buf.encode('utf-8') |
||||
elif isinstance(buf, bytes): |
||||
opcode = OP_BYTES |
||||
else: |
||||
raise TypeError() |
||||
|
||||
self.write_frame(opcode, buf) |
||||
|
||||
def close(self, code=CLOSE_OK, reason=''): |
||||
|
||||
if not self.open: |
||||
return |
||||
|
||||
buf = struct.pack('!H', code) + reason.encode('utf-8') |
||||
|
||||
self.write_frame(OP_CLOSE, buf) |
||||
self._close() |
||||
|
||||
def _close(self): |
||||
self.open = False |
||||
self._sock.close() |
||||
|
||||
class WebsocketClient(Websocket): |
||||
is_client = True |
||||
|
||||
|
||||
def connect(uri): |
||||
""" |
||||
Connect a websocket. |
||||
""" |
||||
|
||||
uri = urlparse(uri) |
||||
assert uri |
||||
|
||||
sock = socket.socket() |
||||
addr = socket.getaddrinfo(uri.hostname, uri.port) |
||||
sock.connect(addr[0][4]) |
||||
|
||||
def send_header(header, *args): |
||||
sock.send(header % args + '\r\n') |
||||
|
||||
# Sec-WebSocket-Key is 16 bytes of random base64 encoded |
||||
key = binascii.b2a_base64(bytes(random.getrandbits(8) |
||||
for _ in range(16)))[:-1] |
||||
|
||||
send_header(b'GET %s HTTP/1.1', uri.path or '/') |
||||
send_header(b'Host: %s:%s', uri.hostname, uri.port) |
||||
send_header(b'Connection: Upgrade') |
||||
send_header(b'Upgrade: websocket') |
||||
send_header(b'Sec-WebSocket-Key: %s', key) |
||||
send_header(b'Sec-WebSocket-Version: 13') |
||||
send_header(b'Origin: http://localhost') |
||||
send_header(b'') |
||||
|
||||
header = sock.readline()[:-2] |
||||
assert header == b'HTTP/1.1 101 Switching Protocols', header |
||||
|
||||
# We don't (currently) need these headers |
||||
# FIXME: should we check the return key? |
||||
while header: |
||||
header = sock.readline()[:-2] |
||||
|
||||
return WebsocketClient(sock) |
Loading…
Reference in new issue