1.
背景

花了整整两天时间,本
qiang~
开发了一个关于
AI
新闻资讯的自动聚合及报告生成工具。

本篇记录一下整体的框架和实现原理,并且本着它山之石可以攻玉,本
qiang~
开放了所有的源码,源码可见如下第
5
章节,感谢各位看官的大力支持。如有问题,可私信或留言沟通。

成品可以参考链接:《
AI
资讯每日速递
(2024.11.05)

2.
为什么要做这件事?

深处
AI
时代,想要追赶前沿的一手技术与资讯,有一个工具能够实时获取每天的重点内容,包括咨询和技术相关内容,并且能够按照公司及内容的优先级进行筛选,然后午后捧着一杯奶茶,点开自动生成的报告,岂不美哉美哉?

3.
相关技术

  1. Crawl4ai
    :

    一块集成
    LLM
    的开源爬虫工具
  2. Swarm
    : OpenAI

    发布的
    Multi-Agent
    编排框架,可以参考本人先前的辛苦整理:《
    LLM
    应用实战
    : OpenAI
    多代理框架
    -Swarm

  3. Python-docx
    : word

    的操作工具
  4. Textdistance
    :
    用于报告模块中资讯排序结果与原始资讯结果的对齐
  5. Gpt-4o-mini
    :

    采用的大模型是
    gpt-4o-mini
    ,每日免费调用
    200
    次,不够用
    ...

4.
整体框架

整体框架分为三个模块:

4.1
下载模块

下载模块的数据源包括各大
AI
新闻网站及知名博客,然后通过开源爬虫工具
crawl4ai
进行爬取,爬取的维度包括标题、内容、图片等。

4.2
解析模块

解析模块是针对爬取的结果进行解析,采用
OpenAi Swarm
框架,包含
4

Agent
,其中
Analysis Agent
是主体
Agent
,遍历下载的每一个资讯,将每条资讯分别同步给其他
Agent
完成具体的解析任务。其中
Translator Agent
主要功能是翻译,将英文翻译为中文;
Classifier Agent
主要功能是针对资讯进行分类,如涉及技术还是产品之类的;
Modifier Agent
主要功能是将资讯的标题和内容进行改写,标题可以改写更醒目一些,内容主要是提取摘要信息。

Analysis Agent
负责串联其他
3

Agent
,每个
Agent
结束后均会返回到
Analysis Agent
,以便让
Analysis Agent
决定下一步的操作。

4.3
报告模块

报告模块包含
Sorter Agent
,主要功能是将解析后的资讯按照公司、内容等维度进行排序,然后筛选出其中相对排名较高的资讯。

经过排序
Agent
后,最终将结果保存为
word

5.
全部源码

5.1
下载模块

采用
crawl4ai
工具进行网站爬取,示例的网站是
https://www.aibase.com
,网站存在中文及英文,但增加翻译
Agent
是为了兼容其他网站。

1.
文件处理
file_util.py

importjsonimporthashlibdef get_datas(file_path, json_flag=True, all_flag=False, mode='r'):"""读取文本文件"""results=[]

with open(file_path, mode, encoding
='utf-8') as f:for line inf.readlines():ifjson_flag:
results.append(json.loads(line))
else:
results.append(line.strip())
ifall_flag:ifjson_flag:return json.loads(''.join(results))else:return '\n'.join(results)returnresultsdef save_datas(file_path, datas, json_flag=True, all_flag=False, with_indent=False, mode='w'):"""保存文本文件"""with open(file_path, mode, encoding='utf-8') as f:ifall_flag:ifjson_flag:
f.write(json.dumps(datas, ensure_ascii
=False, indent= 4 if with_indent elseNone))else:
f.write(
''.join(datas))else:for data indatas:ifjson_flag:
f.write(json.dumps(data, ensure_ascii
=False) + '\n')else:
f.write(data
+ '\n')

2.
网站爬取
web_crawler.py


