Water Leakage Detection System

How to use an Esp32 to detect water leakages, building a prototype, simulating the code on Iot-Lab and interacting with AWS

AdvancedFull instructions provided345
Water Leakage Detection System

Things used in this project

Hardware components

Heltec ESP32 LoRa v2
×2
Water Flow Sensor
×2
Garden hose (20 - 25 mm ⌀)
×1
1/2 inch T-adapter
×1
1/2 inch garden tap
×1
Adjustable metal hose clamp
×6
LED (generic)
LED (generic)
×1
Buzzer
Buzzer
×1

Software apps and online services

RIOT
RIOT OS RIOT
The Things Stack
The Things Industries The Things Stack
MQTT
MQTT
AWS - Amazon Web Services

Story

Read more

Schematics

Image circuit

Fritzing circuit

Code

Prototype Code

C/C++
#include <string.h>
#include <stdbool.h>
#include <stdlib.h>
#include "timex.h"
#include "xtimer.h"
#include "ztimer.h"
#include "paho_mqtt.h"
#include "MQTTClient.h"
#include "stdio.h"
#include "periph/gpio.h"
#include "periph/adc.h"
#include "thread.h"
#include "analog_util.h"

int board=0; //  0 -> padre, che vien settato così  -----  1 -> figlio viene settato prima a 0 e poi cambia, dopo aver avuto conversazione precedente

#define MAIN_QUEUE_SIZE     (8)

#define BUF_SIZE                        1024
#define MQTT_VERSION_v311               4       /* MQTT v3.1.1 version is 4 */
#define COMMAND_TIMEOUT_MS              4000

#ifndef DEFAULT_MQTT_CLIENT_ID
#define DEFAULT_MQTT_CLIENT_ID          ""
#endif

#ifndef DEFAULT_MQTT_USER
#define DEFAULT_MQTT_USER               ""
#endif

#ifndef DEFAULT_MQTT_PWD
#define DEFAULT_MQTT_PWD                ""
#endif

/**
 * @brief Default MQTT port
 */
#define DEFAULT_MQTT_PORT               1883
#define DEFAULT_IP                      "192.168.92.168"
#define DEFAULT_TOPIC_AWS               "home"
char* DEFAULT_TOPIC_SUB; //subscribe
char* DEFAULT_TOPIC_PUBLISH; //publish

/**
 * @brief Keepalive timeout in seconds
 */
#define DEFAULT_KEEPALIVE_SEC           10

#ifndef MAX_LEN_TOPIC
#define MAX_LEN_TOPIC                   100
#endif

#ifndef MAX_TOPICS
#define MAX_TOPICS                      4
#endif

#define IS_CLEAN_SESSION                1
#define IS_RETAINED_MSG                 0

static MQTTClient client;
static Network network;

static unsigned char buf[BUF_SIZE];
static unsigned char readbuf[BUF_SIZE];

char stack_led[THREAD_STACKSIZE_MAIN];
kernel_pid_t thread_led;

char stack_buzzer[THREAD_STACKSIZE_MAIN];
kernel_pid_t thread_buzzer;

char water_sensor_stack[THREAD_STACKSIZE_MAIN];
kernel_pid_t water_sensor_thread;

char publish_stack[THREAD_STACKSIZE_MAIN];
kernel_pid_t publish_thread_aws;

char publish_stack_node[THREAD_STACKSIZE_MAIN];
kernel_pid_t publish_thread_node;

gpio_t LED_PIN = GPIO_PIN(0, 2); //pin2
#define LED_PIN_NUMBER 2

gpio_t BUZZER_PIN = GPIO_PIN(0, 15); //pin23
#define BUZZER_PIN_NUMBER 15

#define WATER_FLOW ADC_LINE(0)
#define ADC_RES ADC_RES_12BIT

#define SECONDS 1*US_PER_SEC //regulator for water_flow_rate formula
#define SAMPLING 3*US_PER_SEC //duration time of sampling
#define TESTING 2*US_PER_SEC
#define DAILY_PERIODIC 8*US_PER_SEC //periodic time of sampling (daily or every 6 hours)
#define ANSWER_AGAIN_PERIODIC 8*US_PER_SEC //again time if son is sleeping
#define NO_WATER_AGAIN_PERIODIC 8*US_PER_SEC //again time if there is not water

xtimer_ticks32_t sample_time_now;
xtimer_ticks32_t sample_time_end;
xtimer_ticks32_t sample_time_diff;
xtimer_ticks32_t periodic_time;

xtimer_ticks32_t time_s0_source;
xtimer_ticks32_t time_s1_source;
xtimer_ticks32_t time_s2_source;
xtimer_ticks32_t time_s0_son;
xtimer_ticks32_t time_s1_son;
xtimer_ticks32_t time_s2_son;

char* mex; //for received messages

int ret = -1;
char message[200];
float water_flow_rate_other=0.0;
float water_flow_rate=0.0;
float water_flow_diff=0.0;
float total_quantity=0.0;

