← 筆記

《Unity3D網絡遊戲實踐》(第2版)要點摘錄 - 「數據流收發」

書名:****《Unity3D網絡遊戲實踐》(第2版)

作者:羅培羽

所讀版本:機械工業出版社

TCP數據流

  • 在之前的章節中說過,在OS內部,每個Socket都有一個發送緩沖區,用來保存那些接收方未確認的數據,而Send方法只是把數據寫入到發送緩沖區裡,具體的發送過程由操作系統負責;當發送緩沖區滿了,Send方法就會阻塞

  • 與Send對應,Socket的Receive也只是把接收緩沖區中的數據提取出來,當系統的接收緩沖區為空,Receive方法就會阻塞

粘包半包現象

  • 當發送端連續發送多條數據,而接收端沒有及時Receive,就會導致「粘包」的出現。.
    •   //連續發送兩條消息
        string msg1 = "Msg1";
        byte[] b1 = System.Text.Encoding.UTF8.GetBytes(msg1);
        
        string msg2 = "Msg2";
        byte[] b2 = System.Text.Encoding.UTF8.GetBytes(msg2);
        
        socket.Send(b1);
        socket.Send(b2);
    • 預期是,接收端會分別獨立讀取到這兩條數據的內容,並分別輸出Msg1和Msg2

      • 但實際上,連續調用兩次發送,會把數據快速寫入到發送緩沖區,然後快速發送到接收端。數據就會在接收端的接收緩沖區中累積
      • 而此時接收端在調用Receive從接收緩沖區中提取數據,導致最終的輸出是”Msg1Msg2”
  • 解決方案
    • 長度信息法(主要方案)
      • 在每個數據包前加上長度信息,每次接收到數據後,先讀取表示長度的字節,如果緩沖區的數據長度大於要取的字節數,則取出相應的字節,否則等待下一次數據接收

        •   public void Send()
            {
                //長度信息法
                string sendStr = reqField.text;
                byte[] bodyByte = System.Text.Encoding.UTF8.GetBytes(sendStr);
                Int16 len = (Int16)bodyByte.Length;
                byte[] lenByte = BitConverter.GetBytes(len);
                byte[] sendByte = lenByte.Concat(bodyByte).ToArray();
                socket.Send(sendByte);
            }
        • sendStr = “HelloWorld”

        • byteBytes =

          • -----------
            CharacterHelloWorld
        • len = 10(16字節長度)

        • lenByte =

          • ---
            Character0A
        • sendByte =

          • -------------
            Character0AHelloWorld
      • 接收方需要定義一個緩沖區和指示緩沖區有效數據長度的變量

        •   byte[] readBuff = new byte[1024];
            int buffCount = 0;
        • 接收數據時不再從緩沖區開頭的位置寫入,而是把新數據放在有效數據之後

        • 每次寫入後,根據接收到的長度信息更新當前該變量

        • 該變量指示了Receive讀取數據時的起點

          •   socket.BeginReceive(readBuff, buffCount, 1024 - buffCount, 0 ,ReceiveCallback, socket);
        • 收到數據後,buffCount需要更新,使下一次接收數據時,寫入到緩沖區有效數據的末尾

          •   public void ReceiveCallback(IAsyncResult ar)
              {
                  Socket socket = (Socket) ar.AsyncState;
                  int count = socket.EndReceive(ar);
                  buffCount += count;
              }

      • 數據處理 - 緩沖區數據長度 >= 一條完整信息時進行數據時,將數據取出,然後 - OptionA: 將緩沖區後面的數據向前移動(Array.Copy) - OptionB: 使用雙指針,標志緩沖區數據當前的讀取範圍 - 數據處理完後,將起點索引向後移動[上一個數據長度]位,並根據長度更新終點索引 - 緩沖區長度不夠時再做一次Array.Copy - 有效減少OptionA的複雜度 - 如果緩沖區數據長度 <= 長度信息數據長度 或 < 一條完整信息都不處理,等待下一次接收
      • 固定長度法
        • 每次都以相同的長度發送數據
      • 結束符號法
        • 規定一個結束符號,作為消息間的分隔符

