Peter Ma
Published © GPL3+

Netduino Facial Recognition WiFi Lock

Using facial recognition to unlock a deadbolt through Netduino WiFi.

Intermediate · Full instructions provided · 20 hours · 2,527

Things used in this project

Story

Read more

Schematics

Netduino Lock Diagram

Netduino Lock Diagram
Ezgif com optimize (9) x60mebm40f

Code

Movidius Face Recognition Unlock Code

Python
Python code for Face Video Match with TensorFlow Facenet
#! /usr/bin/env python3
# Copyright(c) 2017 Intel Corporation. 
# License: MIT See LICENSE file in root directory.
from mvnc import mvncapi as mvnc
import numpy
import cv2
import sys
import os
import requests
# ---------------------------------------------------------------
# File locations
# ---------------------------------------------------------------
# (not referenced anywhere else in the visible part of this script)
EXAMPLES_BASE_DIR = '../../'
# directory the image folders below are resolved against
IMAGES_DIR = './'
# folder holding the reference ("validated") face image
VALIDATED_IMAGES_DIR = '{}validated_images/'.format(IMAGES_DIR)
# the single reference face every camera frame is compared against
validated_image_filename = '{}valid.jpg'.format(VALIDATED_IMAGES_DIR)
# compiled graph file that main() reads and allocates on the device
GRAPH_FILENAME = 'facenet_celeb_ncs.graph'

# ---------------------------------------------------------------
# Display and camera
# ---------------------------------------------------------------
# title of the opencv preview window
CV_WINDOW_NAME = 'FaceNet'
# which camera to open and the resolution to request from it
CAMERA_INDEX = 0
REQUEST_CAMERA_WIDTH, REQUEST_CAMERA_HEIGHT = 640, 480

# ---------------------------------------------------------------
# Matching
# ---------------------------------------------------------------
# Two faces match when the total difference between their network
# outputs is below this value. Identical faces give 0.0 and different
# faces give larger numbers; the scale is NOT limited to 0.0 - 1.0.
FACE_MATCH_THRESHOLD = 1.0
# Run an inference on the passed image
# image_to_classify is the image on which an inference will be performed;
#    it is preprocessed (resized, converted to RGB, whitened) before
#    being sent to the device.
# facenet_graph is the Graph object from the NCAPI which will
#    be used to perform the inference.
# returns the network output for the image.
def run_inference(image_to_classify, facenet_graph):
    """Run one inference on the device and return the network output.

    image_to_classify is an image in OpenCV's BGR channel order; it is
    not modified here, a preprocessed copy is what gets sent.
    facenet_graph is the NCAPI Graph object the inference runs on.
    """
    # scale / recolor / whiten the frame, then convert it to the
    # half-precision floats that are loaded onto the device
    network_input = preprocess_image(image_to_classify).astype(numpy.float16)

    # queue the tensor on the NCS (no user object is attached) ...
    facenet_graph.LoadTensor(network_input, None)

    # ... and fetch the result; the second element of the returned pair
    # is not used by this script
    network_output, _unused_userobj = facenet_graph.GetResult()
    return network_output
# overlays the boxes and labels onto the display image.
# display_image is the image on which to overlay to
# image info is a text string to overlay onto the image.
# matching is a Boolean specifying if the image was a match.
# returns None
# number of consecutive matching frames seen so far; reset by any
# non-matching frame and after an unlock request has been sent
framecount = 0


