Saving Lives, One Sensor at a Time
Every year, countless animals are injured or killed on roadways, and many drivers are put at risk in the process. As traffic volumes increase and climate conditions grow more unpredictable, traditional road safety systems are struggling to keep up. My proposal is a decentralized road monitoring system that tracks animal presence, road conditions, and air visibility, and adjusts traffic rules in real time to make roads near natural wildlife habitats safer for both animals and drivers.
The dynamic nature of my proposal also helps address a common challenge: overly conservative speed limits that drivers often ignore. Instead of relying on static signs that may not reflect actual risk, decentralized monitoring allows speed limits and alerts to change only when necessary, making them more credible, more effective, and more likely to be followed.
At the heart of this project lies a simple but powerful goal: safer roads for both humans and animals.
General Concept
The project can be split into two parts: a main processing unit / road sign display and an array of remote trail cameras for wildlife detection:
In the image above you can see four low-power trail cameras watching over the area around the road. These cameras perform simple binary classification on each frame to detect whether an animal is present or not.
Upon detection, the frame is sent to the main processing unit (the sign itself), where a second, more robust classification is performed to determine the type of animal. The result of this classification is combined with other road condition data to adjust speed limits and warnings accordingly.
The Display
The display consists of two 64x64 pixel LED panels, similar to those used in regular electronic road signs. To drive them I've used a Raspberry Pi Pico W. Each panel is meant to display one road sign.
The Pico has all the sign bitmaps preloaded in flash as C arrays exported from GIMP, so the main processing unit only sends information about whether a sign is on, which sign to display, and, in the case of the speed limit sign, the speed limit value:
struct SignAddrs
{
    uint16_t on;
    uint16_t type;
    uint16_t speed;
};
This data is sent over serial using the MODBUS protocol for reliability.
The display code was tested using Python and PyModbus in a simple loop:
# Example MODBUS write to sign's MCU:
def sign_warning_animals(self):
    self.client.write_register(self.signs[self.SIGN_IDX_WARNING]["type"],
                               self.SIGN_ANIMALS, device_id=self.ID)
    self.client.write_coil(self.signs[self.SIGN_IDX_WARNING]["on"], 1, device_id=self.ID)

# Simple loop over available signs
driver = SignDriver("COM9", 1)
while True:
    driver.sign_warning_off()
    driver.sign_speed_stop_off()
    time.sleep(1)
    driver.sign_warning_animals()
    speeds = [driver.SPEED_30, driver.SPEED_50, driver.SPEED_70, driver.SPEED_90]
    for i in range(4):
        driver.sign_speed(speeds[i])
        time.sleep(1)
    driver.sign_warning_wild_animals()
    driver.sign_stop()
    time.sleep(1)

First Stage Animal Detection
The first stage animal detection is performed in the field, on a low-power ESP32-CAM module:
Its job is simply to detect the presence of an animal and alert the main processing unit. This task isn't complicated, so there's no need for an oversized algorithm. To detect animals I've created a simple neural network in Edge Impulse. The network has only two classes, animal and no animal, and its footprint is small enough to easily fit into the ESP32-CAM's flash. It takes a grayscale 320x240 image as input:
On real hardware, upon detection, the ESP32-CAM notifies the main processing unit using MQTT and sends the whole frame over it for further processing. The main loop for this device is pretty straightforward:
void loop()
{
    // Keep the MQTT connection alive
    mqttClient->loop();

    camera_fb_t *fb = esp_camera_fb_get();
    if (nullptr == fb)
    {
        Serial.println("Failed to get frame");
        return;
    }

    // Edge Impulse data callback: pack each grayscale pixel into an RGB-style value
    auto dataProcessor = [fb](size_t offset, size_t length, float *out_ptr) -> int
    {
        uint8_t *start = fb->buf + offset;
        for (size_t i = 0; i < length; i++)
        {
            int filled = (start[i] << 16) | (start[i] << 8) | (start[i]);
            out_ptr[i] = filled;
        }
        return 0;
    };

    ClassificationResult res = runClassifier(dataProcessor);
    if (res == ClassificationResult::ANIMAL)
    {
        // Send the full frame to the main processing unit over MQTT
        if (!publishImg(fb))
        {
            enterRecoveryMQTT(MQTT_TOPIC_STDBY);
        }
        else
        {
            delay(DELAY_AFTER_PUBLISH_MS);
        }
    }
    esp_camera_fb_return(fb);
}
Sample received stream can be seen below:
Also, the device can be put to deep sleep through MQTT:
void enterSleep(char *topic, uint8_t *msg, unsigned int length)
{
Serial.println("Entering sleep");
esp_sleep_enable_timer_wakeup(DEEP_SLEEP_TIME_US);
esp_deep_sleep_start();
}This functionality is useful to for power savings and to prevent trail cameras from sending detections when the wildlife alert was already issued recently.
Second Stage Animal Classifier
After an early detection has been received by the main processing unit, the frame is passed into the second stage classifier. This classifier is more robust and uses a ResNet architecture to classify detections into specific animals. In my case I've created a network that classifies animals commonly found in Poland:
classes_names = ['boar', 'cow', 'deer', 'dog', 'empty', 'goat', 'horse', 'pig', 'sheep', 'wolf']
The "empty" class is added to filter out false positives from the first stage.
The network was prepared and trained using Keras.
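The exact architecture and training setup aren't reproduced in this write-up, so here is only a minimal sketch of what a small ResNet-style Keras classifier for these ten classes could look like; the layer sizes, input resolution and training parameters are my illustrative assumptions, not the project's actual values:
# Hedged sketch of a small ResNet-style classifier in Keras.
# Layer sizes and training parameters are illustrative assumptions.
import tensorflow as tf
from tensorflow.keras import layers, models

classes_names = ['boar', 'cow', 'deer', 'dog', 'empty', 'goat', 'horse', 'pig', 'sheep', 'wolf']

def residual_block(x, filters):
    # Two 3x3 convolutions with a skip connection
    shortcut = layers.Conv2D(filters, 1, padding="same")(x)
    y = layers.Conv2D(filters, 3, padding="same", activation="relu")(x)
    y = layers.Conv2D(filters, 3, padding="same")(y)
    y = layers.Add()([shortcut, y])
    return layers.Activation("relu")(y)

def build_second_stage_model(height=240, width=320):
    # Grayscale frames, matching what the trail cameras send
    inputs = layers.Input(shape=(height, width, 1))
    x = layers.Rescaling(1.0 / 255)(inputs)
    x = layers.Conv2D(16, 7, strides=2, padding="same", activation="relu")(x)
    x = layers.MaxPooling2D()(x)
    for filters in (16, 32, 64):
        x = residual_block(x, filters)
        x = layers.MaxPooling2D()(x)
    x = layers.GlobalAveragePooling2D()(x)
    outputs = layers.Dense(len(classes_names), activation="softmax")(x)
    return models.Model(inputs, outputs)

model = build_second_stage_model()
model.compile(optimizer="adam",
              loss="sparse_categorical_crossentropy",
              metrics=["accuracy"])
# model.fit(train_ds, validation_data=val_ds, epochs=20)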
This second stage classification is a part of a bigger runtime that will be described later.
Road Visibility Classifier
Road visibility is also an important aspect of this project. If visibility is low, the trail cameras can return false positives or, worse, yield false negatives.
To prevent that, an additional network was designed for this project to estimate the air visibility on the road. This network uses a model proposed in Leveraging airport CCTV footage through video understanding techniques for visibility prediction:
To train this network I've used the GRAM Road-Traffic Monitoring dataset. The problem is that this dataset doesn't contain any visibility data, so I've augmented it with artificially created fog.
Firstly, to generate realistic-looking fog, depth data is required. This dataset doesn't contain any depth data either, as it's just a simple recording from a monocular CCTV camera. To generate depth data I've used Depth-Anything:
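The depth estimation step isn't listed here; purely as an illustration, a single dataset frame could be pushed through a published Depth-Anything checkpoint with the Hugging Face transformers depth-estimation pipeline. The checkpoint name, the file name and this particular API are assumptions about one convenient way to do it, not necessarily what was used in the project:
# Hedged sketch: estimating per-pixel depth for one dataset frame.
# Assumes the transformers "depth-estimation" pipeline and a public
# Depth-Anything checkpoint; the original repo could be used instead.
import numpy as np
from PIL import Image
from transformers import pipeline

depth_estimator = pipeline("depth-estimation", model="LiheYoung/depth-anything-small-hf")

frame = Image.open("gram_frame_0001.jpg")    # hypothetical dataset frame
depth = depth_estimator(frame)["depth"]      # PIL image with relative depth

# Normalize to 0-255 so it can play the role of distance_map in the fog generation step
distance_map = np.array(depth, dtype=np.float32)
distance_map = 255 * (distance_map - distance_map.min()) / (np.ptp(distance_map) + 1e-6)
distance_map = distance_map.astype(np.uint8)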
Then, to generate fog, a simple per-pixel transformation driven by the depth map is applied to the original frame:
# Semantic Foggy Scene Understanding with Synthetic Data
# DOI: 10.1007/s11263-018-1072-8
# https://arxiv.org/abs/1708.07819
for x in range(width):
    for y in range(height):
        # Transmission term of the atmospheric scattering model, based on estimated depth
        t = np.exp(-beta * (255 - distance_map[y - 1, x - 1]))
        val = t * img[y, x] + 255 * (1 - t)
        img_fogged[y, x] = val
return img_fogged
This network requires additional input images in the form of SIFT and optical flow data. These were generated using OpenCV:
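The helpers themselves aren't shown in this write-up, so below is a rough sketch of how such SIFT and optical-flow images can be produced with OpenCV; the exact Farneback parameters and the HSV visualization are assumptions for illustration:
# Hedged sketch of generating SIFT keypoint and dense optical flow images with OpenCV.
import cv2 as cv
import numpy as np

def get_sift_img(frame):
    # Draw SIFT keypoints on top of the frame to expose texture / feature density
    gray = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
    sift = cv.SIFT_create()
    keypoints = sift.detect(gray, None)
    return cv.drawKeypoints(frame, keypoints, None,
                            flags=cv.DRAW_MATCHES_FLAGS_DRAW_RICH_KEYPOINTS)

def get_flow_img(frame, frame_next):
    # Dense Farneback optical flow between two consecutive frames,
    # visualized as an HSV image (hue = direction, value = magnitude)
    g0 = cv.cvtColor(frame, cv.COLOR_BGR2GRAY)
    g1 = cv.cvtColor(frame_next, cv.COLOR_BGR2GRAY)
    flow = cv.calcOpticalFlowFarneback(g0, g1, None, 0.5, 3, 15, 3, 5, 1.2, 0)
    mag, ang = cv.cartToPolar(flow[..., 0], flow[..., 1])
    hsv = np.zeros_like(frame)
    hsv[..., 0] = ang * 180 / np.pi / 2
    hsv[..., 1] = 255
    hsv[..., 2] = cv.normalize(mag, None, 0, 255, cv.NORM_MINMAX)
    return cv.cvtColor(hsv, cv.COLOR_HSV2BGR)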
With all required data generated, the network was prepared and trained using Keras.
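The visibility model isn't listed in full; a minimal sketch of a three-input Keras model of this kind (RGB frame plus SIFT and optical-flow images) could look as follows, with the input resolutions, layer sizes and the three-class output (matching the SMALL / MEDIUM / HIGH levels used later) being my assumptions:
# Hedged sketch of a three-input visibility classifier in Keras.
# Input resolutions and layer sizes are illustrative assumptions.
import tensorflow as tf
from tensorflow.keras import layers, models

def small_cnn_branch(inputs):
    # Small convolutional feature extractor shared in structure across inputs
    x = layers.Rescaling(1.0 / 255)(inputs)
    x = layers.Conv2D(16, 3, activation="relu", padding="same")(x)
    x = layers.MaxPooling2D()(x)
    x = layers.Conv2D(32, 3, activation="relu", padding="same")(x)
    x = layers.MaxPooling2D()(x)
    return layers.GlobalAveragePooling2D()(x)

rgb_in = layers.Input(shape=(224, 224, 3), name="rgb")
sift_in = layers.Input(shape=(224, 224, 3), name="sift")
flow_in = layers.Input(shape=(224, 224, 3), name="flow")

features = layers.Concatenate()([small_cnn_branch(rgb_in),
                                 small_cnn_branch(sift_in),
                                 small_cnn_branch(flow_in)])
x = layers.Dense(64, activation="relu")(features)
outputs = layers.Dense(3, activation="softmax", name="visibility")(x)  # SMALL / MEDIUM / HIGH

model = models.Model([rgb_in, sift_in, flow_in], outputs)
model.compile(optimizer="adam", loss="sparse_categorical_crossentropy", metrics=["accuracy"])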
This network runs on Jetson Nano and receives frames from two Raspberry Pi Cameras running in stereo configuration:
This network is also a part of a bigger runtime that will be described later.
Combining It All Together
The LED panels were joined together using 3D printed mounts. These mounts also have a small space for DIN rails, which were used to mount the Pico panel driver and the Jetson Nano on the sign itself.
Everything is powered from a 5V step-down converter.
The sign itself was mounted on a tripod for testing purposes.
Runner Logic
The logic runner runs on the Jetson Nano as a series of Python scripts running in parallel:
- MQTT Runner
- Second Stage Classifier
- Camera Subscriber
- Air Visibility Classifier
- Runtime Logic
- LED Sign Driver
MQTT Runner is a simple paho-mqtt subscriber / publisher that awaits messages from trail cameras and puts them to deep sleep if necessary. Each frame received by this runner is passed to the Second Stage Classifier's queue.
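The runner code itself isn't reproduced here; a minimal sketch of such a paho-mqtt subscriber / publisher could look like this, with the topic names, broker address and frame encoding being assumptions for illustration:
# Hedged sketch of the MQTT Runner: receive frames from trail cameras,
# forward them to the second stage classifier, and optionally request deep sleep.
# Topic names and broker address are assumptions, not the project's actual configuration.
import numpy as np
import paho.mqtt.client as mqtt

FRAME_TOPIC = "trailcam/+/frame"     # hypothetical topic for raw grayscale frames
SLEEP_TOPIC = "trailcam/all/sleep"   # hypothetical topic used to put cameras to sleep
FRAME_WIDTH, FRAME_HEIGHT = 320, 240

class MqttRunner:
    def __init__(self, classifier, broker="localhost"):
        self.classifier = classifier
        self.client = mqtt.Client()          # paho-mqtt 1.x style client
        self.client.on_message = self.on_message
        self.client.connect(broker)
        self.client.subscribe(FRAME_TOPIC)
        self.client.loop_start()

    def on_message(self, client, userdata, msg):
        # Decode the raw frame bytes and push them to the second stage classifier queue
        img = np.frombuffer(msg.payload, dtype=np.uint8)
        if img.size == FRAME_WIDTH * FRAME_HEIGHT:
            img = img.reshape(FRAME_HEIGHT, FRAME_WIDTH)
            self.classifier.add_animal_detection_to_queue(img)

    def request_sleep(self):
        # Ask all trail cameras to enter deep sleep, e.g. once a warning is already active
        self.client.publish(SLEEP_TOPIC, "sleep")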
Second Stage Classifier runs in its own thread in an endless loop. During each iteration it checks its queue for new frames. Each new frame is then processed through the TensorFlow model, and the inference result is passed to the Runtime Logic.
# MQTT Runner pushes received frames to Second Stage Classifier through this method
def add_animal_detection_to_queue(self, img):
    print(f"Received image from 1st stage: {img.shape}")
    with self.animal_classifier_mutex:
        self.animal_classifier_queue.append(img)

# Separate thread loop
def __inference_loop(self):
    while not self.thread_event.is_set():
        data = None
        with self.animal_classifier_mutex:
            if len(self.animal_classifier_queue):
                data = self.animal_classifier_queue.pop(0)
        if data is not None and np.prod(data.shape) == config.FRAME_WIDTH * config.FRAME_HEIGHT:
            data.shape = (1, config.FRAME_HEIGHT, config.FRAME_WIDTH, 1)
            result = self.model.serve(data)
            result = self.__get_readable_result(result)
            result = self.__get_best_result(result)
            if self.cb is not None:
                print(f"Sending animal 2nd stage classifier result: {result}")
                self.cb(result)
Camera Subscriber is a simple class that fetches new frames from the attached CSI camera in an asynchronous way. It is used by the Air Visibility Classifier.
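The Camera Subscriber isn't listed either; a rough sketch of an asynchronous CSI camera reader for the Jetson (using OpenCV with a GStreamer nvarguscamerasrc pipeline, which is one common approach and an assumption here, not necessarily the project's) could look like this:
# Hedged sketch of an asynchronous CSI camera subscriber for the Jetson Nano.
# The GStreamer pipeline string is a typical nvarguscamerasrc example, not the project's exact one.
import threading
import cv2 as cv

def gst_pipeline(sensor_id=0, width=1280, height=720, fps=30):
    return (
        f"nvarguscamerasrc sensor-id={sensor_id} ! "
        f"video/x-raw(memory:NVMM), width={width}, height={height}, framerate={fps}/1 ! "
        "nvvidconv ! video/x-raw, format=BGRx ! videoconvert ! video/x-raw, format=BGR ! appsink"
    )

class CameraSubscriber:
    def __init__(self, sensor_id=0):
        self.cap = cv.VideoCapture(gst_pipeline(sensor_id), cv.CAP_GSTREAMER)
        self.frame = None
        self.lock = threading.Lock()
        self.running = True
        # Grab frames in the background so get_frame() never blocks the classifier loop
        self.thread = threading.Thread(target=self.__capture_loop, daemon=True)
        self.thread.start()

    def __capture_loop(self):
        while self.running:
            ok, frame = self.cap.read()
            if ok:
                with self.lock:
                    self.frame = frame

    def get_frame(self):
        # Return the most recent frame (or None if nothing has been captured yet)
        with self.lock:
            return None if self.frame is None else self.frame.copy()

    def stop(self):
        self.running = False
        self.thread.join()
        self.cap.release()
Two instances of such a class, one per CSI port, would correspond to the c0 and c1 handles used in the loop below.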
Air Visibility Classifier also runs in its own thread. It fetches frames from the CSI cameras, preprocesses them, generates SIFT and optical flow images, then performs inference through the TensorFlow model. Each inference result is passed to the Runtime Logic.
# Process each frame from camera to estimate air visibility
def __inference_loop(self):
    while not self.thread_event.is_set():
        f0 = self.c0.get_frame()
        f1 = self.c1.get_frame()
        time.sleep(0.2)
        f0_next = self.c0.get_frame()
        desired_shape = config.CAMERA_WIDTH * config.CAMERA_HEIGHT * 3
        if f0 is not None and f1 is not None and f0_next is not None and \
           np.prod(f0.shape) == desired_shape and np.prod(f1.shape) == desired_shape and np.prod(f0_next.shape) == desired_shape:
            print("Received air visibility frames, estimating visibility...")
            f0_sift = self.__get_sift_img(f0)
            f0_flow = self.__get_flow_img(f0, f0_next)
            f0 = cv.resize(f0.copy(), (config.RGB_HEIGHT, config.RGB_WIDTH))
            f0.shape = (1, config.RGB_HEIGHT, config.RGB_WIDTH, 3)
            f0_sift.shape = (1, config.SIFT_FLOW_HEIGHT, config.SIFT_FLOW_WIDTH, 3)
            f0_flow.shape = (1, config.SIFT_FLOW_HEIGHT, config.SIFT_FLOW_WIDTH, 3)
            result = self.model.serve([f0, f0_sift, f0_flow])
            result = self.__get_result(result)
            if self.cb is not None:
                print(f"Sending air visibility result: {result}")
                self.cb(result)
Runtime Logic combines data from the Second Stage Classifier and the Air Visibility Classifier to estimate safe road rules. Data comes in through two callback methods:
# Callback from Air Visibility Classifier
# fog_density is either "SMALL", "MEDIUM" or "HIGH"
def fog_visibility_consumer(self, fog_density):
    with self.fog_lock:
        self.fog_current = fog_density

# Callback from Second Stage Classifier
# Result is animal type and certainty
def animal_classifier_consumer(self, classification):
    # Ignore results that have low certainty
    if classification[1] < 0.85:
        return
    now = int(time.time())
    with self.animal_lock:
        self.new_animal_detection = True
        if classification[0] == "empty":
            # Prevent override if any animal was observed in given timeframe
            if (now - self.animal_wild_stamp) < self.ANIMAL_SIGN_TIMEOUT or \
               (now - self.animal_farm_stamp) < self.ANIMAL_SIGN_TIMEOUT:
                return
            self.animal_current = self.ANIMAL_SIGN_NONE
        elif self.__is_wild_animal(classification[0]):
            self.animal_wild_stamp = int(time.time())
            self.animal_current = self.ANIMAL_SIGN_WILD
        else:
            # Prevent override if wild animal was observed in given timeframe
            if (now - self.animal_wild_stamp) < self.ANIMAL_SIGN_TIMEOUT:
                return
            self.animal_farm_stamp = int(time.time())
            self.animal_current = self.ANIMAL_SIGN_FARM
The Runtime Logic then runs in a slow loop and uses these values to change the road rules. In the project there is space for two signs (one for each LED panel). The top one is a warning sign. The logic for this sign is pretty straightforward:
def __process_animal_sign(self):
    now = int(time.time())
    with self.animal_lock:
        if self.new_animal_detection:
            self.new_animal_detection = False
            # Refresh timeout as new same detection occurred
            if self.animal_current == self.animal_applied:
                print("Same animal warning applied, refreshing timeout")
                self.animal_stamp = int(time.time())
            # Instantly apply more important animal sign
            elif self.animal_current > self.animal_applied:
                print("Bigger animal warning occurred, applying instantly")
                self.animal_stamp = int(time.time())
                self.animal_applied = self.animal_current
                self.__reset_animal_current()
        # Timeout occurred, apply lower sign
        if self.animal_applied > self.ANIMAL_SIGN_NONE and (now - self.animal_stamp > self.ANIMAL_SIGN_TIMEOUT):
            print("Timeout, applying lower warning")
            # Apply normal animal warning if there was a detection within some timeframe
            if self.animal_applied == self.ANIMAL_SIGN_WILD and (now - self.animal_farm_stamp < self.ANIMAL_SIGN_TIMEOUT):
                print("Farm animal warning applied")
                self.animal_applied = self.ANIMAL_SIGN_FARM
                self.__reset_animal_current()
            else:
                print("No animal warning present")
                self.animal_applied = self.ANIMAL_SIGN_NONE
                self.__reset_animal_current()
If a new detection is the same as the current one, the timer for displaying the sign is reset and the sign stays up longer. If the new detection is of higher priority (wild animals are more dangerous than farm ones), the wild animal warning is applied instantly. Once the timer runs out, the wild animal sign falls back to the farm animal warning if a farm animal was detected during the wild animal warning; otherwise the sign turns off.
The bottom sign is a speed limit. Its logic uses fog and animal data to adjust speed limits in a safe way:
def __lookup_speed(self, fog, sign):
    if fog == "SMALL" and (sign == self.ANIMAL_SIGN_NONE or sign == self.ANIMAL_SIGN_FARM):
        return SignDriver.SPEED_90
    elif fog == "MEDIUM" and (sign == self.ANIMAL_SIGN_NONE or sign == self.ANIMAL_SIGN_FARM):
        return SignDriver.SPEED_70
    elif (fog == "HIGH" and (sign == self.ANIMAL_SIGN_NONE or sign == self.ANIMAL_SIGN_FARM)) or \
         (fog == "SMALL" and sign == self.ANIMAL_SIGN_WILD):
        return SignDriver.SPEED_50
    elif (fog == "HIGH" or fog == "MEDIUM") and sign == self.ANIMAL_SIGN_WILD:
        return SignDriver.SPEED_30
    return SignDriver.SPEED_30  # Fallback just in case
def __process_speed_sign(self):
    now = int(time.time())
    with self.fog_lock, self.animal_lock:
        speed = self.__lookup_speed(self.fog_current, self.animal_current)
        # Refresh timeout as new same detection occurred
        if speed == self.speed_applied:
            print("Same speed computed, refreshing timeout")
            self.speed_stamp = int(time.time())
        # Instantly apply slower speed limit
        elif speed < self.speed_applied:
            print("Lower speed computed, applying instantly")
            self.speed_stamp = int(time.time())
            self.speed_applied = speed
        # Timeout occurred, apply the computed (possibly higher) speed
        if now - self.speed_stamp > self.SPEED_SIGN_TIMEOUT:
            print(f"Timeout, applying speed: {speed}")
            self.speed_applied = speed
The logic of __process_speed_sign is very similar to __process_animal_sign. The only difference is that instead of applying a higher priority warning, it instantly applies a lower speed limit when needed.
The logic of __lookup_speed may seem a bit convoluted, but in truth it's just a simple truth table over fog and animal presence values:
The values from left to right are: S ("SMALL" fog), M ("MEDIUM" fog), H ("HIGH" fog), A (farm animal warning), WA (wild animal warning). This truth table was minimized and implemented as the function above.
Sign Driver is a simple MODBUS client used to write the runtime logic's verdict into the LED panel driver:
def loop_iteration(self):
    self.__process_animal_sign()
    self.__process_speed_sign()
    return [self.speed_applied, self.__get_driver_sign()]

while True:
    res = sign_logic.loop_iteration()
    warning = res[1]
    speed = res[0]
    if warning is None:
        sign_driver.sign_warning_off()
    elif warning == SignDriver.SIGN_ANIMALS:
        sign_driver.sign_warning_animals()
    elif warning == SignDriver.SIGN_WILD_ANIMALS:
        sign_driver.sign_warning_wild_animals()
    sign_driver.sign_speed(speed)
    time.sleep(5)
A summary of all the logic running within the system can be seen below:
To conclude, in my opinion this system offers a smarter, more responsive approach to road safety that respects the needs of both drivers and wildlife. With the right implementation, it can reduce collisions, save lives, and pave the way for more intelligent infrastructure in areas where nature and human activity intersect.
Further Ideas
- Replace WiFi with LoRa
- Add road surface evaluation (RoadSaW)
- Add more MQTT endpoints to configure the runtime on the fly
- Add more warning sign types to increase safety for drivers and animals even further
- The __lookup_speed evaluation is hard to read; in a real product it would be a good idea to load it from a human-readable truth table. This table could also be configured on the fly, e.g. through MQTT