大端小端問題

  • 上面使用的信息長度計算方法為:BitConverter.ToInt16(buffer, offset)

    • 該方法會根據計算機是「大端編碼」還是「小端編碼」而使用不同的轉換算法
      • 大端

        •   return (short)((*pbyte) | (*(pbyte + 1) << 8));
      • 小端

        •   return (short)((*pbyte << 8) | (*(pbyte + 1)));
  • 計算機的大端模式和小端模式

  • 大小端兼容
    •   public void Send()
        {
            string sendStr = reqField.text;
            byte[] bodyByte = System.Text.Encoding.UTF8.GetBytes(sendStr);        
            Int16 len = (Int16)bodyByte.Length;        
            byte[] lenByte = BitConverter.GetBytes(len);
            if (!BitConverter.IsLittleEndian)
            {
                //大端轉小端
                lenByte.Reverse();
            }
            byte[] sendByte = lenByte.Concat(bodyByte).ToArray();
            socket.Send(sendByte);
        }
    • 手動兼容

      • 16位

        •   Int16 bodyLength = (short)((readBuff[1] << 8) | readBuff[0]);
      • 32位

        •   Int32 bodyLength = (short)((readBuff[3] << 24) |
                                      readBuff[2] << 16 |
                                      readBuff[1] << 8 |
                                      readBuff[0]);
        • ’|’ 符相當於「相加」

        • <<24/<<16/<<8 => 字節左移24位/16位/8位

          • readBuff[1]左移8位等同於 「readBuff[1] * 2^8 = readBuff[1] * 256」

數據發送不完整問題

  • 數據在網絡擁擠,且發送緩沖區飽和的情況下,有可能導致發送出去的數據解析失敗

  • 要確保發送數據的完整,需要在發送前將數據保存,如果發送不完整,在Send回調中繼續發送
    •   int readIdx = 0; //緩沖區偏移值
        int length = 0; //緩沖區剩餘長度
        
        public void Send()
        {
            //sendBytes = 要發送的數據
            length = sendBytes.Length;
            readIdx = 0;
            socket.BeginSend(sendBytes, 0, length, 0, SendCallback, socket);
        }
        
        public void SendCallback(IASyncResult ar)
        {
            Socket socket = (Socket)ar.AsyncState;
            int count = socket.EndSend(ar); //返回已發送數據的長度
            readIdx += count; //將readIdx向後移動至已發送數據長度的末尾
            length -= count;
            if(length > 0) //如果全部數據正常發送,length - count應該 == 0
            {
                //如果>0,代表有數據需要重新發送
                socket.BeginSend(sendBytes, readIdx, length, 0, SendCallback, socket);    
            }
        }