from crawl4ai importAsyncWebCrawlerfrom crawl4ai.extraction_strategy importJsonCssExtractionStrategyimportjsonfrom typing importDict, Any, Union, Listfrom bs4 importBeautifulSoupfrom file_util import *
importosimportdatetimeimportreimportrequestsclassAbstractAICrawler():def __init__(self) ->None:pass
    defcrawl():raiseNotImplementedError()classAINewsCrawler(AbstractAICrawler):def __init__(self, domain) ->None:
super().
__init__()
self.domain
=domain
self.file_path
= f'data/{self.domain}.json'self.history=self.init()definit(self):if notos.path.exists(self.file_path):return{}return {ele['id']: ele for ele inget_datas(self.file_path)}defsave(self, datas: Union[List, Dict]):ifisinstance(datas, dict):
datas
=[datas]
self.history.update({ele[
'id']: ele for ele indatas})
save_datas(self.file_path, datas
=list(self.history.values()))

async
def crawl(self, url:str, schema: Dict[str, Any]=None):
extraction_strategy
= JsonCssExtractionStrategy(schema, verbose=True) if schema elseNone
async with AsyncWebCrawler(verbose
=True) as crawler:
result
=await crawler.arun(
url
=url,
extraction_strategy
=extraction_strategy,
bypass_cache
=True,
)
assert result.success, "Failed to crawl the page" ifschema:returnjson.loads(result.extracted_content)returnresult.cleaned_htmlclassAIBasesCrawler(AINewsCrawler):def __init__(self) ->None:
self.domain
= 'aibase'super().__init__(self.domain)
self.url
= 'https://www.aibase.com'asyncdef crawl_home(self, url='https://www.aibase.com/news'):
schema
={'name': 'ai base home page crawler','baseSelector': '.flex','fields': [
{
'name': 'link','selector': 'a[rel="noopener noreferrer"]','type': 'nested_list','fields': [
{
'name': 'href', 'type': 'attribute', 'attribute':'href'}
]
}
]
}
links
=await super().crawl(url, schema)
links
= [link['href'] for ele in links for link in ele['link']]
links
= list(set([f'{self.url}{ele}' for ele in links if ele.startswith('/news')]))
links
= sorted(links, key=lambda x: x, reverse=True)returnlinks

async
defcrawl_newsletter_cn(self, url):
html
=await super().crawl(url)
body
= BeautifulSoup(html, 'html.parser')
title
= body.select_one('h1').get_text().replace('\u200b', '').strip()
date
= [ele.get_text().strip() for ele in body.find_all('span') if re.match(r'(\d{4}年\d{1,2}月\d{1,2}号)', ele.get_text().strip())][0]
date
= datetime.datetime.strptime(date, '%Y年%m月%d号 %H:%M').strftime("%Y-%m-%d")
content
= '\n'.join([ele.get_text().strip().replace('\n', '').replace(' ', '') for ele in body.find_all('p')])
content
= content[:content.index('划重点:')].strip() if '划重点:' in content elsecontentreturn{'title': title,'link': url,'content': content,'date': date
}

async
def crawl_home_cn(self, url='https://www.aibase.com/zh/news'):
schema
={'name': 'ai base home page crawler','baseSelector': '.flex','fields': [
{
'name': 'link','selector': 'a[rel="noopener noreferrer"]','type': 'nested_list','fields': [
{
'name': 'href', 'type': 'attribute', 'attribute':'href'}
]
}
]
}
links
=await super().crawl(url, schema)
links
= [link['href'] for ele in links for link in ele['link']]
links
= list(set([f'{self.url}{ele}' for ele in links if ele.startswith('/zh/news')]))
links
= sorted(links, key=lambda x: x, reverse=True)returnlinks

