WebSocket

什么是WebSocket

首先从维基百科了解什么是WebSocket。

WebSocket是一种网络传输协议,可在单个TCP连接上进行全双工通信,位于OSI模型的应用层。WebSocket协议在2011年由IETF标准化为RFC 6455,后由RFC 7936补充规范。Web IDL中的WebSocket API由W3C标准化。——维基百科

使用WebSocket的优点

为什么不使用常规的http请求而使用WebSocket.

以下是维基百科上记录的WebSocket相较于http的一些优点:

  • 较少的控制开销。连接建立之后,发送的数据包头部想较于http而言更小,减小了发送数据。
  • 更强的实时性。由于协议是全双工,因此服务端也可以向客户端主动发送消息,因此相较于轮询实时性更强。
  • 保持连接状态。由于WebSocket有状态的特性,相比http可以省略部分状态信息。
  • 更好的二进制支持。WebSocket定义了二进制帧,相对http,可以更轻松地处理二进制内容。
  • 可以支持扩展。可扩展协议、实现部分自定义的子协议。
  • 更好的压缩效果。适当的扩展,沿用内容的上下文,在传递类似的数据时,可以显著地提高压缩率。

在实际应用,选择WebSocket的原因往往是一些接口服务强调实时性,同时又不希望通过轮询的形式增加服务器压力。

简单示例

以下简单介绍一下在Spring中如何起一个简单的WebSocket客户端和服务端,以下demo基于Spring boot做展示。

WebSocket服务端

首先,加入必须要的依赖,如下:

<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-websocket</artifactId>
    </dependency>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>

    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
    </dependency>
</dependencies>

配置WebSocket的处理类,并设置路径,跨域等参数,加上@EnableWebSocket注解开启websokect服务。

@Configuration
@EnableWebSocket
public class WebSocketConfig implements WebSocketConfigurer {

    private final CustomerWsHandler customerWsHandler;

    public WebSocketConfig(CustomerWsHandler customerWsHandler) {
        this.customerWsHandler = customerWsHandler;
    }

    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(this.customerWsHandler, "/ws").setAllowedOrigins("*");
    }
}

然后我们需要创建一个自定义的消息处理类,在诸如连接创建,消息发送等事件回调时执行相应的处理逻辑。

我们可以通过继承AbstractWebSocketHandler覆写所有回调方法,当然我们也可以选择TextWebSocketHandlerBinaryWebSocketHandler继承覆写,区别在于后两个类加入了处理二进制/文本消息时关闭ws连接的逻辑。

以下是处理文件消息的回调函数。

@Override
protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
    String payload = message.getPayload();
    log.info("收到文本消息为:{}", payload);
}

另外还有连接建立后和连接断开后的回调函数。

@Override
public void afterConnectionEstablished(WebSocketSession session) throws Exception {
    log.info("新建连接, id为:{}", session.getId());
}

@Override
public void afterConnectionClosed(WebSocketSession session, CloseStatus status) throws Exception {
    log.info("关闭链接, id为:{}, 状态为:{}", session.getId(), status.getCode());
}

这是被动接受客户端的消息的处理,当我们有业务需要主动向指定的或者所有的客户端发送消息时,应用应当在客户端连接建立时便经由回调函数将WebSocket session存下来,如还有验证,还应将WebSocket session和对应的认证状态关联起来,这样可以在业务触发时找到已登录的指定用户的存活会话并主动发送消息到客户端。

以下是使用WebscoketSession如何发送文本消息的示例。

private void sendMsgToEveryone(String message) throws IOException {
    for (WebSocketSession session : manager.all()) {
        TextMessage textMessage = new TextMessage(message);
        session.sendMessage(textMessage);
    }
}

WebSocket客户端

建立WebSocket我们同样需要先创建一个相应WebSocket相应事件的一个handler类,作为服务端发送信息,断开连接等事件响应的回调类,具体代码如下。

@Slf4j
@Component
public class CustomerHandler extends AbstractWebSocketHandler {

    @Override
    protected void handleTextMessage(WebSocketSession session, TextMessage message) throws Exception {
        log.info("收到消息,内容为:{}", message.getPayload());
    }
}

接下来,就可以根据WebSocket的服务地址来创建WebSocketClient类,并创建连接会话了,此时连接用的是标准的WebSocket协议。

@Slf4j
@Component
public class WebsocketClient {

    private WebSocketSession session;

    public WebsocketClient(CustomerHandler handler) {
        createConnection(handler);
    }

    public boolean isAlive() {
        return this.session.isOpen();
    }

    private void createConnection(WebSocketHandler handler) {
        String url = "ws://localhost:8080/ws";
        WebSocketClient client = new StandardWebSocketClient();
        try {
            this.session = client.doHandshake(handler, url).get(10, TimeUnit.SECONDS);
        } catch (Exception e) {
            log.error("连接ws服务失败", e);
        }
    }

    public void sendTextMessage(String message) {
        TextMessage textMessage = new TextMessage(message);
        try {
            session.sendMessage(textMessage);
        } catch (IOException e) {
            log.error("发送文本消息失败", e);
        }
    }
}

注: 此处将创建连接的方法抽出并创建了判断session是否存活的方法,可以在外部创建心跳检测,在session断开连接时重新连接并创建session。

Stomp over WebSocket示例

为什么要使用Stomp over WebSocket

WebSocket虽然实现了全双公的通信方式,但是只定义了文本及二进制的两种类型的消息,并没有定义内容主体的格式,因此需要一个更高级的子协议去规定服务器和客户端间通信的消息格式。

什么是stomp

