This commit is contained in:
Maximilian Wagner
2024-01-30 20:04:56 +01:00
parent 64318902b4
commit 2c6c21b761
22 changed files with 1620 additions and 93 deletions

67
lib/html_builder.py Normal file
View File

@@ -0,0 +1,67 @@
class HTMLBuilder():
"""
Assembles HTML response through substitution according to the passed dictionary.
The keys need to be in curly braces inside one of the passed files.
The list of files gets glued together in the order they are in, so a head.html
would need to be the first element.
Can be used to glue files without substitution by not passing data dictionary.
Example:
sub_dict = {
"page_title": {
"type": "item",
"value": "My Booklist!"
},
"books": {
"type": "list",
"value": ["Book1", "Book2"]
},
"select": {
"type": "radio",
"value": ["Book1", "Book2"]
}
}
response = HTMLBuilder(["html/head.html", "html/home.html"], sub_dict).build()
:param list files: the files that are glued together in order
:param dict data: the dictionary with which placeholders get replaced
"""
def __init__(self, files: list, data=None) -> None:
self.data = data
self.file: str = ""
for file in files:
self.file += open(file, "r").read()
def build(self) -> bytes:
if self.data == None:
return self.__to_binary()
for key, value in self.data.items():
key = '{' + key + '}'
value_type = value["type"]
value_repl = value["value"]
if value_type == "item":
self.file = self.file.replace(key, value_repl)
if value_type == "list":
repl_string = ""
for litem in value_repl:
repl_string += f'<li>{litem}</li>\n'
self.file = self.file.replace(key, repl_string)
if value_type == "radio":
repl_string = ""
for litem in value_repl:
repl_string += f'<input type="radio" id="1" name="{key[1:-1]}" value="{litem}">\n'
repl_string += f'<label for="{litem}">{litem}</label><br>\n'
self.file = self.file.replace(key, repl_string)
return self.__to_binary()
def __to_binary(self) -> bytes:
return self.file.encode()

View File

@@ -0,0 +1,8 @@
# Minimal HTTP server
#
# Copyright 2021 (c) Erik de Lange
# Released under MIT license
from .response import HTTPResponse
from .sendfile import sendfile
from .server import CONNECTION_CLOSE, CONNECTION_KEEP_ALIVE, HTTPServer

View File

@@ -0,0 +1,96 @@
# Components of HTTP/1.1 responses
#
# Use when manually composing an HTTP response
# Expand as required for your use
#
# For HTTP/1.1 specification see: https://www.ietf.org/rfc/rfc2616.txt
# For MIME types see: https://www.iana.org/assignments/media-types/media-types.xhtml
#
# Copyright 2021 (c) Erik de Lange
# Released under MIT license
from lib.httpserver.sendfile import sendfile
reason = {
200: "OK",
301: "Moved Permanently",
302: "Found",
400: "Bad Request",
404: "Not Found",
418: "I'm a teapot"
}
mimetype = {
"HTML": b"Content-Type: text/html\r\n",
"EVENT_STREAM": b"Content-Type: text/event-stream\r\n",
"X_ICON": b"Content-Type: image/x-icon\r\n",
"JSON": b"Content-Type: application/json\r\n"
}
response_header = {
"CLOSE": b"Connection: close\r\n",
"KEEP_ALIVE": b"Connection: keep-alive\r\n"
}
class HTTPResponse:
def __init__(self, con, status, mimetype=None, close=True, header=None):
""" Create a response object
:param int status: HTTP status code
:param socket con: socket
:param str mimetype: HTTP mime type
:param bool close: if true close connection else keep alive
:param dict header: key,value pairs for HTTP response header fields
"""
self.status = status
self.con = con
self.mimetype = mimetype
self.close = close
if header is None:
self.header = {}
else:
self.header = header
def redirect(self, location):
""" Redirect client """
self.con.write(f"HTTP/1.1 {self.status} {reason.get(self.status, 'NA')}\nLocation: {location}\n")
self.con.write(response_header.get("CLOSE"))
def send_file(self, filepath: str = ""):
""" Send response to stream writer """
self.con.write(f"HTTP/1.1 {self.status} {reason.get(self.status, 'NA')}\n")
if self.mimetype:
self.con.write(mimetype.get(self.mimetype))
if self.close:
self.con.write(response_header.get("CLOSE"))
else:
self.con.write(response_header.get("KEEP_ALIVE"))
if len(self.header) > 0:
for key, value in self.header.items():
self.con.write(f"{key}: {value}\n")
sendfile(self.con, filepath)
def send_raw(self, raw: bytes):
""" Send response to stream writer """
self.con.write(f"HTTP/1.1 {self.status} {reason.get(self.status, 'NA')}\n")
if self.mimetype:
self.con.write(mimetype.get(self.mimetype))
if self.close:
self.con.write(response_header.get("CLOSE"))
else:
self.con.write(response_header.get("KEEP_ALIVE"))
if len(self.header) > 0:
for key, value in self.header.items():
self.con.write(f"{key}: {value};\n")
self.con.write(b'\r\n\r\n')
self.con.write(raw)

