【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件

这篇具有很好参考价值的文章主要介绍了【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

系列文章

【AI的未来 - AI Agent系列】【MetaGPT】0. 你的第一个MetaGPT程序

【AI的未来 - AI Agent系列】【MetaGPT】1. AI Agent如何重构世界

【AI的未来 - AI Agent系列】【MetaGPT】2. 实现自己的第一个Agent

本文主要内容

以《MetaGPT智能体开发入门》课程的 Task4 为例,利用MetaGPT从0开始实现一个自己的订阅智能体,定时收集网页信息,将信息汇总然后自动发送到微信和邮箱。

根据上面教程的介绍,你已经学会了如何针对具体场景开发一个实用的资讯收集助手;现在,你可以试着完成一个能订阅自己感兴趣的资讯的Agent:

  • 根据前面你所学习的爬虫基本知识(如果你对写爬虫代码感到不熟练,使用GPT帮助你),为你的Agent自定义两个获取资讯的Action类
    • Action 1:根据第四章 3.2.1和3.2.2的指引,独立实现对Github Trending(https://github.com/trending)页面的爬取,并获取每一个项目的 名称、URL链接、描述
    • Action 2:独立完成对Huggingface Papers(https://huggingface.co/papers)页面的爬取,先获取到每一篇Paper的链接(提示:标题元素中的href标签),并通过链接访问标题的描述页面(例如:https://huggingface.co/papers/2312.03818),在页面中获取一篇Paper的
      标题、摘要
  • 参考第三章 1.4 的内容,重写有关方法,使你的Agent能自动生成总结内容的目录,然后根据二级标题进行分块,每块内容做出对应的总结,形成一篇资讯文档;
  • 自定义Agent的SubscriptionRunner类,独立实现Trigger、Callback的功能,让你的Agent定时为通知渠道发送以上总结的资讯文档(尝试实现邮箱发送的功能,这是加分项)

这节课其实就是告诉我们一个订阅智能体的主要组成部分,然后练习怎样用MetaGPT串起来。

  • 订阅智能体的主要组成部分:
    • 定时器:定时触发订阅任务
    • 订阅信息的获取和总结:爬虫 + 大模型
    • callback:经过总结的信息通过回调,给第三方发消息(微信、邮箱等)

【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件,大模型,python,人工智能,微信

Task4 - 任务一:独立实现对Github Trending页面的爬取,并获取每一个项目的 名称、URL链接、描述

这个跟着教程一步步走就行,过程中我没有遇到太大的问题,主要问题是import的包不够…

完整代码及注释

  • 先看执行结果,微信上收到推送
    【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件,大模型,python,人工智能,微信
    【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件,大模型,python,人工智能,微信

  • 完整代码及细节注释

# 加载 .env 到环境变量
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())

import asyncio
from metagpt.subscription import SubscriptionRunner
from metagpt.roles import Searcher
from metagpt.schema import Message

import aiohttp
from bs4 import BeautifulSoup
from metagpt.actions.action import Action
from metagpt.logs import logger

# 1. Action1,爬虫,爬取 GitHub Trending 页面
class CrawlOSSTrending(Action):
    """
    爬取 GitHub Trending 页面
    """

    async def run(self, url: str = "https://github.com/trending"):
        async with aiohttp.ClientSession() as client:
            # async with client.get(url, proxy=CONFIG.global_proxy) as response:
            async with client.get(url) as response: # 1.1 我这里直接就能访问github,所以不需要设置代理
                response.raise_for_status()
                html = await response.text()
        # logger.debug(html)
        
        soup = BeautifulSoup(html, 'html.parser')
    
        repositories = []
    
        ## 1.2 解析Html页面元素,获取我们想要的数据:项目名称、url、描述、编程语言、star数量、fork数量
        for article in soup.select('article.Box-row'):
            repo_info = {}
            
            repo_info['name'] = article.select_one('h2 a').text.strip().replace("\n", "").replace(" ", "")
            repo_info['url'] = "https://github.com" + article.select_one('h2 a')['href'].strip()
    
            # Description
            description_element = article.select_one('p')
            repo_info['description'] = description_element.text.strip() if description_element else None
    
            # Language
            language_element = article.select_one('span[itemprop="programmingLanguage"]')
            repo_info['language'] = language_element.text.strip() if language_element else None
    
            # Stars and Forks
            stars_element = article.select('a.Link--muted')[0]
            forks_element = article.select('a.Link--muted')[1]
            repo_info['stars'] = stars_element.text.strip()
            repo_info['forks'] = forks_element.text.strip()
    
            # Today's Stars
            today_stars_element = article.select_one('span.d-inline-block.float-sm-right')
            repo_info['today_stars'] = today_stars_element.text.strip() if today_stars_element else None
    
            repositories.append(repo_info)
    
        return repositories


from typing import Any
from metagpt.actions.action import Action

TRENDING_ANALYSIS_PROMPT = """# Requirements
You are a GitHub Trending Analyst, aiming to provide users with insightful and personalized recommendations based on the latest
GitHub Trends. Based on the context, fill in the following missing information, generate engaging and informative titles, 
ensuring users discover repositories aligned with their interests.

# The title about Today's GitHub Trending
## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
---
# Format Example
# [Title]

## Today's Trends
Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
The top popular projects are Project1 and Project2.

## The Trends Categories
1. Generative AI
    - [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]
    - [Project2](https://github/xx/project2): ...
...

## Highlights of the List
1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended].
...

---
# Github Trending
{trending}
"""

## 2. 总结爬取到的信息,其实就是 prompt + 爬取到的信息,丢给大模型然后获取结果
class AnalysisOSSTrending(Action):

    async def run(
        self,
        trending: Any
    ):
        return await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))
    
