🐙

基于FastAPI构建 Server-Sent Events API

2023/11/04に公開

前言

最近,LLM与OpenAI非常流行, 相信很多人从Complete和Chat API中的stream模式第一次接触到了SSE API。SSE(Server-Sent Events)是一种支持从Server Side向Client Side推送事件的方式。它与传统的Restful API不同,传统的Restful架构只能从服务器端发起请求,然后客户端返回响应的方式进行数据传输。在需要实时更新数据的场景中,只能依赖轮询或者异步任务机制来实现。但前者性能不高,而后者会增加前后端架构的复杂性。

WebSocket也是一个选择,但实现则更加复杂。

对于需要服务端主动向客户端推送消息,而客户端不需要向服务器发送信息的情况下,SSE会是一种高性能且简单的实现方式。

本文介绍了如何使用python和FastAPI实现SSE API,并且探讨了如何结合Pydantic书写类型安全的代码

使用FastAPIsse-starlette实现SSE API

sse-starlette包包含了SSE的大部分实现。代码实现非常简单:

https://github.com/akishichinibu/zenn-doc/blob/b5889866383b0a24cf9d2c7bb5a887ff1ede3a51/code/1423539f71d8c2/main.py#L1-L27

创建一个返回ServerSentEventAsyncGenerator,然后作为EventSourceResponse的content返回即可。

ServerSentEvent的三个参数event,iddata分别可以用来设置服务端事件的事件名,事件ID和Payload的内容。

输出:


curl -N 'http://127.0.0.1:12345/sse?message=hello'

id: 0
event: echo
data: {'message': 'hello', 'created_at': 1699067601.359488}

id: 1
event: echo
data: {'message': 'hello', 'created_at': 1699067602.36079}

id: 2
event: echo
data: {'message': 'hello', 'created_at': 1699067603.361532}

id: 3
event: echo
data: {'message': 'hello', 'created_at': 1699067604.365153}

id: 4
event: echo
data: {'message': 'hello', 'created_at': 1699067605.367895}

基于Pydantic保证类型安全

在上述代码中,事件名使用了字面量,返回数据的内容是字典。当涉及管理复杂的数据内容结构和多个事件时,这可能会变得很麻烦。这时可以结合Pydantic写出更加类型安全的代码。

首先创建一个描述事件的类BaseEvent[T],它根据类名推断事件名,并包含一个类型为T的Payload。

https://github.com/akishichinibu/zenn-doc/blob/b5889866383b0a24cf9d2c7bb5a887ff1ede3a51/code/1423539f71d8c2/main2.py#L13-L26

然后创建一个装饰器,将BaseEvent[T]的流转换为ServerSentEvent一个的流。
https://github.com/akishichinibu/zenn-doc/blob/b5889866383b0a24cf9d2c7bb5a887ff1ede3a51/code/1423539f71d8c2/main2.py#L32-L54

然后我们定义Echo的PayloadEchoPayload,然后定义事件Echo,最后创建事件流echo_stream2

https://github.com/akishichinibu/zenn-doc/blob/b5889866383b0a24cf9d2c7bb5a887ff1ede3a51/code/1423539f71d8c2/main2.py#L57-L76

curl -N 'http://127.0.0.1:12345/sse2?message=hello'
id: 0
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:10.760797"}

id: 1
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:11.762338"}

id: 2
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:12.764145"}

id: 3
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:13.765405"}

id: 4
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:14.765821"}

id: 5
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:02:15.767593"}

如果需要在一个API中合并多个流

可以考虑使用aiostream中的merge操作符。

创建另外一个流echo_stream3并且将其与echo_stream2进行合并,可以看到两个流交替输出。

https://github.com/akishichinibu/zenn-doc/blob/b5889866383b0a24cf9d2c7bb5a887ff1ede3a51/code/1423539f71d8c2/main2.py#L84-L109

curl -N 'http://127.0.0.1:12345/sse3?message=hello'

id: 0
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:26.129152"}

id: 1
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:27.130363"}

id: 0
event: reverse_echo
data: {"message":"olleh","created_at":"2023-11-04T13:04:27.130311"}

id: 2
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:28.131686"}

id: 1
event: reverse_echo
data: {"message":"olleh","created_at":"2023-11-04T13:04:28.131802"}

id: 3
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:29.132987"}

id: 2
event: reverse_echo
data: {"message":"olleh","created_at":"2023-11-04T13:04:29.133048"}

id: 4
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:30.133555"}

id: 3
event: reverse_echo
data: {"message":"olleh","created_at":"2023-11-04T13:04:30.133599"}

id: 5
event: echo
data: {"message":"hello","created_at":"2023-11-04T13:04:31.134985"}

id: 4
event: reverse_echo
data: {"message":"olleh","created_at":"2023-11-04T13:04:31.135166"}

总结

本文介绍了如何使用FastAPI和sse-starlette构建SSE API,实现了从服务端实时推送数据的方式。我们还讨论了如何通过Pydantic确保类型安全,提高代码可维护性,以及在一个API中合并多个流的方法。

GitHubで編集を提案

Discussion