int answer=0;
int work=0;

char* message_on_aws;
char* value_message_on_received;
//sudo BOARD=esp32-heltec-lora32-v2 BUILD_IN_DOCKER=1 DOCKER="sudo docker" PORT=/dev/ttyUSB0 make flash term


static int connect(void)
{

    /* ensure client isn't connected in case of a new connection */
    if (client.isconnected) {
        printf("mqtt_example: client already connected, disconnecting it\n");
        MQTTDisconnect(&client);
        NetworkDisconnect(&network);
    }

    MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
    data.MQTTVersion = MQTT_VERSION_v311;

    data.clientID.cstring = DEFAULT_MQTT_CLIENT_ID;
    data.username.cstring = DEFAULT_MQTT_USER;
    data.password.cstring = DEFAULT_MQTT_PWD;
    data.keepAliveInterval = DEFAULT_KEEPALIVE_SEC;
    data.cleansession = IS_CLEAN_SESSION;
    data.willFlag = 0;

    printf("mqtt_example: Connecting to MQTT Broker from %s %d\n",
            DEFAULT_IP, DEFAULT_MQTT_PORT);
    printf("mqtt_example: Trying to connect to %s, port: %d\n",
            DEFAULT_IP, DEFAULT_MQTT_PORT);
    ret = NetworkConnect(&network, DEFAULT_IP, DEFAULT_MQTT_PORT);
    if (ret < 0) {
        printf("mqtt_example: Unable to connect\n");
        return ret;
    }

    printf("user:%s clientId:%s password:%s\n", data.username.cstring,
             data.clientID.cstring, data.password.cstring);
    ret = MQTTConnect(&client, &data);
    if (ret < 0) {
        printf("mqtt_example: Unable to connect client %d\n", ret);
        return ret;
    }
    else {
        printf("mqtt_example: Connection successfully\n");
    }

    return (ret > 0) ? 0 : 1;
}

/*
void* led(void* arg){
    while(1){
        printf("Set led to HIGH \n");
        gpio_set(LED_PIN);
        xtimer_sleep(2);
        printf("Set led to LOW \n");
        gpio_clear(LED_PIN);
        xtimer_sleep(2);
        if (fire==0){
            thread_sleep();
        }
    }    
}

void* buzzer(void* arg){
    while(1){
        printf("Set buzzer to HIGH \n");
        gpio_set(BUZZER_PIN); 
        xtimer_sleep(2);
        printf("Set buzzer to LOW \n");
        gpio_clear(BUZZER_PIN);
        xtimer_sleep(2);  
        if (fire==0){
            thread_sleep();
        }
    }    
}
*/
void* publish_on_aws(void* arg){
    while(1){
        enum QoS qos = QOS2;
        char* topic=DEFAULT_TOPIC_AWS;
        MQTTMessage message;
        message.qos = qos;
        message.retained = IS_RETAINED_MSG;
        message.payload = message_on_aws;
        message.payloadlen = strlen(message.payload);

        int rc;
        if ((rc = MQTTPublish(&client, topic, &message)) < 0) {
            printf("mqtt_example: Unable to publish (%d)\n", rc);
        }
        else {
            printf("mqtt_example: Message (%s) has been published to topic %s"
            "with QOS %d\n",
            (char *)message.payload, topic, (int)message.qos);
        }
        thread_sleep();
    }
    return 0;
}

static int publish(char* mex){
    enum QoS qos = QOS2;
    char* topic=DEFAULT_TOPIC_PUBLISH;
    MQTTMessage message;
    message.qos = qos;
    message.retained = IS_RETAINED_MSG;
    message.payload = mex;
    message.payloadlen = strlen(message.payload);

    int rc;
    if ((rc = MQTTPublish(&client, topic, &message)) < 0) {
        printf("mqtt_example: Unable to publish (%d)\n", rc);
    }
    else {
        printf("mqtt_example: Message (%s) has been published to topic %s"
         "with QOS %d\n",
        (char *)message.payload, topic, (int)message.qos);
    }
    return 0;
}

void* publish_on_node(void* arg){
    while(1){
        enum QoS qos = QOS2;
        char* topic=DEFAULT_TOPIC_PUBLISH;
        MQTTMessage message;
        message.qos = qos;
        message.retained = IS_RETAINED_MSG;
        message.payload = value_message_on_received;
        message.payloadlen = strlen(message.payload);

        int rc;
        if ((rc = MQTTPublish(&client, topic, &message)) < 0) {
            printf("mqtt_example: Unable to publish (%d)\n", rc);
        }
        else {
            printf("mqtt_example: Message (%s) has been published to topic %s"
            "with QOS %d\n",
            (char *)message.payload, topic, (int)message.qos);
        }
        thread_sleep();
    }
    return 0;
}