from metagpt.roles import Role
## 3. 定义role,初始化上面的两个action,按顺序执行
class OssWatcher(Role):
    def __init__(
        self,
        name="Codey",
        profile="OssWatcher",
        goal="Generate an insightful GitHub Trending analysis report.",
        constraints="Only analyze based on the provided GitHub Trending data.",
    ):
        super().__init__(name, profile, goal, constraints)
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending]) ## 3.1 初始化两个action
        self._set_react_mode(react_mode="by_order") ## 3.2 按顺序执行

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        todo = self._rc.todo

        msg = self.get_memories(k=1)[0] # 3.4 获取最新的一条memory,爬完后的数据存在里面供分析action使用
        result = await todo.run(msg.content)

        msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        self._rc.memory.add(msg)    # 3.3 第一个action执行完,把结果存入memory
        return msg

##============================ 以上为智能体定义 =================================================


from pydantic import BaseModel, Field
import time
# Trigger
class OssInfo(BaseModel):
    url: str
    timestamp: float = Field(default_factory=time.time)

from typing import Optional
from pytz import BaseTzInfo
from aiocron import crontab

## 4. 定时器的实现,用crontab库实现
class GithubTrendingCronTrigger():

    def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://github.com/trending") -> None:
        self.crontab = crontab(spec, tz=tz) ## 4.1 这里将定时策略传入crontab
        self.url = url

    def __aiter__(self):
        return self

    async def __anext__(self): ## 4.3 __anext__ 方法会在每次迭代中被调用
        await self.crontab.next() ## 4.2 等待 crontab 下一次触发,不到定时时间,这里阻塞,不会执行后续代码
        return Message(self.url, OssInfo(url=self.url))

##======================== 以上为定时器定义 =================================================

import os   
# 5. wechat订阅消息callback,这里使用的是wxpusher实现的
# WxPusherClient的功能是给指定用户推送消息
class WxPusherClient:
    def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
        self.base_url = base_url
        self.token = token or os.environ["WXPUSHER_TOKEN"] # 5.1 从环境变量中获取token,所以你需要在环境变量中配置WXPUSHER_TOKEN或在配置文件中设置WXPUSHER_TOKEN

    async def send_message(
        self,
        content,
        summary: Optional[str] = None,
        content_type: int = 1,
        topic_ids: Optional[list[int]] = None,
        uids: Optional[list[int]] = None,
        verify: bool = False,
        url: Optional[str] = None,
    ):
        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
            # 5.2 从环境变量中获取uids,所以你需要在环境变量中配置WXPUSHER_UIDS
            # uids是你想推送给哪个微信,必须是关注了你这个订阅号的微信才可以知道uid
            "uids": uids or os.environ["WXPUSHER_UIDS"].split(","), 
            "verifyPay": verify,
            "url": url,
        }
        url = f"{self.base_url}/api/send/message"
        return await self._request("POST", url, json=payload)

    async def _request(self, method, url, **kwargs):
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, **kwargs) as response:
                response.raise_for_status()
                return await response.json()

