Whitney Knitter
Published © GPL3+

Lego Land Rover Defender Robot v2

This year, I upgraded my Lego Defender Robot with the fixes I mentioned it needed after rev 1 last year.

AdvancedWork in progress24 hours2,681
Lego Land Rover Defender Robot v2

Things used in this project

Hardware components

LEGO Land Rover Defender Technic 42110
×1
TP-Link Tapo 2K Indoor Security Camera
×1
Motor and Remote Control Upgrade Kit for Lego Technic Land Rover Defender 42110
×1
TP-Link USB Bluetooth Adapter for PC(UB400), 4.0 Bluetooth Dongle
×1

Software apps and online services

Edge Impulse Studio
Edge Impulse Studio
AMD Kria™ KV260 Vision AI Starter Kit
AMD Vivado Design Suite
AMD Vitis Unified Software Platform
Ubuntu Core
Ubuntu Core

Story

Read more

Code

legov2_classifier.py

Python
#!/usr/bin/env python

import os
import cv2
import time
import signal
import asyncio 
import sys, getopt
from bleak import BleakClient
from edge_impulse_linux.image import ImageImpulseRunner

bt_addr = "45:71:21:BD:26:13"
bt_uuid = "0000fff2-0000-1000-8000-00805f9b34fb"


header = bytes.fromhex("5A6B020005")
module_channel = bytes.fromhex("01")

portAB_command_stp = bytes.fromhex("0000")
portCD_command_stp = bytes.fromhex("0000")
command_chksum_stp = bytes.fromhex("CD")

portAB_command_fwd = bytes.fromhex("0196")
portCD_command_fwd = bytes.fromhex("0000")
command_chksum_fwd = bytes.fromhex("64")

portAB_command_rvs = bytes.fromhex("0288")
portCD_command_rvs = bytes.fromhex("0000")
command_chksum_rvs = bytes.fromhex("57")

portAB_command_rgt = bytes.fromhex("0000")
portCD_command_rgt = bytes.fromhex("049C")
command_chksum_rgt = bytes.fromhex("6D")

portAB_command_lft = bytes.fromhex("0000")
portCD_command_lft = bytes.fromhex("089C")
command_chksum_lft = bytes.fromhex("71")

command_chksum_fwdrgt = bytes.fromhex("D1")
command_chksum_rvsrgt = bytes.fromhex("C4")
command_chksum_fwdlft = bytes.fromhex("D5")
command_chksum_rvslft = bytes.fromhex("C8")

runner = None
# if you don't want to see a camera preview, set this to False
show_camera = True
if (sys.platform == 'linux' and not os.environ.get('DISPLAY')):
    show_camera = False

def now():
    return round(time.time() * 1000)

def sigint_handler(sig, frame):
    print('Interrupted')
    if (runner):
        runner.stop()
    sys.exit(0)

signal.signal(signal.SIGINT, sigint_handler)

def help():
    print('python classify.py <path_to_model.eim> <Camera port ID, only required when more than 1 camera is present>')

async def main(argv):
    try:
        opts, args = getopt.getopt(argv, "h", ["--help"])
    except getopt.GetoptError:
        help()
        sys.exit(2)

    for opt, arg in opts:
        if opt in ('-h', '--help'):
            help()
            sys.exit()

    if len(args) == 0:
        help()
        sys.exit(2)

    # connect to bluetooth motor controller
    client = BleakClient(bt_addr)
    await client.connect()

    model = args[0]

    dir_path = os.path.dirname(os.path.realpath(__file__))
    modelfile = os.path.join(dir_path, model)

    print('MODEL: ' + modelfile)
    
    defender_direction = "init"

    with ImageImpulseRunner(modelfile) as runner:
        try:
            model_info = runner.init()
            print('Loaded runner for "' + model_info['project']['owner'] + ' / ' + model_info['project']['name'] + '"')
            labels = model_info['model_parameters']['labels']
            
            videoCaptureDeviceId = "rtsp://<username>:<password>@192.168.1.100/stream2"
            camera = cv2.VideoCapture(videoCaptureDeviceId)
            
            ret = camera.read()[0]
            if ret:
                backendName = camera.getBackendName()
                w = camera.get(3)
                h = camera.get(4)
                print("Camera %s (%s x %s) in port %s selected." %(backendName,h,w, videoCaptureDeviceId))
                
                # set cv2 buffer size to 0
                camera.set(cv2.CAP_PROP_BUFFERSIZE, 0)
                
                camera.release()
            else:
                raise Exception("Couldn't initialize selected camera.")

            next_frame = 0 

            for res, img in runner.classifier(videoCaptureDeviceId):
                camera.set(cv2.CAP_PROP_BUFFERSIZE, 0)

                if "classification" in res["result"].keys():
                    for label in labels:
                        score = res['result']['classification'][label]

                elif "bounding_boxes" in res["result"].keys():
                    if (len(res["result"]["bounding_boxes"])) == 0:
                        if defender_direction != "straight": 
                            model_number = await client.write_gatt_char(bt_uuid, header + portAB_command_fwd + portCD_command_fwd + module_channel + command_chksum_fwd)
                            defender_direction = "straight"
                        
                    else:
                        for bb in res["result"]["bounding_boxes"]:
                            # object has been found & outputting which obj was found and where in the frame
                            img = cv2.rectangle(img, (bb['x'], bb['y']), (bb['x'] + bb['width'], bb['y'] + bb['height']), (255, 0, 0), 1)
                            # take an action for a given object (motor control code here)
                            if (bb['label']) == "pumpkin":
                                if (bb['x'] < 45):
                                    if defender_direction != "right": 
                                        model_number = await client.write_gatt_char(bt_uuid, header + portAB_command_fwd + portCD_command_rgt + module_channel + command_chksum_fwdrgt)
                                        defender_direction = "right"
                                elif (bb['x'] > 44):
                                    if defender_direction != "left": 
                                        model_number = await client.write_gatt_char(bt_uuid, header + portAB_command_fwd + portCD_command_lft + module_channel + command_chksum_fwdlft)
                                        defender_direction = "left"
                                else:
                                    model_number = 0
                                    defender_direction = defender_direction

                                
                if (show_camera):
                    cv2.imshow('edgeimpulse', cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
                    if cv2.waitKey(1) == ord('q'):
                        break

                next_frame = now() + 100
        finally:
            if (runner):
                runner.stop()
                model_number = await client.write_gatt_char(bt_uuid, header + portAB_command_stp + portCD_command_stp + module_channel + command_chksum_stp)

                await client.disconnect()

if __name__ == "__main__":
    asyncio.run(main(sys.argv[1:]))

Credits

Whitney Knitter
178 projects • 2035 followers
All thoughts/opinions are my own and do not reflect those of any company/entity I currently/previously associate with.

Comments