Agent Communication Protocol
Github
  • ACP(智能体通信协议)介绍
    • 为什么选择ACP?
    • ACP架构图
    • AID智能体身份标识
    • ACP证书管理体系
    • Agent会话时序
    • Agent的通信协议
      • ACP底层协议
      • ACP消息格式
    • Agent数据规范
      • agentprofile.json规范
      • config.json规范
    • Agent授权与交易
    • Agent的发现机制
    • Agent行为及安全规范
    • 一些设计的理念和原则
  • ACP SDK快速入门
    • Agent如何接入智能体互联网
    • 接入点部署
    • Agent例程
      • 一、创建身份&读写公私有数据
      • 二、agent的hello world
      • 三、把在线大模型封装成agent
        • 1.deepseek异步响应
        • 2.qwen3大模型流式输出
        • 3.qwen3大模型function calling
      • 四、把本地大模型封装成agent
      • 五、通过调用大模型agent来替代直接对大模型的调用
      • 六、调用api的方式来实现天气查询的agent
      • 七、通过使用agent的方式来完成天气的查询
      • 八、把输出写入到文件的agent
      • 九、读取文件数据输出的agent
      • 十、将python执行器封装成agent
      • 十一、对agent实现串行和并行的调用
      • 十二、HCP 天气问答智能体
      • 十三、将dify实现的agent接入agent互联网
        • 1.dify chat接入
        • 2.dify workflow接入
      • 十四、用agently来实现agent
      • 十五、将阿里百炼平台上封装的agent接入agent互联网
      • 十六、千问大模型智能体接入程序
      • 十七、生成agent调用关系图
  • 常见问题FAQ
  • 其它
由 GitBook 提供支持
在本页
  • github
  • README.MD
  • 1、使用指南
  • 2、功能简介
  • 3、完整代码
  1. ACP SDK快速入门
  2. Agent例程

十五、将阿里百炼平台上封装的agent接入agent互联网

上一页十四、用agently来实现agent下一页十六、千问大模型智能体接入程序

最后更新于23天前

github

README.MD

1、使用指南

1)、创建agent身份(aid)

请参考

2)、修改amap_agent.py文件

a、将seed_password、your_agent_id修改为步骤1)创建的身份信息b、将sk-yourappkey、your_appid修改为阿里百炼上的信息

3)、安装阿里百炼库依赖

pip install dashscope

4)、运行amap_agent.py

python amap_agent.py

2、功能简介

在阿里百炼上生成一个高德路线规划的小助手,接入到ACP智能体互联网中,给别的agent提供服务

3、完整代码

import asyncio 
from calendar import c
import time
import agentcp
from agentcp.agentcp import AgentID
from http import HTTPStatus
from dashscope import Application
from dotenv import load_dotenv, find_dotenv
from dashscope import Application

class AmapClient:
    def __init__(self):
        self.agentid_client = agentcp.AgentCP(".",seed_password="888777")  # 初始化 MCP 客户端,设置 debug=True 以启用调试模式
        self.agentid:AgentID = None
        load_dotenv(find_dotenv())
        self.api_key = "sk-yourappkey"
        self.app_id = "your_appid"
        
    async def chat_loop(self):
        if self.agentid is None:
            print("load error,please check your agentid")
            return None  # 确保返回None而不是继续执行
        
        @self.agentid.message_handler()
        async def sync_message_handler(msg):
            await self.async_message_handler(msg)  # 添加await关键字
            return True
            
        print("设置监听函数...")
        
        try:
            print("开始在线...")
            self.agentid.online()  # 确保self.agentid不为None
            print("开始监听消息...")
            while True:
                await asyncio.sleep(1)
        except AttributeError as e:
            print(f"AgentID未正确初始化: {e}")
            return None

    async def async_message_handler(self,message_data):
        #print(f"收到消息数据: {message_data}")
        try:
            receiver = message_data.get("receiver")
            if self.agentid.id not in receiver:
                print("不是发给我的消息,不处理")
                return                
            content = self.agentid.get_content_from_message(message_data)
            response = Application.call(
                api_key=self.api_key,
                app_id=self.app_id,
                prompt=content
            )

            if response.status_code != HTTPStatus.OK:
                result = f"调用失败,错误码:{response.status_code}, 错误信息:{response.message}"
                print(f'request_id={response.request_id}')
                print(f'code={response.status_code}')
                print(f'message={response.message}')
                print(f'请参考文档:https://help.aliyun.com/zh/model-studio/developer-reference/error-code')
            else:
                result = response.output.text
                print(response.output.text)
            to_aid_list = []
            to_aid_list.append(message_data.get("sender"))
            # 修改为直接传递字典而不是对象
            msg_block = {
                "type": "content",
                "status": "success", 
                "timestamp": int(time.time() * 1000),
                "content": result
            }
            self.agentid.reply_message(message_data, msg_block)
        except Exception as e:
            import traceback
            print(f"处理消息时发生错误: {e}\n完整堆栈跟踪:\n{traceback.format_exc()}")
            
async def main():
    client = AmapClient()
    print("欢迎使用高德聊天机器人 AGENT 客户端!")
    client.agentid = client.agentid_client.load_aid("your_agent_id")  # 替换为实际的AgentID
    try:
        await client.chat_loop()
    except Exception as e:
        print(f"聊天循环出错: {e}")
        
if __name__ == "__main__":
    asyncio.run(main())

https://github.com/auliwenjiang/agentcp/blob/master/samples/ali_amap/
一、创建身份&读写公私有数据