# 5.3 微信callback wrapper,使用WxPusherClient给指定微信推送消息
async def wxpusher_callback(msg: Message):
    client = WxPusherClient()
    await client.send_message(msg.content, content_type=3)

##======================== 以上为回调和微信推送定义 =========================================

# 6. 运行入口,这里为了测试方便,定时执行策略为当前时间+1分钟
from datetime import datetime, timedelta
current_time = datetime.now() ## 6.1 获取当前时间
target_time = current_time + timedelta(minutes=1) ## 6.2 目标时间,当前时间+1分钟
cron_expression = target_time.strftime('%M %H %d %m %w')
spec = cron_expression
logger.info(f"cron expression: {spec}")
async def main(spec: str = spec, wxpusher: bool = True):
    callbacks = []
    callbacks.append(wxpusher_callback)

    async def callback(msg):
        await asyncio.gather(*(call(msg) for call in callbacks)) # 6.3 遍历所有回调函数,触发回调,分发消息

    runner = SubscriptionRunner()
    await runner.subscribe(OssWatcher(), GithubTrendingCronTrigger(spec), callback) # 订阅智能体,本例中所有的工作都由它组织起来
    await runner.run()

if __name__ == "__main__":
    import fire
    fire.Fire(main)
  • 这里只实现了微信推送,没有去做Discord。用微信跑通一遍流程,也算完整了。
  • 爬虫不太熟悉,这里使用的教程中的代码,但是自己也试着用GPT自己写了爬虫代码,后面可以看到。
  • 定时器不熟悉、网络请求aiohttp也没用过… 但是通过代码完全可以会用,照葫芦画瓢还是很简单的。后面分拆开来具体学。不是本课程重点。

Task4 - 任务二:独立完成对Huggingface Papers页面的爬取

因为Hugging face我访问不到,需要出海,所以我这里换了个爬取页面,爬的是 https://github.com/topics,github的topic页面,然后通过热门topic进入相应topic链接获取简介,如下图。

【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件,大模型,python,人工智能,微信
【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件,大模型,python,人工智能,微信

代码及注释

这里就不贴完整代码了,定时器和callback完全相同,主要修改的是爬虫Action

......

## 1. 爬虫Action
class CrawlOSSTrending(Action):
	## 1.2 进入相应topic详细页面,获取topic的具体介绍
    async def _get_brief(self, url):
        async with aiohttp.ClientSession() as client:
            async with client.get(url) as response:
                response.raise_for_status()
                html = await response.text()
        soup = BeautifulSoup(html, 'html.parser')
        topic_list = soup.find_all('div', class_='markdown-body f5 mb-2')

        for topic_item in topic_list:
            topic_intro_tag = topic_item.find('p')
            topic_intro = topic_intro_tag.text.strip() if topic_intro_tag else 'No introduction available.'

            return topic_intro
    
    ## 1.1 爬取topics页面
    async def run(self, url: str = "https://github.com/topics"):
        async with aiohttp.ClientSession() as client:
            async with client.get(url) as response:
                response.raise_for_status()
                html = await response.text()
        soup = BeautifulSoup(html, 'html.parser')
    
        repositories = []
        
        topic_list = soup.find_all('li', class_='col-12 col-sm-6 col-md-4 mb-4')

        for topic_item in topic_list:
            repo_info = {}
            topic_link = topic_item.find('a', class_='no-underline')['href']
            ## 1.1.1 解析出topics的名称
            repo_info['name'] = topic_item.find('p', class_='f3').text.strip()
			
			 ## 1.1.2 解析出topics的url
            repo_info['url'] = f'https://github.com{topic_link}'

			 ## 1.1.3 通过_get_brief进入详细页面获取介绍
            repo_info['description'] = await self._get_brief(repo_info['url'])
    
            repositories.append(repo_info)
        return repositories

......

  • 运行结果展示
    【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件,大模型,python,人工智能,微信

Task4 - 任务三:形成一篇资讯文档

这个任务其实没看懂什么意思,根据自己的理解实现了一种。

我认为这个任务其实不是练习什么新东西,而是学会怎样将现有的知识串起来,数据流打通。例如我下面的实现步骤。

    1. 爬取 github trending 信息
    1. 总结 github trending 信息,前面的任务到这里结束,给callback了
    1. 本任务将 github trending 总结信息给到 WriteDirectory 去生成目录
    1. 生成的目录给到WriteContent Action,根据目录写内容

