WolkWriter
Published © GPL3+

Home Automation Using Z-Wave

Automate your home rapidly with Raspberry Pi, Z-Wave devices and WolkAbout IoT Platform.

Intermediate · Full instructions provided · 1 hour · 3,478
Home Automation Using Z-Wave

Things used in this project

Hardware components

Raspberry Pi 3 Model B+
Raspberry Pi 3 Model B+
×1
Aeotec Z-Stick Gen5
×1
Aeotec LED Bulb 6
×1
Aeotec MultiSensor 6
×1

Software apps and online services

WolkAbout IoT Platform
WolkAbout IoT Platform

Story

Read more

Code

handlers.py

Python
This file is responsible for checking Z-Wave device status and setting their dynamic values
import time

import pyzwaver
import wolk_gateway_module as wolk

import zwave


def node_status_provider(device_key):
    """Ping the Z-Wave node behind ``device_key`` and report its status.

    Args:
        device_key: Key of the device to check. "multi_sensor6" is looked
            up as node 3 and "LED_Bulb6" as node 4.

    Returns:
        ``wolk.DeviceStatus.CONNECTED`` when the node has been interviewed
        and is not listed in the controller's failed nodes, otherwise
        ``wolk.DeviceStatus.OFFLINE`` (also for an unknown device key).
    """
    node_numbers = {"multi_sensor6": 3, "LED_Bulb6": 4}
    node_number = node_numbers.get(device_key)
    if node_number is None:
        return wolk.DeviceStatus.OFFLINE

    zwave.translator.Ping(node_number, 5, False, "status_request")
    # Pause before reading the node state - presumably so the ping reply
    # can be processed first; confirm against the pyzwaver driver.
    time.sleep(1.5)

    node = zwave.nodeset.GetNode(node_number)
    if not node.IsInterviewed():
        return wolk.DeviceStatus.OFFLINE
    # NOTE(review): this tests the node object itself for membership;
    # confirm failed_nodes does not hold plain node numbers instead.
    if node in zwave.controller.failed_nodes:
        return wolk.DeviceStatus.OFFLINE
    return wolk.DeviceStatus.CONNECTED


def handle_actuation(device_key, reference, value):
    """Set the device actuator identified by ``reference`` to ``value``.

    Only the dimmer ("D") of the "LED_Bulb6" device is handled; any other
    combination is ignored.

    Args:
        device_key: Key of the device that owns the actuator.
        reference: Reference of the actuator on that device.
        value: Requested dimmer level. Numbers and numeric strings
            (including ones such as "50.0") are accepted.

    Raises:
        ValueError: If ``value`` cannot be interpreted as a number.
    """
    if device_key != "LED_Bulb6" or reference != "D":
        return

    # The actuator is registered with a 0-100 range, but a Z-Wave
    # Multilevel Switch Set only accepts levels 0-99, so the request is
    # clamped to keep "100 %" from being sent as a reserved value.
    level = max(0, min(99, int(float(value))))

    node = zwave.nodeset.GetNode(4)
    node.BatchCommandSubmitFilteredFast(
        pyzwaver.command_helper.MultilevelSwitchSet(level)
    )


def get_actuator_status(device_key, reference):
    """Report the current state and value of an actuator.

    Only the dimmer ("D") of the "LED_Bulb6" device is known.

    Args:
        device_key: Key of the device that owns the actuator.
        reference: Reference of the actuator on that device.

    Returns:
        A ``(state, value)`` tuple: ``(wolk.ActuatorState.READY, level)``
        when the bulb node is interviewed and not listed as failed,
        otherwise ``(wolk.ActuatorState.ERROR, None)``.
    """
    is_bulb_dimmer = device_key == "LED_Bulb6" and reference == "D"
    if not is_bulb_dimmer:
        return wolk.ActuatorState.ERROR, None

    bulb = zwave.nodeset.GetNode(4)
    usable = (
        bulb.IsInterviewed() and bulb not in zwave.controller.failed_nodes
    )
    if not usable:
        return wolk.ActuatorState.ERROR, None

    # Request fresh values, then pause before reading the level back.
    bulb.RefreshDynamicValues()
    time.sleep(1)
    level = bulb.values.GetMultilevelSwitchLevel()
    return wolk.ActuatorState.READY, level

z_wave_module.service

Plain text
Service that allows the Z-Wave module to run in the background and start on boot
# systemd unit that runs the Z-Wave gateway module (demo.py) as a service.
[Unit]
 Description=Service for running Z-Wave gateway module
# Order this unit after multi-user.target has been reached.
 After=multi-user.target

[Service]
# Type=idle delays starting the program until other active jobs are dispatched.
 Type=idle
# demo.py opens configuration.json by a relative path, so the working
# directory must be the checkout that contains it.
 WorkingDirectory=/home/pi/WolkGatewayModule-Z-Wave
 ExecStart=/usr/local/bin/python3.7 /home/pi/WolkGatewayModule-Z-Wave/demo.py
