从一条突发新闻说起
2026年5月26日,比利时Buggenhout镇发生火车与校车相撞事故,多人死亡。新闻在几分钟内传遍全球,但如果你是一个需要实时跟踪这类事件的开发者(比如做风险分析、情报聚合、自动化新闻简报),你不能只靠手动刷页面。你需要一个事件监控Agent——它能持续抓取RSS、解析内容、提取关键字段、评估影响,甚至自动生成摘要推送到你的Slack。
这篇文章不做空洞的概念介绍。我会用这条新闻作为完整案例,拆解一个生产级别的新闻监控Agent应该怎么设计。你会看到:
- 架构全景图(事件流、规划、工具、记忆)
- 核心模块的伪代码(真正能跑的逻辑)
- 我在实际项目中踩过的三个坑
- 一个简化版实现,你可以30分钟内跑起来

为什么需要Agent,而不是一个简单的爬虫+LLM调用?
单次对话能做:给定一篇新闻,让LLM总结。但你要监控持续变化的事件流,并且根据事件类型执行不同动作(比如火车事故需要计算死亡人数趋势,而股票新闻需要对比价格波动)。纯爬虫+LLM方案有两个硬伤:
- 状态缺失:昨天已处理过的事件今天又出现,需要去重和增量更新。
- 任务僵化:每个事件的处理步骤写死在代码里,无法根据内容动态调整(比如事故新闻需要查历史同类事件对比,而娱乐新闻不需要)。
Agent架构通过规划器动态决定步骤,通过记忆模块保留历史状态,通过工具调用获取外部数据。这就是为什么虽然单次LLM调用也能做摘要,但面对真实事件流,你必须用Agent。
Agent架构拆解:四个模块一条管道
整个系统分为四个核心模块:
1. 事件流接入层(工具:RSS/API监听)
输入源可以是NewsAPI、特定RSS、Twitter流(X API)等。工具的作用是定期轮询或通过Webhook接收新事件,返回原始文本。
# 伪代码:RSS轮询工具
class RSSPoller:
def fetch_new_entries(self, feed_url: str, last_check: datetime) -> List[dict]:
feed = feedparser.parse(feed_url)
new_items = []
for entry in feed.entries:
if entry.published_parsed > last_check:
new_items.append({
'title': entry.title,
'body': entry.summary,
'url': entry.link,
'published': entry.published_parsed
})
return new_items
关键点:这里要处理时区、编码、HTML标签剥离。我踩的第一个坑就是某些RSS的摘要带大量HTML,直接喂给LLM会导致Token浪费和格式污染。所以工具必须暴露一个clean_text方法。
2. 规划器(LLM驱动的决策者)
规划器读取新事件,根据事件类型和优先级决定后续步骤。比如对于“火车事故”这类高影响力事件,规划器可能输出:
- 提取关键字段(伤亡人数、地点、原因)
- 查询历史相似事件(记忆模块)
- 计算风险等级(调用计算器工具)
- 生成报告并推送
规划器通常是一个LLM调用,输入事件原始文本 + 系统指令,输出JSON格式的计划。
def plan(event_text: str) -> dict:
prompt = f"""
你是一个新闻监控规划器。给定以下事件文本,输出一个JSON列表表示需要执行的步骤,每个步骤包含:
- step_name: 唯一标识
- tool: 需要调用的工具名(可选:extractor, memory_searcher, calculator, reporter)
- params: 传递给该工具的参数
事件文本:{event_text}
输出格式:{{'steps': [{{'step_name': 'extract_fields', 'tool': 'extractor', 'params': {{'fields': ['deaths','location']}} }}]}}
"""
response = llm.chat(prompt)
return json.loads(response)
实际踩坑:LLM经常输出不完整的JSON,或者包含注释。需要在后置处理中用正则修正或使用结构化输出(如用JSON mode或function calling)。我推荐直接用OpenAI的function calling,让LLM以工具调用的形式返回计划。
3. 记忆模块(短期+长期)
- 短期记忆:当前事件的处理上下文,比如已经提取的字段,正在生成中的报告。用字典存储,每个事件ID对应一个上下文。
- 长期记忆:历史事件的embedding向量,用于相似度检索。我们使用FAISS或简单的ChromaDB,将过往新闻标题+摘要向量化,新事件到来后计算余弦相似度,找出最相关的历史事件。
class Memory:
def __init__(self):
self.short = {} # event_id -> context dict
self.long = vector_store(...) # 存储过往事件的embedding
def find_similar(self, event_text: str, top_k=3):
query_embed = get_embedding(event_text)
return self.long.search(query_embed, top_k)
为什么需要长期记忆:比利时这条新闻如果单独看,你不会知道它是不是最近第五起铁路事故。记忆模块能告诉你“过去30天欧洲火车事故有3起,死亡总数上升趋势”,从而影响风险评估。
4. 执行器与工具集
执行器接收规划器的步骤列表,依次调用工具并收集结果。每个工具是一个独立函数,可以同步或异步。
常用工具:
extractor:调用LLM从原始文本中提取结构化字段(如死亡人数、地点、时间)calculator:做数值运算(如死亡率趋势计算)reporter:生成最终摘要并推送到Slack/邮件search:搜索外部知识库(比如维基百科、新闻数据库)
class Executor:
def __init__(self):
self.tools = {
'extractor': ExtractorTool(),
'memory_searcher': MemoryTool(),
'calculator': CalculatorTool(),
'reporter': ReporterTool()
}
def execute(self, plan: dict, event_id: str):
context = {}
for step in plan['steps']:
tool = self.tools[step['tool']]
result = tool.run(**step['params'], context=context)
context[step['step_name']] = result
return context
核心流程图:从事件到报告
下图描述了Agent处理比利时火车事故的完整流程。

