CENTOS上的网络安全工具(二十七)SPARK+NetSA Security Tools容器化部署(3)

这篇具有很好参考价值的文章主要介绍了CENTOS上的网络安全工具(二十七)SPARK+NetSA Security Tools容器化部署(3)。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

        上回说到在我们搭好的YAF3环境上使用yaf处理pcap文件得到silk flow,再使用super mediator工具转为ipfix,继而在spark中导入mothra,就可以开始数据分析了。然而在我们粗粗一用之下,却发现DPI信息在ipfix文件中找不到,到底是提取的时候就没提取出来(ipfix不支持dpi信息存储)?还是说我们的分析方式不对?——换句话说yaf、super-mediator、methora这三个环节,问题到底出在哪里了?我们卖了个关子,为本篇开启留了个伏笔。实则是我也不知道……。

        所以,本篇从记录排查过程的角度,更深入地探索以下这3个组件——主要是methora的使用。

        一、YAF肯定没有问题

        因为我们在执行 super_mediator -o result.txt -m text test.yaf的时候已经从yaf的结果文件中导出过txt版本的dpi记录,其中是存在dns和tls的,可以证明相关的信息在test.yaf中确实存在。

        使用json格式导出一次,也可以证明结果还比较“完备”:

[root@12c4bd60ff5f testdata]# rm tmp.ipfix -f
[root@12c4bd60ff5f testdata]# super_mediator -o test2.json -m json test2.yaf
Initialization Successful, starting...
[2023-06-30 05:59:34] Running as root in --live mode, but not dropping privilege

         cat输出的json文件,可以看到tls和dns的相关记录也都在里面:

CENTOS上的网络安全工具(二十七)SPARK+NetSA Security Tools容器化部署(3),大数据,spark,网络安全

         二、super-mediator也没问题

        既然super-mediator的txt模式和json模式都没有问题,那是不是ipfix模式不支持dpi信息呢(之前我们是这么猜的)。这个猜测,我们通过观察super-mediator的执行log记录否定了:   

