ARL分析与进阶使用

这篇具有很好参考价值的文章主要介绍了ARL分析与进阶使用。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在使用ARL(Asset Reconnaissance Lighthouse资产侦察灯塔系统,项目地址地址为https://github.com/TophantTechnology/ARL)的时候,有两个问题比较困扰我:

1. ARL使用Fofa导入数据的时候怎么降重?

2. 如何自己手动编写Poc?

在网上查阅了一些相关资料后,我发现并没有有师傅写的很清晰,于是诞生了写这篇文章的想法。

这篇文章不涉及ARL的基础搭建过程和基础使用过程,如果您之前没有使用过ARL,详情可以参考官网教程:https://tophanttechnology.github.io/ARL-doc/system_install/

1.Fofa降重

先说结论,是由于Fofa_api的限制而不是ARL本身的问题

来源于我之前的使用体验,使用同样的Fofa语句,比如能搜到大量地的资产,但是ARL只会跑几千条,然后我们反复运行发现得到的资产结果是一致的,这样就大大地影响了配合Fofa使用好处,只能自己更换不同的Fofa语句来实现降重,非常麻烦。

首先我们先黑盒看看调用fofa的流程:

POST /api/task_fofa/test HTTP/2
​
​
{"query":"org=\"China Education and Research Network Center\""}
HTTP/2 200 OK
​
{"message": "success", "code": 200, "data": {"size": 13492282, "query": "org=\"China Education and Research Network Center\""}}

可以看见这里返回的结果是13492282条

然后我们直接去项目里面去找:

路径为:ARL-2.6.1\app\routes\taskFofa.py

from flask_restx import Namespace, fields
from app.utils import get_logger, auth, build_ret, conn_db
from app.modules import ErrorMsg, CeleryAction
from app.services.fofaClient import fofa_query, fofa_query_result
from app import celerytask
from bson import ObjectId
from . import ARLResource
​
​
ns = Namespace('task_fofa', description="Fofa 任务下发")
​
logger = get_logger()
​
​
test_fofa_fields = ns.model('taskFofaTest',  {
    'query': fields.String(required=True, description="Fofa 查询语句")
})
​
​
@ns.route('/test')
class TaskFofaTest(ARLResource):
​
    @auth
    @ns.expect(test_fofa_fields)
    def post(self):
        """
        测试Fofa查询连接
        """
        args = self.parse_args(test_fofa_fields)
        query = args.pop('query')
        data = fofa_query(query, page_size=1)
        if isinstance(data, str):
            return build_ret(ErrorMsg.FofaConnectError, {'error': data})
​
        if data.get("error"):
            return build_ret(ErrorMsg.FofaKeyError, {'error': data.get("errmsg")})
​
        item = {
            "size": data["size"],
            "query": data["query"]
        }
​
        return build_ret(ErrorMsg.Success, item)
​
​
add_fofa_fields = ns.model('addTaskFofa', {
    'query': fields.String(required=True, description="Fofa 查询语句"),
    'name': fields.String(required=True, description="任务名"),
    'policy_id': fields.String(description="策略 ID")
})
​
​
@ns.route('/submit')
class AddFofaTask(ARLResource):
​
    @auth
    @ns.expect(add_fofa_fields)
    def post(self):
        """
        提交Fofa查询任务
        """
        args = self.parse_args(add_fofa_fields)
        query = args.pop('query')
        name = args.pop('name')
        policy_id = args.get('policy_id')
​
        task_options = {
            "port_scan_type": "test",
            "port_scan": True,
            "service_detection": False,
            "service_brute": False,
            "os_detection": False,
            "site_identify": False,
            "file_leak": False,
            "ssl_cert": False
        }
​
        data = fofa_query(query, page_size=1)
        if isinstance(data, str):
            return build_ret(ErrorMsg.FofaConnectError, {'error': data})
​
        if data.get("error"):
            return build_ret(ErrorMsg.FofaKeyError, {'error': data.get("errmsg")})
​
        if data["size"] <= 0:
            return build_ret(ErrorMsg.FofaResultEmpty, {})
​
        fofa_ip_list = fofa_query_result(query)
        if isinstance(fofa_ip_list, str):
            return build_ret(ErrorMsg.FofaConnectError, {'error': data})
​
        if policy_id and len(policy_id) == 24:
            task_options.update(policy_2_task_options(policy_id))
​
        task_data = {
            "name": name,
            "target": "Fofa ip {}".format(len(fofa_ip_list)),
            "start_time": "-",
            "end_time": "-",
            "task_tag": "task",
            "service": [],
            "status": "waiting",
            "options": task_options,
            "type": "fofa",
            "fofa_ip": fofa_ip_list
        }
        task_data = submit_fofa_task(task_data)
​
        return build_ret(ErrorMsg.Success, task_data)
​
​
def policy_2_task_options(policy_id):
    options = {}
    query = {
        "_id": ObjectId(policy_id)
    }
    data = conn_db('policy').find_one(query)
    if not data:
        return options
​
    policy_options = data["policy"]
    policy_options.pop("domain_config")
​
    ip_config = policy_options.pop("ip_config")
    site_config = policy_options.pop("site_config")
​
    options.update(ip_config)
    options.update(site_config)
    options.update(policy_options)
​
    return options
​
​
def submit_fofa_task(task_data):
    conn_db('task').insert_one(task_data)
    task_id = str(task_data.pop("_id"))
    task_data["task_id"] = task_id
​
    task_options = {
        "celery_action": CeleryAction.FOFA_TASK,
        "data": task_data
    }
​
    celery_id = celerytask.arl_task.delay(options=task_options)
​
    logger.info("target:{} celery_id:{}".format(task_id, celery_id))
​
    values = {"$set": {"celery_id": str(celery_id)}}
    task_data["celery_id"] = str(celery_id)
    conn_db('task').update_one({"_id": ObjectId(task_id)}, values)
​
    return task_data

其中有一个类和俩函数在其他地方:

#  -*- coding:UTF-8 -*-
import base64
from app.config import Config
from app import utils
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
​
​
class FofaClient:
    def __init__(self, email, key, page_size=9999):
        self.email = email
        self.key = key
        self.base_url = Config.FOFA_URL
        self.search_api_url = "/api/v1/search/all"
        self.info_my_api_url = "/api/v1/info/my"
        self.page_size = page_size
        self.param = {}
​
    def info_my(self):
        param = {
            "email": self.email,
            "key": self.key,
        }
        self.param = param
        data = self._api(self.base_url + self.info_my_api_url)
        return data
​
    def fofa_search_all(self, query):
        qbase64 = base64.b64encode(query.encode())
        param = {
            "email": self.email,
            "key": self.key,
            "qbase64": qbase64.decode('utf-8'),
            "size": self.page_size
        }
​
        self.param = param
        data = self._api(self.base_url + self.search_api_url)
        return data
​
    def _api(self, url):
        data = utils.http_req(url, 'get', params=self.param).json()
        if data.get("error") and data["errmsg"]:
            raise Exception(data["errmsg"])
​
        return data
​
    def search_cert(self, cert):
        query = 'cert="{}"'.format(cert)
        data = self.fofa_search_all(query)
        results = data["results"]
        return results
​
​
def fetch_ip_bycert(cert, size=9999):
    ip_set = set()
    logger.info("fetch_ip_bycert {}".format(cert))
    try:
        client = FofaClient(Config.FOFA_EMAIL, Config.FOFA_KEY, page_size=size)
        items = client.search_cert(cert)
        for item in items:
            ip_set.add(item[1])
    except Exception as e:
        logger.warn("{} error: {}".format(cert, e))
​
    return list(ip_set)
​
​
def fofa_query(query, page_size=9999):
    try:
        if not Config.FOFA_KEY or not Config.FOFA_KEY:
            return "please set fofa key in config-docker.yaml"
​
        client = FofaClient(Config.FOFA_EMAIL, Config.FOFA_KEY, page_size=page_size)
        info = client.info_my()
        if info.get("vip_level") == 0:
            return "不支持注册用户"
​
        # 普通会员,最多只查100条
        if info.get("vip_level") == 1:
            client.page_size = min(page_size, 100)
​
        data = client.fofa_search_all(query)
        return data
​
    except Exception as e:
        error_msg = str(e)
        error_msg = error_msg.replace(Config.FOFA_KEY[10:], "***")
        return error_msg
​
​
def fofa_query_result(query, page_size=9999):
    try:
        ip_set = set()
        data = fofa_query(query, page_size)
​
        if isinstance(data, dict):
            if data['error']:
                return data['errmsg']
​
            for item in data["results"]:
                ip_set.add(item[1])
            return list(ip_set)
​
        raise Exception(data)
    except Exception as e:
        error_msg = str(e)
        return error_msg

诶?我看到这个代码的时候我感觉他这里写的没有毛病,那么我的疑问变得更重了,为什么我查询的结果明明有一千万多条,但是实际到我们的项目条数就几千条:

conn_db('task').insert_one(task_data)

根据上面这条代码,于是我进入了docker容器,查看了数据库的具体信息:

大致数据就是下面这种格式:

"fofa_ip" : [ "xxx.xxx.xxx.xxx", ...... ], "celery_id" : "xxx", "statistic" : { "site_cnt" : 3935, "domain_cnt" : 0, "ip_cnt" : 2590, "cert_cnt" : 0, "service_cnt" : 0, "fileleak_cnt" : 3068, "url_cnt" : 0, "vuln_cnt" : 94, "npoc_service_cnt" : 0, "cip_cnt" : 0, "nuclei_result_cnt" : 0, "stat_finger_cnt" : 167, "wih_cnt" : 0 } }

