# Picoweb web pico-framework for MicroPython # Copyright (c) 2014-2018 Paul Sokolovsky # SPDX-License-Identifier: MIT import sys import gc import micropython import utime import uio import ure as re import uerrno import uasyncio as asyncio def unquote_plus(s): # TODO: optimize s = s.replace("+", " ") arr = s.split("%") arr2 = [chr(int(x[:2], 16)) + x[2:] for x in arr[1:]] return arr[0] + "".join(arr2) def parse_qs(s): res = {} if s: pairs = s.split("&") for p in pairs: vals = [unquote_plus(x) for x in p.split("=", 1)] if len(vals) == 1: vals.append(True) old = res.get(vals[0]) if old is not None: if not isinstance(old, list): old = [old] res[vals[0]] = old old.append(vals[1]) else: res[vals[0]] = vals[1] return res def get_mime_type(fname): # Provide minimal detection of important file # types to keep browsers happy if fname.endswith(".html"): return "text/html" if fname.endswith(".css"): return "text/css" if fname.endswith(".png") or fname.endswith(".jpg"): return "image" return "text/plain" def sendstream(writer, f): buf = bytearray(64) while True: l = f.readinto(buf) if not l: break yield from writer.awrite(buf, 0, l) def jsonify(writer, dict): import ujson yield from start_response(writer, "application/json") yield from writer.awrite(ujson.dumps(dict)) def start_response(writer, content_type="text/html", status="200", headers=None): yield from writer.awrite("HTTP/1.0 %s NA\r\n" % status) yield from writer.awrite("Content-Type: ") yield from writer.awrite(content_type) if not headers: yield from writer.awrite("\r\n\r\n") return yield from writer.awrite("\r\n") if isinstance(headers, bytes) or isinstance(headers, str): yield from writer.awrite(headers) else: for k, v in headers.items(): yield from writer.awrite(k) yield from writer.awrite(": ") yield from writer.awrite(v) yield from writer.awrite("\r\n") yield from writer.awrite("\r\n") def http_error(writer, status): yield from start_response(writer, status=status) yield from writer.awrite(status) class HTTPRequest: def __init__(self): pass def read_form_data(self): size = int(self.headers[b"Content-Length"]) data = yield from self.reader.read(size) form = parse_qs(data.decode()) self.form = form def parse_qs(self): form = parse_qs(self.qs) self.form = form class WebApp: def __init__(self, pkg, routes=None): if routes: self.url_map = routes else: self.url_map = [] if pkg and pkg != "__main__": self.pkg = pkg.split(".", 1)[0] else: self.pkg = None self.mounts = [] self.inited = False # Instantiated lazily self.template_loader = None self.headers_mode = "parse" def parse_headers(self, reader): headers = {} while True: l = yield from reader.readline() if l == b"\r\n": break k, v = l.split(b":", 1) headers[k] = v.strip() return headers def _handle(self, reader, writer): close = True try: request_line = yield from reader.readline() if request_line == b"": yield from writer.aclose() return req = HTTPRequest() # TODO: bytes vs str request_line = request_line.decode() method, path, proto = request_line.split() path = path.split("?", 1) qs = "" if len(path) > 1: qs = path[1] path = path[0] #print("================") #print(req, writer) #print(req, (method, path, qs, proto), req.headers) # Find which mounted subapp (if any) should handle this request app = self while True: found = False for subapp in app.mounts: root = subapp.url #print(path, "vs", root) if path[:len(root)] == root: app = subapp found = True path = path[len(root):] if not path.startswith("/"): path = "/" + path break if not found: break # We initialize apps on demand, when they really get requests if not app.inited: app.init() # Find handler to serve this request in app's url_map found = False for e in app.url_map: pattern = e[0] handler = e[1] extra = {} if len(e) > 2: extra = e[2] if path == pattern: found = True break elif not isinstance(pattern, str): # Anything which is non-string assumed to be a ducktype # pattern matcher, whose .match() method is called. (Note: # Django uses .search() instead, but .match() is more # efficient and we're not exactly compatible with Django # URL matching anyway.) m = pattern.match(path) if m: req.url_match = m found = True break if not found: headers_mode = "skip" else: headers_mode = extra.get("headers", self.headers_mode) if headers_mode == "skip": while True: l = yield from reader.readline() if l == b"\r\n": break elif headers_mode == "parse": req.headers = yield from self.parse_headers(reader) else: assert headers_mode == "leave" if found: req.method = method req.path = path req.qs = qs req.reader = reader close = yield from handler(req, writer) else: yield from start_response(writer, status="404") yield from writer.awrite("404\r\n") #print(req, "After response write") except Exception as e: pass if close is not False: yield from writer.aclose() def mount(self, url, app): "Mount a sub-app at the url of current app." # Inspired by Bottle. It might seem that dispatching to # subapps would rather be handled by normal routes, but # arguably, that's less efficient. Taking into account # that paradigmatically there's difference between handing # an action and delegating responisibilities to another # app, Bottle's way was followed. app.url = url self.mounts.append(app) def route(self, url, **kwargs): def _route(f): self.url_map.append((url, f, kwargs)) return f return _route def add_url_rule(self, url, func, **kwargs): # Note: this method skips Flask's "endpoint" argument, # because it's alleged bloat. self.url_map.append((url, func, kwargs)) def _load_template(self, tmpl_name): if self.template_loader is None: import utemplate.source self.template_loader = utemplate.source.Loader(self.pkg, "templates") return self.template_loader.load(tmpl_name) def render_template(self, writer, tmpl_name, args=()): tmpl = self._load_template(tmpl_name) for s in tmpl(*args): yield from writer.awrite(s) def render_str(self, tmpl_name, args=()): #TODO: bloat tmpl = self._load_template(tmpl_name) return ''.join(tmpl(*args)) def init(self): """Initialize a web application. This is for overriding by subclasses. This is good place to connect to/initialize a database, for example.""" self.inited = True def run(self, host="0.0.0.0", port=80, lazy_init=False): gc.collect() self.init() if not lazy_init: for app in self.mounts: app.init() loop = asyncio.get_event_loop() loop.create_task(asyncio.start_server(self._handle, host, port)) loop.run_forever() loop.close()