MetaGPT入门(三)-OSS订阅智能体

这篇具有很好参考价值的文章主要介绍了MetaGPT入门(三)-OSS订阅智能体。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

‍‬‍‌⁡‍⁢⁤​‍​⁡⁡‬⁢‍​​⁡​​‬‬‍⁡​⁢⁡​​⁣⁡⁤​‍‌​​‌​‍⁣⁣​⁤⁢⁡​《MetaGPT智能体开发入门》教程 - 飞书云文档 (feishu.cn)

import asyncio
import os
import smtplib
import time
from email.mime.multipart import MIMEMultipart
from email.mime.text import MIMEText
from typing import Any, AsyncGenerator, Awaitable, Callable, Optional

import aiohttp
import discord
import pandas as pd
from aiocron import crontab
from bs4 import BeautifulSoup
from pydantic import BaseModel, Field
from pytz import BaseTzInfo

from metagpt.actions.action import Action
from metagpt.config import CONFIG
from metagpt.logs import logger
from metagpt.roles import Role
from metagpt.schema import Message


# 订阅模块,可以from metagpt.subscription import SubscriptionRunner导入,这里贴上代码供参考
class SubscriptionRunner(BaseModel):
    """A simple wrapper to manage subscription tasks for different roles using asyncio.
    Example:
        >>> import asyncio
        >>> from metagpt.subscription import SubscriptionRunner
        >>> from metagpt.roles import Searcher
        >>> from metagpt.schema import Message
        >>> async def trigger():
        ...     while True:
        ...         yield Message("the latest news about OpenAI")
        ...         await asyncio.sleep(3600 * 24)
        >>> async def callback(msg: Message):
        ...     print(msg.content)
        >>> async def main():
        ...     pb = SubscriptionRunner()
        ...     await pb.subscribe(Searcher(), trigger(), callback)
        ...     await pb.run()
        >>> asyncio.run(main())
    """

    tasks: dict[Role, asyncio.Task] = Field(default_factory=dict)

    class Config:
        arbitrary_types_allowed = True

    async def subscribe(
            self,
            role: Role,
            trigger: AsyncGenerator[Message, None],
            callback: Callable[
                [
                    Message,
                ],
                Awaitable[None],
            ],
    ):
        """Subscribes a role to a trigger and sets up a callback to be called with the role's response.
        Args:
            role: The role to subscribe.
            trigger: An asynchronous generator that yields Messages to be processed by the role.
            callback: An asynchronous function to be called with the response from the role.
        """
        loop = asyncio.get_running_loop()

        async def _start_role():
            async for msg in trigger:
                resp = await role.run(msg)
                await callback(resp)

        self.tasks[role] = loop.create_task(_start_role(), name=f"Subscription-{role}")

    async def unsubscribe(self, role: Role):
        """Unsubscribes a role from its trigger and cancels the associated task.
        Args:
            role: The role to unsubscribe.
        """
        task = self.tasks.pop(role)
        task.cancel()

    async def run(self, raise_exception: bool = True):
        """Runs all subscribed tasks and handles their completion or exception.
        Args:
            raise_exception: _description_. Defaults to True.
        Raises:
            task.exception: _description_
        """
        while True:
            for role, task in self.tasks.items():
                if task.done():
                    if task.exception():
                        if raise_exception:
                            raise task.exception()
                        logger.opt(exception=task.exception()).error(f"Task {task.get_name()} run error")
                    else:
                        logger.warning(
                            f"Task {task.get_name()} has completed. "
                            "If this is unexpected behavior, please check the trigger function."
                        )
                    self.tasks.pop(role)
                    break
            else:
                await asyncio.sleep(1)


