bena
Published

Wapr - a water profiler

Ever wonder which appliance is responsible for that humongous water bill? Me too. Introducing wapr: a non-invasive water usage-profiler.

IntermediateShowcase (no instructions)354
Wapr - a water profiler

Things used in this project

Hardware components

QuickFeather Development Kit
QuickLogic Corp. QuickFeather Development Kit
×1
Adafruit HUZZAH32 – ESP32 Feather Board
Adafruit HUZZAH32 – ESP32 Feather Board
×1

Software apps and online services

SensiML Analytics Studio
SensiML Data Capture Lab
InfluxDB
Eclipse Mosquitto

Story

Read more

Custom parts and enclosures

Add-on module feather case

Extra part for snapping the feather casing on the water line

Code

Feather main.cpp

C/C++
Feather program which takes the classification of the quickfeather and publishes to MQTT topics
#include <Arduino.h>
#include <Wire.h>
#include "WaprMQTT.h"
#include <ArduinoJson.h>

// Hardware ESP32
#ifdef ARDUINO_ARCH_ESP32
#define RXD2 17
#define TXD2 16
#define RESET_FEATHER_PIN 13
#endif

// Classification
uint8_t classification[1];

// Deserialization
String jsonString;
StaticJsonDocument<768> doc;

// Feather reset
int resetDuration = 1000;
int resetDelay = 5000;

// Communication - MQTT
WaprMQTT comClient;

void resetFeather(){
  Serial.println("resetting feather..");
  digitalWrite(RESET_FEATHER_PIN, LOW);
  delay(resetDuration);
  digitalWrite(RESET_FEATHER_PIN, HIGH);
  delay(resetDelay);
}

void setup() {
  Serial.begin(9600);
  Serial.println("running setup..");
  Serial.println("initializing pins..");
  pinMode(RESET_FEATHER_PIN, OUTPUT);
  digitalWrite(RESET_FEATHER_PIN, HIGH);

  Serial.println("intilializing communication..");
  comClient.init();
  
  Serial.println("setting up serial 2..");
  Serial2.begin(460800, SERIAL_8N1, RXD2, TXD2);

  Serial.println("entering loop..");
  delay(100);
}

void loop() {
  if(Serial2.available()>0){
    jsonString = Serial2.readStringUntil('\n');
    DeserializationError error = deserializeJson(doc, jsonString);

    if (error) {
      Serial.print(F("deserializeJson() failed: "));
      Serial.println(error.f_str());

      comClient.update(DEBUG, classification, sizeof(classification));
    }
    else{
      classification[0] = doc["Classification"];
      Serial.println(classification[0]);
      comClient.update(CLASSIFICATION, classification, sizeof(classification));
      if(int(classification[0])==0){
        resetFeather();
      }
    }
  }

  comClient.loop();
}

Feather WaprMQTT.h

C/C++
Header file WaprMQTT library
/*
    Interface for communicating.
*/
#ifndef _WaprMQTT_H
#define _WaprMQTT_H
#include <Arduino.h>

#define MQTT_TOPIC_CLASSIFICATION "wapr/proto/classification"
#define MQTT_TOPIC_DEBUG "wapr/proto/debug"
#define MQTT_TOPIC_RESTART "wapr/proto/restart"
#define MQTT_TOPIC_TEST "wapr/test"

typedef enum {
    CLASSIFICATION,
    TEST,
    DEBUG
} TOPIC;

class WaprMQTT {
public:
    WaprMQTT();
    virtual void init();
    virtual bool update(TOPIC topic, uint8_t bytes[], uint8_t size);
    void loop();
private:
    void setupWifi();
    void reconnect();
    static void callback(char* topic, byte* message, unsigned int length);
};

#endif

Feather WaprMQTT.cpp

C/C++
Source file WaprMQTT library
#include "WaprMQTT.h"
#include <Arduino.h>
#include <WiFi.h>
#include <PubSubClient.h>

