FastAPI WebSocket:揭秘流畅的双向沟通秘诀

文章标题:

FastAPI中WebSocket的运用:探索高效双向交互之道

文章内容:

示例代码运行环境

Python 3.8+

安装依赖:pip install fastapi==0.68.0 uvicorn==0.15.0 websockets==10.3 pydantic==1.10.7

一、WebSocket路由声明规范

1.1 基础路由定义

通过@app.websocket装饰器来定义WebSocket路由:

from fastapi import FastAPI, WebSocket

app = FastAPI()


@app.websocket("/ws/chat/{room_id}")
async def websocket_chat(websocket: WebSocket, room_id: int):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f"Room {room_id}: {data}")
    except WebSocketDisconnect:
        print("Client disconnected")

关键要点解读:
* 路径参数通过URL直接传递(如{room_id})
* WebSocket对象会自动注入到路由函数中
* 必须显式调用await websocket.accept()来建立连接
* 通过receive_text()send_text()实现双向的数据通信

1.2 路由参数验证

结合路径参数与查询参数进行复合验证:

from fastapi import Query, WebSocket


@app.websocket("/ws/secure/{client_id}")
async def secure_ws(
        websocket: WebSocket,
        client_id: int = Path(..., gt=0),
        token: str = Query(..., min_length=8)
):
    if not validate_token(token):
        await websocket.close(code=1008)
        return
    # ...连接处理逻辑...


## 二、客户端连接建立与握手验证

### 2.1 握手过程控制
自定义握手验证流程:

```python
@app.websocket("/ws/auth")
async def auth_ws(websocket: WebSocket):
    # 手动控制握手过程
    await websocket.accept()

    # 获取握手时的请求头
    headers = websocket.headers
    auth_token = headers.get("authorization")

    if not verify_jwt_token(auth_token):
        await websocket.close(code=1008, reason="Invalid credentials")
        return

    # 验证通过后的处理逻辑

2.2 验证失败处理模式

graph TD
A[客户端请求连接] --> B{验证Headers}
B -- 成功 --> C[返回101状态码]
B -- 失败 --> D[返回403状态码]
C --> E[建立双向通信]
D --> F[关闭TCP连接]

三、连接状态维护与生命周期管理

3.1 连接状态跟踪

使用字典来维护活跃的连接:

from typing import Dict

active_connections: Dict[str, WebSocket] = {}


@app.websocket("/ws/status")
async def status_ws(websocket: WebSocket):
    await websocket.accept()
    client_id = str(websocket.client)
    active_connections[client_id] = websocket

    try:
        while True:
            # 保持连接活跃
            await websocket.receive_text()
    except WebSocketDisconnect:
        del active_connections[client_id]

3.2 心跳检测机制

import asyncio


async def heartbeat(websocket: WebSocket, interval: int = 30):
    try:
        while True:
            await asyncio.sleep(interval)
            await websocket.send_json({
                "type": "heartbeat",
                "timestamp": time.time()
            })
    except WebSocketDisconnect:
        print("Heartbeat terminated")


@app.websocket("/ws/realtime")
async def realtime_ws(websocket: WebSocket):
    await websocket.accept()
    # 启动独立心跳任务
    heartbeat_task = asyncio.create_task(heartbeat(websocket))

    try:
        # 主消息处理循环
        while True:
            data = await websocket.receive_json()
            # 处理业务逻辑...
    finally:
        heartbeat_task.cancel()
        await websocket.close()


## 课后 Quiz

  1. 当客户端发送非文本格式数据时如何处理?

```python
# 正确处理方法
try:
    data = await websocket.receive_json()
except WebSocketDisconnect:
    # 处理断开逻辑
except ValueError:
    await websocket.send_text("ERROR: 仅支持JSON格式")

# 错误处理方法:直接使用未校验的receive()
  1. 如何获取客户端的真实IP地址?
# 正确方法
client_ip = websocket.client.host

# 常见错误:直接读取X-Forwarded-For头
# 需要配合代理设置处理

常见报错解决方案

错误1:403 Handshake Failed

