Amazon GameLift In C# 08: より良いチャットサーバーを作ります(Part1)

Amazon GameLift Walkthrough in C# (with Unity) Tutorial 08

概要

前回作ったチャットソフトは、メッセージ送受信の検証として、一回通信しかできなかったので、これから継続的に、複数のユーザーでチャットできるチャットソフトを作ります。
説明したいことが比較的に多くて、ソースも少し長いので、サーバープログラムの解説はChatServer.csConncetedClient.csを中心として、二つ記事に分けて説明いたします。

マシン環境

CPU : Intel i7-6920HQ
GPU : AMD Radeon Pro 460
Memory : 16GB
System : Windows 10 (with .NET 5 installed)
IDE : Visual Studio 2019 Community
Editor : Visual Studio Code
Terminal : Windows Terminal (or Command Prompt)

ファイルの構造とクラスの運用

前回作ったチャットソフトのソースコードは一つファイルにまとめてテストしたが、今回のソースは比較的に長いので、管理しやすくするため、複数のファイルに分けました。

プロジェクトはGitHubにアップロードしています : AGLW-CSharp-BetterChatServerSample

ソースコードファイル : - Program.cs (プログラムの入口)
- GameLiftServer.cs (Amazon GameLiftとの接続、そしてChatServerを管理するクラス)
- ChatServer.cs (クライアントConnectedClientとの接続、通信管理クラス)
- ConnectedClient.cs (クライアントの接続情報とNetworkStreamでの受送信管理クラス)

メッセージ管理、受送信のコンテナはConcurrentQueue(FIFO)使うので、順序的なメッセージの管理がしやすいです。そしてActionというDelegateを使って、クライアントに一斉送信することが楽です。

Program.csGameLiftServer.csの中身はそこまで変わらないので、説明は省略します。

ChatServer.cs ソースコード

ChatServer.csソースコードを貼り付けます。

using System;
using System.Collections.Generic;

using System.Threading;
using System.Net;
using System.Net.Sockets;
using System.Collections.Concurrent;

using Aws.GameLift.Server;
using Aws.GameLift.Server.Model;

namespace AGLW_CSharp_BetterChatServerSample
{
    class ChatServer
    {
        public static int MessageLength = 256; // 256bytes
        public static int SleepDuration = 100; // 100ms

        // A UTF-8 encoder to process byte[] <-> string conversion
        public readonly System.Text.Encoding Encoder = System.Text.Encoding.UTF8;

        public GameSession ManagedGameSession { get; private set; } = null;

        // TCP lisenter has it's own thread
        private TcpListener listener = null;
        private Thread listenerThread = null;

        private Thread senderThread = null;
        private Thread retrieverThread = null;

        private ConcurrentQueue<byte[]> messagePool = null;

        private event Action<byte[]> sendMsgDelegate;

        List<ConnectedClient> clientPool = null;
        List<ConnectedClient> clientsInQueue = null;

        public ChatServer(GameSession session)
        {
            ManagedGameSession = session;

            messagePool = new ConcurrentQueue<byte[]>();
            clientPool = new List<ConnectedClient>();
            clientsInQueue = new List<ConnectedClient>();

            StartServer();
        }

        internal void UpdateGameSession(GameSession gameSession)
        {
            ManagedGameSession = gameSession;
        }

        public void StartServer()
        {
            // Create a TCP listener(in a listener thread) from port when when ProcessReady() returns success
            LaunchListenerThread(ManagedGameSession.Port);
            LaunchSenderThread();
            LaunchRetrieverThread();
        }

        // A method creates thread for listener
        void LaunchListenerThread(int port)
        {
            listenerThread = new Thread(() =>
                {
                    Listen(port);
                });

            listenerThread.Start();

            Console.WriteLine($"Server : Listener thread is created and started");
        }

        void LaunchSenderThread()
        {
            senderThread = new Thread(() => SendToAllClients());
            senderThread.Start();
        }

        private void LaunchRetrieverThread()
        {
            retrieverThread = new Thread(() => RetrieveFromAllClients());
            retrieverThread.Start();
        }

