惯性聚合 高效追踪和阅读你感兴趣的博客、新闻、科技资讯
阅读原文 在惯性聚合中打开

推荐订阅源

T
Tenable Blog
Last Week in AI
Last Week in AI
P
Proofpoint News Feed
Engineering at Meta
Engineering at Meta
H
Help Net Security
F
Fortinet All Blogs
MyScale Blog
MyScale Blog
宝玉的分享
宝玉的分享
让小产品的独立变现更简单 - ezindie.com
让小产品的独立变现更简单 - ezindie.com
博客园 - 司徒正美
量子位
N
Netflix TechBlog - Medium
Apple Machine Learning Research
Apple Machine Learning Research
小众软件
小众软件
Recorded Future
Recorded Future
博客园 - 三生石上(FineUI控件)
Vercel News
Vercel News
aimingoo的专栏
aimingoo的专栏
I
InfoQ
Microsoft Security Blog
Microsoft Security Blog
Scott Helme
Scott Helme
The Last Watchdog
The Last Watchdog
cs.AI updates on arXiv.org
cs.AI updates on arXiv.org
IT之家
IT之家
AI
AI
WordPress大学
WordPress大学
Security Archives - TechRepublic
Security Archives - TechRepublic
Google Online Security Blog
Google Online Security Blog
U
Unit 42
V2EX - 技术
V2EX - 技术
MongoDB | Blog
MongoDB | Blog
Schneier on Security
Schneier on Security
博客园 - Franky
H
Heimdal Security Blog
奇客Solidot–传递最新科技情报
奇客Solidot–传递最新科技情报
Jina AI
Jina AI
W
WeLiveSecurity
P
Privacy & Cybersecurity Law Blog
Cloudbric
Cloudbric
B
Blog RSS Feed
N
News | PayPal Newsroom
S
Securelist
Threat Intelligence Blog | Flashpoint
Threat Intelligence Blog | Flashpoint
I
Intezer
Hacker News - Newest:
Hacker News - Newest: "LLM"
CTFtime.org: upcoming CTF events
CTFtime.org: upcoming CTF events
博客园_首页
罗磊的独立博客
H
Hackread – Cybersecurity News, Data Breaches, AI and More
雷峰网
雷峰网

博客园 - 强悍的抽屉

基于 Dapper 的一个 DbUtils WebAPI 操作返回 c#版 mqtt 3.1.1 client 实现 httpWebRequest 文件下载 一个 go 文件服务器 ssdb MongoDB 刷新几次就报错 C# Win32API - 强悍的抽屉 - 博客园 回车跳转控件焦点 让程序只启动一次 -- Mutex C# 排序 WINDEF.h 变量类型 SqlHelper 数据库操作类2 SqlHelper 数据库操作类 第一个 Windows 应用程序 JavaScript 字符串处理函数 - 强悍的抽屉 - 博客园 JavaScript 字符串函数扩充 - 强悍的抽屉 - 博客园 C# 字符串处理一些方法 希望找人一起写个 Ajax 的封装 几种流行的JS框架的选择
mqtt 协议之 PINGREQ, PINGRESP
强悍的抽屉 · 2015-08-17 · via 博客园 - 强悍的抽屉

  mqtt 协议里最简单的是 ping 协议吧 (心跳包), ping 协议是已连接的客户端发往服务端, 告诉服务端,我还"活着"

PINGREQ - PING request

fixed header format.

bit76543210
byte 1 Message Type (12) DUP flag QoS level RETAIN
  1 1 0 0 x x x x
byte 2 Remaining Length (0)
  0 0 0 0 0 0 0 0

no variable header

no payload 

response:   The response to a PINGREQ message is a PINGRESP message.

PINGRESP - PING response

fixed header

bit76543210
byte 1 Message Type (13) DUP flag QoS level RETAIN
  1 1 0 1 x x x x
byte 2 Remaining Length (0)
  0 0 0 0 0 0 0 0

no variable header

no payload

------------------------------------------------------------------------ 华丽的分界线 ---------------------------------------

客户端会在一个心跳周期内发送一条PINGREQ消息到服务器端。两个字节,固定值。

服务器收到PINGREQ请求之后,会立即响应一个两个字节固定格式的PINGRESP消息。

周期定义在 心跳频率在CONNECT(连接包)可变头部“Keep Alive timer”中定义时间,单位为秒,无符号16位short表示。

ok ,上代码 :

