域名扫描工具subDomainBrute源码分析

这篇具有很好参考价值的文章主要介绍了域名扫描工具subDomainBrute源码分析。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

SubDomainsBrute简介

​ SubDomainsBrute是一款目标域名收集工具 ,用小字典递归地发现三级域名、四级域名、五级域名等不容易被探测到的域名。字典较为全面,小字典就包括3万多条,大字典多达8万条。默认使用114DNS、百度DNS、阿里DNS这几个快速又可靠的公共DNS进行查询,可随时修改配置文件添加你认为可靠的DNS服务器。自动筛选泛解析的域名,当前规则是: 超过10个域名指向同一IP,则此后发现的其他指向该IP的域名将被丢弃。整体速度还可以,每秒稳定扫描100到200个域名(10个线程)。
​ 工具可从github上 获取。

Github地址: https://github.com/lijiejie/subDomainsBrute

目录

dict #存放dns目录和域名字典

lib

--__init__.py

--cmdline.py #参数设置

--common_py2.py #获取域名字典

--common_py3.py #检查dns是否可用

--common.py #获取字典

--consle_width.py

--scanner_py2.py #为3.5版本以下的域名扫描文件

--scanner_py3.py #为3.5版本以上的域名扫描文件

subDomainsBrute.py 函数入口处

执行流程

一.检查参数和字典

获取命令行输入的参数

#subDomainsBrute.py
options, args = parse_args()

根据域名创建子文件,存放子域名信息

#subDomainsBrute.py
tmp_dir = 'tmp/%s_%s' % (args[0], int(time.time()))
    if not os.path.exists(tmp_dir):
        os.makedirs(tmp_dir)

调用common_py3.py中的load_next_sub()函数,传入命令行输入的参数作为函数参数,得到公共dns列表

#subDomainsBrute.py
dns_servers = load_dns_servers()

进入load_dns_servers()查看,其中dns_servers.txt存放公共dns:如119.29.29.29是腾讯DNSPod公共DNS,还有114dns(114.114.114.114),阿里(223.5.5.5.5),百度(180.76.76.76),360(101.226.4.6),google(8.8.8.8)等 ,使用一个已经存在的域名验证dns是否有效,

#common_py3.py
def load_dns_servers():
    print_msg('[+] Validate DNS servers', line_feed=True)
    dns_servers = []

    servers_to_test = [] #存储可用的dns地址
    for server in open('dict/dns_servers.txt').readlines(): #遍历dns的Ip地址
        server = server.strip() #移除字符串头尾指定的字符(默认为空格或换行符)
        if server and not server.startswith('#'): #检查字符串是否是以#开头,将不以#开头的加入
            servers_to_test.append(server) #将不以#开头的添加到列表

    loop = asyncio.get_event_loop()
    loop.run_until_complete(async_load_dns_servers(servers_to_test, dns_servers)) #调用async_load_dns_servers函数
    # loop.close()

    server_count = len(dns_servers) 
    print_msg('\n[+] %s DNS Servers found' % server_count, line_feed=True) 
    if server_count == 0:
        print_msg('[ERROR] No valid DNS Server !', line_feed=True)
        sys.exit(-1)
    return dns_servers #返回公共dns列表


#进入async_load_dns_servers
async def async_load_dns_servers(servers_to_test, dns_servers): #async为异步标志
    tasks = []
    for server in servers_to_test:
        task = test_server_python3(server, dns_servers) #调用test_server_python3函数
        tasks.append(task)
    await asyncio.gather(*tasks) #异步执行

#进入test_server_python3()
async def test_server_python3(server, dns_servers):
    resolver = aiodns.DNSResolver()
    try:
        resolver.nameservers = [server]
        answers = await resolver.query('public-dns-a.baidu.com', 'A')    #一个已经存在的域名
        if answers[0].host != '180.76.76.76': #判断解析是否正确
            raise Exception('Incorrect DNS response')
        try:
            await resolver.query('test.bad.dns.lijiejie.com', 'A')    #尝试一个不存在的域名
            with open('bad_dns_servers.txt', 'a') as f:
                f.write(server + '\n') #解析失败,dns无效写入bad_dns_servers.txt文件
            print_msg('[+] Bad DNS Server found %s' % server)
        except Exception as e:
            dns_servers.append(server) #解析正确判断为有效dns,加入列表
        print_msg('[+] Server %s < OK >   Found %s' % (server.ljust(16), len(dns_servers)))
    except Exception as e:
        print_msg('[+] Server %s <Fail>   Found %s' % (server.ljust(16), len(dns_servers)))

