Infineon Team
Published © MIT

Valentine's Day Special: Synchronized Lamps πŸ’“

Always wanted a 'Thinking about you' Lamp? Well now you can build it and gift it to your loved one on Valentine's day.

IntermediateFull instructions provided10 hours622

Things used in this project

Hardware components

PSOCβ„’ 6 AI Evaluation Kit (CY8CKIT-062S2-AI)
Infineon PSOCβ„’ 6 AI Evaluation Kit (CY8CKIT-062S2-AI)
×1
DC-SHIELD BTN9970LV
Infineon DC-SHIELD BTN9970LV
×1
N20 Gear Motor
×1
LilyPad Rainbow LED (strip of 7 colors)
SparkFun LilyPad Rainbow LED (strip of 7 colors)
×1

Software apps and online services

MicroPython
MicroPython
MQTT
MQTT

Hand tools and fabrication machines

Soldering iron (generic)
Soldering iron (generic)
3D Printer (generic)
3D Printer (generic)

Story

Read more

Custom parts and enclosures

Hearted Cone

Enclosure Box

Base

Led Plate

Gear

Schematics

Schematics

Code

Partner

MicroPython
from machine import Pin, bitstream, SPI
import time
import network
import ubinascii
import umqtt.simple
import mip
import math
import array


try:
    import BGT60TRXX as BGT
except ImportError:
    
    mip.install("github:infineon/micropython-radar-bgt60")
    import BGT60TRXX as BGT


no_of_chirps = const(1)
samples_per_chirp = const(128)
words = const(samples_per_chirp * no_of_chirps)
ADC_DIV = const(60)
start_freq = const(58_000_000)  
bandwidth = const(4_500_000)    

threshold = const(8.0)


spi_interface = SPI(
    baudrate=50_000_000,
    polarity=0,
    phase=0,
    bits=8,
    firstbit=SPI.MSB,
    sck="P12_2",
    mosi="P12_0",
    miso="P12_1",
)

radar_sensor = BGT.BGT60TRxxModule(words, spi_interface, Pin("P12_3"), Pin("P11_1"), Pin("P11_0"))
radar_sensor.set_adc_div(ADC_DIV)
radar_sensor.set_chirp_len(samples_per_chirp)

FSU = BGT.calculateFSU(start_freq)
RTU = BGT.calculateRTU(ADC_DIV, samples_per_chirp)
RSU = BGT.calculateRSU(bandwidth, RTU)
radar_sensor.configure_chirp(FSU, RTU, RSU)

radar_sensor.set_vga_gain_ch1(3)
range_resolution = radar_sensor.get_range_resolution() * 100  # cm


threshold_func = None  

