Developing an MQTT Server with .NET 5
接云网络 · 2020-12-20 · via 博客园 - 接云网络

This MQTT server for .NET 5 is self-developed and built mainly on the MQTTnet package. In the author's testing it has been very stable.

using IoT;
using JieYun.Admin.Net5;
using JieYun.IoT.Common.Models;
using JieYun.IoT.Server.Services;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using MQTTnet;
using MQTTnet.Protocol;
using MQTTnet.Server;
using System;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using static IoT.IoTRpc;

namespace JieYun.IoT.Server
{
    public class ServerWorker : BackgroundService
    {
        public static IMqttServer mqttServer;

        private readonly ILogger<ServerWorker> _logger;
        private readonly IoTRpcClient _client;

        public ServerWorker(ILogger<ServerWorker> logger, IoTRpcClient client, IServiceProvider provider)
        {
            _logger = logger;
            _client = client;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            await StartMqttServer();
        }

        // Start the MQTT server
        private async Task StartMqttServer()
        {
            try
            {
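                // Connection settings and the credentials that clients are validated against
                string hostIp = AppConfigProvider.AppConfig.IoTServerAddress; // IP address
                int hostPort = AppConfigProvider.AppConfig.IoTServerPort; // port
                int timeout = 5; // communication timeout, in seconds
                string username = "admin"; // user name
                string password = "admin"; // password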

                var optionBuilder = new MqttServerOptionsBuilder()
                  // .WithDefaultEndpointBoundIPAddress(System.Net.IPAddress.Parse(hostIp))
                   .WithDefaultEndpointPort(hostPort)
                   .WithDefaultCommunicationTimeout(TimeSpan.FromSeconds(timeout))
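                   .WithConnectionValidator(t =>
                   {
                       // Reject the client on a credential mismatch and return early;
                       // without the return, the assignment below would overwrite the rejection.
                       if (t.Username != username || t.Password != password)
                       {
                           t.ReasonCode = MqttConnectReasonCode.BadUserNameOrPassword;
                           return;
                       }
                       t.ReasonCode = MqttConnectReasonCode.Success;
                   });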
                var options = optionBuilder.Build();

                // Create the MQTT server
                mqttServer = new MqttFactory().CreateMqttServer();

                // A client subscribed to a topic
                mqttServer.ClientSubscribedTopicHandler = new MqttServerClientSubscribedHandlerDelegate(MqttNetServer_SubscribedTopic);

                // A client unsubscribed from a topic
                mqttServer.ClientUnsubscribedTopicHandler = new MqttServerClientUnsubscribedTopicHandlerDelegate(MqttNetServer_UnSubscribedTopic);

                // A message was received from a client
                mqttServer.UseApplicationMessageReceivedHandler(MqttServe_ApplicationMessageReceivedAsync);

                // A client connected
                mqttServer.UseClientConnectedHandler(MqttNetServer_ClientConnected);

                // A client disconnected
                mqttServer.UseClientDisconnectedHandler(MqttNetServer_ClientDisConnected);

                // Start the server
                await mqttServer.StartAsync(options);

                _logger.LogInformation("MQTT server started.");
            }
            catch (Exception e)
            {
                // Pass the exception itself so the logger records the stack trace
                _logger.LogError(e, "Failed to start the MQTT server.");
            }
        }

        /// <summary>
        /// A client subscribed to a topic.
        /// </summary>
        private void MqttNetServer_SubscribedTopic(MqttServerClientSubscribedTopicEventArgs e)
        {
            // Client ID and the topic it subscribed to
            var ClientId = e.ClientId;
            var Topic = e.TopicFilter.Topic;
            _logger.LogInformation($"Client [{ClientId}] subscribed to: {Topic}");
        }

        /// <summary>
        /// A client unsubscribed from a topic.
        /// </summary>
        private void MqttNetServer_UnSubscribedTopic(MqttServerClientUnsubscribedTopicEventArgs e)
        {
            // Client ID and the topic it unsubscribed from
            var ClientId = e.ClientId;
            var Topic = e.TopicFilter;
            _logger.LogInformation($"Client [{ClientId}] unsubscribed from: {Topic}");
        }

        /// <summary>
        /// A message was received.
        /// </summary>
        private async Task MqttServe_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs e)
        {
            _logger.LogInformation(e.ApplicationMessage.ToConsoleMessage());

            // Forward the message to the WebClient
            var msgStr = Encoding.UTF8.GetString(e.ApplicationMessage.Payload);
            var msg = JsonSerializer.Deserialize<MQTTMessage>(msgStr);

            // Deserialize returns null for a JSON "null" payload, so guard before reading To
            if (msg != null && msg.To == "SERVER" && e.ApplicationMessage.Topic == MQTTTOPIC.UPDATE)
            {
                var msgSend = new MQTTMessage()
                {
                    From = msg.From,
                    To = "WebClient",
                    Msg = msg.Msg
                };
                await SendMessageToWebClient(msgSend);
            }
        }

        /// <summary>
        /// A client connected.
        /// </summary>
        private async Task MqttNetServer_ClientConnected(MqttServerClientConnectedEventArgs e)
        {
            var ClientId = e.ClientId;
            _logger.LogInformation($"{DateTime.Now} Client [{ClientId}] connected");

            // Notify the server that the client has connected
            var msg = new MQTTMessage()
            {
                From = ClientId,
                To = "SERVER",
                Msg = ClientStatus.Connected
            };
            await SendMessageToWebClient(msg);
        }

        /// <summary>
        /// A client disconnected.
        /// </summary>
        private async Task MqttNetServer_ClientDisConnected(MqttServerClientDisconnectedEventArgs e)
        {
            var ClientId = e.ClientId;

            _logger.LogInformation($"{DateTime.Now} Client [{ClientId}] disconnected");

            // Notify the server that the client has disconnected
            var msg = new MQTTMessage()
            {
                From = ClientId,
                To = "SERVER",
                Msg = ClientStatus.Disconnected
            };
            await SendMessageToWebClient(msg);
        }

        private async Task SendMessageToWebClient(MQTTMessage msg)
        {
            var msgStr = JsonSerializer.Serialize(msg);
            var payload = Encoding.UTF8.GetBytes(msgStr);
            MqttApplicationMessage mm = new MqttApplicationMessage()
            {
                Topic = MQTTTOPIC.UPDATE,
                Payload = payload
            };
            await mqttServer.PublishAsync(mm);
        }
    }
}
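The connection validator in StartMqttServer is meant to accept a client only when both the user name and the password match. As a minimal sketch of that rule in isolation, the hypothetical CredentialCheck helper and ConnectResult enum below (neither is part of MQTTnet or of the original project) show the early-return shape that keeps a rejection from being overwritten by a later assignment:

```csharp
using System;

namespace MqttAuthSketch
{
    // Hypothetical stand-in for the two MqttConnectReasonCode values the server uses.
    public enum ConnectResult
    {
        Success,
        BadUserNameOrPassword
    }

    // Hypothetical helper, written only to illustrate the validation rule.
    public static class CredentialCheck
    {
        // Returns Success only when both values match exactly; any mismatch
        // (including null credentials) returns BadUserNameOrPassword immediately.
        public static ConnectResult Validate(
            string username, string password,
            string expectedUsername, string expectedPassword)
        {
            if (!string.Equals(username, expectedUsername, StringComparison.Ordinal) ||
                !string.Equals(password, expectedPassword, StringComparison.Ordinal))
            {
                return ConnectResult.BadUserNameOrPassword;
            }

            return ConnectResult.Success;
        }
    }
}
```

Keeping the rule in a pure function like this also makes it possible to unit test the accept and reject paths without starting a server.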
The MQTTMessage message class
using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

namespace JieYun.IoT.Common.Models
{
    public class MQTTMessage
    {
        public string Msg { get; set; }
        public string From { get; set; } = "SERVER";
        public string To { get; set; } = "e098060e71ef";

        public override string ToString()
        {
            return JsonSerializer.Serialize(this);
        }
    }
}
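Because the server reads every payload as UTF-8 JSON into MQTTMessage, clients have to send property names with the same casing as the class. The sketch below assumes the default System.Text.Json options, which match property names case-sensitively. PayloadCodec is a hypothetical helper, and the MQTTMessage copy is there only so the block compiles on its own:

```csharp
using System.Text;
using System.Text.Json;

namespace MqttMessageSketch
{
    // Copy of the article's MQTTMessage properties, repeated so the sketch is self-contained.
    public class MQTTMessage
    {
        public string Msg { get; set; }
        public string From { get; set; } = "SERVER";
        public string To { get; set; } = "e098060e71ef";
    }

    // Hypothetical helper mirroring how the server builds and reads payloads.
    public static class PayloadCodec
    {
        // Serialize to JSON text, then to UTF-8 bytes, as SendMessageToWebClient does.
        public static byte[] Encode(MQTTMessage msg)
        {
            return Encoding.UTF8.GetBytes(JsonSerializer.Serialize(msg));
        }

        // Decode UTF-8 bytes to JSON text, then deserialize, as the receive handler does.
        public static MQTTMessage Decode(byte[] payload)
        {
            return JsonSerializer.Deserialize<MQTTMessage>(Encoding.UTF8.GetString(payload));
        }
    }
}
```

With these defaults, a payload such as {"msg":"x","to":"SERVER"} deserializes without error but leaves Msg null and To at its initializer value, so the server's msg.To == "SERVER" check would not match.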
The Program entry point
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading.Tasks;

namespace JieYun.IoT.Server
{
    public class Program
    {
        public static void Main(string[] args)
        {
            var config = new ConfigurationBuilder()
            .AddCommandLine(args)
            .Build();


            var host = new WebHostBuilder()
              .UseConfiguration(config)
              .UseKestrel()
              .UseContentRoot(Directory.GetCurrentDirectory())
              .UseStartup<Startup>()
              .ConfigureServices((hostContext, services) =>
                {
                    services.AddHostedService<ServerWorker>();
                })
              .ConfigureLogging(logging => {
                  logging.ClearProviders();
                  logging.SetMinimumLevel(LogLevel.Trace);
                  logging.AddConsole();
              })
              .Build();

            host.Run();
        }
    }
}