回到入口程序,然后调用了common_py3.py中的load_next_sub()函数,得到子域名字典,进入load_next_sub()查看

#subDomainsBrute.py
next_subs = load_next_sub(options)

#common_py3.py
def load_next_sub(options):
    next_subs = []
    _file = 'dict/next_sub_full.txt' if options.full_scan else 'dict/next_sub.txt' #根据参数判断使用哪个字典
    with open(_file) as f:
        for line in f:
            sub = line.strip() #移除每行头尾空格和换行
            if sub and sub not in next_subs: #判断是否在next_subs中重复
                tmp_set = {sub}
                while tmp_set:
                    item = tmp_set.pop() 
                    if item.find('{alphnum}') >= 0:
                        for _letter in 'abcdefghijklmnopqrstuvwxyz0123456789':
                            tmp_set.add(item.replace('{alphnum}', _letter, 1))
                    elif item.find('{alpha}') >= 0:
                        for _letter in 'abcdefghijklmnopqrstuvwxyz':
                            tmp_set.add(item.replace('{alpha}', _letter, 1))
                    elif item.find('{num}') >= 0:
                        for _letter in '0123456789':
                            tmp_set.add(item.replace('{num}', _letter, 1))
                    elif item not in next_subs:
                        next_subs.append(item)  #多次判断是否存在{alphnum}、{alpha}、{num}进行替换,替换后加入到子域名列表中
    return next_subs #返回子域名列表

使用multiprocessing多进程共享数据。i代表类型为int整数,0代表初始值

#subDomainsBrute.py
scan_count = multiprocessing.Value('i', 0)
found_count = multiprocessing.Value('i', 0)
queue_size_array = multiprocessing.Array('i', options.process)

multiprocessing.Value(typecode_or_type, *args, lock=True) #共享单个数据,其值通过value属性访问

typecode_or_type:数组中的数据类型,为代表数据类型的类或者str。比如,'i'表示int,'f'表示float。
args:可以设置初始值。比如:multiprocessing.Value('d',6)生成值为6.0的数据。
lock:bool,是否加锁。

判断w参数不存在时,主程序调用wildcard_test()函数,进入wildcard_test()查看, 和test_server相似 ,通配符测试失败后建议使用参数-w强制扫描

#subDomainsBrute.py
if not options.w:
    domain = wildcard_test(args[0], dns_servers)
else:
    domain = args[0]

#common_py2.py
def wildcard_test(domain, dns_servers, level=1):
    try:
        r = dns.resolver.Resolver(configure=False)
        r.nameservers = dns_servers
        answers = r.query('lijiejie-not-existed-test.%s' % domain)
        ips = ', '.join(sorted([answer.address for answer in answers]))
        if level == 1:
            print('any-sub.%s\t%s' % (domain.ljust(30), ips))
            wildcard_test('any-sub.%s' % domain, dns_servers, 2)
        elif level == 2:
            print('\nUse -w to enable force scan wildcard domain')
            sys.exit(0)
    except Exception as e:
        return domain

然后调用get_sub_file_path(),将字典相对路径返回并赋值

#subDomainsBrute.py
options.file = get_sub_file_path(options)

#common_py2.py
def get_sub_file_path(options):
    if options.full_scan and options.file == 'subnames.txt':
        sub_file_path = 'dict/subnames_full.txt' #判断使用参数--full和subnames.txt才使用subnames.txt字典
    else:
        if os.path.exists(options.file):
            sub_file_path = options.file
        elif os.path.exists('dict/%s' % options.file):
            sub_file_path = 'dict/%s' % options.file
        else:
            print_msg('[ERROR] Names file not found: %s' % options.file)
            exit(-1)
    return sub_file_path #返回字典的具体路径

二.域名获取

根据输入的进程数设置进程数,添加到进程池中,并设置了run_process()函数

#subDomainsBrute.py
all_process = []
for process_num in range(options.process):
            p = multiprocessing.Process(target=run_process,
                                        args=(domain, options, process_num, dns_servers, next_subs,scan_count, found_count, queue_size_array, tmp_dir))
            all_process.append(p)
            p.start()

到run_process()中查看