代码及注释

  • 先看运行结果 - 微信推送

【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件,大模型,python,人工智能,微信

  • 先看运行结果 - Markdown文件
    【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件,大模型,python,人工智能,微信

  • 代码及细节注释

# 加载 .env 到环境变量
from dotenv import load_dotenv, find_dotenv
_ = load_dotenv(find_dotenv())

import asyncio
from metagpt.subscription import SubscriptionRunner
from metagpt.roles import Searcher
from metagpt.schema import Message

import aiohttp
from bs4 import BeautifulSoup
from metagpt.actions.action import Action
from metagpt.logs import logger
from datetime import datetime
from typing import Dict
from metagpt.actions.write_tutorial import WriteDirectory, WriteContent
from metagpt.const import TUTORIAL_PATH
from metagpt.roles import Role
from metagpt.utils.file import File
import fire
import time
from metagpt.prompts.tutorial_assistant import DIRECTORY_PROMPT, CONTENT_PROMPT
from metagpt.utils.common import OutputParser

## 1.写目录的类,根据总结的GitHub Trending趋势,抽取出一二级标题
class WriteDirectory(Action):
    """Action class for writing tutorial directories."""

    def __init__(self, name: str = "", language: str = "Chinese", *args, **kwargs):
        super().__init__(name, *args, **kwargs)
        self.language = language

    async def run(self, topic: str, *args, **kwargs) -> Dict:
        """Execute the action to generate a tutorial directory according to the topic.

        Args:
            topic: The tutorial topic.

        Returns:
            the tutorial directory information, including {"title": "xxx", "directory": [{"dir 1": ["sub dir 1", "sub dir 2"]}]}.
        """
        COMMON_PROMPT = """
        你需要在以下topic中抽取出标题内容,包含一级标题和二级标题。the topic "{topic}".
        注意:一级标题以"##"开头,二级标题以数字序号加"."开头。
        """

        DIRECTORY_PROMPT = COMMON_PROMPT + """
        Please provide the specific table of contents for this tutorial, strictly following the following requirements:
        1. The output must be strictly in the specified language, {language}.
        2. Answer strictly in the dictionary format like {{"title": "xxx", "directory": [{{"dir 1": ["sub dir 1", "sub dir 2"]}}, {{"dir 2": ["sub dir 3", "sub dir 4"]}}]}}.
        3. The directory should be as specific and sufficient as possible, with a primary and secondary directory.The secondary directory is in the array.
        4. Do not have extra spaces or line breaks.
        5. Each directory title has practical significance.
        """
        prompt = DIRECTORY_PROMPT.format(topic=topic, language=self.language)
        resp = await self._aask(prompt=prompt)
        return OutputParser.extract_struct(resp, dict)

## 2. 根据目录标题,写新闻稿
class WriteContent(Action):
    """Action class for writing tutorial content."""

    def __init__(self, name: str = "", directory: str = "", language: str = "Chinese", *args, **kwargs):
        super().__init__(name, *args, **kwargs)
        self.language = language
        self.directory = directory
        logger.info(f"init writeContent, {directory}")

    async def run(self, topic: str, *args, **kwargs) -> str:
        """Execute the action to write document content according to the directory and topic."""
        COMMON_PROMPT = """
        你是一个github技术专家的老手了,现在你需要写一篇关于github每日热门项目趋势的新闻,新闻标题是"{topic}".
        """
        CONTENT_PROMPT = COMMON_PROMPT + """ 
        现在我将给你这个主题的模块目录标题。
        请根据这个标题,写不少于500字的新闻稿。

        The module directory titles for the topic is as follows:
        {directory}

        Strictly limit output according to the following requirements:
        1. Follow the Markdown syntax format for layout.
        2. The output must be strictly in the specified language, {language}.
        3. Do not have redundant output, including concluding remarks.
        4. Strict requirement not to output the topic "{topic}".
        """
        
        prompt = CONTENT_PROMPT.format(
            topic=topic, language=self.language, directory=self.directory)
        return await self._aask(prompt=prompt)

## 3. 爬虫,爬取trending信息
class CrawlOSSTrending(Action):
    async def run(self, url: str = "https://github.com/trending"):
        ...... 与前面的代码完全相同
        return repositories

......与前面的代码完全相同TRENDING_ANALYSIS_PROMPT