def overlay_on_image(display_image, image_info, matching,
                     lock_url="http://192.168.21.210", frames_to_unlock=20,
                     request_timeout=10):
    """Draw the match status on a frame and trigger the lock when due.

    display_image is the image to draw on (modified in place).
    image_info is a text string drawn in the top-left corner, or None
        to draw no text.
    matching is a Boolean specifying if the frame matched the valid face.
        A green border is drawn for a match, a red one otherwise.
    lock_url is the address the unlock request ("ON") is POSTed to.
    frames_to_unlock is how many consecutive matching frames are needed
        before the unlock request is sent.
    request_timeout is the timeout in seconds for the unlock request.
        The Netduino code in this project keeps the relay on for 5
        seconds before replying, so keep this comfortably above 5.

    returns None
    """
    global framecount
    rect_width = 10
    offset = rect_width // 2
    top_left = (offset, offset)
    bottom_right = (display_image.shape[1] - offset - 1,
                    display_image.shape[0] - offset - 1)

    if image_info is not None:
        cv2.putText(display_image, image_info, (30, 30),
                    cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1)

    if not matching:
        # not a match, red rectangle; the streak of matches is broken
        framecount = 0
        cv2.rectangle(display_image, top_left, bottom_right,
                      (0, 0, 255), rect_width)
        return

    # match, green rectangle
    framecount += 1
    cv2.rectangle(display_image, top_left, bottom_right,
                  (0, 255, 0), rect_width)

    if framecount >= frames_to_unlock:
        # start counting again whether or not the request goes through
        framecount = 0
        try:
            requests.post(lock_url, data="ON", timeout=request_timeout)
        except requests.exceptions.RequestException as err:
            # Report the error instead of claiming success. Only request
            # errors are caught so Ctrl-C and programming errors still
            # surface.
            # NOTE(review): the Netduino code in this project replies with
            # bare text rather than an HTTP response, so requests may raise
            # here even though the lock did fire -- confirm on hardware.
            print('unlock request to ' + lock_url + ' raised: ' + str(err))
# whiten an image
def whiten_image(source_image):
    """Return source_image shifted to zero mean and scaled by its spread.

    The mean and standard deviation are taken over every element of the
    image. The divisor is never allowed to fall below
    1 / sqrt(number of elements), so a perfectly flat image does not
    cause a division by zero.
    """
    mean = numpy.mean(source_image)
    # smallest divisor allowed for an image with this many elements
    smallest_std = 1.0 / numpy.sqrt(source_image.size)
    divisor = numpy.maximum(numpy.std(source_image), smallest_std)
    centered = source_image - mean
    return centered * (1 / divisor)
# create a preprocessed image from the source image that matches the
# network expectations and return it
def preprocess_image(src):
    """Return a copy of src prepared as input for the network.

    The image is resized to 160 x 160, converted from BGR to RGB and
    then whitened. src itself is left untouched.
    """
    # (width, height) the image is scaled to
    network_size = (160, 160)
    scaled = cv2.resize(src, network_size)
    rgb = cv2.cvtColor(scaled, cv2.COLOR_BGR2RGB)
    return whiten_image(rgb)
# determine if two images are of matching faces based on
# the network output for both images.
def face_match(face1_output, face2_output, threshold=None):
    """Return True if two network outputs are close enough to be one face.

    face1_output and face2_output are the network outputs for the two
        images being compared; they must have the same length.
    threshold is the value the total difference must stay below for a
        match; when None the module level FACE_MATCH_THRESHOLD is used.

    The total difference is the sum of the squared element-wise
    differences. It is printed on every call. Outputs of different
    length never match (a message is printed and False is returned).
    """
    if len(face1_output) != len(face2_output):
        print('length mismatch in face_match')
        return False

    if threshold is None:
        threshold = FACE_MATCH_THRESHOLD

    # Do the arithmetic in float32: run_inference feeds the device float16
    # data, and summing many terms in float16 loses precision quickly.
    first = numpy.asarray(face1_output, dtype=numpy.float32)
    second = numpy.asarray(face2_output, dtype=numpy.float32)
    total_diff = float(numpy.sum(numpy.square(first - second)))
    print('Total Difference is: ' + str(total_diff))

    # under the threshold -> same face, otherwise different faces
    return total_diff < threshold
# handles key presses
# raw_key is the return value from cv2.waitKey
# returns False if program should end, or True if should continue
def handle_keys(raw_key):
    """Tell the caller whether to keep running after a key press.

    raw_key is the value returned by cv2.waitKey; only its low 8 bits
    are looked at. Returns False for 'q' or 'Q' (quit), True for any
    other key.
    """
    quit_codes = (ord('q'), ord('Q'))
    return (raw_key & 0xFF) not in quit_codes