固定头部 FinedHeader

    /// <summary>
    /// Fixed header
    /// </summary>
    internal class FixedHeader
    {
        /// <summary>
        /// Message type
        /// </summary>
        public MessageType MessageType { get; set; }

        /// <summary>
        /// DUP flag
        /// </summary>
        public bool Dup { get; set; }

        /// <summary>
        /// QoS flags
        /// </summary>
        public Qos Qos { get; set; }

        /// <summary>
        /// RETAIN 保持
        /// </summary>
        public bool Retain { get; set; }

        /// <summary>
        /// Remaining Length 剩余长度
        /// 单个字节最大值:01111111,16进制:0x7F,10进制为127。
        /// MQTT协议规定,第八位(最高位)若为1,则表示还有后续字节存在。
        /// MQTT协议最多允许4个字节表示剩余长度。
        /// 最大长度为:0xFF,0xFF,0xFF,0x7F,
        /// 二进制表示为:11111111,11111111,11111111,01111111,十进制:268435455
        /// </summary>
        public int RemaingLength { get; set; }

        public FixedHeader() { }

        public FixedHeader(Stream stream)
        {
            if (stream.Length < 2)
                throw new Exception("The supplied header is invalid. Header must be at least 2 bytes long.");

            var byte1 = stream.ReadByte();
            MessageType = (MessageType)((byte1 & 0xf0) >> 4);
            Dup = ((byte1 & 0x08) >> 3) > 0;
            Qos = (Qos)((byte1 & 0x06) >> 1);
            Retain = (byte1 & 0x01) > 0;

            //Remaining Length
            //var byte2 = stream.ReadByte();
            var lengthBytes = ReadLengthBytes(stream);
            RemaingLength = CalculateLength(lengthBytes);
        }

        public void WriteTo(Stream stream)
        {
            var flags = (byte)MessageType << 4;
            flags |= (Dup ? 1 : 0) << 3;
            flags |= (byte)Qos << 1;
            flags |= Retain ? 1 : 0;

            stream.WriteByte((byte)flags);     //byte 1
            if (RemaingLength == 0)         //byte 2
                stream.WriteByte(0);
            else
            {
                do
                {
                    int digit = RemaingLength & 0x7f;
                    RemaingLength = RemaingLength >> 7;
                    if (RemaingLength > 0)
                        digit = digit | 0x80;
                    stream.WriteByte((byte)digit);
                } while (RemaingLength > 0);
            }
        }

        internal static byte[] ReadLengthBytes(Stream stream)
        {
            var lengthBytes = new List<byte>();

            // read until we've got the entire size, or the 4 byte limit is reached
            byte sizeByte;
            int byteCount = 0;
            do
            {
                sizeByte = (byte)stream.ReadByte();
                lengthBytes.Add(sizeByte);
            } while (++byteCount <= 4 && (sizeByte & 0x80) == 0x80);

            return lengthBytes.ToArray();
        }

        internal static int CalculateLength(byte[] lengthBytes)
        {
            var remainingLength = 0;
            var multiplier = 1;

            foreach (var currentByte in lengthBytes)
            {
                remainingLength += (currentByte & 0x7f) * multiplier;
                multiplier *= 0x80;
            }

            return remainingLength;
        }
    }

消息父类: Message

    internal class Message
    {
        public FixedHeader FixedHeader { get; protected set; }

        public Message()
        {
        }

        public Message(MessageType messageType)
        {
            FixedHeader = new FixedHeader
            {
                MessageType = messageType
            };
        }

        public virtual void WriteTo(Stream stream)
        {
        }

        public static Message CreateFrom(byte[] buffer)
        {
            using (var stream = new MemoryStream(buffer))
            {
                return CreateFrom(stream);
            }
        }

        public static Message CreateFrom(Stream stream)
        {
            var header = new FixedHeader(stream);
            return CreateMessage(header, stream);
        }

        public static Message CreateMessage(FixedHeader header, Stream stream)
        {
            switch (header.MessageType)
            {
                case MessageType.CONNACK:
                    return new ConnAckMessage(header, stream);
                case MessageType.DISCONNECT:
                    return null;
                case MessageType.PINGREQ:
                    return new PingReqMessage();
                case MessageType.PUBACK:
                    return new PublishAckMessage(header, stream);
                case MessageType.PUBCOMP:
                    //return new MqttPubcompMessage(str, header);
                case MessageType.PUBLISH:
                    //return new MqttPublishMessage(str, header);
                case MessageType.PUBREC:
                    //return new MqttPubrecMessage(str, header);
                case MessageType.PUBREL:
                    //return new MqttPubrelMessage(str, header);
                case MessageType.SUBACK:
                    //return new MqttSubackMessage(str, header);
                case MessageType.UNSUBACK:
                    //return new MqttUnsubackMessage(str, header);
                case MessageType.PINGRESP:
                    return new PingRespMessage(header, stream);
                case MessageType.UNSUBSCRIBE:
                case MessageType.CONNECT:
                case MessageType.SUBSCRIBE:
                default:
                    throw new Exception("Unsupported Message Type");
            }
        }
    }

两个枚举:

MessageType  (消息类型)

Qos (服务质量等级)

    [Flags]
    public enum MessageType : byte
    {
        CONNECT     = 1,
        CONNACK     = 2,
        PUBLISH     = 3,
        PUBACK      = 4,
        PUBREC      = 5,
        PUBREL      = 6,
        PUBCOMP     = 7,
        SUBSCRIBE   = 8,
        SUBACK      = 9,
        UNSUBSCRIBE = 10,
        UNSUBACK    = 11,
        PINGREQ     = 12,
        PINGRESP    = 13,
        DISCONNECT  = 14
    }

    /// <summary>
    /// 服务质量等级
    /// </summary>
    [Flags]
    public enum Qos : byte
    {
        /// <summary>
        ///     QOS Level 0 - Message is not guaranteed delivery. No retries are made to ensure delivery is successful.
        /// </summary>
        AtMostOnce = 0,

        /// <summary>
        ///     QOS Level 1 - Message is guaranteed delivery. It will be delivered at least one time, but may be delivered
        ///     more than once if network errors occur.
        /// </summary>
        AtLeastOnce = 1,

        /// <summary>
        ///     QOS Level 2 - Message will be delivered once, and only once. Message will be retried until
        ///     it is successfully sent..
        /// </summary>
        ExactlyOnce = 2,
    }

ping 请求包:  PingReqMessage

响应包:         PingRespMessage

    internal sealed class PingReqMessage : Message
    {
        public PingReqMessage()
            : base(MessageType.PINGREQ)
        {
        }

        public override void WriteTo(Stream stream)
        {
            FixedHeader.WriteTo(stream);
        }
    }

    internal class PingRespMessage : Message
    {
        public PingRespMessage()
            : base(MessageType.PINGRESP)
        {
        }

        public PingRespMessage(FixedHeader header, Stream stream)
        {
            FixedHeader = header;
        }
    }

 OK.