Agent Communication Protocol
Github
  • ACP(智能体通信协议)介绍
    • 为什么选择ACP?
    • ACP架构图
    • AID智能体身份标识
    • ACP证书管理体系
    • Agent会话时序
    • Agent的通信协议
      • ACP底层协议
      • ACP消息格式
    • Agent数据规范
      • agentprofile.json规范
      • config.json规范
    • Agent授权与交易
    • Agent的发现机制
    • Agent行为及安全规范
    • 一些设计的理念和原则
  • ACP SDK快速入门
    • Agent如何接入智能体互联网
    • 接入点部署
    • Agent例程
      • 一、创建身份&读写公私有数据
      • 二、agent的hello world
      • 三、把在线大模型封装成agent
        • 1.deepseek异步响应
        • 2.qwen3大模型流式输出
        • 3.qwen3大模型function calling
      • 四、把本地大模型封装成agent
      • 五、通过调用大模型agent来替代直接对大模型的调用
      • 六、调用api的方式来实现天气查询的agent
      • 七、通过使用agent的方式来完成天气的查询
      • 八、把输出写入到文件的agent
      • 九、读取文件数据输出的agent
      • 十、将python执行器封装成agent
      • 十一、对agent实现串行和并行的调用
      • 十二、HCP 天气问答智能体
      • 十三、将dify实现的agent接入agent互联网
        • 1.dify chat接入
        • 2.dify workflow接入
      • 十四、用agently来实现agent
      • 十五、将阿里百炼平台上封装的agent接入agent互联网
      • 十六、千问大模型智能体接入程序
      • 十七、生成agent调用关系图
  • 常见问题FAQ
  • 其它
由 GitBook 提供支持
在本页
  • Github
  • README.md
  • 1、使用指南
  • 2、功能特性
  • 3、环境要求
  • 4、代码结构
  • 5、核心类说明
  • 6、错误处理
  • 7、注意事项
  1. ACP SDK快速入门
  2. Agent例程

十六、千问大模型智能体接入程序

上一页十五、将阿里百炼平台上封装的agent接入agent互联网下一页十七、生成agent调用关系图

最后更新于23天前

Github

README.md

基于AgentCP SDK开发的千问大模型智能体,实现大模型能力与智能体网络的无缝对接。使网络中的其他智能体可以通过调用该智能体的方式来使用千问大模型

1、使用指南

1)、创建一个agent身份

请参考

2)、修改qwen_agent.py文件

将seed_password、your_aid修改为步骤1)创建的身份信息

3)、配置文件

配置文件配置在智能体私有数据目录下的ACP路径/AIDs/youraid/private/data/env.json中(简化智能体迁移克隆流程)在data/env.json中配置服务参数:

{
    "OPENAI_API_KEY": "your_api_key",
    "BASE_URL": "https://api.example.com/v1",
    "MODEL": "qwen-72b-chat"
}

4)、执行代码

python qwen_agent.py

2、功能特性

  • ✅ 完整的消息处理机制

  • ✅ 支持流式响应和工具调用

  • ✅ 智能体网络接入能力

  • ✅ 多角色对话支持

  • ✅ 异常处理与日志追踪

3、环境要求

  • Python 3.8+

  • OpenAI兼容API服务

4、代码结构

.
├── qwen_agent.py       # 核心业务逻辑
├── create_profile.py   # 智能体配置文件生成

5、核心类说明

QwenClient