# Actions 的实现
TRENDING_ANALYSIS_PROMPT = """# Requirements
You are a GitHub Trending Analyst, aiming to provide users with insightful and personalized recommendations based on the latest
GitHub Trends. Based on the context, fill in the following missing information, generate engaging and informative titles, 
ensuring users discover repositories aligned with their interests.

# The title about Today's GitHub Trending
## Today's Trends: Uncover the Hottest GitHub Projects Today! Explore the trending programming languages and discover key domains capturing developers' attention. From ** to **, witness the top projects like never before.
## The Trends Categories: Dive into Today's GitHub Trending Domains! Explore featured projects in domains such as ** and **. Get a quick overview of each project, including programming languages, stars, and more.
## Highlights of the List: Spotlight noteworthy projects on GitHub Trending, including new tools, innovative projects, and rapidly gaining popularity, focusing on delivering distinctive and attention-grabbing content for users.
---
# Format Example

```
# [Title]

## Today's Trends
Today, ** and ** continue to dominate as the most popular programming languages. Key areas of interest include **, ** and **.
The top popular projects are Project1 and Project2.

## The Trends Categories
1. Generative AI
    - [Project1](https://github/xx/project1): [detail of the project, such as star total and today, language, ...]
    - [Project2](https://github/xx/project2): ...
...

## Highlights of the List
1. [Project1](https://github/xx/project1): [provide specific reasons why this project is recommended].
...
```

---
# Github Trending
{trending}
"""


class CrawlOSSTrending(Action):

    async def run(self, url: str = "https://github.com/trending"):
        async with aiohttp.ClientSession() as client:
            async with client.get(url, proxy=CONFIG.global_proxy) as response:
                response.raise_for_status()
                html = await response.text()

        soup = BeautifulSoup(html, 'html.parser')

        repositories = []

        for article in soup.select('article.Box-row'):
            repo_info = {}

            repo_info['name'] = article.select_one('h2 a').text.strip().replace("\n", "").replace(" ", "")
            repo_info['url'] = "https://github.com" + article.select_one('h2 a')['href'].strip()

            # Description
            description_element = article.select_one('p')
            repo_info['description'] = description_element.text.strip() if description_element else None

            # Language
            language_element = article.select_one('span[itemprop="programmingLanguage"]')
            repo_info['language'] = language_element.text.strip() if language_element else None

            # Stars and Forks
            stars_element = article.select('a.Link--muted')[0]
            forks_element = article.select('a.Link--muted')[1]
            repo_info['stars'] = stars_element.text.strip()
            repo_info['forks'] = forks_element.text.strip()

            # Today's Stars
            today_stars_element = article.select_one('span.d-inline-block.float-sm-right')
            repo_info['today_stars'] = today_stars_element.text.strip() if today_stars_element else None

            repositories.append(repo_info)

        return repositories


class AnalysisOSSTrending(Action):

    async def run(
            self,
            trending: Any
    ):
        return await self._aask(TRENDING_ANALYSIS_PROMPT.format(trending=trending))


# Role实现
class OssWatcher(Role):
    def __init__(
            self,
            name="Codey",
            profile="OssWatcher",
            goal="Generate an insightful GitHub Trending analysis report.",
            constraints="Only analyze based on the provided GitHub Trending data.",
    ):
        super().__init__(name, profile, goal, constraints)
        self._init_actions([CrawlOSSTrending, AnalysisOSSTrending])
        self._set_react_mode(react_mode="by_order")

    async def _act(self) -> Message:
        logger.info(f"{self._setting}: ready to {self._rc.todo}")
        # By choosing the Action by order under the hood
        # todo will be first SimpleWriteCode() then SimpleRunCode()
        todo = self._rc.todo

        msg = self.get_memories(k=1)[0]  # find the most k recent messages
        result = await todo.run(msg.content)

        msg = Message(content=str(result), role=self.profile, cause_by=type(todo))
        self._rc.memory.add(msg)
        return msg


# Trigger
class OssInfo(BaseModel):
    url: str
    timestamp: float = Field(default_factory=time.time)


class GithubTrendingCronTrigger():

    def __init__(self, spec: str, tz: Optional[BaseTzInfo] = None, url: str = "https://github.com/trending") -> None:
        self.crontab = crontab(spec, tz=tz)
        self.url = url

    def __aiter__(self):
        return self

    async def __anext__(self):
        await self.crontab.next()
        return Message(self.url, OssInfo(url=self.url))