现象 :客户端连接时立即断开
分析 :未通过握手验证,常见于:

  • 缺少必要认证头
  • 验证逻辑返回False后未及时close()
    解决
# 在拒绝连接时立即关闭
if not valid:
    await websocket.close(code=1008)
    return  # 必须立即返回

错误2:1006 Abnormal Closure

现象 :客户端非正常断开
处理方案

try:
    while True:
        data = await websocket.receive_text()
except WebSocketDisconnect as e:
    print(f"断开代码:{e.code}")

错误3:TypeError: 'str' expected

场景 :发送非文本数据时
正确做法

# 发送二进制数据
await websocket.send_bytes(binary_data)

# 发送文本数据
await websocket.send_text("message")

# 发送JSON
await websocket.send_json({"key": "value"})

余下文章内容请点击跳转至 个人博客页面 或者 扫码关注或者微信搜一搜:编程智域 前端至全栈交流与成长
,阅读完整的文章:如何在FastAPI中玩转WebSocket,让实时通信不再烦恼?

往期文章归档:

免费好用的热门在线工具

文章整理自互联网,只做测试使用。发布者:Lomu,转转请注明出处:https://www.it1024doc.com/13051.html

(0)
LomuLomu
上一篇 2025 年 8 月 4 日
下一篇 2025 年 8 月 5 日

相关推荐

  • 金仓数据库数据迁移实战:从MySQL到KES的顺利迁移

    今天,我们将探索金仓数据库的数据迁移功能。在此之前,我们使用的是简化版的Docker镜像,该版本并未集成可视化操作工具。因此,为了提高后续操作的便捷性,我们需要下载并安装Windows版本的安装包。 请留意,如果你没有安装数据库的计划,在安装过程中可以选择跳过相关组件的安装。具体的安装步骤我们将不再展示,因为这一过程非常直观,与其他常见软件的安装过程相似。 …

    2024 年 12 月 24 日
    49400
  • CLion破解工具支持GUI操作吗?是否必须命令行?

    申明:本教程Clion破解补丁、激活码均收集于网络,请勿商用,仅供个人学习使用,如有侵权,请联系作者删除。若条件允许,希望大家购买正版 ! 废话不多说,先上 Clion2025.2.1 版本破解成功的截图,如下图,可以看到已经成功破解到 2099 年辣,舒服的很! 接下来就给大家通过图文的方式分享一下如何破解最新的Clion。 准备工作 注意:如果你之前用过…

    2025 年 9 月 26 日
    21100
  • 永久pycharm激活码安装视频+最新pycharm破解方法

    重要提醒:以下破解补丁、激活码均来源于互联网,仅供个人学习交流,禁止一切商业用途。若条件允许,请支持正版! 先放一张成功截图镇楼:PyCharm 2025.2.1 已激活至 2099 年,稳! 下面用图文方式手把手教你搞定最新版 PyCharm 的激活流程。 嫌折腾?官方正版全家桶低至 32 元/年,直接登录即用:https://panghu.hicxy.c…

    PyCharm激活码 2025 年 11 月 30 日
    6900
  • 开启AI大模型应用开发新篇:LangChain打造RAG增强检索生成

    开启AI大模型应用开发的新篇章:LangChain助力RAG增强检索生成 检索增强生成(RAG)是一种将“向量检索”和“大语言模型”相结合的技术途径,能在问答、摘要、文档分析等场景中极大提升准确性与上下文的利用率。 本文将基于 LangChain 构建完整的 RAG 流程,结合 PGVector 作为向量数据库,并用 LangGraph 构建状态图来控制流程…

    2025 年 6 月 23 日
    30200
  • 全平台同步clion激活码,实用破解教程一起分享

    重要提示:本文所涉及的 Clion 破解补丁与激活码均源自网络公开资源,仅供个人学习交流,严禁商业用途。若条件允许,请支持正版授权! Clion 是 JetBrains 出品的一款跨平台 C/C++ IDE,支持 Windows、macOS 与 Linux。下文将手把手教你借助破解补丁实现永久授权,解锁全部高级特性。 无论你的系统版本或 IDE 版本如何,下…

    2025 年 10 月 15 日
    14900

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信