lass QwenClient:
    def __init__(self):
        self.openai_api_key = None
        self.base_url = None
        self.model = None
        self.client = None
        self.acp = agentcp.AgentCP(".",seed_password="888777",debug=True)
        self.agentid:agentcp.AgentID = None
        
    def init_ai_client(self,json_data):
        # 从环境变量中获取 API Key 和 Base URL
        self.openai_api_key = json_data.get("OPENAI_API_KEY","")
        self.base_url = json_data.get("BASE_URL","")
        self.model = json_data.get("MODEL","")
        self.client = OpenAI(api_key=self.openai_api_key, base_url=self.base_url)

    async def async_message_handler(self, message_data):
        try:
            receiver = message_data.get("receiver")
            sender  = message_data.get("sender","")  # 获取工具信息
            if self.agentid.id not in receiver:
                print("不是发给我的消息,不处理")
                return
            message_array = self.agentid.get_content_array_from_message(message_data)
            if len(message_array) == 0: 
                print("消息内容为空,不处理")
                return
            llm_content = self.agentid.get_content_from_message(message_data)
            stream = message_array[0].get("stream",False)  # 获取stream信息
            tools  = message_array[0].get("tools",[])  # 获取工具信息
            rolesetting = message_array[0].get("prompt","")  # 获取工具信息
            if rolesetting!="" and rolesetting!=None:
                messages = [{"role": "system", "content": rolesetting},{"role": "user", "content": llm_content}]
            else:
                messages = [{"role": "user", "content": llm_content}]
            print(f"\n[处理消息: {sender} : {llm_content}]\n") 
            await self.stream_process_query(message_data,messages,sender,stream,tools)  #添加await关键字
        except Exception as e:
            import traceback
            print(f"处理消息时发生错误: {e}\n完整堆栈跟踪:\n{traceback.format_exc()}")
    
    def send_message_tools_call(self, session_id,sender,llm_content: str,funcallback):
        to_aid_list = [sender]
        msg_block = {
            "type": "tool_call",
            "status": "success",
            "timestamp": int(time.time() * 1000),
            "content": llm_content,
        }
        self.agentid.add_message_handler(funcallback,session_id)
        self.agentid.send_message(session_id,to_aid_list, msg_block)
          
    async def stream_process_query(self, message_data:dict, messages: list, sender:str,stream: bool,user_tools:list):
        if user_tools is None:
            user_tools = []  # 确保tools是一个列表,即使它为空
        if len(user_tools) > 0:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                stream=stream,
                tools = user_tools
            )   
        else:
            response = self.client.chat.completions.create(
                model=self.model,
                messages=messages,
                stream=stream
            )   
        session_id = message_data.get("session_id","")  # 获取session_id
        content = response.choices[0]
        if content.finish_reason == "tool_calls":
            tool_call = content.message.tool_calls[0]
            tool_name = tool_call.function.name
            tool_args = json.loads(tool_call.function.arguments)
            print(f"\n[Calling tool {tool_name} with args {tool_args}]\n")
            async def funcallback(result_content):
                self.agentid.remove_message_handler(funcallback,session_id)
                messages.append(content.message.model_dump())
                messages.append({
                    "role": "tool",
                    "content": self.agentid.get_content_from_message(result_content),
                    "tool_call_id": tool_call.id,
                })
                await self.stream_process_query(message_data,messages,sender,stream,user_tools)
            tool_content = {
                'tool_name':tool_name,
                'tool_args':tool_args,
            }
            self.send_message_tools_call(session_id,sender,json.dumps(tool_content),funcallback)
            return
        if stream:
            await self.agentid.send_stream_message(message_data.get("session_id"),[sender] , response)  # 确保正确调用
        else:
            return self.agentid.reply_message(message_data, content.message.content)

主要方法

1. 消息处理 - `async_message_handler`

async def async_message_handler(self, message_data):
    # 消息过滤与解析
    # 构建对话上下文
    # 调用处理流程

2. 流式处理 - `stream_process_query`

async def stream_process_query(self, message_data, messages, sender, stream, user_tools):
    # 判断工具调用
    # 处理大模型响应
    # 流式/非流式响应处理

3. 工具调用 - `send_message_tools_call`

def send_message_tools_call(self, session_id, sender, llm_content, funcallback):
    # 发送工具调用请求
    # 注册回调处理器

6、错误处理

程序包含完善的异常捕获机制,可通过DEBUG模式查看详细日志:

AgentCP(..., debug=True)  # 启用调试模式

7、注意事项

  1. 确保env.json配置文件正确

  2. 智能体网络接入需要有效的seed_password

  3. 生产环境建议关闭debug模式

https://github.com/auliwenjiang/agentcp/blob/master/samples/llm_agent/qwen_agent.py
一、创建身份&读写公私有数据
AgentCP SDK