Mohamed Ali Bedair
Published © GPL3+

BeeGuard: Edge-AI Hive Stress Monitor (ESP32 + Audio + MQTT)

BeeGuard is an edge-AI system that listens to beehive audio on the device, detects stress conditions in real time, and publishes telemetry and alerts over the public HiveMQ MQTT broker so the hive can be monitored from anywhere.

Intermediate · Showcase (no instructions) · 20 hours

Things used in this project

Hardware components

DFRobot ESP32-S3 AI Camera Module
×1

Software apps and online services

Edge Impulse Studio
Arduino IDE
MQTT

Story


Code

BeeGuard.ino

C/C++
#include <Arduino.h>
#include "ESP_I2S.h"
#include <BeeGuard_inferencing.h>

#include <WiFi.h>
#include <PubSubClient.h>
#include "secrets.h"

#define SAMPLE_RATE       (16000)
#define PDM_CLK_PIN       (GPIO_NUM_38)
#define PDM_DATA_PIN      (GPIO_NUM_39)

#define REC_TIME_SEC      (2)
#define WAV_HEADER_BYTES  (44)
#define EI_WINDOW_SAMPLES (EI_CLASSIFIER_RAW_SAMPLE_COUNT)

// Preprocess params
static const int   TARGET_PEAK = 12000;
static const float MAX_GAIN    = 60.0f;

// ---------- Step 3 params (tune later) ----------
static const float RMS_MIN_GATE        = 0.010f;  // below this = ignore window
static const float ZCR_MIN_GATE        = 0.015f;  // below this = likely not "buzz-like"
static const float STRESS_THRESH       = 0.75f;   // stress probability to count as stress
static const int   VOTE_M              = 5;       // window history size
static const int   VOTE_N              = 3;       // require N of M stress hits
static const uint32_t ALERT_COOLDOWN_MS = 60000;  // 60s cooldown
// -----------------------------------------------
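// A window counts as a stress hit when p_stress >= STRESS_THRESH. An alert is
// raised only when at least VOTE_N of the last VOTE_M windows were hits and the
// ALERT_COOLDOWN_MS cooldown has expired, so one noisy window cannot trigger it.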

// =========================
// MQTT / HiveMQ (Step 4)
// =========================
static const char* MQTT_HOST = "broker.hivemq.com";
static const uint16_t MQTT_PORT = 1883;

// Short topic names
static const char* TOPIC_TEL   = "bg/tel";
static const char* TOPIC_ALERT = "bg/alert";
static const char* TOPIC_STAT  = "bg/stat";

// =========================

I2SClass i2s;
static int16_t ei_buffer[EI_WINDOW_SAMPLES];

// voting history
static bool stress_hist[VOTE_M] = {false};
static int  hist_pos = 0;
static uint32_t last_alert_ms = 0;

// ---- MQTT objects ----
WiFiClient wifiClient;
PubSubClient mqtt(wifiClient);
String mqttClientId;

// ---- EI callback ----
int ei_get_data(size_t offset, size_t length, float *out_ptr) {
  for (size_t i = 0; i < length; i++) {
    out_ptr[i] = (float)ei_buffer[offset + i] / 32768.0f;
  }
  return 0;
}

// ---- preprocess: uint16 -> centered signed int16 + gain ----
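// Removes the DC offset (mean of the window) and applies automatic gain so the
// peak reaches roughly TARGET_PEAK (gain capped at MAX_GAIN), clipping to int16.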
static void preprocess_window_from_u16(const uint16_t *in_u16, int16_t *out_i16, size_t n,
                                       int target_peak, float max_gain) {
  uint32_t acc = 0;
  for (size_t i = 0; i < n; i++) acc += in_u16[i];
  float mean = (float)acc / (float)n;

  float peak = 0.0f;
  for (size_t i = 0; i < n; i++) {
    float v = (float)in_u16[i] - mean;
    float a = fabsf(v);
    if (a > peak) peak = a;
  }

  if (peak < 1.0f) {
    for (size_t i = 0; i < n; i++) out_i16[i] = 0;
    return;
  }

  float gain = (float)target_peak / peak;
  if (gain > max_gain) gain = max_gain;
  if (gain < 1.0f) gain = 1.0f;

  for (size_t i = 0; i < n; i++) {
    float v = ((float)in_u16[i] - mean) * gain;
    if (v > 32767.0f) v = 32767.0f;
    if (v < -32768.0f) v = -32768.0f;
    out_i16[i] = (int16_t)v;
  }
}

// ---- gate features: RMS + ZCR ----
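// RMS approximates overall loudness; ZCR (zero-crossing rate) is a rough texture
// cue. Windows that are too quiet or too low in ZCR are treated as "not hive".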
static float compute_rms(const int16_t *x, size_t n) {
  double acc = 0.0;
  for (size_t i = 0; i < n; i++) {
    double f = (double)x[i] / 32768.0;
    acc += f * f;
  }
  return (float)sqrt(acc / (double)n);
}

static float compute_zcr(const int16_t *x, size_t n) {
  if (n < 2) return 0.0f;
  int zc = 0;
  for (size_t i = 1; i < n; i++) {
    if ((x[i-1] >= 0 && x[i] < 0) || (x[i-1] < 0 && x[i] >= 0)) zc++;
  }
  return (float)zc / (float)n;
}

static bool hive_present_gate(const int16_t *x, size_t n, float &rms_out, float &zcr_out) {
  rms_out = compute_rms(x, n);
  zcr_out = compute_zcr(x, n);

  if (rms_out < RMS_MIN_GATE) return false;
  if (zcr_out < ZCR_MIN_GATE) return false;

  return true;
}

// ---- voting/debounce ----
static void push_vote(bool stress_hit) {
  stress_hist[hist_pos] = stress_hit;
  hist_pos = (hist_pos + 1) % VOTE_M;
}

static int count_stress_votes() {
  int c = 0;
  for (int i = 0; i < VOTE_M; i++) if (stress_hist[i]) c++;
  return c;
}

// =========================
// Wi-Fi + MQTT helpers
// =========================
static void connectWiFi() {
  WiFi.mode(WIFI_STA);
  WiFi.begin(WIFI_SSID, WIFI_PASS);

  Serial.print("Connecting WiFi");
  while (WiFi.status() != WL_CONNECTED) {
    delay(400);
    Serial.print(".");
  }
  Serial.println();

  Serial.print("WiFi OK, IP: ");
  Serial.println(WiFi.localIP());
}

static void connectMQTT() {
  mqtt.setServer(MQTT_HOST, MQTT_PORT);

  // Unique-ish client id from MAC
  uint64_t mac = ESP.getEfuseMac();
  mqttClientId = "BG-" + String((uint32_t)(mac >> 32), HEX) + String((uint32_t)mac, HEX);

  // Last Will (optional but useful): the connect loop below registers the LWT so
  // the broker marks this device "offline" if it drops, and publishes a retained
  // "online" status once connected.

  Serial.print("Connecting MQTT ");
  Serial.print(MQTT_HOST);
  Serial.print(":");
  Serial.println(MQTT_PORT);

  while (!mqtt.connected()) {
    if (mqtt.connect(mqttClientId.c_str(), nullptr, nullptr, TOPIC_STAT, 0, true, "offline")) {
      Serial.println("MQTT OK ");
      mqtt.publish(TOPIC_STAT, "online", true); // retained
    } else {
      Serial.print("MQTT fail, rc=");
      Serial.print(mqtt.state());
      Serial.println(" retry in 2s...");
      delay(2000);
    }
  }
}

static void mqttEnsureConnected() {
  if (WiFi.status() != WL_CONNECTED) connectWiFi();
  if (!mqtt.connected()) connectMQTT();
  mqtt.loop();
}

// Telemetry JSON (compact)
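// Example payload (field values are illustrative):
//   {"id":"BG-1a2b3c4d","r":0.0231,"z":0.0417,"hn":true,"pn":0.112,"ps":0.888,"v":2}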
static void mqttPublishTelemetry(float rms, float zcr, float p_normal, float p_stress, int votes, bool hive_ok) {
  char payload[256];
  snprintf(payload, sizeof(payload),
           "{\"id\":\"%s\",\"r\":%.4f,\"z\":%.4f,\"hn\":%s,\"pn\":%.3f,\"ps\":%.3f,\"v\":%d}",
           mqttClientId.c_str(),
           rms, zcr,
           hive_ok ? "true" : "false",
           p_normal, p_stress,
           votes);

  mqtt.publish(TOPIC_TEL, payload);
}

// Alert JSON
static void mqttPublishAlert(float rms, float zcr, float p_stress, int votes) {
  char payload[256];
  snprintf(payload, sizeof(payload),
           "{\"id\":\"%s\",\"alert\":true,\"r\":%.4f,\"z\":%.4f,\"ps\":%.3f,\"v\":%d}",
           mqttClientId.c_str(),
           rms, zcr, p_stress, votes);

  mqtt.publish(TOPIC_ALERT, payload);
}

// =========================

void setup() {
  Serial.begin(115200);
  delay(1000);

  Serial.println("BeeGuard ........");
  Serial.print("EI freq: "); Serial.println(EI_CLASSIFIER_FREQUENCY);
  Serial.print("EI samples: "); Serial.println(EI_WINDOW_SAMPLES);

  // Wi-Fi + MQTT
  connectWiFi();
  connectMQTT();

  i2s.setPinsPdmRx(PDM_CLK_PIN, PDM_DATA_PIN);
  if (!i2s.begin(I2S_MODE_PDM_RX, SAMPLE_RATE,
                 I2S_DATA_BIT_WIDTH_16BIT, I2S_SLOT_MODE_MONO)) {
    Serial.println("ERROR: Failed to init I2S PDM RX");
    while (1) delay(100);
  }

  Serial.println("Ready.");
}

void loop() {
  mqttEnsureConnected();

  size_t wav_size = 0;
  uint8_t *wav_buffer = i2s.recordWAV(REC_TIME_SEC, &wav_size);
  if (!wav_buffer || wav_size <= WAV_HEADER_BYTES) {
    if (wav_buffer) free(wav_buffer);
    return;
  }

  uint8_t *pcm_bytes = wav_buffer + WAV_HEADER_BYTES;
  size_t pcm_bytes_len = wav_size - WAV_HEADER_BYTES;

  size_t needed_bytes = EI_WINDOW_SAMPLES * sizeof(uint16_t);
  if (pcm_bytes_len < needed_bytes) {
    free(wav_buffer);
    return;
  }

  const uint16_t *u16_samples = (const uint16_t*)pcm_bytes;

  // preprocess -> signed waveform
  preprocess_window_from_u16(u16_samples, ei_buffer, EI_WINDOW_SAMPLES, TARGET_PEAK, MAX_GAIN);

  // gate
  float rms, zcr;
  bool hive_ok = hive_present_gate(ei_buffer, EI_WINDOW_SAMPLES, rms, zcr);

  Serial.print("gate rms="); Serial.print(rms, 4);
  Serial.print(" zcr="); Serial.print(zcr, 4);
  Serial.print(" -> "); Serial.println(hive_ok ? "HIVE" : "NOT_HIVE");

  if (!hive_ok) {
    push_vote(false);

    // publish telemetry even if not hive (pn/ps = 0)
    int votes = count_stress_votes();
    mqttPublishTelemetry(rms, zcr, 0.0f, 0.0f, votes, hive_ok);

    free(wav_buffer);
    delay(10);
    return;
  }

  // inference
  signal_t signal;
  signal.total_length = EI_WINDOW_SAMPLES;
  signal.get_data = &ei_get_data;

  ei_impulse_result_t result = {0};
  EI_IMPULSE_ERROR err = run_classifier(&signal, &result, false);
  if (err != EI_IMPULSE_OK) {
    Serial.print("EI error: "); Serial.println(err);
    free(wav_buffer);
    return;
  }

  float p_normal = result.classification[0].value;
  float p_stress = result.classification[1].value;

  Serial.print("normal="); Serial.print(p_normal, 3);
  Serial.print(" stress="); Serial.println(p_stress, 3);

  bool stress_hit = (p_stress >= STRESS_THRESH);
  push_vote(stress_hit);

  int votes = count_stress_votes();
  Serial.print("votes(stress) "); Serial.print(votes);
  Serial.print("/"); Serial.print(VOTE_M);

  // publish telemetry each window
  mqttPublishTelemetry(rms, zcr, p_normal, p_stress, votes, hive_ok);

  uint32_t now = millis();
  bool cooldown_ok = (now - last_alert_ms) > ALERT_COOLDOWN_MS;

  if (votes >= VOTE_N && cooldown_ok) {
    Serial.println("  ==> ALERT: STRESS CONFIRMED ");
    last_alert_ms = now;

    // Step 4: publish alert
    mqttPublishAlert(rms, zcr, p_stress, votes);
  } else {
    if (!cooldown_ok) Serial.println("  (cooldown)");
    else Serial.println();
  }

  free(wav_buffer);
  delay(10);
}
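
The sketch publishes everything to three topics on the public HiveMQ broker: bg/tel (per-window telemetry), bg/alert (confirmed stress alerts) and bg/stat (retained online/offline status). Any MQTT client can watch them from anywhere; the short script below is not one of the project files, just a minimal Python sketch (assuming the paho-mqtt 1.x client API) that prints every message as it arrives.

# beeguard_watch.py -- minimal subscriber for the BeeGuard topics (illustrative only)
import paho.mqtt.client as mqtt

BROKER = "broker.hivemq.com"   # same public broker used by BeeGuard.ino
TOPICS = [("bg/tel", 0), ("bg/alert", 0), ("bg/stat", 0)]

def on_connect(client, userdata, flags, rc):
    # Subscribe (and re-subscribe after reconnects) once connected
    print("connected, rc =", rc)
    client.subscribe(TOPICS)

def on_message(client, userdata, msg):
    # Print every telemetry, alert and status message as it arrives
    print(f"{msg.topic}: {msg.payload.decode(errors='replace')}")

client = mqtt.Client()         # paho-mqtt 1.x style constructor
client.on_connect = on_connect
client.on_message = on_message
client.connect(BROKER, 1883, keepalive=60)
client.loop_forever()

Install the client with pip install paho-mqtt and run the script on any internet-connected machine to see the hive's telemetry and alerts in the terminal.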

make_ei_subset_2h.py

Python
import random
import shutil
from pathlib import Path

SRC = Path("processed_2class")
DST = Path("processed_2class_subset_2h")
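# SRC is the folder written by prepare_beeguard_dataset.py; DST receives a balanced
# random subset (CLIPS_PER_CLASS two-second clips per class) intended for upload
# to Edge Impulse Studio.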
CLASSES = ["normal", "stress"]

# 2s clips -> 3600 clips = 2 hours
CLIPS_PER_CLASS = 3600
SEED = 42

random.seed(SEED)
DST.mkdir(parents=True, exist_ok=True)

for c in CLASSES:
    src_dir = SRC / c
    dst_dir = DST / c
    dst_dir.mkdir(parents=True, exist_ok=True)

    files = list(src_dir.glob("*.wav"))
    random.shuffle(files)
    chosen = files[:CLIPS_PER_CLASS]

    if len(chosen) < CLIPS_PER_CLASS:
        print(f"[WARN] {c}: only {len(chosen)} clips available")

    for f in chosen:
        shutil.copy2(f, dst_dir / f.name)

    print(f"{c}: copied {len(chosen)} clips ({len(chosen)*2/3600:.2f} hours)")

print("\n 2-hour subset ready at:", DST.resolve())

prepare_beeguard_dataset.py

Python
import csv
import argparse
from pathlib import Path
import numpy as np
import librosa
import soundfile as sf
from tqdm import tqdm
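
# Expects <root>/raw/ to contain the source recordings (.wav/.mp3/...) together
# with matching .lab annotation files. Fixed-length windows (--win seconds,
# default 2.0) cut from segments labeled "bee" are written to
# <root>/processed_2class/{normal,stress}/ and a source map CSV to <root>/metadata/.
# Example invocation (path is a placeholder):
#   python prepare_beeguard_dataset.py --root /path/to/beeguard_data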

AUDIO_EXTS = {".wav", ".mp3", ".flac", ".ogg", ".m4a", ".aac"}

def normalize_spaces(s: str) -> str:
    return " ".join(s.split())

def safe_name(s: str) -> str:
    return "".join(c if (c.isalnum() or c in "._-") else "_" for c in s)

def detect_state_from_name(name: str):
    n = normalize_spaces(name.lower())
    if "missing queen" in n or "no_queenbee" in n or "no queenbee" in n or "no queen" in n:
        return "stress"
    if "active" in n or "queenbee" in n:
        return "normal"
    return None

def parse_lab(lab_path: Path):
    lines = lab_path.read_text(encoding="utf-8", errors="ignore").splitlines()
    segs = []
    for ln in lines[1:]:
        ln = ln.strip()
        if not ln:
            continue
        parts = ln.split()
        if len(parts) < 3:
            continue
        try:
            start = float(parts[0]); end = float(parts[1]); tag = parts[2].lower()
        except ValueError:
            continue
        if end > start and tag == "bee":   # <- only keep bee
            segs.append((start, end))
    return segs

def load_audio(path: Path, sr: int):
    y, _ = librosa.load(path.as_posix(), sr=sr, mono=True)
    peak = np.max(np.abs(y)) + 1e-9
    return (y / peak).astype(np.float32)

def window_audio(y: np.ndarray, sr: int, win_s: float, hop_s: float):
    win = int(sr * win_s); hop = int(sr * hop_s)
    if len(y) < win:
        return []
    return [y[i:i+win] for i in range(0, len(y)-win+1, hop)]

def write_wav(out_path: Path, audio: np.ndarray, sr: int):
    out_path.parent.mkdir(parents=True, exist_ok=True)
    sf.write(out_path.as_posix(), audio, sr, subtype="PCM_16")

def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--root", type=str, required=True)
    ap.add_argument("--sr", type=int, default=16000)
    ap.add_argument("--win", type=float, default=2.0)
    ap.add_argument("--hop", type=float, default=0.5)
    ap.add_argument("--max_hours_per_class", type=float, default=15.0)
    ap.add_argument("--seed", type=int, default=42)
    args = ap.parse_args()

    rng = np.random.default_rng(args.seed)

    root = Path(args.root)
    raw_root = root / "raw"
    out_base = root / "processed_2class"
    meta_dir = root / "metadata"
    meta_dir.mkdir(parents=True, exist_ok=True)

    labels = ["normal", "stress"]
    for l in labels:
        (out_base / l).mkdir(parents=True, exist_ok=True)

    caps = {l: args.max_hours_per_class * 3600.0 for l in labels}
    written = {l: 0.0 for l in labels}

    # index audio by normalized stem
    audio_files = [p for p in raw_root.rglob("*") if p.is_file() and p.suffix.lower() in AUDIO_EXTS]
    audio_index = {}
    for af in audio_files:
        key = normalize_spaces(af.stem)
        prev = audio_index.get(key)
        if prev is None or (prev.suffix.lower() != ".wav" and af.suffix.lower() == ".wav"):
            audio_index[key] = af

    lab_files = [p for p in raw_root.rglob("*.lab")]
    rng.shuffle(lab_files)

    def find_audio_for_lab(lab_path: Path):
        k = normalize_spaces(lab_path.stem)
        if k in audio_index:
            return audio_index[k]
        for kk, vv in audio_index.items():
            if kk.startswith(k) or k.startswith(kk):
                return vv
        return None

    rows = []
    skipped = []

    for lab in tqdm(lab_files, desc="TBON(2-class)"):
        audio_path = find_audio_for_lab(lab)
        if audio_path is None:
            skipped.append(f"No audio match for {lab.name}")
            continue

        state = detect_state_from_name(audio_path.name)
        if state not in labels:
            continue

        if written[state] >= caps[state]:
            continue

        try:
            y = load_audio(audio_path, args.sr)
        except Exception as e:
            skipped.append(f"Load fail {audio_path.name}: {e}")
            continue

        segs = parse_lab(lab)
        if not segs:
            continue

        base_id = safe_name(normalize_spaces(audio_path.stem))
        for si, (start_s, end_s) in enumerate(segs):
            if written[state] >= caps[state]:
                break
            start = int(start_s * args.sr); end = int(end_s * args.sr)
            if end - start < int(args.sr * args.win):
                continue
            frames = window_audio(y[start:end], args.sr, args.win, args.hop)
            for fi, fr in enumerate(frames):
                if written[state] >= caps[state]:
                    break
                out_name = f"tbon_{base_id}_seg{si:03d}_f{fi:05d}_{state}.wav"
                write_wav(out_base / state / out_name, fr, args.sr)
                written[state] += args.win
                rows.append([out_name, "TBON", state, state])

    # write metadata
    with open((meta_dir / "source_map_2class.csv").as_posix(), "w", newline="", encoding="utf-8") as fp:
        w = csv.writer(fp)
        w.writerow(["filename","source","original_label","beeguard_label"])
        w.writerows(rows)
    (meta_dir / "skipped_files_2class.txt").write_text("\n".join(skipped), encoding="utf-8")

    print("\n Done (2-class). Hours written:")
    for l in labels:
        print(f"  {l:6s}: {written[l]/3600.0:.2f} h")
    print(f"Output: {out_base}")

if __name__ == "__main__":
    main()

Credits

Mohamed Ali Bedair
10 projects • 6 followers
Engineer/Maker
