跳转到主要内容

category

在现代应用程序开发领域,实时通信不再是奢侈品,而是必需品。异步API设计是实现这一目标的关键,使应用程序能够在不受传统请求响应模式限制的情况下提供及时的更新和通知。

在本文中,我们将探讨异步API设计的四种强大技术:回调、WebSockets、消息队列和服务器事件(SSE)。这些方法提供了独特的优势,使它们对于创建响应迅速的实时应用程序非常重要。

为什么异步API设计很重要:


API设计中的传统请求响应模式有其局限性。当客户端向服务器发送请求时,通常必须等待响应,这可能会导致延迟和用户体验降低,尤其是在实时更新至关重要的情况下。

异步API设计通过允许服务器异步处理耗时的任务并立即响应确认,从而摆脱了这些限制。这使客户端能够在不等待的情况下继续其操作,并在任务完成后立即接收更新。

基本异步API工作流:


在异步API设计中,

  • 客户端通过休息端点请求服务器。
  • 服务器异步处理耗时任务,并立即发送响应以确认任务,“我正在执行任务”,服务器可能会发送一个唯一的ID和即时响应,以便客户端可以使用该ID定期调用以获取任务状态。
     

  • 3.任务完成后,服务器使用各种机制向客户端通知响应消息。机制的选择通常取决于应用程序的特定要求和所使用的通信协议。

如果我们可以将数据推送到API客户端会怎样?


理想的情况是让我们的服务器在新数据或事件可用时通知API客户端。然而,我们无法使用HTTP常见的传统请求-响应交互样式来实现这一点。我们必须找到一种方法,允许服务器将数据推送到客户端。输入异步API。

服务器到客户端的实时通知响应是如何工作的?


方法1:轮询:


客户端反复向服务器发送请求,要求更新。当服务器有新的信息或结果时,服务器会做出响应。虽然实现起来很简单,但轮询可能会导致网络流量增加和延迟。

方法2:WebSockets:


WebSockets提供全双工通信通道,允许服务器在新数据可用时立即向客户端推送消息。WebSocket非常适合于需要低延迟、实时通信的应用程序。

方法3:服务器发送的事件(SSE):


服务器需要以单向方式向客户端推送更新。它使用单个HTTP连接,与打开多个连接相比,减少了开销。

SSE是单向的,这意味着客户端只能从服务器接收更新,而不能发回数据。SSE最适合于单向通信就足够的场景。

方法4:消息队列:


服务器可以使用消息队列(例如RabbitMQ、Apache Kafka)来发布消息。客户端订阅特定主题或队列,并在消息到达时异步接收消息。

方法5:回调URL:


对于服务器需要向客户端通知长时间运行的操作的场景,效率很高。它们最大限度地减少了客户端轮询或维护持久连接的需要。

https://youtu.be/J4J-eHq_Oms

缺点:客户端必须公开可公开访问的回调URL,这可能会带来安全和隐私问题。此外,管理回调URL和在失败的情况下处理重试可能具有挑战性。


使用服务器发送的事件的异步API


服务器发送事件(SSE)代表了一种强大的机制,用于实现服务器和客户端之间的异步通信,特别是在API的上下文中。SSE基于EventSource浏览器接口,该接口由万维网联盟(W3C)标准化为HTML5的一部分。它引入了一种使用HTTP建立寿命更长的连接的方法,允许服务器主动将数据推送到客户端。该数据通常被结构化为事件,并且可以包括相关联的有效载荷信息。

最初,SSE旨在促进向web应用程序的数据传递,但它在API世界中的相关性越来越大。SSE为传统的轮询机制提供了一种引人注目的替代方案,解决了与客户端-服务器通信相关的一些固有挑战。

SSE是如何工作的?


SSE使用标准的HTTP连接,但会保持连接更长的时间,而不是立即断开连接。此连接允许服务器在数据可用时将数据推送回客户端:

该规范概述了返回数据格式的一些选项,允许使用事件名称、注释、基于单行或多行文本的数据以及事件标识符。

使用案例:电子商务批量产品更新API


在这个用例中,电子商务网站允许客户端上传包含大量产品列表的CSV文件。

服务器异步处理CSV文件,并立即发送响应以确认上载。CSV文件的解析和验证完成后,服务器使用服务器发送事件(SSE)将处理后的产品数据发送到客户端。

