phpoc_man
Published © GPL3+

MQTT for IoT Devices, Support TLS/SSL, QoS Level: 0, 1, 2

This project describes MQTT library for an IoT Platfrom so-called PHPoC. This library supports MQTT over TLS/SSL and all QoS Level: 0, 1, 2.

BeginnerProtip20 hours1,491
MQTT for IoT Devices, Support TLS/SSL, QoS Level: 0, 1, 2

Things used in this project

Hardware components

PHPoC Blue
PHPoC Blue
We can use PHPoC Black or PHPoC Blue
×1
PHPoC Black
PHPoC Black
We can use PHPoC Black or PHPoC Blue
×1

Software apps and online services

MQTT
MQTT

Story

Read more

Code

Example - Publisher without using SSL

PHP
<?php

if(_SERVER("REQUEST_METHOD"))
    exit; // avoid php execution via http request

include_once "/lib/sn_dns.php";
include_once "/lib/vn_mqtt.php";

//$host_name = "test.mosquitto.org";
//$host_name = "iot.eclipse.org";
//$host_name = "broker.hivemq.com";
//$host_name = "broker.mqttdashboard.com";
$host_name = "[192.168.0.3]";
$port = 1883;

mqtt_setup(0, "PHPoC-MQTT Pub Example", $host_name, $port);

$will = "";
$username = "";
$password = "";
mqtt_connect(true, $will, $username, $password);

$dup_flag = 0;
$qos = 2;// change qos here.
$retain_flag = 0;
$out_topics = array(array("sensors/temperature", 1), array("disasters/tsunami", 2));

$in_topic = "";
$in_content = "";
$is_retain = 0;

$count_msg = 0;

while(1)
{            
    if(mqtt_state() == MQTT_CONNECTED)
    {
        $qos = 0;
        $count_msg++;
        $msg = "Message From PHPoC, count: $count_msg, QoS: $qos! at ". date("Y:M-d-D, H:i:s");
        mqtt_publish($out_topics[0][0], $msg, $dup_flag, $qos, $retain_flag);
        mqtt_publish($out_topics[1][0], $msg, $dup_flag, $qos, $retain_flag);
        sleep(2);
        
        $qos = 1;
        $count_msg++;
        $msg = "Message From PHPoC, count: $count_msg, QoS: $qos! at ". date("Y:M-d-D, H:i:s");
        mqtt_publish($out_topics[0][0], $msg, $dup_flag, $qos, $retain_flag);
        mqtt_publish($out_topics[1][0], $msg, $dup_flag, $qos, $retain_flag);
        sleep(2);
        
        $qos = 2;    
        $count_msg++;
        $msg = "Message From PHPoC, count: $count_msg, QoS: $qos! at ". date("Y:M-d-D, H:i:s");
        mqtt_publish($out_topics[0][0], $msg, $dup_flag, $qos, $retain_flag);
        mqtt_publish($out_topics[1][0], $msg, $dup_flag, $qos, $retain_flag);
        sleep(2);
        
    }
    
    if(mqtt_state() == MQTT_DISCONNECTED)
            while(mqtt_reconnect() == false);
    
    if(mqtt_loop($in_topic, $in_content, $is_retain))
    {
        //TODO , procees the received publish packet here
        if($is_retain == 1)
            echo "<<a stale message\r\n";
        
        echo "<<topic:$in_topic\r\n";
        echo "<<content: $in_content\r\n";
    }
}

//mqtt_disconnect();

?>

Library - ThingSpeak Library for PHPoC

PHP
<?php

//vn_mqtt.php for p40
/**
PHPoC MQTT Client library 

*2016-09-01.
    - Support MQTT Version 3.1 and 3.1.1
    - Document Reference:
        + MQTT Version 3.1:   http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html
        + MQTT Version 3.1.1: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html
    - History:
        + 2016-09-01: Support MQTT Version 3.1 and 3.1.1
    - Testing succesufully with:
        + Mosquitto Broker installed in my computer
        + iot.eclipse.org
        + broker.hivemq.com 
        + test.mosquitto.org 
        + broker.mqttdashboard.com 
        + m11.cloudmqtt.com
        In case clean session is set to false, it does work well with some servers due to sever send a lot of packet continously, PHPoC has the limit of embedded system.
        Clean session false is not recommended to use.
    - QoS Level: 0, 1, 2.
    - Note:        
        + Message delivery retry:  http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#retry
            * This is optional, default is disable. User can enable it by using mqtt_setup() function.
            * If retry option is anable, the max time of retry is 10. User can change this value by changing MQTT_RESEND_MAX_NUM.
            Note: Resend process is performed in a blocking loop, be careful when use this option.

*2017-02-03        
    - Support MQTT over TLS/SSL
    
**/

//Constants

define("MQTT_VERION_3_1",   3);
define("MQTT_VERION_3_1_1", 4);
define("MQTT_PROTOCOL_NAME_3_1",   "MQIsdp");
define("MQTT_PROTOCOL_NAME_3_1_1", "MQTT");

//MQTT state
define("MQTT_DISCONNECTED", 0);
define("MQTT_CONNECTED",    1);
define("MQTT_PINGING",      2);