        // A method listens the port.
        // When client connects : 
        // 1) Create ConnectedClient instance from TcpClient -> 2) Register SendMessage() to delegate -> 3) Add client to clientsInQueue
        void Listen(int port)
        {
            listener = TcpListener.Create(ManagedGameSession.Port);
            listener.Start();

            Console.WriteLine($"Server : Start listening port {port}");

            while (true)
            {
                // TcpClient.AccecptTcpClient() blocks
                TcpClient client = listener.AcceptTcpClient();
                ConnectedClient c = new ConnectedClient(client);
                sendMsgDelegate += c.SendMessage;
                c.StartClient();

                clientsInQueue.Add(c);
            }
        }


        private void SendToAllClients()
        {
            while (true)
            {
                SleepForAWhile();

                if (messagePool.Count > 0)
                {
                    byte[] bytes = new byte[MessageLength];
                    if (messagePool.TryDequeue(out bytes))
                    {
                        Console.WriteLine($"Message sent: {Encoder.GetString(bytes)}");
                        sendMsgDelegate(bytes);
                    }
                }
            }
        }

        private void RetrieveFromAllClients()
        {
            while (true)
            {
                if (clientsInQueue.Count > 0)
                {
                    clientPool.AddRange(clientsInQueue);
                    clientsInQueue.Clear();
                }

                SleepForAWhile();

                List<ConnectedClient> disconnectedClients = new List<ConnectedClient>();
                
                byte[] bytes = new byte[MessageLength];
                foreach (var c in clientPool)
                {
                    if (c != null && c.TargetClient.Connected)
                    {
                        if (c.RetrieveMessage(out bytes))
                        {
                            messagePool.Enqueue(bytes);
                        }
                    }
                    else
                    {
                        disconnectedClients.Add(c);
                    }
                }

                // Release disconnected client object
                foreach (var dc in disconnectedClients)
                {
                    sendMsgDelegate -= dc.SendMessage;
                    clientPool.Remove(dc);
                }

            }
        }

        private void SleepForAWhile()
        {
            Thread.Sleep(SleepDuration);
        }
    }
}

ChatServer.cs メンバー変数の解説

public static int MessageLength = 256; // 256bytes
public static int SleepDuration = 100; // 100ms

// A UTF-8 encoder to process byte[] <-> string conversion
public readonly System.Text.Encoding Encoder = System.Text.Encoding.UTF8;

public GameSession ManagedGameSession { get; private set; } = null;

// TCP lisenter has it's own thread
private TcpListener listener = null;
private Thread listenerThread = null;

private Thread senderThread = null;
private Thread retrieverThread = null;

private ConcurrentQueue<byte[]> messagePool = null;

private event Action<byte[]> sendMsgDelegate;

List<ConnectedClient> clientPool = null;
List<ConnectedClient> clientsInQueue = null;

- MessageLength / SleepDuration
メッセージの長さは256bytesに決まっていて、そしてサーバー負荷を軽減するため、100ms(0.1秒)という睡眠時間をつけます。
- ManagedGameSession
GameLiftServer.csOnStartGameSessionUpdateGameSessionのタイミングでGameSessionを保存するメンバー変数です。 - TcpListener : listener
クライアントの接続動作を監視するメンバーです。 - Thread : listenerThread / senderThread / retrieverThread
プログラムのmain thread以外は三つThreadがあります。
listenerThreadは接続を監視するThreadsenderThreadはメッセージの一斉送信するThread、そしてretrieverThreadは各クライアントからのメッセージを受信するThreadです。

  • ConcurrentQueue<byte[]> : messagePool
    First In First Out(FIFO)というデータ構造のコンテナーです。順序的に受信したメッセージはここに保存し、送信するメッセージはここから取り出します。

  • event Action<byte[]> : sendMsgDelegate
    クライアントが接続した後、SendMessage()という関数がこのDelegate(Action)に登録し、メッセージがあったら、一斉送信されます。

  • List clientPool / clientsInQueue
    クライアントを管理するリストです。Multi-Threadの処理には揉め事があるため、clientPoolに関わる処理が終るまで、新しい接続できたクライアントはclientsInQueueに入れて、後でclientPoolに追加します。

ChatServer.cs 関数の解説