const char* ssid = "SSID";
const char* password = "WIFI_PASSWORD";
const char* mqtt_server = "IP_MQTT_SERVER";

char classificationPayload[1];
char testPayload[1];
char debugPayload[1];

WiFiClient wifiClient;
PubSubClient mqttClient(wifiClient);

WaprMQTT::WaprMQTT(){

}


void WaprMQTT::callback(char* topic, byte* message, unsigned int length) {
  Serial.print("message arrived on topic: ");
  Serial.print(topic);
  Serial.print(". message: ");
  String messageTemp;
  
  for (int i = 0; i < length; i++) {
    Serial.print((char)message[i]);
    messageTemp += (char)message[i];
  }
  Serial.println();

  if(String(topic) == MQTT_TOPIC_RESTART){
    Serial.println("rebooting quickfeather..");
    ESP.restart();
  }

  if (String(topic) == "esp32/output") {
    Serial.print("Changing output to ");
    if(messageTemp == "on"){
      Serial.println("on");
      // digitalWrite(ledPin, HIGH);
    }
    else if(messageTemp == "off"){
      Serial.println("off");
      // digitalWrite(ledPin, LOW);
    }
  }

  
}

void WaprMQTT::reconnect() {
  while (!mqttClient.connected()) {
    Serial.println("Attempting MQTT connection...");
    if (mqttClient.connect("WaprProto")) {
      Serial.println("connected");
      // ... and resubscribe
      mqttClient.subscribe("wapr/proto/debug");
      mqttClient.subscribe(MQTT_TOPIC_RESTART);

    } 
    else {
      Serial.print("failed, rc=");
      Serial.print(mqttClient.state());
      Serial.println(" trying again in 5 seconds..");
      // Wait 5 seconds before retrying
      delay(5000);
    }
  }
}

void WaprMQTT::loop(){

    if (!mqttClient.connected())
    {
        reconnect();
    }

    mqttClient.loop();
}

void WaprMQTT::setupWifi(){
    delay(10);
    // We start by connecting to a WiFi network
    Serial.print("connecting to ");
    Serial.print(ssid);
    Serial.println("..");
    WiFi.begin(ssid, password);

    while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
    }

    Serial.println("");
    Serial.print("connected with IP address: ");
    Serial.println(WiFi.localIP());
}


void WaprMQTT::init() {
    setupWifi();
    mqttClient.setServer(mqtt_server, 1883);
    mqttClient.setCallback(callback);
    mqttClient.subscribe("wapr/proto/debug");
    mqttClient.subscribe(MQTT_TOPIC_RESTART);
}

bool WaprMQTT::update(TOPIC topic, uint8_t bytes[], uint8_t size) {
    char* pPayload;
    switch(topic){
        case CLASSIFICATION:
            pPayload = &classificationPayload[0];
            for (int i = 0; i < size; i++)
            {
                sprintf(pPayload, "%02X", bytes[i]);
                pPayload+=2;
            }
            mqttClient.publish(MQTT_TOPIC_CLASSIFICATION, classificationPayload);
            return true;
        case TEST:
            pPayload = &testPayload[0];
            for (int i = 0; i < size; i++)
            {
                sprintf(pPayload, "%02X", bytes[i]);
                pPayload+=2;
            }
            mqttClient.publish(MQTT_TOPIC_TEST, testPayload);
            return true;
        case DEBUG:
            pPayload = &debugPayload[0];
            for (int i = 0; i < size; i++)
            {
                sprintf(pPayload, "%02X", bytes[i]);
                pPayload+=2;
            }
            mqttClient.publish(MQTT_TOPIC_TEST, debugPayload);
            return true;
        default:
            return false;
    }
}

Influx Ingestion Script

Python
Ingestion of the wapr-data into an influxdb database
from datetime import datetime
import paho.mqtt.client as mqtt
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import time

CLASSIFICATION_UNKNOWN = 0
CLASSIFICATION_FAUCET = 1
CLASSIFICATION_IDLE = 2
CLASSIFICATION_SHOWER = 3
CLASSIFICATION_TOILET = 4