## 3. 总结Trending的Action
class AnalysisOSSTrending(Action):

    async def run(
        self,
        trending: Any
    ):
        result = await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))
        return result
    
from metagpt.roles import Role
## 4. 订阅智能体Role
class OssWatcher(Role):
    def __init__(
        self,
        name="Codey",
        profile="OssWatcher",
        goal="Generate an insightful GitHub Trending analysis report.",
        constraints="Only analyze based on the provided GitHub Trending data.",
    ):
        super().__init__(name, profile, goal, constraints)
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending, WriteDirectory(language="Chinese")])  ## 4.1 本任务的重点,在总结完trending之后,将内容直接给WriteDirectory去抽取目录
        # self._set_react_mode(react_mode="by_order") ## 4.2 这里与前面不一样,一定要注意不要写这一句!设置了这一句,会执行到 WriteDirectory就结束,不会执行后面动态添加的WriteContent Action,应该与执行顺序有关,后续看下源码
        self.topic = ""
        self.main_title = ""
        self.total_content = ""
        self.language = "Chinese"

	## 4.3 这里执行trendding爬虫和总结的action
    async def _act1(self) -> Message:
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        todo = self._rc.todo

        msg = self.get_memories(k=1)[0]
        result = await todo.run(msg.content)

        msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        self._rc.memory.add(msg) ## 4.4 思考1:这里将爬虫结果填入memory,下面又返回了memory,返回的memory在MetaGPT源码中不会再次添加进Memory了?什么时候需要手动添加,什么时候不用手动添加?
        return msg
    
    async def _think(self) -> None:
        """Determine the next action to be taken by the role."""
        logger.info(self._rc.state)
        ...... 与之前代码一样

    async def _handle_directory(self, titles: Dict) -> Message:
        """Handle the directories for the tutorial document."""
        self.main_title = titles.get("title")
        directory = f"{self.main_title}\n"
        self.total_content += f"# {self.main_title}"
        actions = list()
        for first_dir in titles.get("directory"):
            key = list(first_dir.keys())[0]
            directory += f"- {key}\n"
            for second_dir in first_dir[key]:
                directory += f"  - {second_dir}\n"
                actions.append(WriteContent(
                language=self.language, directory=second_dir)) ## 4.5 根据目录,动态添加WriteContent Action

        self._init_actions(actions)
        self._rc.todo = None
        return Message(content=directory)

    async def _act(self) -> Message:
        """Perform an action as determined by the role."""
        
        todo = self._rc.todo
        if type(todo) is CrawlOSSTrending or type(todo) is AnalysisOSSTrending:
            return await self._act1() ## 4.6 trending相关的执行这个act
        
        time.sleep(20) ## 4.7 避免OpenAI的请求速率和tokend限制,每次调用前先歇个20s......不是办法的办法
        if type(todo) is WriteDirectory:
            msg = self._rc.memory.get(k=1)[0] ## 4.8 这里拿的是AnalysisOSSTrending的结果,一大堆文字
            self.topic = msg.content
            resp = await todo.run(topic=self.topic)
            return await self._handle_directory(resp)
        
        resp = await todo.run(topic="Github今日热门项目") ## 4.9 这里传入的是一个固定题目,WriteContent是根据这个题目和具体的目录写内容
        if self.total_content != "":
            self.total_content += "\n\n\n"
        self.total_content += resp
        return Message(content=resp, role=self.profile) ## 4.10 思考2:你看,这里就直接返回了msg,没像思考1那里一样先手动添加msg :self._rc.memory.add(msg),需要弄清楚

    async def _react(self) -> Message:
        """Execute the assistant's think and actions."""
        while True:
            await self._think()
            if self._rc.todo is None:
                break
            msg = await self._act()
        
        ## 4.11 写文件
        root_path = TUTORIAL_PATH / datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
        await File.write(root_path, f"{self.main_title}.md", self.total_content.encode('utf-8'))
        return msg

......定时器和callback,以及运行入口的代码都不变,与前面的任务一样

可能存在的问题和思考

  1. 我没有去详细验证生成的这些项目简介和项目特点是否完全准确,只是答题看着还可以。要详细验证,或保证结果的完全正确性,需要更精细的控制手段。

  2. 推送信息只推送最后一段,没有全文推送,这可能是与callback的msg内容只获取了最后一段有关,有空再调一下

  3. 有的目录没有生成内容,触发了大模型的限制?怎么解决?有待研究