这里对应的fofa_ip数量与我在前端上面看到的数量一致,我就纳闷了为什么这里就只有这么几千条IP数量呢?我准备把这部分的代码手动抽出来在我本地上跑一遍试试看看结果到底是什么?

【----帮助网安学习,以下所有学习资料免费领!加vx:dctintin,备注 “博客园” 获取!】

 ① 网安学习成长路径思维导图
 ② 60+网安经典常用工具包
 ③ 100+SRC漏洞分析报告
 ④ 150+网安攻防实战技术电子书
 ⑤ 最权威CISSP 认证考试指南+题库
 ⑥ 超1800页CTF实战技巧手册
 ⑦ 最新网安大厂面试题合集(含答案)
 ⑧ APP客户端安全检测指南(安卓+IOS)

我在本地运行后发现返回的ip数量是由page_size决定的,如下面的代码

def fofa_query_result(query, page_size=9999):
    try:
        ip_set = set()
        data = fofa_query(query, page_size)
​
        if isinstance(data, dict):
            if data['error']:
                return data['errmsg']
​
            for item in data["results"]:
                ip_set.add(item[1])
            return list(ip_set)

我将page_size改为20000,发现根本不返回结果了,这里我才想起来回到Fofa_API的官网去查看,发现了是API一次只能返回最多10000条的限制。