#subDomainsBrute.py
def run_process(*params):
    signal.signal(signal.SIGINT, user_abort)
    s = SubNameBrute(*params) #定义该类
    s.run()
    
#scanner_py2.py
class SubNameBrute(object): 
    def __init__(self, *params): #SubNameBrute类中首先执行函数
            self.domain, self.options, self.process_num, self.dns_servers, self.next_subs, self.scan_count, self.found_count, self.queue_size_array, tmp_dir = params #传入参数
            /*
            ... #默认参数赋值
            */
            self.outfile = open('%s/%s_part_%s.txt' % (tmp_dir, self.domain, self.process_num), 'w') #定义输出文件
            self.normal_names_set = set()
            self.load_sub_names() #调用自身的load_sub_names函数
            self.lock = RLock()
            self.threads_status = ['1'] * self.options.threads
            
def load_sub_names(self): #返回测试域名列表作为字典
        normal_lines = []
        wildcard_lines = []
        wildcard_set = set()
        regex_list = []
        lines = set()
        with open(self.options.file) as inFile:
            for line in inFile.readlines():
                sub = line.strip()
                if not sub or sub in lines: #排除sub为空和sub重复的情况
                    continue
                lines.add(sub)

                brace_count = sub.count('{') #检测{通配符}数量进行替换
                if brace_count > 0:
                    wildcard_lines.append((brace_count, sub))
                    sub = sub.replace('{alphnum}', '[a-z0-9]')
                    sub = sub.replace('{alpha}', '[a-z]')
                    sub = sub.replace('{num}', '[0-9]')
                    if sub not in wildcard_set:
                        wildcard_set.add(sub)
                        regex_list.append('^' + sub + '$')
                else:
                    normal_lines.append(sub)
                    self.normal_names_set.add(sub)

        if regex_list:
            pattern = '|'.join(regex_list)
            _regex = re.compile(pattern)
            for line in normal_lines:
                if _regex.search(line):
                    normal_lines.remove(line)

        for _ in normal_lines[self.process_num::self.options.process]:
            self.queue.put((0, _))    # priority set to 0
        for _ in wildcard_lines[self.process_num::self.options.process]:
            self.queue.put(_)

def run(self):
    threads = [gevent.spawn(self.scan, i) for i in range(self.options.threads)]
    gevent.joinall(threads)
    
