We are making an IoT device which can detect light and then set boundaries for the light range.When the light received by the LDR changes suddenly i.e when someone obstructs the light or more light is received by the sensor, through Z score analysis anomaly is detected and an "Alert" message is sent to the user through an application called Twilio.
STEPSSTEP:1CONNECTIONS
- Insert one lead of the LDR into 3V3 pin of bolt wifi module and other to A0 pin
- Insert one end of the 10kohm resistor to the A0 pin and other to the ground pin of the module.
- Using the usb type a to micro usb cable connect the wifi module to a powerbank/laptop.
STEP : 2 BOLT CLOUD
- Log in to the bolt cloud at cloud.boltiot.com and create a product and follow the instructions given there.
- Make it Output device and make collection of data as GPIO.
- Now link you WiFi module to the product and now it will show online.
- SELECT API GENERATE AND COPY.
Login to your digital ocean server or vmware and select ubuntu
1.CONFIGUREFILE
sudo mkdir Anomaly_Detection
cd Anomaly_Detection
sudo nano conf.py
- In conf.py enter the below code and replace all the required data.
SSID = 'You can find SSID in your Twilio Dashboard'
AUTH_TOKEN = 'You can find on your Twilio Dashboard'
FROM_NUMBER = 'This is the no. generated by Twilio. You can find this on your Twilio Dashboard'
TO_NUMBER = 'This is your number. Make sure you are adding +91 in beginning'
API_KEY = 'This is your Bolt Cloud account API key'
DEVICE_ID = 'This is the ID of your Bolt device'
FRAME_SIZE = 10
MUL_FACTOR = 6
2.Anomalydetection code
sudo nano anomaly_detection.py
- We import all the libraries which are required.
- Then create a function compute_bounds to obtain 10 values for the z score. If the data point are less we continue in the loop and if there are more we delete them.
import conf, json, time, math, statistics
from boltiot import Sms, Bolt
def compute_bounds(history_data,frame_size,factor):
if len(history_data)<frame_size :
return None
if len(history_data)>frame_size :
del history_data[0:len(history_data)-frame_size]
- Mean, Variance and Z score are calculated.
- collect all the values as data variable & the range of values are found.
Mn=statistics.mean(history_data)
Variance=0
for data in history_data :
Variance += math.pow((data-Mn),2)
Zn = factor * math.sqrt(Variance / frame_size)
High_bound = history_data[frame_size-1]+Zn
Low_bound = history_data[frame_size-1]-Zn
return [High_bound,Low_bound]
- In the below code, the main program will call the configuration file and obtain details (here twilio will also get the details) from the configuration file. Then we declare history_data as an array to store data in as the index.
mybolt = Bolt(conf.API_KEY, conf.DEVICE_ID)
sms = Sms(conf.SSID, conf.AUTH_TOKEN, conf.TO_NUMBER, conf.FROM_NUMBER)
history_data=[]
- Fetch the data and if any error present it will print 'error' and let the user know. We then set a variable sensor_value which will take in the integer values of the history_data values obtained
while True:
response = mybolt.analogRead('A0')
data = json.loads(response)
if data['success'] != 1:
print("There was an error while retriving the data.")
print("This is the error:"+data['value'])
time.sleep(10)
continue
print ("This is the value "+data['value'])
sensor_value=0
- To get integer values from the sensor, we implicitly covert it to int.
try:
sensor_value = int(data['value'])
except e:
print("There was an error while parsing the response: ",e)
continue
- in the below code, we are collecting 10 nos. of data for z score analysis
bound = compute_bounds(history_data,conf.FRAME_SIZE,conf.MUL_FACTOR)
if not bound:
required_data_count=conf.FRAME_SIZE-len(history_data)
print("Not enough data to compute Z-score. Need ",required_data_count," more data points")
history_data.append(int(data['value']))
time.sleep(10)
continue
- in the below code, we check whether the light intensity is varied suddenly and sends the alert as an SMS.
try:
if sensor_value > bound[0] :
print ("The light level increased suddenly. Sending an SMS.")
response = sms.send_sms("INTRUDER ALERT")
print("This is the response ",response)
elif sensor_value < bound[1]:
print ("The light level decreased suddenly. Sending an SMS.")
response = sms.send_sms("INTRUDER ALERT")
print("This is the response ",response)
history_data.append(sensor_value);
except Exception as e:
print ("Error",e)
time.sleep(1)
OUTPUT
Comments