客户端到服务器(CSV上载和异步处理):


1.客户端启动CSV上传:


客户端与电子商务网站进行交互,并通过用户界面启动CSV文件上传。

2.客户端发送CSV文件:


客户端选择一个包含产品数据的CSV文件,并通过POST请求将其上传到/api/upload.CSV端点。

3.服务器验证文件并生成事务ID:


服务器接收CSV文件并对其进行验证。如果该文件有效,则服务器会立即响应HTTP 202(已接受)状态代码来确认上载。

服务器为此上载生成一个唯一的事务ID,用于跟踪处理进度。


4.异步处理开始:


服务器开始对CSV文件进行异步处理。此处理包括CSV解析、数据验证和产品列表的创建。

处理可能发生在后台,允许服务器在处理CSV时处理其他请求。


5.通过SSE发送的进度更新:


在处理CSV文件和生成产品数据时,服务器会使用服务器发送的事件(SSE)向客户端发送实时进度更新。SSE端点(/SSE)是使用事务ID建立并与客户端连接的。

6.服务器到客户端(进度更新和完成):

 

  • 1客户端侦听SSE端点-实时进度更新:在客户端,JavaScript用于侦听与唯一事务ID关联的SSE端点(/SSE)。客户端通过SSE建立与服务器的持久连接,从而实现实时更新。

 

  • 2.服务器发送进度更新:
    • 在处理CSV文件时,服务器向SSE端点发送部分产品数据更新和进度消息,SSE端点由客户端主动监控。
    • 这些更新实时从服务器推送到客户端,为用户提供有关CSV处理进度的反馈。

// Inside your CSV processing logic
String transactionId = "TXN-123"; // Replace with the actual transaction ID
String progressMessage = "Processing 50% complete"; // Replace with your progress message

// Send an SSE update to the client
sseController.sendSseUpdate(transactionId, progressMessage);

  • 3.通过SSE发送的完成消息:

当整个CSV文件已成功处理后,服务器通过SSE发送最终完成消息,通知客户端产品数据已准备好进行检索。
客户端接收到此消息,并可以继续从服务器检索处理后的产品数据。

  • 4.错误处理:

如果上传的CSV文件有问题(例如,无效的格式或数据),或者在处理过程中发生错误,服务器会向客户端发送错误响应。
客户会被告知该错误,并可以纠正该问题。

import org.springframework.http.HttpStatus;
import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

@RestController
@RequestMapping("/api/upload")
public class CsvUploadController {

    private final Map<String, SseEmitter> sseEmitters = new ConcurrentHashMap<>();

    @PostMapping("/csv")
    public ResponseEntity<String> uploadCsv(@RequestParam("file") MultipartFile file) {
        if (file.isEmpty()) {
            return ResponseEntity.badRequest().body("Please select a CSV file to upload.");
        }

        if (!isCsvValid(file)) {
            return ResponseEntity.badRequest().body("Invalid CSV file format or data.");
        }

        String transactionId = "TXN-" + System.currentTimeMillis();

        //1. Start asynchronous processing and return a CompletableFuture
        CompletableFuture<Void> processingFuture = CompletableFuture.runAsync(() -> {
            asyncProcessCsv(file, transactionId);
        });

        //2. Return a 202 (Accepted) response with the transaction ID
        return ResponseEntity.status(HttpStatus.ACCEPTED).body(transactionId);
    }

    private boolean isCsvValid(MultipartFile file) {
        // Add your CSV validation logic here
        // Return true if the CSV is valid; otherwise, return false
        return true;
    }

    private void asyncProcessCsv(MultipartFile file, String transactionId) {
        CompletableFuture<Void> processingFuture = CompletableFuture.runAsync(() -> {
            // Your CSV processing logic here
            try (CSVReader csvReader = new CSVReader(
                    new InputStreamReader(file.getInputStream()))) {

                // Process CSV rows here
                // ...

                // Send progress updates via SSE
                for (int i = 1; i <= totalRows; i++) {
                    String progressMessage = "Processing row " + i;
                    sendProgressUpdate(transactionId, progressMessage);
                }

                // Send completion message via SSE
                sendCompletionMessage(transactionId, "CSV processing completed.");
            } catch (Exception e) {
                // Handle exceptions during processing
                sendErrorMessage(transactionId, "Error during processing: " + e.getMessage());
            } finally {
                sseEmitters.remove(transactionId);
            }
        });

        // Handle any exceptions that occur during processing
        processingFuture.exceptionally(ex -> {
            sendErrorMessage(transactionId, "Error during processing: " + ex.getMessage());
            return null;
        });
    }