static void _on_msg_received(MessageData *data)
{
    printf("\npaho_mqtt_water_sensor: message received on topic"
           " %.*s: %.*s\n",
           (int)data->topicName->lenstring.len,
           data->topicName->lenstring.data, (int)data->message->payloadlen,
           (char *)data->message->payload);

    mex = (char *)data->message->payload;

    if((strcmp (mex, "heloy"))==0){
        time_s0_son = xtimer_now();
        value_message_on_received="answr";
        thread_wakeup(publish_thread_node); 
        time_s1_son = xtimer_now();
        work=1;
        thread_wakeup(water_sensor_thread);
    }

    else if((strcmp (mex, "answr"))==0){
        time_s1_source = xtimer_now();
        thread_wakeup(water_sensor_thread);
    }

    else{
        if(work==1){
            time_s2_son = xtimer_now();
            //thread_wakeup(thread_buzzer);
            //thread_wakeup(thread_led);
            water_flow_rate_other = atof(mex);

            if(water_flow_rate-water_flow_rate_other > 3 || water_flow_rate_other-water_flow_rate > 3){
                if(water_flow_rate-water_flow_rate_other > 0)
                {    water_flow_diff=water_flow_rate-water_flow_rate_other; }
                else
                {   water_flow_diff=water_flow_rate_other-water_flow_rate;  }

                printf("LEAKEGE DETECTED \t -> \t My flow:%f \t Other: %f \t difference: %f\n",water_flow_rate,water_flow_rate_other,water_flow_diff);                             
                
                total_quantity=total_quantity+water_flow_diff/60*8;
                //sprintf(message, "{\"id\": \"%d\", \"flow_son\": \"%f\", \"flow_diff\": \"%f\", \"quantity leak\": \"%f\"}", board, water_flow_rate, water_flow_diff, total_quantity);  
                sprintf(message, "{\"time_s0_son\": \"%d\", \"time_s1_son\": \"%d\", \"sample_time_now\": \"%d\", \"sample_time_end\": \"%d\", \"time_s2_son\": \"%d\"}", time_s0_son, time_s1_son, sample_time_now, sample_time_end, time_s2_son);             
                message_on_aws=message;
                thread_wakeup(publish_thread_aws);
            }
            else{
                if(water_flow_rate-water_flow_rate_other > 0)
                {    water_flow_diff=water_flow_rate-water_flow_rate_other; }
                else
                {   water_flow_diff=water_flow_rate_other-water_flow_rate;  }

                printf("NO LEAKEGE\n");

                total_quantity=total_quantity+water_flow_diff/60*8;
                //sprintf(message, "{\"id\": \"%d\", \"flow_son\": \"%f\", \"flow_diff\": \"%f\", \"quantity leak\": \"%f\"}", board, water_flow_rate, water_flow_diff, total_quantity);             
                sprintf(message, "{\"time_s0_son\": \"%d\", \"time_s1_son\": \"%d\", \"sample_time_now\": \"%d\", \"sample_time_end\": \"%d\", \"time_s2_son\": \"%d\"}", time_s0_son, time_s1_son, sample_time_now, sample_time_end, time_s2_son);             
                message_on_aws=message;
                thread_wakeup(publish_thread_aws);
            }
        work=0;    
        }
    }
    memset(mex,0,strlen(mex));
    //memset(value_message_on_received,0,strlen(value_message_on_received));
}

void sub_to_topic(void)
{
    enum QoS qos = QOS0;
    printf("usage: %s <topic name> [QoS level]\n",DEFAULT_TOPIC_SUB);

    printf("mqtt_example: Subscribing to %s\n", DEFAULT_TOPIC_SUB);
    int ret = MQTTSubscribe(&client,DEFAULT_TOPIC_SUB, qos, _on_msg_received);
    if (ret < 0) {
        printf("mqtt_example: Unable to subscribe to %s (%d)\n",
               DEFAULT_TOPIC_SUB, ret);
        //_cmd_discon(0, NULL);
    }
    else {
        printf("\nmqtt_example: Now subscribed to %s, QOS %d\n",
               DEFAULT_TOPIC_SUB, (int) qos);
     
    }

    return;
}

int water_sensor_test(void){

    printf(" \n- - - - - - - - - - - - - - - - - - - - - - - - - - - - \n");
    printf("water sensor test start\n");
    
    int middle_turn=-1;
    int sample=0;
    float number_turn=0.0;
    
    sample = adc_sample(WATER_FLOW,ADC_RES);
    sample_time_now=xtimer_now();

    while(1){
        sample = adc_sample(WATER_FLOW,ADC_RES);
        
        if (middle_turn==0 && sample==4095){
            number_turn++;
        }
        middle_turn=sample;
        
        sample_time_end=xtimer_now();
        sample_time_diff=xtimer_diff(sample_time_end, sample_time_now); //microsecondi
        
        if(sample_time_diff > TESTING && number_turn==0.0){
            if(board!=0){
                break; //se no flow nella seconda va controllato il leakage
            }
            printf("No flow detected\n");
            return 0;
        }
        if(sample_time_diff > TESTING && number_turn!=0.0){
            printf("water flow detected\n");
            
            return 1;
        }
    }

    return 0;
}

