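Streaming Responses
Receive model output chunk by chunk over SSE (Server-Sent Events), cutting the time to first token (TTFT) from a few seconds to a few hundred milliseconds. This is the standard setup for chat UIs, text-to-speech, and real-time code completion.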
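When to use
- Interactive chat UIs (character-by-character typewriter effect)
- Long answers (> 500 tokens), where perceived responsiveness matters most
- Early stopping is needed (the user interrupts, or you cancel once some condition is hit)
- The downstream consumer is itself streaming (TTS, video subtitles)
Do not use it for backend batch jobs or cases where only the final result matters. Non-streaming is simpler and more reliable there.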
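Early stopping usually comes down to breaking out of the iteration and releasing the connection. The following is a minimal sketch, assuming the stream can be treated as an iterable of text pieces. take_until and should_stop are hypothetical names, not part of any SDK, and the close() call is only attempted if the underlying object happens to provide one.

```python
from typing import Callable, Iterable, Iterator


def take_until(pieces: Iterable[str], should_stop: Callable[[str], bool]) -> Iterator[str]:
    """Yield text pieces until should_stop(text_so_far) returns True.

    `pieces` stands in for the text deltas of a streaming response.
    On exit, close() is called on the source if it has such a method,
    so the underlying connection can be released.
    """
    received = ""
    try:
        for piece in pieces:
            received += piece
            yield piece
            if should_stop(received):
                break  # stop pulling further chunks
    finally:
        close = getattr(pieces, "close", None)
        if callable(close):
            close()


# Usage: stop as soon as the accumulated text contains a full stop.
demo = ["Spring ", "is here. ", "Flowers ", "bloom."]
out = list(take_until(demo, lambda text: "." in text))
```

Tokens generated before the cancel are still billed, so a stop condition that fires early saves more than one that fires late.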
Basic usage
Chat Completions (OpenAI protocol)
from openai import OpenAI

client = OpenAI(
    api_key="your-api-key",
    base_url="https://live-turing.cn.llm.tcljd.com/api/v1",
)

stream = client.chat.completions.create(
    model="turing/gpt-5.4-mini",
    messages=[{"role": "user", "content": "写一段关于春天的短诗"}],
    stream=True,
    stream_options={"include_usage": True},  # emit one extra usage chunk at the end of the stream
)

for chunk in stream:
    # Content chunks carry a delta; the final usage chunk has an empty choices list.
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="", flush=True)
    if chunk.usage:
        print(f"\n[total={chunk.usage.total_tokens}]")
curl -N "$TURING_BASE_URL/chat/completions" \
  -H "Authorization: Bearer $TURING_API_KEY" \
  -H "Content-Type: application/json" \
  -d '{
    "model": "turing/gpt-5.4-mini",
    "messages": [{"role": "user", "content": "写一段关于春天的短诗"}],
    "stream": true,
    "stream_options": {"include_usage": true}
  }'
The response is an SSE stream:
data: {"id":"...","choices":[{"delta":{"content":"春"},"index":0}]}
data: {"id":"...","choices":[{"delta":{"content":"天"},"index":0}]}
data: {"id":"...","choices":[{"delta":{},"index":0,"finish_reason":"stop"}]}
data: {"choices":[],"usage":{"prompt_tokens":12,"completion_tokens":45,"total_tokens":57}}
data: [DONE]
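SSE event delimiter
Since 2025-04-27, the Turing platform separates events with a single \n for Portal clients and keeps the double \n\n for legacy clients. Standard SSE parsers (eventsource-parser, httpx, the openai SDK) handle both automatically.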
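If you parse the raw stream yourself, one way to stay indifferent to the delimiter is to treat every data: line as a complete event. The sketch below assumes single-line JSON payloads like the ones shown above; it is not a general SSE parser (multi-line data fields are not handled), and iter_sse_data is a hypothetical helper name.

```python
import json
from typing import Iterable, Iterator


def iter_sse_data(lines: Iterable[str]) -> Iterator[dict]:
    """Yield the JSON payload of each `data:` line.

    Every `data:` line is treated as one event, so it does not matter
    whether events are separated by one newline or two.
    """
    for raw in lines:
        line = raw.strip()
        if not line.startswith("data:"):
            continue  # blank separators, comments, `event:` lines
        payload = line[len("data:"):].strip()
        if payload == "[DONE]":
            return  # end-of-stream sentinel
        try:
            yield json.loads(payload)
        except json.JSONDecodeError:
            continue  # skip a malformed line instead of aborting the stream


# The same two events, once with single-newline and once with double-newline separators.
single = 'data: {"n": 1}\ndata: {"n": 2}\ndata: [DONE]\n'
double = 'data: {"n": 1}\n\ndata: {"n": 2}\n\ndata: [DONE]\n\n'
events_single = list(iter_sse_data(single.splitlines()))
events_double = list(iter_sse_data(double.splitlines()))
```

Skipping a line that fails to parse keeps one bad event from ending the whole stream; whether to skip or abort is a design choice that depends on how much you trust the proxy chain.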
Messages (Anthropic protocol)
Anthropic's stream uses finer-grained event types (message_start / content_block_start / content_block_delta / content_block_stop / message_delta / message_stop / ping / error).
from anthropic import Anthropic

client = Anthropic(
    base_url="https://live-turing.cn.llm.tcljd.com/api/v1",
    auth_token="your-api-key",
)

with client.messages.stream(
    model="turing/claude-sonnet-5",
    max_tokens=1024,
    messages=[{"role": "user", "content": "写一段关于春天的短诗"}],
) as stream:
    for text in stream.text_stream:
        print(text, end="", flush=True)
Raw event stream structure:
event: message_start
data: {"type":"message_start","message":{"id":"msg_…","role":"assistant",…}}
event: content_block_start
data: {"type":"content_block_start","index":0,"content_block":{"type":"text","text":""}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"春"}}
event: content_block_delta
data: {"type":"content_block_delta","index":0,"delta":{"type":"text_delta","text":"天"}}
event: content_block_stop
data: {"type":"content_block_stop","index":0}
event: message_delta
data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":45}}
event: message_stop
data: {"type":"message_stop"}
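When consuming these raw events without the SDK, the text has to be reassembled per content block. The sketch below folds already-parsed event dicts into the final text, stop reason, and output token count. It relies only on the fields visible in the event stream above; collect_message is a hypothetical helper, and raising on an error event is a design choice of this sketch.

```python
from typing import Iterable, Optional, Tuple


def collect_message(events: Iterable[dict]) -> Tuple[str, Optional[str], Optional[int]]:
    """Fold parsed Messages events into (text, stop_reason, output_tokens)."""
    blocks = {}  # content block index -> accumulated text
    stop_reason = None
    output_tokens = None
    for ev in events:
        kind = ev.get("type")
        if kind == "content_block_start":
            blocks[ev["index"]] = ev["content_block"].get("text", "")
        elif kind == "content_block_delta" and ev["delta"].get("type") == "text_delta":
            blocks[ev["index"]] = blocks.get(ev["index"], "") + ev["delta"]["text"]
        elif kind == "message_delta":
            stop_reason = ev["delta"].get("stop_reason", stop_reason)
            output_tokens = ev.get("usage", {}).get("output_tokens", output_tokens)
        elif kind == "error":
            raise RuntimeError(f"stream error: {ev.get('error')}")
        # message_start, content_block_stop, message_stop and ping need no action here
    text = "".join(blocks[i] for i in sorted(blocks))
    return text, stop_reason, output_tokens


# The events from the sample stream above, already JSON-decoded.
demo_events = [
    {"type": "content_block_start", "index": 0, "content_block": {"type": "text", "text": ""}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "春"}},
    {"type": "content_block_delta", "index": 0, "delta": {"type": "text_delta", "text": "天"}},
    {"type": "content_block_stop", "index": 0},
    {"type": "message_delta", "delta": {"stop_reason": "end_turn"}, "usage": {"output_tokens": 45}},
    {"type": "message_stop"},
]
result = collect_message(demo_events)
```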
Responses (OpenAI Responses protocol)
Event names are longer and finer-grained: response.created / response.output_item.added / response.output_text.delta / response.output_text.done / response.completed, and so on. The SDK abstracts this layer, so you can consume the text directly.
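# Reuses the OpenAI client created in the Chat Completions example.
response = client.responses.create(
    model="turing/gpt-5.4-mini",
    input="写一段关于春天的短诗",
    stream=True,
)

for event in response:
    # Only text deltas are printed; other event types are ignored here.
    if event.type == "response.output_text.delta":
        print(event.delta, end="", flush=True)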
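Usage in streams
By default, a Chat Completions stream carries no usage data. To get token counts:
| Protocol | How to get usage |
|---|---|
| Chat Completions | stream_options: {"include_usage": true} → one extra chunk at the end of the stream with choices=[] and usage={...} |
| Messages | The final message_delta event carries usage.output_tokens natively |
| Responses | The response.completed event carries the full usage |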
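The three rows can be folded into one lookup when a service speaks more than one protocol. This is a sketch over already-parsed event dicts. The protocol labels ("chat", "messages", "responses") and the extract_usage name are hypothetical, and the assumption that Responses nests usage under the event's response object should be checked against the actual payload.

```python
from typing import Optional


def extract_usage(protocol: str, event: dict) -> Optional[dict]:
    """Return the usage dict if this event carries one, else None."""
    if protocol == "chat":
        # Present only on the trailing chunk when include_usage is set.
        return event.get("usage") or None
    if protocol == "messages":
        if event.get("type") == "message_delta":
            return event.get("usage") or None
        return None
    if protocol == "responses":
        if event.get("type") == "response.completed":
            # Assumption: usage sits inside the nested response object.
            return (event.get("response") or {}).get("usage") or None
        return None
    raise ValueError(f"unknown protocol: {protocol}")


chat_usage = extract_usage(
    "chat",
    {"choices": [], "usage": {"prompt_tokens": 12, "completion_tokens": 45, "total_tokens": 57}},
)
messages_usage = extract_usage(
    "messages",
    {"type": "message_delta", "delta": {"stop_reason": "end_turn"}, "usage": {"output_tokens": 45}},
)
```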
Tool use in streams
Tool calls are also returned incrementally over the stream: function.arguments arrives in fragments that the client has to concatenate.
stream = client.chat.completions.create(
    model="turing/gpt-5.4-mini",
    messages=[{"role": "user", "content": "查上海天气"}],
    tools=[...],
    stream=True,
)

tool_calls = {}  # tc.index -> {"id", "name", "args"}
for chunk in stream:
    if not chunk.choices:  # e.g. a trailing usage-only chunk
        continue
    delta = chunk.choices[0].delta
    for tc in (delta.tool_calls or []):
        # id and function.name appear only on the first fragment of a call;
        # later fragments are matched to it by index.
        call = tool_calls.setdefault(tc.index, {"id": None, "name": None, "args": ""})
        if tc.id:
            call["id"] = tc.id
        if tc.function and tc.function.name:
            call["name"] = tc.function.name
        if tc.function and tc.function.arguments:
            call["args"] += tc.function.arguments
For the full documentation, see the streaming section of Tool Calling.
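The concatenated arguments only become valid JSON once the stream has finished, so parsing belongs after the loop. A minimal sketch, assuming each accumulated entry is a dict with "name" and "args" keys as in the loop above (pass tool_calls.values()); finalize_tool_calls and the tool names in the demo are hypothetical.

```python
import json
from typing import Iterable, List


def finalize_tool_calls(entries: Iterable[dict]) -> List[dict]:
    """Parse accumulated argument strings once the stream has ended.

    `arguments` is None when the fragments do not form valid JSON,
    which typically means the stream was cut off mid-call.
    """
    finished = []
    for entry in entries:
        try:
            arguments = json.loads(entry["args"] or "{}")
        except json.JSONDecodeError:
            arguments = None
        finished.append({"name": entry["name"], "arguments": arguments})
    return finished


# Demo: one call whose fragments add up to valid JSON, one that was truncated.
accumulated = [
    {"name": "get_weather", "args": '{"city": "Shang' + 'hai"}'},
    {"name": "get_time", "args": '{"tz": "Asia'},
]
calls = finalize_tool_calls(accumulated)
```

Treating unparseable arguments as None rather than raising lets the caller decide whether to retry the request or drop just that call.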
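Error handling in streams
Scenario: the first chunk has already arrived, and then the server returns an error or the network drops mid-stream.
- OpenAI SDK: for chunk in stream: raises an exception, so wrap it in try/except
- Anthropic SDK: an error event appears in the stream instead of being raised directly
- Raw SSE: parse every data: line as JSON defensively
Best practice: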
import logging

logger = logging.getLogger(__name__)

try:
    for chunk in stream:
        handle(chunk)
except Exception as e:
    # Log what was received so far plus the trace_id (see Request Tracing)
    logger.error("stream interrupted after N chars, trace_id=...", exc_info=e)
    raise
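To make the "after N chars" in that log line concrete, the consumer can keep what it has received and attach it to the exception it re-raises. The sketch below works on any iterable of text pieces; StreamInterrupted and consume are hypothetical names, and the trace_id is left out because where it comes from depends on the client.

```python
import logging
from typing import Iterable, List

logger = logging.getLogger(__name__)


class StreamInterrupted(Exception):
    """Raised when a stream fails midway; carries the text received so far."""

    def __init__(self, partial_text: str, cause: Exception):
        super().__init__(f"stream interrupted after {len(partial_text)} chars")
        self.partial_text = partial_text
        self.cause = cause


def consume(pieces: Iterable[str]) -> str:
    """Join all text pieces; on failure, log and re-raise with the partial text."""
    received: List[str] = []
    try:
        for piece in pieces:
            received.append(piece)
    except Exception as exc:
        partial = "".join(received)
        logger.error("stream interrupted after %d chars", len(partial), exc_info=exc)
        raise StreamInterrupted(partial, exc) from exc
    return "".join(received)


def flaky():
    """Stand-in for a stream that drops after two pieces."""
    yield "Spr"
    yield "ing"
    raise ConnectionError("connection dropped")


try:
    consume(flaky())
    partial = None
except StreamInterrupted as err:
    partial = err.partial_text
```

The caller can then decide whether the partial text is worth showing to the user or should be discarded before a retry.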
To check capacity before starting the stream (and avoid hitting a 429 partway through), estimate the input size first with count_tokens.
Parameter reference
- Chat Completions: /api/create-chat-completion (stream, stream_options)
- Messages: /api/create-message (stream); for the event structure, see AnthropicStreamEvent
- Responses: /api/create-response (stream, stream_options)
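Billing impact
- Streaming and non-streaming requests are billed identically (per token)
- Streaming does not change how Prompt Caching is priced
- When a stream times out or is cut off, tokens already generated are still billed (the backend cannot roll them back)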
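Common issues
- The first chunk is very slow (> 10s) → most likely the model is thinking. See Thinking and Reasoning, or switch to a non-thinking model
- The stream stalls mid-way with no chunks → the backend may be running a tool call or reasoning, and some providers push nothing during that phase; set a reasonable timeout
- No usage in the stream → stream_options: {"include_usage": true} was left out
- The SDK reports "Unexpected end of JSON" → usually gateway middleware (nginx, a CDN) cut the stream; try Cache-Control: no-cache or check the proxy
- Chinese characters come out cut in half → a multi-byte character can span chunks, so reassemble complete UTF-8 before output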
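The half-character problem only arises when raw bytes are decoded chunk by chunk. One way to handle it in Python is the standard library's incremental decoder, which buffers an incomplete byte sequence until the rest arrives. A minimal sketch, where decode_utf8_stream is a hypothetical helper name:

```python
import codecs
from typing import Iterable, Iterator


def decode_utf8_stream(byte_chunks: Iterable[bytes]) -> Iterator[str]:
    """Decode byte chunks to text without splitting multi-byte characters."""
    decoder = codecs.getincrementaldecoder("utf-8")(errors="strict")
    for chunk in byte_chunks:
        text = decoder.decode(chunk)  # incomplete trailing bytes stay buffered
        if text:
            yield text
    # Flush; raises UnicodeDecodeError if the stream ended mid-character.
    tail = decoder.decode(b"", final=True)
    if tail:
        yield tail


raw = "春天".encode("utf-8")  # 6 bytes, 3 per character
# Split so that both chunk boundaries fall inside a character.
pieces = list(decode_utf8_stream([raw[:2], raw[2:4], raw[4:]]))
```

Calling bytes.decode("utf-8") on each chunk separately would raise or produce replacement characters on the same input.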
See also
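- Timeouts, Retries, and Fallback: how to set streaming timeouts and retry after a dropped stream
- Tool Calling: assembling tool_calls from a stream
- Request Tracing: locating an interrupted stream by trace_id / client_request_id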
- Chat Completions / Messages / Responses