Whitney Knitter
Published © GPL3+

Lego Land Rover Defender Robot v2

This year, I upgraded my Lego Defender Robot with the fixes I mentioned it needed after rev 1 last year.

AdvancedWork in progress24 hours1,532
Lego Land Rover Defender Robot v2

Things used in this project

Hardware components

Kria™ KR260 Robotics Starter Kit
AMD Kria™ KR260 Robotics Starter Kit
×1
LEGO Land Rover Defender Technic 42110
×1
TP-Link Tapo 2K Indoor Security Camera
×1
Motor and Remote Control Upgrade Kit for Lego Technic Land Rover Defender 42110
×1
TP-Link USB Bluetooth Adapter for PC(UB400), 4.0 Bluetooth Dongle
×1

Software apps and online services

Edge Impulse Studio
Edge Impulse Studio
Vivado Design Suite
AMD Vivado Design Suite
Vitis Unified Software Platform
AMD Vitis Unified Software Platform
Snappy Ubuntu Core
Snappy Ubuntu Core

Story

Read more

Code

legov2_classifier.py

Python
#!/usr/bin/env python

import os
import cv2
import time
import signal
import asyncio 
import sys, getopt
from bleak import BleakClient
from edge_impulse_linux.image import ImageImpulseRunner

bt_addr = "45:71:21:BD:26:13"
bt_uuid = "0000fff2-0000-1000-8000-00805f9b34fb"


header = bytes.fromhex("5A6B020005")
module_channel = bytes.fromhex("01")

portAB_command_stp = bytes.fromhex("0000")
portCD_command_stp = bytes.fromhex("0000")
command_chksum_stp = bytes.fromhex("CD")

portAB_command_fwd = bytes.fromhex("0196")
portCD_command_fwd = bytes.fromhex("0000")
command_chksum_fwd = bytes.fromhex("64")

portAB_command_rvs = bytes.fromhex("0288")
portCD_command_rvs = bytes.fromhex("0000")
command_chksum_rvs = bytes.fromhex("57")

portAB_command_rgt = bytes.fromhex("0000")
portCD_command_rgt = bytes.fromhex("049C")
command_chksum_rgt = bytes.fromhex("6D")

portAB_command_lft = bytes.fromhex("0000")
portCD_command_lft = bytes.fromhex("089C")
command_chksum_lft = bytes.fromhex("71")

command_chksum_fwdrgt = bytes.fromhex("D1")
command_chksum_rvsrgt = bytes.fromhex("C4")
command_chksum_fwdlft = bytes.fromhex("D5")
command_chksum_rvslft = bytes.fromhex("C8")

runner = None
# if you don't want to see a camera preview, set this to False
show_camera = True
if (sys.platform == 'linux' and not os.environ.get('DISPLAY')):
    show_camera = False

def now():
    return round(time.time() * 1000)

def sigint_handler(sig, frame):
    print('Interrupted')
    if (runner):
        runner.stop()
    sys.exit(0)

signal.signal(signal.SIGINT, sigint_handler)

def help():
    print('python classify.py <path_to_model.eim> <Camera port ID, only required when more than 1 camera is present>')

async def main(argv):
    try:
        opts, args = getopt.getopt(argv, "h", ["--help"])
    except getopt.GetoptError:
        help()
        sys.exit(2)

    for opt, arg in opts:
        if opt in ('-h', '--help'):
            help()
            sys.exit()

    if len(args) == 0:
        help()
        sys.exit(2)

    # connect to bluetooth motor controller
    client = BleakClient(bt_addr)
    await client.connect()

    model = args[0]

    dir_path = os.path.dirname(os.path.realpath(__file__))
    modelfile = os.path.join(dir_path, model)

    print('MODEL: ' + modelfile)
    
    defender_direction = "init"

    with ImageImpulseRunner(modelfile) as runner:
        try:
            model_info = runner.init()
            print('Loaded runner for "' + model_info['project']['owner'] + ' / ' + model_info['project']['name'] + '"')
            labels = model_info['model_parameters']['labels']
            
            videoCaptureDeviceId = "rtsp://<username>:<password>@192.168.1.100/stream2"
            camera = cv2.VideoCapture(videoCaptureDeviceId)
            
            ret = camera.read()[0]
            if ret:
                backendName = camera.getBackendName()
                w = camera.get(3)
                h = camera.get(4)
                print("Camera %s (%s x %s) in port %s selected." %(backendName,h,w, videoCaptureDeviceId))
                
                # set cv2 buffer size to 0
                camera.set(cv2.CAP_PROP_BUFFERSIZE, 0)
                
                camera.release()
            else:
                raise Exception("Couldn't initialize selected camera.")

            next_frame = 0 

            for res, img in runner.classifier(videoCaptureDeviceId):
                camera.set(cv2.CAP_PROP_BUFFERSIZE, 0)

                if "classification" in res["result"].keys():
                    for label in labels:
                        score = res['result']['classification'][label]

                elif "bounding_boxes" in res["result"].keys():
                    if (len(res["result"]["bounding_boxes"])) == 0:
                        if defender_direction != "straight": 
                            model_number = await client.write_gatt_char(bt_uuid, header + portAB_command_fwd + portCD_command_fwd + module_channel + command_chksum_fwd)
                            defender_direction = "straight"
                        
                    else:
                        for bb in res["result"]["bounding_boxes"]:
                            # object has been found & outputting which obj was found and where in the frame
                            img = cv2.rectangle(img, (bb['x'], bb['y']), (bb['x'] + bb['width'], bb['y'] + bb['height']), (255, 0, 0), 1)
                            # take an action for a given object (motor control code here)
                            if (bb['label']) == "pumpkin":
                                if (bb['x'] < 45):
                                    if defender_direction != "right": 
                                        model_number = await client.write_gatt_char(bt_uuid, header + portAB_command_fwd + portCD_command_rgt + module_channel + command_chksum_fwdrgt)
                                        defender_direction = "right"
                                elif (bb['x'] > 44):
                                    if defender_direction != "left": 
                                        model_number = await client.write_gatt_char(bt_uuid, header + portAB_command_fwd + portCD_command_lft + module_channel + command_chksum_fwdlft)
                                        defender_direction = "left"
                                else:
                                    model_number = 0
                                    defender_direction = defender_direction

                                
                if (show_camera):
                    cv2.imshow('edgeimpulse', cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
                    if cv2.waitKey(1) == ord('q'):
                        break

                next_frame = now() + 100
        finally:
            if (runner):
                runner.stop()
                model_number = await client.write_gatt_char(bt_uuid, header + portAB_command_stp + portCD_command_stp + module_channel + command_chksum_stp)

                await client.disconnect()

if __name__ == "__main__":
    asyncio.run(main(sys.argv[1:]))

Credits

Whitney Knitter

Whitney Knitter

158 projects • 1598 followers
All thoughts/opinions are my own and do not reflect those of any company/entity I currently/previously associate with.

Comments