classificationFailCount = 0

def on_message(client, userdata, message):
    if(message.topic == classificationTopic):
        global classificationFailCount
        result = int(message.payload.decode("utf-8"))
        p = []
        p.append(Point("quickfeater").field("classification", result))
        if(result == CLASSIFICATION_IDLE):
            classificationFailCount = 0
            p.append(Point("quickfeater").field("idle", 1))
            p.append(Point("quickfeater").field("faucet", 0))
            p.append(Point("quickfeater").field("shower", 0))
            p.append(Point("quickfeater").field("toilet", 0))
        elif(result == CLASSIFICATION_FAUCET):
            classificationFailCount = 0
            p.append(Point("quickfeater").field("idle", 0))
            p.append(Point("quickfeater").field("faucet", 1))
            p.append(Point("quickfeater").field("shower", 0))
            p.append(Point("quickfeater").field("toilet", 0))
        elif(result == CLASSIFICATION_SHOWER):
            classificationFailCount = 0
            p.append(Point("quickfeater").field("idle", 0))
            p.append(Point("quickfeater").field("faucet", 0))
            p.append(Point("quickfeater").field("shower", 1))
            p.append(Point("quickfeater").field("toilet", 0))
        elif(result == CLASSIFICATION_TOILET):
            classificationFailCount = 0
            p.append(Point("quickfeater").field("idle", 0))
            p.append(Point("quickfeater").field("faucet", 0))
            p.append(Point("quickfeater").field("shower", 0))
            p.append(Point("quickfeater").field("toilet", 1))
        elif(result == CLASSIFICATION_UNKNOWN):
            print(f'unknown classification: {result}')
            classificationFailCount += 1
            if(classificationFailCount>10):
                print('something is wrong, rebooting quickfeather..')
                client.publish(restartTopic, "1")
                time.sleep(10)
        write_api.write(bucket=bucket, record=p)
    else:
        print(f'unknown mqtt topic {message.topic} sends {message.payload.decode("utf-8")}')

print("influx ingestion app starting..")
print("connecting to influxdb..")
bucket="YOUR_BUCKET"
token = "YOUR_INFLUXDB_TOKEN"
org = "YOUR_ORG"
client = InfluxDBClient(url="http://localhost:8086", token=token, org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)

# topics
testTopic = "wapr/test"
classificationTopic = "wapr/proto/classification"
restartTopic = "wapr/proto/restart"

# setup client connection
print("connecting to mqtt..")
client = mqtt.Client("influxIngestWapr")
client.connect("MQTT_IP")
print("subscribing to topics..")
client.subscribe(classificationTopic)
client.subscribe(testTopic)
client.on_message = on_message
print("starting client loop..")
client.loop_start()

input("press any key to stop..")

client.loop_stop()

print("goodbye")

Wapr Calculator

Python
Python script for calculating the estimated water usage per device
from datetime import datetime, timezone
from influxdb_client import InfluxDBClient, Point, WritePrecision
from influxdb_client.client.write_api import SYNCHRONOUS
import time
import pytz
import threading


# flow rates in liter/second
FLOW_RATE_FAUCET = 0.1186
FLOW_RATE_SHOWER = 0.1345
FLOW_RATE_TOILET = 0.1049

faucetWindow = 5
idleWindow = 5
toiletWindow = 5
showerWindow = 5

wFaucetThreshold = 0.4
wToiletThreshold = 0.3
wShowerTheshold = 0.3

faucetThreshold = 0.0
toiletThreshold = 0.0
showerTheshold = 0.0

wFaucetPreviousQueryTime = datetime(1970, 1, 1, tzinfo=timezone.utc) 
wToiletPreviousQueryTime = datetime(1970, 1, 1, tzinfo=timezone.utc) 
wShowerPreviousQueryTime = datetime(1970, 1, 1, tzinfo=timezone.utc) 

