十六、千问大模型智能体接入程序
最后更新于
最后更新于
基于AgentCP SDK开发的千问大模型智能体,实现大模型能力与智能体网络的无缝对接。使网络中的其他智能体可以通过调用该智能体的方式来使用千问大模型
请参考
将seed_password、your_aid修改为步骤1)创建的身份信息
配置文件配置在智能体私有数据目录下的ACP路径/AIDs/youraid/private/data/env.json
中(简化智能体迁移克隆流程)在data/env.json
中配置服务参数:
{
"OPENAI_API_KEY": "your_api_key",
"BASE_URL": "https://api.example.com/v1",
"MODEL": "qwen-72b-chat"
}
python qwen_agent.py
✅ 完整的消息处理机制
✅ 支持流式响应和工具调用
✅ 智能体网络接入能力
✅ 多角色对话支持
✅ 异常处理与日志追踪
Python 3.8+
OpenAI兼容API服务
.
├── qwen_agent.py # 核心业务逻辑
├── create_profile.py # 智能体配置文件生成
lass QwenClient:
def __init__(self):
self.openai_api_key = None
self.base_url = None
self.model = None
self.client = None
self.acp = agentcp.AgentCP(".",seed_password="888777",debug=True)
self.agentid:agentcp.AgentID = None
def init_ai_client(self,json_data):
# 从环境变量中获取 API Key 和 Base URL
self.openai_api_key = json_data.get("OPENAI_API_KEY","")
self.base_url = json_data.get("BASE_URL","")
self.model = json_data.get("MODEL","")
self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)
async def async_message_handler(self, message_data):
try:
receiver = message_data.get("receiver")
sender = message_data.get("sender","") # 获取工具信息
if self.agentid.id not in receiver:
print("不是发给我的消息,不处理")
return
message_array = self.agentid.get_content_array_from_message(message_data)
if len(message_array) == 0:
print("消息内容为空,不处理")
return
llm_content = self.agentid.get_content_from_message(message_data)
stream = message_array[0].get("stream",False) # 获取stream信息
tools = message_array[0].get("tools",[]) # 获取工具信息
rolesetting = message_array[0].get("prompt","") # 获取工具信息
if rolesetting!="" and rolesetting!=None:
messages = [{"role": "system", "content": rolesetting},{"role": "user", "content": llm_content}]
else:
messages = [{"role": "user", "content": llm_content}]
print(f"\n[处理消息: {sender} : {llm_content}]\n")
await self.stream_process_query(message_data,messages,sender,stream,tools) #添加await关键字
except Exception as e:
import traceback
print(f"处理消息时发生错误: {e}\n完整堆栈跟踪:\n{traceback.format_exc()}")
def send_message_tools_call(self, session_id,sender,llm_content: str,funcallback):
to_aid_list = [sender]
msg_block = {
"type": "tool_call",
"status": "success",
"timestamp": int(time.time() * 1000),
"content": llm_content,
}
self.agentid.add_message_handler(funcallback,session_id)
self.agentid.send_message(session_id,to_aid_list, msg_block)
async def stream_process_query(self, message_data:dict, messages: list, sender:str,stream: bool,user_tools:list):
if user_tools is None:
user_tools = [] # 确保tools是一个列表,即使它为空
if len(user_tools) > 0:
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
stream=stream,
tools = user_tools
)
else:
response = self.client.chat.completions.create(
model=self.model,
messages=messages,
stream=stream
)
session_id = message_data.get("session_id","") # 获取session_id
content = response.choices[0]
if content.finish_reason == "tool_calls":
tool_call = content.message.tool_calls[0]
tool_name = tool_call.function.name
tool_args = json.loads(tool_call.function.arguments)
print(f"\n[Calling tool {tool_name} with args {tool_args}]\n")
async def funcallback(result_content):
self.agentid.remove_message_handler(funcallback,session_id)
messages.append(content.message.model_dump())
messages.append({
"role": "tool",
"content": self.agentid.get_content_from_message(result_content),
"tool_call_id": tool_call.id,
})
await self.stream_process_query(message_data,messages,sender,stream,user_tools)
tool_content = {
'tool_name':tool_name,
'tool_args':tool_args,
}
self.send_message_tools_call(session_id,sender,json.dumps(tool_content),funcallback)
return
if stream:
await self.agentid.send_stream_message(message_data.get("session_id"),[sender] , response) # 确保正确调用
else:
return self.agentid.reply_message(message_data, content.message.content)
1. 消息处理 - `async_message_handler`
async def async_message_handler(self, message_data):
# 消息过滤与解析
# 构建对话上下文
# 调用处理流程
2. 流式处理 - `stream_process_query`
async def stream_process_query(self, message_data, messages, sender, stream, user_tools):
# 判断工具调用
# 处理大模型响应
# 流式/非流式响应处理
3. 工具调用 - `send_message_tools_call`
def send_message_tools_call(self, session_id, sender, llm_content, funcallback):
# 发送工具调用请求
# 注册回调处理器
程序包含完善的异常捕获机制,可通过DEBUG模式查看详细日志:
AgentCP(..., debug=True) # 启用调试模式
确保env.json
配置文件正确
智能体网络接入需要有效的seed_password
生产环境建议关闭debug模式