# start the opencv webcam streaming and pass each frame
# from the camera to the facenet network for an inference
# Continue looping until the camera stops delivering frames, the
# preview window is closed, or the user presses Q, and then return.
# valid_output is inference result for the valid image
# validated image filename is the name of the valid image file
# graph is the ncsdk Graph object initialized with the facenet graph file
#   which we will run the inference on.
# returns None
def run_camera(valid_output, validated_image_filename, graph):
    """Stream camera frames through the network and show the results.

    valid_output is the inference result for the valid image.
    validated_image_filename is the name of the valid image file (only
        used in the printed PASS/FAIL messages).
    graph is the ncsdk Graph object the inferences run on.

    Each frame is compared against valid_output, annotated through
    overlay_on_image and displayed. The loop ends when the camera stops
    delivering frames, the preview window is closed, or the user
    presses Q. If the last processed frame was a match it stays on
    screen until a key is pressed. The camera is always released
    before returning.

    returns None
    """
    camera_device = cv2.VideoCapture(CAMERA_INDEX)
    try:
        # check the camera opened before trying to configure it
        if (camera_device is None) or (not camera_device.isOpened()):
            print ('Could not open camera.  Make sure it is plugged in.')
            print ('Also, if you installed python opencv via pip or pip3 you')
            print ('need to uninstall it and install from source with -D WITH_V4L=ON')
            print ('Use the provided script: install-opencv-from_source.sh')
            return

        camera_device.set(cv2.CAP_PROP_FRAME_WIDTH, REQUEST_CAMERA_WIDTH)
        camera_device.set(cv2.CAP_PROP_FRAME_HEIGHT, REQUEST_CAMERA_HEIGHT)
        # the camera may not honor the request, so report what we got
        actual_camera_width = camera_device.get(cv2.CAP_PROP_FRAME_WIDTH)
        actual_camera_height = camera_device.get(cv2.CAP_PROP_FRAME_HEIGHT)
        print ('actual camera resolution: ' + str(actual_camera_width) + ' x ' + str(actual_camera_height))

        frame_count = 0
        cv2.namedWindow(CV_WINDOW_NAME)
        found_match = False
        # last frame that was successfully read and annotated; a failed
        # read must not replace it, since it is shown after the loop
        last_frame = None

        while True:
            # Read image from camera,
            ret_val, vid_image = camera_device.read()
            if not ret_val:
                print("No image from camera, exiting")
                break
            frame_count += 1
            frame_name = 'camera frame ' + str(frame_count)

            # run a single inference on the image and compare it with
            # the valid face
            test_output = run_inference(vid_image, graph)
            found_match = face_match(valid_output, test_output)
            if found_match:
                print('PASS!  File ' + frame_name + ' matches ' + validated_image_filename)
            else:
                print('FAIL!  File ' + frame_name + ' does not match ' + validated_image_filename)
            overlay_on_image(vid_image, frame_name, found_match)
            last_frame = vid_image

            # check if the window is visible, this means the user hasn't
            # closed the window via the X button
            prop_val = cv2.getWindowProperty(CV_WINDOW_NAME, cv2.WND_PROP_ASPECT_RATIO)
            if prop_val < 0.0:
                print('window closed')
                break

            # display the results and give the user a chance to hit a key
            cv2.imshow(CV_WINDOW_NAME, vid_image)
            raw_key = cv2.waitKey(1)
            if raw_key != -1 and not handle_keys(raw_key):
                print('user pressed Q')
                break

        if found_match and last_frame is not None:
            # keep the matching frame on screen until a key is pressed
            cv2.imshow(CV_WINDOW_NAME, last_frame)
            cv2.waitKey(0)
    finally:
        if camera_device is not None:
            camera_device.release()
