Piero Di Vita
Published © GPL3+

Abraboxabra

Open your garage door with your smartphone or smartwatch!

IntermediateFull instructions provided2,098
Abraboxabra

Things used in this project

Hardware components

Android device
Android device
×1
Raspberry Pi 2 Model B
Raspberry Pi 2 Model B
×1
SunFounder 2 Channel 5V Relay Shield
×1
Female to female breadboard jumper cable wires
×1
Bluetooth dongle compatible with Pi
×1
Remote control garage door opener
×1

Story

Read more

Schematics

Raspberry connections

Code

simple-agent.py

Python
#!/usr/bin/python

from __future__ import absolute_import, print_function, unicode_literals

from optparse import OptionParser
import sys
import dbus
import dbus.service
import dbus.mainloop.glib
try:
  from gi.repository import GObject
except ImportError:
  import gobject as GObject
#import bluezutils
from bluetooth import *

BUS_NAME = 'org.bluez'
AGENT_INTERFACE = 'org.bluez.Agent1'
AGENT_PATH = "/test/agent"

bus = None
device_obj = None
dev_path = None

def ask(prompt):
        try:
                return raw_input(prompt)
        except:
                return input(prompt)

def set_trusted(path):
        props = dbus.Interface(bus.get_object("org.bluez", path),
                                        "org.freedesktop.DBus.Properties")
        props.Set("org.bluez.Device1", "Trusted", True)

def dev_connect(path):
        dev = dbus.Interface(bus.get_object("org.bluez", path),
                                                        "org.bluez.Device1")
        dev.Connect()

class Rejected(dbus.DBusException):
        _dbus_error_name = "org.bluez.Error.Rejected"

class Agent(dbus.service.Object):
        exit_on_release = True

        def set_exit_on_release(self, exit_on_release):
                self.exit_on_release = exit_on_release

        @dbus.service.method(AGENT_INTERFACE,
                                        in_signature="", out_signature="")
        def Release(self):
                print("Release")
                if self.exit_on_release:
                        mainloop.quit()

        @dbus.service.method(AGENT_INTERFACE,
                                        in_signature="os", out_signature="")
        def AuthorizeService(self, device, uuid):
                print("AuthorizeService (%s, %s)" % (device, uuid))
                authorize = ask("Authorize connection (yes/no): ")
                if (authorize == "yes"):
                        return
                raise Rejected("Connection rejected by user")

        @dbus.service.method(AGENT_INTERFACE,
                                        in_signature="o", out_signature="s")
        def RequestPinCode(self, device):
                print("RequestPinCode (%s)" % (device))
                set_trusted(device)
                return ask("Enter PIN Code: ")

        @dbus.service.method(AGENT_INTERFACE,
                                        in_signature="o", out_signature="u")
        def RequestPasskey(self, device):
                print("RequestPasskey (%s)" % (device))
                set_trusted(device)
                passkey = ask("Enter passkey: ")
                return dbus.UInt32(passkey)

        @dbus.service.method(AGENT_INTERFACE,
                                        in_signature="ouq", out_signature="")
        def DisplayPasskey(self, device, passkey, entered):
                print("DisplayPasskey (%s, %06u entered %u)" %
                                                (device, passkey, entered))

        @dbus.service.method(AGENT_INTERFACE,
                                        in_signature="os", out_signature="")
        def DisplayPinCode(self, device, pincode):
                print("DisplayPinCode (%s, %s)" % (device, pincode))

        @dbus.service.method(AGENT_INTERFACE,
                                        in_signature="ou", out_signature="")
        def RequestConfirmation(self, device, passkey):
                print("RequestConfirmation (%s, %06d)" % (device, passkey))
                confirm = ask("Confirm passkey (yes/no): ")
                if (confirm == "yes"):
                        set_trusted(device)
                        return
                raise Rejected("Passkey doesn't match")

        @dbus.service.method(AGENT_INTERFACE,
                                        in_signature="o", out_signature="")
        def RequestAuthorization(self, device):
                print("RequestAuthorization (%s)" % (device))
                auth = ask("Authorize? (yes/no): ")
                if (auth == "yes"):
                        return
                raise Rejected("Pairing rejected")

        @dbus.service.method(AGENT_INTERFACE,
                                        in_signature="", out_signature="")
        def Cancel(self):
                print("Cancel")