//MQTT security
define("MQTT_PLAIN",             0);
define("MQTT_SSL",                1);
define("MQTT_WEBSOCKET",        2);
define("MQTT_WEBSOCKET_SSL",    3);

//message type
define("MQTT_CTRL_CONNECT",     0x1);
define("MQTT_CTRL_CONNECTACK",  0x2);
define("MQTT_CTRL_PUBLISH",     0x3);
define("MQTT_CTRL_PUBACK",      0x4);
define("MQTT_CTRL_PUBREC",      0x5);
define("MQTT_CTRL_PUBREL",      0x6);
define("MQTT_CTRL_PUBCOMP",     0x7);
define("MQTT_CTRL_SUBSCRIBE",   0x8);
define("MQTT_CTRL_SUBACK",      0x9);
define("MQTT_CTRL_UNSUBSCRIBE", 0xA);
define("MQTT_CTRL_UNSUBACK",    0xB);
define("MQTT_CTRL_PINGREQ",     0xC);
define("MQTT_CTRL_PINGRESP",    0xD);
define("MQTT_CTRL_DISCONNECT",  0xE);

//quality of service
define("MQTT_QOS_0",  0x0);
define("MQTT_QOS_1",  0x1);
define("MQTT_QOS_2",  0x2);

/*
Mask for header flags.
Header flags is part of fixed message header.
*/
define("MQTT_HEAD_FLAG_RETAIN", 0x01);
define("MQTT_HEAD_FLAG_QOS_1",  0x02);
define("MQTT_HEAD_FLAG_QOS_2",  0x04);
define("MQTT_HEAD_FLAG_DUP",    0x08);

/*
Mask for connect flags.
Connect flags is part of the variable header of a CONNECT message
*/
define("MQTT_CONN_FLAG_CLEAN_SS",    0x02);
define("MQTT_CONN_FLAG_WILL",        0x04);
define("MQTT_CONN_FLAG_WILL_QOS_1",  0x08);
define("MQTT_CONN_FLAG_WILL_QOS_2",  0x10);
define("MQTT_CONN_FLAG_WILL_RETAIN", 0x20);
define("MQTT_CONN_FLAG_PASSWORD",    0x40);
define("MQTT_CONN_FLAG_USERNAME",    0x80);

/*
Keep Alive timer. 
    -Adjust as necessary, in seconds.  Default to 5 minutes.
    -See http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#keep-alive-timer
*/ 
define("MQTT_CONN_KEEPALIVE",  300);

/* 
These values are timeout for wating reponse from broker.
    -Adjust as necessary according to network latency, in milliseconds.
*/
define("MQTT_TIMEOUT_CONNECT_MS",     6000);//between CONNECT and CONNECTACK.
define("MQTT_TIMEOUT_PUBLISH_MS",     500); //between PUBLISH and PUBACK/PUBREC. between PUBREC and PUBREL. between PUBREL and PUBCOMP.
define("MQTT_TIMEOUT_SUBSCRIBE_MS",   500); //between SUBSCRIBE and SUBACK.
define("MQTT_TIMEOUT_UNSUBSCRIBE_MS", 500); //between UNSUBSCRIBE and UNSUBACK.
define("MQTT_TIMEOUT_PING_MS",        500); //between PINGREQ and PINGRESP.

/*
This is maximum number of time to resend the packet if not received the expected message. 
This only makes send when $vn_mqtt_resend is set to true.
Note: user can change this parameter to 0 or set $vn_mqtt_resend to false to disable resend function.
*/
define("MQTT_RESEND_MAX_NUM", 10); 

//Global variables
$vn_mqtt_tcp_id = 0;
$vn_mqtt_tcp_pid = 0; 

$vn_mqtt_state = MQTT_DISCONNECTED;

$vn_mqtt_client_id = "";

$vn_mqtt_broker_hostname = "";
$vn_mqtt_broker_port = 1883;

$vn_mqtt_security = MQTT_PLAIN;

$vn_mqtt_version = MQTT_VERION_3_1;
$vn_mqtt_protocol_name = MQTT_PROTOCOL_NAME_3_1;
$vn_mqtt_alive_start = 0;
$vn_mqtt_msg_id = 1; //Do not use Message ID 0. It is reserved as an invalid Message ID.

/*
To save information to reconnect.
*/
$vn_mqtt_clean_flag = true;
$vn_mqtt_will = "";
$vn_mqtt_username = "";
$vn_mqtt_password = "";

$vn_mqtt_recv_buffer = "";
$vn_mqtt_packet_manager = "";
$vn_mqtt_unack_list = "";

/*
This paramete can be changed at mqtt_setup function.
Note: user can change this parameter to false or set MQTT_RESEND_MAX_NUM to 0 to disable resend function.
*/
$vn_mqtt_resend = true;

//To store subsription list
$vn_mqtt_subs_list = "";

/*
This function is to get value of timer.
*/
function vn_mqtt_get_tick()
{
    while(($pid = pid_open("/mmap/st9", O_NODIE)) == -EBUSY)
        usleep(500);

    if(!pid_ioctl($pid, "get state"))
        pid_ioctl($pid, "start");

    $tick = pid_ioctl($pid, "get count");
    pid_close($pid);

    return $tick;
}

