三門峽網(wǎng)站建設(shè)電話熱狗網(wǎng)站排名優(yōu)化外包
一,橋接資源配置及規(guī)則配置
Emqx橋接配置流程
1,配置資源并測試連接通過
規(guī)則引擎——>資源——>新建——>選擇MQTT Bridge——>填寫參數(shù)測試連接
參數(shù)描述詳見3.1資源配置
2,配置規(guī)則
2.1根據(jù)實(shí)際業(yè)務(wù)選擇合適sql
規(guī)則引擎——>規(guī)則——>新建——>規(guī)則sql
(sql見06手冊/實(shí)施部署手冊/empx 橋接規(guī)則配置模版.xlsx)
2.2填寫規(guī)則id
“rule:當(dāng)前節(jié)點(diǎn)_upload_目標(biāo)節(jié)點(diǎn)”;例如“rule:yantai_upload_shandong”
2.3添加響應(yīng)動作
動作:選擇“橋接數(shù)據(jù)到 MQTT Broker”
關(guān)聯(lián)資源:選擇配置好的目標(biāo)資源節(jié)點(diǎn)(沒有目標(biāo)資源點(diǎn)擊新建資源去新建)
轉(zhuǎn)發(fā)消息主題:空著即可
(轉(zhuǎn)發(fā)消息時(shí)使用的主題。如果未提供,則默認(rèn)為橋接消息的主題)
消息內(nèi)容模板: 填寫”${payload}”
(支持變量。若使用空模板(默認(rèn)),消息內(nèi)容為 JSON 格式的所有字段)
3,參數(shù)配置
3.1資源配置
1,資源類型:下拉選擇MQTT Bridge
2,資源ID:
一對一:“resource:”+當(dāng)前節(jié)點(diǎn)_to_目標(biāo)節(jié)點(diǎn);例如“resource:yantai_to_shandong”
一對多:“resource:”+當(dāng)前節(jié)點(diǎn)_to_目標(biāo)節(jié)點(diǎn);例如“resource:guojia_to_provinces”
3,連接池大小:設(shè)為默認(rèn)值8
4,客戶端id:當(dāng)前節(jié)點(diǎn)+client;例如“yantai_client”
5,附加GUID:設(shè)為默認(rèn)值true(附加 GUID 選項(xiàng),設(shè)置為 true 時(shí),MQTT 連接使用的 clientid 增加隨機(jī)后綴以保證全局唯一性。 設(shè)置為 false 時(shí),會導(dǎo)致 clientid 使用同一個(gè),連接池中線程互踢,EMQX 多個(gè)節(jié)點(diǎn)之間的橋接也會互踢,推薦僅在單節(jié)點(diǎn) EMQX 且連接池大小為 1 時(shí)開啟此選項(xiàng)。)
6,用戶名:連接遠(yuǎn)程Broker的用戶名
7,密碼: 連接遠(yuǎn)程Broker的密碼
8,橋接主題的掛載點(diǎn):示例: 本地節(jié)點(diǎn)向 topic1
發(fā)消息,遠(yuǎn)程橋接節(jié)點(diǎn)的主題會變換為 bridge/aws/${node}/topic1
,程序中應(yīng)設(shè)置為空
9,磁盤緩存:設(shè)為默認(rèn)值off
10,協(xié)議版本:設(shè)為默認(rèn)值mqttV4
11,心跳間隔:設(shè)為默認(rèn)值60s
12,重連間隔:設(shè)為默認(rèn)值30s
13,重傳間隔:默認(rèn)值20s
14,橋接模式:false
15,開啟SSL連接:false
16,服務(wù)器名稱知識:指定用于對端證書驗(yàn)證時(shí)使用的主機(jī)名,
或者設(shè)置為 disable 以關(guān)閉此項(xiàng)驗(yàn)證。(默認(rèn)不填即可)
注意:配置完畢點(diǎn)擊測試連接,顯示連接成功即可應(yīng)用
二,過多的消息發(fā)布
ERROR,MQTT(32202): 正在發(fā)布過多的消息
解決方案
1,增大maxInflight(最低需要paho1.2.0版本)
2,配置多個(gè)mqtt client
由于mqttMessageHandler只會引用一個(gè)paho客戶端,并且在內(nèi)部對paho客戶端做了封裝,所以直接修改MqttPahoMessageHandler復(fù)雜度較高,我們可以重新寫一個(gè)MultiMqttMessageHandler,內(nèi)部初始化多個(gè)MqttPahoMessageHandler,這樣通過MessageingGateway發(fā)送消息時(shí),直接通過MultiMqttMessageHandler來處理mqtt消息,MultiMqttMessageHandler可以通過負(fù)載均衡的方式來把消息分派給各個(gè)MqttPahoMessageHandler
1,自定義MyMqttPahoMessageHandler類,繼承MqttPahoMessageHandler,注意權(quán)限由protected改成public。handleMessageInternal()會由channel通過dispatcher間接調(diào)用;重寫onInit()用來手動初始化MqttPahoMessageHandler。
@Overridepublic void doStop() {super.doStop();}@Overridepublic void handleMessageInternal(Message<?> message) throws Exception {super.handleMessageInternal(message);}@Overridepublic void onInit() {try {super.onInit();} catch (Exception e) {e.printStackTrace();}}
2,自定義MultiMqttMessageHandler類,繼承AbstractMessageHandler,并implements Lifecycle,自定義一個(gè)MessageHandler,添加一個(gè)Map成員屬性,用來維系多個(gè)MyMqttPahoMessageHandler;handlerCount變量可配置多個(gè)mqtt client。這里只用了radom隨機(jī)數(shù)來做負(fù)載均衡
private final AtomicBoolean running = new AtomicBoolean();
private volatile Map<Integer, MessageHandler> mqttHandlerMap;@Value("${spring.mqtt.sender.count}")
private Integer handlerCount;@Autowired
private MqttSenderConfig senderConfig;@Override
public void start() {if (!this.running.getAndSet(true)) {doStart();}
}private void doStart(){mqttHandlerMap = new ConcurrentHashMap<>();for(int i=0;i<handlerCount;i++){mqttHandlerMap.put(i, senderConfig.createMqttOutbound());}
}@Override
public void stop() {if (this.running.getAndSet(false)) {doStop();}
}private void doStop(){for(Map.Entry<Integer, MessageHandler> e : mqttHandlerMap.entrySet()){MessageHandler handler = e.getValue();((MyMqttPahoMessageHandler)handler).doStop();}
}@Override
public boolean isRunning() {return this.running.get();
}@Override
protected void handleMessageInternal(Message<?> message) throws Exception {Random random = new Random();MyMqttPahoMessageHandler messageHandler = (MyMqttPahoMessageHandler)mqttHandlerMap.get(random.nextInt(handlerCount));messageHandler.handleMessageInternal(message);
}
3,消息發(fā)布配置類
@Bean
public MqttPahoClientFactory mqttClientFactory() {DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();factory.setServerURIs(hostUrl);factory.setUserName(username);factory.setPassword(password);return factory;
}
public MessageHandler createMqttOutbound(){String tempId = MqttAsyncClient.generateClientId();MyMqttPahoMessageHandler messageHandler = new MyMqttPahoMessageHandler(clientId + "sender" + tempId, mqttClientFactory());messageHandler.setAsync(true);messageHandler.setDefaultTopic(defaultTopic);messageHandler.setDefaultQos(1);messageHandler.onInit();return messageHandler;
}@Bean
@ServiceActivator(inputChannel = "mqttOutboundChannel")
public MessageHandler mqttOutbound() {return new MultiMqttMessageHandler();
}@Bean
public MessageChannel mqttOutboundChannel() {return new DirectChannel();
}@MessagingGateway(defaultRequestChannel = "mqttOutboundChannel")
public interface MqttGateway {void sendToMqtt(String data, @Header(MqttHeaders.TOPIC) String topic);
}
三,報(bào)文內(nèi)容過大
調(diào)整emqx參數(shù)值
zone.external.max_packet_size
mqtt.max_packet_size
四,隊(duì)列已滿
調(diào)整emqx參數(shù)值
zone.external.max_mqueue_len
消息隊(duì)列最大長度。當(dāng)飛行窗口滿,或客戶端離線后,消息會被存儲至該隊(duì)列中。0 表示不限制。
五,發(fā)送大消息的時(shí)候,客戶端會被強(qiáng)制kill掉
emqx升級到4.4.10版本之后
六,其他重點(diǎn)相關(guān)優(yōu)化參數(shù)
參數(shù)介紹api鏈接
https://www.emqx.io/docs/zh/v4.3/configuration/configuration.html#cluster
//集群節(jié)點(diǎn)發(fā)現(xiàn)方式。可選值為:manual: 手動加入集群static: 配置靜態(tài)節(jié)點(diǎn)。配置幾個(gè)固定的節(jié)點(diǎn),新節(jié)點(diǎn)通過連接固定節(jié)點(diǎn)中的某一個(gè)來加入集群。mcast: 使用 UDP 多播的方式發(fā)現(xiàn)節(jié)點(diǎn)。dns: 使用 DNS A 記錄的方式發(fā)現(xiàn)節(jié)點(diǎn)。etcd: 使用 etcd 發(fā)現(xiàn)節(jié)點(diǎn)。k8s: 使用 Kubernetes 發(fā)現(xiàn)節(jié)點(diǎn)。
cluster.discovery//指定多久之后從集群中刪除離線節(jié)點(diǎn)。
cluster.autoclean//當(dāng)使用 static 方式集群時(shí),指定固定的節(jié)點(diǎn)列表,多個(gè)節(jié)點(diǎn)間使用逗號分隔
cluster.static.seeds//節(jié)點(diǎn)名。格式為 <name>@<host>。其中 <host> 可以是 IP 地址,也可以是 FQDN:注意格式限制
node.name//系統(tǒng)調(diào)優(yōu)參數(shù),設(shè)置 Erlang 允許的最大進(jìn)程數(shù),這將影響 emqx 節(jié)點(diǎn)能處理的連接數(shù)
//integer 1024 - 134217727 默認(rèn):2097152
node.process_limit//系統(tǒng)調(diào)優(yōu)參數(shù),設(shè)置 Erlang 允許的最大 Ports 數(shù)量
//integer 1024 - 134217727 1048576
node.max_ports//系統(tǒng)調(diào)優(yōu)參數(shù),設(shè)置 Erlang 分布式通信使用的最大緩存大小
//bytesize 1KB - 2GB 8MB
node.dist_buffer_size//系統(tǒng)調(diào)優(yōu)參數(shù),設(shè)置 Erlang 運(yùn)行時(shí)允許的最大 ETS 表數(shù)量 integer 默認(rèn)262144
node.max_ets_tables//系統(tǒng)調(diào)優(yōu)參數(shù),設(shè)置 Erlang 運(yùn)行多久強(qiáng)制進(jìn)行一次全局垃圾回收。默認(rèn)15m
node.global_gc_interval//系統(tǒng)調(diào)優(yōu)參數(shù),設(shè)置 Erlang 運(yùn)行時(shí)多少次 generational GC 之后才進(jìn)行一次 fullsweep GC。
//integer 0 - 65535 默認(rèn):1000
node.fullsweep_after//系統(tǒng)調(diào)優(yōu)參數(shù),當(dāng)一個(gè)節(jié)點(diǎn)持續(xù)無響應(yīng)多久之后,認(rèn)為其已經(jīng)宕機(jī)并斷開連接 默認(rèn)120node.dist_net_ticktime//MQTT 服務(wù)器會為每個(gè)主題存儲最新一條保留消息,以方便消息發(fā)布后才上線的客戶端在訂閱主題時(shí)仍可以接收到該消息。
mqtt.retain_available//是否忽略自己發(fā)送的消息:默認(rèn)false
mqtt.ignore_loop_deliver//當(dāng)收到一定數(shù)量的消息,或字節(jié),就強(qiáng)制執(zhí)行一次垃圾回收。
//16000|16MB 表示當(dāng)收到 16000 條消息,或 16MB 的字節(jié)流入就強(qiáng)制執(zhí)行一次垃圾回收
zone.external.force_gc_policy//允許客戶端訂閱主題的最大層級。0 表示不限制,層級多會有性能問題zone.external.max_topic_levels//飛行窗口大小。飛行窗口用于存儲未被應(yīng)答的 QoS 1 和 QoS 2 消息
zone.external.max_inflight//消息重發(fā)間隔。EMQX 在每個(gè)間隔檢查是否需要進(jìn)行消息重發(fā)
zone.external.retry_interval//消息隊(duì)列是否存儲 QoS 0 消息。
zone.external.mqueue_store_qos0//ACL機(jī)制
MQTT 授權(quán)(authorization)是指對 MQTT 客戶端的發(fā)布和訂閱操作進(jìn)行 權(quán)限控制。 控制的內(nèi)容主要是哪些客戶端可以發(fā)布或者訂閱哪些 MQTT 主題。
EMQX 支持集中類型的授權(quán)。權(quán)限列表(亦即 ACL)??梢詮睦?MongoDB, MySQL,PostgreSQL,Redis,或者 EMQX 的內(nèi)置數(shù)據(jù)庫中讀取這個(gè)列表。加載一個(gè)包含全局的 ACL 的文件。動態(tài)訪問一個(gè) HTTP 后端服務(wù),并通過該 HTTP 調(diào)用的返回值來客戶端是否有訪問的權(quán)限。通過提取認(rèn)證過程中攜帶的授權(quán)數(shù)據(jù),例如 JWT 的某個(gè)字段。
EMQX 最大文件句柄數(shù)done:ulimit -n 1048576done: /etc/security/limits.confdone: /etc/sysctl.confdone: /etc/systemd/system.confdone: 重啟 emqx 服務(wù):ulimit -n 1048576; ./emqx stop; ./emqx startdone: 確認(rèn) EMQX Web 后臺顯示
tcp并發(fā)數(shù)
/etc/systemd/system.conf查看默認(rèn)值$ systemctl --user show syncthing | grep LimitNOFILELimitNOFILE=4096LimitNOFILESoft=1024設(shè)置DefaultLimitNOFILE=1048576
7,jmeter壓測emqx
1. 下載jmeter,解壓
https://jmeter.apache.org/download_jmeter.cgi
以 5.4.3 為例,下載地址: https://dlcdn.apache.org//jmeter/binaries/apache-jmeter-5.4.3.zip
linux下解壓: unzip apache-jmeter-5.4.3.zip
2. 下載mqtt-jmeter插件
下載地址:
https://github.com/emqx/mqtt-jmeter/releases
https://github.com/emqx/mqtt-jmeter/releases/download/v2.0.2/mqtt-xmeter-2.0.2-jar-with-dependencies.jar
3. 將插件放置于jmeter的lib/ext目錄下,windows/linux同樣操作
4. 本文先在windows下生成的jmx腳本,然后傳至linux下使用
4.1 新建兩個(gè)線程組
第一個(gè)僅包含一個(gè) MQTT DisConnect,執(zhí)行一次
第二個(gè)里面包含具體的壓測,開啟1000個(gè)線程,1s內(nèi)將線程創(chuàng)建完畢,無限循環(huán)。創(chuàng)建兩個(gè)計(jì)數(shù)器,pub_counter用來技術(shù)發(fā)布消息數(shù),thread_counter用來線程計(jì)數(shù)
4.2 事先創(chuàng)建1000個(gè)設(shè)備,名稱為cosmoiottest000001 - cosmoiottest000001000(可自己定義)。添加一次性控制器(mqtt連接一次,后續(xù)pub消息),寫上配置信息。
4.3 添加循環(huán)控制器,循環(huán)一次。包含固定定時(shí)器,休眠1000ms,一個(gè)發(fā)布MQTT Pub Sampler,即每個(gè)線程進(jìn)來執(zhí)行一次發(fā)布消息然后休眠1000ms進(jìn)入下一次循環(huán)。每個(gè)消息包含100個(gè)點(diǎn)位(根據(jù)自己需要設(shè)置),每個(gè)點(diǎn)位隨機(jī)生成一個(gè)整數(shù)。配置詳見截圖
4.4 添加觀察結(jié)果樹、匯總報(bào)告、聚合報(bào)告等,可在windows下面查看結(jié)果
4.5 配置截圖如下:
循環(huán)執(zhí)行線程
數(shù)器
thread_counter計(jì)數(shù)器
mqtt connect設(shè)置:
MQTT發(fā)布消息:
5. linux壓測命令:(需要先將bin/jmeter添加可執(zhí)行權(quán)限)
chmod +x bin/jmeter
./bin/jmeter -n -t mqtt_test.jmx -l result.jtl
6. 將結(jié)果jtl生成可視化報(bào)告,放置于result目錄
mkdir result
./bin/jmeter -g result.jtl -o result
將結(jié)果目錄拉下來,點(diǎn)開即可查看圖形化結(jié)果
注,可能遇到問題:
1. 執(zhí)行jmeter壓測后,進(jìn)程不退出,編輯 jmeter.properties,打開配置
jmeterengine.force.system.exit=true
2. jmx文件傳到linux后可能出錯(cuò),建議英文環(huán)境下生成jmx文件,語言控制jmeter.properties
#language=en (默認(rèn)英文,切換為中文為:zh_CN)
3. mqtt-jmeter 的jar包需要傳至lib/ext目錄,否則不可用
4. 生成報(bào)告時(shí)報(bào)錯(cuò):Consumer failed with message :Begin size 0 is not equal to fixed size 5
將jdk換成8版本
5. jtl結(jié)果文件,也可拉到windows,使用jmeter直接查看,新建線程組->聚合報(bào)告,選擇jtl文件