void* water_sensor_sampling(void* arg){
    while(1){ 
        printf("water sensor  sampling thread start\n");
        
        int middle_turn=-1;
        int sample=0;

        float number_turn=0.0;
        float remain=0.0;
        float frequency=0.0;
        water_flow_rate=0.0;
        
        sample = adc_sample(WATER_FLOW,ADC_RES);

        if (sample==0){
            //middle_turn=0;
            remain=1;
        }
        else{ 
            //middle_turn=0;  
        }
        
        sample_time_now=xtimer_now();

        while(1){
            sample = adc_sample(WATER_FLOW,ADC_RES);
            
            if (middle_turn==0 && sample==4095){
                number_turn++;
            }
            middle_turn=sample;
            
            sample_time_end=xtimer_now();
            sample_time_diff=xtimer_diff(sample_time_end, sample_time_now); //microsecondi
            
            if(sample_time_diff > SAMPLING){
                break;
            }
        }

        if(remain!=0){
            number_turn=number_turn+remain;
        }
        
        printf("time: %d microseconds\t turns: %f\n",sample_time_diff,number_turn);
            
        //Q(L/s) = f/impulsi; -> Q(L/min) = f*60/impulsi  
        //usiamo per ora 541
        frequency=number_turn/(float)sample_time_diff*SECONDS;
        water_flow_rate=(float)frequency*60/541; 
        printf("frequency: %f Hz\t water_flow_rate: %f L/min\n",frequency,water_flow_rate);
        printf("water sensor thread end\n");
        if(board==0){
            time_s2_source = xtimer_now();
            sprintf(message, "%f", water_flow_rate);
            publish(message);
            
            total_quantity=total_quantity+water_flow_rate/60*8;
            //sprintf(message, "{\"id\": \"%d\", \"flow source\": \"%f\", \"quantity\": \"%f\"}", board, water_flow_rate, total_quantity); 
            sprintf(message, "{\"time_s0_source\": \"%d\", \"time_s1_source\": \"%d\", \"time_s2_source\": \"%d\"}", time_s0_source, time_s1_source, time_s2_source); 
            message_on_aws=message;
            thread_wakeup(publish_thread_aws);
        }

        thread_sleep();
    }

    return NULL;
}



int main(void)
{
    if(board==0){
        DEFAULT_TOPIC_SUB = "sub"; //subscribe
        DEFAULT_TOPIC_PUBLISH = "publish"; //publish
    }
    else{
        DEFAULT_TOPIC_SUB = "publish"; //subscribe
        DEFAULT_TOPIC_PUBLISH = "sub"; //publish
    }

    printf("You are running RIOT on a(n) %s board.\n", RIOT_BOARD);

    NetworkInit(&network);
 
    xtimer_sleep(3);

    MQTTClientInit(&client, &network, COMMAND_TIMEOUT_MS, buf, BUF_SIZE,
                   readbuf,
                   BUF_SIZE);

    printf("Running water-sensor example.\n");
  
    xtimer_sleep(3);

    MQTTStartTask(&client);
 
    while(ret<0){
        connect();
        xtimer_sleep(1);
    }

    // initialize the FLAME_PIN (ADC) line
    if (adc_init(WATER_FLOW) < 0) {
        printf("Initialization of ADC_LINE(%u) failed\n", WATER_FLOW);
        return 1;
    } else {
        printf("Successfully initialized ADC_LINE(%u)\n", WATER_FLOW);
    }
    publish_thread_node=thread_create(publish_stack_node, sizeof(publish_stack_node),
                  THREAD_PRIORITY_MAIN - 1, THREAD_CREATE_SLEEPING,
                  publish_on_node, NULL, "publish_on_node");  
	
	water_sensor_thread=thread_create(water_sensor_stack, sizeof(water_sensor_stack),
                  THREAD_PRIORITY_MAIN + 1, THREAD_CREATE_SLEEPING,
                  water_sensor_sampling, NULL, "water_sensor");

    publish_thread_aws=thread_create(publish_stack, sizeof(publish_stack),
                  THREAD_PRIORITY_MAIN - 1, THREAD_CREATE_SLEEPING,
                  publish_on_aws, NULL, "publish_on_received");              

    sub_to_topic();
    periodic_time = xtimer_now(); 
    
    while(1){

        if(board==0){

            while(1){  
                if(water_sensor_test()==1){
                    time_s0_source = xtimer_now();
                    publish("heloy");
                }
                xtimer_periodic_wakeup(&periodic_time, DAILY_PERIODIC);
                if(ret<0){
                    connect();
                    xtimer_sleep(1);
                }
            } 
        }

        if(board!=0){
            while(1){
                xtimer_periodic_wakeup(&periodic_time, DAILY_PERIODIC);
                if(ret<0){
                    connect();
                    xtimer_sleep(1);
                }
            }
        }  

    }

    printf("main thread end\n");

	return 0;
    
}    

Simulation Code

C/C++
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <math.h>