[root@12c4bd60ff5f testdata]# super_mediator -o tmp.ipfix --verbose test2.yaf
[2023-06-30 06:16:23] super_mediator starting
[2023-06-30 06:16:23] E1: Opening File tmp.ipfix
[2023-06-30 06:16:23] E1: Exporter Active.
Initialization Successful, starting...
[2023-06-30 06:16:23] Running as root in --live mode, but not dropping privilege
[2023-06-30 06:16:23] C1: Opening file: test2.yaf
[2023-06-30 06:16:23] COL C1(1) TID 0xd006: received. Labeled as IE_SPEC - TMD OR IE - EXACT_DEF
[2023-06-30 06:16:23] COL C1(1) TID 0xd007: received. Labeled as TMD - TMD OR IE - EXACT_DEF
[2023-06-30 06:16:23] COL C1(1) TID 0xd008: received. Labeled as TMD - TMD OR IE - EXACT_DEF
[2023-06-30 06:16:23] COL C1(1) TID yaf_dns_a (0xce01): received. Labeled as NESTED DATA - DPI - DPI UNKNOWN - EXACT_DEF
[2023-06-30 06:16:23] EXP E1(1) TID yaf_dns_a (0xce01): Adding new template
[2023-06-30 06:16:23] COL C1(1) TID yaf_dns_aaaa (0xce02): received. Labeled as NESTED DATA - DPI - DPI UNKNOWN - EXACT_DEF
…… …… ……
[2023-06-30 06:16:23] EXP E1(1) TID yaf_ssl_cert (0xca0b): Adding new template
[2023-06-30 06:16:23] COL C1(1) TID yaf_ssh (0xcc00): received. Labeled as NESTED DATA - DPI - DPI UNKNOWN - EXACT_DEF
[2023-06-30 06:16:23] EXP E1(1) TID yaf_ssh (0xcc00): Adding new template
…… …… ……
[2023-06-30 06:16:23] COL C1(1) TID yaf_flow_rle_tcp_ip4_total_biflow (0xb022): Added observationDomainId
[2023-06-30 06:16:23] COL C1(1) TID yaf_flow_rle_tcp_ip4_total_biflow (0xb022): Added yafFlowKeyHash
…… …… ……
[2023-06-30 06:16:23] Collector Stats: C1-INACTIVE-SINGLE FILE: Total Records Read: 3418, Files Read: 1, Restarts: 0, UNKNOWN Records: 0, FLOW Records: 3416, YAF STATS Records: 1, TOMBSTONE Records: 1, DNS DEDUP Records: 0, SSL DEDUP Records: 0, GENERAL DEDUP Records: 0, DNS RR Records: 0, TMD OR IE Records: 0, DPI Records: 0, UNKNOWN DATA Records: 0, UNKNOWN OPTIONS Records: 0, No filters used
[2023-06-30 06:16:23] Core Stats: Records Processed: 3418, UNKNOWN Records: 0, FLOW Records: 3416, YAF STATS Records: 1, TOMBSTONE Records: 1, DNS DEDUP Records: 0, SSL DEDUP Records: 0, GENERAL DEDUP Records: 0, DNS RR Records: 0, TMD OR IE Records: 0, DPI Records: 0, UNKNOWN DATA Records: 0, UNKNOWN OPTIONS Records: 0, AppLabel 0 Records: 1401, AppLabel 53 Records: 601, AppLabel 80 Records: 790, AppLabel 123 Records: 1, AppLabel 137 Records: 42, AppLabel 138 Records: 28, AppLabel 161 Records: 4, AppLabel 443 Records: 537, AppLabel 51443 Records: 12, Tombstone Records Generated: 0
[2023-06-30 06:16:23] Exporter Stats: E1-ACTIVE-SINGLE FILE: Total Records Written: 3418, Files Written: 1, Bytes Written: 614700, Restarts: 0, No filters used, UNKNOWN Ignored: 0, FLOW Ignored: 0, YAF STATS Ignored: 0, TOMBSTONE Ignored: 0, DNS DEDUP Ignored: 0, SSL DEDUP Ignored: 0, GENERAL DEDUP Ignored: 0, DNS RR Ignored: 0, TMD OR IE Ignored: 0, DPI Ignored: 0, UNKNOWN DATA Ignored: 0, UNKNOWN OPTIONS Ignored: 0, UNKNOWN Generated: 0, FLOW Generated: 0, YAF STATS Generated: 0, TOMBSTONE Generated: 0, DNS DEDUP Generated: 0, SSL DEDUP Generated: 0, GENERAL DEDUP Generated: 0, DNS RR Generated: 0, TMD OR IE Generated: 0, DPI Generated: 0, UNKNOWN DATA Generated: 0, UNKNOWN OPTIONS Generated: 0, UNKNOWN Forwarded: 0, FLOW Forwarded: 3416, YAF STATS Forwarded: 1, TOMBSTONE Forwarded: 1, DNS DEDUP Forwarded: 0, SSL DEDUP Forwarded: 0, GENERAL DEDUP Forwarded: 0, DNS RR Forwarded: 0, TMD OR IE Forwarded: 0, DPI Forwarded: 0, UNKNOWN DATA Forwarded: 0, UNKNOWN OPTIONS Forwarded: 0, AppLabel 0 Records: 1401, AppLabel 53 Records: 601, AppLabel 80 Records: 790, AppLabel 123 Records: 1, AppLabel 137 Records: 42, AppLabel 138 Records: 28, AppLabel 161 Records: 4, AppLabel 443 Records: 537, AppLabel 51443 Records: 12, 
[2023-06-30 06:16:23] Exporter Stats: E1-ACTIVE-SINGLE FILE: Total Records Written: 3418, Files Written: 1, Bytes Written: 614700, Restarts: 0, No filters used, UNKNOWN Ignored: 0, FLOW Ignored: 0, YAF STATS Ignored: 0, TOMBSTONE Ignored: 0, DNS DEDUP Ignored: 0, SSL DEDUP Ignored: 0, GENERAL DEDUP Ignored: 0, DNS RR Ignored: 0, TMD OR IE Ignored: 0, DPI Ignored: 0, UNKNOWN DATA Ignored: 0, UNKNOWN OPTIONS Ignored: 0, UNKNOWN Generated: 0, FLOW Generated: 0, YAF STATS Generated: 0, TOMBSTONE Generated: 0, DNS DEDUP Generated: 0, SSL DEDUP Generated: 0, GENERAL DEDUP Generated: 0, DNS RR Generated: 0, TMD OR IE Generated: 0, DPI Generated: 0, UNKNOWN DATA Generated: 0, UNKNOWN OPTIONS Generated: 0, UNKNOWN Forwarded: 0, FLOW Forwarded: 3416, YAF STATS Forwarded: 1, TOMBSTONE Forwarded: 1, DNS DEDUP Forwarded: 0, SSL DEDUP Forwarded: 0, GENERAL DEDUP Forwarded: 0, DNS RR Forwarded: 0, TMD OR IE Forwarded: 0, DPI Forwarded: 0, UNKNOWN DATA Forwarded: 0, UNKNOWN OPTIONS Forwarded: 0, AppLabel 0 Records: 1401, AppLabel 53 Records: 601, AppLabel 80 Records: 790, AppLabel 123 Records: 1, AppLabel 137 Records: 42, AppLabel 138 Records: 28, AppLabel 161 Records: 4, AppLabel 443 Records: 537, AppLabel 51443 Records: 12, 
[2023-06-30 06:16:23] E1: Closing File tmp.ipfix
[2023-06-30 06:16:23] super_mediator Terminating
[root@12c4bd60ff5f testdata]# 

        通过--verbose参数打印处理过程,可以看到即使是在ipfix模式下,super-mediator也处理了 dns、ssl之类的dpi信息,并且在最后统计信息的处理中,也看到了多种类型的AppLabel。所以,super-mediator大概率应该是没有问题的——也是就说数据是没有问题的。那么,问题只能是出在我们对mothra的使用方式上。

        三、Mothra的使用

        1. 载入字段选择

        通过学习mothra的API文档Mothra 1.6.0 - org.cert.netsa.mothra.datasources.ipfix,发现丢失DPI数据的原因可能是由于我们使用了默认的数据载入方式:        CENTOS上的网络安全工具(二十七)SPARK+NetSA Security Tools容器化部署(3),大数据,spark,网络安全

         也就是说,采用spark.read.ipfix(your_file_path)载入的数据,由于没有使用fields方法指定载入数据的范围,可能只是Default Fields。实际上,spark.read是sparksession类的read方法,其调用了mothra提供的数据源CERTDataFrameReader提供的ipfix方法。该数据源还提供了fields方法用于指定读取数据的范围。