async
defcrawl_newsletter(self, url):
html
=await super().crawl(url)
body
= BeautifulSoup(html, 'html.parser')
title
= body.select_one('h1').get_text().replace('\u200b', '').strip()
date
= ';'.join([ele.get_text().strip() for ele in body.find_all('span')])
date
= re.findall(r'(\b\w{3}\s+\d{1,2},\s+\d{4}\b)', date)[0]
date
= datetime.datetime.strptime(date, '%b %d, %Y').strftime("%Y-%m-%d")
content
= '\n'.join([ele.get_text().strip().replace('\n', '') for ele in body.find_all('p')])
content
= content[:content.index('Key Points:')].strip() if 'Key Points:' in content elsecontent
pic_urls
= [ele.get('src').strip() for ele in body.select('img') if ele.get('title')]
pic_url
= pic_urls[0] if pic_urls else ''pic_url= pic_url.replace('\\"', '')
pic_path
= '' ifpic_url:
pic_path
= f'data/images/{md5(url)}.jpg'response=requests.get(pic_url)if response.status_code == 200:
with open(pic_path,
'wb') as f:
f.write(response.content)
return{'title': title,'link': url,'content': content,'date': date,'pic': pic_path,'id': md5(url)
}

async
defcrawl(self):
links
=await self.crawl_home()
results
=[]for link inlinks:
_id
=md5(link)if _id inself.history:continueresults.append({'id': _id,'link': link,'contents': await self.crawl_newsletter(link),'time': datetime.datetime.now().strftime('%Y-%m-%d')
})
self.save(results)
returnawait self.get_last_day_data()