#include "xtimer.h"

#include "behaviors.h"
#include "sample_generator.h"
#include "drivers_sx127x.h"
#include "payload_formatter.h"
#include "semtech-loramac.h"
#include "fmt.h"
#include "config.h"

#include "params.h"
#include "app_debug.h"

/* Check payload_formatter for more details */
#define VALUE_MAXIMUM_LENGTH 14
#define LOGIC_TIME_MAXIMUM_LENGTH 5

/* Leakage */
#define LEAKAGE_CONDITION 0 /* L/min */
#define LEAKAGE_THRESHOLD 1.0

/* Setting TTN parameters */
#define DEV_EUI "70B3D57ED005D1D6"
#define APP_EUI "0000000000000011"
#define APP_KEY "5F129D225F930EB831FBE861B3B307D0"

uint32_t LATENCY_P2P = US_PER_SEC * 0;

int tx_complete_child;
char message[20];

/* Thread stack */
#define SX127X_STACKSIZE        (THREAD_STACKSIZE_DEFAULT)
static char stack_listen[SX127X_STACKSIZE];
static char stack_send[SX127X_STACKSIZE];

int source_lora_ttn(node_t node) 
{
puts("Behavior: source_lora_ttn");

    /* json to publish on TTN */
    char json[128];

    /* Sampling time */
    int s_time = -1;

    /* Set TTN application parameters */
    char* deveui_list[4] = {"loramac", "set", "deveui", DEV_EUI};
    char** argv = (char**)&deveui_list;
    loramac_handler(4,argv);

    char* appeui_list[4] = {"loramac", "set", "appeui", APP_EUI};
    argv = (char**)&appeui_list;
    loramac_handler(4,argv);

    char* appkey_list[4] = {"loramac", "set", "appkey", APP_KEY};
    argv = (char**)&appkey_list;
    loramac_handler(4,argv);

    char* dr_list[4] = {"loramac", "set", "dr", "5"};
    argv = (char**)&dr_list;
    loramac_handler(4,argv);

    char* join_list[3] = {"loramac", "join", "otaa"};
    argv = (char**)&join_list;
    loramac_handler(3,argv);

    /* Extracting configuration information */
    char* buffer = config();
    int buf_len = strlen(buffer);
    char buf[buf_len + 1];
    strncpy(buf, buffer, buf_len + 1);

    char** pairs = NULL;
    char** pairs_copy = NULL;
    int pair_count = 0;
 
    char* pair = strtok(buf, " ");
    int length;

    /* Extracting pairs */
    while (pair != NULL) {
        pair_count++;

        pairs = realloc(pairs, pair_count*sizeof(char*));
        pairs_copy = realloc(pairs_copy, pair_count*sizeof(char*));

        length = strlen(pair);

        pairs[pair_count - 1] = malloc(length + 1);
        strncpy(pairs[pair_count - 1], pair, length + 1);

        pairs_copy[pair_count - 1] = malloc(length + 1);
        strncpy(pairs_copy[pair_count - 1], pair, length + 1);

        pair = strtok(NULL, " ");
    }

    node_t** nodes = NULL;
    int node_count = 0;

    int right = 0;

    /* Retrieving node information from topology pairs */
    /* Do not consider first pair */
    for (int i = 1; i < pair_count; i++) {

        /* Separate elements for current pair */
        char *elem = strtok(pairs[i], "-");
        length = strlen(elem);

        /* First node code */
        char *node_code_1 = malloc(length + 1);
        strncpy(node_code_1, elem, length + 1);

        /* First node name */
        char* node_name_1 = malloc(15*sizeof(char));
        strcpy(node_name_1, "st-lrwan1-");
        strcat(node_name_1, node_code_1);

        /* Getting second element of the pair */
        elem = strtok(NULL, "-");
        length = strlen(elem);

        /* Second node code */
        char *node_code_2 = malloc(length + 1);
        strncpy(node_code_2, elem, length + 1);

        /* Second node name */
        char* node_name_2 = malloc(15*sizeof(char));
        strcpy(node_name_2, "st-lrwan1-");
        strcat(node_name_2, node_code_2);

        /* Check if the nodes of the pair are already in nodes */
        int node_1_in = 0, node_2_in = 0;

        for (int k = 0; k < node_count; k++) {
            if (strcmp(nodes[k]->node_self, node_name_1) == 0) {
                node_1_in = 1;

                nodes[k]->children_count++;

            }
            if (strcmp(nodes[k]->node_self,node_name_2) == 0) {
                node_2_in = 1;
            }
        }


        if (!node_1_in) {

            node_count++;
            nodes = realloc(nodes, node_count*sizeof(node_t*));
            node_t* node = malloc(sizeof(node_t));
            length = strlen(node_name_1);
            node->node_self = malloc(length + 1);
            strcpy(node->node_self, node_name_1);
            node->children_count = 1;
            node->self_children_position = 0;
            if (i == 1) {
                /* CHIEF node type */
                node->node_type = 1;
            }
            else {
                /* Default: BRANCH node type */
                node->node_type = 2;
            }

            /* Add node to the nodes array */
            nodes[node_count - 1] = (node_t*)malloc(sizeof(node_t));
            nodes[node_count - 1]->node_self = malloc(length + 1);
            nodes[node_count - 1]->node_self = node->node_self;
            nodes[node_count - 1]->node_type = node->node_type;
            nodes[node_count - 1]->children_count = node->children_count;
            nodes[node_count - 1]->self_children_position = node->self_children_position;

        }

        if (!node_2_in) {

            for (int w = 0; w < node_count; w++) {
                if (strcmp(nodes[w]->node_self, node_name_1) == 0) {
                    if (nodes[w]->children_count == 2) {
                        right = 1;
                    }
                }
            }

            node_count++;
            nodes = realloc(nodes, node_count*sizeof(node_t*));
            node_t* node = malloc(sizeof(node_t));
            length = strlen(node_name_2);
            node->node_self = malloc(length +1);
            strcpy(node->node_self, node_name_2);
            node->children_count = 0;
            if (right) {
                node->self_children_position = 1;
                right = 0;
            }
            else {
                node->self_children_position = 0;
            }

            /* Default: FORK node type */
            node->node_type = 2;

            /* Add node to the nodes array */
            nodes[node_count - 1] = (node_t*)malloc(sizeof(node_t));
            nodes[node_count - 1]->node_self = malloc(length + 1);
            nodes[node_count - 1]->node_self = node->node_self;
            nodes[node_count - 1]->node_type = node->node_type;
            nodes[node_count - 1]->children_count = node->children_count;
            nodes[node_count - 1]->self_children_position = node->self_children_position;
        }
        
        /* Free allocated memory */
        free(node_code_1);
        free(node_code_2);
        free(node_name_1);
        free(node_name_2);
        node_code_1 = NULL;
        node_code_2 = NULL;
        node_name_1 = NULL;
        node_name_2 = NULL;

    }

    /* Update node types */
    for (int k = 0; k < node_count; k++) {
        if (nodes[k]->node_type != 1 && nodes[k]->children_count == 0) {
            /* BRANCH node type */
            nodes[k]->node_type = 3;
        }
    }

    while(1) {
        /* Sampling time update */
        s_time += 3;

        /* Get water flow value */
        float value = get_water_flow(node.node_type, 0, s_time);
        char* water_flow = malloc(5*sizeof(char));
        int chars = fmt_float(water_flow, value, 2);
        water_flow[chars] = '\0';
        /* Send source water flow information */
        sprintf(json, "{\"Id\": \"flow\", \"Flow\": \"%s\"}", water_flow);
        puts(json);

        char* tx_list[3] = {"loramac", "tx", json};
        argv = (char**)&tx_list;
        loramac_handler(3, argv);

        free(water_flow);

        /* Sleeping for five seconds */
        xtimer_sleep(5);

        pairs = (char**)realloc(pairs, pair_count*sizeof(char*));

        for (int w = 0; w < pair_count; w++) {
            int length = strlen(pairs_copy[w]);
            pairs[w] = realloc(pairs[w], length + 1);
            strncpy(pairs[w], pairs_copy[w], length + 1);
        }

        for (int i = 1; i < pair_count; i++) {

            /* Separate elements for current pair */
            char *elem = strtok(pairs[i], "-");
            length = strlen(elem);

            /* First node code */
            char *node_code_1 = malloc(length + 1);
            strncpy(node_code_1, elem, length + 1);

            /* First node name */
            char* node_name_1 = malloc(15*sizeof(char));
            strcpy(node_name_1, "st-lrwan1-");
            strcat(node_name_1, node_code_1);

            /* Getting second element of the pair */
            elem = strtok(NULL, "-");
            length = strlen(elem);
        
            /* Second node code */
            char *node_code_2 = malloc(length + 1);
            strncpy(node_code_2, elem, length + 1);

            /* Second node name */
            char* node_name_2 = malloc(15*sizeof(char));
            strcpy(node_name_2, "st-lrwan1-");
            strcat(node_name_2, node_code_2);

            node_t* father_node = malloc(sizeof(node_t));
            node_t* child_node = malloc(sizeof(node_t));

            for (int k = 0; k < node_count - 1; k++) {
                if (strcmp(nodes[k]->node_self, node_name_1) == 0 && strcmp(nodes[k+1]->node_self, node_name_2) == 0) {
                    
                    int length = strlen(nodes[k]->node_self);
                    father_node->node_self = malloc(length + 1);
                    father_node->node_self = nodes[k]->node_self;
                    father_node->node_type = nodes[k]->node_type;
                    father_node->children_count = nodes[k]->children_count;
                    father_node->self_children_position = nodes[k]->self_children_position;

                    length = strlen(nodes[k+1]->node_self);
                    child_node->node_self = malloc(length + 1);
                    child_node->node_self = nodes[k+1]->node_self;
                    child_node->node_type = nodes[k+1]->node_type;
                    child_node->children_count = nodes[k+1]->children_count;
                    child_node->self_children_position = nodes[k]->self_children_position;
                }
            }

            /* Get water flow value */
            float father_water_flow = get_water_flow(father_node->node_type, father_node->self_children_position, s_time);
            float child_water_flow = get_water_flow(child_node->node_type, child_node->self_children_position, s_time);

            float difference = father_water_flow - child_water_flow;

            if (difference > LEAKAGE_THRESHOLD) {
                char* water_leakage = malloc(5*sizeof(char));
                int chars = fmt_float(water_leakage, difference, 2);
                water_leakage[chars] = '\0';
                sprintf(json, "{\"Id\": \"leakage\", \"Child\": \"%s\", \"Father\": \"%s\", \"Leakage\": \"%s\"}", child_node->node_self, father_node->node_self, water_leakage); 
                puts(json);

                char* lx_list[3] = {"loramac", "tx", json};
                argv = (char**)&lx_list;
                loramac_handler(3, argv);

                free(water_leakage);

                xtimer_sleep(5);

            }

        
            /* Free allocated memory */
            free(node_code_1);
            free(node_code_2);
            free(node_name_1);
            free(node_name_2);
            node_code_1 = NULL;
            node_code_2 = NULL;
            node_name_1 = NULL;
            node_name_2 = NULL;

            free(father_node);
            free(child_node);

        }
    }

    return 0;
}