def _init_threshold_func():
    global threshold_func
    threshold_func = array.array("f", (threshold for _ in range(int(radar_sensor.fft.length) // 2)))

@micropython.native
def _nearest_peak_cm() -> float:
  
    for i in range(len(radar_sensor.fft_data) // 2):
        if radar_sensor.fft_data[i] > threshold_func[i]:
            return (i * range_resolution / no_of_chirps)
    return -1.0


TAP_MAX_DISTANCE_CM = 15.0  
TAP_COOLDOWN_MS = 600       
PRESENCE_HOLD_MS = 250      

_last_presence_ms = 0
_last_tap_ms = 0
_partner_state = 0  # 0=off, 1=on

def radar_poll_and_toggle(mqttc, topic_state):

    global _last_presence_ms, _last_tap_ms, _partner_state

    now = time.ticks_ms()

    
    radar_sensor.readDistance()

    
    d_cm = _nearest_peak_cm()

    present = (d_cm > 0) and (d_cm <= TAP_MAX_DISTANCE_CM)

  
    if present:
        _last_presence_ms = now

    
    absent_long_enough = time.ticks_diff(now, _last_presence_ms) > PRESENCE_HOLD_MS
    cooldown_ok = time.ticks_diff(now, _last_tap_ms) > TAP_COOLDOWN_MS

    
    if present and absent_long_enough and cooldown_ok:
        _last_tap_ms = now
        _partner_state = 0 if _partner_state else 1

        msg = b"1" if _partner_state else b"0"

        
        try:
            mqttc.publish(topic_state, msg)
        except Exception as e:
            print("Partner: MQTT publish failed:", e)

 
        try:
            mqtt_callback(topic_state, msg)
        except Exception as e:
            print("Partner: local toggle failed:", e)


print("Connecting to wifi...")
sta = network.WLAN(network.STA_IF)
sta.active(True)

sta.connect('your wifi', 'password')
while not sta.isconnected():
    time.sleep_ms(200)
print("Connected:", sta.ifconfig())

Topic_state = b"lovelight-1402/state"

IN1  = Pin("P9_6",  Pin.OUT)
INH1 = Pin("P9_5",  Pin.OUT)
DIN1 = Pin("P9_1",  Pin.OUT, value=0)

timing = [500, 1125, 800, 750]

def manualLightAll(buf):
    bitstream(DIN1, 0, timing, buf)

def create_color(r, g, b):
    return bytearray([g, r, b])

amount_of_leds = 80

buf_on = bytearray()
for _ in range(amount_of_leds):
    buf_on.extend(create_color(255, 0, 80))

buf_off = bytearray()
for _ in range(amount_of_leds):
    buf_off.extend(create_color(0, 0, 0))

def mqtt_callback(topic, msg):
    if msg == b"1":
        manualLightAll(buf_on)
        INH1.value(1)
        IN1.value(1)
        print("Partner: LED ON (got 1)")
    elif msg == b"0":
        manualLightAll(buf_off)
        INH1.value(0)
        IN1.value(0)
        print("Partner: LED OFF (got 0)")

mqttc = umqtt.simple.MQTTClient(f'me-{sta.config("mac")}', 'test.mosquitto.org', keepalive=60)
mqttc.set_callback(mqtt_callback)
print("Connecting to MQTT broker...")
mqttc.connect()
mqttc.subscribe(b"lovelight-1402/state")
print("MQTT connected. Subscribed to:", b"lovelight-1402/state")

print("Partner: running")

_init_threshold_func()

while True:
    mqttc.check_msg()

   
    radar_poll_and_toggle(mqttc, Topic_state)

    time.sleep_ms(20)

Me

MicroPython
from machine import Pin, bitstream
import time
import network
import ubinascii
import umqtt.simple


print("Connecting to wifi...")
sta = network.WLAN(network.STA_IF)

sta.connect('things', 'dmPn_bPMor!Xj925Cb')
while not sta.isconnected():
    time.sleep_ms(200)
print("Connected:", sta.ifconfig())



TOPIC_STATE = b"lovelight-1402/state"

mqttc = umqtt.simple.MQTTClient(f'me-{sta.config('mac')}', 'test.mosquitto.org', keepalive=60)
print("Connecting to MQTT broker...")
mqttc.connect()
print("MQTT connected")

# Lights
timing = [500, 1125, 800, 750]
DIN1 = Pin('P9_1', Pin.OUT, value=0)

def manualLightAll(buf):
    bitstream(DIN1, 0, timing, buf)

def create_color(r, g, b):
    return bytearray([g, r, b])

amount_of_leds = 80

buf_on = bytearray()
for _ in range(amount_of_leds):
    buf_on.extend(create_color(255, 0, 80))

buf_off = bytearray()
for _ in range(amount_of_leds):
    buf_off.extend(create_color(0, 0, 0))

#  BUTTON 
BTN = Pin('P9_2', Pin.IN, Pin.PULL_UP)
last = BTN.value()
led_state = False

print("Me: running")

while True:
    v = BTN.value()

    # Detect press edge (0 -> 1)
    if v == 1 and last == 0:
        led_state = not led_state

        if led_state:
            manualLightAll(buf_on)
            mqttc.publish(b"lovelight-1402/state", b"1")
            print("Me: LED ON -> published 1")
        else:
            manualLightAll(buf_off)
            mqttc.publish(b"lovelight-1402/state", b"0")
            print("Me: LED OFF -> published 0")

    last = v
    time.sleep_ms(20)

simple.py

MicroPython
import socket
import struct
from binascii import hexlify


class MQTTException(Exception):
    pass


class MQTTClient:
    def __init__(
        self,
        client_id,
        server,
        port=0,
        user=None,
        password=None,
        keepalive=0,
        ssl=None,
        ssl_params={},
    ):
        if port == 0:
            port = 8883 if ssl else 1883
        self.client_id = client_id
        self.sock = None
        self.server = server
        self.port = port
        self.ssl = ssl
        self.ssl_params = ssl_params
        self.pid = 0
        self.cb = None
        self.user = user
        self.pswd = password
        self.keepalive = keepalive
        self.lw_topic = None
        self.lw_msg = None
        self.lw_qos = 0
        self.lw_retain = False

    def _send_str(self, s):
        self.sock.write(struct.pack("!H", len(s)))
        self.sock.write(s)

    def _recv_len(self):
        n = 0
        sh = 0
        while 1:
            b = self.sock.read(1)[0]
            n |= (b & 0x7F) << sh
            if not b & 0x80:
                return n
            sh += 7

    def set_callback(self, f):
        self.cb = f

    def set_last_will(self, topic, msg, retain=False, qos=0):
        assert 0 <= qos <= 2
        assert topic
        self.lw_topic = topic
        self.lw_msg = msg
        self.lw_qos = qos
        self.lw_retain = retain

    def connect(self, clean_session=True, timeout=None):
        self.sock = socket.socket()
        self.sock.settimeout(timeout)
        addr = socket.getaddrinfo(self.server, self.port)[0][-1]
        self.sock.connect(addr)
        if self.ssl is True:
            # Legacy support for ssl=True and ssl_params arguments.
            import ssl

            self.sock = ssl.wrap_socket(self.sock, **self.ssl_params)
        elif self.ssl:
            self.sock = self.ssl.wrap_socket(self.sock, server_hostname=self.server)
        premsg = bytearray(b"\x10\0\0\0\0\0")
        msg = bytearray(b"\x04MQTT\x04\x02\0\0")

        sz = 10 + 2 + len(self.client_id)
        msg[6] = clean_session << 1
        if self.user:
            sz += 2 + len(self.user) + 2 + len(self.pswd)
            msg[6] |= 0xC0
        if self.keepalive:
            assert self.keepalive < 65536
            msg[7] |= self.keepalive >> 8
            msg[8] |= self.keepalive & 0x00FF
        if self.lw_topic:
            sz += 2 + len(self.lw_topic) + 2 + len(self.lw_msg)
            msg[6] |= 0x4 | (self.lw_qos & 0x1) << 3 | (self.lw_qos & 0x2) << 3
            msg[6] |= self.lw_retain << 5

        i = 1
        while sz > 0x7F:
            premsg[i] = (sz & 0x7F) | 0x80
            sz >>= 7
            i += 1
        premsg[i] = sz

        self.sock.write(premsg, i + 2)
        self.sock.write(msg)
        # print(hex(len(msg)), hexlify(msg, ":"))
        self._send_str(self.client_id)
        if self.lw_topic:
            self._send_str(self.lw_topic)
            self._send_str(self.lw_msg)
        if self.user:
            self._send_str(self.user)
            self._send_str(self.pswd)
        resp = self.sock.read(4)
        assert resp[0] == 0x20 and resp[1] == 0x02
        if resp[3] != 0:
            raise MQTTException(resp[3])
        return resp[2] & 1

    def disconnect(self):
        self.sock.write(b"\xe0\0")
        self.sock.close()

    def ping(self):
        self.sock.write(b"\xc0\0")

    def publish(self, topic, msg, retain=False, qos=0):
        pkt = bytearray(b"\x30\0\0\0")
        pkt[0] |= qos << 1 | retain
        sz = 2 + len(topic) + len(msg)
        if qos > 0:
            sz += 2
        assert sz < 2097152
        i = 1
        while sz > 0x7F:
            pkt[i] = (sz & 0x7F) | 0x80
            sz >>= 7
            i += 1
        pkt[i] = sz
        # print(hex(len(pkt)), hexlify(pkt, ":"))
        self.sock.write(pkt, i + 1)
        self._send_str(topic)
        if qos > 0:
            self.pid += 1
            pid = self.pid
            struct.pack_into("!H", pkt, 0, pid)
            self.sock.write(pkt, 2)
        self.sock.write(msg)
        if qos == 1:
            while 1:
                op = self.wait_msg()
                if op == 0x40:
                    sz = self.sock.read(1)
                    assert sz == b"\x02"
                    rcv_pid = self.sock.read(2)
                    rcv_pid = rcv_pid[0] << 8 | rcv_pid[1]
                    if pid == rcv_pid:
                        return
        elif qos == 2:
            assert 0

    def subscribe(self, topic, qos=0):
        assert self.cb is not None, "Subscribe callback is not set"
        pkt = bytearray(b"\x82\0\0\0")
        self.pid += 1
        struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic) + 1, self.pid)
        # print(hex(len(pkt)), hexlify(pkt, ":"))
        self.sock.write(pkt)
        self._send_str(topic)
        self.sock.write(qos.to_bytes(1, "little"))
        while 1:
            op = self.wait_msg()
            if op == 0x90:
                resp = self.sock.read(4)
                # print(resp)
                assert resp[1] == pkt[2] and resp[2] == pkt[3]
                if resp[3] == 0x80:
                    raise MQTTException(resp[3])
                return

    def unsubscribe(self, topic):
        pkt = bytearray(b"\xa2\0\0\0")
        self.pid += 1
        struct.pack_into("!BH", pkt, 1, 2 + 2 + len(topic), self.pid)
        self.sock.write(pkt)
        self._send_str(topic)
        while 1:
            op = self.wait_msg()
            if op == 0xB0:
                resp = self.sock.read(3)
                assert resp[1] == pkt[2] and resp[2] == pkt[3]
                return

    # Wait for a single incoming MQTT message and process it.
    # Subscribed messages are delivered to a callback previously
    # set by .set_callback() method. Other (internal) MQTT
    # messages processed internally.
    def wait_msg(self):
        res = self.sock.read(1)
        self.sock.setblocking(True)
        if res is None:
            return None
        if res == b"":
            raise OSError(-1)
        if res == b"\xd0":  # PINGRESP
            sz = self.sock.read(1)[0]
            assert sz == 0
            return None
        op = res[0]
        if op & 0xF0 != 0x30:
            return op
        sz = self._recv_len()
        topic_len = self.sock.read(2)
        topic_len = (topic_len[0] << 8) | topic_len[1]
        topic = self.sock.read(topic_len)
        sz -= topic_len + 2
        if op & 6:
            pid = self.sock.read(2)
            pid = pid[0] << 8 | pid[1]
            sz -= 2
        msg = self.sock.read(sz)
        self.cb(topic, msg)
        if op & 6 == 2:
            pkt = bytearray(b"\x40\x02\0\0")
            struct.pack_into("!H", pkt, 2, pid)
            self.sock.write(pkt)
        elif op & 6 == 4:
            assert 0
        return op

    # Checks whether a pending message from server is available.
    # If not, returns immediately with None. Otherwise, does
    # the same processing as wait_msg.
    def check_msg(self):
        self.sock.setblocking(False)
        return self.wait_msg()

Credits

Infineon Team
125 projects β€’ 201 followers
Hands-on projects, tutorials, and code for sensors, MCUs, connectivity, security, power, and IoT. Follow for new builds and ideas.
Thanks to The Spanner.

Comments