• 方案介紹
  • 附件下載
  • 相關(guān)推薦
申請(qǐng)入駐 產(chǎn)業(yè)圖譜

C語(yǔ)言代碼封裝MQTT協(xié)議報(bào)文,了解MQTT協(xié)議通信過(guò)程

03/16 08:37
1603
加入交流群
掃碼加入
獲取工程師必備禮包
參與熱點(diǎn)資訊討論

更多詳細(xì)資料請(qǐng)聯(lián)系.docx

共1個(gè)文件

【1】MQTT協(xié)議介紹

MQTT是一種輕量級(jí)的通信協(xié)議,適用于物聯(lián)網(wǎng)(IoT)和低帶寬網(wǎng)絡(luò)環(huán)境。它基于一種“發(fā)布/訂閱”模式,其中設(shè)備發(fā)送數(shù)據(jù)(也稱為 “發(fā)布”)到經(jīng)紀(jì)人(稱為MQTT代理),這些數(shù)據(jù)被存儲(chǔ),并在需要時(shí)被轉(zhuǎn)發(fā)給訂閱者。這種方式簡(jiǎn)化了網(wǎng)絡(luò)管理,允許多個(gè)設(shè)備在不同的網(wǎng)絡(luò)條件下進(jìn)行通信(包括延遲和帶寬限制),并支持實(shí)時(shí)數(shù)據(jù)更新。它是開放的,可免費(fèi)使用并易于實(shí)施。
在這里插入圖片描述

【2】MQTT協(xié)議報(bào)文字段介紹

MQTT協(xié)議報(bào)文由兩部分組成:固定報(bào)頭和可變報(bào)頭。

固定報(bào)頭的格式是統(tǒng)一的,其中包括了報(bào)文類型和剩余長(zhǎng)度兩個(gè)字段。

可變報(bào)頭的格式取決于報(bào)文類型。

下面是MQTT協(xié)議中各個(gè)報(bào)文類型的可變報(bào)頭字段說(shuō)明。

(1)CONNECT:MQTT連接請(qǐng)求報(bào)文

CONNECT報(bào)文包括固定報(bào)頭和可變報(bào)頭兩部分。其中,固定報(bào)頭的第一個(gè)字節(jié)(即報(bào)文類型和標(biāo)志位的組合)為0x10,表示這是一個(gè)CONNECT報(bào)文。

可變報(bào)頭包括了以下字段:

  • 協(xié)議名(Protocol Name):用于標(biāo)識(shí)MQTT協(xié)議的名稱,固定為字符串"MQTT";
  • 協(xié)議級(jí)別(Protocol Level):用于標(biāo)識(shí)所使用的MQTT協(xié)議的版本號(hào),一般情況下為4;
  • 連接標(biāo)志(Connect Flags):用于設(shè)置各種連接選項(xiàng),其中包括:
    • 用戶名/密碼(Username/Password):用于對(duì)連接進(jìn)行身份驗(yàn)證;
    • 清理會(huì)話(Clean Session):表示客戶端需要清除服務(wù)器上舊的Session信息;
    • 遺囑標(biāo)志(Will Flag):表示客戶端是否需要在與服務(wù)器的連接意外斷開時(shí)發(fā)送遺囑信息;
    • 遺囑QoS(Will QoS):用于設(shè)置遺囑消息的服務(wù)質(zhì)量等級(jí);
    • 遺囑保留(Will Retain):表示遺囑消息是否需要被服務(wù)器保留;
    • 用戶名標(biāo)志(Username Flag):表示客戶端是否需要發(fā)送用戶名字段;
    • 密碼標(biāo)志(Password Flag):表示客戶端是否需要發(fā)送密碼字段。
  • 保持連接(Keep Alive):用于設(shè)置心跳包的發(fā)送間隔時(shí)間,以便客戶端和服務(wù)器之間保持連接。

(2)CONNACK:MQTT連接響應(yīng)報(bào)文

CONNACK報(bào)文包括固定報(bào)頭和可變報(bào)頭兩部分。其中,固定報(bào)頭的第一個(gè)字節(jié)為0x20,表示這是一個(gè)CONNACK報(bào)文。

可變報(bào)頭包括了以下字段:

  • 連接應(yīng)答(Connect Acknowledgment):用于表示連接是否成功,一般為0表示成功,其他值表示失??;
  • 保留標(biāo)志(Reserved Flag):保留字段,必須為0。

(3)PUBLISH:MQTT發(fā)布消息報(bào)文