    @GetMapping("/sse/{transactionId}")
    public SseEmitter getSseEmitter(@PathVariable String transactionId) {
        SseEmitter sseEmitter = new SseEmitter();
        sseEmitters.put(transactionId, sseEmitter);
        return sseEmitter;
    }

    private void sendProgressUpdate(String transactionId, String message) {
        SseEmitter sseEmitter = sseEmitters.get(transactionId);
        if (sseEmitter != null) {
            try {
                sseEmitter.send(SseEmitter.event().name("progress").data(message));
            } catch (IOException e) {
                // Handle exceptions when sending SSE updates
                e.printStackTrace();
            }
        }
    }

    private void sendCompletionMessage(String transactionId, String message) {
        SseEmitter sseEmitter = sseEmitters.get(transactionId);
        if (sseEmitter != null) {
            try {
                sseEmitter.send(SseEmitter.event().name("complete").data(message));
                sseEmitter.complete(); // Close the SSE connection
            } catch (IOException e) {
                // Handle exceptions when sending SSE updates
                e.printStackTrace();
            }
        }
    }

    private void sendErrorMessage(String transactionId, String message) {
        SseEmitter sseEmitter = sseEmitters.get(transactionId);
        if (sseEmitter != null) {
            try {
                sseEmitter.send(SseEmitter.event().name("error").data(message));
                sseEmitter.completeWithError(new RuntimeException(message)); // Complete with an error
            } catch (IOException e) {
                // Handle exceptions when sending SSE updates
                e.printStackTrace();
            }
        }
    }
}

要点:

 

  • 收到请求后,服务器立即用TXN ID进行响应
  • 客户端收到TXN ID后,注册到SSE进行实时进度更新、错误处理消息和任务完成通知
  • 服务器异步处理任务,完成后,SSE向客户端发送通知

客户端实施:


在客户端(通常是网页),您需要使用JavaScript来侦听SSE端点(/SSE/stream)并处理传入的更新。以下是如何在JavaScript中执行此操作的简化示例:

<!DOCTYPE html>
<html>
<head>
    <title>Asynchronous Order Processing</title>
</head>
<body>
    <h1>Asynchronous Order Processing</h1>
    <button onclick="processOrder()">Process Order</button>
    <div id="result"></div>

    <script>
        let eventSource = null;

        async function processOrder() {
            const orderRequest = {
                csvFilePath: "Path"
            };

            try {
                const response = await fetch('/api/upload/csv', {
                    method: 'POST',
                    headers: {
                        'Content-Type': 'application/json'
                    },
                    body: JSON.stringify(orderRequest)
                });

                if (response.status === 202) {
                    document.getElementById('result').textContent = 'Order processing initiated. Waiting for completion...';
                     const transactionId =response.result.transactionId;
                    // Connect to the SSE endpoint for this order
        const eventSource = new EventSource(`/sse/stream?transactionId=${transactionId}`);                    
                    eventSource.onmessage = (event) => {
                        document.getElementById('result').textContent = event.data;
                    };
                    
                    eventSource.onerror = (error) => {
                        console.error('SSE Error:', error);
                    };
                }
            } catch (error) {
                console.error('Error:', error);
            }
        }

        function closeEventSource() {
            if (eventSource) {
                eventSource.close();
                eventSource = null;
            }
        }

        // Close the SSE connection when leaving the page
        window.addEventListener('beforeunload', closeEventSource);
    </script>
</body>
</html>

结论


在现代应用程序开发的时代,响应能力和实时通信至关重要。异步API设计采用回调、WebSockets、消息队列和服务器事件(SSE)等一系列技术,使开发人员能够构建及时向用户提供更新和通知的应用程序。在我们真实的电子商务用例中,SSE被证明是一个游戏规则的改变者,它为客户提供实时进度更新和完成通知,同时优化性能和用户体验。

当您浏览异步API设计时,请考虑应用程序的独特需求,并选择最符合您目标的方法。无论是让用户了解产品更新,还是在协作平台中实现实时协作,掌握这些异步技术都将使您的应用程序在当今动态的数字世界中脱颖而出。

标签