def scan(self, j):
        self.resolvers[j].nameservers = [self.dns_servers[j % self.dns_count]] + self.dns_servers

        while True:
            try:

                if time.time() - self.count_time > 1.0:
                    self.lock.acquire()
                    self.scan_count.value += self.scan_count_local
                    self.scan_count_local = 0
                    self.queue_size_array[self.process_num] = self.queue.qsize()
                    if self.found_count_local:
                        self.found_count.value += self.found_count_local
                        self.found_count_local = 0
                    self.count_time = time.time()
                    self.lock.release()
                brace_count, sub = self.queue.get_nowait()
                self.threads_status[j] = '1'
                if brace_count > 0:
                    brace_count -= 1
                    if sub.find('{next_sub}') >= 0:
                        for _ in self.next_subs:
                            self.queue.put((0, sub.replace('{next_sub}', _)))
                    if sub.find('{alphnum}') >= 0:
                        for _ in 'abcdefghijklmnopqrstuvwxyz0123456789':
                            self.queue.put((brace_count, sub.replace('{alphnum}', _, 1)))
                    elif sub.find('{alpha}') >= 0:
                        for _ in 'abcdefghijklmnopqrstuvwxyz':
                            self.queue.put((brace_count, sub.replace('{alpha}', _, 1)))
                    elif sub.find('{num}') >= 0:
                        for _ in '0123456789':
                            self.queue.put((brace_count, sub.replace('{num}', _, 1)))
                    continue
            except gevent.queue.Empty as e:
                self.threads_status[j] = '0'
                gevent.sleep(0.5)
                if '1' not in self.threads_status:
                    break
                else:
                    continue

            try:

                if sub in self.found_subs:
                    continue

                self.scan_count_local += 1
                cur_domain = sub + '.' + self.domain #子域名和域名拼接成当前域名
                answers = self.resolvers[j].query(cur_domain) #请求dns进行解析当前域名

                if answers: #返回解析存在执行if块
                    self.found_subs.add(sub) #添加到列表中
                    ips = ', '.join(sorted([answer.address for answer in answers])) #使用,分隔解析的ip字符串
                    if ips in ['1.1.1.1', '127.0.0.1', '0.0.0.0', '0.0.0.1']: #排除非正常ip
                        continue
                    if self.options.i and is_intranet(answers[0].address): #调用is_intranet判断ip不是内网地址时加入数组
                        continue

                    try:
                        self.scan_count_local += 1
                        answers = self.resolvers[j].query(cur_domain, 'cname') #根据cname记录查找新的域名别名
                        cname = answers[0].target.to_unicode().rstrip('.') #分隔新域名别名
                        if cname.endswith(self.domain) and cname not in self.found_subs: #判断当与原域名不同且不在发现的域名列表中时添加到域名列表中
                            cname_sub = cname[:len(cname) - len(self.domain) - 1]    
                            if cname_sub not in self.normal_names_set:
                                self.found_subs.add(cname)
                                self.queue.put((0, cname_sub))
                    except Exception as e:
                        pass

                    first_level_sub = sub.split('.')[-1]
                    max_found = 20

                    if self.options.w:
                        first_level_sub = ''
                        max_found = 3

                    if (first_level_sub, ips) not in self.ip_dict:
                        self.ip_dict[(first_level_sub, ips)] = 1
                    else:
                        self.ip_dict[(first_level_sub, ips)] += 1
                        if self.ip_dict[(first_level_sub, ips)] > max_found:
                            continue

                    self.found_count_local += 1

                    self.outfile.write(cur_domain.ljust(30) + '\t' + ips + '\n')
                    self.outfile.flush()
                    try:
                        self.scan_count_local += 1
                        self.resolvers[j].query('lijiejie-test-not-existed.' + cur_domain)
                    except (dns.resolver.NXDOMAIN, ) as e:    # dns.resolver.NoAnswer
                        if self.queue.qsize() < 50000:
                            for _ in self.next_subs:
                                self.queue.put((0, _ + '.' + sub))
                        else:
                            self.queue.put((1, '{next_sub}.' + sub))
                    except Exception as e:
                        pass

            except (dns.resolver.NXDOMAIN, dns.resolver.NoAnswer) as e:
                pass
            except dns.resolver.NoNameservers as e:
                self.queue.put((0, sub))    # Retry
            except dns.exception.Timeout as e:
                self.timeout_subs[sub] = self.timeout_subs.get(sub, 0) + 1
                if self.timeout_subs[sub] <= 1:
                    self.queue.put((0, sub))    # Retry
            except Exception as e:
                import traceback
                traceback.print_exc()
                with open('errors.log', 'a') as errFile:
                    errFile.write('[%s] %s\n' % (type(e), str(e)))

其中调用的is_intranet()函数如下,将10,172.16~31,192.168开头的判定为内网地址

#common.py
def is_intranet(ip):
    ret = ip.split('.')
    if len(ret) != 4:
        return True
    if ret[0] == '10':
        return True
    if ret[0] == '172' and 16 <= int(ret[1]) <= 31:
        return True
    if ret[0] == '192' and ret[1] == '168':
        return True
    return False

三.数据输出

之后主程序根据进程活跃情况绘制进度条

#subDomainsBrute.py
char_set = ['\\', '|', '/', '-'] #进度条显示的字符
count = 0
while all_process:
            for p in all_process:
                if not p.is_alive():
                    all_process.remove(p)
            groups_count = 0
            for c in queue_size_array:
                groups_count += c
            msg = '[%s] %s found, %s scanned in %.1f seconds, %s groups left' % (
                char_set[count % 4], found_count.value, scan_count.value, time.time() - start_time, groups_count)
            print_msg(msg)
            count += 1
            time.sleep(0.3)

之后调用get_out_file_name()函数获取输出文件名,将域名数据存放到文件中

#subDomainsBrute.py
out_file_name = get_out_file_name(domain, options)
    all_domains = set()
    domain_count = 0
    with open(out_file_name, 'w') as f:
        for _file in glob.glob(tmp_dir + '/*.txt'):
            with open(_file, 'r') as tmp_f:
                for domain in tmp_f:
                    if domain not in all_domains:
                        domain_count += 1
                        all_domains.add(domain)       # cname query can result in duplicated domains
                        f.write(domain)

函数调用关系图

参考:

SubDomainsBrute源码分析 | 码农家园 (codenong.com)文章来源地址https://www.toymoban.com/news/detail-708089.html