/*
Encode a length of a message.
Parameters:
    -$len: length to be encoded.
Return: The encoded data.
*/
function vn_mqtt_encode_length($length)
{
    $ret = "";
    
    do
    {
        $digit = $length % 128;
        $length = $length >> 7;
        //If there are more digits to encode, set the top bit of this digit
        if($length > 0)
            $digit = ($digit | 0x80);
        
        $ret .= sprintf("%c", $digit);
    }while($length > 0);
    
    return $ret;
}

/*
Decode a length of a message (Remaining Length field).
Parameters:
    -$pkt: message to be decoded.
Return: the length of message( excluding size of fixed header).
*/
function vn_mqtt_decode_length($pkt)
{        
    $multiplier = 1; 
    $value = 0 ;
    $i = 1;
    
    do
    {
        $digit = bin2int($pkt[$i], 0, 1);
        $value += ($digit & 127) * $multiplier; 
        $multiplier *= 128;
        $i++;
    }while (($digit & 128) != 0);

    return $value;
}

/*
Attach two-byte length before a string.
Parameters:
    -$str: string to be encoded.
Return: new string which is attached the length.
*/
function vn_mqtt_encode_string($str)
{
    $len = strlen($str);
    $msb = $len >> 8;
    $lsb = $len & 0xff;
    $ret = sprintf("%c", $msb);
    $ret .= sprintf("%c", $lsb);
    $ret .= $str;
    
    return $ret;
}

/*
Get messsage quality of service.
Parameters:
    -$pkt: message to get QoS.
Return:    QoS of the message. 
*/
function vn_mqtt_get_message_qos($pkt)
{
    $qos = (bin2int($pkt[0], 0, 1) & (MQTT_HEAD_FLAG_QOS_1|MQTT_HEAD_FLAG_QOS_2)) >> 1;
    
    return $qos;
}

/*
Get retain flag of message.
Parameters:
    -$pkt: message to get retain.
Return:    retain status of the message. 
*/
function vn_mqtt_get_message_retain($pkt)
{
    $retain = bin2int($pkt[0], 0, 1) & MQTT_HEAD_FLAG_RETAIN ;
    
    return $retain;
}

/*
Get DUP flag of message.
Parameters:
    -$pkt: message to get DUP flag.
Return:    DUP flag status of the message. 
*/
function vn_mqtt_get_message_dup($pkt)
{
    $dup = (bin2int($pkt[0], 0, 1) & MQTT_HEAD_FLAG_DUP) >> 3 ;
    
    return $dup;
}
    
/*
Get messsage identifier.
Parameters:
    -$pkt: message to get id.
Return:    
    - Identifier number of the message. 
    - 0 if message does not have ID.
*/
function vn_mqtt_get_message_id($pkt)
{
    $msg_type = bin2int($pkt[0], 0, 1) >> 4;
            
    $remain_length  = vn_mqtt_decode_length($pkt);
    $var_head_pos = strlen($pkt) - $remain_length;
    
    $msg_id = 0;
    
    switch($msg_type)
    {
        case MQTT_CTRL_PUBLISH: 
            $qos = (bin2int($pkt[0], 0, 1) & (MQTT_HEAD_FLAG_QOS_1|MQTT_HEAD_FLAG_QOS_2)) >> 1;
            
            if($qos)
            {
                $msb = bin2int($pkt[$var_head_pos], 0, 1);
                $lsb = bin2int($pkt[$var_head_pos + 1], 0, 1);
                $topic_length = ($msb << 8) + $lsb;
                
                $msb_pos = $var_head_pos + 2 + $topic_length;
                
                $msb = bin2int($pkt[$msb_pos], 0, 1);
                $lsb = bin2int($pkt[$msb_pos + 1], 0, 1);
                $msg_id = ($msb << 8) + $lsb;
            }            
            break;
            
        case MQTT_CTRL_PUBACK: 
        case MQTT_CTRL_PUBREC:
        case MQTT_CTRL_PUBREL: 
        case MQTT_CTRL_PUBCOMP:
        case MQTT_CTRL_SUBSCRIBE:
        case MQTT_CTRL_SUBACK:
        case MQTT_CTRL_UNSUBSCRIBE: 
        case MQTT_CTRL_UNSUBACK: 
            $msb = bin2int($pkt[$var_head_pos], 0, 1);
            $lsb = bin2int($pkt[$var_head_pos + 1], 0, 1);
            $msg_id = ($msb << 8) + $lsb;
            break;
            
        default:
            $msg_id = 0;
    }
    
    return $msg_id;
}