文字描述:
- RSS轮询工具推送新事件(比利时火车事故)到Agent入口。
- 规划器接收事件文本,决定需要提取字段、查询记忆、生成报告。
- 提取器调用LLM,输出:
{ "deaths": "several", "location": "Buggenhout, Belgium", "time": "2026-05-26 09:36" }。注意“several”是不精确的,规划器随后会决定(根据历史记忆)是否需要等待后续官方数据再更新。 - 记忆模块检索相似事件,返回最近欧洲铁路事故列表。
- 计算器根据历史死亡人数中位数,估算本次“several”对应的可能范围(比如4-8人)。
- 报告器合并以上信息,生成一条带风险等级的推送:“比利时火车撞校车事故,死亡数若干(估计4-8人),为近30天欧洲第4起铁路事故,建议关注救援进展。”
- 报告推送到Slack,同时更新短期记忆标记该事件为“已处理”。
关键实现细节和踩坑记录
坑1:LLM提取字段的不稳定性
当原文说“several people”,LLM可能会提取为“0”或空字符串。我们在extractor工具中加入了验证层:如果数值字段不是整数或明确范围,则标记为unknown并传给规划器,让规划器决定是等待后续更新还是使用默认值。
class ExtractorTool:
def run(self, fields, text):
result = llm.extract(text, fields)
for key in fields:
if key in ['deaths', 'injured']:
if not result[key] or not result[key].isdigit():
result[key] = 'unknown'
return result
坑2:事件去重
同一则新闻可能被多个RSS源重复推送。我们的方法是对新闻URL做SHA256哈希作为事件ID,并在记忆模块的短期存储中记录已处理ID。如果新事件的ID已存在,则直接跳过。
坑3:异步与并发
如果你监听多个RSS源,每个新事件都需要独立走一遍规划-执行流程。使用异步事件循环(asyncio)并行处理,但要注意规划器LLM调用是IO密集型,应该用协程池。Python的asyncio.gather配合semaphore控制并发数,避免API限速。
简化版动手实现:30分钟跑起来
以下是一个最小化但可运行的Agent核心。只需要Python 3.9+,openai库和feedparser。
步骤1:安装依赖
pip install openai feedparser python-dotenv
步骤2:配置.env
OPENAI_API_KEY=sk-...
RSS_URL=https://rss.nytimes.com/services/xml/rss/nyt/World.xml
步骤3:写主文件agent.py
import os
import json
import hashlib
from datetime import datetime
import feedparser
from openai import OpenAI
client = OpenAI()
MEMORY = {} # event_id -> context
def fetch_news():
feed = feedparser.parse(os.getenv('RSS_URL'))
entries = []
for entry in feed.entries:
entries.append({
'title': entry.title,
'summary': entry.summary,
'link': entry.link,
'published': entry.published_parsed
})
return entries
def plan(event_text: str) -> dict:
response = client.chat.completions.create(
model='gpt-4',
messages=[{
"role": "system",
"content": "你是一个新闻规划器。输出JSON列表的步骤,每个步骤有step_name和tool。可能工具:extractor, reporter"
},{
"role": "user",
"content": f"事件:{event_text}"
}],
response_format={"type": "json_object"}
)
return json.loads(response.choices[0].message.content)
def run(event: dict):
event_id = hashlib.sha256(event['link'].encode()).hexdigest()
if event_id in MEMORY:
return
text = f"标题:{event['title']}\n摘要:{event['summary']}"
plan_steps = plan(text)
context = {}
for step in plan_steps.get('steps', []):
if step['tool'] == 'extractor':
fields = step['params']['fields']
# 简化:直接调用LLM提取
resp = client.chat.completions.create(
model='gpt-3.5-turbo',
messages=[{"role": "user", "content": f"从以下文本提取{fields}并返回JSON:{text}"}],
response_format={"type": "json_object"}
)
extracted = json.loads(resp.choices[0].message.content)
context['extracted'] = extracted
elif step['tool'] == 'reporter':
report = f"事故报告:{context.get('extracted', {})}\n来源:{event['link']}"
print(report)
MEMORY[event_id] = context
if __name__ == '__main__':
entries = fetch_news()
for e in entries:
run(e)
用法:运行python agent.py,它会抓取最新世界新闻,对每条自动规划提取和报告。你可以扩展工具(比如添加记忆检索)和规划逻辑。
你现在应该做什么
- 如果你在构建实时事件系统:参考本文的四个模块,先从事件接入和提取开始,不要一开始就做完美记忆。
- 如果你只是想快速验证想法:运行上面的简化版,用你的新闻源替换RSS,观察Agent是如何自动处理不同类型的新闻的。
- 注意风险:Agent自动处理新闻可能产生误报。务必加入人工审核通道(比如报告推送到Slack后由人确认)。
记住,Agent的价值在于动态适应而非固定流水线。从比利时火车事故这个案例可以看出,当事件信息不完整时(“several”),Agent会调用记忆和计算来估算,而不是停滞。这是传统爬虫做不到的。
现在,打开你的编辑器,动手吧。