PUBLISH報(bào)文包括固定報(bào)頭和可變報(bào)頭兩部分,以及消息體。其中,固定報(bào)頭的第一個(gè)字節(jié)由報(bào)文類型和QoS級(jí)別組合而成,QoS級(jí)別可以為0、1或2。

可變報(bào)頭包括了以下字段:

  • 主題名(Topic Name):用于標(biāo)識(shí)消息的主題;
  • 報(bào)文標(biāo)識(shí)符(Packet Identifier):用于在QoS級(jí)別為1或2時(shí)確認(rèn)消息分發(fā)的情況,如果為0則表示QoS級(jí)別為0。

消息體包括了要發(fā)布的消息內(nèi)容。

(4)PUBACK:MQTT發(fā)布確認(rèn)報(bào)文

PUBACK報(bào)文包括固定報(bào)頭和可變報(bào)頭兩部分。其中,固定報(bào)頭的第一個(gè)字節(jié)為0x40,表示這是一個(gè)PUBACK報(bào)文。

可變報(bào)頭僅包括一個(gè)報(bào)文標(biāo)識(shí)符(Packet Identifier)字段,用于確認(rèn)QoS級(jí)別為1的發(fā)布消息。

(5)PUBREC:MQTT發(fā)布接收?qǐng)?bào)文

PUBREC報(bào)文包括固定報(bào)頭和可變報(bào)頭兩部分。其中,固定報(bào)頭的第一個(gè)字節(jié)為0x50,表示這是一個(gè)PUBREC報(bào)文。

可變報(bào)頭僅包括一個(gè)報(bào)文標(biāo)識(shí)符(Packet Identifier)字段,用于確認(rèn)QoS級(jí)別為2的發(fā)布消息。

(6)PUBREL:MQTT發(fā)布釋放報(bào)文

PUBREL報(bào)文包括固定報(bào)頭和可變報(bào)頭兩部分。其中,固定報(bào)頭的第一個(gè)字節(jié)為0x62,表示這是一個(gè)PUBREL報(bào)文。

可變報(bào)頭僅包括一個(gè)報(bào)文標(biāo)識(shí)符(Packet Identifier)字段,用于確認(rèn)QoS級(jí)別為2的發(fā)布消息。

(7)PUBCOMP:MQTT發(fā)布完成報(bào)文

PUBCOMP報(bào)文包括固定報(bào)頭和可變報(bào)頭兩部分。其中,固定報(bào)頭的第一個(gè)字節(jié)為0x70,表示這是一個(gè)PUBCOMP報(bào)文。

可變報(bào)頭僅包括一個(gè)報(bào)文標(biāo)識(shí)符(Packet Identifier)字段,用于確認(rèn)QoS級(jí)別為2的發(fā)布消息。

(8)SUBSCRIBE:MQTT訂閱請(qǐng)求報(bào)文

SUBSCRIBE報(bào)文包括固定報(bào)頭和可變報(bào)頭兩部分。其中,固定報(bào)頭的第一個(gè)字節(jié)為0x82,表示這是一個(gè)SUBSCRIBE報(bào)文。

可變報(bào)頭包括了以下字段:

  • 報(bào)文標(biāo)識(shí)符(Packet Identifier):用于確認(rèn)訂閱請(qǐng)求的情況;
  • 訂閱主題(Subscription Topic):用于設(shè)置訂閱的主題;
  • 服務(wù)質(zhì)量等級(jí)(QoS Level):用于設(shè)置訂閱請(qǐng)求使用的服務(wù)質(zhì)量等級(jí),可以為0、1或2。

(9)SUBACK:MQTT訂閱確認(rèn)報(bào)文

SUBACK報(bào)文包括固定報(bào)頭和可變報(bào)頭兩部分。其中,固定報(bào)頭的第一個(gè)字節(jié)為0x90,表示這是一個(gè)SUBACK報(bào)文。

可變報(bào)頭包括了以下字段:

  • 報(bào)文標(biāo)識(shí)符(Packet Identifier):用于確認(rèn)訂閱請(qǐng)求的情況;
  • 訂閱確認(rèn)等級(jí)(Subscription Acknowledgment):用于確認(rèn)訂閱請(qǐng)求的服務(wù)質(zhì)量等級(jí),可以為0、1或2。

(10)UNSUBSCRIBE:MQTT取消訂閱報(bào)文

