使用子协议创建可靠的 Websocket
使用子协议创建可靠的 Websocket
WebSocket是一种在单个TCP连接上进行全双工通信的协议,广泛应用于实时通信场景。然而,由于网络的不稳定,WebSocket连接可能会出现断开的情况,导致消息丢失。为了解决这个问题,Azure Web PubSub服务提供了可靠的子协议,可以帮助客户端实现消息的可靠传输。本文将详细介绍如何使用这些子协议创建可靠的WebSocket客户端。
什么是可靠的WebSocket
当WebSocket客户端连接由于间歇性网络问题而断开时,消息可能会丢失。在发布/订阅系统中,发布服务器与订阅服务器是分离的,因此发布服务器可能无法检测到订阅服务器连接断开或消息丢失。客户端克服间歇性网络问题并保持可靠的消息传送至关重要。要在Azure Web PubSub服务的帮助下创建可靠的WebSocket客户端,可以使用可靠的子协议。
Web PubSub服务支持两个可靠子协议:
- json.reliable.webpubsub.azure.v1
- protobuf.reliable.webpubsub.azure.v1
客户端必须与子协议的发布服务器、订阅服务器和恢复部分保持一致才能实现可靠性。不正确实现子协议可能导致消息传送无法按预期工作,或者服务由于协议冲突而终止客户端。
简单方法:使用客户端SDK
创建可靠客户端的最简单方法是使用客户端SDK。客户端SDK实现了Web PubSub客户端规范,默认使用json.reliable.webpubsub.azure.v1。有关快速入门,请参阅在客户端之间发布/订阅。
困难方法:手动实现
以下教程可指导你完成实现Web PubSub客户端规范的重要部分。本指南不适用于既想要了解快速入门知识,又想要了解可靠性实现原则的读者。如需快速入门,请使用客户端SDK。
初始化
若要使用可靠的子协议,必须在构造WebSocket连接时设置子协议。在JavaScript中,可使用以下代码:
使用Json可靠子协议:
var pubsub = new WebSocket(
"wss://test.webpubsub.azure.com/client/hubs/hub1",
"json.reliable.webpubsub.azure.v1"
);
使用Protobuf可靠子协议:
var pubsub = new WebSocket(
"wss://test.webpubsub.azure.com/client/hubs/hub1",
"protobuf.reliable.webpubsub.azure.v1"
);
连接恢复
连接恢复是实现可靠性的基础,在使用json.reliable.webpubsub.azure.v1和protobuf.reliable.webpubsub.azure.v1协议时必须实现重新连接。WebSocket连接依赖于TCP。当连接未断开时,消息不会丢失,而是按顺序传送。为了防止因连接断开而丢失消息,Web PubSub服务将保留连接状态信息,包括组和消息信息。此信息用于在连接恢复时还原客户端
当客户端使用可靠子协议重新连接到服务时,客户端将收到包含connectionId和reconnectionToken的Connected消息。connectionId标识服务中连接的会话。
{
"type": "system",
"event": "connected",
"connectionId": "<connection_id>",
"reconnectionToken": "<reconnection_token>"
}
WebSocket连接断开后,客户端应尝试使用相同的connectionId重新连接以还原同一会话。客户端不需要与服务器进行协商并获取access_token。在恢复连接时,客户端应使用服务主机名、connection_id和reconnection_token直接向服务发出WebSocket连接请求:
wss://<service-endpoint>/client/hubs/<hub>?awps_connection_id=<connection_id>&awps_reconnection_token=<reconnection_token>
如果网络问题尚未得到解决,连接恢复可能会失败。客户端应持续重试重新连接,直到:
- WebSocket连接关闭并出现状态代码1008。该状态代码表示已从服务中删除了此connectionId。
- 恢复失败的持续时间超过1分钟。
Publisher
将事件发送到事件处理程序或者将消息发布到其他客户端的客户端称为发布服务器。发布服务器应在消息中设置ackId,以接收来自Web PubSub服务的确认,其中指出消息发布是否成功。ackId是消息的标识符,每个新消息应使用唯一ID。重新发送消息时应使用原始ackId。
示例组发送消息:
{
"type": "sendToGroup",
"group": "group1",
"dataType": "text",
"data": "text data",
"ackId": 1
}
示例确认响应:
{
"type": "ack",
"ackId": 1,
"success": true
}
如果Web PubSub服务返回带有success: true的确认响应,则表示该消息已由服务处理,并且客户端可以预期该消息将传递给所有订阅服务器。
当服务遇到暂时性内部错误并且无法将消息发送到订阅服务器时,发布服务器将收到带有success: false的确认。发布服务器应读取错误,以确定是否要重新发送消息。如果重新发送消息,则应使用相同的ackId。
{
"type": "ack",
"ackId": 1,
"success": false,
"error": {
"name": "InternalServerError",
"message": "Internal server error"
}
}
如果服务的确认响应因WebSocket连接断开而丢失,发布服务器应在恢复后使用相同的ackId重新发送消息。如果服务以前处理了消息,它将发送包含Duplicate错误的确认。发布服务器应停止重新发送此消息。
{
"type": "ack",
"ackId": 1,
"success": false,
"error": {
"name": "Duplicate",
"message": "Message with ack-id: 1 has been processed"
}
}
订阅者
从事件处理程序或发布服务器接收消息的客户端称为订阅服务器。当连接因网络问题而断开时,Web PubSub服务不知道已向订阅服务器发送了多少消息。为了确定订阅服务器收到的最后一条消息,服务将发送包含sequenceId的数据消息。订阅服务器使用序列确认消息做出响应:
示例序列确认:
{
"type": "sequenceAck",
"sequenceId": 1
}
sequenceId是连接ID会话中的uint64递增编号。订阅服务器应记录它收到的最大sequenceId,仅接受具有较大sequenceId的消息,并删除具有较小或相等sequenceId的消息。订阅服务器应使用它记录的最大sequenceId进行确认,以便服务可以跳过重新传送已由订阅服务器收到的消息的操作。例如,如果订阅服务器使用具有sequenceId: 5的sequenceAck做出响应,则服务仅重新发送sequenceId大于5的消息。
所有消息均按顺序传送到订阅服务器,直到WebSocket连接断开。根据sequenceId,服务知道订阅服务器在一个会话中通过WebSocket连接收到了多少消息。在WebSocket连接断开后,服务将重新传送未由订阅服务器确认的消息。服务将存储有限数量的未确认消息。当消息数超过限制时,服务将关闭WebSocket连接并删除会话。因此,订阅服务器应尽快确认sequenceId。