Terry Rodriguez, Salma Mayorquin
Published © MIT

Saving Bandwidth with Anomaly Detection

We experiment with visual anomaly detection to develop techniques for reducing bandwidth consumption in streaming IoT applications.

Advanced · Full instructions provided · 4 hours · 1,908

Things used in this project

Hardware components

NVIDIA Jetson Nano Developer Kit
NVIDIA Jetson Nano Developer Kit
×1
Webcam, Logitech® HD Pro
Webcam, Logitech® HD Pro
×1

Software apps and online services

TensorFlow
TensorFlow

Story

Read more

Code

visual_anomaly_demo.py

Python
#!/usr/bin/python3
from __future__ import absolute_import, print_function
import cv2
import numpy as np
from time import sleep, time
from collections import deque

from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, UpSampling2D, ZeroPadding2D
from tensorflow.keras.callbacks import History

def build_model(img_size=32, learning_rate=None):
    """Build and compile a small convolutional autoencoder.

    The encoder halves the spatial resolution twice (two 2x2 max-pools) and
    the decoder doubles it twice, so ``img_size`` should be a multiple of 4
    for the reconstruction to come out the same size as the input.

    Args:
        img_size: Side length of the square, single-channel input image.
        learning_rate: Step size for the RMSprop optimizer. When ``None``
            the module-level ``lr`` setting is used.

    Returns:
        The compiled Keras ``Model`` mapping an image to its reconstruction,
        with a binary cross-entropy loss.
    """
    if learning_rate is None:
        # Fall back to the script-level setting so existing callers that
        # only pass img_size keep working unchanged.
        learning_rate = lr

    # Encoder: two conv + pool stages; the second pool is named 'encoder'.
    input_img = Input(shape=(img_size, img_size, 1))
    x = Conv2D(32, (5, 5), activation='relu', padding='same')(input_img)
    x = MaxPooling2D((2, 2), padding='same')(x)
    x = Conv2D(8, (3, 3), activation='relu', padding='same')(x)
    encoded = MaxPooling2D((2, 2), padding='same', name='encoder')(x)

    # Decoder: mirror of the encoder, ending in a sigmoid so the output
    # lies in [0, 1] as binary cross-entropy expects.
    x = Conv2D(8, (3, 3), activation='relu', padding='same')(encoded)
    x = UpSampling2D((2, 2))(x)
    x = Conv2D(32, (3, 3), activation='relu', padding='same')(x)
    x = UpSampling2D((2, 2))(x)
    decoded = Conv2D(1, (5, 5), activation='sigmoid', padding='same')(x)

    autoencoder = Model(input_img, decoded)
    # `learning_rate` is the supported keyword; the legacy `lr` alias is
    # deprecated in tf.keras and not accepted by newer Keras releases.
    autoencoder.compile(optimizer=RMSprop(learning_rate=learning_rate),
                        loss='binary_crossentropy')

    return autoencoder

def load_run_render(frame):
    """Run one online training step of the autoencoder on a camera frame.

    The frame is resized to ``img_size`` x ``img_size``, converted to
    grayscale, scaled to [0, 1] and used as both input and target for a
    single-epoch fit of the module-level ``autoencoder``.

    Args:
        frame: Colour image as returned by ``cv2.VideoCapture.read``.

    Returns:
        The loss recorded by Keras for this one-epoch fit.
    """
    im = cv2.resize(frame, (img_size, img_size))
    # OpenCV capture frames are in BGR channel order, so the BGR conversion
    # code is needed to get correct luminance weights.
    im = cv2.cvtColor(im, cv2.COLOR_BGR2GRAY)
    # Add a leading batch axis and a trailing channel axis, then scale the
    # 8-bit pixel values into [0, 1] to match the sigmoid output.
    im = np.expand_dims(np.expand_dims(im, axis=0).astype(np.float32), axis=-1) / 255
    history = History()
    autoencoder.fit(im, im,
                    epochs=1,
                    batch_size=1,
                    shuffle=True,
                    callbacks=[history])
    return history.history['loss'][-1]

def main():
    """Capture frames forever, train on each one and flag anomalous losses.

    Every iteration reads a frame from camera 0, runs one training step via
    ``load_run_render`` and appends the loss to the module-level deque ``q``.
    Once more than 50 losses are stored, the newest loss is reported as an
    'anomaly' when it deviates from the mean of the earlier losses by more
    than five of their standard deviations, otherwise as 'normal'.

    The loop only ends when an exception that is not an ``Exception``
    subclass (e.g. ``KeyboardInterrupt``) propagates; the camera is released
    on the way out.
    """
    # Begin Video Capture
    cam = cv2.VideoCapture(0)
    idx = 0
    try:
        while True:
            try:
                ret, frame = cam.read()
                print(idx)
                if ret:
                    loss = load_run_render(frame)
                    q.append(loss)
                    if len(q) > 50:
                        # Compare against history that excludes the value
                        # just appended, so it cannot mask itself.
                        prev_hist = list(q)[:-1]
                        threshold = 5 * np.std(prev_hist)
                        if np.abs(loss - np.mean(prev_hist)) > threshold:
                            print(loss, 'anomaly')
                        else:
                            print(loss, 'normal')
            except Exception as e:
                # Best-effort loop: report the failure and keep streaming.
                print(e)
            sleep(DELAY)
            idx += 1
    finally:
        # Always hand the capture device back, including on Ctrl-C.
        cam.release()
        
if __name__ == '__main__':
    # Script-level settings; the functions in this file read them as globals.
    img_size = 128          # side length frames are resized to
    lr = 1e-4               # optimizer learning rate
    DELAY = 0.2             # seconds slept between loop iterations
    q = deque(maxlen=300)   # rolling window of recent losses

    # Create the autoencoder for the configured frame size.
    autoencoder = build_model(img_size=img_size)

    # Start the capture / train / detect loop.
    main()

Credits

Terry Rodriguez

Terry Rodriguez

15 projects • 138 followers
hack the change you want to see
Salma Mayorquin

Salma Mayorquin

15 projects • 270 followers
Software engineer AI/ML & hardware tinkerer interested in embedded AI. Let's hack the change we want to see!

Comments