UNSUBSCRIBE報(bào)文包括固定報(bào)頭和可變報(bào)頭兩部分。其中,固定報(bào)頭的第一個(gè)字節(jié)為0xA2,表示這是一個(gè)UNSUBSCRIBE報(bào)文。

可變報(bào)頭包括了以下字段:

  • 報(bào)文標(biāo)識(shí)符(Packet Identifier):用于確認(rèn)取消訂閱請(qǐng)求的情況;
  • 訂閱主題(Subscription Topic):用于設(shè)置要取消訂閱的主題。

(11)UNSUBACK:MQTT取消訂閱確認(rèn)報(bào)文

UNSUBACK報(bào)文包括固定報(bào)頭和可變報(bào)頭兩部分。其中,固定報(bào)頭的第一個(gè)字節(jié)為0xB0,表示這是一個(gè)UNSUBACK報(bào)文。

可變報(bào)頭僅包含報(bào)文標(biāo)識(shí)符(Packet Identifier)字段,用于確認(rèn)取消訂閱請(qǐng)求。

(12)PINGREQ:MQTT心跳請(qǐng)求報(bào)文

PINGREQ報(bào)文包括固定報(bào)頭和可變報(bào)頭兩部分。其中,固定報(bào)頭的第一個(gè)字節(jié)為0xC0,表示這是一個(gè)PINGREQ報(bào)文。

PINGREQ報(bào)文不包含可變報(bào)頭字段。

(13)PINGRESP:MQTT心跳響應(yīng)報(bào)文

PINGRESP報(bào)文包括固定報(bào)頭和可變報(bào)頭兩部分。其中,固定報(bào)頭的第一個(gè)字節(jié)為0xD0,表示這是一個(gè)PINGRESP報(bào)文。

PINGRESP報(bào)文不包含可變報(bào)頭字段。

(14)DISCONNECT:MQTT斷開連接報(bào)文

DISCONNECT報(bào)文包括固定報(bào)頭和可變報(bào)頭兩部分。其中,固定報(bào)頭的第一個(gè)字節(jié)為0xE0,表示這是一個(gè)DISCONNECT報(bào)文。

DISCONNECT報(bào)文不包含可變報(bào)頭字段。

【3】封裝MQTT協(xié)議

這是一個(gè)使用C語(yǔ)言在Linux下建立TCP通信并發(fā)送MQTT報(bào)文的例子。 根據(jù)MQTT報(bào)文自己封裝協(xié)議。

#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <unistd.h>

// 定義MQTT報(bào)文類型
#define MQTT_CONNECT    0x10
#define MQTT_CONNACK    0x20
#define MQTT_PUBLISH    0x30
#define MQTT_PUBACK     0x40
#define MQTT_SUBSCRIBE  0x80
#define MQTT_SUBACK     0x90
#define MQTT_UNSUBSCRIBE    0xA0
#define MQTT_UNSUBACK   0xB0
#define MQTT_PINGREQ    0xC0
#define MQTT_PINGRESP   0xD0
#define MQTT_DISCONNECT    0xE0

// 定義MQTT連接標(biāo)志
#define MQTT_CONNECT_FLAG_CLEAN     0x02
#define MQTT_CONNECT_FLAG_WILL      0x04
#define MQTT_CONNECT_FLAG_WILL_QOS0 0x00
#define MQTT_CONNECT_FLAG_WILL_QOS1 0x08
#define MQTT_CONNECT_FLAG_WILL_QOS2 0x10
#define MQTT_CONNECT_FLAG_WILL_RETAIN   0x20
#define MQTT_CONNECT_FLAG_PASSWORD  0x40
#define MQTT_CONNECT_FLAG_USERNAME  0x80

// 定義MQTT報(bào)文結(jié)構(gòu)體
typedef struct mqtt_packet 
{
	unsigned char *data;
	unsigned int length;
}
mqtt_packet_t;