CENTOS上的网络安全工具(二十七)SPARK+NetSA Security Tools容器化部署(3),大数据,spark,网络安全

         fields方法使用FieldsSpec类型来指定读取数据范围,而IPFIXFields.default正是这样一个类型:

CENTOS上的网络安全工具(二十七)SPARK+NetSA Security Tools容器化部署(3),大数据,spark,网络安全

         同理,IPFIXFields也定义了诸如IPFIXFields:everything这样的fieldsSpec:

CENTOS上的网络安全工具(二十七)SPARK+NetSA Security Tools容器化部署(3),大数据,spark,网络安全

         所以,只要以IPFIXFields.everything指定fields,应该可以将所有信息导入到spark中:

scala> import org.cert.netsa.mothra.datasources._
import org.cert.netsa.mothra.datasources._

scala> import org.cert.netsa.mothra.datasources.ipfix.IPFIXFields
import org.cert.netsa.mothra.datasources.ipfix.IPFIXFields

scala> val all = spark.read.fields(IPFIXFields.everything).ipfix("f:/tmp/test2.ipfix")
all: org.apache.spark.sql.DataFrame = [startTime: timestamp, endTime: timestamp ... 184 more fields]

scala> all.count
res45: Long = 3418

scala> all.show(1,0,true)
-RECORD 0-----------------------------------------------------------
 startTime                                | 2022-11-15 10:34:36.433
 endTime                                  | 2022-11-15 10:34:36.485
 sourceIPAddress                          | 47.110.20.149
 sourcePort                               | 443
 destinationIPAddress                     | 192.168.182.76
 destinationPort                          | 62116
 protocolIdentifier                       | 6
 observationDomainId                      | 0
 vlanId                                   | 0
 reverseVlanId                            | 0
 silkAppLabel                             | 0
 packetCount                              | 3
 reversePacketCount                       | 4
 octetCount                               | 674
 reverseOctetCount                        | 253
 initialTCPFlags                          | 24
 reverseInitialTCPFlags                   | 24
 unionTCPFlags                            | 25
 reverseUnionTCPFlags                     | 21
 dnp3RecordList                           | []
 dnsRecordList                            | []
………………
 reverseFlowAttributes                    | 0
 flowEndReason                            | 3
 reverseFlowDeltaMilliseconds             | 1
 ipClassOfService                         | 20
