MQTT是基于TCP/IP协议栈构建的异步通信消息协议,是一种轻量级的发布、订阅信息传输协议。可以在不可靠的网络环境中进行扩展,适用于设备硬件存储空间或网络带宽有限的场景。使用MQTT协议,消息发送者与接收者不受时间和空间的限制。
IOT中继宝盒嵌入了MQTT消息服务器为接入设备接入物联网服务。
此设备采用Mqtt协议接入本IOT中继系统平台,实现从接入嵌入设备读、写、上报物模型数据。 设备读写报文数据采用字符串BASE64加密传输。
(本例程以stm32开发版+W5500以太网模块设备接入为例)
MQTT服务器地址: tcp://:192.168.0.105:1883( IOT中继宝盒IOT设备接口–配置说明中显示的MQTT服务器连接地址和端口)
用户名:txbxxxxx(IOT中继宝盒IOT设备接口–配置说明中显示的MQTT服务器用户名)
密码:dHhiMDIxMjE4 (IOT中继宝盒IOT设备接口–配置说明中显示的MQTT服务器客户端口令,BASE64解密使用)
客户端ID: 44080000001111000019(连接IOT中继宝盒的设备ID)
设备侧MQTT协议使用前提为设备启用、连接协议配置生效。设备侧Mqtt发布TOPIC为配置的物模型属性、物模型事件对应的上报设备属性数据和事件发生上报的数据。 数据流向为上行
对应的发布的主题TOPIC 、服务质量格式分别为:
| 类别 | 主题TOPIC | 描述 | 服务质量 |
|---|---|---|---|
| 属性数据 | /iotboxProperties/44080000001111000019/P_1696583483495 | 此topic对应设备物模型属性数据,由类别标识、设备ID、物模型属性ID组成。 主题中 “44080000001111000019” 为设备ID; "P_1696583483495"为设备物模型定义的属性ID标识 | QS1 |
| 事件数据 | /iotboxEvent/44080000001111000019/E_1696584389929 | 此topic对应设备物模型事件数据,由类别标识、设备ID、物模型事件ID组成。 主题中 “44080000001111000019” 为设备ID; "E_1696584389929"为设备物模型定义的事件ID标识 | QS1 |
设备侧Mqtt订阅TOPIC为配置的物模型功能,通常用于设备接收处理功能指令的操作。 数据流向为下行
对应的订阅的主题TOPIC 、服务质量格式分别为:
| 类别 | 主题TOPIC | 描述 | 服务质量 |
|---|---|---|---|
| 功能数据 | /iotboxFunction/44080000001111000019/F_1696584326733 | 此topic对应设备物模型功能数据,由类别标识、设备ID、物模型功能ID组成。 主题中 “44080000001111000019” 为设备ID; "F_1696584326733"为设备物模型定义的功能ID标识 | QS1 |
设备侧向MQTT服务器发布消息体格式采用字符串BASE64加密传输。 参数属性说明:
| 类别 | 主题TOPIC | 加密前消息体内容 | 加密后消息体内容 |
| 属性数据 | /iotboxProperties/44080000001111000019/P_1696583483495 | 如:温度 "36" 度 | MzY= |
| 事件数据 | /iotboxEvent/44080000001111000019/E_1696584389929 | 事件数据为json数值格式,由物模型事件属性中的输出参数组成。加密前格式如: [ {"paramsId": "aaaaaa", "dataValue": "36"}, {"paramsId": "bbbbbb", "dataValue": "45"}, ...... ]json字符串内容说明paramsId上报设备事件输出参数ID标识,为设备物模型配置的事件输出参数ID标识 如上例 "aaaaaa" 、"bbbbbb" 为温度和湿度参数标识dataValue上报设备事件输出参数数据值,dataValue 为上报设备事件输出参数值标识 上例"36"、"45" 上报设备事件输出参数数据值,为输出参数温度和湿度数据值 | WyB7InBhcmFtc0lkIjogImFhYWFhYSIsICJkYXRhVmFsdWUiOiAiMzYiIH0seyJwYXJhbXNJZCI6ICJiYmJiYmIiLCAgImRhdGFWYWx1ZSI6ICI0NSJ9LF0= |
设备侧向MQTT服务器订阅消息体格式采用字符串BASE64加密传输。 参数属性说明:
主题TOPIC: /iotboxFunction/44080000001111000019/F_1696584326733
订阅返回消息体内容:
ewogICAgICJkZXZpY2VJZCI6ICI0NDA4MDAwMDAwMTExMTAwMDAxNiIsCiAgICAgImZ1bmN0aW9uSWQiOiAiRl8xNjk2NTg0MzI2NzMzIiwKICAgICAiZnVuY3Rpb25OYW1lIjogIui/nOeoi+WFs+acuiIsCiAgICAgWwogICAgIHsicGFyYW1zSWQiOiAiYWFhYWFhIiwgInBhcmFtc05hbWUiOiAi5bCP6L2m5b2S5L2NIiwgImRhdGFWYWx1ZSI6ICJ0cnVlIiB9LAogICAgIF0KIH0=
订阅返回消息体内容:
消息体数据为json数值格式,由物模型功能定义中的输入参数组成。解密后的格式如:
{
"deviceId": "44080000001111000016",
"functionId": "F_1696584326733",
"functionName": "远程关机",
"inputParamsList": [
{"paramsId": "aaaaaa", "paramsName": "小车归位", "dataValue": "true" },
......
]
}| json字符串内容说明 | |
| deviceId | 定义的设备ID标识 如上例黄色 "44080000001111000016" 为需要处理下发的功能指令设备标识 |
| functionId | 设备物模型功能定义中的功能ID标识 如上例黄色 "F_1696584326733" 为设备物模型功能定义中的功能ID标识 |
| functionName | 设备物模型功能定义中的功能名称 如上例黄色 "远程关机" 为设备物模型功能定义中的功能名称 |
| paramsId | 输入参数ID标识,为设备物模型功能定义中的输入参数ID标识 如上例黄色 "aaaaaa" 为输入参数标识 |
| paramsName | 输入参数名称,为设备物模型功能定义中的输入参数名称 如上例黄色 "远程关机" 为设备物模型功能定义中的输入参数名称 |
| dataValue | 输入参数数据值,dataValue 为设备物模型功能定义中的输入参数数值标识 上例黄色 "true" 设备物模型功能定义中的输入参数数值,为输入参数布尔值true |
ts_mqtt.c文件中定义:
char *mqtt_clientID="44080000001111000019"; //Mqtt 客户端ID 填写iotrelay中继平台接入设备的ID
char *mqtt_account ="txb0727"; //Mqtt 服务器连接账号 填写iotrelay中继平台接入设备接口生成的账号
char *mqtt_password ="xxxxxx"; //Mqtt 服务器连接密码 填写iotrelay中继平台接入设备接口生成的口令
uint16 local_port = 5000; //设置开发板本地端口,根据需要设置
uint8 remote_ip[4] = {192.168.0.102}; //连接mqtt服务器IP
uint16 remote_port = 1883; //连接mqtt服务器端口参数说明:
| 参数 | 说明 |
|---|---|
| mqtt_clientID | 连接本IOT中继平台Mqtt服务器的客户端ID, 填写iot中继平台接入设备的ID。 |
| mqtt_account | 连接本IOT中继平台Mqtt服务器的连接账号, 填写iot中继平台接入设备接口生成的账号 |
| mqtt_password | 连接本IOT中继平台Mqtt服务器的口令, 填写iot中继平台接入设备接口生成的口令,嵌入设备中填写base64解密后的口令 |
| remote_ip[4] | 连接本IOT中继平台Mqtt服务器的IP, 填写iot中继平台的IP |
| remote_port | 连接本IOT中继平台Mqtt服务器的端口, 填写iot中继平台Mqtt服务器的端口 |
发布上报物模型属性数据,在本样例中ts_mqtt.c文件publishIotrelayModelProperties方法实现,部分代码段:
/**** *此处为构建上报的报文,获取设备属性数据在此填入,例如在此调用gpio获取设备的温度,在此范例处先直接填入数字 *上报设备物模型属性数据,例如 温度 36 度, *****/ char *indata="36" ; //上报设备属性数据 温度 ,在此范例处先直接填入数字,实际应用调用gpio获取设备的物模型属性数据 char outdata[20]; //加密后数据 int outlen; memset(outdata,0,sizeof(outdata)); /* 对上报属性数据Base64加密处理 */ int jm= base64_encode(indata, strlen(indata), outdata, &outlen);
参数说明:
“36” 为要上报的属性实际数值,例如温度36 度(设备的属性值为您程序开发中获取,获取后替换); int jm= base64_encode(indata, strlen(indata), outdata, &outlen); 为对上报属性数据Base64加密处理
发布上报物模型事件数据,在本样例中ts_mqtt.c文件publishIotrelayModelEvent方法实现,部分代码段:
/****
* 此处为构建上报的报文,设备属性数据在此填入,例如在此调用gpio获取设备的光照、温度
*上报设备物模型事件数据,事件数据为json数值格式,由物模型事件属性中的输出参数组成,
* 例如:设备关机事件上报数据
* 数据为json格式字符串 "[{\"paramsId\":\"suning\", \"dataValue\": \"80\" },{\"paramsId\": \"temperature\", \"dataValue\": \"45\"}]"
*****/
char *indata="[{\"paramsId\":\"suning\", \"dataValue\": \"80\" },{\"paramsId\": \"temperature\", \"dataValue\": \"45\"}]"; //上报设备事件数据
printf("加密前报文: %s \r\n",indata);
char outdata[512]; //加密后数据
int outlen;
memset(outdata,0,sizeof(outdata));
/* 对上报数据Base64加密处理 */
int jm= base64_encode(indata, strlen(indata), outdata, &outlen);char *indata=“[{"paramsId":"suning", "dataValue": "80" },{"paramsId": "temperature", "dataValue": "45"}]”; 为上报设备事件json数据 。 int jm= base64_encode(indata, strlen(indata), outdata, &outlen); 为对上报数据Base64加密处理
设备事件json数据参数说明:
| 参数 | 说明 |
|---|---|
| paramsId | 上报设备事件输出参数ID标识,为设备物模型配置的事件输出参数ID标识 如上例 “suning” 、 “temperature” 为光照和温度参数标识 |
| dataValue | 上报设备事件输出参数数据值,dataValue 为上报设备事件输出参数值标识 上例 “80” 、 “45” 上报设备事件输出参数数据值,为输出参数光照温和度数据值;设备的属性值为您程序开发中获取,获取后替换) |
订阅物模型功能,在本样例中ts_mqtt.c文件 **subscribIotrelayModelFunction **方法实现,部分代码段:
//Base64解密接收到的报文
char outdata[600]; //接收到的报文数据
int outlen;
memset(outdata,0,sizeof(outdata));
int jm= base64_decode((const char *)payload_in, strlen((const char *)payload_in), outdata, &outlen);
if(jm<0){
printf("解密数据错误:\r\n");
}else{
printf("解密后数据: %s \r\n",outdata);
//解析JSON格式字符串
cJSON *root = cJSON_Parse(outdata);
if(root != NULL)
{
char *json_str = cJSON_Print(root);
printf("解析解密后的JSON数据 %s", json_str);
//此处根据获取的物模型功能指令,填写设备需要操作的代码,例如设备关机int jm= base64_decode((const char *)payload_in, strlen((const char *)payload_in),outdata, &outlen); 为base64解密订阅功能接收的json字符串数据 。 cJSON *root = cJSON_Parse(outdata); 为解析JSON格式字符串
//此处根据获取的物模型功能指令,填写设备需要操作的代码,例如设备关机
/******************************************************************************
* @file mqqt协议发布上报设备属性或设备事件数据,订阅设备功能 Project Template ../main.c
* @author txb0727
* @version V1.0.0
* @date 2023-12-10
* @brief Main program body
******************************************************************************
* @attention
* 本范例用于嵌入式开发版采用mqqt协议发布上报设备属性或设备事件数据,订阅设备功能,仅供参考
* 本范例硬件为stm32f103开发板,通讯模块为W5500以太网通讯模块
*
* <h2><center>© COPYRIGHT 2023 txb0727.</center></h2>
******************************************************************************/
/* Includes ------------------------------------------------------------------*/
#include <stm32f10x.h>
#include "FreeRTOS.H"
#include "task.h"
#include "mcu_init.h"
#include "config.h"
#include "device.h"
#include "spi2.h"
#include "socket.h"
#include "w5500.h"
#include "at24c16.h"
#include "util.h"
#include "dhcp.h"
#include "string.h"
#include <stdio.h>
#include "ts_mqtt.h"
static TaskHandle_t AppTaskCreate_Handle = NULL;/* 创建任务句柄 */
static TaskHandle_t subscribeFunctionTopicTask_Handle = NULL;/* KEY任务句柄 */
static TaskHandle_t publishPropertiesTopicTask_Handle = NULL;/* KEY任务句柄 */
static TaskHandle_t publishEventTopicTask_Handle = NULL;/* LED任务句柄 */
static TaskHandle_t dhcpTask_Handle = NULL;/* KEY任务句柄 */
static void subscribeFunctionTopicTask(void* pvParameters);/* subscribeFunctionTopicTask任务实现 */
static void publishPropertiesTopicTask(void* pvParameters);/* publishPropertiesTopicTask任务实现 */
static void publishEventTopicTask(void* pvParameters);/* publishEventTopicTask任务实现 */
static void dhcpTask(void* pvParameters);/* dhcpTask任务实现 */
static char publishPropertiesMeassage[200]; //发布物模型属性消息体
static char publishEventMeassage[200]; //发布物模型事件消息体
/******************
* 此topic对应iotrelay中继平台中定义的设备物模型属性/功能/事件,由类别标识、设备ID、物模型属性ID/功能ID/事件ID组成
*************/
//订阅物模型功能Topic
char *subscribeFunctionTopic="/iotboxFunction/44080000001111000019/F_1696584326733";
//发布物模型属性Topic
char *publishPropertiesTopic="/iotboxProperties/44080000001111000019/P_1696583483495";
//发布物模型事件Topic
char *publishEventTopic="/iotboxEvent/44080000001111000019/E_1696584389929";
static void subscribeFunctionTopicTask(void *pvParameters){
while (1) {
/*********
* 订阅iotrelay平台设备功能
* subscribeFunctionTopic 为订阅物模型功能主题
********/
subscribIotrelayModelFunction(subscribeFunctionTopic);
printf("subscribeFunctionTopicTask min free stack size is %d \r\n",(int32_t)uxTaskGetStackHighWaterMark(NULL));
vTaskDelay(5*1000); // 延时5000毫秒
}
}
static void publishPropertiesTopicTask(void *pvParameters) {
while (1) {
/*********
* 发布iotrelay平台设备属性
* publishPropertiesTopic 为发布物模型属性主题
********/
memset(publishPropertiesMeassage,0,sizeof(publishPropertiesMeassage));
publishIotrelayModelProperties(publishPropertiesTopic,publishPropertiesMeassage);
printf("publishPropertiesTopicTask min free stack size is %d \r\n",(int32_t)uxTaskGetStackHighWaterMark(NULL));
vTaskDelay(60*1000); // 延时1秒
}
}
static void publishEventTopicTask(void *pvParameters) {
while (1) {
/*********
* 发布iotrelay平台设备事件 根据业务需要调用此发布物模型事件,如设备关机、设备预警等
* publishEventTopic 为发布物模型事件主题
********/
memset(publishEventMeassage,0,sizeof(publishEventMeassage));
publishIotrelayModelEvent(publishEventTopic,publishEventMeassage);
printf("publishEventTopicTask min free stack size is %d \r\n",(int32_t)uxTaskGetStackHighWaterMark(NULL));
vTaskDelay(60*1000); // 延时1秒
}
}
static void dhcpTask(void *pvParameters) {
while (1) {
DHCP_run();
printf("dhcpTask min free stack size is %d \r\n",(int32_t)uxTaskGetStackHighWaterMark(NULL));
vTaskDelay(1000); // 延时100毫秒
}
}
static void AppTaskCreate(void){
BaseType_t xReturn = pdPASS;
taskENTER_CRITICAL();
xReturn = xTaskCreate(
(TaskFunction_t) subscribeFunctionTopicTask,
(const char*) "subscribeFunctionTopicTask",
(uint32_t) 1024,
(void*) NULL,
(UBaseType_t) 1,
(TaskHandle_t*) &subscribeFunctionTopicTask_Handle
);
if (pdPASS == xReturn)
printf("subscribeFunctionTopicTask created successfully\r\n");
else
printf("subscribeFunctionTopicTask created failed\r\n");
xReturn = xTaskCreate(
(TaskFunction_t) publishPropertiesTopicTask,
(const char*) "publishPropertiesTopicTask",
(uint32_t) 350,
(void*) NULL,
(UBaseType_t) 1,
(TaskHandle_t*) &publishPropertiesTopicTask_Handle
);
if (pdPASS == xReturn)
printf("publishPropertiesTopicTask created successfully\r\n");
else
printf("publishPropertiesTopicTask created failed\r\n");
xReturn = xTaskCreate(
(TaskFunction_t) publishEventTopicTask,
(const char*) "publishEventTopicTask",
(uint32_t) 350,
(void*) NULL,
(UBaseType_t) 1,
(TaskHandle_t*) &publishEventTopicTask_Handle
);
if (pdPASS == xReturn)
printf("publishEventTopicTask created successfully\r\n");
else
printf("publishEventTopicTask created failed\r\n");
xReturn = xTaskCreate(
(TaskFunction_t) dhcpTask,
(const char*) "dhcpTask",
(uint32_t) 100,
(void*) NULL,
(UBaseType_t) 1,
(TaskHandle_t*) &dhcpTask_Handle
);
if (pdPASS == xReturn)
printf("dhcpTask created successfully\r\n");
else
printf("dhcpTask created failed\r\n");
vTaskDelete(AppTaskCreate_Handle);
taskEXIT_CRITICAL();
}
int main()
{
BaseType_t xReturn = pdPASS;
RCC_Configuration(); /* 配置单片机系统时钟*/
NVIC_Configuration();/* 配置嵌套中断向量*/
Systick_Init(72);/* 初始化Systick工作时钟*/
GPIO_Configuration();/* 配置GPIO*/
Timer_Configuration();/*定时器初始化*/
USART1_Init(); /*初始化串口通信:115200@8-n-1*/
at24c16_init();/*初始化eeprom*/
printf("W5500 EVB initialization over.\r\n");
Reset_W5500();/*硬重启W5500*/
WIZ_SPI_Init();/*初始化SPI接口*/
printf("W5500 initialized!\r\n");
set_default();
init_dhcp_client();
// 创建任务
xReturn = xTaskCreate(
(TaskFunction_t) AppTaskCreate,
(const char*) "AppTaskCreate",
(uint32_t) 128,
(void*) NULL,
(UBaseType_t) 3,
(TaskHandle_t*) &AppTaskCreate_Handle
);
if (xReturn == pdPASS)
vTaskStartScheduler(); // 启动调度器
while(1);
}
/******************* (C) COPYRIGHT 2023 txb0727 ****** END OF FILE ****/#include "ts_mqtt.h"
#include "MQTTPacket.h"
#include "transport.h"
#include "util.h"
#include "w5500.h"
#include "socket.h"
#include <string.h>
#include <stdlib.h>
#include "stm32f10x.h" // Device header
#include "base64.h"
#include "cJSON.h"
#include "utf8_gb2312_switch.h"
char *mqtt_clientID="44080000001111000019"; //Mqtt 客户端ID 填写iotrelay中继平台接入设备的ID
char *mqtt_account = "iotrelayAdmin"; //Mqtt 服务器连接账号 填写iotrelay中继平台接入设备接口生成的账号
char *mqtt_password = "txb021218"; //Mqtt 服务器连接密码 填写iotrelay中继平台接入设备接口生成的口令
uint16 local_port = 5000; //设置开发板本地端口
uint8 remote_ip[4] = {192, 168, 0, 103}; //连接mqtt服务器端口
uint16 remote_port = 1883; //连接mqtt服务器端口
//MQTT发布消息函数
/*********
* 发布iotrelay平台设备属性 消息体内容Base64加密传输
* @params pTopic 为发布物模型属性主题 ,此topic对应iotrelay中继平台中定义的设备物模型属性数据,由类别标识、设备ID、物模型属性ID组成,例如 "/iotboxProperties/44080000001111000019/P_1696583483495"
* @params publishPropertiesMeassage 为发布消息体内容
********/
void publishIotrelayModelProperties(char *pTopic,char *publishPropertiesMeassage)
{
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
int rc = 1;
int mysock = SOCK_PROPERTIES;
unsigned char buf[200];
int buflen = sizeof(buf);
MQTTString topicString = MQTTString_initializer;
int len = 0;
switch (getSn_SR(SOCK_PROPERTIES))
{
case SOCK_CLOSED:
socket(SOCK_PROPERTIES, Sn_MR_TCP, local_port++, Sn_MR_ND);
printf("SOCK_CLOSED \r\n");
break;
case SOCK_INIT:
connect(SOCK_PROPERTIES, remote_ip, remote_port);
printf("SOCK_INIT \r\n");
break;
case SOCK_ESTABLISHED:
if (getSn_IR(SOCK_PROPERTIES)&Sn_IR_CON)
{
printf("TCP established \r\n");
setSn_IR(SOCK_PROPERTIES, Sn_IR_CON);
}
data.clientID.cstring = mqtt_clientID; //client ID
data.keepAliveInterval = 60; //keep alive time
data.cleansession = 1; // clean the last info after re-link (1: clean; 0: not clean)
data.username.cstring = mqtt_account; // the username if MQTT server need
data.password.cstring = mqtt_password; // the password if MQTT server need
len = MQTTSerialize_connect(buf, buflen, &data); // Serialize the MQTT Packet according the connect data
printf("Serialize the MQTT Packet according the connect data %d \r\n",len);
rc = transport_sendPacketBuffer(mysock, buf, len); // sned packet to Server by driver
printf("sned packet to Server by driver %d \r\n",rc);
/*wait for connack*/
if (MQTTPacket_read(buf, buflen, transport_getdata3)==CONNACK) // call the driver to get conn ack packet from Server
{
unsigned char sessionPresent, connack_rc;
if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0) //deserialize packet to confirm the MQTT connect success
{
printf("Unable to connect, return code %d \n", connack_rc);
transport_close(mysock);
// return 0;
}
else
{
printf("Connect succeed!\r\n");
}
}
else
{
transport_close(mysock);
// return 0;
}
/****
*此处为构建上报的报文,获取设备属性数据在此填入,例如在此调用gpio获取设备的温度,在此范例处先直接填入数字
*上报设备物模型属性数据,例如 温度 36 度,
*****/
char *indata="36"; //上报设备属性数据 温度 ,在此范例处先直接填入数字,实际应用调用gpio获取设备的物模型属性数据
char outdata[20]; //加密后数据
int outlen;
memset(outdata,0,sizeof(outdata));
/* 对上报属性数据Base64加密处理 */
int jm= base64_encode(indata, strlen(indata), outdata, &outlen);
printf("加密后数据: %s \r\n",outdata);
strcpy(publishPropertiesMeassage,outdata);//填充Ba64加密后的消息体内容
int pubStringLen= strlen(publishPropertiesMeassage);
printf(" char *pubString. %s \r\n",publishPropertiesMeassage);
memset(buf,0,buflen);
topicString.cstring = pTopic; // publish topic name to Server
len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)publishPropertiesMeassage, pubStringLen);
rc = transport_sendPacketBuffer(mysock, buf, len);
printf("disconnecting. \r\n");
len = MQTTSerialize_disconnect(buf, buflen);
rc = transport_sendPacketBuffer(mysock, buf, len);
free(outdata);
free(buf);
break;
case SOCK_CLOSE_WAIT:
close(SOCK_PROPERTIES);
printf("SOCK_CLOSE_WAIT. \r\n");
break;
}
}
/*********
* 发布iotrelay平台设备事件 消息体内容Base64加密传输
* @params pTopic 为发布物模型事件主题 ,此topic对应iotrelay中继平台中定义的设备物模型事件数据,由类别标识、设备ID、物模型事件ID组成,例如 "/iotboxEvent/44080000001111000019/E_1696584389929"
* @params publishEventMeassage 为发布消息体内容
********/
void publishIotrelayModelEvent(char *pTopic,char *publishEventMeassage)
{
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
int rc = 1;
int mysock = SOCK_EVENT;
unsigned char buf[200];
int buflen = sizeof(buf);
MQTTString topicString = MQTTString_initializer;
int len = 0;
switch (getSn_SR(SOCK_EVENT))
{
case SOCK_CLOSED:
socket(SOCK_EVENT, Sn_MR_TCP, local_port++, Sn_MR_ND);
printf("SOCK_CLOSED \r\n");
break;
case SOCK_INIT:
connect(SOCK_EVENT, remote_ip, remote_port);
printf("SOCK_INIT \r\n");
break;
case SOCK_ESTABLISHED:
if (getSn_IR(SOCK_EVENT)&Sn_IR_CON)
{
printf("TCP established \r\n");
setSn_IR(SOCK_EVENT, Sn_IR_CON);
}
data.clientID.cstring = mqtt_clientID; //client ID
data.keepAliveInterval = 60; //keep alive time
data.cleansession = 1; // clean the last info after re-link (1: clean; 0: not clean)
data.username.cstring = mqtt_account; // the username if MQTT server need
data.password.cstring = mqtt_password; // the password if MQTT server need
len = MQTTSerialize_connect(buf, buflen, &data); // Serialize the MQTT Packet according the connect data
printf("Serialize the MQTT Packet according the connect data %d \r\n",len);
rc = transport_sendPacketBuffer(mysock, buf, len); // sned packet to Server by driver
printf("sned packet to Server by driver %d \r\n",rc);
/*wait for connack*/
if (MQTTPacket_read(buf, buflen, transport_getdata4)==CONNACK) // call the driver to get conn ack packet from Server
{
unsigned char sessionPresent, connack_rc;
if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0) //deserialize packet to confirm the MQTT connect success
{
printf("Unable to connect, return code %d \r\n", connack_rc);
transport_close(mysock);
// return 0;
}
else
{
printf("Connect succeed!\r\n");
}
}
else
{
transport_close(mysock);
// return 0;
}
/****
* 此处为构建上报的报文,设备属性数据在此填入,例如在此调用gpio获取设备的湿度、温度
*上报设备物模型事件数据,事件数据为json数值格式,由物模型事件属性中的输出参数组成,
* 例如:设备关机事件上报数据
* 数据为json格式字符串 "[{\"paramsId\":\"suning\", \"dataValue\": \"80\" },{\"paramsId\": \"temperature\", \"dataValue\": \"45\"}]"
*****/
char *indata="[{\"paramsId\":\"suning\", \"dataValue\": \"80\" },{\"paramsId\": \"temperature\", \"dataValue\": \"45\"}]"; //上报设备事件数据
printf("加密前报文: %s \r\n",indata);
char outdata[512]; //加密后数据
int outlen;
memset(outdata,0,sizeof(outdata));
/* 对上报数据Base64加密处理 */
int jm= base64_encode(indata, strlen(indata), outdata, &outlen);
printf("加密后数据: %s \r\n",outdata);
strcpy(publishEventMeassage,outdata);//填充Ba64加密后的消息体内容
int pubStringLen= strlen(publishEventMeassage);
printf(" char *pubString. %s \r\n",publishEventMeassage);
memset(buf,0,buflen);
topicString.cstring = pTopic; // publish topic name to Server
len = MQTTSerialize_publish(buf, buflen, 0, 0, 0, 0, topicString, (unsigned char*)publishEventMeassage, pubStringLen);
rc = transport_sendPacketBuffer(mysock, buf, len);
printf("disconnecting. \r\n");
len = MQTTSerialize_disconnect(buf, buflen);
rc = transport_sendPacketBuffer(mysock, buf, len);
free(outdata);
free(buf);
break;
case SOCK_CLOSE_WAIT:
close(SOCK_EVENT);
printf("SOCK_CLOSE_WAIT. \r\n");
break;
}
}
/*********
* MQTT订阅消息函数
* 订阅iotrelay平台设备物模型功能
* @params pTopic 为订阅物模型功能主题 ,此topic对应iotrelay中继平台中定义的设备物模型功能数据,由类别标识、设备ID、物模型功能ID组成,例如 "/iotboxFunction/44080000001111000019/F_1696584326733"
*
********/
void subscribIotrelayModelFunction(char *pTopic)
{
MQTTPacket_connectData data = MQTTPacket_connectData_initializer;
int rc = 1;
int mysock = SOCK_SUBSCRIBED;
unsigned char buf[256];
int buflen = sizeof(buf);
int msgid = 1;
MQTTString topicString = MQTTString_initializer;
int req_qos = 0;
int len = 0;
switch (getSn_SR(SOCK_SUBSCRIBED))
{
case SOCK_CLOSED:
socket(SOCK_SUBSCRIBED, Sn_MR_TCP, local_port++, Sn_MR_ND);
printf("SOCK_CLOSED \r\n");
break;
case SOCK_INIT:
connect(SOCK_SUBSCRIBED, remote_ip, remote_port);
printf("SOCK_INIT \r\n");
break;
case SOCK_ESTABLISHED:
if (getSn_IR(SOCK_SUBSCRIBED)&Sn_IR_CON)
{
printf("TCP established \r\n");
setSn_IR(SOCK_SUBSCRIBED, Sn_IR_CON);
}
data.clientID.cstring = mqtt_clientID; //client ID
data.keepAliveInterval = 20; //keep alive time
data.cleansession = 1; // clean the last info after re-link (1: clean; 0: not clean)
data.username.cstring = mqtt_account; // the username if MQTT server need
data.password.cstring = mqtt_password; // the password if MQTT server need
len = MQTTSerialize_connect(buf, buflen, &data); // Serialize the MQTT Packet according the connect data
printf("Serialize the MQTT Packet according the connect data %d buf: %s \r\n",len,buf);
rc = transport_sendPacketBuffer(mysock, buf, len); // sned packet to Server by driver
printf("sned packet to Server by driver %d \r\n",rc);
vTaskDelay(5*1000);
/*wait for connack*/
if (MQTTPacket_read(buf, buflen, transport_getdata2)==CONNACK) // call the driver to get conn ack packet from Server
{
unsigned char sessionPresent, connack_rc;
if (MQTTDeserialize_connack(&sessionPresent, &connack_rc, buf, buflen) != 1 || connack_rc != 0) //deserialize packet to confirm the MQTT connect success
{
printf("Unable to connect, return code %d \r\n", connack_rc);
transport_close(mysock);
// return 0;
}
else
{
printf("Connect succeed!\r\n");
}
}
else
{
transport_close(mysock);
// return -1;
}
printf("into subscribe topic name from Server \r\n");
/*subscribe*/
topicString.cstring =pTopic; // subscribe topic name from Server
len = MQTTSerialize_subscribe(buf, buflen, 0, msgid, 1, &topicString, &req_qos); //serialize subscribe topic packet
rc = transport_sendPacketBuffer(mysock, buf, len); // sned packet to Server by driver
/* wait for suback */
if (MQTTPacket_read(buf, buflen, transport_getdata2) == SUBACK) // call the driver to get subscribe ack packet from Server
{
unsigned short submsgid;
int subcount;
int granted_qos;
rc = MQTTDeserialize_suback(&submsgid, 1, &subcount, &granted_qos, buf, buflen); // deserialize packet to confirm the MQTT subscribe success
if (granted_qos != 0)
{
printf("granted qos != 0, %d \r\n", granted_qos);
transport_close(mysock);
// return -1;
}
else
{
printf("Received suback \r\n");
}
}
else
{
transport_close(mysock);
// return -1;
}
vTaskDelay(5*1000);
printf("loop get msgs on 'subscribed' topic. %s \r\n",pTopic);
printf("loop get msgs on 'subscribed' topic. \r\n");
/*loop get msgs on 'subscribed' topic and send msgs on 'pubtopic' topic*/
memset(buf,0,buflen);
/*接收数据会阻塞,除非服务器断开连接后才返回*/
while(getSn_SR(SOCK_SUBSCRIBED)==SOCK_ESTABLISHED)
{
printf("接收数据会阻塞. buf: %s buflen= %d\r\n",buf,buflen);
/* transport_getdata() has a built-in 1 second timeout,
your mileage will vary */
if (MQTTPacket_read(buf, buflen, transport_getdata2) == PUBLISH)
{
unsigned char dup; //re-send flag
int qos=0; // Service quality level
unsigned char retained; //keep flag
unsigned short msgid;
int payloadlen_in;
unsigned char* payload_in;
MQTTString receivedTopic;
printf("接收数据会阻塞. buf: %s buflen= %d\r\n",buf,buflen);
rc = MQTTDeserialize_publish(&dup, &qos, &retained, &msgid, &receivedTopic,
&payload_in, &payloadlen_in, buf, buflen);
printf("message arrived %d: %s\n\r", payloadlen_in, payload_in);
if(payload_in!=NULL){
if (strlen((const char *)payload_in) > 0) {
//Base64解密接收到的报文
char outdata[600]; //接收到的报文数据
int outlen;
memset(outdata,0,sizeof(outdata));
int jm= base64_decode((const char *)payload_in, strlen((const char *)payload_in), outdata, &outlen);
if(jm<0){
printf("解密数据错误:\r\n");
}else{
printf("解密后数据: %s \r\n",outdata);
//解析JSON格式字符串
cJSON *root = cJSON_Parse(outdata);
if(root != NULL)
{
char *json_str = cJSON_Print(root);
printf("解析解密后的JSON数据 %s", json_str);
//此处根据获取的物模型功能指令,填写设备需要操作的代码,例如设备关机
char *deviceId = cJSON_GetObjectItem(root,"deviceId")->valuestring;
printf("deviceId = %s\r\n",deviceId);//解析获取的设备ID
char *functionId = cJSON_GetObjectItem(root,"functionId")->valuestring;
printf("functionId = %s\r\n",functionId);//解析获取的物模型功能ID
char *functionName = cJSON_GetObjectItem(root,"functionName")->valuestring;
printf("functionName = %s\r\n",functionName);//解析获取的物模型功能命令字符串
//解析解密后的JSON格式字符串 根据解析的数据,提供设备的功能服务,例如设备关机
/*
{
"deviceId": "44080000001111000019",
"functionId": "F_1696584326733",
"functionName": "close system"
}
*/
cJSON_Delete(root); // 释放内存空间
}
else
{
printf("Error before: [%s]\r\n",cJSON_GetErrorPtr());
}
}
free(outdata);
}
}
}
else
{
printf("No data arrived.\r\n");
}
}
// Delay_s(1);
vTaskDelay(5*1000);
printf("disconnecting. \r\n");
len = MQTTSerialize_disconnect(buf, buflen);
rc = transport_sendPacketBuffer(mysock, buf, len);
free(buf);
break;
case SOCK_CLOSE_WAIT:
close(SOCK_SUBSCRIBED);
printf("SOCK_CLOSE_WAIT. \r\n");
rc=1;
break;
}
// return rc;
}Mqtt协议接入STM例程打包源码进入IOT中继宝盒主操作界面打开“IOT设备接口”窗口,选择对应的设备–设备接入端接口中对应的协议接入样例中下载。
长按关注宜联科技公众号