Tejpunj Raju
Published

IoT Based Public Notification System

A smart IoT and machine learning based public address system.

IntermediateFull instructions provided5 hours6,027
IoT Based Public Notification System

Things used in this project

Hardware components

Bolt WiFi Module
Bolt IoT Bolt WiFi Module
×1
Arduino UNO
Arduino UNO
×1
Microphone Amplifier Breakout
Adafruit Microphone Amplifier Breakout
×1
Jumper wires (generic)
Jumper wires (generic)
Male to Female or Male to Male depending on how you want to make the connections.
×5

Software apps and online services

Bolt Cloud
Bolt IoT Bolt Cloud
Telegram
Ubuntu
Ubuntu

Story

Read more

Schematics

Circuit Connection

Make the following connection:
1. Arduino 5V ---> Mic VCC
2. Arduino GND---> Mic GND
3. Arduino A0 ---> Mic OUT
4. Arduino Rx ---> Bolt Tx
5/ Arduino Tx ---> Bolt Rx

Code

Configuration file for the code

Python
Enter your own api_key, device_id, telegram_chat_id, telegram_bot_id
api_key = "XXXXXXXXXXXXXXXXXXXXXXXXX"
device_id = "XXXXXXXXXX"
telegram_chat_id = "XXXXXXXXXXX"
telegram_bot_id = "XXXXXXXXXXXXXXXXXXXXX"
frame_size = 10
mul_factor = 3

Code for everything else

Python
import requests
import time
import statistics
import math
import json

import conf
from boltiot import Bolt

mybolt = Bolt(conf.api_key,conf.device_id)
mybolt.serialBegin('9600')

def getSensorValue():
#Returns Sensor Value. Returns -999 if fails
	try:
		response = mybolt.serialRead("10")
		data = json.loads(response)

		if data["success"] != 1:
			print("Request not successful")
			print(response)
			return -999

		sensor_value = data["value"]
		return sensor_value

	except Exception as e:
		print("Something went wrong. Response from BOLT:")
		print(e)
		return -999

def send_telegram_message(message):
#Send Telegram message and returns status
	url = "https://api.telegram.org/"+conf.telegram_bot_id+"/sendMessage"

	data =  {
		"chat_id" : conf.telegram_chat_id,
		"text" : message
		}

	try:
		response = requests.request("POST",url,params = data)

		print("Telegram url:")
		print(url)

		print("Response from Telegram:")
		print(response.text)

		telegram_data = json.loads(response.text)

		return telegram_data["ok"]

	except Exception as e:

		print("Something went wrong! Response:")
		print(e)
		return(False)

def compute_bounds(history_data,frame_size,factor):
#Computes lower and upper bounds for anomaly detection

	if len(history_data) < frame_size:
		return None

	if len(history_data) > frame_size:
		del history_data[0:len(history_data)-frame_size]

	Mn = statistics.mean(history_data)
	Variance = 0

	for data in history_data:
		Variance = Variance + math.pow((data-Mn),2)

	Zn = factor * math.sqrt(Variance/frame_size)

	high_bound = history_data[frame_size-1] + Zn
	low_bound = history_data[frame_size-1] - Zn

	return [high_bound,low_bound]

history_data = []
count = 0

while True:

	sensor_value = getSensorValue()
	l = len(sensor_value)

	if sensor_value == -999:
		print("Request was unsuccessful! Skipping")
		continue

	curNoise = int(sensor_value[0:l-1])

	print("Current Noise Value: ",curNoise)

	bound = compute_bounds(history_data,conf.frame_size,conf.mul_factor)

	if not bound:
		history_data.append(curNoise)
		print("Need ",conf.frame_size-len(history_data)," more values.")
		time.sleep(5)
		continue

	print(bound,curNoise)

	if curNoise<bound[1]:

		print("No noise at all!")

		message = "Very quiet surroundings."
		telegram_status = send_telegram_message(message)

		print("Status: ",telegram_status)

	elif curNoise>bound[0]:

		print("Bell ringing!")

		message = "Moo! Milk has arrived at the Milk Distribution Point."
		telegram_status = send_telegram_message(message)

		print("Status: ",telegram_status)

	history_data.append(curNoise)
	print("\n")

	time.sleep(5)

Credits

Tejpunj Raju
1 project • 1 follower

Comments