























// Key code

/// <summary>
/// The MQTT client owned by this instance. Stays <c>null</c> until
/// <see cref="ReceiveData"/> has completed its setup successfully.
/// </summary>
IMqttClient mqttClient;

/// <summary>
/// Connects to the MQTT broker, subscribes to the <c>itema/#</c> topic filter and
/// starts the background history-cleanup thread.
/// </summary>
/// <remarks>
/// The method does nothing when a client has already been set up, so repeated calls
/// do not open additional connections or start additional cleanup threads.
/// Any exception raised during setup is logged, the partially initialised client is
/// disposed, and <see cref="mqttClient"/> is reset to <c>null</c> so that a later
/// call can retry. The method is kept as <c>async void</c> for compatibility with
/// existing callers; it never lets an exception escape.
/// </remarks>
public async void ReceiveData()
{
    // A client already exists: nothing to do.
    if (mqttClient != null)
    {
        return;
    }

    IMqttClient client = null;
    try
    {
        var factory = new MqttFactory();
        client = factory.CreateMqttClient();

        // Publish the client before the first await so that a re-entrant call
        // sees it and returns instead of creating a second connection.
        mqttClient = client;

        var options = new MqttClientOptionsBuilder()
            .WithTcpServer("192.168.250.141", 1883)
            .Build();

        // Register the handler before connecting so no message is missed.
        client.ApplicationMessageReceivedAsync += e =>
        {
            SaveData(e);
            return Task.CompletedTask;
        };

        // Connect to the broker.
        await client.ConnectAsync(options);

        // Subscribe to the topic filter.
        await client.SubscribeAsync(new MqttTopicFilterBuilder()
            .WithTopic("itema/#")
            .Build());

        // History-data cleanup (the thread method deletes on its own schedule,
        // not on every invocation).
        threadDeleteHistoryData = new Thread(DeleteHistoryData_Thread);
        threadDeleteHistoryData.IsBackground = true;
        threadDeleteHistoryData.Start();
    }
    catch (Exception ex)
    {
        // Roll back so that a later call can retry from a clean state.
        mqttClient = null;
        client?.Dispose();

        string strLog = $"MQTT订阅消息异常\n:{ex.Message}\n{ex.StackTrace}";
        Logger.WriteLog(strLog);
        Console.WriteLine(strLog);
    }
}

/// <summary>
/// Handles one received MQTT application message.
/// </summary>
/// <param name="arg">Event arguments of the received message, as passed to the receive handler.</param>
private void SaveData(MqttApplicationMessageReceivedEventArgs arg)
{
    //Do Something
}
此内容由惯性聚合(RSS阅读器)自动聚合整理,仅供阅读参考。 原文来自 — 版权归原作者所有。