………………
 reverseMaxPacketSize                     | null
 reverseStandardDeviationPayloadLength    | null
 tcpSequenceNumber                        | 3558855353
 reverseTcpSequenceNumber                 | 341543645
 ndpiL7Protocol                           | null
 ndpiL7SubProtocol                        | null
 mplsTopLabelStackSection                 | null
 mplsLabelStackSection2                   | null
 mplsLabelStackSection3                   | null
 yafFlowKeyHash                           | 4001181757
only showing top 1 row

        可以看到,明显比使用default时的数据是要多了好多的。

        2. 提取特定的DPI数据

        下一步的问题,是证明mothra是否真的将数据读入了进来,还是说虽然字段多了但其实是张空表。

        (1)统计AppLabel

scala> all.groupBy('silkAppLabel).count.show
+------------+-----+
|silkAppLabel|count|
+------------+-----+
|         137|   42|
|          53|  601|
|        null|    2|
|       51443|   12|
|         161|    4|
|         443|  537|
|          80|  790|
|         123|    1|
|           0| 1401|
|         138|   28|
+------------+-----+

        通过统计AppLabel,我们能够直到,至少在ipfix中存在601条dns信息、537条tls信息和790条http信息,这个是我们在执行yaf时设置过需要提取DPI信息的,接下来需要搞清楚的是这些信息有没有存下来。

        (2)提取DNS记录

        根据everthing的schema,我们知道存储dns信息的列名是dnsRecordList,因此可以使用filter appLabel的方式进行过滤后提取。

scala> all.printSchema
root
 |-- startTime: timestamp (nullable = true)
 |-- endTime: timestamp (nullable = true)
 |-- sourceIPAddress: string (nullable = true)
 |-- sourcePort: integer (nullable = true)
…………………………
 |-- dnsRecordList: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- dnsName: string (nullable = true)
 |    |    |-- dnsTTL: long (nullable = true)
 |    |    |-- dnsRRType: integer (nullable = true)
 |    |    |-- dnsQueryResponse: integer (nullable = true)
 |    |    |-- dnsAuthoritative: integer (nullable = true)
 |    |    |-- dnsResponseCode: integer (nullable = true)
 |    |    |-- dnsSection: integer (nullable = true)
 |    |    |-- dnsId: integer (nullable = true)
 |    |    |-- dnsA: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- dnsAAAA: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- dnsCNAME: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- dnsMX: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- exchange: string (nullable = true)
 |    |    |    |    |-- preference: integer (nullable = true)
 |    |    |-- dnsNS: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- dnsPTR: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- dnsTXT: array (nullable = true)
 |    |    |    |-- element: string (containsNull = true)
 |    |    |-- dnsSOA: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- mname: string (nullable = true)
 |    |    |    |    |-- rname: string (nullable = true)
 |    |    |    |    |-- serial: long (nullable = true)
 |    |    |    |    |-- refresh: long (nullable = true)
 |    |    |    |    |-- retry: long (nullable = true)
 |    |    |    |    |-- expire: long (nullable = true)
 |    |    |    |    |-- minimum: long (nullable = true)
 |    |    |-- dnsSRV: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- target: string (nullable = true)
 |    |    |    |    |-- priority: integer (nullable = true)
 |    |    |    |    |-- weight: integer (nullable = true)
 |    |    |    |    |-- port: integer (nullable = true)
 |-- enipDataList: array (nullable = true)
…………………………


scala> val dns = all.filter("silkAppLabel='53'").select('dnsRecordList)
dns: org.apache.spark.sql.DataFrame = [dnsRecordList: array<struct<dnsName:string,dnsTTL:bigint,dnsRRType:int,dnsQueryResponse:int,dnsAuthoritative:int,dnsResponseCode:int,dnsSection:int,dnsId:int,dnsA:array<string>,dnsAAAA:array<string>,dnsCNAME:array<string>,dnsMX:array<struct<exchange:string,preference:int>>,dnsNS:array<string>,dnsPTR:array<string>,dnsTXT:array<string>,dnsSOA:array<struct<mname:string,rname:string,serial:bigint,refresh:bigint,retry:bigint,expire:bigint,minimum:bigint>>,dnsSRV:array<struct<target:string,priority:int,weight:int,port:int>>>>]

scala> dns.count
res49: Long = 601

        打印一下试试

scala> dns.show
+--------------------+
|       dnsRecordList|
+--------------------+
|[{pagead2.googles...|
|[{SMS_SLP., 0, 1,...|
|[{SMS_SLP., 0, 1,...|
…… …… ……
+--------------------+
only showing top 20 rows


scala> dns.show(1,0,true)
-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 dnsRecordList | [{pagead2.googlesyndication.com., 0, 1, 0, 0, 0, 0, 37404, [], [], [], [], [], [], [], [], []}, {pagead2.googlesyndication.com., 254, 1, 1, 0, 0, 1, 37404, [180.163.150.166], [], [], [], [], [], [], [], []}]
only showing top 1 row

        可见Dns数据确实是读进来了,下一步就是怎么把它展开以方便分析了。

       四、操作复杂的spark类型

        上文中可以看到,经由ipfix读进来的数据其结构是比较复杂的,包括了Array和Struct等scala类型的嵌套,这些类型难以被简单地转成rdd进行处理,需要掌握一些展平数据的方法才能快乐的使用。

        例如,上文我们已经打印了dnsRecordList的schema结构,这里还可以看看http的主要IE的schema结构:

scala> val http = all.filter("silkAppLabel='80'").select('httpHostList,
     | 'httpGetList,
     | 'httpResponseList,
     | 'httpViaList,
     | 'httpContentTypeList,
     | 'httpContentLengthList)
http: org.apache.spark.sql.DataFrame = [httpHostList: array<string>, httpGetList: array<string> ... 4 more fields]

scala> http.count
res2: Long = 790

scala> http.show(1)
+--------------------+-----------+----------------+-----------+--------------------+---------------------+
|        httpHostList|httpGetList|httpResponseList|httpViaList| httpContentTypeList|httpContentLengthList|
+--------------------+-----------+----------------+-----------+--------------------+---------------------+
|[pr.x.hub.sandai....|   [POST /]|        [200 OK]|         []|[application/octe...|            [192, 36]|
+--------------------+-----------+----------------+-----------+--------------------+---------------------+
only showing top 1 row

scala> http.printSchema
root
 |-- httpHostList: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- httpGetList: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- httpResponseList: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- httpViaList: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- httpContentTypeList: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- httpContentLengthList: array (nullable = true)
 |    |-- element: string (containsNull = true)

        1.纵向打开

        纵向打开是比较常见的打开方式,就是当一行数据中存在嵌套时,将数据展开成多行。这在dns记录中比较常见,比如在一条dns应答中得到了多个options结果,为了分析方便起见,需要将options结果展开为多行,一行一个,而且每行都需要保持dns应答中的相关信息。所以纵向打开也会显著增加数据的冗余量。

        (1)explode方法

        explode函数是在Spark 3.4.1 ScalaDoc - org.apache.spark.sql.functions中定义的,官方的说法,是从给定的array/map列创建新的数据行。

CENTOS上的网络安全工具(二十七)SPARK+NetSA Security Tools容器化部署(3),大数据,spark,网络安全

       使用explode,如果被炸对象是空值,会导致整行数据被删掉。所以,如果还希望保留被炸对象为空值的数据行,可以使用explode_outer方法。

        (2)单个array打开

        以dnsRecordList为例。 打印出第一行,大概能够猜出来,dnsRecordList是一个包含struct类型对象的array数组,这个array实际就是以这个struct为条目包装的dns查询应答,一条一个struct。

scala> dns.show(1,0,true)
-RECORD 0------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 dnsRecordList | [{pagead2.googlesyndication.com., 0, 1, 0, 0, 0, 0, 37404, [], [], [], [], [], [], [], [], []}, {pagead2.googlesyndication.com., 254, 1, 1, 0, 0, 1, 37404, [180.163.150.166], [], [], [], [], [], [], [], []}]
only showing top 1 row

        由上面给出的官方说明,explode输入一个列,输出也是列,所以可以用select来选择explode炸出的列。 炸成多行后是这个样子:

scala> dns.select(explode('dnsRecordList).as("dnsRecord")).show
+--------------------+
|           dnsRecord|
+--------------------+
|{pagead2.googlesy...|
|{pagead2.googlesy...|
|{SMS_SLP., 0, 1, ...|
|{SMS_SLP., 0, 1, ...|
…… …… …… ……
+--------------------+
only showing top 20 rows

        可以看到,那个代表array的中括号没了,中括号中如果有多个struct的话,就会被炸成多行的形式。

        (3)多个array打开

        多个array打开和单个是一样的,但要注意空值的处理,所以比较合适的是使用explode_outer;另外一个麻烦的问题,是在一个select语句中不能使用多次explode,所以需要使用withColumn来配合,代码上感觉冗余,不太舒服。  

scala> http.withColumn("Host",explode_outer('httpHostList)).
     |      | withColumn("Get",explode_outer('httpGetList)).
     |      | withColumn("Response",explode_outer('httpResponseList)).
     |      | withColumn("ContentType",explode_outer('httpContentTypeList)).
     |      | withColumn("ContentLen",explode_outer('httpContentLengthList)).
     |      | select('Host,'Get,'Response,'ContentType,'ContentLen).show
+--------------------+--------------------+----------------+--------------------+----------+
|                Host|                 Get|        Response|         ContentType|ContentLen|
+--------------------+--------------------+----------------+--------------------+----------+
|pr.x.hub.sandai.n...|              POST /|          200 OK|application/octet...|       192|
|pr.x.hub.sandai.n...|              POST /|          200 OK|application/octet...|        36|
|pr.x.hub.sandai.n...|              POST /|          200 OK|application/octet...|       192|
|pr.x.hub.sandai.n...|              POST /|          200 OK|application/octet...|        36|
|pr.x.hub.sandai.n...|              POST /|          200 OK|application/octet...|       192|
|pr.x.hub.sandai.n...|              POST /|          200 OK|application/octet...|        36|
|pr.x.hub.sandai.n...|              POST /|          200 OK|application/octet...|       192|
|pr.x.hub.sandai.n...|              POST /|          200 OK|application/octet...|        36|
…………………………
+--------------------+--------------------+----------------+--------------------+----------+
only showing top 20 rows

        炸开的结果,可以明显感觉到数据的冗余量上来了。对比一下原始的数据: 

scala> http.select('httpHostList,'httpGetList,'httpResponseList,'httpContentTypeList,'httpContentLengthList).show
+--------------------+--------------------+--------------------+--------------------+---------------------+
|        httpHostList|         httpGetList|    httpResponseList| httpContentTypeList|httpContentLengthList|
+--------------------+--------------------+--------------------+--------------------+---------------------+
|[pr.x.hub.sandai....|            [POST /]|            [200 OK]|[application/octe...|            [192, 36]|
|[pr.x.hub.sandai....|            [POST /]|            [200 OK]|[application/octe...|            [192, 36]|

        对比一下原始的数据,可以看到这主要是因为Contentlength被炸开造成的;所以,观察数据的时候,谨慎选择炸开的列还是很有必要的。 

        2. 横向打开

        横向打开,就是将复杂数据类型打开成多列。需要注意到对于表格来说,行数是可以随意增长的,这也是纵向打开相对容易的原因,只要接受数据冗余就好了;但是列数,就不是能够随意打开的了,毕竟表头不能随着数据动态增长——那就太自由了。所以,根据数据类型的不同,横向展开的方式也会不同。

        (1)Struct类型横向打开

        如果数据类型是Struct,也就暗示着实际复杂类型包含的子要素的个数是恒定的,相对来说,展成多列也就容易接受。

        比如dnsRecordList里面的这个struct,通过printSchema我们已经知道其数据结构,包含dnsName、dnsTTL等下级元素。只需要使用select(“structname.*”)就可以解决问题。

scala> dns.select(explode('dnsRecordList).as("RecordList")).select("RecordList.*").show
+--------------------+------+---------+----------------+----------------+---------------+----------+-----+-----------------+--------------------+--------+-----+-----+------+------+--------------------+------+
|             dnsName|dnsTTL|dnsRRType|dnsQueryResponse|dnsAuthoritative|dnsResponseCode|dnsSection|dnsId|             dnsA|             dnsAAAA|dnsCNAME|dnsMX|dnsNS|dnsPTR|dnsTXT|              dnsSOA|dnsSRV|
+--------------------+------+---------+----------------+----------------+---------------+----------+-----+-----------------+--------------------+--------+-----+-----+------+------+--------------------+------+
|pagead2.googlesyn...|     0|        1|               0|               0|              0|         0|37404|               []|                  []|      []|   []|   []|    []|    []|                  []|    []|
|pagead2.googlesyn...|   254|        1|               1|               0|              0|         1|37404|[180.163.150.166]|                  []|      []|   []|   []|    []|    []|                  []|    []|
|            SMS_SLP.|     0|        1|               0|               0|              0|         0|14041|               []|                  []|      []|   []|   []|    []|    []|                  []|    []|
|            SMS_SLP.|     0|        1|               0|               0|              0|         0|14041|               []|                  []|      []|   []|   []|    []|    []|                  []|    []|

        如果只需要查看有限几个下级元素,也可以直接指定元素进行选择:

scala> dns.select(explode('dnsRecordList).as("RecordList")).select("RecordList.dnsName","RecordList.dnsA").show
+--------------------+-----------------+
|             dnsName|             dnsA|
+--------------------+-----------------+
|pagead2.googlesyn...|               []|
|pagead2.googlesyn...|[180.163.150.166]|
|            SMS_SLP.|               []|

        (2)Array类型横向展开

        如果数据类型是Array,就不携带任何下级元素的表头信息了,而且还可能因为数据带来不同array大小,这种展开就比较麻烦了:

scala> import org.apache.spark.sql.functions.size
import org.apache.spark.sql.functions.size

scala> http.select('httpHostList).withColumn("listSize",size('httpHostList)).groupBy('listSize).count.sort('listSize.desc).show
+--------+-----+
|listSize|count|
+--------+-----+
|       3|    2|
|       2|    1|
|       1|  106|
|       0|  681|
+--------+-----+

        过滤一下看看,确实有host为3的情况:

scala> val tmp = http.select('httpHostList).withColumn("size",size('httpHostList)).filter("size='3'")
tmp: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [httpHostList: array<string>, size: int]

scala> tmp.show
+--------------------+----+
|        httpHostList|size|
+--------------------+----+
|[ctldl.windowsupd...|   3|
|[1d.tlu.dl.delive...|   3|
+--------------------+----+

        对于array类型,需要在dataframe中选择的话,直接使用“(索引)”操作符就行。所以,比较笨的方法,就是手动展开:

scala> http.select('httpHostList(0).as("host0"),'httpHostList(1).as("host1"),'httpHostList(2).as("host2")).show
+--------------------+--------------------+--------------------+
|               host0|               host1|               host2|
+--------------------+--------------------+--------------------+
|pr.x.hub.sandai.n...|                null|                null|
|pr.x.hub.sandai.n...|                null|                null|
|extshort.weixin.q...|                null|                null|
|      42.187.182.123|                null|                null|
|ctldl.windowsupda...|ctldl.windowsupda...|ctldl.windowsupda...|
|      x1.c.lencr.org|                null|                null|
……………………

        自动一点,可以考虑采取如下的写法,其中“:_*”表示将对应的参数序列化后使用:

scala> http.select( $"httpGetList" +: (0 until 3).map(i => 'httpHostList(i).alias(s"host$i")):_* ).show
+--------------------+--------------------+--------------------+--------------------+
|         httpGetList|               host0|               host1|               host2|
+--------------------+--------------------+--------------------+--------------------+
|            [POST /]|pr.x.hub.sandai.n...|                null|                null|
|            [POST /]|pr.x.hub.sandai.n...|                null|                null|
|[POST /mmtls/0000...|extshort.weixin.q...|                null|                null|
|[POST /mmtls/0000...|      42.187.182.123|                null|                null|
|[GET /msdownload/...|ctldl.windowsupda...|ctldl.windowsupda...|ctldl.windowsupda...|
………………
+--------------------+--------------------+--------------------+--------------------+
only showing top 20 rows

PS

        最后,推荐几篇不错的文章,帮助我解决了scala小白过程中遭遇的问题:

        1. 数组列转多列:

Spark DataFrame数组列转多列 - 知乎 (zhihu.com)

        2. 操作复杂的spark类型:

 Spark之处理复杂数据类型(Struct、Array、Map、JSON字符串等)_spark string 转成json array_大数据翻身的博客-CSDN博客

        3. explode的本质是什么: 

 Spark Sql中的Map和flatMap_spark flatmap_数仓白菜白的博客-CSDN博客

        4. 带表达式的select: 

 Spark---DataFrame学习(二)——select、selectExpr函数_stan1111的博客-CSDN博客

        5. 奇怪的Scala操作符

scala中:_*的使用和scala中的:: , +:, :+, :::, +++ 等操作_scala中*_爱学习的孙同学的博客-CSDN博客文章来源地址https://www.toymoban.com/news/detail-533284.html

到了这里,关于CENTOS上的网络安全工具(二十七)SPARK+NetSA Security Tools容器化部署(3)的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • CENTOS上的网络安全工具(八)Scapy协议解析

            一般来说,使用诸如Arkima、Suricata等现成的开源网络安全工具已经可以满足大多数需求,但需求总是无止境的。当我们需要关注网络通信中一些奇奇怪怪的行为的时候,常规工具给出的数据特征常常无法满足我们特立独行的需求,这个时候往往需要我们自己进行网络

    2024年02月02日
    浏览(49)
  • CENTO OS上的网络安全工具(二十二)Spark HA swarm容器化集群部署

            在Hadoop集群swarm部署的基础上,我们更进一步,把Spark也拉进来。相对来说,在Hadoop搞定的情况下,Spark就简单多了。          之所以把这件事还要拿出来讲……当然是因为掉过坑。我安装的时候,hadoop是3.3.5,所以spark下载这个为hadoop 3.3 预编译的版本就好——一

    2024年02月05日
    浏览(55)
  • 【网络安全 --- 工具安装】Centos 7 详细安装过程及xshell,FTP等工具的安装(提供资源)

    分享一个非常详细的网络安全笔记,是我学习网安过程中用心写的,可以点开以下链接获取: 超详细的网络安全笔记 VMware虚拟机的安装教程如下,如没有安装,可以参考这篇博客安装(提供资源) 【网络安全 --- 工具安装】VMware 16.0 详细安装过程(提供资源)-CSDN博客 【网

    2024年02月08日
    浏览(44)
  • 网络安全学习(十七)VlAN

    vlan是二层技术,路由器上不能配置vlan表 主要应用在交换机上 广播的危害:增加网络/终端负担,传播病毒,安全性 路由器也可以隔离,但有缺点         成本高         不灵活 1.目的         • 划分广播域         • 增强网络安全性         • 简化了网络的管理 2.v

    2024年02月05日
    浏览(57)
  • 网络安全入门学习第十七课——PHP数组

    索引数组是指 键名为整数 的数组。 默认 情况下,索引数组的 键名是从0开始 ,并依次递增。它主要适用于利用位置(0、1、2……)来标识数组元素的情况。另外,索引数组的键名也可以自己指定。 关联数组是指 键名为字符串 的数组。通常情况下,关联数组元素的“键”和

    2024年02月09日
    浏览(55)
  • 网络安全入门学习第十七课——PHP表单交互

    表单的主要功能:就是在网页上用于输入信息的区域,收集用户输入的信息,并将其提交给后端的服务器进行处理,实现用户与服务器的交互。 例如:购物结算、信息搜索等都是通过表单实现的。 一个完整的表单是由表单域和表单控件组成的。其中,表单域由form标记定义,

    2024年02月12日
    浏览(49)
  • 【送书福利-第十七期】用“价值”的视角来看安全:《构建新型网络形态下的网络空间安全体系》

    😎 作者介绍:我是程序员洲洲,一个热爱写作的非著名程序员。CSDN全栈优质领域创作者、华为云博客社区云享专家、阿里云博客社区专家博主、前后端开发、人工智能研究生。公粽号:程序员洲洲。 🎈 本文专栏:本文收录于洲洲的《送书福利》系列专栏,该专栏福利多多

    2024年02月10日
    浏览(46)
  • 网络安全进阶学习第二十一课——XXE

    XXE(XML External Entity,XML) 外部实体 注入攻击。 — — 攻击者通过构造 恶意的外部实体 ,当解析器解析了包含“恶意”外部实体的XML类型文件时,便会导致被XXE攻击。XXE漏洞主要由于危险的外部实体引用并且未对外部实体进行敏感字符的过滤,从而可以造成命令执行,目录遍

    2024年02月06日
    浏览(62)
  • 网络安全进阶学习第二十一课——XML介绍

    XML(eXtensible Markup Language),可扩展标记语言,是一种标记语言,使用简单标记描述数据;(另一种常见的标记语言是HTML) XML是一种非常灵活的语言, 没有固定的标签,所有标签都可以自定义 ; 通常 XML被用于信息的传递和记录 ,因此,xml经常被用于充当配置文件。如果把

    2024年02月06日
    浏览(48)
  • 网络安全进阶学习第二十课——CTF之文件操作与隐写

    ------ 当文件没有文件扩展名,或者具有文件扩展名但无法正常打开时,可以根据识别到的文件类型进行修改文件扩展名,从而使文件能够正常打开。 使用场景:不知道后缀名,无法打开文件。 格式: file myheart 这里就识别到是一个PCAP的流量包 ------ 通过WinHex程序可以查看文件

    2024年02月07日
    浏览(43)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包