# callback
async def discord_callback(msg: Message):
    intents = discord.Intents.default()
    intents.message_content = True
    client = discord.Client(intents=intents, proxy=CONFIG.global_proxy)
    token = os.environ["DISCORD_TOKEN"]
    channel_id = int(os.environ["DISCORD_CHANNEL_ID"])
    async with client:
        await client.login(token)
        channel = await client.fetch_channel(channel_id)
        lines = []
        for i in msg.content.splitlines():
            if i.startswith(("# ", "## ", "### ")):
                if lines:
                    await channel.send("\n".join(lines))
                    lines = []
            lines.append(i)

        if lines:
            await channel.send("\n".join(lines))


class WxPusherClient:
    def __init__(self, token: Optional[str] = None, base_url: str = "http://wxpusher.zjiecode.com"):
        self.base_url = base_url
        self.token = token or os.environ["WXPUSHER_TOKEN"]

    async def send_message(
            self,
            content,
            summary: Optional[str] = None,
            content_type: int = 1,
            topic_ids: Optional[list[int]] = None,
            uids: Optional[list[int]] = None,
            verify: bool = False,
            url: Optional[str] = None,
    ):
        payload = {
            "appToken": self.token,
            "content": content,
            "summary": summary,
            "contentType": content_type,
            "topicIds": topic_ids or [],
            "uids": uids or os.environ["WXPUSHER_UIDS"].split(","),
            "verifyPay": verify,
            "url": url,
        }
        url = f"{self.base_url}/api/send/message"
        return await self._request("POST", url, json=payload)

    async def _request(self, method, url, **kwargs):
        async with aiohttp.ClientSession() as session:
            async with session.request(method, url, **kwargs) as response:
                response.raise_for_status()
                return await response.json()


async def wxpusher_callback(msg: Message):
    client = WxPusherClient()
    await client.send_message(msg.content, content_type=3)


async def send_via_email(msg:Message):

    # 设置邮件信息
    sender = '123455@qq.com'
    receiver = '123455@qq.com'
    subject = 'Github Trending'
    body = '<br>'.join(msg.content)

    # 创建邮件对象
    msg = MIMEMultipart()
    msg['From'] = sender
    msg['To'] = receiver
    msg['Subject'] = subject

    # 添加邮件正文
    msg.attach(MIMEText(body, 'html'))

    # 发送邮件
    try:
        server = smtplib.SMTP('smtp.qq.com', 587)
        server.starttls()
        #注意这个地方的密码不是邮箱密码,是授权码,不知道咋获取的搜索一下吧
        server.login(sender, 'authenticCODE')
        server.sendmail(sender, receiver, msg.as_string())
        server.quit()
        print('邮件发送成功')
    except Exception as e:
        print('邮件发送失败', e)

# 运行入口,cron 表达式遵循特定的格式,通常是:分钟、小时、日、月、星期几。测试使用,1分钟一次,用几下就关掉程序啊,不然。。。
async def main(spec: str = "1 * * * *", discord: bool = False, wxpusher: bool = False, email:bool = True):
    callbacks = []
    if discord:
        callbacks.append(discord_callback)

    if wxpusher:
        callbacks.append(wxpusher_callback)

    if email:
        callbacks.append(send_via_email)

    if not callbacks:
        async def _print(msg: Message):
            print(msg.content)

        callbacks.append(_print)

    async def callback(msg):
        await asyncio.gather(*(call(msg) for call in callbacks))

    runner = SubscriptionRunner()
    await runner.subscribe(OssWatcher(), GithubTrendingCronTrigger(spec), callback)
    await runner.run()


if __name__ == "__main__":
    import fire

    fire.Fire(main)

照抄的教程啊,增加了个发送邮件的功能,发送邮件功能测试过了,可以发送成功,但是整个的没运行成功,应该是没科学上网,打不开trending页面的事,时间紧张,有空再搞通或者爬个国内的网页。文章来源地址https://www.toymoban.com/news/detail-818979.html