# Restart the module whenever it exits, whatever the exit status.
 Restart=always

[Install]
# Enabling the unit attaches it to multi-user.target so it starts on boot.
 WantedBy=multi-user.target

wolk_gateway.service

Plain text
Service that allows WolkGateway to run in the background and start on boot
# systemd unit that runs the WolkGateway application as a service.
# <USER_NAME> is a placeholder: replace it with the account whose home
# directory holds the WolkGateway build.
[Unit]
 Description=Gateway for connecting non-IP enabled devices to WolkAbout IoT Platform
# Order this unit after multi-user.target has been reached.
 After=multi-user.target

[Service]
# Type=idle delays starting the program until other active jobs are dispatched.
 Type=idle
# gatewayConfiguration.json is passed by a relative path, so it is looked
# up in this working directory.
 WorkingDirectory=/home/<USER_NAME>/WolkGateway/out/
 ExecStart=/home/<USER_NAME>/WolkGateway/out/WolkGatewayApp gatewayConfiguration.json
# Restart the gateway whenever it exits, whatever the exit status.
 Restart=always

[Install]
# Enabling the unit attaches it to multi-user.target so it starts on boot.
 WantedBy=multi-user.target

demo.py

Python
Main part of the Z-Wave application: it periodically polls data from the devices and forwards it to WolkGateway, which then sends it to WolkAbout IoT Platform
import json
import time

import wolk_gateway_module as wolk

import zwave
import devices
import handlers

# Debug logging is enabled by the call below; comment it out to disable it.
wolk.logging_config("debug")


# Load the connection settings from configuration.json in the working
# directory. Only the failures that opening or parsing the file can cause
# are handled: OSError for a missing or unreadable file, ValueError for
# malformed JSON or badly encoded text.
try:
    with open("configuration.json", encoding="utf-8") as file:
        configuration = json.load(file)
except (OSError, ValueError) as e:
    print(f"Failed to open configuration file: {e}")
    # Exit with a non-zero status so the failure is visible to the caller;
    # quit() is a helper meant for the interactive shell and exits with 0.
    raise SystemExit(1) from e

# Create the gateway module from the loaded settings and wire in the
# Z-Wave handlers for device status, actuation and actuator status.
host = configuration["host"]
port = configuration["port"]
module_name = configuration["module_name"]

wolk_module = wolk.Wolk(
    host,
    port,
    module_name,
    handlers.node_status_provider,
    actuation_handler=handlers.handle_actuation,
    actuator_status_provider=handlers.get_actuator_status,
)

# Register the LED bulb first, then the multi sensor.
for device in (devices.LED_bulb_device, devices.multi_sensor_device):
    wolk_module.add_device(device)

wolk_module.connect()
wolk_module.publish()

# Give the connect step time to ping the nodes for their status.
time.sleep(5)

# Report the bulb dimmer's state once before the polling loop starts.
wolk_module.publish_actuator_status("LED_Bulb6", "D")

# Unit string of a MultiSensor reading -> reference of the sensor it is
# published under. The empty unit is the one treated as the UV reading.
_REFERENCE_BY_UNIT = {"%": "H", "C": "T", "lux": "IL", "": "UV"}

try:
    while True:
        # Ask every interviewed, non-failed node for fresh dynamic values.
        for n in zwave.controller.nodes:
            node = zwave.nodeset.GetNode(n)
            # NOTE(review): this tests the node object for membership;
            # confirm failed_nodes does not hold node numbers instead.
            if (
                node.IsInterviewed()
                and node not in zwave.controller.failed_nodes
            ):
                node.RefreshDynamicValues()
            # The pause runs once per node, refreshed or not.
            time.sleep(5)

        wolk_module.publish_actuator_status("LED_Bulb6", "D")

        # Node 3 is the MultiSensor 6.
        node_number = 3
        node = zwave.nodeset.GetNode(node_number)

        if not node.IsInterviewed():
            wolk_module.publish_device_status(
                "multi_sensor6", wolk.DeviceStatus.OFFLINE
            )
            continue

        for value in node.values.Sensors():
            # value[2] is compared as the unit and value[3] is sent as the
            # reading; entries with an unknown unit are skipped.
            reference = _REFERENCE_BY_UNIT.get(value[2])
            if reference is None:
                continue
            wolk_module.add_sensor_reading(
                "multi_sensor6",
                reference,
                value[3],
                int(round(time.time() * 1000)),  # timestamp in milliseconds
            )

        wolk_module.publish()
        time.sleep(5)

except Exception as e:
    print(f"Exception occurred: {e}")
    # Exit with a non-zero status; cleanup happens in the finally clause.
    raise SystemExit(1) from e
finally:
    # Runs for every way out of the loop, including Ctrl-C, so the Z-Wave
    # driver is terminated and the module disconnected. The nested
    # try/finally keeps a failing Terminate() from skipping disconnect().
    try:
        zwave.driver.Terminate()
    finally:
        wolk_module.disconnect()

devices.py