那么解决办法是什么呢?

我们可以看到Fofa Api这里存在一个翻页参数,我们的改进措施就是让ARL使用Fofa API的时候增加一个翻页参数而不是不添加导致每次都是第一页。面向大众的话,要我们一个一个去修改源代码是不太现实的,我这里将给原作者发起一个issue,期待他的更新。

最简单的措施就是我们改进一下Fofa语句:

(status_code="200" || banner="HTTP/1.1 200 OK") && org="China Education and Research Network Center"

避免过期资产误杀:

这里是有20万多条独立IP,我们可以利用Fofa_api先把这20多万条独立IP下载下来,使用ARL本身的添加任务功能将这些IP填进去,这样的缺陷就是不能跑Poc,要跑Poc的话可以等待我们这20多万的数据跑完一遍后,然后直接风险任务下发选择对应的Poc就可以了。

添加任务的时候使用下列的格式加入:

IP1
IP2
IP3
IP4
IP5
IP6
...

2.Poc编写

想要优雅地使用ARL,会自己编写更新Poc是必不可少的。

ARL的poc工具在路径/opt/ARL-NPoC/xing/plugins/poc中我们后续在这个路径去修改,我们可以从作者的仓库看看这个工具:https://github.com/1c3z/ARL-NPoC

这里需要单独注意一点,我们在安装的时候