到了这里,关于MetaGPT入门(三)-OSS订阅智能体的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • MetaGPT入门(一)

    本文在Win11操作系统下进行,工具pycharm 一、环境准备 1.建议使用conda虚拟环境 安装anaconda参考:Windows10下Anaconda的安装_windows anaconda 路径-CSDN博客 打开Anaconda Powershell Prompt命令窗口,输入下面命令,创建3.10版本的python解释器 conda create -n metagpt python=3.10 2.切换到创建的metagpt虚拟

    2024年01月17日
    浏览(46)
  • MetaGPT( The Multi-Agent Framework):颠覆AI开发的革命性多智能体元编程框架

    一个多智能体元编程框架,给定一行需求,它可以返回产品文档、架构设计、任务列表和代码。这个项目提供了一种创新的方式来管理和执行项目,将需求转化为具体的文档和任务列表,使项目管理变得高效而智能。对于需要进行规划和协调的项目,这个框架提供了强大的支

    2024年01月20日
    浏览(54)
  • 云服务——阿里云OSS的入门使用

    紧接着上一集里面说到的,网络资源大多都不会直接放在服务器端,而是放在别的专门的云存储平台里面。 比如短信服务如果自己实现会非常繁琐,需要和各个运营商对接,使用阿里云提供的短信服务后只需要调用短信服务即可  存储网络资源需要用到对象存储:概念如下  

    2024年02月09日
    浏览(39)
  • Java 使用OSS 文件上传+下载 简单入门

    官方SDK文档:Java对象/文件_对象存储-阿里云帮助中心           阿里云对象存储OSS(Object Storage Service)为您提供基于网络的数据存取服务。使用OSS,可以通过网络随时存储和调用包括文本、图片、音视频在内的各类数据文件;可以通过OSS控制台创建Bucket,并将文件上传

    2024年02月10日
    浏览(46)
  • RabbitMQ入门案例之发布订阅模式

    本文章主要介绍RabbitMQ的发布订阅模式,该模式下,消息为广播形式,一经发布则会进入交换机绑定的队列中,详细介绍可以阅读官方文档。 官网文档地址:https://rabbitmq.com/getstarted.html RabbitMQ中的发布与订阅模式是一种消息传递的方式,用于在分布式系统中传递消息。 在该模

    2024年02月09日
    浏览(57)
  • 【Midjourney入门教程1】Midjourney的注册、订阅

    AI绘画即指人工智能绘画,是一种计算机生成绘画的方式。是AIGC应用领域内的一大分支。 AI绘画主要分为两个部分,一个是对图像的分析与判断,即“学习”,一个是对图像的处理和还原,即“输出”。人工智能通过对数以万计的图像及绘画作品进行不断学习,如今已经达到

    2024年02月02日
    浏览(48)
  • 手把手教你Midjourney|入门·订阅管理套餐

    大家好,这里是Dennis的AI说,上一期是教大家如何注册一个账号,那么今天的教程教是教会大家如何在Midjourney上购买套餐以及后续的退订步骤。 Midjourney里购买套餐主要是针对于后续的做图时间速度,不同套餐生成图片的速度是不一样的。 如何在Midjourney里正确的订阅管理套餐

    2024年02月08日
    浏览(71)
  • 腾讯员工内部培训:微信订阅号运营从入门到精通

    这篇文章算是对微信运营的小小总结了,说不上是经验,毕竟一万个哈姆雷特有一万种活法。这篇文章从定位、运营(内容运营、用户运营、微信元素拆解)、推广、工具、公众号推荐几个方面来总结微信运营的一些规律。 1.定位 在开始运营一个微信公众号之前可以从以下三

    2024年02月08日
    浏览(48)
  • 入门人工智能 —— 学习 python 使用 IDE :vscode 完成编程 (2)

    在上一篇文章中,介绍了如何入门人工智能编程,并开始了学习 Python 编程语言的基础知识。本文是系列文章的第二部分,我们将继续探讨 Python 编程,但这次我们将使用 Visual Studio Code(简称 VSCode)作为我们的集成开发环境(IDE)。VSCode 是一个功能强大的文本编辑器,它可以

    2024年02月09日
    浏览(64)
  • Fomepay:ChatGPT、Midjourney等人工智能圈最稳定的订阅工具,全面解析!

    众所周知可以使用Fomepay官网的美版虚拟信用卡来订阅Chatgpt/Midjourney-Plus会员。但是很多人还不是很了解Fomepay平台。这篇文章将从多角度全面分析,全网的最全讲解,读完会有一个清晰的认识。 FomePay是一家专注于资产管理和跨境支付技术的香港金融科技公司。其核心价值观是

    2024年02月02日
    浏览(63)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包