八、把输出写入到文件的agent
最后更新于
最后更新于
请参考
AGENT_NAME 为实际的身份信息
python main.py
该Agent基于agentcp
库构建的文件读取示例,可以将agentcp
目录下的文件列出或者读取文件内容作为消息传递,也可以把你输入的消息存储到文件。主要演示以下功能:
安全的文件读取和写入
支持遍历目录查看文件列表
交互式消息处理
from agentcp import AgentCP
import asyncio
import os
import re
import os
import json
class FileOperator:
"""文件操作类,负责安全地读取文本文件内容"""
def __init__(self, sandbox_root: str = None):
self.sandbox_root = sandbox_root or os.getcwd() # 默认使用当前工作目录
def is_text_file(self, file_path: str) -> bool:
"""
通过检查文件内容判断是否为文本文件
Args:
file_path: 文件路径
Returns:
bool: 是否为文本文件
"""
try:
# 尝试以文本模式打开文件
with open(file_path, 'r', encoding='utf-8') as f:
# 读取前1024个字符
f.read(1024)
return True
except UnicodeDecodeError:
# 如果解码失败,说明不是文本文件
return False
except Exception:
# 处理其他异常,如文件不存在等,默认不是文本文件
return False
def extract_file_path(self, text: str) -> str:
"""从文本中提取文件名(不需要完整路径)"""
match = re.search(r"(?:读取|打开|查看)文件\s*([^\s\/\\]+)", text)
return match.group(1) if match else None
def sanitize_path(self, filename: str) -> str:
"""遍历sandbox目录,找到这个文件"""
def recursive_search(root_dir):
try:
for root, dirs, files in os.walk(root_dir):
if filename in files:
full_path = os.path.join(root, filename)
normalized_path = os.path.normpath(full_path)
return normalized_path
except:
return None
return None
return recursive_search(self.sandbox_root)
def walk_directory(self):
"""
遍历目录并打印所有文件和文件夹
"""
filenamess = []
for root, dirs, files in os.walk(self.sandbox_root):
for file in files:
filenamess.append(file)
return filenamess
def exist_file(self, filename):
"""
检查文件是否存在
"""
safe_path = self.sanitize_path(file_path)
return os.path.exists(safe_path)
def read_file(self, file_path: str) -> str:
"""
安全地读取文本文件内容
Args:
file_path: 文件路径
Returns:
文件内容字符串
Raises:
ValueError: 如果文件不是文本格式
"""
safe_path = self.sanitize_path(file_path)
if not safe_path:
raise ValueError("文件不存在")
if not self.is_text_file(safe_path):
raise ValueError("只能读取文本文件")
with open(safe_path, "r") as file:
return file.read()
def parse_command(text):
if not text:
return False
if text.find("查询") >=0 or text.find("列表") >=0:
return 'list'
if text.find("写入") >=0 or text.find("保存") >=0:
return 'write'
if text.find("读取") >= 0 or text.find("查看") >= 0:
return "read"
class FileAgent:
def __init__(self, endpoint: str, name: str):
"""
初始化文件Agent
"""
self.acp = AgentCP("./", seed_password="888777")
self.endpoint = endpoint
self.name = name
self.aid = None
self.file_operator = FileOperator(self.acp.app_path)
self.last_command = ''
self.last_data = ''
async def message_handler(self, msg):
"""
消息处理器 - 根据消息内容安全地读取文件
{
'session_id': '1831173476580327424',
'request_id': '', 'message_id': '9',
'ref_msg_id': '', 'sender': 'samplesdeveloper.agentunion.cn',
'receiver': 'guest_1831158907166261248.agentunion.cn',
'message': '[{"type": "text", "status": "success", "timestamp": 1746343146261,
"content": "{\\"text\\":\\"\\u8bfb\\u53d6\\u6587\\u4ef6agentprofile.json\\",\\"files\\":[],\\"links\\":[],\\"search\\":false,\\"think\\":false}",
"stream": false, "prompt": null, "extra": null, "artifact": null}]',
'timestamp': '1746343146265'
}
"""
try:
ref_msg_id = msg.get("ref_msg_id")
content = msg.get("message", "\"{}\"")
content = json.loads(content)[0]["content"]
content = json.loads(content)
text = content.get("text", "")
command = parse_command(text)
print(f"收到消息: {content}")
if self.last_command:
if self.last_command == "write" and self.last_data:
ret = await self.write_file(msg, text, self.last_data)
if not ret:
await self._send_reply(msg, "文件已存在请重新输入名字")
return
self.last_data = ''
self.last_command = ""
return
elif self.last_command == "write":
self.last_data = text
await self._send_reply(msg, "请输入你要保存的文件名")
return True
elif self.last_command == "read":
self.last_command = ''
return await self.read_file(msg, text) # 调用 read_file 方法处理读取文件的逻辑
elif command == 'list':
files = self.file_operator.walk_directory()
if not files:
await self._send_reply(msg, "当前目录下没有文件")
return True
to = "文件列表<br>" + "<br>".join(files)
await self._send_reply(msg, to)
return True
elif command == 'write':
self.last_command = "write"
await self._send_reply(msg, "请输入你要写入的内容")
return True
elif command == "read":
self.last_command = "read"
files = self.file_operator.walk_directory()
if not files:
await self._send_reply(msg, "当前目录下没有文件")
return True
to = "读取哪一个文件?<br>" + "<br>".join(files)
print(f"send message: {to}")
await self._send_reply(msg, to)
return True
else:
await self._send_reply(
msg, "你可以对我说: <br>查询文件<br>读取文件<br>写入文件"
)
except Exception as e:
print(f"处理消息出错: {str(e)}")
await self._send_reply(msg, f"处理文件时出错: {str(e)}")
return False
async def write_file(self, msg, filename, text):
file_path = f"{self.aid.app_path}/aid/{self.aid.id}/public/{filename}"
# 检查文件是否已存在
if os.path.exists(file_path):
return False
try:
# 以写入模式打开文件,并写入内容
with open(file_path, 'w', encoding='utf-8') as file:
file.write(text)
await self._send_reply(msg, f"成功写入文件: {file_path}")
return True
except Exception as e:
print(f"写入文件时出错: {str(e)}")
await self._send_reply(msg, f"写入文件 {file_path} 时出错: {str(e)}")
return False
async def read_file(self, msg, text):
try:
filename = self.file_operator.extract_file_path(text) # 现在只获取文件名
if not filename:
try:
print(f"未提供文件名尝试读取: {text}")
file_content = self.file_operator.read_file(text) # 直接传入文件名
await self._send_reply(msg, f"文件内容:<br>{file_content}")
return
except Exception as e:
print(f"处理消息出错: {str(e)}")
try:
file_content = self.file_operator.read_file(filename) # 直接传入文件名
await self._send_reply(msg, f"文件内容:<br>{file_content}")
except PermissionError:
await self._send_reply(msg, "访问文件被拒绝")
except FileNotFoundError:
await self._send_reply(msg, f"文件不存在: {filename}")
except ValueError as e:
await self._send_reply(msg, f"{str(e)}")
return True
except Exception as e:
print(f"处理消息出错: {str(e)}")
await self._send_reply(msg, f"处理文件时出错: {str(e)}")
return False
async def _send_reply(self, original_msg, content: str):
"""
发送回复消息
"""
try:
self.aid.send_message_content(
to_aid_list=[original_msg.get("sender")],
session_id=original_msg.get("session_id"),
llm_content=content)
except Exception as e:
print(f"发送回复消息出错: {str(e)}")
async def run(self):
"""
运行Agent
"""
try:
self.aid = self.acp.create_aid(self.endpoint, self.name)
self.aid.add_message_handler(self.message_handler)
self.aid.online()
print("Agent已上线,等待文件读取指令...")
while True:
await asyncio.sleep(1)
except Exception as e:
print(f"发生错误: {str(e)}")
finally:
if self.aid:
self.aid.offline()
print("Agent已下线")
if __name__ == "__main__":
# 配置参数
ENDPOINT = "aid.pub"
AGENT_NAME = "" #实际业务中请使用真实的aid
# 创建并运行Agent
agent = FileAgent(
ENDPOINT,
AGENT_NAME,
)
# agent.test()
asyncio.run(agent.run())
发送指令格式:- 查询文件列表: "查询文件" 或 "列表"- 读取文件内容: "读取文件" 或 "查看文件" (会进入交互模式)- 写入文件内容: "写入文件" (会进入交互模式)
文件操作限制在沙箱目录内(AgentID的公有数据目录)
只能操作文本文件