- Edge AI beehive stress detection using audio signals
- ESP32-S3 AI Camera with PDM microphone input
- On-device inference using Edge Impulse (no cloud ML)
- Signal gating (RMS + ZCR) to ignore silence and irrelevant noise
- Robust decision logic using M-of-N voting and cooldown
- Global monitoring via MQTT (HiveMQ public broker)
- Low latency, low bandwidth, suitable for remote deployments
The BeeGuard system is trained using the To Bee or not to Bee Dataset, an open audio dataset containing real beehive recordings collected under different hive conditions.
This dataset includes:
- Long-duration beehive audio recordings
- Natural background noise and environmental variations
- Sound patterns corresponding to normal and stressed hive activity
To transform the raw recordings into a dataset suitable for Edge AI training and deployment, two custom Python scripts were developed.
Full Dataset Preparation (prepare_beeguard_dataset.py)
This script converts the raw beehive recordings into an Edge Impulse–ready audio dataset.
Main tasks performed by the script:
- Standardizing all audio files to 16 kHz mono, matching the firmware recording configuration
- Segmenting long recordings into 2-second audio windows, aligned with the real-time inference window used on the ESP32-S3
- Automatically organizing audio segments into labeled class folders: normal or stress
- Generating a clean, reproducible directory structure that can be directly uploaded to Edge Impulse
import csv
import argparse
from pathlib import Path
import numpy as np
import librosa
import soundfile as sf
from tqdm import tqdm
AUDIO_EXTS = {".wav", ".mp3", ".flac", ".ogg", ".m4a", ".aac"}
def normalize_spaces(s: str) -> str:
    return " ".join(s.split())

def safe_name(s: str) -> str:
    return "".join(c if (c.isalnum() or c in "._-") else "_" for c in s)

def detect_state_from_name(name: str):
    n = normalize_spaces(name.lower())
    if "missing queen" in n or "no_queenbee" in n or "no queenbee" in n or "no queen" in n:
        return "stress"
    if "active" in n or "queenbee" in n:
        return "normal"
    return None

def parse_lab(lab_path: Path):
    lines = lab_path.read_text(encoding="utf-8", errors="ignore").splitlines()
    segs = []
    for ln in lines[1:]:
        ln = ln.strip()
        if not ln:
            continue
        parts = ln.split()
        if len(parts) < 3:
            continue
        try:
            start = float(parts[0]); end = float(parts[1]); tag = parts[2].lower()
        except ValueError:
            continue
        if end > start and tag == "bee":  # <- only keep bee
            segs.append((start, end))
    return segs

def load_audio(path: Path, sr: int):
    y, _ = librosa.load(path.as_posix(), sr=sr, mono=True)
    peak = np.max(np.abs(y)) + 1e-9
    return (y / peak).astype(np.float32)

def window_audio(y: np.ndarray, sr: int, win_s: float, hop_s: float):
    win = int(sr * win_s); hop = int(sr * hop_s)
    if len(y) < win:
        return []
    return [y[i:i+win] for i in range(0, len(y)-win+1, hop)]

def write_wav(out_path: Path, audio: np.ndarray, sr: int):
    out_path.parent.mkdir(parents=True, exist_ok=True)
    sf.write(out_path.as_posix(), audio, sr, subtype="PCM_16")
def main():
    ap = argparse.ArgumentParser()
    ap.add_argument("--root", type=str, required=True)
    ap.add_argument("--sr", type=int, default=16000)
    ap.add_argument("--win", type=float, default=2.0)
    ap.add_argument("--hop", type=float, default=0.5)
    ap.add_argument("--max_hours_per_class", type=float, default=15.0)
    ap.add_argument("--seed", type=int, default=42)
    args = ap.parse_args()

    rng = np.random.default_rng(args.seed)
    root = Path(args.root)
    raw_root = root / "raw"
    out_base = root / "processed_2class"
    meta_dir = root / "metadata"
    meta_dir.mkdir(parents=True, exist_ok=True)

    labels = ["normal", "stress"]
    for l in labels:
        (out_base / l).mkdir(parents=True, exist_ok=True)
    caps = {l: args.max_hours_per_class * 3600.0 for l in labels}
    written = {l: 0.0 for l in labels}

    # index audio by normalized stem
    audio_files = [p for p in raw_root.rglob("*") if p.is_file() and p.suffix.lower() in AUDIO_EXTS]
    audio_index = {}
    for af in audio_files:
        key = normalize_spaces(af.stem)
        prev = audio_index.get(key)
        if prev is None or (prev.suffix.lower() != ".wav" and af.suffix.lower() == ".wav"):
            audio_index[key] = af

    lab_files = [p for p in raw_root.rglob("*.lab")]
    rng.shuffle(lab_files)

    def find_audio_for_lab(lab_path: Path):
        k = normalize_spaces(lab_path.stem)
        if k in audio_index:
            return audio_index[k]
        for kk, vv in audio_index.items():
            if kk.startswith(k) or k.startswith(kk):
                return vv
        return None

    rows = []
    skipped = []
    for lab in tqdm(lab_files, desc="TBON(2-class)"):
        audio_path = find_audio_for_lab(lab)
        if audio_path is None:
            skipped.append(f"No audio match for {lab.name}")
            continue
        state = detect_state_from_name(audio_path.name)
        if state not in labels:
            continue
        if written[state] >= caps[state]:
            continue
        try:
            y = load_audio(audio_path, args.sr)
        except Exception as e:
            skipped.append(f"Load fail {audio_path.name}: {e}")
            continue
        segs = parse_lab(lab)
        if not segs:
            continue
        base_id = safe_name(normalize_spaces(audio_path.stem))
        for si, (start_s, end_s) in enumerate(segs):
            if written[state] >= caps[state]:
                break
            start = int(start_s * args.sr); end = int(end_s * args.sr)
            if end - start < int(args.sr * args.win):
                continue
            frames = window_audio(y[start:end], args.sr, args.win, args.hop)
            for fi, fr in enumerate(frames):
                if written[state] >= caps[state]:
                    break
                out_name = f"tbon_{base_id}_seg{si:03d}_f{fi:05d}_{state}.wav"
                write_wav(out_base / state / out_name, fr, args.sr)
                written[state] += args.win
                rows.append([out_name, "TBON", state, state])

    # write metadata
    with open((meta_dir / "source_map_2class.csv").as_posix(), "w", newline="", encoding="utf-8") as fp:
        w = csv.writer(fp)
        w.writerow(["filename","source","original_label","beeguard_label"])
        w.writerows(rows)
    (meta_dir / "skipped_files_2class.txt").write_text("\n".join(skipped), encoding="utf-8")

    print("\n✅ Done (2-class). Hours written:")
    for l in labels:
        print(f" {l:6s}: {written[l]/3600.0:.2f} h")
    print(f"Output: {out_base}")

if __name__ == "__main__":
    main()

By automating segmentation and labeling, this script ensures that the training data closely matches the on-device audio pipeline and reduces the risk of human labeling errors.
Fast Iteration Subset Creation (make_ei_subset_2h.py)
Training and experimenting with large audio datasets can be time-consuming. To accelerate development, a second script was used to generate a lightweight training subset. This script:
- Takes the fully prepared dataset generated by prepare_beeguard_dataset.py
- Selects a time-limited subset (~2 hours of audio per class) from the full dataset
- Maintains a balanced distribution between normal and stress classes
- Outputs a compact dataset optimized for rapid iteration inside Edge Impulse
import random
import shutil
from pathlib import Path
SRC = Path("processed_2class")
DST = Path("processed_2class_subset_2h")
CLASSES = ["normal", "stress"]
# 2s clips -> 3600 clips = 2 hours
CLIPS_PER_CLASS = 3600
SEED = 42
random.seed(SEED)
DST.mkdir(parents=True, exist_ok=True)
for c in CLASSES:
    src_dir = SRC / c
    dst_dir = DST / c
    dst_dir.mkdir(parents=True, exist_ok=True)

    files = list(src_dir.glob("*.wav"))
    random.shuffle(files)
    chosen = files[:CLIPS_PER_CLASS]
    if len(chosen) < CLIPS_PER_CLASS:
        print(f"[WARN] {c}: only {len(chosen)} clips available")

    for f in chosen:
        shutil.copy2(f, dst_dir / f.name)

    print(f"{c}: copied {len(chosen)} clips ({len(chosen)*2/3600:.2f} hours)")

print("\n✅ 2-hour subset ready at:", DST.resolve())

This approach allows fast experimentation with preprocessing parameters and model architectures before performing final training on the complete dataset.
Edge Impulse: Data Preprocessing
After uploading the dataset to Edge Impulse, MFCCs (Mel Frequency Cepstral Coefficients) were used to extract meaningful features from the raw beehive audio signals.
MFCCs are well suited for this task as they capture spectral characteristics of hive sounds while remaining computationally efficient for embedded devices.
MFCC Configuration
The preprocessing pipeline was configured as follows:
- Coefficients: 13
- Frame length / stride: 30 ms / 15 ms
- Filter banks: 32
- FFT length: 256
- Frequency range: 100 Hz – 8 kHz
- Pre-emphasis: 0.97
These parameters were chosen to balance noise robustness, feature resolution, and on-device performance.
On-Device Performance
Edge Impulse estimates show that the preprocessing stage requires:
- ~968 ms DSP time
- ~29 KB peak RAM
This confirms that the entire preprocessing pipeline can run efficiently on the ESP32-S3, enabling fully on-device inference without cloud dependency.
Edge Impulse: Model Training
The neural network classifier was trained using Edge Impulse and optimized for deployment on the ESP32-S3. Training was performed for 50 cycles with a learning rate of 0.005, using int8 quantization to reduce the memory footprint and improve inference efficiency on embedded hardware. The input to the model consists of 1,716 MFCC features (132 frames × 13 coefficients), which are processed through two 1D convolution and pooling layers with dropout to reduce overfitting, followed by an output layer with two classes: normal and stress. The model architecture was intentionally kept compact to balance classification performance and resource constraints.
On the validation dataset, the trained model achieved an accuracy of 84.3% with a loss of 0.39 and a weighted F1-score of 0.84. When compiled using Edge Impulse’s EON compiler, the model demonstrated strong on-device performance with an inference time of approximately 9 ms, a peak RAM usage of around 14.6 KB, and a flash usage of about 46.5 KB. These results confirm that the model is suitable for real-time, on-device beehive stress detection on the ESP32-S3.
After training and validation, the Edge Impulse model was deployed as a C++ inference library and integrated into an Arduino-based firmware running on the ESP32-S3 AI Camera. Audio data is captured in real time from the PDM microphone using the I2S interface at 16 kHz and processed in 2-second windows, matching the configuration used during dataset preparation and model training.
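The exact capture code depends on the board package and microphone wiring; the sketch below is only a minimal illustration of reading 2-second, 16 kHz windows from a PDM microphone using the legacy ESP-IDF I2S driver bundled with Arduino-ESP32 2.x. The pin numbers PDM_CLK_PIN and PDM_DATA_PIN are placeholders, not values taken from the BeeGuard firmware.

#include <Arduino.h>
#include "driver/i2s.h"  // legacy ESP-IDF I2S driver

// Placeholder pins -- check the ESP32-S3 AI Camera schematic for the real ones
#define PDM_CLK_PIN   42
#define PDM_DATA_PIN  41

static const int SAMPLE_RATE    = 16000;
static const int WINDOW_SAMPLES = SAMPLE_RATE * 2;  // 2-second inference window
static int16_t audio_window[WINDOW_SAMPLES];

static void pdm_mic_init() {
  i2s_config_t cfg = {};
  cfg.mode = (i2s_mode_t)(I2S_MODE_MASTER | I2S_MODE_RX | I2S_MODE_PDM);
  cfg.sample_rate = SAMPLE_RATE;
  cfg.bits_per_sample = I2S_BITS_PER_SAMPLE_16BIT;
  cfg.channel_format = I2S_CHANNEL_FMT_ONLY_LEFT;
  cfg.communication_format = I2S_COMM_FORMAT_STAND_I2S;
  cfg.dma_buf_count = 8;
  cfg.dma_buf_len = 512;
  i2s_driver_install(I2S_NUM_0, &cfg, 0, NULL);

  i2s_pin_config_t pins = {};
  pins.mck_io_num = I2S_PIN_NO_CHANGE;
  pins.bck_io_num = I2S_PIN_NO_CHANGE;
  pins.ws_io_num = PDM_CLK_PIN;      // PDM clock line
  pins.data_out_num = I2S_PIN_NO_CHANGE;
  pins.data_in_num = PDM_DATA_PIN;   // PDM data line
  i2s_set_pin(I2S_NUM_0, &pins);
}

// Blocks until one full 2-second window has been captured
static void capture_window() {
  size_t total = 0;
  while (total < sizeof(audio_window)) {
    size_t bytes_read = 0;
    i2s_read(I2S_NUM_0, (uint8_t *)audio_window + total,
             sizeof(audio_window) - total, &bytes_read, portMAX_DELAY);
    total += bytes_read;
  }
}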
Before inference, each audio window undergoes lightweight on-device preprocessing, including DC offset removal, gain normalization, and signal gating based on RMS and zero-crossing rate (ZCR). This gating step prevents unnecessary inference on silence or irrelevant noise and improves overall system robustness. Once a valid hive sound is detected, the processed audio is passed to the Edge Impulse classifier to estimate the probability of stress.
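The firmware's exact preprocessing constants are not listed here, but the idea can be sketched as follows: subtract the window's mean (DC offset) and then scale the peak toward a fixed target amplitude. The target level, gain cap, and silence threshold below are illustrative values, not the ones used in BeeGuard.

// Sketch of the on-device preprocessing step (illustrative constants)
static void preprocess_window(int16_t *x, size_t n) {
  if (n == 0) return;

  // 1) DC offset removal: subtract the mean of the window
  int64_t sum = 0;
  for (size_t i = 0; i < n; i++) sum += x[i];
  int32_t mean = (int32_t)(sum / (int64_t)n);
  for (size_t i = 0; i < n; i++) {
    int32_t v = (int32_t)x[i] - mean;
    if (v > 32767) v = 32767;
    if (v < -32768) v = -32768;
    x[i] = (int16_t)v;
  }

  // 2) Gain normalization: bring the peak toward a target level
  int32_t peak = 1;
  for (size_t i = 0; i < n; i++) {
    int32_t a = x[i] >= 0 ? x[i] : -(int32_t)x[i];
    if (a > peak) peak = a;
  }
  if (peak < 200) return;                // near-silence: leave it for the RMS gate

  float gain = 26000.0f / (float)peak;   // target peak with headroom below int16 max
  if (gain > 8.0f) gain = 8.0f;          // cap amplification of very faint signals
  for (size_t i = 0; i < n; i++) {
    int32_t v = (int32_t)((float)x[i] * gain);
    if (v > 32767) v = 32767;
    if (v < -32768) v = -32768;
    x[i] = (int16_t)v;
  }
}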
To further reduce false positives, the firmware applies an M-of-N voting mechanism, requiring multiple consecutive stress detections before confirming an alert, along with a cooldown period to avoid alert flooding. When a stress condition is confirmed, the device publishes telemetry and alerts using MQTT to a public HiveMQ broker, enabling remote monitoring from anywhere. All inference runs fully on-device, with MQTT used only for lightweight data transmission, making the system suitable for low-power and remote beehive deployments.
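Putting the pieces together, the per-window decision flow can be summarized as in the sketch below. It reuses the helper functions and constants shown in the following sections (hive_present_gate, push_vote, count_stress_votes, mqttPublishTelemetry, mqttPublishAlert, ALERT_COOLDOWN_MS); run_beeguard_classifier, STRESS_THR, and VOTE_N are illustrative names standing in for the Edge Impulse inference call and the firmware's actual thresholds.

// Illustrative per-window decision flow (not the verbatim firmware loop)
static uint32_t last_alert_ms = 0;

static void process_window(const int16_t *audio, size_t n) {
  float rms = 0.0f, zcr = 0.0f;
  float p_normal = 0.0f, p_stress = 0.0f;

  // 1) Gate: skip inference on silence or irrelevant noise
  bool hive_ok = hive_present_gate(audio, n, rms, zcr);

  if (hive_ok) {
    // 2) Edge Impulse inference fills the class probabilities
    run_beeguard_classifier(audio, n, p_normal, p_stress);  // hypothetical wrapper
    push_vote(p_stress >= STRESS_THR);                      // e.g. STRESS_THR = 0.70f
  } else {
    push_vote(false);
  }

  // 3) Telemetry is published for every window
  int votes = count_stress_votes();
  mqttPublishTelemetry(rms, zcr, p_normal, p_stress, votes, hive_ok);

  // 4) Alert only after M-of-N confirmation and once the cooldown has expired
  uint32_t now = millis();
  if (votes >= VOTE_N && (now - last_alert_ms) >= ALERT_COOLDOWN_MS) {  // e.g. VOTE_N = 3
    mqttPublishAlert(rms, zcr, p_stress, votes);
    last_alert_ms = now;
  }
}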
MQTT (HiveMQ) configuration + topics
This part configures the device to publish telemetry and alerts to the HiveMQ public broker, using short MQTT topics to keep the payload lightweight.
// MQTT / HiveMQ (Step 4)
static const char* MQTT_HOST = "broker.hivemq.com";
static const uint16_t MQTT_PORT = 1883;
// Short topics
static const char* TOPIC_TEL = "bg/tel";
static const char* TOPIC_ALERT = "bg/alert";
static const char* TOPIC_STAT = "bg/stat";
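For completeness, connecting to the broker with these constants can look like the standalone sketch below, assuming the common PubSubClient Arduino library (which matches the mqtt.publish() calls used later). The Wi-Fi credentials are placeholders, and the actual BeeGuard firmware may structure its connection handling differently.

#include <WiFi.h>
#include <PubSubClient.h>

// Placeholders -- replace with real network credentials
static const char* WIFI_SSID = "your-ssid";
static const char* WIFI_PASS = "your-password";

WiFiClient espClient;
PubSubClient mqtt(espClient);
String mqttClientId;

static void wifiConnect() {
  WiFi.begin(WIFI_SSID, WIFI_PASS);
  while (WiFi.status() != WL_CONNECTED) delay(250);
}

static void mqttEnsureConnected() {
  if (mqtt.connected()) return;
  mqtt.setServer(MQTT_HOST, MQTT_PORT);
  while (!mqtt.connected()) {
    if (mqtt.connect(mqttClientId.c_str())) {
      mqtt.publish(TOPIC_STAT, "online");   // announce the device on bg/stat
    } else {
      delay(2000);                          // simple retry back-off
    }
  }
}

void setup() {
  // Device-specific client ID avoids collisions on the public broker
  mqttClientId = "beeguard-" + String((uint32_t)ESP.getEfuseMac(), HEX);
  wifiConnect();
  mqttEnsureConnected();
}

void loop() {
  mqttEnsureConnected();
  mqtt.loop();   // keep the MQTT session alive between publishes
}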
Feature extraction: RMS + ZCR (signal gating)
Before running the ML model, the firmware computes two simple audio features: RMS (signal energy) and ZCR (zero-crossing rate). These are used to gate inference and avoid processing silence or irrelevant noise.
static float compute_rms(const int16_t *x, size_t n) {
  double acc = 0.0;
  for (size_t i = 0; i < n; i++) {
    double f = (double)x[i] / 32768.0;
    acc += f * f;
  }
  return (float)sqrt(acc / (double)n);
}

static float compute_zcr(const int16_t *x, size_t n) {
  if (n < 2) return 0.0f;
  int zc = 0;
  for (size_t i = 1; i < n; i++) {
    if ((x[i-1] >= 0 && x[i] < 0) || (x[i-1] < 0 && x[i] >= 0)) zc++;
  }
  return (float)zc / (float)n;
}

static bool hive_present_gate(const int16_t *x, size_t n, float &rms_out, float &zcr_out) {
  rms_out = compute_rms(x, n);
  zcr_out = compute_zcr(x, n);
  if (rms_out < RMS_MIN_GATE) return false;
  if (zcr_out < ZCR_MIN_GATE) return false;
  return true;
}

Voting mechanism (M-of-N) + debounce concept
Raw classifier output can fluctuate from window to window, so BeeGuard implements an M-of-N voting mechanism to confirm stress only after repeated detections.
static void push_vote(bool stress_hit) {
  stress_hist[hist_pos] = stress_hit;
  hist_pos = (hist_pos + 1) % VOTE_M;
}

static int count_stress_votes() {
  int c = 0;
  for (int i = 0; i < VOTE_M; i++) if (stress_hist[i]) c++;
  return c;
}

Cooldown to avoid alert flooding
Even after stress is confirmed, the firmware enforces a cooldown period so alerts don’t spam the broker if the hive stays stressed for a while.
static const uint32_t ALERT_COOLDOWN_MS = 60000; // 60 s cooldown

MQTT publishing: telemetry + alert
The firmware publishes:
- continuous telemetry (bg/tel) including RMS/ZCR + probabilities + votes
- confirmed alerts (bg/alert) when stress is detected and voting + cooldown allow it
static void mqttPublishTelemetry(float rms, float zcr, float p_normal, float p_stress, int votes, bool hive_ok) {
  char payload[256];
  snprintf(payload, sizeof(payload),
           "{\"id\":\"%s\",\"r\":%.4f,\"z\":%.4f,\"hn\":%s,\"pn\":%.3f,\"ps\":%.3f,\"v\":%d}",
           mqttClientId.c_str(),
           rms, zcr,
           hive_ok ? "true" : "false",
           p_normal, p_stress,
           votes);
  mqtt.publish(TOPIC_TEL, payload);
}

static void mqttPublishAlert(float rms, float zcr, float p_stress, int votes) {
  char payload[256];
  snprintf(payload, sizeof(payload),
           "{\"id\":\"%s\",\"alert\":true,\"r\":%.4f,\"z\":%.4f,\"ps\":%.3f,\"v\":%d}",
           mqttClientId.c_str(),
           rms, zcr, p_stress, votes);
  mqtt.publish(TOPIC_ALERT, payload);
}

Results & Validation
During testing, BeeGuard reliably detected stress-related hive sounds while ignoring silence and unrelated noise. The RMS/ZCR gating significantly reduced unnecessary inference, and stress alerts were only triggered after repeated confirmations. MQTT telemetry and alerts were successfully received in real time using MQTT Explorer.
All audio processing and inference are performed locally on the ESP32-S3, eliminating the need for continuous cloud streaming. MQTT is used only to transmit compact telemetry and alerts, making the system suitable for low-power, remote, and potentially solar-powered beehive deployments.
Limitations & Lessons Learned
The current system relies on publicly available beehive audio data and would benefit from additional recordings captured in real hives. Environmental factors such as wind and rain can also affect audio quality and should be addressed in future iterations.
Future Work
- Encrypted MQTT (TLS)
- Multi-class events (queen loss, intrusion, swarming)
- Solar + battery power
- Dashboard (Node-RED / Grafana)
- Model retraining with real hive data