基于WebSocket实现的后台服务,用于接收客户端的心跳消息,并根据心跳消息来维护客户端连接。
具体实现中,服务启动后会创建一个HttpListener对象,用于监听客户端的WebSocket连接请求。当客户端连接成功后,服务会为每个连接创建一个Task实例,用于接收客户端发送的心跳消息,并根据心跳消息更新心跳时间戳。服务还会定期向客户端发送心跳消息,以保持连接的活跃状态。
如果服务在一定时间内没有收到客户端发送的心跳消息,就会认为客户端已经掉线,服务会关闭连接并从连接列表中移除该客户端。文章来源:https://www.toymoban.com/news/detail-603765.html
此服务适用于需要实现长连接的场景,例如实时消息推送、在线游戏等。需要注意的是,此服务只能用于WebSocket通信,客户端必须实现WebSocket协议。文章来源地址https://www.toymoban.com/news/detail-603765.html
using Microsoft.Extensions.Hosting;
using MSEBP.Kernel.Common.Logging;
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Net;
using System.Net.Sockets;
using System.Net.WebSockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace Authorization.WebApi
{
/// <summary>
/// 此代码只能用于 websocket通信,客户端必须websocket实现,暂时无用。
/// </summary>
public class WebSocketBackgroundService : IHostedService, IDisposable
{
private const int _heartBeatInterval = 30000; // 心跳间隔(毫秒)
private const int _heartBeatTimeout = 60000; // 心跳超时时间(毫秒)
private const int _clientIdLength = 10;
private readonly CancellationTokenSource _cts = new CancellationTokenSource();
private readonly ConcurrentDictionary<string, WebSocket> _clients = new ConcurrentDictionary<string, WebSocket>();
private readonly ILogger _logger;
/// <summary>
///
/// </summary>
/// <param name="logger"></param>
public WebSocketBackgroundService(ILogger logger)
{
_logger = logger;
}
/// <summary>
///
/// </summary>
/// <param name="cancellationToken"></param>
/// <returns></returns>
public async Task StartAsync(CancellationToken cancellationToken)
{
IPAddress localIp = Dns.GetHostEntry(Dns.GetHostName()).AddressList.FirstOrDefault(ip => ip.AddressFamily == AddressFamily.InterNetwork);
if (localIp == null)
{
throw new Exception("Cannot find local IP address.");
}
IPEndPoint localEndPoint = new IPEndPoint(localIp, 8181);
HttpListener listener = new HttpListener();
//listener.Prefixes.Add($"http://{localEndPoint}/");
listener.Start();
_ = Task.Run(async () =>
{
try
{
while (!_cts.IsCancellationRequested)
{
HttpListenerContext context = await listener.GetContextAsync();
if (context.Request.IsWebSocketRequest)
{
WebSocket webSocket = await AcceptWebSocketAsync(context);
_ = Task.Run(async () =>
{
await ReceiveHeartbeatAsync(webSocket);
}, _cts.Token);
}
else
{
context.Response.StatusCode = 400;
context.Response.Close();
}
}
}
catch (Exception ex)
{
_logger.Error(ex, "WebSocket server error.");
}
}, _cts.Token);
}
private async Task<WebSocket> AcceptWebSocketAsync(HttpListenerContext context)
{
HttpListenerWebSocketContext wsContext = await context.AcceptWebSocketAsync(null);
WebSocket webSocket = wsContext.WebSocket;
return webSocket;
}
private async Task ReceiveHeartbeatAsync(WebSocket webSocket)
{
byte[] buffer = new byte[1024];
CancellationToken token = _cts.Token;
DateTime lastHeartbeatTime = DateTime.UtcNow;
try
{
while (webSocket.State == WebSocketState.Open && !token.IsCancellationRequested)
{
WebSocketReceiveResult result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
if (result.CloseStatus.HasValue)
{
await CloseWebSocketAsync(webSocket, result.CloseStatus.Value, result.CloseStatusDescription);
break;
}
else if (result.MessageType == WebSocketMessageType.Text)
{
string message = Encoding.UTF8.GetString(buffer, 0, result.Count).Trim();
if (message.StartsWith("heartbeat"))
{
lastHeartbeatTime = DateTime.UtcNow;
string clientId = message.Substring(0, Math.Min(message.Length, _clientIdLength));
_clients.TryAdd(clientId, webSocket);
}
else if (string.IsNullOrEmpty(message))
{
await CloseWebSocketAsync(webSocket, WebSocketCloseStatus.NormalClosure, "Closed by client");
break;
}
else
{
// 处理业务逻辑
}
}
// 检测心跳超时
if ((DateTime.UtcNow - lastHeartbeatTime).TotalMilliseconds > _heartBeatTimeout)
{
await CloseWebSocketAsync(webSocket, WebSocketCloseStatus.NormalClosure, "Heartbeat timeout");
break;
}
}
}
catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely)
{
// WebSocket 连接被意外关闭,忽略异常
}
catch (Exception ex)
{
_logger.Error(ex, "WebSocket error.");
}
finally
{
// 移除客户端连接
foreach (var item in _clients)
{
if (item.Value == webSocket)
{
_clients.TryRemove(item.Key, out _);
break;
}
}
await CloseWebSocketAsync(webSocket, WebSocketCloseStatus.NormalClosure, "Closed by server");
}
}
private async Task CloseWebSocketAsync(WebSocket webSocket, WebSocketCloseStatus closeStatus, string closeStatusDescription)
{
try
{
await webSocket.CloseAsync(closeStatus, closeStatusDescription, CancellationToken.None);
}
catch (WebSocketException ex) when (ex.WebSocketErrorCode == WebSocketError.ConnectionClosedPrematurely)
{
// WebSocket 连接已经关闭,忽略异常
}
catch (Exception ex)
{
_logger.Error(ex, "Failed to close WebSocket.");
}
}
public async Task StopAsync(CancellationToken cancellationToken)
{
_cts.Cancel();
await Task.CompletedTask;
}
public void Dispose()
{
_cts.Dispose();
}
}
}
到了这里,关于基于WebSocket实现的后台服务的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!