Python
This file creates the device templates that are necessary for registering devices on WolkAbout IoT Platform
import wolk_gateway_module as wolk


# Aeotec Smart Dimmer 6
#
# Four sensor templates and one dimmer actuator, bundled into a device
# template and registered under the key "smart_dimmer6".

power_sensor = wolk.SensorTemplate(
    reference="P",
    name="Power",
    unit=wolk.ReadingTypeMeasurementUnit.WATT,
    reading_type_name=wolk.ReadingTypeName.POWER,
)

# NOTE(review): uses the BATTERY_VOLTAGE reading type - confirm that this is
# the intended type for the dimmer's voltage reading.
voltage_sensor = wolk.SensorTemplate(
    reference="V",
    name="Voltage",
    unit=wolk.ReadingTypeMeasurementUnit.VOLT,
    reading_type_name=wolk.ReadingTypeName.BATTERY_VOLTAGE,
)

current_sensor = wolk.SensorTemplate(
    reference="I",
    name="Electric Current",
    unit=wolk.ReadingTypeMeasurementUnit.AMPERE,
    reading_type_name=wolk.ReadingTypeName.ELECTRIC_CURRENT,
    maximum=100,
    minimum=0,
)

# NOTE(review): energy consumption is declared with the BATTERY_POWER reading
# type and the BATTERY_X1000 unit - confirm this pairing is intentional.
power_consumption_sensor = wolk.SensorTemplate(
    reference="EC",
    name="Energy consumption",
    unit=wolk.ReadingTypeMeasurementUnit.BATTERY_X1000,
    reading_type_name=wolk.ReadingTypeName.BATTERY_POWER,
    maximum=100,
    minimum=0,
)

# Numeric dimmer level limited to 0-100.
dimmer_actuator = wolk.ActuatorTemplate(
    reference="D",
    name="Dimmer",
    data_type=wolk.DataType.NUMERIC,
    maximum=100,
    minimum=0,
)

smart_dimmer_template = wolk.DeviceTemplate(
    actuators=[dimmer_actuator],
    sensors=[
        power_sensor,
        voltage_sensor,
        current_sensor,
        power_consumption_sensor,
    ],
)

smart_dimmer_device = wolk.Device(
    "Smart Dimmer",
    "smart_dimmer6",
    smart_dimmer_template,
)

# Aeotec LED Bulb 6
#
# The bulb exposes a single numeric dimmer actuator ("D", 0-100) and is
# registered under the key "LED_Bulb6".

LED_dimmer_template = wolk.ActuatorTemplate(
    reference="D",
    name="Dimmer",
    data_type=wolk.DataType.NUMERIC,
    maximum=100,
    minimum=0,
)

LED_bulb_template = wolk.DeviceTemplate(
    actuators=[LED_dimmer_template],
)

LED_bulb_device = wolk.Device(
    "LED Bulb",
    "LED_Bulb6",
    LED_bulb_template,
)


# Aeotec Multi Sensor 6
#
# Illuminance, humidity, temperature and ultraviolet sensor templates,
# bundled into a device registered under the key "multi_sensor6".

illuminance_sensor = wolk.SensorTemplate(
    reference="IL",
    name="Illuminance",
    unit=wolk.ReadingTypeMeasurementUnit.LUX,
    reading_type_name=wolk.ReadingTypeName.ILLUMINANCE,
    maximum=30000,
    minimum=0,
)

humidity_sensor = wolk.SensorTemplate(
    reference="H",
    name="Humidity",
    unit=wolk.ReadingTypeMeasurementUnit.PERCENT,
    reading_type_name=wolk.ReadingTypeName.HUMIDITY,
    maximum=100,
    minimum=0,
)

temperature_sensor = wolk.SensorTemplate(
    reference="T",
    name="Temperature",
    unit=wolk.ReadingTypeMeasurementUnit.CELSIUS,
    reading_type_name=wolk.ReadingTypeName.TEMPERATURE,
    maximum=50,
    minimum=-10,
)

# The ultraviolet sensor is declared with a plain numeric data type rather
# than a reading type and unit.
ultraviolet_sensor = wolk.SensorTemplate(
    reference="UV",
    name="Ultraviolet",
    data_type=wolk.DataType.NUMERIC,
    maximum=10,
    minimum=0,
)


multi_sensor_template = wolk.DeviceTemplate(
    sensors=[
        illuminance_sensor,
        humidity_sensor,
        temperature_sensor,
        ultraviolet_sensor,
    ],
)

multi_sensor_device = wolk.Device(
    "MultiSensor 6",
    "multi_sensor6",
    multi_sensor_template,
)

WolkGatewayModule-Z-Wave

WolkGateway module that enables communication with Z-Wave devices

WolkGateway

WolkGateway enables non-IP enabled devices to connect to WolkAbout IoT Platform using modules

Credits

WolkWriter

WolkWriter

17 projects • 33 followers
With WolkAbout IoT Platform, we give you proven technology to develop powerful IoT applications and control your business ecosystem.

Comments