View File

@@ -0,0 +1,25 @@
# Memory efficient file transfer
#
# Copyright 2021 (c) Erik de Lange
# Released under MIT license
_buffer = bytearray(512) # adjust size to your systems available memory
_bmview = memoryview(_buffer) # reuse pre-allocated _buffer
def sendfile(conn, filename):
""" Send a file to a connection in chunks - lowering memory usage.
:param socket conn: connection to send the file content to
:param str filename: name of file the send
"""
try:
with open(filename, "rb") as fp:
while True:
n = fp.readinto(_buffer)
if n == 0:
break
conn.write(_bmview[:n])
except:
print(f"WEB:File {filename} not found")

145
lib/httpserver/server.py Normal file
View File

@@ -0,0 +1,145 @@
# Minimal HTTP server
#
# Usage:
#
# from httpserver import HTTPServer, sendfile, CONNECTION_CLOSE
#
# app = HTTPServer()
# @app.route("GET", "/")
# def root(conn, request):
# response = HTTPResponse(200, "text/html", close=True)
# response.send(conn)
# sendfile(conn, "index.html")
#
# app.start()
#
# Handlers for the (method, path) combinations must be decorated with @route,
# and declared before the server is started (via a call to start).
# Every handler receives the connection socket and an object with all the
# details from the request (see url.py for exact content). The handler must
# construct and send a correct HTTP response. To avoid typos use the
# HTTPResponse component from response.py.
# When leaving the handler the connection will be closed, unless the return
# code of the handler is CONNECTION_KEEP_ALIVE.
# Any (method, path) combination which has not been declared using @route
# will, when received by the server, result in a 404 HTTP error.
# The server cannot be stopped unless an alert is raised. A KeyboardInterrupt
# will cause a controlled exit.
#
# Copyright 2021 (c) Erik de Lange
# Released under MIT license
import errno
import socket
from micropython import const
from .response import HTTPResponse
from .url import HTTPRequest, InvalidRequest
CONNECTION_CLOSE = const(0)
CONNECTION_KEEP_ALIVE = const(1)
class HTTPServerError(Exception):
pass
class HTTPServer:
def __init__(self, host="0.0.0.0", port=80, backlog=5, timeout=30):
self.host = host
self.port = port
self.backlog = backlog
self.timeout = timeout
self._routes = dict() # stores link between (method, path) and function to execute
def route(self, method="GET", path="/"):
""" Decorator which connects method and path to the decorated function. """
if (method, path) in self._routes:
raise HTTPServerError(f"route{(method, path)} already registered")
def wrapper(function):
self._routes[(method, path)] = function
return wrapper
def start(self):
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind((self.host, self.port))
server.listen(self.backlog)
print(f"HTTP server started on {self.host}:{self.port}")
while True:
try:
conn, addr = server.accept()
conn.settimeout(self.timeout)
request_line = conn.readline()
if request_line is None:
raise OSError(errno.ETIMEDOUT)
if request_line in [b"", b"\r\n"]:
print(f"empty request line from {addr[0]}")
conn.close()
continue
print(f"request line {request_line} from {addr[0]}")
try:
request = HTTPRequest(request_line)
except InvalidRequest as e:
while True:
# read and discard header fields
line = conn.readline()
if line is None:
raise OSError(errno.ETIMEDOUT)
if line in [b"", b"\r\n"]:
break
HTTPResponse(conn, 400, "HTML", close=True).send_file()
conn.write(repr(e).encode("utf-8"))
conn.close()
continue
while True:
# read header fields and add name / value to dict 'header'
line = conn.readline()
if line is None:
raise OSError(errno.ETIMEDOUT)
if line in [b"", b"\r\n"]:
break
else:
if line.find(b":") != 1:
name, value = line.split(b':', 1)
request.header[name] = value.strip()
# search function which is connected to (method, path)
func = self._routes.get((request.method, request.path))
if func:
if func(conn, request) != CONNECTION_KEEP_ALIVE:
# close connection unless explicitly kept alive
conn.close()
else: # no function found for (method, path) combination
response = HTTPResponse(conn, 404).send_file()
conn.close()
except KeyboardInterrupt: # will stop the server
conn.close()
break
except Exception as e:
conn.close()
if type(e) is OSError and e.errno == errno.ETIMEDOUT: # communication timeout
pass
elif type(e) is OSError and e.errno == errno.ECONNRESET: # client reset the connection
pass
else:
server.close()
raise e
server.close()
print("HTTP server stopped")