pip3 install -r requirements.txt

这里最后一个PyYAML直接安装会报错,我们直接使用下列命令直接安装。

pip install PyYAML

然后安装运行:

pip3 install -e .

就可以使用了:

大致使用教程:

这里我拿直接ARL给我扫出来的一个弱口令进行验证:

xing brute -t 目标地址

然后我们就可以开始编写Poc了

我们来分析一个较为简单的但是很实用的Actuator API 未授权访问漏洞的POC:

from xing.core.BasePlugin import BasePlugin
from xing.utils import http_req
from xing.core import PluginType, SchemeType
​
class Plugin(BasePlugin):
    def __init__(self):
        super(Plugin, self).__init__()
        self.plugin_type = PluginType.POC # 定义该插件的类型便于后续调用
        self.vul_name = "Actuator API 未授权访问"
        self.app_name = 'Actuator'
        self.scheme = [SchemeType.HTTPS, SchemeType.HTTP]
​
    def verify(self, target):
        paths = ["/env", "/actuator/env", "/manage/env", "/management/env", "/api/env", "/api/actuator/env"]
        for path in paths:
            url = target + path
            conn = http_req(url) 
            if b'java.runtime.version' in conn.content:
                self.logger.success("发现 Actuator API 未授权访问 {}".format(self.target))
                return url

主要流程就是先定义一个插件的类,然后使用函数__init__(self)写出这个插件的一些信息,具体实现过程在verify函数中实现。

这里我就编写一个influxdb的未授权访问的漏洞:

from xing.core.BasePlugin import BasePlugin
from xing.utils import http_req
from xing.core import PluginType, SchemeType
​
​
class Plugin(BasePlugin):
    def __init__(self):
        super(Plugin, self).__init__()
        self.plugin_type = PluginType.POC
        self.vul_name = "Influxdb未授权访问"
        self.app_name = 'Influxdb'
        self.scheme = [SchemeType.HTTPS, SchemeType.HTTP]
​
    def verify(self, target):
        url = target + "/query?q=SHOW%20USERS"
        conn = http_req(url)
​
        if b'"results":' in conn.content:
            self.logger.success("发现 Influxdb 未授权访问 {}".format(self.target))
            return url
        else:
            return False

然后我们直接在本地复现一下,是可以使用的:

接着我们部署到我们的服务器上,注意这里我们将POC同步到arl_web和arl_work两个容器中:

大致流程就是分别进入这两个容器然后添加对应文件下的Poc即可:

cd /opt/ARL-NPoC/xing/plugins/

我们更新一下Poc后再前端也可以查看到了:

然后经过测试确实可以:

3.总结

在我的实际渗透测试过程中,ARL给我的信息搜集带来了很大的便利性。是一种全面的信息搜集的有力方式!这篇文章主要是解决一点使用ARL过程中的问题,以及编写自己的Poc的流程。

更多网安技能的在线实操练习,请点击这里>>

  文章来源地址https://www.toymoban.com/news/detail-830654.html