// A method listens the port.
// When client connects : 
// 1) Create ConnectedClient instance from TcpClient -> 2) Register SendMessage() to delegate -> 3) Add client to clientsInQueue
void Listen(int port)
{
    listener = TcpListener.Create(ManagedGameSession.Port);
    listener.Start();

    Console.WriteLine($"Server : Start listening port {port}");

    while (true)
    {
        // TcpClient.AccecptTcpClient() blocks
        TcpClient client = listener.AcceptTcpClient();
        ConnectedClient c = new ConnectedClient(client);
        sendMsgDelegate += c.SendMessage;
        c.StartClient();

        clientsInQueue.Add(c);
    }
}

Listen(int port)関数は前回と少し変わりました。
接続できたTcpClientのインスタンスでConnectedClientを作って、そしてSentMessage関数をsendMsgDelegateに登録します。
ConnectedClientのインスタンス自体はStartClient()で作業開始し、clientsInQueueに追加され、他の関数でclientPoolに追加待ち。

 
 

private void SleepForAWhile()
{
    Thread.Sleep(SleepDuration);
}

Threadを睡眠させる操作をこのSleepForAWhile()関数に統一させます。

 
 

private void LaunchRetrieverThread()
{
    retrieverThread = new Thread(() => RetrieveFromAllClients());
    retrieverThread.Start();
}

private void RetrieveFromAllClients()
{
    while (true)
    {
        if (clientsInQueue.Count > 0)
        {
            clientPool.AddRange(clientsInQueue);
            clientsInQueue.Clear();
        }

        SleepForAWhile();

        List<ConnectedClient> disconnectedClients = new List<ConnectedClient>();
        
        byte[] bytes = new byte[MessageLength];
        foreach (var c in clientPool)
        {
            if (c != null && c.TargetClient.Connected)
            {
                if (c.RetrieveMessage(out bytes))
                {
                    messagePool.Enqueue(bytes);
                }
            }
            else
            {
                disconnectedClients.Add(c);
            }
        }

        // Release disconnected client object
        foreach (var dc in disconnectedClients)
        {
            sendMsgDelegate -= dc.SendMessage;
            clientPool.Remove(dc);
        }

    }
}

LaunchRetrieverThread()RetrieveFromAllClients()を実行するThreadを起動します。

 

if (clientsInQueue.Count > 0)
{
    clientPool.AddRange(clientsInQueue);
    clientsInQueue.Clear();
}

メッセージをまとめる前に、隊列に待っているクライアントを確認し、clientPoolに追加する。

List<ConnectedClient> disconnectedClients = new List<ConnectedClient>();
        
byte[] bytes = new byte[MessageLength];
foreach (var c in clientPool)
{
    if (c != null && c.TargetClient.Connected)
    {
        if (c.RetrieveMessage(out bytes))
        {
            messagePool.Enqueue(bytes);
        }
    }
    else
    {
        disconnectedClients.Add(c);
    }
}

ループですべてのクライアントからメッセージを保存し、切断されるクライアントもdisconnectedClientsに記録します。

 

// Release disconnected client object
foreach (var dc in disconnectedClients)
{
    sendMsgDelegate -= dc.SendMessage;
    clientPool.Remove(dc);
}

切断したクライアントのSendMessage関数を登録解除し、clientPoolから削除します。

 
 

void LaunchSenderThread()
{
    senderThread = new Thread(() => SendToAllClients());
    senderThread.Start();
}

private void SendToAllClients()
{
    while (true)
    {
        SleepForAWhile();

        if (messagePool.Count > 0)
        {
            byte[] bytes = new byte[MessageLength];
            if (messagePool.TryDequeue(out bytes))
            {
                Console.WriteLine($"Message sent: {Encoder.GetString(bytes)}");
                sendMsgDelegate(bytes);
            }
        }
    }
}

LaunchSenderThread()SendToAllClients()を実行するThreadを起動します。
メッセージがあったら、messagePool.TryDequeue(out bytes)でメッセージを取り出して、sendMsgDelegate()でクライアントに一斉送信します。

これでChatServer.csの説明が完了いたします。

最後

クライアントとの通信全体はChatServer.csで管理していますが、細かいメッセージ処理はConnectedClient.csでやっています。
ConnectedClient.csは次回の記事で解説します。