Joseph Wezensky
Published © MIT

Portable WiFi LED Matrix Marquee

A self-contained handheld display that can scroll the users custom message that is entered via a WiFi connection.

BeginnerFull instructions provided3 hours1,654
Portable WiFi LED Matrix Marquee

Things used in this project

Hardware components

MAX7219 8x32 LED Matrix Array
×1
Wemos D1 Mini
Espressif Wemos D1 Mini
×1
TP4056 Lithium Charger Module
×1
DC-DC Boost Module 5V
×1
LiPo Battery
×1

Software apps and online services

MicroPython
MicroPython

Story

Read more

Custom parts and enclosures

Enclosure

Schematics

Wiring Diagram

Code

marquee.py

MicroPython
Main file to import to run the device.
# marquee.py
# wifi controlled message scroller
#
# Author: Joseph G. Wezensky
# License: MIT License (https://opensource.org/licenses/MIT)
#
# Wiring...
# Wemos D1 Mini  max7219 8x8 LED Matrix
#    5V          VCC
#    GND         GND
#    D7 MOSI     DIN
#    D8          CS
#    D5 SCK      CLK

import uasyncio as asyncio
import uos
import socket
import network
import dnsquery
from drivers import max7219m
from machine import Pin, SPI

webroot = 'wwwroot'
default = 'marquee.html'
redirect_file = '/wwwroot/captive.html'
ssid = 'MyMarquee'
message = "Connect your wifi to MyMarquee"
dic = {"%21":"!","%22":'"',"%23":"#","%24":"$","%26":"&","%27":"'","%28":"(","%29":")","%2A":"*","%2B":"+","%2C":",","%2F":"/","%3A":":","%3B":";","%3D":"=","%3F":"?","%40":"@","%5B":"[","%5D":"]","%7B":"{","%7D":"}"}

@asyncio.coroutine
def capture_dns(udps, ip):
    while True:
        try:
            data, addr = udps.recvfrom(1024)
            p=dnsquery.DNSQuery(data)
            udps.sendto(p.respuesta(ip), addr)
        except:
            pass
            #print('no dns')
        await asyncio.sleep_ms(300)

@asyncio.coroutine
def scroll_message(display):
    global message
    while True:
        try:
            start = 33
            extent = 0 - (len(message) * 8) - 32

            for i in range(start, extent, -1):
                display.fill(0)
                display.text(message, i, 0, 1)
                display.show()
                await asyncio.sleep_ms(25)

            await asyncio.sleep(1)
        except:
            pass

        await asyncio.sleep_ms(100)

def urldecode(str):
    global dic
    for k,v in dic.items(): str=str.replace(k,v)
    return str

# Breaks an HTTP request into its parts and boils it down to a physical file (if possible)
def decode_path(req):
    global message
    global default
    cmd, headers = req.decode("utf-8").split('\r\n', 1)
    parts = cmd.split(' ')
    method, path = parts[0], parts[1]
    # remove any query string
    query = ''
    r = path.find('?')
    if r > 0:
        query = path[r:]
        path = path[:r]
    # check for use of default document
    if path == '/':
        path = default
    else:
        path = path[1:]

    #print('METHOD:' + method)
    #print('PATH  :' + path)
    #print('QRYSTR:' + query)

    if path == default and query != '' and query.startswith('?msg='):
        message = urldecode(query[5:].replace('+', ' '))
        print('MSG:"' + message + '"')

    # return the physical path of the response file
    return webroot + '/' + path

# Looks up the content-type based on the file extension
def get_mime_type(file):
    if file.endswith(".html"):
        return "text/html", False
    if file.endswith(".css"):
        return "text/css", True
    if file.endswith(".js"):
        return "text/javascript", True
    if file.endswith(".png"):
        return "image/png", True
    if file.endswith(".gif"):
        return "image/gif", True
    if file.endswith(".jpeg") or file.endswith(".jpg"):
        return "image/jpeg", True
    if file.endswith(".pdf"):
        return "application/pdf", True
    return "text/plain", False

# Quick check if a file exists
def exists(file):
    try:
        s = uos.stat(file)
        return True
    except:
        return False    

@asyncio.coroutine
def serve_http(reader, writer):
    try:
        file = decode_path((yield from reader.read()))

        if exists(file):
            mime_type, cacheable = get_mime_type(file)
            yield from writer.awrite("HTTP/1.0 200 OK\r\n")
            yield from writer.awrite("Content-Type: {}\r\n".format(mime_type))
            if cacheable:
                yield from writer.awrite("Cache-Control: max-age=86400\r\n")
            yield from writer.awrite("\r\n")
        else:
            yield from writer.awrite("HTTP/1.0 200 OK\r\n")
            yield from writer.awrite("Content-Type: text/html\r\n")
            yield from writer.awrite("\r\n")
            file = redirect_file

        buf = bytearray(512)
        f = open(file, "rb")
        size = f.readinto(buf)
        while size > 0:
            yield from writer.awrite(buf, sz=size)
            size = f.readinto(buf)
        f.close()

    except:
        #import machine
        #machine.reset()
        pass
    finally:
        yield from writer.aclose()

def initialize():
    global ssid

    # shutdown any local wifi connect
    sta = network.WLAN(network.STA_IF)
    if sta.active():
        sta.active(False)
        while sta.isconnected():
            pass

    # startup the device as an AP
    ap = network.WLAN(network.AP_IF)
    if not ap.active():
        ap.active(True)
        ap.config(essid=ssid, password="", authmode=1)

    return ap.ifconfig()[0]

def shutdown():
    sta = network.WLAN(network.STA_IF)
    if sta.active():
        sta.active(False)
        while sta.isconnected():
            pass
    ap = network.WLAN(network.AP_IF)
    if ap.active():
        ap.active(False)

def run():
    # setup as Access point
    ip = initialize()

    # prepare to capture DNS requests
    udps = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    udps.setblocking(False)
    udps.bind(('',53))

    # prepare to scroll text
    spi = SPI(1, baudrate=10000000, polarity=0, phase=0)
    display = max7219m.Max7219M(spi, Pin(15), 4)
    display.brightness(7)

    # load up the coros and run
    loop = asyncio.get_event_loop()
    loop.create_task(capture_dns(udps, ip))
    loop.create_task(scroll_message(display))
    loop.create_task(asyncio.start_server(serve_http, "0.0.0.0", 80, 20))
    loop.run_forever()

    # if we ever finish
    loop.close()
    udps.close()

    #shutdown access point
    shutdown()

run()

WiFi Marquee

Credits

Joseph Wezensky

Joseph Wezensky

2 projects • 5 followers

Comments