54
lib/httpserver/sse.py Normal file
View File

@@ -0,0 +1,54 @@
# Server-sent event support
#
# Usage:
#
# import _thread, time
#
# from httpserver.sse import EventSource
#
# @app.route("GET", "/api/greeting")
# def api_greeting(conn, request):
# """ Say hello every 5 seconds """
#
# def greeting_task(conn, eventsource):
# while True:
# time.sleep(5)
# try:
# eventsource.send(event="greeting", data="hello")
# except Exception: # catch (a.o.) ECONNRESET when the client has disappeared
# conn.close()
# break # exit thread
#
# _thread.start_new_thread(greeting_task, (conn, EventSource(conn)))
#
# Copyright 2022 (c) Erik de Lange
# Released under MIT license
from .response import HTTPResponse
class EventSource:
""" Open and use an event stream connection to the client """
def __init__(self, conn):
""" Set up an event stream connection """
self.conn = conn
HTTPResponse(conn, 200, "EVENT_STREAM", close=False, header={"Cache-Control": "no-cache"}).send_file()
def send(self, data=":", id=None, event=None, retry=None):
""" Send event to client following the event stream format
:param str data: event data to send to client. mandatory
:param int id: optional event id
:param str event: optional event type, used for dispatching at client
:param int retry: retry interval in milliseconds
"""
writer = self.conn
if id is not None:
writer.write(f"id: {id}\n")
if event is not None:
writer.write(f"event: {event}\n")
if retry is not None:
writer.write(f"retry: {retry}\n")
writer.write(f"data: {data}\n\n")

120
lib/httpserver/url.py Normal file
View File