static void _start_listening (void) 
{
    /* listen for lora messages */
    char* list[1] = {"listen_cmd"};
    char** argv = (char**)&list;
    listen_cmd(1, argv);
}

/** Restart listening only in the following conditions (checked in listen_cmd):
 * Alway listening: DUTY_CYCLE = 0 or node.node_type = 1 (CHIEF)
 * I was listening before the message send
*/
static void _restart_listening (void) {
    char* list[2] = {"listen_cmd", "resend"};
    char** argv = (char**)&list;
    listen_cmd(2, argv);
}

static void _sample (sample_t* sample, node_t node, int time) 
{
    /* Count the number of sensors for this board */
    int sensors_number;
    if (node.children_count <= 1) 
        sensors_number = 1;
    else 
        sensors_number = node.children_count;

    /* Check water flow for each sensor and send a message to its children if any */
    sample->water_flow = (float*)malloc(sizeof(float)*node.children_count);
    sample->water_flow_sum = 0.0;
    printf("Sampling: ");
    for (int i = 0; i < sensors_number; i++) {
        /* Sample */
        sample->water_flow[i] = get_water_flow(node.node_type, i, time);
        printf("Sensor %d, value: ", i); print_float(sample->water_flow[i], 2); printf(". ");
        /* Sum */
        sample->water_flow_sum += sample->water_flow[i];
    }
    printf("Total water flow: "); print_float(sample->water_flow_sum, 2); printf("\n\n");
}