// 建立socket連接并返回socket文件描述符
int socket_connect(char *address, int port) 
{
	struct sockaddr_in server_address;
	int socket_fd = socket(AF_INET, SOCK_STREAM, 0);
	if (socket_fd == -1) 
	{
		printf("Failed to create socket!n");
		return -1;
	}
	server_address.sin_family = AF_INET;
	server_address.sin_port = htons(port);
	if ((inet_pton(AF_INET, address, &server_address.sin_addr)) <= 0) 
	{
		printf("Invalid address/ Address not supportedn");
		return -1;
	}
	if (connect(socket_fd, (struct sockaddr *)&server_address, sizeof(server_address)) < 0) 
	{
		printf("Connection Failed!n");
		return -1;
	}
	return socket_fd;
}
// 打包MQTT連接報(bào)文 
mqtt_packet_t *mqtt_connect(char *client_id, char *username, char *password) 
{
	mqtt_packet_t *packet = (mqtt_packet_t *)malloc(sizeof(mqtt_packet_t));
	unsigned char *data = (unsigned char *)malloc(256);
	unsigned int length = 0;
	// 固定報(bào)頭 
	data[length++] = MQTT_CONNECT;
	// 可變報(bào)頭 
	data[length++] = 0x0C;
	// 清理會(huì)話標(biāo)志和協(xié)議版本號(hào)
	data[length++] = 'M';
	data[length++] = 'Q';
	data[length++] = 'T';
	data[length++] = 'T';
	data[length++] = 0x04;
	// 協(xié)議版本號(hào) // 連接標(biāo)志 
	unsigned char flags = MQTT_CONNECT_FLAG_CLEAN;
	if (username != NULL) 
	{
		flags |= MQTT_CONNECT_FLAG_USERNAME;
	}
	if (password != NULL) 
	{
		flags |= MQTT_CONNECT_FLAG_PASSWORD;
	}
	data[length++] = flags;
	data[length++] = 0xFF;
	// 保持連接時(shí)間低8位 
	data[length++] = 0xFF;
	// 保持連接時(shí)間高8位 // 剩余長(zhǎng)度 
	unsigned char remaining_length = length - 1;
	data[remaining_length++] = (unsigned char)(length - 2);
	packet->data = data;
	packet->length = length;
	return packet;
}
// 發(fā)送MQTT報(bào)文 
void mqtt_send(int socket_fd, mqtt_packet_t *packet) 
{
	if (send(socket_fd, packet->data, packet->length, 0) < 0) 
	{
		printf("Failed to send message!n");
	}
}
// 接收MQTT報(bào)文
int mqtt_recv(int socket_fd, mqtt_packet_t *packet) 
{
	unsigned char header[2];
	if (recv(socket_fd, header, 2, 0) != 2) 
	{
		printf("Failed to receive message header!n");
		return -1
	}
	unsigned int remaining_length = 0;
	unsigned int multiplier = 1;
	int i = 1;
	do 
	{
		if (recv(socket_fd, &header[i], 1, 0) != 1) 
		{
			printf("Failed to receive remaining_length byte %d!n", i);
			return -1;
		}
		remaining_length += (header[i] & 127) * multiplier;
		multiplier *= 128;
		i++;
	}
	while ((header[i - 1] & 128) != 0);
	packet->length = remaining_length + i;
	packet->data = (unsigned char *)malloc(packet->length);
	memcpy(packet->data, header, 2);
	if (recv(socket_fd, packet->data + 2, packet->length - 2, 0) != packet->length - 2) 
	{
		printf("Failed to receive full message!n");
		return -1;
	}
	return 0;
}


int main(int argc, char *argv[]) 
{
	// 建立 TCP 連接 
	int socket_fd = socket_connect("test.mosquitto.org", 1883);
	if (socket_fd == -1) 
	{
		printf("Failed to connect to MQTT server!n");
		return -1;
	}
	printf("Connected to MQTT server!n");
	// 打包并發(fā)送 MQTT 連接報(bào)文
	mqtt_packet_t *connect_packet = mqtt_connect("test_client", NULL, NULL);
	mqtt_send(socket_fd, connect_packet);
	printf("Sent MQTT CONNECT packet!n");
	free(connect_packet->data);
	free(connect_packet);
	// 接收 MQTT CONNACK 報(bào)文
	mqtt_packet_t *connack_packet = (mqtt_packet_t *)malloc(sizeof(mqtt_packet_t));
	if (mqtt_recv(socket_fd, connack_packet) != 0) 
	{
		printf("Failed to receive MQTT CONNACK packet!n");
		return -1;
	}
	if (connack_packet->data[1] != 0x00) 
	{
		printf("MQTT server rejected connection!n");
		return -1;
	}
	printf("Received MQTT CONNACK packet!n");
	free(connack_packet->data);
	free(connack_packet);
	// 斷開 TCP 連接 close(socket_fd); return 0; 
}
	
  • 更多詳細(xì)資料請(qǐng)聯(lián)系.docx
    下載

相關(guān)推薦

方案定制

去合作
方案開發(fā)定制化,2000+方案商即時(shí)響應(yīng)!