faucetPreviousQueryTime = datetime(1970, 1, 1, tzinfo=timezone.utc) 
toiletPreviousQueryTime = datetime(1970, 1, 1, tzinfo=timezone.utc) 
showerPreviousQueryTime = datetime(1970, 1, 1, tzinfo=timezone.utc) 


timeRangeStartSearchPrevious = "-6h"
timeRangeStart = "-5m"
timeRangeStop = "-10s"
secondOfSleep = 250

keepRunning = True

#region Queries
queryFaucetWindow = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStart}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "quickfeater")\
  |> filter(fn: (r) => r["_field"] == "faucet")\
  |> aggregateWindow(every: {faucetWindow}s, fn: mean, createEmpty: false)\
  |> yield(name: "mean")'

queryFaucet = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStart}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "quickfeater")\
  |> filter(fn: (r) => r["_field"] == "faucet")'

queryFaucetWindowPreviousTime = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStartSearchPrevious}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "waprCalculator")\
  |> filter(fn: (r) => r["_field"] == "dailyFaucetOnDurationFloat")\
  |> filter(fn: (r) => r["windowed"] == "True")'

queryFaucetPreviousTime = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStartSearchPrevious}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "waprCalculator")\
  |> filter(fn: (r) => r["_field"] == "dailyFaucetOnDurationFloat")\
  |> filter(fn: (r) => r["windowed"] == "False")'

queryFaucetWindowConsumptionPreviousTime = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStartSearchPrevious}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "waprCalculator")\
  |> filter(fn: (r) => r["_field"] == "dailyFaucetWaterConsumptionFloat")\
  |> filter(fn: (r) => r["windowed"] == "True")'

queryFaucetConsumptionPreviousTime = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStartSearchPrevious}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "waprCalculator")\
  |> filter(fn: (r) => r["_field"] == "dailyFaucetWaterConsumptionFloat")\
  |> filter(fn: (r) => r["windowed"] == "False")'

queryToiletWindow = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStart}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "quickfeater")\
  |> filter(fn: (r) => r["_field"] == "toilet")\
  |> aggregateWindow(every: {toiletWindow}s, fn: mean, createEmpty: false)\
  |> yield(name: "mean")'

queryToilet = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStart}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "quickfeater")\
  |> filter(fn: (r) => r["_field"] == "toilet")'

queryToiletWindowPreviousTime = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStartSearchPrevious}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "waprCalculator")\
  |> filter(fn: (r) => r["_field"] == "dailyToiletOnDurationFloat")\
  |> filter(fn: (r) => r["windowed"] == "True")'

queryToiletPreviousTime = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStartSearchPrevious}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "waprCalculator")\
  |> filter(fn: (r) => r["_field"] == "dailyToiletOnDurationFloat")\
  |> filter(fn: (r) => r["windowed"] == "False")'

queryToiletWindowConsumptionPreviousTime = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStartSearchPrevious}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "waprCalculator")\
  |> filter(fn: (r) => r["_field"] == "dailyToiletWaterConsumptionFloat")\
  |> filter(fn: (r) => r["windowed"] == "True")\
  |> aggregateWindow(every: 5s, fn: last, createEmpty: false)\
  |> yield(name: "last")'

queryToiletConsumptionPreviousTime = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStartSearchPrevious}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "waprCalculator")\
  |> filter(fn: (r) => r["_field"] == "dailyToiletWaterConsumptionFloat")\
  |> filter(fn: (r) => r["windowed"] == "False")'

queryShowerWindow = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStart}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "quickfeater")\
  |> filter(fn: (r) => r["_field"] == "shower")\
  |> aggregateWindow(every: {showerWindow}s, fn: mean, createEmpty: false)\
  |> yield(name: "mean")'

queryShower = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStart}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "quickfeater")\
  |> filter(fn: (r) => r["_field"] == "shower")'

queryShowerWindowPreviousTime = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStartSearchPrevious}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "waprCalculator")\
  |> filter(fn: (r) => r["_field"] == "dailyShowerOnDurationFloat")\
  |> filter(fn: (r) => r["windowed"] == "True")'