/*
Get messsage payload.
Parameters:
    -$pkt: message to get payload.
Return: payload of the message
*/
function vn_mqtt_get_message_payload($pkt)
{
    $msg_type = bin2int($pkt[0], 0, 1) >> 4;
    
    $remain_length  = vn_mqtt_decode_length($pkt);
    $var_head_pos = strlen($pkt) - $remain_length;
    
    //types of message have a payload: CONNECT, SUBSCRIBE, SUBACK, PUBLISH.    
    switch($msg_type)
    {
        case MQTT_CTRL_SUBSCRIBE:
        case MQTT_CTRL_SUBACK: 
            $payload_pos = $var_head_pos + 2; // two bytes of message identifier
            $payload_length = $remain_length -2;
            $payload =  substr($pkt, $payload_pos, $payload_length);
            break;
            
        case MQTT_CTRL_CONNECT:
            //Protocol Name    
            $pointer = $var_head_pos;
            $msb = bin2int($pkt[$pointer++], 0, 1);
            $lsb = bin2int($pkt[$pointer++], 0, 1);
            $length = ($msb << 8) + $lsb;
            $pointer +=$length;
            $pointer += 4; //1 byte version number, 1 byte connect flag,  byte keep-alive-timer.
            $payload_length = strlen($pkt) - $pointer;
            $payload =  substr($pkt, $pointer, $payload_length);
            break;
            
        case MQTT_CTRL_PUBLISH:
            $pointer = $var_head_pos;
            $msb = bin2int($pkt[$pointer++], 0, 1);
            $lsb = bin2int($pkt[$pointer++], 0, 1);
            $topic_length = ($msb << 8) + $lsb;
            $pointer += $topic_length;
            
            $qos = (bin2int($pkt[0], 0, 1) & (MQTT_HEAD_FLAG_QOS_1|MQTT_HEAD_FLAG_QOS_2)) >> 1;    
            
            if($qos)
                $pointer += 2;// message identifier.
            
            $payload_length = strlen($pkt) - $pointer;
            $payload =  substr($pkt, $pointer, $payload_length);
            break;
            
        default:
            $payload = "";
    }
    
    return $payload;
}

/*
Get topic of publish packet.
Parameters:
    -$pkt: publish packet .
Return: topic
*/
function vn_mqtt_get_topic($pkt)
{
    $topic = "";
    $msg_type = bin2int($pkt[0], 0, 1) >> 4;
    
    if($msg_type != MQTT_CTRL_PUBLISH)
    {
        //echo "mqtt: not publish message type";
        return $topic;
    }
    
    $remain_length  = vn_mqtt_decode_length($pkt);
    $var_head_pos = strlen($pkt) - $remain_length;
    $pointer = $var_head_pos;
    $msb = bin2int($pkt[$pointer++], 0, 1);
    $lsb = bin2int($pkt[$pointer++], 0, 1);
    $topic_length = ($msb << 8) + $lsb;
    
    $topic =  substr($pkt, $pointer, $topic_length);
    
    return $topic;
}

/*
Get content of publish packet.
Parameters:
    -$pkt: publish packet .
Return: content
*/
function vn_mqtt_get_content($pkt)
{
    return vn_mqtt_get_message_payload($pkt);
}

/*
Find packet in receiving buffer by message type. 
Parameters: 
    - $msg_type: type of message to find.
Return:
    - index of  the first packet in buffer if existed.
    - -1: if not existed.
*/
function vn_mqtt_find_packet($msg_type)
{
    global $vn_mqtt_packet_manager;
    
    if($vn_mqtt_packet_manager != "")
    {
        $infos = explode(",", $vn_mqtt_packet_manager);
        $count = count($infos);
        for($i = 0; $i < $count; $i += 2)
        {
            if($msg_type == (int)$infos[$i])
                return ($i/2);
        }
    }
    
    return -1;
}

/*
Get a packet in receiving buffer by index. 
Parameters: 
    - $pkt_id: index of packet in buffer.
    - $is_delete: option to delete packet from buffer after getting
Return:
    - a packet if existed.
    - an empty string: if not existed.
*/
function vn_mqtt_get_packet($pkt_id, $is_delete = true)
{
    global $vn_mqtt_recv_buffer, $vn_mqtt_packet_manager;
    
    $pkt = "";
    
    if($vn_mqtt_packet_manager != "")
    {
        $infos = explode(",", $vn_mqtt_packet_manager);
        $count = count($infos);    
        $pkt_count = $count/2;
        
        if ($pkt_id < $pkt_count)
        {
            $pkt_offset = 0;
            
            for($i = 1; $i < ($pkt_id*2); $i += 2)
            {
                $pkt_offset += (int)$infos[$i];
            }
            
            $pkt_len = (int)$infos[$i];

            $pkt = substr($vn_mqtt_recv_buffer, $pkt_offset, $pkt_len);
            
            if($is_delete)
            {
                //delete from buffer.
                $vn_mqtt_recv_buffer = substr_replace($vn_mqtt_recv_buffer, "", $pkt_offset, $pkt_len);
                
                //update buffer manager.
                $vn_mqtt_packet_manager = "";
                
                for($i = 0; $i < $pkt_count; $i++)
                {
                    if($i != $pkt_id)
                    {
                        $pnt = 2*$i;
                        $pkt_type = $infos[$pnt];
                        $pkt_lengh = $infos[$pnt+1];
                        
                        $vn_mqtt_packet_manager .= "$pkt_type,$pkt_lengh,";
                    }
                }
                
                $vn_mqtt_packet_manager = rtrim($vn_mqtt_packet_manager, ",");
            }
        }
        //else
            //echo "mqtt: invalid packet id\r\n";
    }
    //else
        //echo "mqtt: no packet in buffer now\r\n";
    
    return $pkt; 
}