static void _send_water_flow_to_children(node_t node, int time) 
{    
    /* Get the value of the water flow */
    sample_t sample;
    _sample(&sample, node, time);

    if (sample.water_flow_sum) {

        if(APP_DEBUG) { printf("Water flow sum: "); print_float(sample.water_flow_sum, 2); printf("\n\n"); }
        int printed_chars;

        /* Convert the time from int to string */
        char str_time[LOGIC_TIME_MAXIMUM_LENGTH];
        sprintf(str_time, "%d", time); 
        /* Convert the water flow from int to char* and split it between children */
        char** str_water_flow = (char**)malloc(sizeof(char*));
        for (int i = 0; i < node.children_count; i++) {
            str_water_flow[i] = (char*)malloc(sizeof(char)*VALUE_MAXIMUM_LENGTH);
            printed_chars = fmt_float(str_water_flow[i], sample.water_flow[i], 2);
            str_water_flow[i][printed_chars] = '\0';
        }

        free(sample.water_flow);            

        char* str_payload = NULL;

        /* Send water flow to children */
        for (int i = 0; i < node.children_count; i++) {
            tx_complete_child = 0;
            str_payload = format_payload(str_water_flow[i], node.node_self, node.node_children[i], "V", str_time);
            char* list[2] = {"send_cmd", str_payload};
            char** argv = (char**)&list;
            send_cmd(2, argv);

            /* Wait for transmission complete */
            while (!tx_complete_child) {
                /* The sendere thread has less priority, so we need to sleep a little bit */
                xtimer_msleep(100);
            }
        }

        if (node.node_type != 1) {
            /* Restart listening */
            _restart_listening();
        }

        free(str_payload);
        /* Free memory */
        for (int i = 0; i < node.children_count; i++) {
            free(str_water_flow[i]);
        }
        free(str_water_flow);
    }
}

