ming huang
Published © GPL3+

Hologram BTLE-Cellular Gateway for BTLE Beacons

Python sample code showing how to implement a one-way gateway from BTLE advertisements to cellular data, based on a Raspberry Pi Zero W and a Hologram Nova.

Intermediate • Full instructions provided • 8 hours • 838

Story

Custom parts and enclosures

No CAD files or custom parts are needed.

The Raspberry Pi Zero W and the Hologram Nova modem are purchased off the shelf.
The Hologram Nova modem, the IoT data SIM card, and the data plan are from Hologram: https://hologram.io/

Schematics

Hologram BTLE-to-cellular gateway

Connect the Hologram Nova cellular modem to one micro-USB port of the Raspberry Pi Zero W using an adapter, then connect the Pi's second micro-USB port to a 5V USB power supply. That supply powers both the Pi Zero W and the Hologram Nova. All components are off the shelf; there is nothing to build or solder.

Code

holo_gw.py - Hologram BTLE-to-cellular gateway

Python
This single-file sample code shows how to implement a one-way gateway from BTLE advertisements to cellular data. The file name is holo_gw.py.
A Raspberry Pi Zero W captures BTLE advertisements from surrounding devices.
A Hologram Nova plugged into the Pi relays the advertisement data to Hologram Cloud.
The relay software runs on the Raspberry Pi under Raspbian Linux. It scans for BTLE advertisements received by the Pi's built-in BTLE radio, then uses the Hologram SDK's "hologram send" command to send them to Hologram Cloud through the Hologram Nova modem.

This sample code relays only the following fields of each BTLE advertisement to Hologram Cloud:
- Name
- Device address
- Manufacturer Data, broken down into a 16-bit unique manufacturer ID followed by the manufacturer data. Note that in the BTLE standard, the manufacturer ID and the data together form a single field of the advertisement.
To customize the filter, modify the relay() function in holo_gw.py.
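
As a rough illustration of the message layout and of one possible filter, the sketch below rebuilds the relayed string from plain Python values. The names format_message and ALLOWED_MANU_IDS are hypothetical and not part of holo_gw.py, and plain dicts and lists stand in for the D-Bus types that BlueZ delivers.

```python
# Hypothetical helper, not part of holo_gw.py: rebuilds the relayed string
# from plain Python values instead of D-Bus types.

# Assumed example filter: relay only these 16-bit manufacturer IDs.
# An empty set means "relay everything".
ALLOWED_MANU_IDS = {15, 0x004C}


def format_message(name, address, manufacturer_data, allowed=ALLOWED_MANU_IDS):
    """Return the message string, or None if the advertisement is filtered out.

    manufacturer_data maps a manufacturer ID to a list of byte values,
    mirroring the ManufacturerData property reported by BlueZ.
    """
    for manu_id, data in manufacturer_data.items():
        if allowed and manu_id not in allowed:
            continue  # this manufacturer is not on the allow list
        hex_data = "".join("%02X" % b for b in data)
        return "%s[%s]:manu_id=%d data=0x%s" % (name, address, manu_id, hex_data)
    return None


if __name__ == "__main__":
    print(format_message("DTVBluetooth", "60:6B:BD:0F:C1:15", {15: [0, 9, 72]}))
```

A filter of this kind could be placed at the top of relay() so that unwanted advertisements never reach the modem, which may help keep cellular data usage down.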

Usage:
=====
1) Install the Hologram SDK.
2) Copy the file holo_gw.py to the Raspberry Pi.
3) In a Raspberry Pi console, in the directory where the file was copied, type: ./holo_gw.py

Note: Although I installed the BlueZ SDK on the Raspberry Pi during development, I don't think it is necessary for running holo_gw.py, which is why it is not listed under Usage.

Trace captured on the Raspberry Pi console:
("/home/pi/holo" is where I put holo_gw.py)
====================================
pi@raspberrypi:~/holo $ ./holo_gw.py
[NEW] Controller B8:27:EB:DF:DD:12 raspberrypi [default]
Agent registered
[bluetooth]# power on
Agent unregistered
[DEL] Controller B8:27:EB:DF:DD:12 raspberrypi [default]
interfaces_added
[ 60:6B:BD:0F:C1:15 ]
AddressType = public
Name = DTVBluetooth
Paired = 0
ServicesResolved = 0
Adapter = /org/bluez/hci0
LegacyPairing = 0
TxPower = 0
Alias = DTVBluetooth
ManufacturerData = dbus.Dictionary({dbus.UInt16(15): dbus.Array([dbus.Byte(0), dbus.Byte(9), dbus.Byte(72)], signature=dbus.Signature('y'), variant_level=1)}, signature=dbus.Signature('qv'), variant_level=1)
Connected = 0
UUIDs = dbus.Array([dbus.String(u'0000110a-0000-1000-8000-00805f9b34fb'), dbus.String(u'00001200-0000-1000-8000-00805f9b34fb')], signature=dbus.Signature('s'), variant_level=1)
Address = 60:6B:BD:0F:C1:15
RSSI = -57
Blocked = 0
Class = 0x08043c
Trusted = 0
Icon = audio-card
**************
send to Holo Cloud: sudo hologram send "DTVBluetooth[60:6B:BD:0F:C1:15]:manu_id=15 data=0x000948"
*************
RESPONSE MESSAGE: Message sent successfully

The received data as seen in the Hologram Dashboard:
====================================
Message sent from Device #158411: DTVBluetooth[60:6B:BD:0F:C1:15]:manu_id=15 data=0x000948
View Raw:
success true
data
id 4611179
logged "2018-01-06 01:25:53.911534"
orgid 10965
deviceid 158411
record_id "895ec6f4-f280-11e7-bb6d-bc764e206eb6"
device_metadata "{\"m_version\": 1, \"dev_fw_string\": \"nova_u201-0.7.2\"}"
data "{\"received\": \"2018-01-06T01:25:53.251328\", \"authtype\": \"otp\", \"tags\": [\"_SOCKETAPI_\", \"_DEVICE_158411_\", \"_SIMPLESTRING_\"], \"timestamp\": \"1515201949\", \"device_name\": \"Unnamed Device (86191)\", \"errorcode\": 0, \"source\": \"8944502407175486191\", \"record_id\": \"895ec6f4-f280-11e7-bb6d-bc764e206eb6\", \"data\": \"RFRWQmx1ZXRvb3RoWzYwOjZCOkJEOjBGOkMxOjE1XTptYW51X2lkPTE1IGRhdGE9MHgwMDA5NDg=\", \"device_id\": 158411}"
matched_rules []
tags
0 "_SIMPLESTRING_"
1 "_DEVICE_158411_"
2 "_SOCKETAPI_"
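
The inner "data" value of the raw record above is Base64-encoded. A minimal sketch for decoding it and splitting the text back into fields follows; the function name decode_record and the regular expression are illustrative assumptions based only on the message layout that holo_gw.py produces, not on any Hologram API.

```python
import base64
import re

# Pattern for the text produced by relay(): name[address]:manu_id=N data=0xHEX
# (an assumption derived from the sample message, not a Hologram format).
MESSAGE_RE = re.compile(
    r"^(?P<name>.*)\[(?P<address>[0-9A-F:]{17})\]:"
    r"manu_id=(?P<manu_id>\d+) data=0x(?P<data>[0-9A-F]*)$"
)


def decode_record(b64_data):
    """Decode the Base64 "data" value and return the relayed fields as a dict."""
    text = base64.b64decode(b64_data).decode("ascii")
    match = MESSAGE_RE.match(text)
    if match is None:
        raise ValueError("unexpected message format: %r" % text)
    fields = match.groupdict()
    fields["manu_id"] = int(fields["manu_id"])
    # Turn the hex string back into a list of byte values.
    hex_data = fields["data"]
    fields["data"] = [int(hex_data[i:i + 2], 16) for i in range(0, len(hex_data), 2)]
    return fields


if __name__ == "__main__":
    sample = ("RFRWQmx1ZXRvb3RoWzYwOjZCOkJEOjBGOkMxOjE1XTptYW51X2lk"
              "PTE1IGRhdGE9MHgwMDA5NDg=")
    print(decode_record(sample))
```

For the record above, this gives the name DTVBluetooth, the address 60:6B:BD:0F:C1:15, manufacturer ID 15 and the data bytes 0, 9, 72.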


For the developer's convenience, the iBeacon BTLE advertisement format is:
==============================================================
static beaconAdvData_t beaconAdv =
{
0x02, // length of this data
0x01, // GAP_ADTYPE_FLAGS
0x06, // GAP_ADTYPE_FLAGS_BREDR_NOT_SUPPORTED | GAP_ADTYPE_FLAGS_GENERAL
0x1A, // length of manufacture specific data excluding this length
0xFF, // GAP_ADTYPE_MANUFACTURER_SPECIFIC
0x4C, // company ID[0]
0x00, // company ID[1]
0x02, // beacon type[0]
0x15, // beacon type[1]
0xA3, // UUID LSB
0x22,
0x37,
0xE7,
0x3E,
0xC0,
0xC5,
0x84,
0x86,
0x4B,
0xB9,
0x99,
0xF9,
0x82,
0x03,
0xF7, // UUID MSB
0x00, //major[0] (user data[0])
0x00, //major[1] (user data[1])
0x00, //minor[0] (user data[2])
0x00, //minor[1] (user data[3])
0x00 //measured power (can be set later)
};
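
Given the layout above, the manufacturer data of an iBeacon could be unpacked roughly as follows. This is a sketch under stated assumptions: parse_ibeacon is a hypothetical helper that is not part of holo_gw.py, the input is the byte list that follows the company ID (as in the ManufacturerData property), the UUID is read in transmitted order, and major and minor are assumed to be big-endian.

```python
import struct
import uuid

APPLE_COMPANY_ID = 0x004C     # company ID bytes 0x4C, 0x00 read as little-endian
IBEACON_TYPE = (0x02, 0x15)   # beacon type bytes from the layout above
IBEACON_LENGTH = 23           # 2 type + 16 UUID + 2 major + 2 minor + 1 power


def parse_ibeacon(manu_id, data):
    """Return (uuid, major, minor, measured_power), or None if not an iBeacon.

    data holds the bytes that follow the company ID, as a list of ints.
    """
    payload = bytes(bytearray(data))
    if manu_id != APPLE_COMPANY_ID or len(payload) < IBEACON_LENGTH:
        return None
    # ">" selects big-endian with no padding; "b" reads the power as signed.
    type0, type1, raw_uuid, major, minor, power = struct.unpack(
        ">BB16sHHb", payload[:IBEACON_LENGTH])
    if (type0, type1) != IBEACON_TYPE:
        return None
    return uuid.UUID(bytes=raw_uuid), major, minor, power


if __name__ == "__main__":
    sample = [0x02, 0x15,
              0xA3, 0x22, 0x37, 0xE7, 0x3E, 0xC0, 0xC5, 0x84,
              0x86, 0x4B, 0xB9, 0x99, 0xF9, 0x82, 0x03, 0xF7,
              0x00, 0x01, 0x00, 0x02, 0xC5]
    print(parse_ibeacon(APPLE_COMPANY_ID, sample))
```

The sample reuses the UUID bytes from the structure above, with made-up major, minor and measured power values for illustration.
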
#!/usr/bin/python
# Written for Python 2: uses unicode(), dict.iteritems() and list-style dict.items().

from __future__ import absolute_import, print_function, unicode_literals
import dbus
import dbus.mainloop.glib
try:
  from gi.repository import GObject
except ImportError:
  import gobject as GObject
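import os
import subprocess

def relay(address, properties):
    """Send the name, address and manufacturer data of one device to Hologram Cloud."""
    name = "unknown"
    manu_id = None
    manu = ""
    for key, value in properties.items():
        if type(value) is dbus.String:
            value = unicode(value).encode('ascii', 'replace')
        if key == "Name":
            name = value
        elif key == "ManufacturerData" and len(value) > 0:
            # ManufacturerData maps a 16-bit manufacturer ID to an array of bytes;
            # only the first entry is relayed.
            manu_id = int(list(value.keys())[0])
            manu = ''.join(["%02X" % x for x in list(value.values())[0]])
    if manu:
        s = "%s[%s]:manu_id=%d data=0x%s" % (name, address, manu_id, manu)
        print("**************\n\t send to Holo Cloud: sudo hologram send \"%s\" \n*************" % s)
        # Pass the message as a single argument rather than building a shell
        # string, so characters in an advertised name cannot run commands.
        subprocess.call(["sudo", "hologram", "send", s])
    else:
        print("discard adv for absence of manu data")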

def print_normal(address, properties):
    print("[ " + address + " ]")

    for key in properties.keys():
        value = properties[key]
        if type(value) is dbus.String:
            value = unicode(value).encode('ascii', 'replace')
        if (key == "Class"):
            print("    %s = 0x%06x" % (key, value))
        else:
            print("    %s = %s" % (key, value))

    
# BTLE user callback
def interfaces_added(path, interfaces):
    #print("interfaces_added path:%s, interfaces:%s" % (path, interfaces))
    # InterfacesAdded also fires for objects that are not devices; ignore those.
    properties = interfaces.get("org.bluez.Device1")
    if not properties:
        return
    if path in devices:
        devices[path] = dict(devices[path].items() + properties.items())
    else:
        devices[path] = properties

    if "Address" in devices[path]:
        # Read from the merged cache, which is what the check above tested.
        address = devices[path]["Address"]
    else:
        address = "<unknown>"
    print("interfaces_added")
    print_normal(address, devices[path])
    relay(address, devices[path])
        
        
def properties_changed(interface, changed, invalidated, path):
    #print("properties_changed \n\tinterface:%s\n\tchanged:%s\n\tpath:%s" % (interfaces,changed,path))
    if interface != "org.bluez.Device1":
        print("properties_changed != org.bluez.Device1")
        return
    if path in devices:
        devices[path] = dict(devices[path].items() + changed.items())
    else:
        devices[path] = changed
    if "Address" in devices[path]:
        address = devices[path]["Address"]
    else:
        address = "<unknown>"
    print("properties_changed") 
    print_normal(address, devices[path])
    relay(address, devices[path])
    
        
def find_adapter(objects):
    SERVICE_NAME = "org.bluez"
    ADAPTER_INTERFACE = SERVICE_NAME + ".Adapter1"
    bus = dbus.SystemBus()
    for path, ifaces in objects.iteritems():
        adapter = ifaces.get(ADAPTER_INTERFACE)
        if adapter is None:
            continue
        obj = bus.get_object(SERVICE_NAME, path)
        #print("find_adapter SERVICE_NAME:%s, path:%s" % (SERVICE_NAME,path))
        #   SERVICE_NAME:org.bluez, path:/org/bluez/hci0
        return dbus.Interface(obj, ADAPTER_INTERFACE)
    raise Exception("Bluetooth adapter not found")
    
# mainloop  
devices = {}
if __name__ == '__main__':
    os.system('echo "power on"|sudo bluetoothctl')
    
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()
    bus.add_signal_receiver(interfaces_added,
        dbus_interface = "org.freedesktop.DBus.ObjectManager",
        signal_name = "InterfacesAdded")

    bus.add_signal_receiver(properties_changed,
        dbus_interface = "org.freedesktop.DBus.Properties",
        signal_name = "PropertiesChanged",
        arg0 = "org.bluez.Device1",
        path_keyword = "path")

    om = dbus.Interface(bus.get_object("org.bluez", "/"),
                        "org.freedesktop.DBus.ObjectManager")
    objects = om.GetManagedObjects()
    for path, interfaces in objects.iteritems():
        if "org.bluez.Device1" in interfaces:
            devices[path] = interfaces["org.bluez.Device1"]

    # capture every advertisement
    adapter = find_adapter(objects)
    adapter.SetDiscoveryFilter({})
    adapter.StartDiscovery()

    # wait to be called back on each advertisement
    mainloop = GObject.MainLoop()
    mainloop.run()  

Credits

ming huang