/*
For debugging buffer.
*/
function vn_mqtt_show_packet_list()
{
    global $vn_mqtt_recv_buffer, $vn_mqtt_packet_manager;
    
    $pkt = "";
    $msg_id = 0;
    
    if($vn_mqtt_packet_manager != "")
    {
        $infos = explode(",", $vn_mqtt_packet_manager);
        $count = count($infos);
        $pkt_count = $count/2;
        
        for($i = 0; $i < $count; $i += 2)
        {
            $pkt_id = (int)$infos[$i];
            $pkt = vn_mqtt_get_packet($i/2, false);
            
            if($pkt !== "")
                $msg_id = vn_mqtt_get_message_id($pkt);
            
            //echo "mqtt: packet $pkt_id in buffer with message id: $msg_id\r\n";
        }
    }
    //else
        //echo "mqtt: no packet in buffer now\r\n";
    
    return $pkt; 
}

/*
Delete a packet in receiving buffer by index. 
Parameters: 
    - $pkt_id: index of packet in buffer.
Return:
    - true on success.
    - false otherwise.
*/
function vn_mqtt_delete_packet($pkt_id)
{
    global $vn_mqtt_recv_buffer, $vn_mqtt_packet_manager;    
    
    if($vn_mqtt_packet_manager != "")
    {
        $infos = explode(",", $vn_mqtt_packet_manager);
        $count = count($infos);    
        $pkt_count = $count/2;
        
        if ($pkt_id < $pkt_count)
        {            
            
            $pkt_offset = 0;
            
            for($i = 1; $i < ($pkt_id*2); $i += 2)
            {
                $pkt_offset += (int)$infos[$i];
            }
            
            $pkt_len = (int)$infos[$i];
            
            //delete from buffer.
            $vn_mqtt_recv_buffer = substr_replace($vn_mqtt_recv_buffer, "", $pkt_offset, $pkt_len);
            
            //update buffer manager.
            $vn_mqtt_packet_manager = "";
            
            for($i = 0; $i < $pkt_count; $i++)
            {
                if($i != $pkt_id)
                {
                    $pnt = 2*$i;
                    $pkt_type = $infos[$pnt];
                    $pkt_lengh = $infos[$pnt+1];
                    
                    $vn_mqtt_packet_manager .= "$pkt_type,$pkt_lengh,";
                }
            }
            
            $vn_mqtt_packet_manager = rtrim($vn_mqtt_packet_manager, ",");
            return true;
        }
        //else
            //echo "mqtt: invalid packet id\r\n";
    }
    //else
        //echo "mqtt: no packet in buffer now\r\n";
    
    return false; 
}

/*
Check whether incomming packets are available.
Parameters: None
Return:  a number of packets available.
*/
function vn_mqtt_packet_available()
{
    global $vn_mqtt_tcp_pid, $vn_mqtt_tcp_id; 
    global $vn_mqtt_security;
    global $vn_mqtt_state;
    global $vn_mqtt_recv_buffer, $vn_mqtt_packet_manager;
    
    if(!$vn_mqtt_tcp_pid)
        exit("mqtt: tcp$vn_mqtt_tcp_id not initialized\r\n");
    
    $rbuf = "";
    $pkt_count = 0;
    $infos = array();
    $count = 0;
    
    if($vn_mqtt_packet_manager != "")
    {
        $infos = explode(",", $vn_mqtt_packet_manager);
        $count = count($infos);
        $pkt_count = $count/2;
    }
    
    switch($vn_mqtt_security)
    {
        case MQTT_PLAIN:
            if(pid_ioctl($vn_mqtt_tcp_pid, "get state") != TCP_CONNECTED)
            {
                $vn_mqtt_state = MQTT_DISCONNECTED;
                return -2;
            }
            
            break;
            
        case MQTT_SSL:
        
            if(pid_ioctl($vn_mqtt_tcp_pid, "get state") != SSL_CONNECTED)
            {
                $vn_mqtt_state = MQTT_DISCONNECTED;
                return -2;
            }
            
            break;
        
    }
    
    if(pid_ioctl($vn_mqtt_tcp_pid, "get rxlen"))
    {
        $max_len = MAX_STRING_LEN - strlen($vn_mqtt_recv_buffer);
        
        if($max_len > 10)
            $max_len = 10;
        
        pid_recv($vn_mqtt_tcp_pid, $rbuf, $max_len);
        
        //update buffer
        $vn_mqtt_recv_buffer .= $rbuf;
        
        $buf_len = strlen($vn_mqtt_recv_buffer);
        
        $pkt_offset = 0;
        
        for($i = 1; $i < $count; $i += 2)
        {
            $pkt_offset += (int)$infos[$i];
        }    
            
        if($pkt_offset > $buf_len)
            exit("mqtt: error on memory management");
        
        //update new packet.
        while(1)
        {
            if($buf_len >= ($pkt_offset + 2)) // miminum packet length is 2;
            {
                $pnt = $pkt_offset; //pointer
                
                $pkt_type = bin2int($vn_mqtt_recv_buffer[$pnt++], 0, 1) >> 4;
                
                $multiplier = 1; 
                $value = 0; //the remaining length
                
                do
                {
                    $digit = bin2int($vn_mqtt_recv_buffer[$pnt++], 0, 1);
                    $value += ($digit & 127) * $multiplier; 
                    $multiplier *= 128;
                    
                }while (($digit & 128) && ($pnt < $buf_len));

                if(!($digit & 128) && ( ($pnt + $value) <= $buf_len))
                {
                    //update $vn_mqtt_packet_manager
                    $pkt_lengh = $pnt + $value - $pkt_offset;
                    
                    if($vn_mqtt_packet_manager == "")
                        $vn_mqtt_packet_manager = "$pkt_type,$pkt_lengh";
                    else
                        $vn_mqtt_packet_manager .= ",$pkt_type,$pkt_lengh";
                    
                    $pkt_offset = $pnt + $value;
                    $pkt_count++;
                    continue;
                }
            }
            
            break;
        }
    }
    
    return $pkt_count;
}