void _check_leakage (node_t node, payload_t* payload) {
    /* Get the value of the water flow */
    sample_t sample;
    _sample(&sample, node, atoi(payload->logic_time));
    free(sample.water_flow);
    printf("Current water flow: "); print_float(sample.water_flow_sum, 2); printf(". ");

    /* Compute the difference */
    float difference = atof(payload->value) - sample.water_flow_sum;

    if (difference > LEAKAGE_CONDITION) {
        /* Leakage detected */
        puts("Leakage detected, sending a message to the source");

        int printed_chars;

        /* Convert the differece from float to char* */
        char str_difference[VALUE_MAXIMUM_LENGTH];
        printed_chars = fmt_float(str_difference, difference, 2);
        str_difference[printed_chars] = '\0';

        /* Send a the leakage message to the source */
        char json[128];
        sprintf(json, "{\"Id\": \"leakage\", \"Child\": \"%s\", \"Father\": \"%s\", \"Leakage\": \"%s\"}", node.node_self, node.node_father, str_difference); 
        printf("Sending the leakage message to TTN: ");
        puts(json);

    } else {
        puts("No leakage detected\n");
    }
}

void transmission_complete_clb (void) {
    if (APP_DEBUG) puts("Callback on trasmission complete");
    tx_complete_child = 1;
}

/**
 * @return 1 if the message is sent from the parent, then you need to stop listening, else return 0
*/
int message_received_clb (node_t node, char message[32]) {
    if (APP_DEBUG) puts("Callback invoked, starting message parsing");

    if (strlen(message) > 31) {
        printf("Extraneous message received, message lenght: %d\n", strlen(message));
        return 0;
    }

    /* Message parsing */
    payload_t* payload = get_values(message);
    if (!payload) {
        /* Not a message from our application */
        if (APP_DEBUG) puts("Not a message from our application");
        
        return 0;
    }

    /* Check destination */
    if (strcmp(payload->to, node.node_self) != 0) {
        /* Message not sent to me */
        if (APP_DEBUG) puts("Message not sent to me");

        free_payload(payload);
        return 0;
    }
    
    /* Compute the sender of the message */
    if (strcmp(payload->from, node.node_father) == 0) {
        /* Message sent from the parent */
        printf("Message from the parent received: %s\n", message);

        /* Check leakage */
        _check_leakage(node, payload);

        free_payload(payload);
        return 1;
    }

    return 0;
}

void *_periodic_listening(void *arg) {

    (void)arg;
    xtimer_ticks32_t last_wakeup;
    bool is_last_wakeup = false;

    while (1) {
        _start_listening();

        /* Duty cycle */
        if (!is_last_wakeup) {
            /* set last_wakeup only the first time */
            is_last_wakeup = true;
            last_wakeup = xtimer_now();
        }
        xtimer_periodic_wakeup(&last_wakeup, US_PER_SEC * SIMULATED_DAY);
    }
}

void *_periodic_sending(void *arg) {

    xtimer_ticks32_t last_wakeup;
    bool is_last_wakeup = false;
    /* Starting logic time for the sample generator */
    int time = 6;
    node_t node = *(node_t *)arg;

    while (1) {
        /* Set time for sampling: [0, inf) */
        time = (time + 1);

        _send_water_flow_to_children(node, time);

        /* Duty cycle */
        if (!is_last_wakeup) {
            /* set last_wakeup only the first time */
            is_last_wakeup = true;
            last_wakeup = xtimer_now();
        }
        xtimer_periodic_wakeup(&last_wakeup, US_PER_SEC * LEAKAGE_TEST_PERIOD);
    }
}

int lora_p2p(node_t node) {

    puts("Behavior: lora_p2p\n");

    /* Start listening: periodic if DUTY_CYCLE is setted, else continuous listening */
    if (!node.node_type == 0 && node.node_type != 1) {
        /* Source nodes doesn't receive messages */
        if (DUTY_CYCLE) {
            kernel_pid_t _listen_pid = thread_create(stack_listen, sizeof(stack_listen), THREAD_PRIORITY_MAIN - 1,
                                    THREAD_CREATE_STACKTEST, _periodic_listening, NULL,
                                    "_periodic_listening");
            if (_listen_pid <= KERNEL_PID_UNDEF) {
                puts("Creation of _periodic_listening thread failed");
                return 1;
            }
        } else {
            _start_listening();
        }
    }

    /* Start sending: only if the current node is not a BRANCH */
    if (node.node_type != 3) {
        kernel_pid_t _sending_pid = thread_create(stack_send, sizeof(stack_send), THREAD_PRIORITY_MAIN - 1,
                                THREAD_CREATE_STACKTEST, _periodic_sending, (void *)&node,
                                "_periodic_sending");

        if (_sending_pid <= KERNEL_PID_UNDEF) {
            puts("Creation of _periodic_sending thread failed");
            return 1;
        } 
    }

    return 0;
}

GitHub Repository

Credits

Riccardo Gobbato

Riccardo Gobbato

4 projects • 5 followers
Computer Engineer
pasquale mocerino

pasquale mocerino

3 projects • 3 followers
Simone Scaccia

Simone Scaccia

2 projects • 3 followers
Engineering in Computer Science student

Comments