Pycom
Published

PyBike

A virtual bike racing game using real bikes

AdvancedShowcase (no instructions)10 hours2,084
PyBike

Things used in this project

Hardware components

LoPy4
Pycom LoPy4
×3
Pycom Expansion Board
×3
Seeed Studio Grove LCD RGB Backlight
×2
Reed switch
×2

Story

Read more

Custom parts and enclosures

Reed switch mount

LCD case bottom

LCD case lid

Button extension

Make it easier to push the expansion board button when it is inside a PyCase

Button extension

Code

Bike

Python
the main.py file for the bikes, be sure to change the ID to either 0x01 or 0x02
import socket
import i2c_lcd
import struct
import machine
from machine import Timer, Pin
from pycom import heartbeat, rgbled
from network import LoRa
import time

# Structs
_LORA_PKG_FORMAT = "BBB"
_LORA_PKG_FINISH_FORMAT = "BBdd"

# Device IDs
DEVICE_ID = 0x2
GATEWAY_ID = 0x4
ALL_DEVICES = 0x3

# Settings
DISTANCE_TARGET = 100

heartbeat(False)


class RotationCounter:
    def __init__(self, pin, debounce_ms):
        self._pin = Pin(pin, mode=Pin.IN, pull=Pin.PULL_DOWN)
        self._debounce_ms = debounce_ms
        self._last_count_ms = None
        self._pin.callback(Pin.IRQ_RISING, self._handler)
        self.counter = 0

    def _handler(self, pin):
        time_ms = time.ticks_ms()
        if ((self._last_count_ms is not None) and
                (time_ms - self._last_count_ms > self._debounce_ms)):
            self.counter += 1
            self._last_count_ms = None
        else:
            self._last_count_ms = time_ms


class Clock:
    def __init__(self, d):
        self.rt = RotationCounter(Pin.exp_board.G8, 30)
        self.currentDist = 0
        self.finish = False
        self.prevDist = 0
        self.distTravelled = 0
        self.d = d
        self.notify = True
        self.speed = 0
        self.seconds = 0
        self.initialise()

    def _seconds_handler(self, alarm):
        self.currentDist = (self.rt.counter * 2.07)
        # Calculate the remaining distance
        remainingDist = DISTANCE_TARGET - self.currentDist
        self.d.clear()
        self.seconds += 1
        self.d.home()
        self.d.write("T:{}s".format(self.seconds))
        self.d.move(0, 1)
        self.d.write("D:{}m".format(int(remainingDist)))

        if (self.seconds % 5 == 0):
            self.distTravelled = self.currentDist - self.prevDist
            self.speed = (self.distTravelled / 5)
            self.prevDist = self.currentDist
            self.distTravelled = 0
        self.d.move(9, 1)
        self.d.write("S:{}mph".format(int(self.speed * 2.237)))

        if (remainingDist <= 0):
            self.finish = True
            alarm.cancel()
            self.end()

    def initialise(self):
        self.d.home()
        self.d.color(255, 255, 255)
        self.d.write("Please press")
        self.d.move(0, 1)
        self.d.write("start...")

    def start(self):
        self.d.home()
        self.d.clear()
        # Countdown with colours
        self.d.color(255, 0, 0)
        self.d.write("READY")
        time.sleep(1)
        self.d.clear()
        self.d.color(255, 123, 0)
        self.d.write("SET")
        time.sleep(1)
        self.d.clear()
        self.d.color(0, 255, 0)
        self.d.write("GO!")
        self.__alarm = Timer.Alarm(self._seconds_handler, 1, periodic=True)

    def end(self):
        self.d.clear()
        self.d.write("You finished in:")
        self.d.move(0, 1)
        self.d.write("{} seconds".format(int(self.seconds)))