@@ -0,0 +1,120 @@
# Routines for decoding an HTTP request line.
#
# HTTP request line as understood by this package:
#
# Request line: Method SP Request-URL SP HTTP-Version CRLF
# Request URL: Path ? Query
# Query: key=value&key=value
#
# Example: b"GET /page?key1=0.07&key2=0.03&key3=0.13 HTTP/1.1\r\n"
#
# Method: GET
# Request URL: /page?key1=0.07&key2=0.03&key3=0.13
# HTTP version: HTTP/1.1
# Path: /page
# Query: key1=0.07&key2=0.03&key3=0.13
#
# See also: https://www.tutorialspoint.com/http/http_requests.htm
# https://en.wikipedia.org/wiki/Uniform_Resource_Identifier
#
# For MicroPython applications which process HTTP requests.
#
# Copyright 2021,2022 (c) Erik de Lange
# Released under MIT license
class InvalidRequest(Exception):
pass
class HTTPRequest:
def __init__(self, request_line) -> None:
""" Separate an HTTP request line in its elements.
:param bytes request_line: the complete HTTP request line
:return Request: instance containing
method the request method ("GET", "PUT", ...)
url the request URL, including the query string (if any)
path the request path from the URL
query the query string from the URL (if any, else "")
version the HTTP version
parameters dictionary with key-value pairs from the query string
header empty dict, placeholder for key-value pairs from request header fields
:raises InvalidRequest: if line does not contain exactly 3 components separated by spaces
if method is not in IETF standardized set
aside from these no other checks done here
"""
try:
self.method, self.url, self.version = request_line.decode("utf-8").split()
# note that method, url and version are str, not bytes
except ValueError:
raise InvalidRequest(f"Expected 3 elements in {request_line}")
if self.version.find("/") != -1:
self.version = self.version.split("/", 1)[1]
if self.method not in ["GET", "HEAD", "POST", "PUT", "DELETE", "CONNECT", "OPTIONS", "TRACE"]:
raise InvalidRequest(f"Invalid method {self.method} in {request_line}")
if self.url.find("?") != -1:
self.path, self.query = self.url.split("?", 1)
self.parameters = query(self.query)
else:
self.path = self.url
self.query = ""
self.parameters = dict()
self.header = dict()
def query(query):
""" Place all key-value pairs from a request URLs query string into a dict.
Example: request b"GET /page?key1=0.07&key2=0.03&key3=0.13 HTTP/1.1\r\n"
yields dictionary {'key1': '0.07', 'key2': '0.03', 'key3': '0.13'}.
:param str query: the query part (everything after the '?') from an HTTP request line
:return dict: dictionary with zero or more entries
"""
d = dict()
if len(query) > 0:
for pair in query.split("&"):
try:
key, value = pair.split("=", 1)
if key not in d: # only first occurrence taken into account
d[key] = value
except ValueError: # skip malformed parameter (like missing '=')
pass
return d
# if __name__ == "__main__":
# request_lines = [b"GET / HTTP/1.1\r\n",
# b"GET /page/sub HTTP/1.1\r\n",
# b"GET /page?key1=0.07&key2=0.03 HTTP/1.1\r\n",
# b"GET /page?key1=0.07&key1=0.03 HTTP/1.1\r\n",
# b"GET /page?key1=0.07& HTTP/1.1\r\n",
# b"GET /page?key1=0.07 HTTP/1.1\r\n",
# b"GET /page?key1= HTTP/1.1\r\n",
# b"GET /page?key1 HTTP/1.1\r\n",
# b"GET /page? HTTP/1.1\r\n",
# b"GET HTTP/1.1\r\n",
# b"GET / HTTP/1.1 one_too_much\r\n",
# b"UNKNOWN / HTTP/1.1\r\n"]
#
# for line in request_lines:
# print("request line", line)
# try:
# request = HTTPRequest(line)
# print("method:", request.method)
# print("url:", request.url)
# print("version:", request.version)
# print("path:", request.path)
# print("query:", request.query)
# print("parameters:", request.parameters)
# except Exception as e:
# print("exception", repr(e))
# finally:
# print()

305
lib/sh1106.py Normal file
View File

