跳转到主要内容

category

1.概述


在本教程中,我们将看到如何使用Spring实现基于服务器发送事件的API。

简单地说,Server Sent Events(简称SSE)是一种HTTP标准,它允许web应用程序处理单向事件流,并在服务器发出数据时接收更新。

Spring4.2版本已经支持它,但从Spring5开始,我们现在有了一种更惯用、更方便的方法来处理它。

2.带Spring 6 Webflux的SSE


为了实现这一点,我们可以使用Reactor库提供的Flux类等实现,或者潜在的ServerSentEvent实体,它使我们能够控制事件元数据。

2.1. 使用通量的流事件


Flux是事件流的反应性表示——它根据指定的请求或响应媒体类型进行不同的处理。

要创建SSE流式端点,我们必须遵循W3C规范,并将其MIME类型指定为文本/事件流:

@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
    return Flux.interval(Duration.ofSeconds(1))
      .map(sequence -> "Flux - " + LocalTime.now().toString());
}


interval方法创建一个以增量方式发射长值的Flux。然后我们将这些值映射到我们想要的输出。

让我们启动我们的应用程序,然后通过浏览端点来尝试它。

我们将看到浏览器对服务器逐秒推送的事件的反应。有关通量和堆芯的更多信息,我们可以查看这篇文章。

2.2. 使用ServerSentEvent元素


现在,我们将把输出字符串包装到ServerSentSevent对象中,并检查这样做的好处:
@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
    return Flux.interval(Duration.ofSeconds(1))
      .map(sequence -> ServerSentEvent.<String> builder()
        .id(String.valueOf(sequence))
          .event("periodic-event")
          .data("SSE - " + LocalTime.now().toString())
          .build());
}


正如我们所理解的,使用ServerSentEvent实体有几个好处:

  • 我们可以处理事件元数据,这在实际情况下是需要的
  • 我们可以忽略“文本/事件流”媒体类型声明

在这种情况下,我们指定了一个id、一个事件名称,最重要的是,指定了事件的实际数据。

此外,我们还可以添加一个comments属性和一个retry值,该值将指定尝试发送事件时要使用的重新连接时间。

2.3. 使用WebClient使用服务器发送的事件


现在,让我们使用WebClient来使用我们的事件流。:

public void consumeServerSentEvent() {
    WebClient client = WebClient.create("http://localhost:8080/sse-server");
    ParameterizedTypeReference<ServerSentEvent<String>> type
     = new ParameterizedTypeReference<ServerSentEvent<String>>() {};

    Flux<ServerSentEvent<String>> eventStream = client.get()
      .uri("/stream-sse")
      .retrieve()
      .bodyToFlux(type);

    eventStream.subscribe(
      content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",
        LocalTime.now(), content.event(), content.id(), content.data()),
      error -> logger.error("Error receiving SSE: {}", error),
      () -> logger.info("Completed!!!"));
}

 

订阅方法允许我们指示当我们成功接收到事件、发生错误以及流传输完成时将如何进行。

在我们的示例中,我们使用了retrieve方法,这是一种获取响应体的简单而直接的方法。

如果我们收到4xx或5xx响应,除非我们处理添加onStatus语句的场景,否则此方法会自动抛出WebClientResponseException。

另一方面,我们也可以使用exchange方法,该方法提供对ClientResponse的访问,并且在失败响应时不会发出错误信号。

我们必须考虑到,如果不需要事件元数据,我们可以绕过ServerSentEvent包装器。

3.Spring MVC中的SSE流
正如我们所说,SSE规范自引入SseEmitter类的Spring4.2以来就得到了支持。

简单地说,我们将定义一个ExecutorService,一个SseEmitter将在其中推送数据的线程,并返回发射器实例,以这种方式保持连接打开:

@GetMapping("/stream-sse-mvc")
public SseEmitter streamSseMvc() {
    SseEmitter emitter = new SseEmitter();
    ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor();
    sseMvcExecutor.execute(() -> {
        try {
            for (int i = 0; true; i++) {
                SseEventBuilder event = SseEmitter.event()
                  .data("SSE MVC - " + LocalTime.now().toString())
                  .id(String.valueOf(i))
                  .name("sse event - mvc");
                emitter.send(event);
                Thread.sleep(1000);
            }
        } catch (Exception ex) {
            emitter.completeWithError(ex);
        }
    });
    return emitter;
}


始终确保为您的用例场景选择正确的ExecutiorService。

我们可以在SpringMVC中了解更多关于SSE的信息,并通过阅读这篇有趣的教程来了解其他示例。

4.了解服务器发送的事件


现在我们知道了如何实现SSE端点,让我们通过理解一些基本概念来尝试更深入地了解一下。

SSE是大多数浏览器采用的一种规范,允许在任何时候单向地流式传输事件。

“事件”只是一个UTF-8编码的文本数据流,遵循规范定义的格式。

此格式由一系列用换行符分隔的键值元素(id、retry、data和event,表示名称)组成。

也支持评论。

该规范没有以任何方式限制数据有效载荷格式;我们可以使用简单的String或更复杂的JSON或XML结构。

我们必须考虑的最后一点是使用SSE流和WebSockets之间的区别。

WebSockets提供服务器和客户端之间的全双工(双向)通信,而SSE使用单向通信。
此外,WebSockets不是HTTP协议,与SSE相反,它不提供错误处理标准。

5.结论


总之,在本文中,我们学习了SSE流的主要概念,这无疑是一个很好的资源,可以让我们创建下一代系统。

当我们使用该协议时,我们现在处于一个很好的位置来了解幕后发生的事情。

此外,我们还用一些简单的例子补充了这一理论,这些例子可以在我们的Github存储库中找到。
 

标签