/*
Check whether a PUBLISH packet is acknowledged or not. 
Parameters: 
    - $msg_id: message identifier.
Return:
    - true if packet is unacknowledged.
    - false otherwise.
*/
function vn_mqtt_is_unack($msg_id)
{
    global $vn_mqtt_unack_list;    
    
    if($vn_mqtt_unack_list != "")
    {
        $infos = explode(",", $vn_mqtt_unack_list);
        $count = count($infos);    
        
        for($i = 0; $i < $count; $i++)
        {
            if($msg_id == (int)$infos[$i])
                return true;
        }
    }
    
    return false;
}

/*
Remove the message identifier of a PUBLISH packet from unacknowledged list if existed. 
Parameters: 
    - $msg_id: message identifier.
Return: none.
*/
function vn_mqtt_remove_msg_id($msg_id)
{
    global $vn_mqtt_unack_list;    
    
    if($vn_mqtt_unack_list != "")
    {
        $infos = explode(",", $vn_mqtt_unack_list);
        $count = count($infos);    
        
        $vn_mqtt_unack_list = "";
        
        for($i = 0; $i < $count; $i++)
        {
            $id = (int)$infos[$i];
            
            if($msg_id != $id)
                $vn_mqtt_unack_list .= "$id,";
        }
        
        $vn_mqtt_unack_list = rtrim($vn_mqtt_unack_list, ","); // remove the last comma
    }        
}

/*
Create a connect packet.
Parameters:
    - $clean_flag: Clean Session flag. Default: true.
    - $will: 
        + if set to "", the will flag is unset.
        + if set to an array($will_qos, $will_retain, $will_topic, $will_message) which contains Will QoS, Will Retain flag, Will Topic and Will Message respectively,
          the will flag is set.
        + Default: "".
    - $username: 
        + if set to "", the username flag is unset.
        + otherwise, the username flag is set and username is $username.
        + Default: "".
    - $password: 
        + if set to "", the password flag is unset.
        + otherwise, the password flag is set and password is $password.    
        + Default: "".
Return: The created packet.
*/
function vn_mqtt_create_connect_packet($clean_flag = true, $will = "", $username = "", $password = "")
{
    global $vn_mqtt_client_id;
    global $vn_mqtt_version, $vn_mqtt_protocol_name;
    
    $msg_type = MQTT_CTRL_CONNECT;
    $will_flag = false;
    
    if(is_array($will))
    {
        $will_flag = true;
        $will_qos = $will[0];
        $will_retain = $will[1];
        $will_topic = $will[2];
        $will_message = $will[3];
    }
    
    //Variable header
    $vari_header = vn_mqtt_encode_string($vn_mqtt_protocol_name);//Protocol name        
    $vari_header .= sprintf("%c", $vn_mqtt_version);//Protocol Version Number
    
    $byte10 = 0;
    
    if($clean_flag) 
        $byte10 |= MQTT_CONN_FLAG_CLEAN_SS;
    
    if($will_flag) 
    {
        $byte10 |= MQTT_CONN_FLAG_WILL;//Will Flag
        $byte10 |= $will_qos << 3;//Will QoS
        
        if($will_retain) 
            $byte10 |= MQTT_CONN_FLAG_WILL_RETAIN;//Will Retain
    }
    
    if($username !== "")
    {
        $byte10 |= MQTT_CONN_FLAG_USERNAME;//User Name Flag
        
        if($password !== "") 
            $byte10 |= MQTT_CONN_FLAG_PASSWORD;//Password Flag
    }
    
    $vari_header .= sprintf("%c", $byte10); //Connect Flags
    
    $vari_header .= sprintf("%c", MQTT_CONN_KEEPALIVE >> 8); 
    $vari_header .= sprintf("%c", MQTT_CONN_KEEPALIVE & 0xff);
    
    //Payload
    $payload = vn_mqtt_encode_string($vn_mqtt_client_id);//Client Identifier
    
    if($will_flag)
    {
        $payload .= vn_mqtt_encode_string($will_topic);//Will Topic
        $payload .= vn_mqtt_encode_string($will_message);//Will Message
    }
    
    if($username !== "")
    {
        $payload .= vn_mqtt_encode_string($username);//User Name 
        
        if($password !== "") 
            $payload .= vn_mqtt_encode_string($password);//Password 
    }
    
    //Fixed Header    
    //The DUP, QoS, and RETAIN flags are not used in the CONNECT message.
    $header = sprintf("%c", $msg_type << 4);
    $remain_length = strlen($vari_header) + strlen($payload);
    $header .= vn_mqtt_encode_length($remain_length);
    
    $pkt = $header.$vari_header.$payload;
    
    return $pkt;
}