@@ -0,0 +1,305 @@
#
# MicroPython SH1106 OLED driver, I2C and SPI interfaces
#
# The MIT License (MIT)
#
# Copyright (c) 2016 Radomir Dopieralski (@deshipu),
# 2017-2021 Robert Hammelrath (@robert-hh)
# 2021 Tim Weber (@scy)
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
# THE SOFTWARE.
#
# Sample code sections for ESP8266 pin assignments
# ------------ SPI ------------------
# Pin Map SPI
# - 3v - xxxxxx - Vcc
# - G - xxxxxx - Gnd
# - D7 - GPIO 13 - Din / MOSI fixed
# - D5 - GPIO 14 - Clk / Sck fixed
# - D8 - GPIO 4 - CS (optional, if the only connected device)
# - D2 - GPIO 5 - D/C
# - D1 - GPIO 2 - Res
#
# for CS, D/C and Res other ports may be chosen.
#
# from machine import Pin, SPI
# import sh1106
# spi = SPI(1, baudrate=1000000)
# display = sh1106.SH1106_SPI(128, 64, spi, Pin(5), Pin(2), Pin(4))
# display.sleep(False)
# display.fill(0)
# display.text('Testing 1', 0, 0, 1)
# display.show()
#
# --------------- I2C ------------------
#
# Pin Map I2C
# - 3v - xxxxxx - Vcc
# - G - xxxxxx - Gnd
# - D2 - GPIO 5 - SCK / SCL
# - D1 - GPIO 4 - DIN / SDA
# - D0 - GPIO 16 - Res
# - G - xxxxxx CS
# - G - xxxxxx D/C
#
# Pin's for I2C can be set almost arbitrary
#
# from machine import Pin, I2C
# import sh1106
#
# i2c = I2C(scl=Pin(5), sda=Pin(4), freq=400000)
# display = sh1106.SH1106_I2C(128, 64, i2c, Pin(16), 0x3c)
# display.sleep(False)
# display.fill(0)
# display.text('Testing 1', 0, 0, 1)
# display.show()
from micropython import const
import utime as time
import framebuf
# a few register definitions
_SET_CONTRAST = const(0x81)
_SET_NORM_INV = const(0xa6)
_SET_DISP = const(0xae)
_SET_SCAN_DIR = const(0xc0)
_SET_SEG_REMAP = const(0xa0)
_LOW_COLUMN_ADDRESS = const(0x00)
_HIGH_COLUMN_ADDRESS = const(0x10)
_SET_PAGE_ADDRESS = const(0xB0)
class SH1106(framebuf.FrameBuffer):
def __init__(self, width, height, external_vcc, rotate=0):
self.width = width
self.height = height
self.external_vcc = external_vcc
self.flip_en = rotate == 180 or rotate == 270
self.rotate90 = rotate == 90 or rotate == 270
self.pages = self.height // 8
self.bufsize = self.pages * self.width
self.renderbuf = bytearray(self.bufsize)
self.pages_to_update = 0
if self.rotate90:
self.displaybuf = bytearray(self.bufsize)
# HMSB is required to keep the bit order in the render buffer
# compatible with byte-for-byte remapping to the display buffer,
# which is in VLSB. Else we'd have to copy bit-by-bit!
super().__init__(self.renderbuf, self.height, self.width,
framebuf.MONO_HMSB)
else:
self.displaybuf = self.renderbuf
super().__init__(self.renderbuf, self.width, self.height,
framebuf.MONO_VLSB)
# flip() was called rotate() once, provide backwards compatibility.
self.rotate = self.flip
self.init_display()
def init_display(self):
self.reset()
self.fill(0)
self.show()
self.poweron()
# rotate90 requires a call to flip() for setting up.
self.flip(self.flip_en)
def poweroff(self):
self.write_cmd(_SET_DISP | 0x00)
def poweron(self):
self.write_cmd(_SET_DISP | 0x01)
if self.delay:
time.sleep_ms(self.delay)
def flip(self, flag=None, update=True):
if flag is None:
flag = not self.flip_en
mir_v = flag ^ self.rotate90
mir_h = flag
self.write_cmd(_SET_SEG_REMAP | (0x01 if mir_v else 0x00))
self.write_cmd(_SET_SCAN_DIR | (0x08 if mir_h else 0x00))
self.flip_en = flag
if update:
self.show(True) # full update
def sleep(self, value):
self.write_cmd(_SET_DISP | (not value))
def contrast(self, contrast):
self.write_cmd(_SET_CONTRAST)
self.write_cmd(contrast)
def invert(self, invert):
self.write_cmd(_SET_NORM_INV | (invert & 1))
def show(self, full_update = False):
# self.* lookups in loops take significant time (~4fps).
(w, p, db, rb) = (self.width, self.pages,
self.displaybuf, self.renderbuf)
if self.rotate90:
for i in range(self.bufsize):
db[w * (i % p) + (i // p)] = rb[i]
if full_update:
pages_to_update = (1 << self.pages) - 1
else:
pages_to_update = self.pages_to_update
#print("Updating pages: {:08b}".format(pages_to_update))
for page in range(self.pages):
if (pages_to_update & (1 << page)):
self.write_cmd(_SET_PAGE_ADDRESS | page)
self.write_cmd(_LOW_COLUMN_ADDRESS | 2)
self.write_cmd(_HIGH_COLUMN_ADDRESS | 0)
self.write_data(db[(w*page):(w*page+w)])
self.pages_to_update = 0
def pixel(self, x, y, color=None):
if color is None:
return super().pixel(x, y)
else:
super().pixel(x, y , color)
page = y // 8
self.pages_to_update |= 1 << page
def text(self, text, x, y, color=1):
super().text(text, x, y, color)
self.register_updates(y, y+7)
def line(self, x0, y0, x1, y1, color):
super().line(x0, y0, x1, y1, color)
self.register_updates(y0, y1)
def hline(self, x, y, w, color):
super().hline(x, y, w, color)
self.register_updates(y)
def vline(self, x, y, h, color):
super().vline(x, y, h, color)
self.register_updates(y, y+h-1)
def fill(self, color):
super().fill(color)
self.pages_to_update = (1 << self.pages) - 1
def blit(self, fbuf, x, y, key=-1, palette=None):
super().blit(fbuf, x, y, key, palette)
self.register_updates(y, y+self.height)
def scroll(self, x, y):
# my understanding is that scroll() does a full screen change
super().scroll(x, y)
self.pages_to_update = (1 << self.pages) - 1
def fill_rect(self, x, y, w, h, color):
super().fill_rect(x, y, w, h, color)
self.register_updates(y, y+h-1)
def rect(self, x, y, w, h, color):
super().rect(x, y, w, h, color)
self.register_updates(y, y+h-1)
def register_updates(self, y0, y1=None):
# this function takes the top and optional bottom address of the changes made
# and updates the pages_to_change list with any changed pages
# that are not yet on the list
start_page = max(0, y0 // 8)
end_page = max(0, y1 // 8) if y1 is not None else start_page
# rearrange start_page and end_page if coordinates were given from bottom to top
if start_page > end_page:
start_page, end_page = end_page, start_page
for page in range(start_page, end_page+1):
self.pages_to_update |= 1 << page
def reset(self, res):
if res is not None:
res(1)
time.sleep_ms(1)
res(0)
time.sleep_ms(20)
res(1)
time.sleep_ms(20)
class SH1106_I2C(SH1106):
def __init__(self, width, height, i2c, res=None, addr=0x3c,
rotate=0, external_vcc=False, delay=0):
self.i2c = i2c
self.addr = addr
self.res = res
self.temp = bytearray(2)
self.delay = delay
if res is not None:
res.init(res.OUT, value=1)
super().__init__(width, height, external_vcc, rotate)
def write_cmd(self, cmd):
self.temp[0] = 0x80 # Co=1, D/C#=0
self.temp[1] = cmd
self.i2c.writeto(self.addr, self.temp)
def write_data(self, buf):
self.i2c.writeto(self.addr, b'\x40'+buf)
def reset(self):
super().reset(self.res)
class SH1106_SPI(SH1106):
def __init__(self, width, height, spi, dc, res=None, cs=None,
rotate=0, external_vcc=False, delay=0):
dc.init(dc.OUT, value=0)
if res is not None:
res.init(res.OUT, value=0)
if cs is not None:
cs.init(cs.OUT, value=1)
self.spi = spi
self.dc = dc
self.res = res
self.cs = cs
self.delay = delay
super().__init__(width, height, external_vcc, rotate)
def write_cmd(self, cmd):
if self.cs is not None:
self.cs(1)
self.dc(0)
self.cs(0)
self.spi.write(bytearray([cmd]))
self.cs(1)
else:
self.dc(0)
self.spi.write(bytearray([cmd]))
def write_data(self, buf):
if self.cs is not None:
self.cs(1)
self.dc(1)
self.cs(0)
self.spi.write(buf)
self.cs(1)
else:
self.dc(1)
self.spi.write(buf)
def reset(self):
super().reset(self.res)

188
lib/tm_sim.py Normal file
View File

@@ -0,0 +1,188 @@
class TuringMachine():
def __init__(self, turing_machine_def: str) -> None:
# if a placeholder is needed, create it with empty string
if not turing_machine_def:
return
# STATE VARIABLES
self.state_start: str = ''
self.state_accept: str = ''
self.state_reject: str = ''
self.state_current: str = ''
# BAND VARIABLES
self.band: list = []
self.band_num: int = 0
self.band_current: int = 0
# RULE VARIABLES
self.rules: list = []
self.rule_used: str = ''
# ERR Variables
self.errored: bool = False
# PARSE BANDS
def_split = turing_machine_def.replace(' ', '').replace('\n', '').lower().split(';')
self.band_num = int(def_split[0])
# PARSE STATES
self.state_start = def_split[1]
self.state_current = def_split[1]
self.state_accept = def_split[2]
self.state_reject = def_split[3]
def_split_truncated = def_split[4:]
# PARSE RULES
for x in def_split_truncated:
if not x: continue
rule_current = x.split(',')
tmp = rule_current[1].split('->')
rule_current[1] = tmp[0]
rule_current.insert(2, tmp[1])
self.rules.append(rule_current)
def turing_run(self) -> None:
'''Runs the TM without giving the option to step through manually.'''
while not self.turing_finished():
self.turing_step()
if self.errored: return
def turing_step(self) -> bool:
'''Steps through the TM one rule at a time. A step is changing
the value of the current element and moving the band by one.
Returns whether the TM has finished running or has an empty band.'''
if len(self.band) == 0:
return True
# Add blank if current location is out of bounds and adjust location
if self.band_current < 0:
self.band.insert(0, '_')
self.band_current = 0
if self.band_current == len(self.band):
self.band.append('_')
for rule in self.rules:
# Skip if rule does not apply to current state
if not rule[0] == self.state_current: continue
# Skip if rule does not apply to current band element
if not rule[1] == self.band[self.band_current]: continue
# Set band element according to rule
self.band[self.band_current] = rule[3]
# Set current state according to rule
self.state_current = rule[2]
# Move band according to rule
if rule[4] == '<':
self.band_current -= 1;
if rule[4] == '>':
self.band_current += 1;
self.rule_used = ( rule[0] + ', ' +
rule[1] + ' -> ' +
rule[2] + ', ' +
rule[3] + ', ' +
rule[4])
break
return self.turing_finished()
def turing_finished(self) -> bool:
'''Returns whether the TM is in accepting state, rejecting state or has
halted because no applicable rules are left.'''
return ( self.state_current == self.state_accept or
self.state_current == self.state_reject or
self.errored )
def turing_accepted(self) -> bool:
'''Returns whether the TM, with the given input, has reached accepted state.'''
return self.state_current == self.state_accept
def turing_reset(self) -> None:
'''Resets current state to start state and band location to zero.'''
self.state_current = self.state_start
self.band_current = 0
def get_rules(self) -> list:
return self.rules
def get_rule_used(self) -> str:
return self.rule_used
def get_band(self) -> str:
return (''.join(self.band)).replace('_', '')
def get_band_context(self, context: int) -> str:
'''Returns context chars to the left and right of the current element as well as the element.'''
ret = ""
for i in range(self.band_current - context, self.band_current + context + 1):
if i < 0:
ret += "_ "
elif i+1 > len(self.band):
ret += "_ "
else:
ret += f"{self.band[i]} "
return ret.rstrip()
def get_band_current_element(self) -> str:
if self.band_current < 0 or self.band_current == len(self.band):
return '_'
return self.band[self.band_current]
def get_state_start(self) -> str:
return self.state_start
def get_state_current(self) -> str:
return self.state_current
def get_state_accept(self) -> str:
return self.state_accept
def get_state_reject(self) -> str:
return self.state_reject
def get_has_errored(self) -> bool:
return self.errored
def set_band(self, input: str):
self.band = list(input)
def redefine(self, turing_machine_def: str):
self.__init__(turing_machine_def)
def test():
tm = TuringMachine(''' 1; q0; q3; q4;
q0, 0 -> q0, 0, > ;
q0, 1 -> q0, 1, > ;
q0, _ -> q1, _, < ;
q1, 0 -> q2, 1, < ;
q1, 1 -> q1, 0, < ;
q1, _ -> q3, 1, - ;
q2, 1 -> q2, 1, < ;
q2, 0 -> q2, 0, < ;
q2, _ -> q3, _, >
''')
tm.set_band("1011")
tm.turing_run()
if tm.get_band() == "1100":
print("Successfull")
else:
print("Failed")