
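Streaming Responses

Receive model output chunk by chunk over SSE (Server-Sent Events), cutting the time until the user sees the first character (TTFT, time to first token) from several seconds to a few hundred milliseconds. It is the standard setup for chat UIs, text-to-speech readout, and real-time code completion.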
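To check the TTFT claim against your own traffic, a small wrapper around any chunk iterator is enough. The sketch below is illustrative only: `with_ttft` is a hypothetical helper name, and it assumes the timer should start when iteration begins (in practice you would start it just before sending the request).

```python
import time
from typing import Iterable, Iterator, Optional, Tuple


def with_ttft(chunks: Iterable[str]) -> Iterator[Tuple[str, Optional[float]]]:
    """Yield (chunk, ttft_seconds); ttft_seconds is set only for the first chunk."""
    start = time.perf_counter()
    first = True
    for chunk in chunks:
        if first:
            # Elapsed time until the first chunk arrived.
            yield chunk, time.perf_counter() - start
            first = False
        else:
            yield chunk, None
```

Wrap the text pieces of a stream with `with_ttft(...)` and log the first non-None value.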

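When to use

  • Interactive chat UIs (character-by-character typewriter effect)
  • Long answers (> 500 tokens), where perceived latency matters most
  • Early stopping is needed (the user interrupts, or you cancel once some condition is hit)
  • The downstream consumer is itself streaming (TTS, video subtitles)

Do not use it for backend batch jobs or cases where only the final result matters: non-streaming is simpler and more reliable.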
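The early-stop case in the list above can be sketched roughly as follows. `read_until` and `should_stop` are hypothetical names; the sketch assumes the stream yields text pieces and that calling `close()` on the stream object, when it has one, releases the connection.

```python
from typing import Callable, Iterable


def read_until(chunks: Iterable[str], should_stop: Callable[[str], bool]) -> str:
    """Accumulate text pieces and stop as soon as should_stop(text_so_far) is true."""
    text = ""
    try:
        for piece in chunks:
            text += piece
            if should_stop(text):
                break
    finally:
        # Release the underlying connection if the stream object supports it.
        close = getattr(chunks, "close", None)
        if callable(close):
            close()
    return text
```

For example, `read_until(pieces, lambda t: len(t) >= 200)` stops reading once 200 characters have arrived.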


Basic usage

Chat Completions (OpenAI protocol)

from openai import OpenAI

client = OpenAI(
    api_key="your-api-key",
    base_url="https://live-turing.cn.llm.tcljd.com/api/v1",
)

stream = client.chat.completions.create(
    model="turing/gpt-5.4-mini",
    messages=[{"role": "user", "content": "写一段关于春天的短诗"}],
    stream=True,
    stream_options={"include_usage": True},  # push one extra usage chunk when the stream ends
)

for chunk in stream:
    # The trailing usage chunk has an empty choices list, so check before indexing.
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
    if chunk.usage:
        print(f"\n[total={chunk.usage.total_tokens}]")

# The same request with curl
curl -N "$TURING_BASE_URL/chat/completions" \
  -H "Authorization: Bearer $TURING_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "turing/gpt-5.4-mini",
    "messages": [{"role": "user", "content": "写一段关于春天的短诗"}],
    "stream": true,
    "stream_options": {"include_usage": true}
  }'

The response is an SSE stream:

data: {"id":"...","choices":[{"delta":{"content":"春"},"index":0}]}

data: {"id":"...","choices":[{"delta":{"content":"天"},"index":0}]}

data: {"id":"...","choices":[{"delta":{},"index":0,"finish_reason":"stop"}]}

data: {"choices":[],"usage":{"prompt_tokens":12,"completion_tokens":45,"total_tokens":57}}

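data: [DONE]

SSE event delimiter

Since 2025-04-27, the Turing platform separates events with a single \n for Portal clients and remains compatible with the double \n\n for older clients. Standard SSE parsers (eventsource-parser, httpx, the openai SDK) handle both automatically.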
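If you read the raw stream yourself, a line-oriented reader is one way to stay indifferent to the delimiter. This is a minimal sketch rather than a full SSE parser: `iter_sse_data` is a hypothetical helper, and it assumes each event's JSON fits on a single `data:` line, as in the sample stream above.

```python
import json
from typing import Iterable, Iterator


def iter_sse_data(lines: Iterable[str]) -> Iterator[dict]:
    """Yield parsed JSON payloads from `data:` lines, stopping at [DONE]."""
    for raw in lines:
        line = raw.rstrip("\r\n")
        if not line.startswith("data:"):
            continue  # blank separator lines, comments, and event: lines
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return
        yield json.loads(payload)
```

Because it works line by line, it yields the same payloads whether events are separated by one newline or two.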

Messages (Anthropic protocol)

Anthropic streams use finer-grained event types (message_start / content_block_start / content_block_delta / content_block_stop / message_delta / message_stop / ping / error).

from anthropic import Anthropic

client = Anthropic(
    base_url="https://live-turing.cn.llm.tcljd.com/api/v1",
    auth_token="your-api-key",
)

with client.messages.stream(
    model="turing/claude-sonnet-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "写一段关于春天的短诗"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)

Raw event stream structure:

event: message_start
data: {"type":"message_start","message":{"id":"msg_…","role":"assistant",…}}

event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"春"}}

event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"天"}}

event: content_block_stop
data: {"type":"content_block_stop","index":0}

event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":45}}

event: message_stop
data: {"type":"message_stop"}
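For clients that consume these raw events without the SDK, the sequence above can be folded into a final result roughly like this. `fold_message_events` is a hypothetical helper; it assumes the `data:` payloads have already been parsed into dicts and that only text blocks are of interest.

```python
from typing import Iterable, Optional, Tuple


def fold_message_events(events: Iterable[dict]) -> Tuple[str, Optional[str], Optional[int]]:
    """Return (text, stop_reason, output_tokens) from parsed Messages stream events."""
    parts = []
    stop_reason = None
    output_tokens = None
    for ev in events:
        kind = ev.get("type")
        if kind == "content_block_delta":
            delta = ev.get("delta", {})
            if delta.get("type") == "text_delta":
                parts.append(delta.get("text", ""))
        elif kind == "message_delta":
            # stop_reason and usage arrive near the end of the stream.
            stop_reason = ev.get("delta", {}).get("stop_reason", stop_reason)
            output_tokens = ev.get("usage", {}).get("output_tokens", output_tokens)
        elif kind == "message_stop":
            break
        # message_start, content_block_start/stop, and ping need no handling here.
    return "".join(parts), stop_reason, output_tokens
```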

Responses (OpenAI Responses protocol)

Event names are longer and finer-grained: response.created / response.output_item.added / response.output_text.delta / response.output_text.done / response.completed, and so on. The SDK abstracts this layer, so you can consume the text directly.

# client is the OpenAI client created in the Chat Completions example
response = client.responses.create(
    model="turing/gpt-5.4-mini",
    input="写一段关于春天的短诗",
    stream=True,
)

for event in response:
    if event.type == "response.output_text.delta":
        print(event.delta, end="", flush=True)

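Usage in streams

By default the stream does not necessarily include usage data. To get token counts:

Protocol | How to enable
Chat Completions | stream_options: {"include_usage": true} → the stream ends with one extra chunk: choices=[] + usage={...}
Messages | The final message_delta event carries usage.output_tokens natively
Responses | The response.completed event carries the full usage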
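The three cases can be handled by one small lookup over parsed events. This is a sketch under assumptions: `usage_from_event` and the protocol labels are hypothetical, events are plain dicts, and for Responses the usage object is assumed to sit under the event's `response` field.

```python
from typing import Optional


def usage_from_event(protocol: str, event: dict) -> Optional[dict]:
    """Return the usage dict if this event carries one, else None."""
    if protocol == "chat_completions":
        # Trailing chunk with choices == [] and a usage object.
        return event.get("usage") or None
    if protocol == "messages":
        if event.get("type") == "message_delta":
            return event.get("usage") or None
        return None
    if protocol == "responses":
        if event.get("type") == "response.completed":
            # Assumption: usage is nested under the "response" object.
            return (event.get("response") or {}).get("usage") or None
        return None
    raise ValueError(f"unknown protocol: {protocol}")
```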

Tool Use in streams

Tool calls are also returned incrementally through the stream: function.arguments arrives in fragments that the client must concatenate.

stream = client.chat.completions.create(
    model="turing/gpt-5.4-mini",
    messages=[{"role": "user", "content": "查上海天气"}],
    tools=[...],
    stream=True,
)

tool_calls = {}  # tc.index -> {"id", "name", "args"}
for chunk in stream:
    if not chunk.choices:  # e.g. a usage-only chunk
        continue
    delta = chunk.choices[0].delta
    for tc in (delta.tool_calls or []):
        # id and name only appear in a call's first fragment;
        # index is present in every fragment, so key on it.
        call = tool_calls.setdefault(tc.index, {"id": None, "name": None, "args": ""})
        if tc.id:
            call["id"] = tc.id
        if tc.function and tc.function.name:
            call["name"] = tc.function.name
        if tc.function and tc.function.arguments:
            call["args"] += tc.function.arguments

For the full documentation, see the streaming section of Tool Use.
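Once the stream has ended, the accumulated argument strings still need to be parsed. The following is a minimal sketch: `finalize_tool_calls` is a hypothetical helper, and it assumes each accumulated entry has `name` and `args` keys as in the loop above.

```python
import json
from typing import Dict, Hashable


def finalize_tool_calls(tool_calls: Dict[Hashable, dict]) -> Dict[Hashable, dict]:
    """Parse each accumulated argument string after the stream has ended."""
    parsed = {}
    for key, call in tool_calls.items():
        try:
            arguments = json.loads(call["args"]) if call["args"] else {}
            error = None
        except json.JSONDecodeError as exc:
            # A stream that was cut off leaves incomplete JSON behind.
            arguments, error = None, str(exc)
        parsed[key] = {"name": call["name"], "arguments": arguments, "error": error}
    return parsed
```

Parsing only at the end matters because an individual fragment is almost never valid JSON on its own.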


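Error handling in streams

Scenario: after the first chunk has arrived, the server reports an error or the network drops mid-stream.

  • OpenAI SDK: the for chunk in stream: loop raises an exception, so wrap it in try/except
  • Anthropic SDK: error events appear inside the stream instead of being raised directly
  • Bare SSE: parse the JSON of every data: line defensively

Best practices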

import logging

logger = logging.getLogger(__name__)

try:
    for chunk in stream:
        handle(chunk)
except Exception as e:
    # Log what was received so far plus the trace_id (see Request Tracing)
    logger.error("stream interrupted after N chars, trace_id=...", exc_info=e)
    raise
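To make the "N chars" in that log line concrete, one option is to carry the partial text on the exception itself. This is a sketch, not part of any SDK: `StreamInterrupted` and `consume` are hypothetical names, and the stream is assumed to yield text pieces.

```python
from typing import Iterable, List


class StreamInterrupted(Exception):
    """Hypothetical error that carries the text received before the failure."""

    def __init__(self, partial: str, cause: BaseException):
        super().__init__(f"stream interrupted after {len(partial)} chars")
        self.partial = partial
        self.cause = cause


def consume(chunks: Iterable[str]) -> str:
    """Join all text pieces; on failure, re-raise with the partial text attached."""
    parts: List[str] = []
    try:
        for piece in chunks:
            parts.append(piece)
    except Exception as exc:
        raise StreamInterrupted("".join(parts), exc) from exc
    return "".join(parts)
```

The caller can then decide whether to show `partial` to the user, retry, or discard it.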

To verify capacity before starting a stream (and avoid a 429 halfway through), use count_tokens to estimate the input first.


Parameter reference


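Billing impact

  • Streaming and non-streaming requests are billed identically (per token)
  • Streaming does not affect Prompt caching pricing
  • When a stream times out or is cut off, tokens already generated are still billed (the backend cannot "roll back")

Common issues

  • The first chunk is slow (> 10s) → most likely the model is thinking. See Thinking and Reasoning, or switch to a non-thinking model
  • The stream stalls midway with no chunks → the backend may be running a tool call or reasoning; some providers push nothing during that phase; set a sensible timeout
  • No usage received → stream_options: {"include_usage": true} was not set
  • The SDK reports "Unexpected end of JSON" → usually gateway middleware (nginx, a CDN) cut the stream; try setting Cache-Control: no-cache or check the proxy
  • Chinese characters cut in half → a multi-byte character can span chunks; assemble complete UTF-8 sequences before output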
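For the half-character problem in the last item, Python's incremental decoder buffers an incomplete UTF-8 sequence until the rest arrives. A minimal sketch, assuming you read raw bytes from the connection yourself (`decode_utf8_stream` is a hypothetical helper name):

```python
import codecs
from typing import Iterable, Iterator


def decode_utf8_stream(byte_chunks: Iterable[bytes]) -> Iterator[str]:
    """Decode byte chunks to text without splitting multi-byte characters."""
    decoder = codecs.getincrementaldecoder("utf-8")()
    for chunk in byte_chunks:
        text = decoder.decode(chunk)  # holds back any incomplete trailing bytes
        if text:
            yield text
    # Raises UnicodeDecodeError if the stream ended in the middle of a character.
    tail = decoder.decode(b"", final=True)
    if tail:
        yield tail
```

Calling `chunk.decode("utf-8")` on each chunk separately would fail or produce replacement characters whenever a character straddles two chunks.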

See also