/*
Create a publish message
Parameters:
    - $topic: name of a topic. This must not contain Topic wildcard characters.
    - $msg: a message to be publish.
    - $msg_id: message identifier in case of qos > 0.
    - $dup_flag: dup flag. This value should be set to 0.
    - $qos: quality of service of message. valid from 0 to 2. 
            If it is set over 2, it will be downgraded to 2. 
            It is is set lower than 0, it will be upgraded to 0.
            Default = 0.
    - $retain_flag: $retain flag. Default = 0.
Return: The created packet.
*/
function vn_mqtt_create_pulish_packet(&$topic, &$msg, $msg_id = 0, $dup_flag = 0, $qos = 0, $retain_flag = 0)
{            
    $msg_type = MQTT_CTRL_PUBLISH;
                 
    //Variable header
    $vari_header = vn_mqtt_encode_string($topic);//Topic name
    
    if($qos)
    {
        $vari_header .= sprintf("%c", $msg_id >> 8);
        $vari_header .= sprintf("%c", $msg_id & 0xff);
    }
    
    //Fixed Header        
    $byte1 = ($msg_type<<4) | ($dup_flag<<3) | ($qos<<1) | $retain_flag;
    $header = sprintf("%c", $byte1);
    $remain_length = strlen($vari_header) + strlen($msg);
    $header .= vn_mqtt_encode_length($remain_length);
    
    $pkt = $header.$vari_header.$msg;
    
    return $pkt;
}

/*
The common function to create packets for: 
PUBACK, PUBREC, PUBREL, PUBCOMP.
*/
function vn_mqtt_create_common_pub($msg_type, $msg_id, $dup_flag, $qos, $retain_flag)
{
    //Fixed Header        
    $byte1 = ($msg_type<<4) | ($dup_flag<<3) | ($qos<<1) | $retain_flag;
    $pkt = sprintf("%c", $byte1);
    $pkt .= sprintf("%c", 2);
    
    //Variable header    
    $pkt .= sprintf("%c", $msg_id >> 8);
    $pkt .= sprintf("%c", $msg_id & 0xff);
        
    return $pkt;
}

/*
Create publish acknowledgment packet.
Parameters: $msg_id: message identifier.
Return: The created packet.
*/
function vn_mqtt_create_puback_packet($msg_id)
{            
    $msg_type = MQTT_CTRL_PUBACK;
    $dup_flag = 0; $qos = 0; $retain_flag = 0;
    
    return vn_mqtt_create_common_pub($msg_type, $msg_id, $dup_flag, $qos, $retain_flag);
}

/*
Create publish received packet.
Parameters: $msg_id: message identifier.
Return: The created packet.
*/
function vn_mqtt_create_pubrec_packet($msg_id)
{
    $msg_type = MQTT_CTRL_PUBREC;
    $dup_flag = 0; $qos = 0; $retain_flag = 0;
    
    return vn_mqtt_create_common_pub($msg_type, $msg_id, $dup_flag, $qos, $retain_flag);
}

/*
Create publish release packet.
Parameters: $msg_id: message identifier.
Return: The created packet.
*/
function vn_mqtt_create_pubrel_packet($msg_id)
{
    $msg_type = MQTT_CTRL_PUBREL;
    $dup_flag = 0; $qos = 1; $retain_flag = 0;
    
    return vn_mqtt_create_common_pub($msg_type, $msg_id, $dup_flag, $qos, $retain_flag);
}

/*
Create publish complete packet.
Parameters: $msg_id: message identifier.
Return: The created packet.
*/
function vn_mqtt_create_pubcomp_packet($msg_id)
{
    $msg_type = MQTT_CTRL_PUBCOMP;
    $dup_flag = 0; $qos = 0; $retain_flag = 0;
    
    return vn_mqtt_create_common_pub($msg_type, $msg_id, $dup_flag, $qos, $retain_flag);
}

/*
Create a Subscribe packet.
Parameters: 
    - $topics: an two-dimensional array contains list of array which store topic name and QoS. 
      Example: array( array("topic1_name", topic1_qos), array("topic2_name", topic2_qos)). 
      In case there is ony one topic, $topics can be set as array("topic1_name", topic1_qos).
...

This file has been truncated, please download it to see its full contents.

Example - Subscriber without using SSL

PHP
<?php

if(_SERVER("REQUEST_METHOD"))
    exit; // avoid php execution via http request

include_once "/lib/sn_dns.php";
include_once "/lib/vn_mqtt.php";

//$host_name = "test.mosquitto.org";
//$host_name = "iot.eclipse.org";
//$host_name = "broker.hivemq.com";
//$host_name = "broker.mqttdashboard.com";
$host_name = "[192.168.0.3]";
$port = 1883;

mqtt_setup(0, "PHPoC-MQTT Sub Example",  $host_name, $port); 

/*
$will_qos = 2;
$will_retain = true;
$will_topic = "disaster/tsunami";
$will_message = "Goodbye forever at ". date("Y:M-d-D, H:i:s");
$will = array($will_qos, $will_retain, $will_topic, $will_message);
*/
$will = "";
$username = "";
$password = "";

mqtt_connect(true, $will, $username, $password);

$out_topics = array(array("sensors/temperature", 1), array("disaster/tsunami", 2));