到了这里,关于ARL分析与进阶使用的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 【Lighthouse前端性能分析工具】超详细!!!

    我们多数性能测试,基本上针对接口的性能测试,很少涉及到前端页面的性能测试。 但影响用户体验的因素除了后端接口数据的返回,还有前端页面的渲染等等。 所以我们除了在开发的过程中注意代码的质量,同时还需要专业的网站测试工具辅助,让我们知道自己的网页还

    2024年02月09日
    浏览(44)
  • ARL灯塔安装与使用

      目前暂不支持Windows,初次体验可采用Docker方式运行,长期使用建议采用源码安装方式运行。系统配置建议:CPU:4线程 内存:8G 带宽:10M。   由于自动资产发现过程中会有大量的的发包,建议采用云服务器可以带来更好的体验。   我这里这里使用的是云服务器配置是2

    2024年02月06日
    浏览(41)
  • ARL资产侦察灯塔系统搭建及使用

    ARL资产侦察灯塔系统旨在快速侦察与目标关联的互联网资产,构建基础资产信息库。协助甲方安全团队或者渗透测试人员有效侦察和检索资产,发现存在的薄弱点和攻击面 资产梳理的路径会根据输入的数据进行变动与调整。如输入数据为基于域名时,采用的技术手段包括DN

    2024年02月10日
    浏览(31)
  • ARL 资产侦察灯塔系统搭建及使用

    1、系统要求 目前暂不支持 Windows,Linux 和 MAC 建议采用 Docker 运行,系统配置最低 2 核 4G。 由于自动资产发现过程中会有大量的的发包,建议采用云服务器可以带来更好的体验。                  2、资产侦察灯塔系统搭建 1)、部署 docker 容器虚拟化平台 安装 docker 环

    2024年02月14日
    浏览(35)
  • 前端基本性能指标及lighthouse使用

    首先前端性能指标一般分为以下几种: 首屏绘制(First Paint,FP) 首屏内容绘制(First Contentful Paint,FCP) 可交互时间(Time to Interactive,TTI) 最大内容绘制(Largest Contentful Paint,LCP) 首次有效绘制(First Meaning Paint, FMP)   FP 是时间线上的第一个“时间点”,是指浏览器从响

    2024年02月21日
    浏览(44)
  • 前端性能测试工具 LightHouse (灯塔)使用

    Lighthouse是什么? —— 一种工具 Lighthouse 是一个开源的自动化工具,用来测试页面性能。 为什么要用Lighthouse? —— 提升用户体验 Web性能可以直接影响业务指标,例如转化率和用户满意度 分析收集各种应用页面性能指标,并进行评估,以此我们可以根据评估结果进行针对性

    2024年04月12日
    浏览(46)
  • linux环境下使用lighthouse与selenium

    shell脚本 指定安装node 18以上的版本 curl -fsSL https://deb.nodesource.com/setup_18.x | bash 安装nodejs apt install -y nodejs 安装npm apt install -y npm 安装lighthouse npm install -g lighthouse lighthouse Git地址:https://github.com/GoogleChrome/lighthouse 需要安装18以上版本的node,所以指定nodejs的版本 添加 Google Chrome

    2024年01月22日
    浏览(28)
  • @EnableAsync的使用、进阶、源码分析

    使用@EnableAsync开启异步切面,然后在异步调用的方法上加上@Asyc注解即可 @Async注解是异步切面默认的异步注解,我们可以在@EnableAsync(annotation = AsyncCustom.class)开启异步切面时指定自定义的异步注解 配置默认线程池 当@Async注解的value没有指定线程池名称时,将会使用此线程池,

    2024年02月06日
    浏览(35)
  • Python数据分析之Pandas核心使用进阶

    在Pandas中,有两种常见的方法可以进行DataFrame的行级遍历:使用 iterrows() 和使用 iteritems() 。 使用 iterrows() 方法: iterrows() 方法返回一个迭代器,可以按行遍历DataFrame。每次迭代返回一个包含行索引和该行数据的元组。 输出结果为: 在上面的例子中,我们使用 iterrows() 方法遍

    2024年02月11日
    浏览(76)
  • MySQL数据库进阶第二篇(索引,SQL性能分析,使用规则)

    本篇博客深入详细地介绍了数据库索引的概念和重要性。内容包含:索引的概念和目标、索引的优点与缺点。此外,博客还深入解析了三种主要的索引结构:B-Tree、B+Tree和Hash,提供了详细的结构解析和优化方法,并通过插图进一步增强了理解。 博客的部分内容专注于对B-Tr

    2024年02月21日
    浏览(64)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包