到了这里,关于域名扫描工具subDomainBrute源码分析的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • AWVS 批量扫描工具

    调用Acunetix AWVS的API实现批量扫描,并且使用代理池(项目地址goProxyPool),实现批量扫描时的每个扫描目标都使用不同的代理IP。提供常驻后台监控功能,控制最大扫描任务数量以及最大扫描时间。 仅批量添加目标(可设置添加数量),并设置扫描使用的代理,不进行扫描(

    2024年02月16日
    浏览(42)
  • 网络安全工具——AWVS漏洞扫描工具

    请勿乱扫目标,该扫描工具在没有授权的情况下使用是违法的 仅个人使用,不用于商业用途,使用的是15.2版本的破解版 1.下载Acunetix破解版安装包,解压安装包 2.双击允许acunetix_15.2.221208162.exe 3.双击安装包出现下面界面,安装路径可以修改 4.点击下一步填写邮箱、密码。设置

    2024年02月13日
    浏览(47)
  • Nmap扫描工具的使用

    本实验可以在虚拟机中搭建windows系列系统进行操作,建议选择老旧的windows xp,windows 7,windows sever2012等系统,这样可以扫描出很多的漏洞,也方便后面做入侵实验。攻击机可以使用kali linux,被攻击机还有肉鸡使用windows系列系统比较好。 主机探测:Nmap可査找目标网络中的在

    2024年02月13日
    浏览(41)
  • Nmap开源网络扫描工具

    Nmap的官方网站是https://nmap.org/。 在该网站上,您可以找到有关Nmap的详细信息,包括下载最新版本的Nmap软件、文档、教程和其他资源。 更多信息也可以咨询GPT ^^ Nmap是一个开源的网络扫描工具,它用于探测和分析网络上的主机和服务。它可以帮助管理员识别网络上的漏洞和安

    2024年02月16日
    浏览(41)
  • 基于python开发端口扫描工具

    本人仍然还在学习阶段,有问题欢迎大佬指正,希望我的文章能够帮助到你.以后我很多东西都会在博客更新,和大家一起进步,加油. 本次博客主要是开发一个端口扫描工具,用python语言,要求要能指定ip,指定c段,指定端口号和端口范围,还有多线程或者线程池实现,提高端口扫描效率

    2024年02月04日
    浏览(39)
  • 常用Web安全扫描工具合集

      漏洞扫描是一种安全检测行为,更是一类重要的网络安全技术,它能够有效提高网络的安全性,而且漏洞扫描属于主动的防范措施,可以很好地避免黑客攻击行为,做到防患于未然。那么好用的漏洞扫描工具有哪些? 答案就在本文! 1、AWVS Acunetix Web Vulnerability Scanner(简称

    2024年04月13日
    浏览(37)
  • 网络安全:namp扫描工具

      -sP可以扫描一个网段ip以及状态和基本信息,10.1.1.2-3就是扫描2和3这两个ip的主机 -p可以扫描指定ip对应主机的端口号,可以是一个范围 nmap简单扫描:nmap 地址 检查地址是否在线以及open的端口号 在端口开放,不一定可以与对方正常连接或通信。-sT就可以检测是否可以连接

    2024年02月02日
    浏览(40)
  • Nuclei漏洞扫描工具

    Nuclei漏洞扫描工具:          Nuclei 是一款基于YAML语法模板的开发的定制化快速漏洞扫描器。它使用Go语言开发,具有很强的可配置性、可扩展性和易用性。 提供 TCP、DNS、HTTP、FILE 等各类协议的扫描,通过强大且灵活的模板,可以使用 Nuclei 模拟各种安全检查。 Nucl

    2024年02月15日
    浏览(44)
  • 御剑高速TCP端口扫描工具

    鉴于CSDN平台想恰钱想疯了,连下载个软件也要付费/VIP  那我在这里就无私贡献出珍藏的——御剑高速TCP端口扫描工具 网址:百度网盘 请输入提取码 提取码:7fk6  

    2024年02月13日
    浏览(52)
  • 漏洞扫描工具-goby(九)

    什么是Goby? Goby是一款基于网络空间测绘技术的新一代网络安全工具,它通过给目标网络建立完整的资产知识库,进行网络安全事件应急与漏洞应急。 Goby可提供最全面的资产识别,目前预置了超过10万种规则识别引擎,能够针对硬件设备和软件业务系统进行自动化识别和分类

    2024年02月14日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包