if(mqtt_state() == MQTT_CONNECTED)
    mqtt_subscribe($out_topics);

$in_topic = "";
$in_content = "";
$is_retain = 0;

while(1)
{    
    if(mqtt_state() == MQTT_DISCONNECTED)
        while(mqtt_reconnect() == false);
    
    if(mqtt_loop($in_topic, $in_content, $is_retain))
    {
        //TODO , procees the received publish packet here
        if($is_retain == 1)
            echo "<<a stale message\r\n";
        
        echo "<<topic:$in_topic\r\n";
        echo "<<content: $in_content\r\n";
    }
}
//mqtt_unsubscribe("sensors/temperature");
    
mqtt_disconnect();

?>

Example - Publisher using SSL

PHP
<?php

if(_SERVER("REQUEST_METHOD"))
    exit; // avoid php execution via http request

include_once "/lib/sn_dns.php";
include_once "/lib/vn_mqtt.php";

$host_name = "iot.eclipse.org";
//$host_name = "[192.168.0.3]";
$port = 8883;

mqtt_ssl_setup(0, "PHPoC-MQTT Pub Example", $host_name, $port);

$will = "";
$username = "";
$password = "";
mqtt_connect(true, $will, $username, $password);

$dup_flag = 0;
$qos = 2;// change qos here.
$retain_flag = 0;
$out_topics = array(array("sensors/temperature", 1), array("disasters/tsunami", 2));

$in_topic = "";
$in_content = "";
$is_retain = 0;

$count_msg = 0;

while(1)
{            
    if(mqtt_state() == MQTT_CONNECTED)
    {
        $qos = 0;
        $count_msg++;
        $msg = "Message From PHPoC, count: $count_msg, QoS: $qos! at ". date("Y:M-d-D, H:i:s");
        mqtt_publish($out_topics[0][0], $msg, $dup_flag, $qos, $retain_flag);
        mqtt_publish($out_topics[1][0], $msg, $dup_flag, $qos, $retain_flag);
        sleep(2);
        
        $qos = 1;
        $count_msg++;
        $msg = "Message From PHPoC, count: $count_msg, QoS: $qos! at ". date("Y:M-d-D, H:i:s");
        mqtt_publish($out_topics[0][0], $msg, $dup_flag, $qos, $retain_flag);
        mqtt_publish($out_topics[1][0], $msg, $dup_flag, $qos, $retain_flag);
        sleep(2);
        
        $qos = 2;    
        $count_msg++;
        $msg = "Message From PHPoC, count: $count_msg, QoS: $qos! at ". date("Y:M-d-D, H:i:s");
        mqtt_publish($out_topics[0][0], $msg, $dup_flag, $qos, $retain_flag);
        mqtt_publish($out_topics[1][0], $msg, $dup_flag, $qos, $retain_flag);
        sleep(2);
        
    }
    
    if(mqtt_state() == MQTT_DISCONNECTED)
            while(mqtt_reconnect() == false);
    
    if(mqtt_loop($in_topic, $in_content, $is_retain))
    {
        //TODO , procees the received publish packet here
        if($is_retain == 1)
            echo "<<a stale message\r\n";
        
        echo "<<topic:$in_topic\r\n";
        echo "<<content: $in_content\r\n";
    }
}

//mqtt_disconnect();

?>

Example - Subscriber using SSL

PHP
<?php

if(_SERVER("REQUEST_METHOD"))
    exit; // avoid php execution via http request

include_once "/lib/sn_dns.php";
include_once "/lib/vn_mqtt.php";

$host_name = "iot.eclipse.org";
//$host_name = "[192.168.0.3]";
$port = 8883;

mqtt_ssl_setup(0, "PHPoC-MQTT Sub Example",  $host_name, $port); 

/*
$will_qos = 2;
$will_retain = true;
$will_topic = "disaster/tsunami";
$will_message = "Goodbye forever at ". date("Y:M-d-D, H:i:s");
$will = array($will_qos, $will_retain, $will_topic, $will_message);
*/
$will = "";
$username = "";
$password = "";

mqtt_connect(true, $will, $username, $password);

$out_topics = array(array("sensors/temperature", 1), array("disaster/tsunami", 2));

if(mqtt_state() == MQTT_CONNECTED)
    mqtt_subscribe($out_topics);

$in_topic = "";
$in_content = "";
$is_retain = 0;

while(1)
{    
    if(mqtt_state() == MQTT_DISCONNECTED)
        while(mqtt_reconnect() == false);
    
    if(mqtt_loop($in_topic, $in_content, $is_retain))
    {
        //TODO , procees the received publish packet here
        if($is_retain == 1)
            echo "<<a stale message\r\n";
        
        echo "<<topic:$in_topic\r\n";
        echo "<<content: $in_content\r\n";
    }
}
//mqtt_unsubscribe("sensors/temperature");
    
mqtt_disconnect();

?>

System Setting file (phpoc.ini)

PHP
This file is needed when using SSL to increase the TCP bufffer
tcp0_txbuf_size = 4096 ; SSL/TCP send buffer
tcp0_rxbuf_size = 4096 ; SSL/TCP send buffer
ssl0_rxbuf_size = 4096

Credits

phpoc_man

phpoc_man

62 projects • 405 followers

Comments