<?php
//vn_mqtt.php for p40
/**
PHPoC MQTT Client library
*2016-09-01.
- Support MQTT Version 3.1 and 3.1.1
- Document Reference:
+ MQTT Version 3.1: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html
+ MQTT Version 3.1.1: http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/mqtt-v3.1.1.html
- History:
+ 2016-09-01: Support MQTT Version 3.1 and 3.1.1
- Tested successfully with:
+ Mosquitto Broker installed in my computer
+ iot.eclipse.org
+ broker.hivemq.com
+ test.mosquitto.org
+ broker.mqttdashboard.com
+ m11.cloudmqtt.com
If clean session is set to false, the client may not work well with some servers: the server can send many packets continuously, and PHPoC has the resource limits of an embedded system.
Setting clean session to false is not recommended.
- QoS Level: 0, 1, 2.
- Note:
+ Message delivery retry: http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#retry
* This is optional and disabled by default. The user can enable it with the mqtt_setup() function.
* If the retry option is enabled, the maximum number of retries is 10. The user can change this value by changing MQTT_RESEND_MAX_NUM.
Note: The resend process is performed in a blocking loop; be careful when using this option.
*2017-02-03
- Support MQTT over TLS/SSL
**/
//Constants
//Protocol version numbers and protocol names; they are the initial values of
//$vn_mqtt_version and $vn_mqtt_protocol_name, which are written into the CONNECT packet.
//NOTE: "VERION" is a misspelling of "VERSION"; the names are kept because they are referenced elsewhere in this file.
define("MQTT_VERION_3_1", 3);
define("MQTT_VERION_3_1_1", 4);
define("MQTT_PROTOCOL_NAME_3_1", "MQIsdp");
define("MQTT_PROTOCOL_NAME_3_1_1", "MQTT");
//MQTT state (values stored in $vn_mqtt_state)
define("MQTT_DISCONNECTED", 0);
define("MQTT_CONNECTED", 1);
define("MQTT_PINGING", 2);
//MQTT security (values stored in $vn_mqtt_security)
define("MQTT_PLAIN", 0);
define("MQTT_SSL", 1);
define("MQTT_WEBSOCKET", 2);
define("MQTT_WEBSOCKET_SSL", 3);
//Message type: stored in the upper 4 bits of the first byte of the fixed header.
define("MQTT_CTRL_CONNECT", 0x1);
define("MQTT_CTRL_CONNECTACK", 0x2);
define("MQTT_CTRL_PUBLISH", 0x3);
define("MQTT_CTRL_PUBACK", 0x4);
define("MQTT_CTRL_PUBREC", 0x5);
define("MQTT_CTRL_PUBREL", 0x6);
define("MQTT_CTRL_PUBCOMP", 0x7);
define("MQTT_CTRL_SUBSCRIBE", 0x8);
define("MQTT_CTRL_SUBACK", 0x9);
define("MQTT_CTRL_UNSUBSCRIBE", 0xA);
define("MQTT_CTRL_UNSUBACK", 0xB);
define("MQTT_CTRL_PINGREQ", 0xC);
define("MQTT_CTRL_PINGRESP", 0xD);
define("MQTT_CTRL_DISCONNECT", 0xE);
//Quality of service levels
define("MQTT_QOS_0", 0x0);
define("MQTT_QOS_1", 0x1);
define("MQTT_QOS_2", 0x2);
/*
Masks for the header flags.
The header flags are the lower 4 bits of the first byte of the fixed message header.
The two QoS masks together cover the 2-bit QoS field (bits 1 and 2).
*/
define("MQTT_HEAD_FLAG_RETAIN", 0x01);
define("MQTT_HEAD_FLAG_QOS_1", 0x02);
define("MQTT_HEAD_FLAG_QOS_2", 0x04);
define("MQTT_HEAD_FLAG_DUP", 0x08);
/*
Masks for the connect flags.
The connect flags byte is part of the variable header of a CONNECT message.
*/
define("MQTT_CONN_FLAG_CLEAN_SS", 0x02);
define("MQTT_CONN_FLAG_WILL", 0x04);
define("MQTT_CONN_FLAG_WILL_QOS_1", 0x08);
define("MQTT_CONN_FLAG_WILL_QOS_2", 0x10);
define("MQTT_CONN_FLAG_WILL_RETAIN", 0x20);
define("MQTT_CONN_FLAG_PASSWORD", 0x40);
define("MQTT_CONN_FLAG_USERNAME", 0x80);
/*
Keep Alive timer.
- Adjust as necessary, in seconds. Defaults to 5 minutes.
- The value is written as a 16-bit number into the CONNECT variable header.
- See http://public.dhe.ibm.com/software/dw/webservices/ws-mqtt/mqtt-v3r1.html#keep-alive-timer
*/
define("MQTT_CONN_KEEPALIVE", 300);
/*
Timeouts for waiting for a response from the broker.
- Adjust as necessary according to network latency, in milliseconds.
*/
define("MQTT_TIMEOUT_CONNECT_MS", 6000);//between CONNECT and CONNECTACK.
define("MQTT_TIMEOUT_PUBLISH_MS", 500); //between PUBLISH and PUBACK/PUBREC, between PUBREC and PUBREL, and between PUBREL and PUBCOMP.
define("MQTT_TIMEOUT_SUBSCRIBE_MS", 500); //between SUBSCRIBE and SUBACK.
define("MQTT_TIMEOUT_UNSUBSCRIBE_MS", 500); //between UNSUBSCRIBE and UNSUBACK.
define("MQTT_TIMEOUT_PING_MS", 500); //between PINGREQ and PINGRESP.
/*
Maximum number of times a packet is resent when the expected response is not received.
This only makes sense when $vn_mqtt_resend is set to true.
Note: the user can set this parameter to 0, or set $vn_mqtt_resend to false, to disable the resend function.
*/
define("MQTT_RESEND_MAX_NUM", 10);
//Global variables
//Connection handle: a value of 0 in $vn_mqtt_tcp_pid is treated as "not initialized"
//by vn_mqtt_packet_available(), which reports $vn_mqtt_tcp_id in its error message.
$vn_mqtt_tcp_id = 0;
$vn_mqtt_tcp_pid = 0;
$vn_mqtt_state = MQTT_DISCONNECTED; //one of MQTT_DISCONNECTED, MQTT_CONNECTED, MQTT_PINGING
$vn_mqtt_client_id = ""; //client identifier placed in the CONNECT payload
$vn_mqtt_broker_hostname = "";
$vn_mqtt_broker_port = 1883;
$vn_mqtt_security = MQTT_PLAIN; //one of MQTT_PLAIN, MQTT_SSL, MQTT_WEBSOCKET, MQTT_WEBSOCKET_SSL
$vn_mqtt_version = MQTT_VERION_3_1; //protocol version number written into the CONNECT packet
$vn_mqtt_protocol_name = MQTT_PROTOCOL_NAME_3_1; //protocol name written into the CONNECT packet
$vn_mqtt_alive_start = 0;
$vn_mqtt_msg_id = 1; //Do not use Message ID 0. It is reserved as an invalid Message ID.
/*
Information saved so that the client can reconnect.
*/
$vn_mqtt_clean_flag = true;
$vn_mqtt_will = "";
$vn_mqtt_username = "";
$vn_mqtt_password = "";
//Raw bytes received from the broker; complete packets are stored back to back from offset 0.
$vn_mqtt_recv_buffer = "";
//Index of the complete packets in $vn_mqtt_recv_buffer: a comma-separated list of
//"type,length" pairs in buffer order ("" when no packet is indexed).
$vn_mqtt_packet_manager = "";
//Comma-separated list of message identifiers that are still unacknowledged ("" when empty).
$vn_mqtt_unack_list = "";
/*
This parameter can be changed in the mqtt_setup function.
Note: the user can set this parameter to false, or set MQTT_RESEND_MAX_NUM to 0, to disable the resend function.
NOTE(review): the file header says message delivery retry is disabled by default, but the initial value here is true - confirm which one is intended.
*/
$vn_mqtt_resend = true;
//To store the subscription list
$vn_mqtt_subs_list = "";
/*
Read the current count of the timer device "/mmap/st9".
The device is opened only for the duration of the call. If its "get state"
ioctl reports a false value, the timer is started before the count is read.
Parameters: none.
Return: the value reported by the "get count" ioctl.
(presumably a millisecond tick, since the timeouts in this file are in ms - confirm)
Note: while pid_open() reports -EBUSY the call blocks, retrying every 500 microseconds.
*/
function vn_mqtt_get_tick()
{
    $timer = pid_open("/mmap/st9", O_NODIE);
    while($timer == -EBUSY)
    {
        // The device is in use: wait a little and try to open it again.
        usleep(500);
        $timer = pid_open("/mmap/st9", O_NODIE);
    }

    $running = pid_ioctl($timer, "get state");
    if(!$running)
        pid_ioctl($timer, "start");

    $count = pid_ioctl($timer, "get count");
    pid_close($timer);

    return $count;
}
/*
Encode a length in the variable-length format used by the fixed header:
7 bits of the value per byte, least significant group first, with the top
bit of a byte set when another byte follows.
Parameters:
- $length: length to be encoded.
Return: the encoded bytes as a string (always at least one byte).
*/
function vn_mqtt_encode_length($length)
{
    $encoded = "";

    while(1)
    {
        $byte = $length % 128;
        $length = $length >> 7;

        if($length > 0)
        {
            // More groups follow: flag this byte with the continuation bit.
            $encoded .= sprintf("%c", $byte | 0x80);
            continue;
        }

        // Last group: emitted without the continuation bit.
        $encoded .= sprintf("%c", $byte);
        break;
    }

    return $encoded;
}
/*
Decode the Remaining Length field of a message.
The field starts at the second byte of the packet (index 1). Each byte
contributes its low 7 bits, least significant group first, and a set top bit
means that another length byte follows.
Parameters:
- $pkt: message to be decoded, starting at its fixed header.
Return: the length of the message, excluding the size of the fixed header.
*/
function vn_mqtt_decode_length($pkt)
{
    $remaining = 0;
    $weight = 1;
    $pos = 1;

    while(1)
    {
        $byte = bin2int($pkt[$pos], 0, 1);
        $pos++;

        $remaining += ($byte & 127) * $weight;
        $weight *= 128;

        // A clear top bit marks the last byte of the field.
        if(($byte & 128) == 0)
            break;
    }

    return $remaining;
}
/*
Prefix a string with its length as two bytes, most significant byte first.
Parameters:
- $str: string to be encoded.
Return: a new string made of the two length bytes followed by $str.
*/
function vn_mqtt_encode_string($str)
{
    $size = strlen($str);
    $prefix = sprintf("%c%c", $size >> 8, $size & 0xff);

    return $prefix.$str;
}
/*
Get the quality of service of a message.
The QoS is the 2-bit field at bits 1-2 of the first byte of the fixed header.
Parameters:
- $pkt: message to get the QoS from.
Return: QoS of the message, as a number.
*/
function vn_mqtt_get_message_qos($pkt)
{
    $first_byte = bin2int($pkt[0], 0, 1);
    $qos_mask = MQTT_HEAD_FLAG_QOS_1|MQTT_HEAD_FLAG_QOS_2;

    return ($first_byte & $qos_mask) >> 1;
}
/*
Get the RETAIN flag of a message (bit 0 of the first byte of the fixed header).
Parameters:
- $pkt: message to get the flag from.
Return: 1 if the RETAIN flag is set, 0 otherwise.
*/
function vn_mqtt_get_message_retain($pkt)
{
    $first_byte = bin2int($pkt[0], 0, 1);

    return $first_byte & MQTT_HEAD_FLAG_RETAIN;
}
/*
Get the DUP flag of a message (bit 3 of the first byte of the fixed header).
Parameters:
- $pkt: message to get the flag from.
Return: 1 if the DUP flag is set, 0 otherwise.
*/
function vn_mqtt_get_message_dup($pkt)
{
    $first_byte = bin2int($pkt[0], 0, 1);
    $masked = $first_byte & MQTT_HEAD_FLAG_DUP;

    return $masked >> 3;
}
/*
Get the message identifier of a packet.
Parameters:
- $pkt: one complete packet, starting at its fixed header.
Return:
- Identifier number of the message (16 bits, most significant byte first).
- 0 if the message does not have an ID: a PUBLISH with QoS 0, or any type
  other than PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP, SUBSCRIBE, SUBACK,
  UNSUBSCRIBE and UNSUBACK.
*/
function vn_mqtt_get_message_id($pkt)
{
    $first_byte = bin2int($pkt[0], 0, 1);
    $type = $first_byte >> 4;

    // The variable header starts where the remaining-length bytes end:
    // total size minus the decoded remaining length.
    $id_pos = strlen($pkt) - vn_mqtt_decode_length($pkt);

    switch($type)
    {
        case MQTT_CTRL_PUBLISH:
            // A QoS 0 PUBLISH carries no identifier.
            if((($first_byte & (MQTT_HEAD_FLAG_QOS_1|MQTT_HEAD_FLAG_QOS_2)) >> 1) == 0)
                return 0;

            // The identifier follows the length-prefixed topic name.
            $topic_high = bin2int($pkt[$id_pos], 0, 1);
            $topic_low = bin2int($pkt[$id_pos + 1], 0, 1);
            $id_pos += 2 + (($topic_high << 8) + $topic_low);
            break;

        case MQTT_CTRL_PUBACK:
        case MQTT_CTRL_PUBREC:
        case MQTT_CTRL_PUBREL:
        case MQTT_CTRL_PUBCOMP:
        case MQTT_CTRL_SUBSCRIBE:
        case MQTT_CTRL_SUBACK:
        case MQTT_CTRL_UNSUBSCRIBE:
        case MQTT_CTRL_UNSUBACK:
            // The identifier is the first field of the variable header.
            break;

        default:
            return 0;
    }

    $id_high = bin2int($pkt[$id_pos], 0, 1);
    $id_low = bin2int($pkt[$id_pos + 1], 0, 1);

    return ($id_high << 8) + $id_low;
}
/*
Get the payload of a message.
Only CONNECT, SUBSCRIBE, SUBACK and PUBLISH messages are handled.
Parameters:
- $pkt: one complete packet, starting at its fixed header.
Return: the payload of the message, or "" for any other message type.
*/
function vn_mqtt_get_message_payload($pkt)
{
    $first_byte = bin2int($pkt[0], 0, 1);
    $type = $first_byte >> 4;
    $total = strlen($pkt);
    $body_len = vn_mqtt_decode_length($pkt);

    // Offset of the variable header (first byte after the fixed header).
    $cursor = $total - $body_len;

    if(($type == MQTT_CTRL_SUBSCRIBE) || ($type == MQTT_CTRL_SUBACK))
    {
        // The payload follows the two-byte message identifier.
        return substr($pkt, $cursor + 2, $body_len - 2);
    }

    if($type == MQTT_CTRL_CONNECT)
    {
        // Skip the length-prefixed protocol name ...
        $name_high = bin2int($pkt[$cursor], 0, 1);
        $name_low = bin2int($pkt[$cursor + 1], 0, 1);
        $cursor += 2 + (($name_high << 8) + $name_low);
        // ... then the version number (1 byte), the connect flags (1 byte)
        // and the keep-alive timer (2 bytes).
        $cursor += 4;

        return substr($pkt, $cursor, $total - $cursor);
    }

    if($type == MQTT_CTRL_PUBLISH)
    {
        // Skip the length-prefixed topic name.
        $topic_high = bin2int($pkt[$cursor], 0, 1);
        $topic_low = bin2int($pkt[$cursor + 1], 0, 1);
        $cursor += 2 + (($topic_high << 8) + $topic_low);

        // With QoS 1 or 2 a two-byte message identifier precedes the payload.
        if($first_byte & (MQTT_HEAD_FLAG_QOS_1|MQTT_HEAD_FLAG_QOS_2))
            $cursor += 2;

        return substr($pkt, $cursor, $total - $cursor);
    }

    return "";
}
/*
Get the topic name of a PUBLISH packet.
Parameters:
- $pkt: one complete packet, starting at its fixed header.
Return: the topic name, or "" if the packet is not a PUBLISH message.
*/
function vn_mqtt_get_topic($pkt)
{
    if((bin2int($pkt[0], 0, 1) >> 4) != MQTT_CTRL_PUBLISH)
        return "";

    // The variable header of a PUBLISH begins with the length-prefixed topic name.
    $len_pos = strlen($pkt) - vn_mqtt_decode_length($pkt);
    $len_high = bin2int($pkt[$len_pos], 0, 1);
    $len_low = bin2int($pkt[$len_pos + 1], 0, 1);

    return substr($pkt, $len_pos + 2, ($len_high << 8) + $len_low);
}
/*
Get the content of a PUBLISH packet.
This is an alias of vn_mqtt_get_message_payload().
Parameters:
- $pkt: publish packet.
Return: whatever vn_mqtt_get_message_payload() returns for $pkt.
*/
function vn_mqtt_get_content($pkt)
{
    $content = vn_mqtt_get_message_payload($pkt);

    return $content;
}
/*
Find a packet in the receiving buffer by message type.
The search walks the "type,length" pairs stored in $vn_mqtt_packet_manager.
Parameters:
- $msg_type: type of message to find.
Return:
- index of the first packet of that type in the buffer, if one exists.
- -1 if there is none.
*/
function vn_mqtt_find_packet($msg_type)
{
    global $vn_mqtt_packet_manager;

    if($vn_mqtt_packet_manager == "")
        return -1;

    $fields = explode(",", $vn_mqtt_packet_manager);
    $total = count($fields);

    // Types sit at the even positions; the packet index is half of that position.
    $pos = 0;
    while($pos < $total)
    {
        if((int)$fields[$pos] == $msg_type)
            return ($pos/2);
        $pos += 2;
    }

    return -1;
}
/*
Get a packet from the receiving buffer by index.
Parameters:
- $pkt_id: index of the packet in the buffer (0 is the oldest packet).
- $is_delete: when true (default), the packet is removed from
  $vn_mqtt_recv_buffer and from $vn_mqtt_packet_manager after it is read.
Return:
- the packet, if it exists.
- an empty string if the buffer holds no packet or $pkt_id is out of range
  (negative, or not smaller than the number of packets). Nothing is modified
  in that case.
*/
function vn_mqtt_get_packet($pkt_id, $is_delete = true)
{
    global $vn_mqtt_recv_buffer, $vn_mqtt_packet_manager;

    if($vn_mqtt_packet_manager == "")
        return "";

    $infos = explode(",", $vn_mqtt_packet_manager);
    $pkt_count = count($infos)/2;

    // Reject indexes outside the list. The lower bound matters because
    // vn_mqtt_find_packet() returns -1 for "not found": without it a negative
    // index would select the first packet and, when deleting, strip its bytes
    // from the buffer while leaving its entry in the packet manager.
    if(($pkt_id < 0) || ($pkt_id >= $pkt_count))
        return "";

    // Lengths sit at the odd positions of the "type,length" list. The packet
    // starts after the lengths of all the packets stored before it.
    $len_pos = 2*$pkt_id + 1;
    $pkt_offset = 0;
    for($i = 1; $i < $len_pos; $i += 2)
        $pkt_offset += (int)$infos[$i];
    $pkt_len = (int)$infos[$len_pos];

    $pkt = substr($vn_mqtt_recv_buffer, $pkt_offset, $pkt_len);

    if($is_delete)
    {
        // Remove the bytes from the buffer ...
        $vn_mqtt_recv_buffer = substr_replace($vn_mqtt_recv_buffer, "", $pkt_offset, $pkt_len);

        // ... and rebuild the index without the entry of this packet.
        $manager = "";
        for($i = 0; $i < $pkt_count; $i++)
        {
            if($i == $pkt_id)
                continue;
            $manager .= $infos[2*$i].",".$infos[2*$i + 1].",";
        }
        $vn_mqtt_packet_manager = rtrim($manager, ",");
    }

    return $pkt;
}
/*
For debugging the buffer.
Walks every packet indexed in $vn_mqtt_packet_manager without removing it and
looks up its type and message identifier (the statement that prints them is
commented out).
Parameters: none.
Return: the last packet in the buffer, or "" if the buffer holds no packet.
*/
function vn_mqtt_show_packet_list()
{
    global $vn_mqtt_recv_buffer, $vn_mqtt_packet_manager;

    $last_pkt = "";
    $msg_id = 0;

    if($vn_mqtt_packet_manager == "")
    {
        //echo "mqtt: no packet in buffer now\r\n";
        return $last_pkt;
    }

    $fields = explode(",", $vn_mqtt_packet_manager);
    $total = count($fields);

    $pos = 0;
    while($pos < $total)
    {
        $pkt_type = (int)$fields[$pos];
        // Peek at the packet: the second argument keeps it in the buffer.
        $last_pkt = vn_mqtt_get_packet($pos/2, false);
        if($last_pkt !== "")
            $msg_id = vn_mqtt_get_message_id($last_pkt);
        //echo "mqtt: packet $pkt_type in buffer with message id: $msg_id\r\n";
        $pos += 2;
    }

    return $last_pkt;
}
/*
Delete a packet from the receiving buffer by index.
The bytes of the packet are removed from $vn_mqtt_recv_buffer and its
"type,length" entry is removed from $vn_mqtt_packet_manager.
Parameters:
- $pkt_id: index of the packet in the buffer (0 is the oldest packet).
Return:
- true on success.
- false if the buffer holds no packet or $pkt_id is out of range (negative,
  or not smaller than the number of packets). Nothing is modified in that case.
*/
function vn_mqtt_delete_packet($pkt_id)
{
    global $vn_mqtt_recv_buffer, $vn_mqtt_packet_manager;

    if($vn_mqtt_packet_manager == "")
        return false;

    $infos = explode(",", $vn_mqtt_packet_manager);
    $pkt_count = count($infos)/2;

    // Reject indexes outside the list. The lower bound matters because
    // vn_mqtt_find_packet() returns -1 for "not found": without it a negative
    // index would strip the first packet's bytes from the buffer while leaving
    // its entry in the packet manager, and report success.
    if(($pkt_id < 0) || ($pkt_id >= $pkt_count))
        return false;

    // Lengths sit at the odd positions of the "type,length" list. The packet
    // starts after the lengths of all the packets stored before it.
    $len_pos = 2*$pkt_id + 1;
    $pkt_offset = 0;
    for($i = 1; $i < $len_pos; $i += 2)
        $pkt_offset += (int)$infos[$i];
    $pkt_len = (int)$infos[$len_pos];

    // Remove the bytes from the buffer ...
    $vn_mqtt_recv_buffer = substr_replace($vn_mqtt_recv_buffer, "", $pkt_offset, $pkt_len);

    // ... and rebuild the index without the entry of this packet.
    $manager = "";
    for($i = 0; $i < $pkt_count; $i++)
    {
        if($i == $pkt_id)
            continue;
        $manager .= $infos[2*$i].",".$infos[2*$i + 1].",";
    }
    $vn_mqtt_packet_manager = rtrim($manager, ",");

    return true;
}
/*
Check whether incoming packets are available.
When the connection has received data, up to 10 bytes of it are appended to
$vn_mqtt_recv_buffer, and every packet that has become complete is added to
$vn_mqtt_packet_manager as a "type,length" pair.
Parameters: None
Return:
- the number of complete packets indexed in the receiving buffer.
- -2 if the connection is no longer in the connected state (checked for
  MQTT_PLAIN and MQTT_SSL only); $vn_mqtt_state is set to MQTT_DISCONNECTED.
Note: the script exits if $vn_mqtt_tcp_pid is not initialized, or if the
lengths recorded in $vn_mqtt_packet_manager exceed the buffer length.
*/
function vn_mqtt_packet_available()
{
global $vn_mqtt_tcp_pid, $vn_mqtt_tcp_id;
global $vn_mqtt_security;
global $vn_mqtt_state;
global $vn_mqtt_recv_buffer, $vn_mqtt_packet_manager;
if(!$vn_mqtt_tcp_pid)
exit("mqtt: tcp$vn_mqtt_tcp_id not initialized\r\n");
$rbuf = "";
$pkt_count = 0;
$infos = array();
$count = 0;
//Start from the packets that are already indexed.
if($vn_mqtt_packet_manager != "")
{
$infos = explode(",", $vn_mqtt_packet_manager);
$count = count($infos);
$pkt_count = $count/2;
}
//Detect a lost connection. The expected state depends on the security mode;
//MQTT_WEBSOCKET and MQTT_WEBSOCKET_SSL are not checked here.
switch($vn_mqtt_security)
{
case MQTT_PLAIN:
if(pid_ioctl($vn_mqtt_tcp_pid, "get state") != TCP_CONNECTED)
{
$vn_mqtt_state = MQTT_DISCONNECTED;
return -2;
}
break;
case MQTT_SSL:
if(pid_ioctl($vn_mqtt_tcp_pid, "get state") != SSL_CONNECTED)
{
$vn_mqtt_state = MQTT_DISCONNECTED;
return -2;
}
break;
}
if(pid_ioctl($vn_mqtt_tcp_pid, "get rxlen"))
{
//Read at most 10 bytes per call, and never more than the room left before
//the buffer reaches MAX_STRING_LEN.
//NOTE(review): when the buffer is already full $max_len is 0 or less - confirm how pid_recv handles that.
$max_len = MAX_STRING_LEN - strlen($vn_mqtt_recv_buffer);
if($max_len > 10)
$max_len = 10;
pid_recv($vn_mqtt_tcp_pid, $rbuf, $max_len);
//update buffer
$vn_mqtt_recv_buffer .= $rbuf;
$buf_len = strlen($vn_mqtt_recv_buffer);
//The bytes that are not indexed yet start after all the indexed packets:
//sum the lengths, which sit at the odd positions of the list.
$pkt_offset = 0;
for($i = 1; $i < $count; $i += 2)
{
$pkt_offset += (int)$infos[$i];
}
if($pkt_offset > $buf_len)
exit("mqtt: error on memory management");
//Index every packet that is now complete. The loop stops at the first
//incomplete packet; its bytes stay in the buffer for the next call.
while(1)
{
if($buf_len >= ($pkt_offset + 2)) // minimum packet length is 2;
{
$pnt = $pkt_offset; //pointer
//The packet type is the upper 4 bits of the first byte.
$pkt_type = bin2int($vn_mqtt_recv_buffer[$pnt++], 0, 1) >> 4;
$multiplier = 1;
$value = 0; //the remaining length
//Decode the remaining length: 7 bits per byte, a set top bit means another
//length byte follows. Stop early if the buffer ends inside the field.
do
{
$digit = bin2int($vn_mqtt_recv_buffer[$pnt++], 0, 1);
$value += ($digit & 127) * $multiplier;
$multiplier *= 128;
}while (($digit & 128) && ($pnt < $buf_len));
//The packet is complete only if the length field was fully read
//(last byte has no continuation bit) and all of its body is in the buffer.
if(!($digit & 128) && ( ($pnt + $value) <= $buf_len))
{
//update $vn_mqtt_packet_manager
//Recorded length = fixed header + remaining length.
$pkt_lengh = $pnt + $value - $pkt_offset;
if($vn_mqtt_packet_manager == "")
$vn_mqtt_packet_manager = "$pkt_type,$pkt_lengh";
else
$vn_mqtt_packet_manager .= ",$pkt_type,$pkt_lengh";
$pkt_offset = $pnt + $value;
$pkt_count++;
//Try to index the next packet.
continue;
}
}
break;
}
}
return $pkt_count;
}
/*
Check whether a message identifier is in the unacknowledged list
($vn_mqtt_unack_list, a comma-separated list of identifiers).
Parameters:
- $msg_id: message identifier.
Return:
- true if the identifier is in the list, i.e. the packet is unacknowledged.
- false otherwise.
*/
function vn_mqtt_is_unack($msg_id)
{
    global $vn_mqtt_unack_list;

    $found = false;

    if($vn_mqtt_unack_list != "")
    {
        $ids = explode(",", $vn_mqtt_unack_list);
        $pos = count($ids);

        // Scan until a match is found or every entry has been checked.
        while(!$found && ($pos > 0))
        {
            $pos--;
            if((int)$ids[$pos] == $msg_id)
                $found = true;
        }
    }

    return $found;
}
/*
Remove a message identifier from the unacknowledged list, if it is there.
Every occurrence of the identifier is dropped; the remaining identifiers keep
their order in $vn_mqtt_unack_list.
Parameters:
- $msg_id: message identifier.
Return: none.
*/
function vn_mqtt_remove_msg_id($msg_id)
{
    global $vn_mqtt_unack_list;

    if($vn_mqtt_unack_list != "")
    {
        $ids = explode(",", $vn_mqtt_unack_list);
        $total = count($ids);
        $kept = "";

        $pos = 0;
        while($pos < $total)
        {
            $current = (int)$ids[$pos];
            $pos++;

            if($current == $msg_id)
                continue;

            // Put the separator in front of every entry but the first one.
            if($kept !== "")
                $kept .= ",";
            $kept .= "$current";
        }

        $vn_mqtt_unack_list = $kept;
    }
}
/*
Create a CONNECT packet.
The client identifier, protocol name and protocol version are taken from
$vn_mqtt_client_id, $vn_mqtt_protocol_name and $vn_mqtt_version; the keep-alive
timer is MQTT_CONN_KEEPALIVE.
Parameters:
- $clean_flag: Clean Session flag. Default: true.
- $will:
  + if it is not an array (for example ""), the Will flag is unset.
  + if it is an array($will_qos, $will_retain, $will_topic, $will_message)
    holding the Will QoS, Will Retain flag, Will Topic and Will Message
    respectively, the Will flag is set.
  + Default: "".
- $username:
  + if set to "", the User Name flag is unset.
  + otherwise, the User Name flag is set and the user name is $username.
  + Default: "".
- $password:
  + used only when $username is not "".
  + if set to "", the Password flag is unset.
  + otherwise, the Password flag is set and the password is $password.
  + Default: "".
Return: The created packet.
*/
function vn_mqtt_create_connect_packet($clean_flag = true, $will = "", $username = "", $password = "")
{
    global $vn_mqtt_client_id;
    global $vn_mqtt_version, $vn_mqtt_protocol_name;

    $has_will = is_array($will);
    $has_username = ($username !== "");
    // A password is only sent together with a user name.
    $has_password = ($has_username && ($password !== ""));

    // Build the connect flags and the payload side by side. The payload
    // fields go in this order: client identifier, Will Topic, Will Message,
    // User Name, Password.
    $flags = 0;
    $payload = vn_mqtt_encode_string($vn_mqtt_client_id);

    if($clean_flag)
        $flags |= MQTT_CONN_FLAG_CLEAN_SS;

    if($has_will)
    {
        $will_qos = $will[0];
        $will_retain = $will[1];
        $will_topic = $will[2];
        $will_message = $will[3];

        $flags |= MQTT_CONN_FLAG_WILL;
        $flags |= $will_qos << 3; // the Will QoS field starts at bit 3
        if($will_retain)
            $flags |= MQTT_CONN_FLAG_WILL_RETAIN;

        $payload .= vn_mqtt_encode_string($will_topic);
        $payload .= vn_mqtt_encode_string($will_message);
    }

    if($has_username)
    {
        $flags |= MQTT_CONN_FLAG_USERNAME;
        $payload .= vn_mqtt_encode_string($username);
    }

    if($has_password)
    {
        $flags |= MQTT_CONN_FLAG_PASSWORD;
        $payload .= vn_mqtt_encode_string($password);
    }

    // Variable header: protocol name, version, connect flags, keep-alive (2 bytes).
    $vari_header = vn_mqtt_encode_string($vn_mqtt_protocol_name);
    $vari_header .= sprintf("%c", $vn_mqtt_version);
    $vari_header .= sprintf("%c", $flags);
    $vari_header .= sprintf("%c", MQTT_CONN_KEEPALIVE >> 8);
    $vari_header .= sprintf("%c", MQTT_CONN_KEEPALIVE & 0xff);

    // Fixed header: the DUP, QoS, and RETAIN flags are not used in the CONNECT message.
    $fixed_header = sprintf("%c", MQTT_CTRL_CONNECT << 4);
    $fixed_header .= vn_mqtt_encode_length(strlen($vari_header) + strlen($payload));

    return $fixed_header.$vari_header.$payload;
}
/*
Create a PUBLISH message.
Parameters:
- $topic: name of a topic. This must not contain Topic wildcard characters.
- $msg: the message to be published.
- $msg_id: message identifier; it is only written into the packet when qos > 0.
- $dup_flag: DUP flag. This value should be set to 0.
- $qos: quality of service of the message, valid from 0 to 2.
  If it is set over 2, it is downgraded to 2.
  If it is set lower than 0, it is upgraded to 0.
  Default = 0.
- $retain_flag: RETAIN flag. Default = 0.
Return: The created packet.
Note: the function name keeps its original spelling ("pulish") so that
existing callers keep working.
*/
function vn_mqtt_create_pulish_packet(&$topic, &$msg, $msg_id = 0, $dup_flag = 0, $qos = 0, $retain_flag = 0)
{
    // Enforce the documented QoS range. The QoS field is only 2 bits wide
    // (bits 1-2 of the first byte), so an unclamped value would spill into
    // the DUP flag or the message type bits.
    if($qos > MQTT_QOS_2)
        $qos = MQTT_QOS_2;
    if($qos < MQTT_QOS_0)
        $qos = MQTT_QOS_0;

    $msg_type = MQTT_CTRL_PUBLISH;

    //Variable header: topic name, then the message identifier when QoS > 0.
    $vari_header = vn_mqtt_encode_string($topic);
    if($qos)
    {
        $vari_header .= sprintf("%c", $msg_id >> 8);
        $vari_header .= sprintf("%c", $msg_id & 0xff);
    }

    //Fixed header: type and flags in the first byte, then the remaining length.
    $byte1 = ($msg_type<<4) | ($dup_flag<<3) | ($qos<<1) | $retain_flag;
    $header = sprintf("%c", $byte1);
    $remain_length = strlen($vari_header) + strlen($msg);
    $header .= vn_mqtt_encode_length($remain_length);

    return $header.$vari_header.$msg;
}
/*
The common function to create packets for:
PUBACK, PUBREC, PUBREL, PUBCOMP.
These packets are 4 bytes long: the type/flags byte, a remaining length of 2,
and the message identifier (most significant byte first).
Parameters:
- $msg_type: message type, placed in the upper 4 bits of the first byte.
- $msg_id: message identifier.
- $dup_flag, $qos, $retain_flag: header flags, placed at bit 3, bits 1-2 and bit 0.
Return: The created packet.
*/
function vn_mqtt_create_common_pub($msg_type, $msg_id, $dup_flag, $qos, $retain_flag)
{
    $flags = ($dup_flag<<3) | ($qos<<1) | $retain_flag;
    $first_byte = ($msg_type<<4) | $flags;

    return sprintf("%c%c%c%c", $first_byte, 2, $msg_id >> 8, $msg_id & 0xff);
}
/*
Create a publish acknowledgment (PUBACK) packet.
All header flags are 0.
Parameters: $msg_id: message identifier.
Return: The created packet.
*/
function vn_mqtt_create_puback_packet($msg_id)
{
    return vn_mqtt_create_common_pub(MQTT_CTRL_PUBACK, $msg_id, 0, 0, 0);
}
/*
Create a publish received (PUBREC) packet.
All header flags are 0.
Parameters: $msg_id: message identifier.
Return: The created packet.
*/
function vn_mqtt_create_pubrec_packet($msg_id)
{
    return vn_mqtt_create_common_pub(MQTT_CTRL_PUBREC, $msg_id, 0, 0, 0);
}
/*
Create a publish release (PUBREL) packet.
Unlike the other acknowledgment packets, it is built with the QoS field of the
header set to 1; the DUP and RETAIN flags are 0.
Parameters: $msg_id: message identifier.
Return: The created packet.
*/
function vn_mqtt_create_pubrel_packet($msg_id)
{
    return vn_mqtt_create_common_pub(MQTT_CTRL_PUBREL, $msg_id, 0, 1, 0);
}
/*
Create a publish complete (PUBCOMP) packet.
All header flags are 0.
Parameters: $msg_id: message identifier.
Return: The created packet.
*/
function vn_mqtt_create_pubcomp_packet($msg_id)
{
    return vn_mqtt_create_common_pub(MQTT_CTRL_PUBCOMP, $msg_id, 0, 0, 0);
}
/*
Create a Subscribe packet.
Parameters:
- $topics: an two-dimensional array contains list of array which store topic name and QoS.
Example: array( array("topic1_name", topic1_qos), array("topic2_name", topic2_qos)).
In case there is ony one topic, $topics can be set as array("topic1_name", topic1_qos).
...
This file has been truncated, please download it to see its full contents.
Comments
Please log in or sign up to comment.