async
defget_last_day_data(self):
last_day
= (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y-%m-%d')
datas
=self.init()for v indatas.values():
v[
'contents']['id'] = v['id']return [v['contents'] for v in datas.values() if v['contents']['date'] == last_day]

View Code

5.2
解析模块

1. 解析提示语prompt.py


ANALYST = """你是一个AI领域的分析师,主要工作步骤如下:
1. 首先执行transform_to_translate_agent方法,切换到translate agent,执行翻译任务;
2. 然后再执行transform_to_classifier_agent,调用classifier agent,针对内容进行分类;
3. 接着再执行transform_to_modifier_agent,调用modifier agent,针对内容进行改写;
4. 前三步执行完毕后,意味着整个分析工作已经完成,最后调用finish方法,退出该整个工作流程。
需要注意的是:每个步骤必须执行完成后,才能执行后续的步骤,且同时只能有1个步骤在执行;如果modifier agent已经执行完毕,一定要调用finish退出整体工作流程。
"""TRANSLATE= """你现在是一个AI领域的翻译专家,请将如下英文的标题和内容分别翻译为中文。步骤及要求如下:
1. 首先调用translate方法进行翻译,要求如下:
a. 需要注意的标题和内容中如果包含公司名称、产品名称、技术名称等专业词汇,针对这些专业词汇需要保留英文形式,其他非专业词汇需要翻译为中文,注意标题也必须翻译;
b. 输出格式为 "标题: xxxxx\n内容: xxxxx",且需要保留换行符;
c. 注意该translate方法没有输入参数,返回的结果只是需要翻译的原始文本,需要你执行翻译操作,然后返回翻译结果;
d. 该translate方法执行完成后,需要你执行具体的翻译,等待翻译完成后,才能开展下一个步骤,不能直接将原文作为参数传给下一个步骤;

2. 抽取完成后,执行extract_translate_result方法,要求如下:
a. 该extract_translate_result方法存在1个输入参数,即执行1后得到的翻译结果

3. 待步骤2执行完成后,执行transform_to_analysis_agent方法,切换至analysis agent,执行其他工作。

4. 步骤1,2,3必须按照顺序执行,且同时只能有1个步骤在执行

5. 如果历史记录中已经执行了任何步骤,注意严格禁止再次重复执行,而要直接执行未执行的步骤,
"""CLASSIFIER= """你是一个AI领域的分类器,请判断输入是否与AI的技术相关。步骤及要求如下:
1. 首先调用classify方法进行分类,要求如下:
a. 输入的内容包括标题和内容两部分,重点基于内容进行判断这条信息是否与AI技术相关;
b. 如果是相关技术细节、技术原理、代码说明、架构说明,则输出"是",如果是与公司的最新资讯相关,如发行新产品、成立新部门、公司合作等非技术相关的,则输出"否"
c. 输出的结果只能是"是"、"否"两个选项中的一个,不要输出其他内容,包括解释信息等。
d. 注意该classify方法没有输入参数,返回的结果只是需要分类的原始文本,需要你执行分类任务,然后返回分类结果;


2. 获取到分类结果后,执行extract_classify_result方法,要求如下:
a. 该extract_classify_result方法存在1个输入参数,即执行1后得到的分类结果

3. 待步骤2执行完成后,执行transform_to_analysis_agent方法,切换至analysis agent,执行其他工作

4. 步骤1,2,3必须按照顺序执行,且同时只能有1个步骤在执行

5. 如果历史记录中已经执行了任何步骤,注意严格禁止再次重复执行,而要直接执行未执行的步骤,
"""MODIFIER= """你是一个AI新闻的改写器,请基于输入中的标题和内容进行改写。步骤及要求如下:
1. 首先调用modify方法进行改写,要求如下:
a. 输入的内容包括"标题"和"内容"两部分,需要分别针对"标题"和"内容"进行改写;
b. "标题"的改写目标是需要醒目且具有吸引力,能够吸引读者进一步阅读,要求字数不能超过30字;
c. "内容"需要摘要总结,需要准确提取主要内容,要求字数不超过200字;
d. 输出格式为 "标题: xxxx\n内容: xxxxx",且需要保留换行符,"标题"和"内容"需要以输入的中文为准;
e. 注意该modify方法没有输入参数,返回的结果是需要改写的原始文本,需要你执行改写任务,然后返回改写结果;


2. 获取到改写结果后,执行extract_modify_result方法,要求如下:
a. 该extract_modify_result方法存在1个输入参数,即执行1后得到的改写结果

3. 待步骤2执行完成后,执行transform_to_analysis_agent方法,切换至analysis agent,执行其他工作

4. 步骤1,2,3必须按照顺序执行,且同时只能有1个步骤在执行

5. 如果历史记录中已经执行了任何步骤,注意严格禁止再次重复执行,而要直接执行未执行的步骤
"""

View Code

2. 解析Agent整体流程agent.py


agent copy 2from swarm importSwarm, Agentfrom web_crawler importAIBasesCrawlerimportasynciofrom prompt import *
from file_util import *
from tqdm importtqdmimportdatetime


client
=Swarm()defdownload():returnasyncio.run(AIBasesCrawler().crawl())deftransform_to_analysis_agent():returnanalysis_agentdeftransform_to_translate_agent():returntranslate_agentdeftransform_to_classifier_agent():returnclassifier_agentdeftransform_to_modifier_agent():returnmodifier_agentdeftranslate(context_variables):return f'现在请按要求翻译如下内容:\n标题: {context_variables["title"]}\n内容: {context_variables["content"]}' defextract_translate_result(result: str, context_variables: dict):"""翻译的结果进行抽取

Args:
result (str): 翻译结果
Returns:
str: 翻译结果提取结束标志
"""context_variables['title_zh'] = result[result.index('标题:')+len('标题:'):result.index('内容:')]
context_variables[
'content_zh'] = result[result.index('内容:')+len('内容:'):]return '翻译结果提取任务已经完成,请继续下一步操作。' defclassify(context_variables):return f'现在请按要求针对以下内容进行分类,\n输入:\n标题: {context_variables["title_zh"]}\n内容: {context_variables["content_zh"]},\n输出:' defextract_classify_result(result: str, context_variables: dict):"""分类的结果进行抽取

Args:
result (str): 翻译结果
Returns:
str: 分类结果提取结束标志
"""context_variables['classify'] =resultreturn '分类结果提取任务已经完成,请继续下一步操作。' defmodify(context_variables):return f'现在请按要求针对以下内容进行改写,\n输入:\n标题: {context_variables["title_zh"]}\n内容: {context_variables["content_zh"]},\n输出:' defextract_modify_result(result: str, context_variables: dict):"""改写的结果进行抽取

Args:
result (str): 改写结果
Returns:
str: 改写结果提取结束标志
"""context_variables['title_modify'] = result[result.index('标题:')+len('标题:'):result.index('内容:')]
context_variables[
'content_modify'] = result[result.index('内容:')+len('内容:'):]return '改写结果提取任务已经完成,请继续下一步操作。' deffinish():return '分析任务已经完成,请直接退出整个工作流程,直接输出"退出"。'analysis_agent= Agent(name='analysis_agent', instructions=ANALYST, functions=[transform_to_translate_agent, transform_to_classifier_agent, transform_to_modifier_agent, finish])
translate_agent
= Agent(name='translate_agent', instructions=TRANSLATE, functions=[translate, extract_translate_result, transform_to_analysis_agent])
classifier_agent
= Agent(name='classifier_agent', instructions=CLASSIFIER, functions=[classify, extract_classify_result, transform_to_analysis_agent])
modifier_agent
= Agent(name='modifier_agent', instructions=MODIFIER, functions=[modify, extract_modify_result, transform_to_analysis_agent])

output_file_pre
= (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y.%m.%d')
output_path
= f'data/{output_file_pre}_final_results.json'results=get_datas(output_path)
process_ids
= [data['id'] for data inresults]for data intqdm(download()):if data['id'] in process_ids: continuecontext_variables= {'title': data['title'], 'content': data['content']}try:
result
= client.run(analysis_agent, messages=[{"role": "user", "content": "现在,请开始分析!"}], context_variables=context_variables, debug=True)
context_variables
=result.context_variables
data[
'title_zh'] = context_variables['title_zh']
data[
'content_zh'] = context_variables['content_zh']
data[
'classify'] = context_variables['classify']
data[
'title_modify'] = context_variables['title_modify']
data[
'content_modify'] = context_variables['content_modify']
save_datas(output_path, [data], mode
='a')exceptException as e:print(e)continue

View Code

5.3
报告模块

1. 排序提示语prompt.py


SORTER = """你是一个AI新闻的排序助手,请给予输入的新闻标题进行排序。要求如下:
1. 排序的规则是基于标题中所提及公司、组织机构的名气和重要性进行排序,名气和重要性是基于你所学的知识进行排序,名气和重要性越高,排名越靠前;
2. 排序的结果只返回名气最高的top10即可,输出的格式为"1xxxxx\n2xxxxx\n3xxxxx...\n10xxxxx",注意一定要以"\n"进行换行;
3. 输出的每个标题,需要和输入中对应的标题保持完全一致,禁止更改;
"""

View Code

2. 排序流程agent.py


from swarm importSwarm, Agentfrom prompt import *
from file_util import *
from collections importdefaultdictimportreimporttextdistancefrom word_util importsave_2_wordimportdatetimeimportrandom


client
=Swarm()
output_file_pre
= (datetime.date.today() - datetime.timedelta(days=1)).strftime('%Y.%m.%d')
output_path
= f'data/{output_file_pre}_final_results.json'sort_agent= Agent(name='sort_agent', instructions=SORTER)

datas
=get_datas(output_path)for ele indatas:
ele[
'title_modify'] = ele['title_modify'].strip()
ele[
'content_modify'] = ele['content_modify'].strip()defget_most_similar(t1, texts):
most_similarity
= 0.0most_similar_text= '' for ele intexts:
similarity
=textdistance.levenshtein.similarity(t1, ele)if similarity >most_similarity:
most_similarity
=similarity
most_similar_text
=elereturnmost_similar_text

type_2_title
=defaultdict(list)
{type_2_title[ele[
'classify']].append(ele['title_modify']) for ele indatas}
title_2_data
= {ele['title_modify']: ele for ele indatas}
final_results
=defaultdict(list)for k, v intype_2_title.items():
content
= "\n".join([ele for ele inv])
message
= f'现在请根据你所学习的知识,按照要求对以下输入进行排序,并且按照输出格式进行输出,\n输入:\n{content},\n输出:'result= client.run(sort_agent, messages=[{"role": "user", "content": message}], debug=True)
sort_results
= [ele['content'] for ele in result.messages[::-1] if 'content' in ele and ele['content'] and ele['content']]
sort_results
= sort_results[0].split('\n') if sort_results else random.sample(v, 10)
sort_results
= [re.sub(r'^\d+[\.,、\s]*', '', ele).strip() for ele insort_results]
final_results[k].extend([title_2_data[get_most_similar(ele, list(title_2_data.keys()))]
for ele insort_results])

sort_output
= f'data/{output_file_pre}_sort_results.json'save_datas(sort_output, [final_results])#生成word save_2_word(final_results, output_file_pre)

View Code

3. 报告生成word_util.py


from docx importDocumentfrom docx.shared importInches, Pt, RGBColorfrom docx.enum.text importWD_PARAGRAPH_ALIGNMENTimportosdefsave_2_word(info_dict, file_pre):
doc
=Document()

categories
= ['', '']
category_color
= 'FF5733' for category incategories:
news
=info_dict[category]
category_paragraph
=doc.add_paragraph()
category
= '技术' if category == '' else '资讯'category_run=category_paragraph.add_run(category)
category_run.bold
=True
category_run.font.size
= Pt(25)
category_run.font.color.rgb
=RGBColor.from_string(category_color)
category_paragraph.alignment
=WD_PARAGRAPH_ALIGNMENT.CENTERfor i, item inenumerate(news):
title
= item['title_modify']
doc.add_heading(f
'{i+1}. {title}', level=1)

pic
= item['pic'] if 'pic' in item else '' if pic andos.path.exists(pic):
pic_paragraph
=doc.add_paragraph()
pic_paragraph.alignment
=WD_PARAGRAPH_ALIGNMENT.CENTER
doc.add_picture(pic, width
=Inches(5))

content
= item['content_modify']
doc.add_paragraph(content)

doc.save(f
'data/AI资讯每日速递({file_pre}).docx')

View Code

6.
优化思考

1.
爬取模块目前是串行下载,且未增加反爬机制
,后续可以增加多线程,且增加代理池机制。

2.
免费的
gpt-4o-mini
每日调用次数仅有
200

次,执行本任务远远不够
,因此后期尝试切换为私有部署的

Qwen2.5

其实已经尝试了
Qwen2.5
,以
vllm
部署,但与
Swarm
框架中的
OpenAi
接口存在少许不兼容,例如不支持特定的参数,只能运行一轮。不过可以进一步优化
Swarm
框架来进行适配。

本次实验本
qiang~
花费了
30
大洋,买了一个
gpt-4o-mini
,生成最终结果,直接耗费了其中的
8
个大洋,烧钱
....

3.
信息推送机制不支持,如一键同步
到公众号、

CSDN
、知乎,这块如果有精力可以基于网站的开发接口,实现一键自动发布文章。

7.
总结

一句话足矣
~

开发了一块
AI
资讯的自动聚合及报告生成工具,包括具体的框架、实现原理以及完整源码,满满诚意,提供给各位看官。欢迎转发、订阅
~

有问题可以私信或留言沟通!

8.
参考

(1)
Swarm:
https://github.com/openai/swarm

(2)
Crawl4ai:
https://github.com/unclecode/crawl4ai

(3)
资讯网站
:
https://www.aibase.com/news

标签: none

添加新评论