1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
|
import json
from typing import Callable, Dict, List, Optional

import requests
from pydantic import BaseModel

from utils.LogHandler import log
# Configuration is modeled with Pydantic so that values are validated and self-documenting.
class DashScopeConfig(BaseModel):
    """Connection settings for the DashScope OpenAI-compatible chat endpoint."""

    # Root URL of the service; gpt_stream_chat appends "/chat/completions" to it.
    base_url: str = "https://dashscope.aliyuncs.com/compatible-mode/v1"
    # Sent as the Bearer token in the Authorization header; required (no default).
    api_key: str
    # Model name used when the caller does not pass one explicitly.
    default_model: str = "qwen-plus"
    # Forwarded to requests.post as its `timeout` argument (seconds).
    timeout: int = 30
    # NOTE(review): nothing visible in this file reads this value — no retry
    # logic is implemented in gpt_stream_chat; confirm whether it is used elsewhere.
    max_retries: int = 3
class GPTChatResponse(BaseModel):
    """Result returned by gpt_stream_chat once the stream has been consumed."""

    # All streamed text fragments joined together (reasoning text and answer text alike).
    content: str
    # Tool-call data taken from the stream deltas; None when no tool call was seen.
    tool_calls: Optional[List[Dict]] = None
def _extract_sse_payload(raw_line: bytes) -> Optional[str]:
    """Return the payload of an SSE ``data:`` line, or None for any other line."""
    text = raw_line.decode('utf-8').strip()
    if not text.startswith('data:'):
        # Comment lines (": keep-alive"), "event:" or "id:" fields carry no JSON.
        return None
    # The space after the colon is optional in SSE, so strip rather than
    # slicing a fixed number of characters.
    return text[len('data:'):].strip()


def _merge_tool_call_deltas(accumulated: Dict[int, Dict], deltas: List[Dict]) -> None:
    """Fold one chunk's tool-call fragments into ``accumulated`` (keyed by index)."""
    for position, fragment in enumerate(deltas):
        if not isinstance(fragment, dict):
            continue
        index = fragment.get('index')
        if index is None:
            # Fall back to the position in the list when the server omits the index.
            index = position
        call = accumulated.setdefault(
            index, {'index': index, 'function': {'arguments': ''}}
        )
        # id / type / name normally arrive once, in the first fragment of a
        # call; later fragments may omit them or send empty values.
        for key in ('id', 'type'):
            if fragment.get(key):
                call[key] = fragment[key]
        function = fragment.get('function') or {}
        if function.get('name'):
            call['function']['name'] = function['name']
        # The JSON argument string is streamed in pieces and must be concatenated.
        arguments = function.get('arguments')
        if isinstance(arguments, str):
            call['function']['arguments'] += arguments


def gpt_stream_chat(
    messages: List[Dict[str, str]],
    config: DashScopeConfig,
    model: Optional[str] = None,
    tools: Optional[List[Dict]] = None,
    on_content: Optional[Callable[[str, str], None]] = None
) -> GPTChatResponse:
    """
    Run a streaming chat completion, forwarding fragments as they arrive.

    :param messages: conversation messages sent as the request's ``messages``
    :param config: service configuration (base URL, API key, default model, timeout)
    :param model: model to use; falls back to ``config.default_model``
    :param tools: tool definitions; only sent when non-empty
    :param on_content: optional callback invoked as ``on_content(text, kind)``
        for every text fragment, where ``kind`` is ``'reasoning'`` or ``'content'``
    :return: GPTChatResponse whose ``content`` is all streamed text joined
        together and whose ``tool_calls`` is the list of fully assembled tool
        calls (ordered by index), or None when the model requested none
    :raises requests.exceptions.RequestException: on connection errors,
        timeouts, or a non-2xx HTTP status (logged, then re-raised)
    """
    headers = {
        'Content-Type': 'application/json',
        'Authorization': f'Bearer {config.api_key}'
    }
    payload = {
        'model': model or config.default_model,
        'messages': messages,
        'stream': True
    }
    if tools:
        payload['tools'] = tools

    content_parts: List[str] = []
    pending_tool_calls: Dict[int, Dict] = {}

    try:
        # The context manager guarantees the streamed connection is released
        # even when the loop exits early or the callback raises.
        with requests.post(
            f"{config.base_url}/chat/completions",
            headers=headers,
            json=payload,
            stream=True,
            timeout=config.timeout
        ) as resp:
            resp.raise_for_status()
            for line in resp.iter_lines():
                if not line:
                    continue
                data = _extract_sse_payload(line)
                if data is None:
                    continue
                if data == '[DONE]':
                    break
                try:
                    chunk = json.loads(data)
                except json.JSONDecodeError:
                    log.warning(f"Failed to decode chunk: {line}")
                    continue

                # Some chunks (e.g. a usage-only trailer) carry no choices.
                choices = chunk.get('choices') if isinstance(chunk, dict) else None
                if not choices:
                    continue
                delta = choices[0].get('delta') or {}

                # Reasoning ("thinking") text.
                # NOTE(review): reasoning text is also appended to the returned
                # content, as in the original implementation — confirm this is intended.
                if content := delta.get('reasoning_content'):
                    if on_content:
                        on_content(content, 'reasoning')
                    content_parts.append(content)

                # Final answer text.
                if content := delta.get('content'):
                    if on_content:
                        on_content(content, 'content')
                    content_parts.append(content)

                # Tool calls arrive as fragments spread over several chunks;
                # keep reading until the stream ends so arguments are complete.
                if delta.get('tool_calls'):
                    _merge_tool_call_deltas(pending_tool_calls, delta['tool_calls'])
    except requests.exceptions.RequestException as e:
        log.error(f"Stream request failed: {str(e)}")
        raise

    tool_calls = [pending_tool_calls[key] for key in sorted(pending_tool_calls)] or None
    return GPTChatResponse(content=''.join(content_parts), tool_calls=tool_calls)
def handle_stream_content(content: str, content_type: str):
    """Echo a streamed fragment to stdout as soon as it arrives.

    ``content_type`` is accepted so the function matches the two-argument
    callback that ``gpt_stream_chat`` invokes, but it is not used here.
    """
    # No trailing newline, and flush immediately, so consecutive fragments
    # appear as one continuous line in real time.
    print(content, flush=True, end='')
if __name__ == "__main__":
    import os

    # Demo entry point: send one greeting and stream the reply to stdout.
    # Prefer the key from the environment so real credentials never live in
    # source; the placeholder keeps the original behaviour when it is unset.
    config = DashScopeConfig(
        api_key=os.environ.get("DASHSCOPE_API_KEY") or "sk-xxxxxxxxxxxxxxxxxxxxxx"
    )
    messages = [
        {"role": "system", "content": "你是一名专业客服人员"},
        {"role": "user", "content": "你好?"}
    ]
    # Fragments are printed by the callback while they arrive; the aggregated
    # text is also available afterwards as response.content.
    response = gpt_stream_chat(messages, config, on_content=handle_stream_content)
|