def pair_reply():
        print("Device paired")
        set_trusted(dev_path)
        dev_connect(dev_path)
        mainloop.quit()

def pair_error(error):
        err_name = error.get_dbus_name()
        if err_name == "org.freedesktop.DBus.Error.NoReply" and device_obj:
                print("Timed out. Cancelling pairing")
                device_obj.CancelPairing()
        else:
                print("Creating device failed: %s" % (error))


        mainloop.quit()

if __name__ == '__main__':
        dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)

        bus = dbus.SystemBus()

        capability = "KeyboardDisplay"

        parser = OptionParser()
        parser.add_option("-i", "--adapter", action="store",
                                        type="string",
                                        dest="adapter_pattern",
                                        default=None)
        parser.add_option("-c", "--capability", action="store",
                                        type="string", dest="capability")
        parser.add_option("-t", "--timeout", action="store",
                                        type="int", dest="timeout",
                                        default=60000)
        (options, args) = parser.parse_args()
        if options.capability:
                capability  = options.capability

        path = "/test/agent"
        agent = Agent(bus, path)

        mainloop = GObject.MainLoop()

        obj = bus.get_object(BUS_NAME, "/org/bluez");
        manager = dbus.Interface(obj, "org.bluez.AgentManager1")
        manager.RegisterAgent(path, capability)

        print("Agent registered")

        # Fix-up old style invocation (BlueZ 4)
        if len(args) > 0 and args[0].startswith("hci"):
                options.adapter_pattern = args[0]
                del args[:1]

        if len(args) > 0:
                device = bluezutils.find_device(args[0],
                                                options.adapter_pattern)
                dev_path = device.object_path
                agent.set_exit_on_release(False)
                device.Pair(reply_handler=pair_reply, error_handler=pair_error,
                                                                timeout=60000)
                device_obj = device
        else:
                manager.RequestDefaultAgent(path)

        mainloop.run()

        #adapter.UnregisterAgent(path)
        #print("Agent unregistered")

abraboxabra.py

Python
# Abraboxabra 1.0
# Author: Pietro Di Vita(scognito@gmail.com)

import time
import RPi.GPIO as GPIO
import os
from bluetooth import *
from thread import *

GPIO.setmode(GPIO.BOARD)

#Commented as GPIO.setup close the circuit. Will use a workaround
#GPIO.setup(7, GPIO.OUT)

#workaround
first_time = True

server_sock=BluetoothSocket( RFCOMM )
server_sock.bind(("",PORT_ANY))
server_sock.listen(10)

port = server_sock.getsockname()[1]

uuid = "94f39d29-7d6d-437d-973b-fba39e49d4ee"

advertise_service( server_sock, "Abraboxabra server",
                   service_id = uuid,
                   service_classes = [ uuid, SERIAL_PORT_CLASS ],
                   profiles = [ SERIAL_PORT_PROFILE ], 
#                   protocols = [ OBEX_UUID ] 
                    )

#GPIO.output(7,True)

print "Waiting for connection on RFCOMM channel %d" % port

#Function for handling connections. This will be used to create threads
def clientthread(conn):
     
    global first_time
    #infinite loop so that function do not terminate and thread do not end.
    while True:
        try: 
		#Receiving from client
		data = conn.recv(1024)
		print "received command [%s]" % data

		client_sock.send('received message ' + data);
		if data == 'openclose':
			if first_time:
				GPIO.setup(7, GPIO.OUT)
				first_time = False

			GPIO.output(7, False)
			time.sleep(1.2)
			GPIO.output(7,True)
			client_sock.send("OK")
		elif data == 'shutdown':
			os.system('sudo shutdown now -h')
	except IOError:
		print "Disconnected from ", client_info
		pass
     
    #came out of loop
    conn.close()

#
# MAIN
#
while True:
	client_sock, client_info = server_sock.accept()
	print "Accepted Bluetooth connection from ", client_info

	start_new_thread(clientthread ,(client_sock,))

print "disconnected"

time.sleep(1)
GPIO.output(7,True)

server_sock.close()
print "all done"

Github

https://github.com/scognito/Abraboxabra/blob/master/utils/simple-agent.py

Credits

Piero Di Vita

Piero Di Vita

1 project • 1 follower

Comments