category
1.概述
在本教程中,我们将看到如何使用Spring实现基于服务器发送事件的API。
简单地说,Server Sent Events(简称SSE)是一种HTTP标准,它允许web应用程序处理单向事件流,并在服务器发出数据时接收更新。
Spring4.2版本已经支持它,但从Spring5开始,我们现在有了一种更惯用、更方便的方法来处理它。
2.带Spring 6 Webflux的SSE
为了实现这一点,我们可以使用Reactor库提供的Flux类等实现,或者潜在的ServerSentEvent实体,它使我们能够控制事件元数据。
2.1. 使用通量的流事件
Flux是事件流的反应性表示——它根据指定的请求或响应媒体类型进行不同的处理。
要创建SSE流式端点,我们必须遵循W3C规范,并将其MIME类型指定为文本/事件流:
@GetMapping(path = "/stream-flux", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<String> streamFlux() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> "Flux - " + LocalTime.now().toString());
}
interval方法创建一个以增量方式发射长值的Flux。然后我们将这些值映射到我们想要的输出。
让我们启动我们的应用程序,然后通过浏览端点来尝试它。
我们将看到浏览器对服务器逐秒推送的事件的反应。有关通量和堆芯的更多信息,我们可以查看这篇文章。
2.2. 使用ServerSentEvent元素
现在,我们将把输出字符串包装到ServerSentSevent对象中,并检查这样做的好处:
@GetMapping("/stream-sse")
public Flux<ServerSentEvent<String>> streamEvents() {
return Flux.interval(Duration.ofSeconds(1))
.map(sequence -> ServerSentEvent.<String> builder()
.id(String.valueOf(sequence))
.event("periodic-event")
.data("SSE - " + LocalTime.now().toString())
.build());
}
正如我们所理解的,使用ServerSentEvent实体有几个好处:
- 我们可以处理事件元数据,这在实际情况下是需要的
- 我们可以忽略“文本/事件流”媒体类型声明
在这种情况下,我们指定了一个id、一个事件名称,最重要的是,指定了事件的实际数据。
此外,我们还可以添加一个comments属性和一个retry值,该值将指定尝试发送事件时要使用的重新连接时间。
2.3. 使用WebClient使用服务器发送的事件
现在,让我们使用WebClient来使用我们的事件流。:
public void consumeServerSentEvent() {
WebClient client = WebClient.create("http://localhost:8080/sse-server");
ParameterizedTypeReference<ServerSentEvent<String>> type
= new ParameterizedTypeReference<ServerSentEvent<String>>() {};
Flux<ServerSentEvent<String>> eventStream = client.get()
.uri("/stream-sse")
.retrieve()
.bodyToFlux(type);
eventStream.subscribe(
content -> logger.info("Time: {} - event: name[{}], id [{}], content[{}] ",
LocalTime.now(), content.event(), content.id(), content.data()),
error -> logger.error("Error receiving SSE: {}", error),
() -> logger.info("Completed!!!"));
}
订阅方法允许我们指示当我们成功接收到事件、发生错误以及流传输完成时将如何进行。
在我们的示例中,我们使用了retrieve方法,这是一种获取响应体的简单而直接的方法。
如果我们收到4xx或5xx响应,除非我们处理添加onStatus语句的场景,否则此方法会自动抛出WebClientResponseException。
另一方面,我们也可以使用exchange方法,该方法提供对ClientResponse的访问,并且在失败响应时不会发出错误信号。
我们必须考虑到,如果不需要事件元数据,我们可以绕过ServerSentEvent包装器。
3.Spring MVC中的SSE流
正如我们所说,SSE规范自引入SseEmitter类的Spring4.2以来就得到了支持。
简单地说,我们将定义一个ExecutorService,一个SseEmitter将在其中推送数据的线程,并返回发射器实例,以这种方式保持连接打开:
@GetMapping("/stream-sse-mvc") public SseEmitter streamSseMvc() { SseEmitter emitter = new SseEmitter(); ExecutorService sseMvcExecutor = Executors.newSingleThreadExecutor(); sseMvcExecutor.execute(() -> { try { for (int i = 0; true; i++) { SseEventBuilder event = SseEmitter.event() .data("SSE MVC - " + LocalTime.now().toString()) .id(String.valueOf(i)) .name("sse event - mvc"); emitter.send(event); Thread.sleep(1000); } } catch (Exception ex) { emitter.completeWithError(ex); } }); return emitter; }
始终确保为您的用例场景选择正确的ExecutiorService。
我们可以在SpringMVC中了解更多关于SSE的信息,并通过阅读这篇有趣的教程来了解其他示例。
4.了解服务器发送的事件
现在我们知道了如何实现SSE端点,让我们通过理解一些基本概念来尝试更深入地了解一下。
SSE是大多数浏览器采用的一种规范,允许在任何时候单向地流式传输事件。
“事件”只是一个UTF-8编码的文本数据流,遵循规范定义的格式。
此格式由一系列用换行符分隔的键值元素(id、retry、data和event,表示名称)组成。
也支持评论。
该规范没有以任何方式限制数据有效载荷格式;我们可以使用简单的String或更复杂的JSON或XML结构。
我们必须考虑的最后一点是使用SSE流和WebSockets之间的区别。
WebSockets提供服务器和客户端之间的全双工(双向)通信,而SSE使用单向通信。
此外,WebSockets不是HTTP协议,与SSE相反,它不提供错误处理标准。
5.结论
总之,在本文中,我们学习了SSE流的主要概念,这无疑是一个很好的资源,可以让我们创建下一代系统。
当我们使用该协议时,我们现在处于一个很好的位置来了解幕后发生的事情。
此外,我们还用一些简单的例子补充了这一理论,这些例子可以在我们的Github存储库中找到。
- 登录 发表评论