【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件,大模型,python,人工智能,微信

  1. 自己写的Prompt很烂,勉强能用…
  2. Python代码水平比较烂,都是想到哪写到哪,读者见谅…

Task4 - 任务四:实现邮箱发送的功能

实现邮箱发送功能,也就是实现一个邮箱的callback,callback里调用邮箱接口发送信息。

代码及注释

  • 先看运行结果

【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件,大模型,python,人工智能,微信

  • 代码及细节注释

...... Action和role,以及定时器的代码不变

import email.utils
from metagpt.schema import Message
import smtplib
from email.mime.text import MIMEText

class EmailClient:
    def __init__(self):
        pass
    
    
    async def send_email(self, sender_email, receiver_email, subject, message):
        # 设置发件人和收件人的邮箱地址
        sender = sender_email
        receiver = receiver_email

        # 设置邮件内容
        msg = MIMEText(message, 'plain', 'utf-8')
        msg['Subject'] = subject
        msg['From'] = email.utils.formataddr(('同学小张', sender))
        msg['To'] = receiver_email

        # 连接到SMTP服务器
        smtp_server = 'smtp.qq.com'
        smtp_port = 25
        smtp_username = '819418509@qq.com'
        smtp_password = 'xxxxxxxxx' # 此处密码是登录QQ邮箱后开启STMP处生成的,非账号密码

        try:
            server = smtplib.SMTP(smtp_server, smtp_port)
            server.login(smtp_username, smtp_password)
            server.sendmail(sender, receiver, msg.as_string())
            server.quit()
            print("邮件发送成功!")
        except Exception as e:
            print("邮件发送失败:", str(e))
    
async def email_pusher_callback(msg: Message):
    client = EmailClient()
    # 调用函数发送邮件
    sender_email = '819418509@qq.com' # 发送邮箱
    receiver_email = 'xxxxxxx@163.com' # 接收邮箱
    subject = '今日GitHub Trending分析'
    message = msg.content
    await client.send_email(sender_email, receiver_email, subject, message)

# asyncio.run(email_pusher_callback(Message("test"))) # 邮箱接口测试

# 运行入口,
from datetime import datetime, timedelta
current_time = datetime.now()
target_time = current_time + timedelta(minutes=1)
cron_expression = target_time.strftime('%M %H %d %m %w')
spec = cron_expression
logger.info(f"cron expression: {spec}")
async def main(spec: str = spec, wxpusher: bool = True):
    callbacks = []
    callbacks.append(wxpusher_callback) # 微信callback  
    callbacks.append(email_pusher_callback) # 邮箱callback

    if not callbacks:
        async def _print(msg: Message):
            print(msg.content)
        callbacks.append(_print)

    async def callback(msg):
        await asyncio.gather(*(call(msg) for call in callbacks))

    runner = SubscriptionRunner()
    await runner.subscribe(OssWatcher(), GithubTrendingCronTrigger(spec), callback)
    await runner.run()

if __name__ == "__main__":
    import fire
    fire.Fire(main)

先写到这,总体来说,订阅智能体的任务是跑通了,对一些概念和MetaGPT的用法有了认知。后面有时间再详细拆解每一步的实现和细节,以及过程中遇到的坑及解决方法。还有爬虫的代码如何用GPT写,微信推送如何做出来的,邮箱推送如何做出来的,定时器的简单探索等,这些本课用到的相关能力也要进一步拆解和学习总结。文章来源地址https://www.toymoban.com/news/detail-806175.html