queryShowerPreviousTime = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStartSearchPrevious}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "waprCalculator")\
  |> filter(fn: (r) => r["_field"] == "dailyShowerOnDurationFloat")\
  |> filter(fn: (r) => r["windowed"] == "False")'

queryShowerWindowConsumptionPreviousTime = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStartSearchPrevious}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "waprCalculator")\
  |> filter(fn: (r) => r["_field"] == "dailyShowerWaterConsumptionFloat")\
  |> filter(fn: (r) => r["windowed"] == "True")'

queryShowerConsumptionPreviousTime = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStartSearchPrevious}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "waprCalculator")\
  |> filter(fn: (r) => r["_field"] == "dailyShowerWaterConsumptionFloat")\
  |> filter(fn: (r) => r["windowed"] == "False")'

queryIdleWindow = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStart}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "quickfeater")\
  |> filter(fn: (r) => r["_field"] == "idle")\
  |> aggregateWindow(every: {idleWindow}s, fn: mean, createEmpty: false)\
  |> yield(name: "mean")'

queryIdle = f'from(bucket: "wapr")\
  |> range(start: {timeRangeStart}, stop: {timeRangeStop})\
  |> filter(fn: (r) => r["_measurement"] == "quickfeater")\
  |> filter(fn: (r) => r["_field"] == "idle")'
#endregion

def getPreviousQueryTime(result):
    '''
    Get the latest time value of the result
    '''
    for table in result:
        if(len(table.records)>0):
            lastRecord = table.records[-1]
            time = lastRecord.get_time()
            return time
    return datetime(1970, 1, 1, tzinfo=timezone.utc)

def getLastestValue(result):
    '''
    Get the latest value of the result
    '''
    if(len(result)==0):
      return 0
    for table in result:
      if(len(table.records)>0):
        lastRecord = table.records[-1]
        value = lastRecord.get_value()
        return value
      else:
        print('something is wrong, value not found')
        return -1
  

def calcWaterUsage(durationOn, flowRate):
    '''
    Returns the amount of liters used for the duration in seconds with flow rate in liters per second
    '''
    return (durationOn * flowRate)

def calcDurationOn(result, threshold, lastQueryTime):
    '''
    Calculates the total duration in seconds that the value was higher than the threshold
    '''
    on = False
    durationOn = 0 # in seconds
    startTime = datetime.now()
    lastOnTime = datetime.now()
    for table in result:
        for record in table.records:
            if(record.get_time()>lastQueryTime):
                if(record.get_value()>threshold):
                    if(on == False):
                        on = True
                        startTime = record.get_time()
                        lastOnTime = startTime
                    else:
                        lastOnTime = record.get_time()
                else:
                    if(on == True):
                        on = False
                        duration = (lastOnTime - startTime).total_seconds()
                        durationOn += duration
    if(on == True):
        on = False
        duration = (lastOnTime - startTime).total_seconds()
        durationOn += duration
    return durationOn
        
