Recently I looked at how we could use an FPGA and softcore MicroBlaze V processors to offload intelligent motor control.
Controlling intelligent motors is, however, only one small part of a robotic solution. The motors enable us to move around in the environment, while sensors such as LIDAR and cameras enable us to navigate safely within it.
In this project we are going to examine how we are able to control and interface with a commonly used LIDAR sensor using a similar approach.
This will enable us to develop a solution which uses FPGAs for both intelligent motor control and safe navigation within its environment. Using two softcore processors, one for motor control and the other for environmental sensing, leaves the programmable logic free for more complex algorithms and for functional safety features which require determinism and responsiveness. The use of an FPGA also enables tight integration of the overall system, giving a more compact and power-efficient solution.
LDS-01
The sensor we are using for this project is the LDS-01 from Robotis, a 2D laser scanner which rotates at 300 RPM. It is capable of measuring distances between 120 and 3500 mm with an accuracy of +/- 5%.
This sensor operates from a 5V supply and requires a PWM signal to drive the motor at the required scan rate.
Data is communicated from the LDS-01 using UART operating at 230400 baud.
To interface this with the FPGA we need a UART running at the correct baud rate and the ability to generate a PWM signal.
Hardware Design
We can implement this in any AMD FPGA or SoC. For this experiment I have been using the Arty S7.
This design is going to have the following:
- MicroBlaze V configured as a microcontroller, with 64KB of memory.
- AXI UART Lite connected to the USB UART, running at 230400 baud.
- AXI UART Lite connected to the LDS-01, running at 230400 baud.
- AXI Timer to generate the PWM signal.
- Clock Wizard to generate the clock at 200 MHz.
The final system looks as below
The implementation requires only limited resources.
The next step is to export the XSA and start developing the embedded software.
Software Architecture
This application needs two kinds of software. The first is the embedded software which runs on the MicroBlaze V.
We also need some software running on the host to plot the data from the sensor.
First let's look at the structure of the data received from the LDS-01 over the UART link. Instead of sending a full 360° scan at once, the LDS-01 transmits the scan in small blocks:
- 1 packet = 42 bytes
- 1 packet covers 6 degrees of rotation (6 samples, one per degree)
- 60 packets make a full 360° sweep (60 × 6° = 360°)
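As a quick sanity check, the packet figures above can be turned into a data rate and compared with the UART speed. This is a back-of-envelope sketch only: it assumes the sensor streams continuously at its nominal 300 RPM and that each byte costs 10 bits on the wire (8N1 framing), neither of which is stated explicitly here.

```python
# Figures from the text: 42-byte packets, 60 packets per sweep, 300 RPM, 230400 baud.
PACKET_BYTES = 42
PACKETS_PER_REV = 60
RPM = 300
BAUD = 230400
BITS_PER_BYTE = 10  # assumption: 8N1 framing (start bit + 8 data bits + stop bit)

revs_per_second = RPM / 60
bytes_per_rev = PACKET_BYTES * PACKETS_PER_REV
bytes_per_second = bytes_per_rev * revs_per_second
bits_per_second = bytes_per_second * BITS_PER_BYTE
link_utilisation = bits_per_second / BAUD

print(f"{bytes_per_rev} bytes per sweep, {bytes_per_second:.0f} bytes/s")
print(f"UART utilisation: {link_utilisation:.1%}")
```

Under these assumptions the stream uses a little over half of the available link bandwidth.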
Each 42-byte packet contains:
- Header / sync byte (1 byte, used to find the start of a packet)
- Angle index (1 byte, tells you which 6° block this packet belongs to)
- Motor speed (RPM) (2 bytes)
- Six measurement blocks (6 bytes each, one per degree within that 6° block)
- Checksum (2 bytes)
Each of the six measurement blocks contains:
- Intensity (2 bytes)
- Distance (2 bytes, in millimetres)
- Reserved/flags (2 bytes)
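The layout above can be illustrated with a small host-side decoder. This is a sketch rather than the firmware's implementation: the byte offsets and little-endian ordering follow the C driver listed later in this article, while the sync byte value (0xFA) and the wire value of angle index 0 (0xA0) are assumptions that are not stated in the text. The names `Packet` and `decode_packet` are hypothetical.

```python
import struct
from typing import NamedTuple

PACKET_SIZE = 42
SYNC_BYTE = 0xFA  # assumption: not stated in the article
INDEX_MIN = 0xA0  # assumption: wire value that corresponds to angle index 0


class Packet(NamedTuple):
    angle_index: int        # 0..59
    rpm_raw: int            # raw 2-byte motor speed field
    samples: list           # six (intensity, distance_mm) tuples
    checksum_bytes: bytes   # last two bytes, left undecoded here


def decode_packet(pkt: bytes) -> Packet:
    """Split one 42-byte packet into its fields (no checksum verification)."""
    if len(pkt) != PACKET_SIZE or pkt[0] != SYNC_BYTE:
        raise ValueError("expected a 42-byte packet starting with the sync byte")
    angle_index = pkt[1] - INDEX_MIN
    (rpm_raw,) = struct.unpack_from("<H", pkt, 2)
    samples = []
    for i in range(6):
        # Each block: intensity, distance, reserved (2 bytes each, little-endian)
        intensity, distance_mm, _reserved = struct.unpack_from("<HHH", pkt, 4 + 6 * i)
        samples.append((intensity, distance_mm))
    return Packet(angle_index, rpm_raw, samples, pkt[40:42])


# Build a synthetic packet for angle index 3 and decode it again.
raw = bytearray([SYNC_BYTE, INDEX_MIN + 3]) + struct.pack("<H", 3000)
for i in range(6):
    raw += struct.pack("<HHH", 100 + i, 500 + i, 0)
raw += bytes(2)  # placeholder checksum bytes
packet = decode_packet(bytes(raw))
print(packet.angle_index, packet.rpm_raw, packet.samples[0])
```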
The angle index identifies which 6° segment you’re receiving. You place the six samples into the correct positions in your 360-element scan array using:
- base_angle = angle_index × 6
- sample angles are base_angle + 0 through base_angle + 5
After you’ve received all 60 packets, you have a complete 360° scan.
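The placement rule can be sketched as a small scan assembler. The class name `ScanAssembler` is hypothetical, and the angle index is assumed to have already been converted to the range 0 to 59.

```python
POINTS_PER_PACKET = 6
PACKETS_PER_SCAN = 60


class ScanAssembler:
    """Collects 6-sample blocks into a 360-element scan."""

    def __init__(self):
        self.distances = [0] * (POINTS_PER_PACKET * PACKETS_PER_SCAN)
        self.seen = [False] * PACKETS_PER_SCAN

    def add(self, angle_index, distances_mm):
        """Store one block; return True once all 60 blocks have been seen."""
        if not 0 <= angle_index < PACKETS_PER_SCAN:
            return False
        base_angle = angle_index * POINTS_PER_PACKET
        for offset, d in enumerate(distances_mm[:POINTS_PER_PACKET]):
            self.distances[base_angle + offset] = d
        self.seen[angle_index] = True
        return all(self.seen)


asm = ScanAssembler()
complete = False
for idx in range(PACKETS_PER_SCAN):
    complete = asm.add(idx, [1000 + idx] * POINTS_PER_PACKET)
print(complete, asm.distances[0], asm.distances[359])
```

Tracking which blocks have been seen, rather than simply counting packets, means a repeated packet does not end the scan early.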
The packet includes a 2-byte RPM value. This can be used for monitoring rotation speed and compensating for timing skew.
The LDS-01 includes a simple checksum at the end of the packet.
- Sum the first 40 bytes of the packet
- Reduce to 8 bits (mod 256)
- Compute checksum = 0xFF - sum
- Compare against the checksum bytes in the packet
If the checksum doesn’t match, discard the packet and resynchronise on the next header byte.
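The checksum steps can be written as a short sketch. One caveat: the text does not say how the two trailing bytes encode the 8-bit result, so comparing against byte 40 only is an assumption, exposed here as a parameter. The function names are hypothetical.

```python
def lds01_checksum(packet: bytes) -> int:
    """Expected checksum: 0xFF minus the 8-bit sum of the first 40 bytes."""
    total = sum(packet[:40]) % 256
    return 0xFF - total


def checksum_ok(packet: bytes, checksum_offset: int = 40) -> bool:
    # Assumption: the byte at checksum_offset carries the 8-bit checksum value.
    return len(packet) == 42 and packet[checksum_offset] == lds01_checksum(packet)


# Synthetic example: 40 payload bytes followed by a matching checksum byte.
payload = bytes(range(40))
good = payload + bytes([lds01_checksum(payload), 0])
bad = payload + bytes([0x00, 0])
print(lds01_checksum(payload), checksum_ok(good), checksum_ok(bad))
```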
We can stop or start the stream using simple commands:
- Send b to begin operation
- Send e to pause operation
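As an illustration, the two commands are single ASCII bytes. In this design they would be sent by the MicroBlaze V over the AXI UART Lite; the Python sketch below only shows the bytes involved and would apply directly if the sensor were attached to a PC through a USB-UART adapter. The helper names are hypothetical, and `port` can be any object with a `write()` method, such as a pyserial `Serial` instance.

```python
import io

START_CMD = b"b"  # begin operation
STOP_CMD = b"e"   # pause operation


def start_lidar(port) -> None:
    """Ask the LDS-01 to start streaming."""
    port.write(START_CMD)


def stop_lidar(port) -> None:
    """Ask the LDS-01 to pause streaming."""
    port.write(STOP_CMD)


# Demonstrate with an in-memory stand-in for the serial port.
fake_port = io.BytesIO()
start_lidar(fake_port)
stop_lidar(fake_port)
print(fake_port.getvalue())
```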
The LDS-01 connector includes a PWM motor control pin, which needs to be driven by a 50 kHz PWM signal with a 50% duty cycle.
The embedded software, developed in Vitis, is split into two files. The first file contains main(), which performs the configuration and setup and then runs the functions needed by the application.
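The PWM numbers work out as follows. The period and high time mirror the nanosecond values the firmware passes to the timer driver; the tick counts additionally assume that the AXI Timer is clocked from the 200 MHz clock, which is an assumption about this design rather than something stated above.

```python
PWM_FREQUENCY_HZ = 50_000
PWM_DUTY_PERCENT = 50
TIMER_CLOCK_HZ = 200_000_000  # assumption: AXI Timer runs from the 200 MHz clock

NS_PER_SECOND = 1_000_000_000

period_ns = NS_PER_SECOND // PWM_FREQUENCY_HZ
high_ns = period_ns * PWM_DUTY_PERCENT // 100

# Equivalent durations expressed in timer clock cycles.
period_ticks = period_ns * TIMER_CLOCK_HZ // NS_PER_SECOND
high_ticks = high_ns * TIMER_CLOCK_HZ // NS_PER_SECOND

print(f"period = {period_ns} ns ({period_ticks} ticks)")
print(f"high   = {high_ns} ns ({high_ticks} ticks)")
```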
#include <stdio.h>
#include <string.h>
#include "platform.h"
#include "xil_printf.h"
#include "xtmrctr.h"
#include "xparameters.h"
#include "lds01.h"
/* 50 kHz PWM with 50% duty cycle. */
#define PWM_FREQUENCY_HZ 50000U
#define PWM_DUTY_PERCENT 50U
int main()
{
    XTmrCtr timer;
    u32 period_ns;
    u32 high_ns;
    u8 actual_duty;
    int status;
    Lds01 lidar;

    init_platform();
    print("Hello World\n\r");

    /* Convert the PWM frequency and duty cycle into nanosecond timings. */
    period_ns = 1000000000U / PWM_FREQUENCY_HZ;
    high_ns = (period_ns * PWM_DUTY_PERCENT) / 100U;

    status = XTmrCtr_Initialize(&timer, XPAR_AXI_TIMER_0_BASEADDR);
    if (status != XST_SUCCESS) {
        xil_printf("AXI Timer init failed: %d\r\n", status);
        cleanup_platform();
        return status;
    }

    actual_duty = XTmrCtr_PwmConfigure(&timer, period_ns, high_ns);
    if (actual_duty == XST_INVALID_PARAM) {
        xil_printf("PWM config invalid (period=%u ns, high=%u ns)\r\n",
                   period_ns, high_ns);
        cleanup_platform();
        return XST_FAILURE;
    }
    XTmrCtr_PwmEnable(&timer);
    xil_printf("AXI Timer PWM running at %u Hz, duty ~%u%%\r\n",
               PWM_FREQUENCY_HZ, actual_duty);

    status = Lds01_Init(&lidar);
    if (status != XST_SUCCESS) {
        xil_printf("LDS-01 UART init failed: %d\r\n", status);
        cleanup_platform();
        return status;
    }
    xil_printf("LDS-01 UART ready at %u baud\r\n", XPAR_XUARTLITE_1_BAUDRATE);

    while (1) {
        if (Lds01_Poll(&lidar) != 0) {
            const Lds01Scan *scan = Lds01_GetLastScan(&lidar);
            static u16 snap[LDS01_TOTAL_POINTS];
            static u8 snap_valid[LDS01_TOTAL_POINTS];
            u32 rpm = 0;
            u32 i;

            if (scan != NULL) {
                /* Take a snapshot so the driver buffer can be released early. */
                memcpy(snap, scan->distance_mm, sizeof(snap));
                memcpy(snap_valid, scan->valid, sizeof(snap_valid));
                rpm = (scan->rpm_x10 + 5U) / 10U;
                Lds01_ReleaseScan(&lidar);

                /* Send one ASCII line per sweep: SCAN,<rpm>,<d0>,...,<d359> */
                xil_printf("SCAN,%u", rpm);
                for (i = 0; i < LDS01_TOTAL_POINTS; ++i) {
                    if (snap_valid[i] != 0U) {
                        xil_printf(",%u", snap[i]);
                    } else {
                        xil_printf(",0");
                    }
                }
                xil_printf("\r\n");
            }
        }
    }
}
There is also an LDS-01 driver file which contains a number of functions necessary to process the data.
Lds01_ResetScanState()
Resets the scan-collection state so the driver can start building a fresh 360° scan. It clears the “which packets have we seen?” tracker and resets the packet counter.
Lds01_ClearWriteBuffer()
Prepares the active scan buffer for new data by clearing all valid[] flags. This prevents old points from being mistaken as current measurements.
Lds01_MarkScanComplete()
Called when a full scan’s worth of packets has been received. It marks the current buffer as “ready” for the application to read, then switches to the other buffer so the next scan can be captured without stopping the UART stream. If both buffers are busy, it pauses acquisition until the application releases a scan.
Lds01_ParsePacket()
Takes one complete LDS-01 packet and unpacks it into the current scan buffer. It extracts the motor speed (RPM×10) and fills in distance and signal strength for each measurement point, marking each point as valid. It also tracks which packet indices have arrived, and triggers “scan complete” once all expected packets are received.
Lds01_ConsumeByte()
Byte-by-byte packet assembler for the UART stream. It hunts for the sync byte, validates the packet index, collects bytes into a full packet buffer, and then hands the completed packet to the parser. If acquisition is paused (buffers full), it ignores incoming bytes.
Lds01_Init()
One-time setup for the LDS-01 driver. It initializes the Xilinx UARTLite interface, resets UART FIFOs, clears internal state, and gets the double-buffered scan storage ready to receive data.
Lds01_Poll()
The main “service” function you call repeatedly. It reads any available bytes from UART, feeds them into the packet assembler, and returns 1 when a complete scan is ready for your application to use.
Lds01_GetLastScan()
Returns a pointer to the latest completed scan (distance + strength arrays plus RPM). If no scan is ready yet, it returns NULL so your application can keep polling.
Lds01_ReleaseScan()
Tells the driver you’re done with the current scan so it can reuse buffers and continue streaming. If a second scan was queued while you were processing, it promotes that queued scan to “ready” immediately and resumes acquisition.
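To make the byte-by-byte assembly described for Lds01_ConsumeByte() easier to follow, here is a small Python model of the same state machine. It is a sketch for experimentation on the host, not the firmware itself: the sync byte (0xFA) and index range (0xA0 upwards) are assumed values, and the class name `PacketAssembler` is hypothetical.

```python
PACKET_SIZE = 42
SYNC_BYTE = 0xFA            # assumption: not stated in the article
INDEX_MIN = 0xA0            # assumption: wire value of angle index 0
INDEX_MAX = INDEX_MIN + 59  # 60 packets per sweep


class PacketAssembler:
    """Hunts for the sync byte, validates the index, collects 42 bytes."""

    def __init__(self):
        self.buf = bytearray()

    def feed(self, byte: int):
        """Consume one byte; return a complete packet as bytes, or None."""
        if len(self.buf) == 0 and byte != SYNC_BYTE:
            return None            # still hunting for the start of a packet
        if len(self.buf) == 1 and not INDEX_MIN <= byte <= INDEX_MAX:
            self.buf.clear()       # implausible index: restart the hunt
            return None
        self.buf.append(byte)
        if len(self.buf) < PACKET_SIZE:
            return None
        packet = bytes(self.buf)
        self.buf.clear()
        return packet


# Two junk bytes followed by one well-formed (all-zero payload) packet.
stream = bytes([0x00, 0x11]) + bytes([SYNC_BYTE, INDEX_MIN]) + bytes(40)
assembler = PacketAssembler()
packets = [p for p in (assembler.feed(b) for b in stream) if p is not None]
print(len(packets), len(packets[0]))
```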
#include "lds01.h"
#include <string.h>
#include "xparameters.h"
#include "xstatus.h"
/* Forget which packets have been seen so a fresh 360-degree scan can start. */
static void Lds01_ResetScanState(Lds01 *dev)
{
    dev->packets_in_scan = 0;
    memset(dev->packet_seen, 0, sizeof(dev->packet_seen));
}

/* Clear the valid[] flags of the buffer currently being written. */
static void Lds01_ClearWriteBuffer(Lds01 *dev)
{
    if (dev != NULL) {
        memset(dev->scans[dev->write_index].valid, 0,
               sizeof(dev->scans[dev->write_index].valid));
    }
}

/* Hand the finished buffer to the application, or queue it and pause. */
static void Lds01_MarkScanComplete(Lds01 *dev)
{
    if (dev->ready_pending == 0U) {
        dev->ready_pending = 1U;
        dev->ready_index = dev->write_index;
        dev->write_index ^= 1U;
        Lds01_ClearWriteBuffer(dev);
    } else if (dev->queued_pending == 0U) {
        dev->queued_pending = 1U;
        dev->queued_index = dev->write_index;
        dev->paused = 1U;
    } else {
        dev->paused = 1U;
    }
    Lds01_ResetScanState(dev);
}

/* Unpack one complete packet into the current write buffer. */
static void Lds01_ParsePacket(Lds01 *dev, const u8 *buf)
{
    u8 index = (u8)(buf[1] - LDS01_INDEX_MIN);
    u16 speed = (u16)buf[2] | ((u16)buf[3] << 8);
    Lds01Scan *scan = &dev->scans[dev->write_index];
    u8 i;

    if (index >= LDS01_PACKET_COUNT) {
        return;
    }
    scan->rpm_x10 = speed;

    for (i = 0; i < LDS01_POINTS_PER_PACKET; ++i) {
        u8 offset = (u8)(4U + (i * 6U));
        u8 byte0 = buf[offset];
        u8 byte1 = buf[offset + 1U];
        u8 byte2 = buf[offset + 2U];
        u8 byte3 = buf[offset + 3U];
        u16 strength = (u16)byte1 << 8 | (u16)byte0;
        u16 distance = (u16)byte3 << 8 | (u16)byte2;
        u16 angle = (u16)(index * LDS01_POINTS_PER_PACKET + i);

        if (angle < LDS01_TOTAL_POINTS) {
            scan->distance_mm[angle] = distance;
            scan->strength[angle] = strength;
            scan->valid[angle] = 1U;
        }
    }

    if (dev->packet_seen[index] == 0U) {
        dev->packet_seen[index] = 1U;
        dev->packets_in_scan++;
        if (dev->packets_in_scan >= LDS01_PACKET_COUNT) {
            Lds01_MarkScanComplete(dev);
        }
    }
}

/* Byte-by-byte packet assembler for the UART stream. */
static void Lds01_ConsumeByte(Lds01 *dev, u8 byte)
{
    if (dev->paused != 0U) {
        return;
    }

    if (dev->rx_count == 0U) {
        /* Hunt for the sync byte. */
        if (byte != LDS01_SYNC_BYTE) {
            return;
        }
    } else if (dev->rx_count == 1U) {
        /* The second byte must be a plausible packet index. */
        if (byte < LDS01_INDEX_MIN || byte > LDS01_INDEX_MAX) {
            dev->rx_count = 0U;
            return;
        }
    }

    dev->rx_buf[dev->rx_count] = byte;
    dev->rx_count++;

    if (dev->rx_count >= LDS01_PACKET_SIZE) {
        Lds01_ParsePacket(dev, dev->rx_buf);
        dev->rx_count = 0U;
    }
}
int Lds01_Init(Lds01 *dev)
{
    int status;

    if (dev == NULL) {
        return XST_INVALID_PARAM;
    }
    memset(dev, 0, sizeof(*dev));

#ifdef SDT
    status = XUartLite_Initialize(&dev->uart, XPAR_XUARTLITE_1_BASEADDR);
#else
#ifdef XPAR_XUARTLITE_1_DEVICE_ID
    status = XUartLite_Initialize(&dev->uart, XPAR_XUARTLITE_1_DEVICE_ID);
#else
    {
        XUartLite_Config cfg;
        cfg.DeviceId = 0U;
        cfg.RegBaseAddr = XPAR_XUARTLITE_1_BASEADDR;
        cfg.BaudRate = XPAR_XUARTLITE_1_BAUDRATE;
        cfg.UseParity = XPAR_XUARTLITE_1_USE_PARITY;
        cfg.ParityOdd = XPAR_XUARTLITE_1_ODD_PARITY;
        cfg.DataBits = XPAR_XUARTLITE_1_DATA_BITS;
        status = XUartLite_CfgInitialize(&dev->uart, &cfg, cfg.RegBaseAddr);
    }
#endif
#endif
    if (status != XST_SUCCESS) {
        return status;
    }

    XUartLite_ResetFifos(&dev->uart);
    Lds01_ResetScanState(dev);
    dev->ready_pending = 0U;
    dev->queued_pending = 0U;
    dev->paused = 0U;
    dev->write_index = 0U;
    Lds01_ClearWriteBuffer(dev);
    return XST_SUCCESS;
}

int Lds01_Poll(Lds01 *dev)
{
    u8 rx_buf[64];
    unsigned int received;
    u32 i;

    if (dev == NULL) {
        return 0;
    }

    /* Read whatever is available and feed it to the packet assembler. */
    received = XUartLite_Recv(&dev->uart, rx_buf, sizeof(rx_buf));
    for (i = 0; i < received; ++i) {
        Lds01_ConsumeByte(dev, rx_buf[i]);
    }
    return (dev->ready_pending != 0U) ? 1 : 0;
}

const Lds01Scan *Lds01_GetLastScan(const Lds01 *dev)
{
    if (dev == NULL) {
        return NULL;
    }
    if (dev->ready_pending == 0U) {
        return NULL;
    }
    return &dev->scans[dev->ready_index];
}

int Lds01_ReleaseScan(Lds01 *dev)
{
    if (dev == NULL) {
        return 0;
    }
    if (dev->ready_pending == 0U) {
        return 0;
    }

    dev->ready_pending = 0U;
    if (dev->queued_pending != 0U) {
        /* Promote the queued scan and resume acquisition in the other buffer. */
        dev->ready_pending = 1U;
        dev->ready_index = dev->queued_index;
        dev->queued_pending = 0U;
        dev->paused = 0U;
        dev->write_index = (u8)(dev->ready_index ^ 1U);
        Lds01_ResetScanState(dev);
        Lds01_ClearWriteBuffer(dev);
        return 1;
    }
    dev->paused = 0U;
    return 1;
}
This sends the data to a host machine. What we need to do next is display the information and check that it is working correctly.
For this we will use Python and the Textual library from Textualize. The script reads each full sweep as a single ASCII message (SCAN,<rpm>,<d0>..,<d359>), filters out bad readings, and plots the scan on an ASCII grid using Textual.
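To exercise the host side without hardware, it can help to generate a synthetic line in the same format. The helper below is hypothetical and only mirrors the message layout described above: one SCAN tag, one RPM value and 360 distances, separated by commas.

```python
def make_scan_line(rpm, distances_mm):
    """Build one 'SCAN,<rpm>,<d0>,...,<d359>' line like the firmware prints."""
    if len(distances_mm) != 360:
        raise ValueError("expected exactly 360 distances")
    fields = ["SCAN", str(rpm)] + [str(d) for d in distances_mm]
    return ",".join(fields) + "\r\n"


# A circular "room" with every wall 1000 mm away.
line = make_scan_line(300, [1000] * 360)
fields = line.strip().split(",")
print(fields[0], fields[1], len(fields), line.count(","))
```

A well-formed line therefore has 362 comma-separated fields and 361 commas, which are simple integrity checks a receiver can apply.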
# lidar_textual_joined.py
#
# Reads ASCII scans over serial:
# SCAN,<rpm>,<d0>,<d1>,...,<d359>
# and displays a 360° “radar” view in the terminal using Textual,
# with optional joining of dots to make walls/edges show up as continuous lines.
#
# Install:
# py -m pip install textual pyserial
#
# Run (Windows):
# py lidar_textual_joined.py --port COM25 --baud 230400
#
# Controls (in app):
# q quit
# R / r increase / decrease plot range (mm)
# T / t increase / decrease obstacle threshold (mm)
# M / m increase / decrease min valid distance (mm)
# O / o rotate view +/-
# j toggle join on/off
# K / k increase / decrease join distance tolerance (mm)
# G / g increase / decrease max angular gap for joining (deg)
# P / p increase / decrease plot resolution (radius cells) (needs bigger terminal)
import math
import threading
import queue
import argparse
import serial
import time
import statistics
from dataclasses import dataclass
from textual.app import App, ComposeResult
from textual.containers import Horizontal
from textual.widgets import Static
# -----------------------------
# Parsing / filtering
# -----------------------------
def parse_scan_line(line: str):
    """
    Expected:
        SCAN,<rpm>,<d0>,...,<d359>
    Allows spaces and a trailing comma.
    Returns (rpm:int, dists:list[int]) or None.
    """
    s = line.strip()
    if not s:
        return None
    parts = [p.strip() for p in s.split(",")]
    # tolerate a trailing comma -> last part is ""
    while parts and parts[-1] == "":
        parts.pop()
    if len(parts) != 362:
        return None
    if parts[0].upper() != "SCAN":
        return None
    try:
        rpm = int(parts[1])
        dists = [int(x) for x in parts[2:]]
    except ValueError:
        return None
    if len(dists) != 360:
        return None
    return rpm, dists
def sanitize_distances(dists, min_mm=80, max_mm=6000):
    """
    Invalid samples become 0.
    Returns (sanitized_list, stats_dict, valid_values_list)
    """
    zeros = ltmin = gtmax = 0
    out = []
    valid_vals = []
    for d in dists:
        if d == 0:
            zeros += 1
            out.append(0)
        elif d < min_mm:
            ltmin += 1
            out.append(0)
        elif d > max_mm:
            gtmax += 1
            out.append(0)
        else:
            out.append(d)
            valid_vals.append(d)
    stats = {
        "zeros": zeros,
        "ltmin": ltmin,
        "gtmax": gtmax,
        "valid": 360 - zeros - ltmin - gtmax,
    }
    return out, stats, valid_vals
@dataclass
class Obstacle:
    angle_deg: float
    min_mm: int
    width_deg: int


def find_obstacles(dists, threshold_mm=1200, min_width_deg=2):
    """
    Very simple clustering:
    - hit if 0 < d <= threshold_mm
    - group contiguous hits into blobs
    - report centroid angle + minimum distance + width
    """
    hits = [(0 < d <= threshold_mm) for d in dists]
    obstacles = []
    i = 0
    while i < 360:
        if not hits[i]:
            i += 1
            continue
        min_d = dists[i]
        sum_angles = i
        count = 1
        i += 1
        while i < 360 and hits[i]:
            d = dists[i]
            if d < min_d:
                min_d = d
            sum_angles += i
            count += 1
            i += 1
        if count >= min_width_deg:
            obstacles.append(Obstacle(angle_deg=sum_angles / count, min_mm=min_d, width_deg=count))
    obstacles.sort(key=lambda o: o.min_mm)
    return obstacles
# -----------------------------
# Serial reader thread
# -----------------------------
def serial_reader(port, baud, out_q, stop_evt):
    ser = serial.Serial(port=port, baudrate=baud, timeout=1.0)
    try:
        while not stop_evt.is_set():
            raw = ser.readline()
            if not raw:
                continue
            line = raw.decode(errors="ignore")
            parsed = parse_scan_line(line)
            if parsed is None:
                continue
            # Put timestamp + parsed data + simple integrity hints
            out_q.put((time.time(), parsed[0], parsed[1], line.count(","), len(line.strip())))
    finally:
        ser.close()
# -----------------------------
# Drawing (with joining)
# -----------------------------
def bresenham(x0, y0, x1, y1):
    """Integer grid line from (x0,y0) to (x1,y1)."""
    points = []
    dx = abs(x1 - x0)
    dy = -abs(y1 - y0)
    sx = 1 if x0 < x1 else -1
    sy = 1 if y0 < y1 else -1
    err = dx + dy
    x, y = x0, y0
    while True:
        points.append((x, y))
        if x == x1 and y == y1:
            break
        e2 = 2 * err
        if e2 >= dy:
            err += dy
            x += sx
        if e2 <= dx:
            err += dx
            y += sy
    return points
def render_radar_joined(
    dists_mm,
    *,
    max_plot_mm=3500,
    radius_cells=24,
    angle_offset_deg=0,
    join=True,
    join_mm=800,
    join_gap_deg=8,
):
    """
    Returns: (text, joined_segments)
    Coordinates:
    - 0° is up (N)
    - 90° is right (E)
    - 180° down (S)
    - 270° left (W)
    Joining strategy:
    For each angle with a valid point, find the next valid point within join_gap_deg.
    Draw a line between them if abs(d0 - d1) <= join_mm.
    """
    size = radius_cells * 2 + 1
    cx = cy = radius_cells
    grid = [[" " for _ in range(size)] for _ in range(size)]

    # Crosshair
    for x in range(size):
        grid[cy][x] = "─"
    for y in range(size):
        grid[y][cx] = "│"
    grid[cy][cx] = "┼"

    # Cardinal labels (best effort)
    grid[0][cx] = "N"
    grid[cy][size - 1] = "E"
    grid[size - 1][cx] = "S"
    grid[cy][0] = "W"

    def polar_to_grid(angle_deg, dist_mm):
        a = (angle_deg + angle_offset_deg) % 360
        th = math.radians(a)
        d = min(dist_mm, max_plot_mm)
        r = int((d / max_plot_mm) * radius_cells)
        x = cx + int(r * math.sin(th))  # 90° right
        y = cy - int(r * math.cos(th))  # 0° up
        return x, y

    # Build angle -> (dist,x,y)
    valid = [None] * 360
    for ang in range(360):
        d = dists_mm[ang]
        if d <= 0:
            continue
        x, y = polar_to_grid(ang, d)
        if 0 <= x < size and 0 <= y < size:
            valid[ang] = (d, x, y)

    joined_segments = 0

    # Draw joins first (lighter), then points (heavier)
    if join:
        for ang in range(360):
            v0 = valid[ang]
            if v0 is None:
                continue
            d0, x0, y0 = v0
            # find the next valid point within join_gap_deg steps
            target = None
            for step in range(1, join_gap_deg + 1):
                nxt = (ang + step) % 360
                if valid[nxt] is not None:
                    target = nxt
                    break
            if target is None:
                continue
            d1, x1, y1 = valid[target]
            # join only if the range change suggests a continuous surface
            if abs(d0 - d1) <= join_mm:
                joined_segments += 1
                for (x, y) in bresenham(x0, y0, x1, y1):
                    if 0 <= x < size and 0 <= y < size:
                        if grid[y][x] == " ":
                            grid[y][x] = "·"  # line fill

    # Draw points on top
    for ang in range(360):
        v = valid[ang]
        if v is None:
            continue
        _, x, y = v
        grid[y][x] = "•"

    # Small “bridge” pass (fills single-cell holes in lines)
    for y in range(1, size - 1):
        for x in range(1, size - 1):
            if grid[y][x] != " ":
                continue
            if grid[y][x - 1] in ("•", "·") and grid[y][x + 1] in ("•", "·"):
                grid[y][x] = "·"
            elif grid[y - 1][x] in ("•", "·") and grid[y + 1][x] in ("•", "·"):
                grid[y][x] = "·"
            elif grid[y - 1][x - 1] in ("•", "·") and grid[y + 1][x + 1] in ("•", "·"):
                grid[y][x] = "·"
            elif grid[y - 1][x + 1] in ("•", "·") and grid[y + 1][x - 1] in ("•", "·"):
                grid[y][x] = "·"

    top = "┌" + ("─" * size) + "┐"
    bot = "└" + ("─" * size) + "┘"
    mid = ["│" + "".join(row) + "│" for row in grid]
    return "\n".join([top, *mid, bot]), joined_segments
# -----------------------------
# Textual App
# -----------------------------
class LidarApp(App):
    CSS = """
    Screen { padding: 1; }
    #radar { width: 62%; }
    #info { width: 38%; padding-left: 2; }
    """

    def __init__(self, q, stop_evt, **kwargs):
        super().__init__(**kwargs)
        self.q = q
        self.stop_evt = stop_evt
        self.rpm = 0
        self.dists = [0] * 360
        self.stats = {"zeros": 0, "ltmin": 0, "gtmax": 0, "valid": 0}
        self.valid_vals = []
        self.max_plot_mm = 4000
        self.obstacle_mm = 1200
        self.min_valid_mm = 80
        self.max_valid_mm = 6000
        self.angle_offset = 0
        self.join = True
        self.join_mm = 800
        self.join_gap_deg = 8
        self.joined_segments = 0
        self.radius_cells = 24  # increase if terminal is large
        self.last_scan_time = None
        self.scan_hz = 0.0
        self.last_commas = 0
        self.last_len = 0

    def compose(self) -> ComposeResult:
        with Horizontal():
            yield Static("", id="radar")
            yield Static("", id="info")

    def on_mount(self) -> None:
        self.set_interval(0.05, self.refresh_from_queue)

    def refresh_from_queue(self) -> None:
        updated = False
        # Drain queue, keep most recent scan
        newest = None
        while True:
            try:
                newest = self.q.get_nowait()
            except queue.Empty:
                break
        if newest is None:
            return
        tstamp, rpm, dists_raw, commas, linelen = newest

        # Scan rate estimate (based on arriving scan lines)
        if self.last_scan_time is not None:
            dt = tstamp - self.last_scan_time
            if dt > 0:
                hz = 1.0 / dt
                self.scan_hz = hz if self.scan_hz == 0 else (0.85 * self.scan_hz + 0.15 * hz)
        self.last_scan_time = tstamp

        self.rpm = rpm
        self.last_commas = commas
        self.last_len = linelen
        self.dists, self.stats, self.valid_vals = sanitize_distances(
            dists_raw, self.min_valid_mm, self.max_valid_mm
        )
        radar_txt, joined = render_radar_joined(
            self.dists,
            max_plot_mm=self.max_plot_mm,
            radius_cells=self.radius_cells,
            angle_offset_deg=self.angle_offset,
            join=self.join,
            join_mm=self.join_mm,
            join_gap_deg=self.join_gap_deg,
        )
        self.joined_segments = joined
        self.query_one("#radar", Static).update(radar_txt)

        # Nearest valid
        best = None
        for a, d in enumerate(self.dists):
            if d > 0 and (best is None or d < best[1]):
                best = (a, d)

        # Obstacles list
        obs = find_obstacles(self.dists, threshold_mm=self.obstacle_mm, min_width_deg=2)[:10]

        # Basic stats
        if self.valid_vals:
            vmin = min(self.valid_vals)
            vmed = int(statistics.median(self.valid_vals))
            vmax = max(self.valid_vals)
        else:
            vmin = vmed = vmax = None

        lines = []
        lines.append("Input: SCAN,<rpm>,<d0>..,<d359>")
        lines.append(f"Integrity: commas={self.last_commas} (want 361) len={self.last_len}")
        lines.append(f"Scan rate: {self.scan_hz:.2f} Hz")
        lines.append("")
        lines.append(f"RPM: {self.rpm}")
        lines.append(f"Plot range (R/r): {self.max_plot_mm} mm")
        lines.append(f"Obstacle thresh (T/t): {self.obstacle_mm} mm")
        lines.append(f"Valid filter: min (M/m)={self.min_valid_mm} max={self.max_valid_mm}")
        lines.append(f"Rotate (O/o): {self.angle_offset}°")
        lines.append(f"Resolution (P/p): radius_cells={self.radius_cells}")
        lines.append("")
        lines.append(
            f"Join (j): {'ON' if self.join else 'OFF'} "
            f"join_mm (K/k)={self.join_mm} gap (G/g)={self.join_gap_deg}°"
        )
        lines.append(f"Joined segments: {self.joined_segments}")
        lines.append("")
        lines.append(
            f"Per-scan: valid={self.stats['valid']} zeros={self.stats['zeros']} "
            f"<min={self.stats['ltmin']} >max={self.stats['gtmax']}"
        )
        if vmin is None:
            lines.append("Valid dist stats: n/a")
        else:
            lines.append(f"Valid dist stats: min={vmin} med={vmed} max={vmax}")
        lines.append(f"Nearest valid: {best[1]} mm @ {best[0]}°" if best else "Nearest valid: n/a")
        lines.append("")
        lines.append("Closest obstacles (clustered):")
        if not obs:
            lines.append(" (none <= threshold)")
        else:
            for o in obs:
                lines.append(f" {o.min_mm:4d} mm @ {o.angle_deg:6.1f}° (width {o.width_deg}°)")
        lines.append("")
        lines.append("Keys: q quit | R/r range | T/t thresh | M/m min-valid | O/o rotate")
        lines.append(" j join | K/k join_mm | G/g gap_deg | P/p resolution")
        self.query_one("#info", Static).update("\n".join(lines))

    def on_key(self, event) -> None:
        k = event.key
        if k == "q":
            self.exit()
            return
        # Plot range
        if k == "r":
            self.max_plot_mm = max(500, self.max_plot_mm - 250)
        elif k == "R":
            self.max_plot_mm = min(20000, self.max_plot_mm + 250)
        # Obstacle threshold
        if k == "t":
            self.obstacle_mm = max(100, self.obstacle_mm - 100)
        elif k == "T":
            self.obstacle_mm = min(20000, self.obstacle_mm + 100)
        # Min valid filter
        if k == "m":
            self.min_valid_mm = max(0, self.min_valid_mm - 20)
        elif k == "M":
            self.min_valid_mm = min(2000, self.min_valid_mm + 20)
        # Rotate
        if k == "o":
            self.angle_offset = (self.angle_offset - 5) % 360
        elif k == "O":
            self.angle_offset = (self.angle_offset + 5) % 360
        # Join toggle
        if k == "j":
            self.join = not self.join
        # Join tolerance
        if k == "k":
            self.join_mm = max(0, self.join_mm - 50)
        elif k == "K":
            self.join_mm = min(5000, self.join_mm + 50)
        # Join angular gap
        if k == "g":
            self.join_gap_deg = max(1, self.join_gap_deg - 1)
        elif k == "G":
            self.join_gap_deg = min(30, self.join_gap_deg + 1)
        # Resolution
        if k == "p":
            self.radius_cells = max(10, self.radius_cells - 2)
        elif k == "P":
            self.radius_cells = min(60, self.radius_cells + 2)

    def on_exit(self) -> None:
        self.stop_evt.set()
def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--port", default="COM25", help="Windows: COM25, Linux: /dev/ttyUSB0, etc.")
    ap.add_argument("--baud", type=int, default=230400)
    args = ap.parse_args()
    port = args.port
    # Windows COM ports above COM9 use the \\.\COMxx device path form:
    if port.upper().startswith("COM") and len(port) > 4:
        port = "\\\\.\\" + port
    q = queue.Queue()
    stop_evt = threading.Event()
    t = threading.Thread(target=serial_reader, args=(port, args.baud, q, stop_evt), daemon=True)
    t.start()
    LidarApp(q, stop_evt).run()


if __name__ == "__main__":
    main()
Running
With the embedded application running on the development board and the host application running on the PC, the LIDAR operates as expected.
This has demonstrated how we can easily develop and integrate a LIDAR sensor into an FPGA. This allows us to create a tightly integrated and deterministic solution, which is exactly what is needed for many robotic applications.