到了这里,关于【AI的未来 - AI Agent系列】【MetaGPT】3. 实现一个订阅智能体,订阅消息并打通微信和邮件的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【AI的未来 - AI Agent系列】【MetaGPT】6. 用ActionNode重写技术文档助手

    前文【【AI的未来 - AI Agent系列】【MetaGPT】5. 更复杂的Agent实战 - 实现技术文档助手】我们用Action实现了一个技术文档助手,在学习了ActionNode技术之后,我们用ActionNode来尝试重写一下这个技术文档助手。 【AI的未来 - AI Agent系列】【MetaGPT】5. 更复杂的Agent实战 - 实现技术文档

    2024年02月21日
    浏览(38)
  • 【AI的未来 - AI Agent系列】【MetaGPT】4.1 细说我在ActionNode实战中踩的那些坑

    上篇文章 【AI的未来 - AI Agent系列】【MetaGPT】4. ActionNode从理论到实战 中我们学习了MetaGPT中ActionNode的理论和基本用法,跑通了一个简单程序。 原理和代码实现都很简单,但是谁知道当时我遇到了多少坑… 本文带你看看我在上文中遇到的坑,希望能帮到你! 下面说下我遇到的

    2024年01月22日
    浏览(31)
  • MetaGPT( The Multi-Agent Framework):颠覆AI开发的革命性多智能体元编程框架

    一个多智能体元编程框架,给定一行需求,它可以返回产品文档、架构设计、任务列表和代码。这个项目提供了一种创新的方式来管理和执行项目,将需求转化为具体的文档和任务列表,使项目管理变得高效而智能。对于需要进行规划和协调的项目,这个框架提供了强大的支

    2024年01月20日
    浏览(40)
  • MetaGPT入门(三)-OSS订阅智能体

    ‍‬‍‌⁡‍⁢⁤​‍​⁡⁡‬⁢‍​​⁡​​‬‬‍⁡​⁢⁡​​⁣⁡⁤​‍‌​​‌​‍⁣⁣​⁤⁢⁡​《MetaGPT智能体开发入门》教程 - 飞书云文档 (feishu.cn) 照抄的教程啊,增加了个发送邮件的功能,发送邮件功能测试过了,可以发送成功,但是整个的没运行成功

    2024年01月23日
    浏览(32)
  • 智能体AI(Agent AI),多模态交互(MultiModal Interaction), 现阶段综述及未来展望

    本文覆盖了在不同领域和应用程序中的,可进行感知和对应行动的Agent AI系统概述。 Agent AI正在成为通用人工智能(AGI)的一个有前途的路径。 人工智能训练已经证明了在物理世界中进行多模态理解的能力。它通过利用生成人工智能和多个独立的数据源,为现实不可知的训练

    2024年04月17日
    浏览(48)
  • 多Agent框架之-CrewAI-人工智能代理团队的未来

    CrewAI- a role playing AI Agents git地址:https://github.com/joaomdmoura/crewai#why-crewai langchain地址:CrewAI Unleashed: Future of AI Agent Teams Agent具有与另一个Agent联系的能力,以委派工作或提出问题。 任务可以使用特定的代理工具覆盖,这些工具应该被使用,同时还可以指定特定的代理来处理它们

    2024年02月03日
    浏览(53)
  • AI 编程的机会和未来:从 Copilot 到 Code Agent

    大模型的快速发展带来了 AI 应用的井喷。统计 GPT 使用情况,编程远超其他成为落地最快、使用率最高的场景。如今,大量程序员已经习惯了在 AI 辅助下进行编程。数据显示,GitHub Copilot 将程序员工作效率提升了 55%,一些实验中 AI 甚至展示出超越普通程序员的能力。目前

    2024年01月21日
    浏览(38)
  • 探索生成式AI的未来:Chat与Agent的较量与融合

    近年来,生成式人工智能(AI)不仅在技术界引起了广泛关注,更成为了推动多个行业革新的关键力量。这种技术之所以备受瞩目,不仅在于其独特的创造性和高效性,还在于它对未来商业模式和社会结构可能产生的深远影响。在这篇文章中,我们将全面介绍生成式AI的概念、

    2024年04月09日
    浏览(36)
  • AGI时代的奠基石:Agent+算力+大模型是构建AI未来的三驾马车吗?

     ★AI Agent;人工智能体,RPA;大语言模型;prompt;Copilot;AGI;ChatGPT;LLM;AIGC;CoT;Cortex;Genius;MetaGPT;大模型;人工智能;通用人工智能;数据并行;模型并行;流水线并行;混合精度训练;梯度累积;Nvidia;A100;H100;A800;H800;L40s;混合专家;910B;HGX H20;L20 PCIe;L2 PCIe

    2024年02月04日
    浏览(31)
  • 人工智能入门教学——AI代理(AI Agent)

    目录 一、简介 二、特征 三、结构 四、工作流程 五、类型 六、应用 AI代理 (Artificial Intelligence Agent)是指 使用人工智能技术和算法来执行特定任务、解决问题或实现目标的 程序或系统 。 这些代理可以是简单的程序,也可以是复杂的系统,其设计目的是模拟和执行类似人类智

    2024年02月03日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包