def runCalculation():
    '''
    Loop through this function
    '''
    print('')
    print('running calculation...')
    global keepRunning
    global wFaucetPreviousQueryTime
    global wToiletPreviousQueryTime
    global wShowerPreviousQueryTime
    global faucetPreviousQueryTime
    global toiletPreviousQueryTime
    global showerPreviousQueryTime

    # first time get the latest values
    wLatestFaucetDurationCalculation = client.query_api().query(org=org, query=queryFaucetWindowPreviousTime)
    wLatestToiletDurationCalculation = client.query_api().query(org=org, query=queryToiletWindowPreviousTime)
    wLatestShowerDurationCalculation = client.query_api().query(org=org, query=queryShowerWindowPreviousTime)
    latestFaucetDurationCalculation = client.query_api().query(org=org, query=queryFaucetPreviousTime)
    latestToiletDurationCalculation = client.query_api().query(org=org, query=queryToiletPreviousTime)
    latestShowerDurationCalculation = client.query_api().query(org=org, query=queryShowerPreviousTime)

    currentUTC = datetime.now(timezone.utc)
    print(f'current utc: {currentUTC}')
    wFaucetPreviousQueryTime = getPreviousQueryTime(wLatestFaucetDurationCalculation)
    wToiletPreviousQueryTime = getPreviousQueryTime(wLatestToiletDurationCalculation)
    wShowerPreviousQueryTime = getPreviousQueryTime(wLatestShowerDurationCalculation)
    print(f'wFaucetPreviousQueryTime:{wFaucetPreviousQueryTime}')
    print(f'wToiletPreviousQueryTime:{wToiletPreviousQueryTime}')
    print(f'wShowerPreviousQueryTime:{wShowerPreviousQueryTime}')
    faucetPreviousQueryTime = getPreviousQueryTime(latestFaucetDurationCalculation)
    toiletPreviousQueryTime = getPreviousQueryTime(latestToiletDurationCalculation)
    showerPreviousQueryTime = getPreviousQueryTime(latestShowerDurationCalculation)
    print(f'faucetPreviousQueryTime:{faucetPreviousQueryTime}')
    print(f'toiletPreviousQueryTime:{toiletPreviousQueryTime}')
    print(f'showerPreviousQueryTime:{showerPreviousQueryTime}')

    wFaucetLatestOnDurationValue = getLastestValue(wLatestFaucetDurationCalculation)
    wToiletLatestOnDurationValue = getLastestValue(wLatestToiletDurationCalculation)
    wShowerLatestOnDurationValue = getLastestValue(wLatestShowerDurationCalculation)
    print(f'wFaucetLatestOnDurationValue:{wFaucetLatestOnDurationValue} seconds')
    print(f'wToiletLatestOnDurationValue:{wToiletLatestOnDurationValue} seconds')
    print(f'wShowerLatestOnDurationValue:{wShowerLatestOnDurationValue} seconds')
    faucetLatestOnDurationValue = getLastestValue(latestFaucetDurationCalculation)
    toiletLatestOnDurationValue = getLastestValue(latestToiletDurationCalculation)
    showerLatestOnDurationValue = getLastestValue(latestShowerDurationCalculation)
    print(f'faucetLatestOnDurationValue:{faucetLatestOnDurationValue} seconds')
    print(f'toiletLatestOnDurationValue:{toiletLatestOnDurationValue} seconds')
    print(f'showerLatestOnDurationValue:{showerLatestOnDurationValue} seconds')

    wLatestFaucetConsumptionCalculation = client.query_api().query(org=org, query=queryFaucetWindowConsumptionPreviousTime)
    wLatestToiletConsumptionCalculation = client.query_api().query(org=org, query=queryToiletWindowConsumptionPreviousTime)
    wLatestShowerConsumptionCalculation = client.query_api().query(org=org, query=queryShowerWindowConsumptionPreviousTime)
    latestFaucetConsumptionCalculation = client.query_api().query(org=org, query=queryFaucetConsumptionPreviousTime)
    latestToiletConsumptionCalculation = client.query_api().query(org=org, query=queryToiletConsumptionPreviousTime)
    latestShowerConsumptionCalculation = client.query_api().query(org=org, query=queryShowerConsumptionPreviousTime)

    wFaucetLatestConsumptionValue = getLastestValue(wLatestFaucetConsumptionCalculation)
    wToiletLatestConsumptionValue = getLastestValue(wLatestToiletConsumptionCalculation)
    wShowerLatestConsumptionValue = getLastestValue(wLatestShowerConsumptionCalculation)
    print(f'wFaucetLatestConsumptionValue:{wFaucetLatestConsumptionValue} liters')
    print(f'wToiletLatestConsumptionValue:{wToiletLatestConsumptionValue} liters')
    print(f'wShowerLatestConsumptionValue:{wShowerLatestConsumptionValue} liters')
    faucetLatestConsumptionValue = getLastestValue(latestFaucetConsumptionCalculation)
    toiletLatestConsumptionValue = getLastestValue(latestToiletConsumptionCalculation)
    showerLatestConsumptionValue = getLastestValue(latestShowerConsumptionCalculation)
    print(f'faucetLatestConsumptionValue:{faucetLatestConsumptionValue} liters')
    print(f'toiletLatestConsumptionValue:{toiletLatestConsumptionValue} liters')
    print(f'showerLatestConsumptionValue:{showerLatestConsumptionValue} liters')

    currentDate = datetime.now()
    myTimezone = pytz.timezone("Europe/Brussels")
    currentDate = myTimezone.localize(currentDate)

    while(keepRunning == True):
        # check if it is a new day
        newDate = datetime.now()
        newDate = myTimezone.localize(newDate)

        if(newDate.date() != currentDate.date()):
          wFaucetLatestOnDurationValue = 0
          wToiletLatestOnDurationValue = 0
          wShowerLatestOnDurationValue = 0
          wFaucetLatestConsumptionValue = 0
          wToiletLatestConsumptionValue = 0
          wShowerLatestConsumptionValue = 0
          faucetLatestOnDurationValue = 0
          toiletLatestOnDurationValue = 0
          showerLatestOnDurationValue = 0
          faucetLatestConsumptionValue = 0
          toiletLatestConsumptionValue = 0
          showerLatestConsumptionValue = 0
          currentDate = newDate
        
        resultFaucetWindow = client.query_api().query(org=org, query=queryFaucetWindow)
        resultToiletWindow = client.query_api().query(org=org, query=queryToiletWindow)
        resultShowerWindow = client.query_api().query(org=org, query=queryShowerWindow)
        resultFaucet = client.query_api().query(org=org, query=queryFaucet)
        resultToilet = client.query_api().query(org=org, query=queryToilet)
        resultShower = client.query_api().query(org=org, query=queryShower)

        wFaucetOnDuration = calcDurationOn(resultFaucetWindow, wFaucetThreshold, wFaucetPreviousQueryTime)
        wToiletOnDuration = calcDurationOn(resultToiletWindow, wToiletThreshold, wToiletPreviousQueryTime)
        wShowerOnDuration = calcDurationOn(resultShowerWindow, wShowerTheshold, wShowerPreviousQueryTime)
        print(f'windowed faucetOnDuration:{wFaucetOnDuration}')
        print(f'windowed toiletOnDuration:{wToiletOnDuration}')
        print(f'windowed showerOnDuration:{wShowerOnDuration}')
        faucetOnDuration = calcDurationOn(resultFaucet, faucetThreshold, faucetPreviousQueryTime)
        toiletOnDuration = calcDurationOn(resultToilet, toiletThreshold, toiletPreviousQueryTime)
        showerOnDuration = calcDurationOn(resultShower, showerTheshold, showerPreviousQueryTime)
        print(f'faucetOnDuration:{faucetOnDuration}')
        print(f'toiletOnDuration:{toiletOnDuration}')
        print(f'showerOnDuration:{showerOnDuration}')

        now = datetime.now(timezone.utc)
        wFaucetPreviousQueryTime = now
        wToiletPreviousQueryTime = now
        wShowerPreviousQueryTime = now
        wFaucetLatestOnDurationValue += wFaucetOnDuration
        wToiletLatestOnDurationValue += wToiletOnDuration
        wShowerLatestOnDurationValue += wShowerOnDuration
        faucetPreviousQueryTime = now
        toiletPreviousQueryTime = now
        showerPreviousQueryTime = now
        faucetLatestOnDurationValue += faucetOnDuration
        toiletLatestOnDurationValue += toiletOnDuration
        showerLatestOnDurationValue += showerOnDuration
        
        wFaucetWaterConsumption = calcWaterUsage(wFaucetOnDuration, FLOW_RATE_FAUCET)
        wToiletWaterConsumption = calcWaterUsage(wToiletOnDuration, FLOW_RATE_TOILET)
        wShowerWaterConsumption = calcWaterUsage(wShowerOnDuration, FLOW_RATE_SHOWER)
        print(f'windowed faucetWaterConsumption:{wFaucetWaterConsumption} liters')
        print(f'windowed toiletWaterConsumption:{wToiletWaterConsumption} liters')
        print(f'windowed showerWaterConsumption:{wShowerWaterConsumption} liters')
        faucetWaterConsumption = calcWaterUsage(faucetOnDuration, FLOW_RATE_FAUCET)
        toiletWaterConsumption = calcWaterUsage(toiletOnDuration, FLOW_RATE_TOILET)
        showerWaterConsumption = calcWaterUsage(showerOnDuration, FLOW_RATE_SHOWER)
        print(f'faucetWaterConsumption:{faucetWaterConsumption} liters')
        print(f'toiletWaterConsumption:{toiletWaterConsumption} liters')
        print(f'showerWaterConsumption:{showerWaterConsumption} liters')

        wFaucetLatestConsumptionValue += wFaucetWaterConsumption
        wToiletLatestConsumptionValue += wToiletWaterConsumption
        wShowerLatestConsumptionValue += wShowerWaterConsumption
        faucetLatestConsumptionValue += faucetWaterConsumption
        toiletLatestConsumptionValue += toiletWaterConsumption
        showerLatestConsumptionValue += showerWaterConsumption

        p = []
        p.append(Point("waprCalculator").field("dailyFaucetOnDurationFloat", float(wFaucetLatestOnDurationValue)).tag("windowed",True))
        p.append(Point("waprCalculator").field("dailyToiletOnDurationFloat", float(wToiletLatestOnDurationValue)).tag("windowed",True))
        p.append(Point("waprCalculator").field("dailyShowerOnDurationFloat", float(wShowerLatestOnDurationValue)).tag("windowed",True))
        p.append(Point("waprCalculator").field("dailyFaucetWaterConsumptionFloat", float(wFaucetLatestConsumptionValue)).tag("windowed",True))
        p.append(Point("waprCalculator").field("dailyToiletWaterConsumptionFloat", float(wToiletLatestConsumptionValue)).tag("windowed",True))
        p.append(Point("waprCalculator").field("dailyShowerWaterConsumptionFloat", float(wShowerLatestConsumptionValue)).tag("windowed",True))
        p.append(Point("waprCalculator").field("dailyFaucetOnDurationFloat", float(faucetLatestOnDurationValue)).tag("windowed",False))
        p.append(Point("waprCalculator").field("dailyToiletOnDurationFloat", float(toiletLatestOnDurationValue)).tag("windowed",False))
        p.append(Point("waprCalculator").field("dailyShowerOnDurationFloat", float(showerLatestOnDurationValue)).tag("windowed",False))
        p.append(Point("waprCalculator").field("dailyFaucetWaterConsumptionFloat", float(faucetLatestConsumptionValue)).tag("windowed",False))
        p.append(Point("waprCalculator").field("dailyToiletWaterConsumptionFloat", float(toiletLatestConsumptionValue)).tag("windowed",False))
        p.append(Point("waprCalculator").field("dailyShowerWaterConsumptionFloat", float(showerLatestConsumptionValue)).tag("windowed",False))
        write_api.write(bucket=bucket, record=p)

        time.sleep(secondOfSleep)


# app initialization 
print('starting wapr-calculator..')
print("connecting to influxdb..")
bucket="YOUR_BUCKET"
token = "YOUR_TOKEN"
org = "YOUR_ORG"
client = InfluxDBClient(url="http://localhost:8086", token=token, org=org)
write_api = client.write_api(write_options=SYNCHRONOUS)
query_api = client.query_api()

calcThread = threading.Thread(target=runCalculation)
calcThread.start()

input("press any key to stop the calculation thread")
keepRunning = False

input("press any key to exit")

Credits

bena

bena

1 project β€’ 1 follower

Comments