Stomp协议,全称为simple text-oriented messaging protocol,即简单的面向文本的消息传递协议。Stomp协议可以在任何可靠的双向流网络协议(例如TCP和WebSocket)上使用。尽管协议本身是面向文本的,但是消息的主体可以是文本或二进制的。

帧的格式

Stomp协议是基于帧的消息协议,帧以HTTP为模型,以下是帧的格式

COMMAND
header1:value1
header2:value2

Body^@

其中第一行的COMMAND是指令行,指定了消息的操作类型,如SEND为发送,SUBSCRIBE为订阅。

而第二行到第一个换行中间的内容为消息的首部内容,配合首行指令,补充语义,如destinationcontent-type等,以下例子为订阅目的地为/topic/price.stock.*的broker.

SUBSCRIBE
id:sub-1
destination:/topic/price.stock.*

^@

服务器端开启Stomp over WebSocket

以下是Spring官网文档中服务端开启stomp over WebSocket的Java config 配置方式以及处理接受消息的message controller的定义。

import org.springframework.web.socket.config.annotation.EnableWebSocketMessageBroker;
import org.springframework.web.socket.config.annotation.StompEndpointRegistry;

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry config) {
        config.setApplicationDestinationPrefixes("/app");
        config.enableSimpleBroker("/topic", "/queue");
    }

}

@Controller
public class GreetingController {

    @MessageMapping("/greeting") {
    public String handle(String greeting) {
        return "[" + getTimestamp() + ": " + greeting;
    }

}

message-flow-simple-broker

  • WebSocket客户端通过地址http://<ip>:<port>/portfolio连接;
  • 订阅"/topic/greeting"目标的消息会经由clientInboundChannel转发到broker中;
  • 发送到"/app/greeting" 目标的消息会经由clientInboundChannel转发到GreetingController中,controller处理完成后的返回值会作为消息经由brokerChannel推送到订阅了/topic/greeting的客户端上。消息发送的目标地默认是由订阅目的地路径前缀加上处理的controller方法的@MessageMapping指定路径组合而成,即/topic + /greeting。当然也可以使用@SendTo注解显示指定发送的订阅目的地。

以下是服务端中消息流转的流程图,其中simple broker是一个简单的消息代理broker,可以替换成功能更加完善的broker,替换后的java config配置及流程图如下所示:

@Configuration
@EnableWebSocketMessageBroker
public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {

    @Override
    public void registerStompEndpoints(StompEndpointRegistry registry) {
        registry.addEndpoint("/portfolio").withSockJS();
    }

    @Override
    public void configureMessageBroker(MessageBrokerRegistry registry) {
        registry.enableStompBrokerRelay("/topic", "/queue");
        registry.setApplicationDestinationPrefixes("/app");
    }

}

message-flow-simple-broker

发送消息

如果需要在应用的任何一处发送消息到broker,只要将相应的MessagingTemplate注入到调用类上即可,以下以SimpMessagingTemplate为例:

@Controller
public class GreetingController {

    private SimpMessagingTemplate template;

    @Autowired
    public GreetingController(SimpMessagingTemplate template) {
    this.template = template;
    }

    @RequestMapping(path="/greetings", method=POST)
    public void greet(String greeting) {
        String text = "[" + getTimestamp() + "]:" + greeting;
        this.template.convertAndSend("/topic/greetings", text);
    }

}

Stomp over Websocket客户端连接

以下是官方文档给出的Stomp Client连接实例.setMessageConverter()方法设置了序列化消息主体的转换器,而setTaskScheduler()方法设置了执行心跳发送的scheduler。

WebSocketClient webSocketClient = new StandardWebSocketClient();
WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
stompClient.setMessageConverter(new StringMessageConverter());
stompClient.setTaskScheduler(taskScheduler);

创建连接方法如下,其中MyStompSessionHandler用于处理服务端推送的消息。

String url = "ws://127.0.0.1:8080/endpoint";
StompSessionHandler sessionHandler = new MyStompSessionHandler();
// 连接,超时设置为30秒
StompSession session = stompClient.connect(url, sessionHandler).get(30, TimeUnit.SECONDS);

public class MyStompSessionHandler extends StompSessionHandlerAdapter {

    @Override
    public void afterConnected(StompSession session, StompHeaders connectedHeaders) {
        // ...
    }
}

当连接建立后,就拿到了类型为StompSession的session,使用session可以向服务端主动发送消息,第一个参数指定了消息的目的地,具体如下:

session.send("/app/foo", "payload");

另外,也可以订阅服务端指定目的地,接受服务端主动的消息推送。订阅时需要传入处理目的地推送消息的handler,订阅后返回值Subscription可以用于取消订阅。

Subscription subscription = session.subscribe("/topic/foo", new StompFrameHandler() {

    @Override
    public Type getPayloadType(StompHeaders headers) {
        return String.class;
    }

    @Override
    public void handleFrame(StompHeaders headers, Object payload) {
        // ...
    }

});

// 取消订阅
subscription.unsubscribe();

其他

Stomp over WebSocket 本身还有诸如用户认证,WebSocket Scope等功能,本文就不一一详述了,具体内容可以查阅官方文档

总结

WebSocket协议是对http协议本身的补充,弥补了http无法实现全双工通信的缺点,但需要了解的一点是,WebSocket本身也是基于http的,因为webSocket是在http连接建立后完成一次"握手"(发送一次切换协议请求并成功获取服务器响应)后建立ws全双工通信的。在一些对响应有实时获取并希望由服务端主动推送消息或者通知的场景,WebSocket是一个相对不错的选择,搭配上Stomp协议为消息确定格式,并选择Kafka,RabbitMQ等功能丰富的消息队列作为broker,可以提供更为强大的消息推送服务。