# This function is called from the entry point to do
# all the work of the program
def main():
    """Open the first NCS device, load the graph and run the camera loop.

    Returns 0 on a normal run and 1 when no NCS device is found or the
    validated image cannot be read. The graph and the device are
    released even if an error occurs part-way through.
    """
    # Get a list of ALL the sticks that are plugged in
    # we need at least one
    devices = mvnc.EnumerateDevices()
    if len(devices) == 0:
        print('No NCS devices found')
        return 1

    # Pick the first stick to run the network and open it
    device = mvnc.Device(devices[0])
    device.OpenDevice()
    try:
        # read the graph file (created with the ncsdk compiler) into a
        # memory buffer
        with open(GRAPH_FILENAME, mode='rb') as f:
            graph_in_memory = f.read()

        # create the NCAPI graph instance from the memory buffer
        # containing the graph file.
        graph = device.AllocateGraph(graph_in_memory)
        try:
            # cv2.imread signals an unreadable file by returning None
            # rather than raising, so check before using the image
            validated_image = cv2.imread(validated_image_filename)
            if validated_image is None:
                print('Could not read validated image: ' + validated_image_filename)
                return 1

            valid_output = run_inference(validated_image, graph)
            run_camera(valid_output, validated_image_filename, graph)
        finally:
            # Clean up the graph
            graph.DeallocateGraph()
    finally:
        # Clean up the device
        device.CloseDevice()
    return 0


# main entry point for program. main()'s return value becomes the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())

Netduino C# Code

C#
C# Code for Netduino to operate the lock
using Microsoft.SPOT.Hardware;
using Microsoft.SPOT.Net.NetworkInformation;
using SecretLabs.NETMF.Hardware;
using SecretLabs.NETMF.Hardware.Netduino;

namespace Lock
{
    // Tiny TCP server that drives the lock relay: any request containing
    // "ON" energises the relay (and the on-board LED) for a few seconds.
    //
    // NOTE(review): there is no authentication here -- any client that can
    // reach port 80 and send text containing "ON" will open the lock.
    public class Program
    {
        // TCP port the lock listens on.
        private const int ListenPort = 80;

        // How long to wait for a connected client to send data (microseconds).
        private const int ReceiveWaitMicroseconds = 5000000;

        // How long the relay and LED stay on per unlock request (milliseconds).
        private const int UnlockHoldMilliseconds = 5000;

        // Main method is called once as opposed to a loop method called over and over again
        public static void Main()
        {
            // give the network hardware some time before initialising it
            Thread.Sleep(5000);
            App app = new App();
            app.Run();

            OutputPort led = new OutputPort(Pins.ONBOARD_LED, false);
            OutputPort relay = new OutputPort(Pins.GPIO_PIN_D5, false);

            Socket listenerSocket = new Socket(AddressFamily.InterNetwork,
                                   SocketType.Stream,
                                   ProtocolType.Tcp);
            IPEndPoint listenerEndPoint = new IPEndPoint(IPAddress.Any, ListenPort);

            Debug.Print("setting up socket");

            // bind to the listening socket
            listenerSocket.Bind(listenerEndPoint);
            // and start listening for incoming connections
            listenerSocket.Listen(1);
            Debug.Print("listening");

            while (true)
            {
                Debug.Print(".");

                // wait for a client to connect
                Socket clientSocket = listenerSocket.Accept();
                try
                {
                    HandleClient(clientSocket, led, relay);
                }
                catch (SocketException e)
                {
                    // one misbehaving client must not take the lock server down
                    Debug.Print("client connection failed: " + e.ToString());
                }
                finally
                {
                    // important: close the client socket, even after an error
                    clientSocket.Close();
                }
            }
        }