class Rider:
    def __init__(self):
        self.start_button = Pin('P10', mode=Pin.IN, pull=Pin.PULL_UP)
        self.start_button.callback(Pin.IRQ_FALLING, self._pin_handler)
        self._last_count_ms = None
        self._debounce_ms = 80

        self.state = 'IDLE'

        self.lora = LoRa(mode=LoRa.LORA)
        self.lora_sock = socket.socket(socket.AF_LORA, socket.SOCK_RAW)
        self.lora.callback(trigger=(LoRa.RX_PACKET_EVENT),
                           handler=self._lora_cb)
        self.lora_sock.setblocking(False)

        self.data = []
        self.winner = False
        self.d = i2c_lcd.Display()
        self.clk = Clock(self.d)

    def _pin_handler(self, arg):
        time_ms = time.ticks_ms()
        if ((self._last_count_ms is not None) and
                (time_ms - self._last_count_ms > self._debounce_ms)):
            if self.state == 'IDLE':
                self.state = 'READY'
                self.send_lora(0x1, DEVICE_ID)
                self._last_count_ms = None
        else:
            self._last_count_ms = time_ms

    def send_lora(self, cmd, _id, payload=0x0):
        if(self.state == 'FINISHED'):
            print('Sending FIN MSG to gateway: {}'.format(cmd))
            self.lora_sock.setblocking(True)
            pkg = struct.pack(_LORA_PKG_FINISH_FORMAT,
                              cmd,
                              _id,
                              payload[0],
                              payload[1])
            self.lora_sock.send(pkg)
        else:
            print('Sending MSG to gateway: {}'.format(cmd))
            pkg = struct.pack(_LORA_PKG_FORMAT,
                              cmd,
                              _id)
            self.lora_sock.send(pkg)

    def _lora_cb(self, lora):
        try:
            self.data = self.lora_sock.recv(64)
            cmd, _id, msg = struct.unpack(_LORA_PKG_FORMAT, self.data)
            if((_id == DEVICE_ID) or (_id == ALL_DEVICES)):
                print('Received: {} {} {}'.format(hex(cmd),
                                                  hex(_id),
                                                  hex(msg)))
                if(cmd == 0x1):
                    self.state = 'START'
                elif(cmd == 0x2):
                    self.state = 'END'
                    if msg == DEVICE_ID:
                        print('WINNER!')
                        self.winner = True
                elif(cmd == 0x3):
                    self.state = 'RESET'
                elif(cmd == 0x0):
                    print('ACK')
                else:
                    print('ERROR')
            else:
                print('Unknown Device: {}'.format(_id))
        except Exception as e:
            print('Corrupted LoRa packet')
            print(e)

    def run(self):
        while True:
            if self.state == 'IDLE':
                rgbled(0xFFFFFF)

            elif self.state == 'READY':
                rgbled(0xFFA500)
                self.d.clear()
                self.d.color(0, 0, 255)
                self.d.write('Ready...')
                self.lora_sock.setblocking(True)
                self.state = 'READY_WAIT'

            elif self.state == 'READY_WAIT':
                pass  # Wait for reply

            elif self.state == 'START':
                self.d.clear()
                self.d.color(255, 255, 255)
                self.d.write('Starting...')
                self.clk.start()
                rgbled(0x00FF00)
                self.state = 'RUNNING'

            elif self.state == 'RUNNING':
                if(self.clk.finish):
                    self.state = 'FINISHED'
                    self.send_lora(0x2,
                                   DEVICE_ID,
                                   (self.clk.seconds, self.clk.speed))

            elif self.state == 'FINISHED':
                self.d.clear()
                self.d.color(0, 0, 255)
                self.d.write('Awaiting Results')
                self.state = 'FINISHED_WAIT'
                self.lora_sock.setblocking(False)
                rgbled(0x0000FF)

            elif self.state == 'FINISHED_WAIT':
                pass  # Wait for reply

            elif self.state == 'END':
                if self.winner:
                    self.d.clear()
                    self.d.color(0, 255, 0)
                    self.d.write('Winner!')
                else:
                    self.d.clear()
                    self.d.color(255, 0, 0)
                    self.d.write('Not Today...')
                time.sleep(10)
                self.state = 'RESET'

            elif self.state == 'RESET':
                rgbled(0xFF0000)
                self.d.clear()
                self.d.color(255, 0, 0)
                self.d.write('Preparing')
                self.d.move(0, 1)
                self.d.write('Next Race')
                time.sleep(5)
                self.send_lora(0x3,
                               DEVICE_ID,
                               0x0)
                machine.reset()

            time.sleep(0.01)


rider = Rider()
rider.run()

Base station

Python
The base station the two bikes connect to
import socket
import struct
from pycom import heartbeat, rgbled
from network import LoRa
from network import WLAN
# from mqtt import MQTTClient
import time

# Structs
_LORA_PKG_FORMAT = "BBB"
_LORA_PKG_FINISH_FORMAT = "BBdd"

# Device IDs
DEVICE_ID = 0x2
GATEWAY_ID = 0x4
ALL_DEVICES = 0x3

bike_states = {0x01: {'state': 'IDLE', 'speed': None, 'time': None},
               0x02: {'state': 'IDLE', 'speed': None, 'time': None}}

heartbeat(False)