ByteArray和Queue

  • 上述方案雖然可以確保數據在最終能正常被完整發送出去,但是如果在Callback之前再次調用Send,就會把length和readIdx修改,把這個保障機制干碎。

    • 因此我們需要一個封裝好readIdx、length以及具體數據bytes的數據結構,然後維護一個隊列(Queue);每次調用Send時把這個數據入隊,Callback時只修改它自身的length和readIdx,處理完成後出隊。
  • ByteArray

    •   using System;
        
        public class ByteArray
        {
            //默認大小
            const int DEFAULT_SIZE = 1024;
            //初始大小
            int initSize = 0;
        
            //緩沖區
            public byte[] bytes;
        
            //讀寫位置
            public int readIdx = 0;
            public int writeIdx = 0;
        
            //容量
            private int capicity = 0;
            //剩餘空間
            public int remain { get { return capicity - writeIdx; } }
        
            //數據長度
            public int length { get { return writeIdx - readIdx; } }
        
            public ByteArray(int size = DEFAULT_SIZE)
            {
                bytes = new byte[size];
                capicity = size;
                initSize = size;
                readIdx = 0;
                writeIdx = 0;
            }
        
            public ByteArray(byte[] defaultBytes)
            {
                bytes = defaultBytes;
                capicity = defaultBytes.Length;
                initSize = defaultBytes.Length;
                readIdx = 0;
                writeIdx = defaultBytes.Length;
            }
        
            //重設尺寸
            public void ReSize(int size)
            {
                if (size < length) { return; }
                if (size < initSize) { return; }
                int n = 1;
                while(n < size) //計算新數組的尺寸, 為避免頻繁重設,一般直接翻倍
                {
                    n *= 2;
                }
                capicity = n;
        
                //申請新的byte數組並把原數據複製過去
                byte[] newBytes = new byte[capicity];
                Array.Copy(bytes, readIdx, newBytes, 0, writeIdx - readIdx);        
                bytes = newBytes;
        
                //重設讀寫索引
                writeIdx = length;
                readIdx = 0;
            }
        
            //檢查並移動數據
            public void CheckAndMoveBytes()
            {
                if(length < 8)
                {
                    MoveBytes();
                }
            }
        
            //移動數據
            public void MoveBytes()
            {
                if(length > 0)
                {
                    Array.Copy(bytes, readIdx, bytes, 0, length);
                }
                writeIdx = length;
                readIdx = 0;
            }
        
            //寫入數據
            public int Write(byte[] bs, int offset, int count)
            {
                if(remain < count)
                {
                    //檢查緩沖區剩餘空間
                    ReSize(length + count);
                }
                Array.Copy(bs, offset, bytes, writeIdx, count); //從offset位置寫入
                writeIdx += count;
                return count; //返回新的有效數據長度
            }
        
            //讀取數據
            public int Read(byte[] bs, int offset, int count)
            {
                count = Math.Min(count, length);
                Array.Copy(bytes, readIdx, bs, offset, count); //把數據從bytes的readIdx讀取,保存在bs的offset到count的空間中
                readIdx += count; //根據已讀取的數據量更新readIdx
                CheckAndMoveBytes(); //如果剩餘的有效數據很小,可以將數據前移
                return count;
            }
        }
  • 具體應用

    •   public void Send()
        {
            //byte[] sendByte = 發送的數據字符串
            ByteArray ba = new ByteArray(sendByte);
            int count = 0;
            //BeginSend和回調函數可能於不同線程執行,可能出現多個線程同時操作writeQueue的情況,因此需要加鎖
            lock (writeQueue)
            {
                writeQueue.Enqueue(ba);
                count = writeQueue.Count;
            }
        
            if(count == 1)
            {
                socket.BeginSend(sendByte, 0, sendByte.Length, 0, SendCallback, socket);
            }
        }
        
        public void SendCallback(IAsyncResult result)
        {
            Socket socket = (Socket)result.AsyncState;
            int count = socket.EndSend(result);
            ByteArray ba;
        
            //BeginSend和回調函數可能於不同線程執行,可能出現多個線程同時操作writeQueue的情況,因此需要加鎖
            lock (writeQueue)
            {
                ba = writeQueue.First();
            }
        
            ba.readIdx += count;
            if(ba.length == 0) //ba.length = ba.writeIdx - ba.readIdx
            {
                //Send Finished
                //BeginSend和回調函數可能於不同線程執行,可能出現多個線程同時操作writeQueue的情況,因此需要加鎖
                lock (writeQueue)
                {
                    writeQueue.Dequeue();
                    ba = writeQueue.First(); //Get Next Send ByteArray
                }                
            }
        
            if(ba != null)
            {
                //Send Remain ByteArray
                socket.BeginSend(ba.bytes, ba.readIdx, ba.length, 0, SendCallback, socket);
            }
        }