        // Reads one request from the client, pulses the relay if the request
        // contains "ON", and replies with the lock status.
        private static void HandleClient(Socket clientSocket, OutputPort led, OutputPort relay)
        {
            // wait for data to arrive
            bool dataReady = clientSocket.Poll(ReceiveWaitMicroseconds, SelectMode.SelectRead);

            // if dataReady is true and there are bytes available to read,
            // then you have a good connection.
            if (!dataReady || clientSocket.Available <= 0)
            {
                return;
            }

            // NOTE(review): only the bytes available right now are read; a
            // request whose body arrives in a later packet would be missed.
            byte[] buffer = new byte[clientSocket.Available];
            clientSocket.Receive(buffer);
            string request =
              new string(System.Text.Encoding.UTF8.GetChars(buffer));

            Debug.Print(request);

            if (request.IndexOf("ON") >= 0)
            {
                try
                {
                    led.Write(true);
                    relay.Write(true);
                    Thread.Sleep(UnlockHoldMilliseconds);
                }
                finally
                {
                    // never leave the relay energised, whatever happened above
                    led.Write(false);
                    relay.Write(false);
                }
            }

            // The relay has already been switched back off by this point, so
            // the reported state reflects the LED after the pulse.
            string statusText = "Lock is " + (led.Read() ? "ON" : "OFF") + ".";
            byte[] body = System.Text.Encoding.UTF8.GetBytes(statusText);

            // Send a minimal but valid HTTP response so that HTTP clients
            // can parse the reply instead of failing on bare text.
            string header = "HTTP/1.1 200 OK\r\n" +
                            "Content-Type: text/plain\r\n" +
                            "Content-Length: " + body.Length.ToString() + "\r\n" +
                            "Connection: close\r\n" +
                            "\r\n";
            clientSocket.Send(System.Text.Encoding.UTF8.GetBytes(header));
            clientSocket.Send(body);
        }
    }

    // Brings up the network: enumerates the network interfaces, logs their
    // configuration and waits for an IP address when DHCP is enabled.
    public class App
    {
        NetworkInterface[] _interfaces;

        // IP address of the most recently inspected interface (set by ListNetworkInfo).
        public string NetDuinoIPAddress { get; set; }
        public bool IsRunning { get; set; }

        // Initialises the network once; IsRunning is false when this returns.
        public void Run()
        {
            if (!InitializeNetwork())
            {
                Debug.Print("Network initialization failed.");
            }

            this.IsRunning = false;
        }

        // Returns true as soon as one interface has (or obtains) an IP
        // address, false if none does or when not running on a device.
        protected bool InitializeNetwork()
        {
            if (Microsoft.SPOT.Hardware.SystemInfo.SystemID.SKU == 3)
            {
                Debug.Print("Wireless tests run only on Device");
                return false;
            }

            Debug.Print("Getting all the network interfaces.");
            _interfaces = NetworkInterface.GetAllNetworkInterfaces();

            // debug output
            ListNetworkInterfaces();

            // loop through each network interface
            foreach (var net in _interfaces)
            {
                // debug out
                ListNetworkInfo(net);
                PrintInterfaceType(net);

                // check for an IP address, try to get one if it's empty;
                // keep going with the next interface if this one has none
                if (CheckIPAddress(net))
                {
                    return true;
                }
            }

            // if we got here, no interface has an address.
            return false;
        }

        // Issues a GET request to url and discards the response.
        public void MakeWebRequest(string url)
        {
            var httpWebRequest = (HttpWebRequest)WebRequest.Create(url);
            httpWebRequest.Method = "GET";
            httpWebRequest.Timeout = 1000;
            httpWebRequest.KeepAlive = false;

            // the body is not needed, but the response still has to be
            // closed to release its connection
            var httpResponse = httpWebRequest.GetResponse();
            httpResponse.Close();
        }