class NanoGateWay:
    def __init__(self):
        print('Gateway Starting Up...')
        self.connected = False
        self.riders = bike_states
        self.state = 'IDLE'
        self.data = [None, None, None, None, None]  # cmd, _id, msg
        self.winner = None

        # initialize LoRa as a Gateway (with Tx IQ inversion)
        self.lora = LoRa(mode=LoRa.LORA)
        self.lora_sock = socket.socket(socket.AF_LORA, socket.SOCK_RAW)
        self.lora_sock.setblocking(False)
        self.lora.callback(trigger=(LoRa.RX_PACKET_EVENT),
                           handler=self._lora_cb)

    def send_lora(self, cmd, _id, payload=0x0):
        print('Sending cmd:{} to device:{} with msg:{}'.format(hex(cmd),
                                                               hex(_id),
                                                               hex(payload)))
        pkg = struct.pack(_LORA_PKG_FORMAT, cmd, _id, payload)
        self.lora_sock.send(pkg)

    def set_led(self):
        if(self.state == 'IDLE'):
            rgbled(0xCCCCCC)
        elif(self.state == 'READY'):
            rgbled(0xFFA500)
        elif(self.state == 'RUNNING'):
            rgbled(0x00FF00)
        elif(self.state == 'END'):
            rgbled(0xFF0000)

    def get_winner(self):
        for i, (rider_id, rider) in enumerate(self.riders.items()):
            if (i == 0) or (rider['time'] < time_to_beat):
                time_to_beat = rider['time']
                rider_to_beat = rider_id
            elif time_to_beat == rider['time']:
                time_to_beat = rider['time']
                rider_to_beat = 0x0
        return(rider_to_beat, time_to_beat)

    def _lora_cb(self, lora):
        recv_pkg = self.lora_sock.recv(64)

        # Input
        cmd = dev_id = msg = duration = speed = None

        # Unpack Data
        try:
            if(len(recv_pkg) != 3):
                print('FIN PKG')
                data = struct.unpack(_LORA_PKG_FINISH_FORMAT, recv_pkg)
                cmd, dev_id, duration, speed = data
                print(cmd, hex(dev_id), duration, speed)

            else:
                data = struct.unpack(_LORA_PKG_FORMAT, recv_pkg)
                cmd, dev_id, msg = data
                print(cmd, hex(dev_id))
        except Exception as e:
            print('Corrupted LoRa packet')
            print(e)
            return

        # Process Commands
        if dev_id in self.riders:
            if cmd == 0x1:  # Device Ready to Start
                self.riders[dev_id]['state'] = 'READY'
                print("Rider: {} | READY".format(dev_id))
                rgbled(0xff9d00)
                self.state = 'READY'

                are_ready = [rider['state'] == 'READY'
                             for rider in self.riders.values()]
                if all(are_ready):
                    self.state = 'RUNNING'
                    for rider in self.riders.values():
                        rider['state'] = 'RUNNING'
                    out_dev_id = ALL_DEVICES
                    time.sleep(5)
                    print('Riders Start!')

                    # Send start command to riders
                    self.send_lora(0x1, ALL_DEVICES, 0x0)
                else:
                    # Send ACK
                    self.send_lora(0x0, dev_id, 0x0)

            elif cmd == 0x2:  # Device Has Finished

                self.riders[dev_id]['state'] = 'FINISHED'
                print("Rider {} | TIME: {} | SPEED: {}".format(dev_id,
                                                               duration,
                                                               speed))
                self.riders[dev_id]['time'] = duration
                self.riders[dev_id]['speed'] = speed

                are_finished = [rider['state'] == 'FINISHED'
                                for rider in self.riders.values()]
                if all(are_finished):
                    self.state = 'END'
                    print('All Riders Complete')

                    self.winner = self.get_winner()

                    time.sleep(5)

                    print('Winner: {}'.format(self.winner))

                    # Send results to riders
                    self.send_lora(0x2, ALL_DEVICES, self.winner[0])
                else:
                    # Send ACK
                    self.send_lora(0x0, dev_id, 0x0)

            elif cmd == 0x3:  # Device to Reset

                self.riders[dev_id]['state'] = 'IDLE'
                print("Rider: {} | RESET".format(dev_id))
                self.state = 'RESET'

            for rider in self.riders.values():
                print("Rider: {}".format(rider['state']))

        else:
            print('Unknown Device')

        self.set_led()


gateway = NanoGateWay()

Credits

Pycom

Pycom

3 projects • 8 followers
Pycom enables and inspires everyone to be an inventor. Multi-network, MicroPython-and ESP32-based dev boards and a full free IoT stack.

Comments