        // Returns true if the interface already has an IP address or one is
        // obtained through DHCP within the timeout; false otherwise.
        protected bool CheckIPAddress(NetworkInterface net)
        {
            const int timeout = 10000; // timeout, in milliseconds to wait for an IP. 10,000 = 10 seconds
            const int sleepInterval = 10; // milliseconds between checks

            // the empty address (0.0.0.0). IPAddress.Any is 0.0.0.0.
            string noAddress = IPAddress.Any.ToString();

            if (net.IPAddress != noAddress)
            {
                Debug.Print("Already had IP Address: " + net.IPAddress);
                return true;
            }

            Debug.Print("No IP Address");

            if (!net.IsDhcpEnabled)
            {
                Debug.Print("DHCP is not enabled, and no IP address is configured, bailing out.");
                return false;
            }

            Debug.Print("DHCP is enabled, attempting to get an IP Address");
            Debug.Print("Sleep while obtaining an IP");

            // Wait for DHCP to hand out an address. The addresses are
            // compared as strings: comparing two IPAddress objects with ==
            // looks like a reference comparison, which would never report
            // "still 0.0.0.0" and so would never wait at all.
            // [note GetDefaultLocalAddress is a static, not sure which
            // network interface it would act on]
            int maxIntervalCount = timeout / sleepInterval;
            int count = 0;
            while (IPAddress.GetDefaultLocalAddress().ToString() == noAddress && count < maxIntervalCount)
            {
                Thread.Sleep(sleepInterval);
                count++;
            }

            // if we got here, we either timed out or got an address, so let's find out.
            // NOTE(review): it is unclear whether net.IPAddress refreshes
            // after DHCP completes, so the address the loop waited on is
            // checked first and net.IPAddress is the fallback.
            string acquired = IPAddress.GetDefaultLocalAddress().ToString();
            if (acquired == noAddress)
            {
                acquired = net.IPAddress;
            }

            if (acquired == noAddress)
            {
                Debug.Print("Failed to get an IP Address in the alotted time.");
                return false;
            }

            Debug.Print("Got IP Address: " + acquired);
            return true;
        }

        // Logs the type of every known network interface.
        protected void ListNetworkInterfaces()
        {
            foreach (var net in _interfaces)
            {
                PrintInterfaceType(net);
            }
        }

        // Logs the configuration of one interface and remembers its IP address.
        protected void ListNetworkInfo(NetworkInterface net)
        {
            Debug.Print("MAC Address: " + BytesToHexString(net.PhysicalAddress));
            Debug.Print("DHCP enabled: " + net.IsDhcpEnabled.ToString());
            Debug.Print("Dynamic DNS enabled: " + net.IsDynamicDnsEnabled.ToString());
            Debug.Print("IP Address: " + net.IPAddress.ToString());
            Debug.Print("Subnet Mask: " + net.SubnetMask.ToString());
            Debug.Print("Gateway: " + net.GatewayAddress.ToString());

            var wifi = net as Wireless80211;
            if (wifi != null)
            {
                Debug.Print("SSID:" + wifi.Ssid.ToString());
            }

            this.NetDuinoIPAddress = net.IPAddress.ToString();
        }

        // Logs which kind of interface net is (Ethernet, WiFi or unknown).
        private static void PrintInterfaceType(NetworkInterface net)
        {
            switch (net.NetworkInterfaceType)
            {
                case (NetworkInterfaceType.Ethernet):
                    Debug.Print("Found Ethernet Interface");
                    break;
                case (NetworkInterfaceType.Wireless80211):
                    Debug.Print("Found 802.11 WiFi Interface");
                    break;
                case (NetworkInterfaceType.Unknown):
                    Debug.Print("Found Unknown Interface");
                    break;
            }
        }

        // Formats bytes as upper-case hex pairs separated by '-', e.g. "0A-1B-2C".
        private static string BytesToHexString(byte[] bytes)
        {
            string hexString = string.Empty;

            // Create a character array for hexadecimal conversion.
            const string hexChars = "0123456789ABCDEF";

            // Loop through the bytes. The index is an int: a byte-sized
            // counter would wrap around on arrays of 256 bytes or more.
            for (int i = 0; i < bytes.Length; i++)
            {
                if (i > 0)
                    hexString += "-";

                // Grab the top 4 bits and append the hex equivalent to the return string.
                hexString += hexChars[bytes[i] >> 4];

                // Mask off the upper 4 bits to get the rest of it.
                hexString += hexChars[bytes[i] & 0x0F];
            }

            return hexString;
        }

        // Returns the IP address recorded by the last ListNetworkInfo call.
        public string getIPAddress()
        {
            return this.NetDuinoIPAddress;
        }
    }
}

Netduino Lock Repo

Netduino Lock Repo

Credits

Peter Ma

Peter Ma

22 projects • 98 followers
Prototype Hacker, Intel Software Innovator, Hackathon Goer, World Traveler, Ecological balancer, integrationist, technologist, futurist.

Comments