CENTO OS上的网络安全工具(二十三)VSCODE SPARK 容器式编程环境构建

这篇具有很好参考价值的文章主要介绍了CENTO OS上的网络安全工具(二十三)VSCODE SPARK 容器式编程环境构建。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

       在vscode上使用maven构建spark的scala编程环境,很大程度上需要不断地从网络上下载各种依赖和插件,而且这一过程复杂而不可控。下面这段,是整个安装过程中/root目录下不断增加的内容。

[root@d7ff8f448a0d /]# cd /root
[root@d7ff8f448a0d ~]# ls -a
.  ..  .bash_logout  .bash_profile  .bashrc  .cshrc  .ssh  .tcshrc  .vscode-server  anaconda-ks.cfg  hadoop  init-vscode.sh  m2repo  maven  spark
[root@d7ff8f448a0d ~]# ls -a
.   .bash_history  .bash_profile  .cshrc  .ssh     .vscode-server   hadoop          m2repo  program
..  .bash_logout   .bashrc        .pki    .tcshrc  anaconda-ks.cfg  init-vscode.sh  maven   spark
[root@d7ff8f448a0d ~]# ls -a
.   .bash_history  .bash_profile  .cache  .dotnet  .ssh     .vscode-server   hadoop          m2repo  program
..  .bash_logout   .bashrc        .cshrc  .pki     .tcshrc  anaconda-ks.cfg  init-vscode.sh  maven   spark
[root@d7ff8f448a0d ~]# ls -a
.   .bash_history  .bash_profile  .cache  .dotnet  .ssh     .vscode-server   hadoop          m2repo  program
..  .bash_logout   .bashrc        .cshrc  .pki     .tcshrc  anaconda-ks.cfg  init-vscode.sh  maven   spark
[root@d7ff8f448a0d ~]# ls -a
.   .bash_history  .bash_profile  .bloop  .config  .dotnet  .pki     .ssh     .vscode-server   hadoop          m2repo  program
..  .bash_logout   .bashrc        .cache  .cshrc   .m2      .redhat  .tcshrc  anaconda-ks.cfg  init-vscode.sh  maven   spark
[root@d7ff8f448a0d ~]# ls -a
.   .bash_history  .bash_profile  .bloop  .config  .dotnet  .m2   .redhat  .tcshrc         anaconda-ks.cfg  init-vscode.sh  maven    spark
..  .bash_logout   .bashrc        .cache  .cshrc   .local   .pki  .ssh     .vscode-server  hadoop           m2repo          program
[root@d7ff8f448a0d ~]# 

        另一方面,vscode的离线安装也需要匹配容器中code与外部code的版本。所以完全构造一个不依靠网络随时可启动、还适配任何主机vscode的scala环境着实困难,甚至直接从构建完成的容器导出的镜像,都很难保证再次导入后可用。所以最终我还是放弃了基于Dockerfile构建离线可用的容器环境,转为在所有环境构建完成后,通过export继而import导入最终的镜像。

        一、编译容器的Dockerfile

        先准备一个安装了各种语言开发包的容器,以便以后使用vscode连接。当然,为了能够进行hadoop和spark编程,我们把这两个的单机安装过程也跑一遍。装上ssh,这样就可以从vscode连接了。

FROM centos:centos7

#口令参数需要从外部传入
ARG password

#构造更改了清华镜像源的centos7镜像
RUN sed -e 's|^mirrorlist=|#mirrorlist=|g' \
        -e 's|^#baseurl=http://mirror.centos.org/centos|baseurl=https://mirrors.tuna.tsinghua.edu.cn/centos|g' \
        -i.bak \
        /etc/yum.repos.d/CentOS-*.repo\
 && yum clean all

ADD hadoop-3.3.5.tar.gz /root
ADD spark-3.4.0-bin-hadoop3.tgz /root

#拷贝SSH免密登录的相关密钥文件,目前只放置了15个
COPY .ssh /root/.ssh
#拷贝所有待安装软件
COPY ./rpm /root/rpm/.
#拷贝初始化脚本
COPY ./init-vscode.sh /root/.

#构造可swarm一健部署的SSH免密登录镜像-------------------------------------------------------------
#设置初始化脚本的可执行属性
#更改私钥文件属性为仅root用户可见,否则ssh会拒绝执行
#需要为root用户设置口令,否则免密登录还会弹出框,docker镜像默认没有密钥
#安装openssh,并生成服务端密钥,更改强制指纹验证配置项为no,以免弹出指纹询问框
RUN    chmod 0400 /root/.ssh/id_rsa \
 &&    chmod 0600 /root/.ssh/authorized_keys \
 &&    echo ${password} | passwd --stdin root \
 &&    yum localinstall /root/rpm/*.rpm -y \
 &&    /sbin/sshd-keygen \
 &&    echo -e '\nHost *\nStrictHostKeyChecking no\nUserKnownHostsFile=/dev/null' >> etc/ssh/ssh_config \
 &&    rm /root/rpm -rf \
 &&    chmod +x /root/init-vscode.sh \
 &&    mv /root/hadoop-3.3.5 /root/hadoop \
 &&    echo -e "export HADOOP_HOME=/root/hadoop\nexport PATH=\$PATH:\$HADOOP_HOME/bin\n" >> /root/.bashrc\
 &&    mv /root/spark-3.4.0-bin-hadoop3 /root/spark\
 &&    echo -e "export SPARK_HOME=/root/spark \nexport PATH=\$PATH:\$SPARK_HOME/bin">>/root/.bashrc\
 &&    echo -e "export JAVA_HOME=/usr/java/latest">>/root/.bashrc \
 &&    echo -e "export SCALA_HOME=/usr/share/scala">>/root/.bashrc
#默认启动脚本
CMD ["/root/init-vscode.sh"]

        所以初始化脚本的主要作用是把sshd启动起来:

#! /bin/bash

source ~/.bashrc

/sbin/sshd -D &

tail -f /dev/null

         二、 远程VSCODE安装

        在windows本地安装vscode之类的啥就不提了,直接从安装好了开始:

        1. 远程SSH连接容器

        只需要在创建ssh连接的时候填入 ssh root@pighost1 -p9025即可

        仅仅是比原先多了一个端口设定,以便连接到容器绑定的端口上。

vscode 容器开发,大数据,hadoop,spark

         2. 插件安装

        安装插件主要是2个大包。

        (1)Extension Pack for java

vscode 容器开发,大数据,hadoop,spark

        (2)Extension Pack for scala        

        不过这个包会捆绑一些我们当下其实并不需要的一些东西,而且在没有注册的时候还经常报错,很是烦人,所以我们也可以选择手动安装必要的插件,主要包括: 

        Scala(Metals)

vscode 容器开发,大数据,hadoop,spark

        Scala Syntax(official)

vscode 容器开发,大数据,hadoop,spark

        Scala Language Server 

vscode 容器开发,大数据,hadoop,spark

         Scala Snippets

vscode 容器开发,大数据,hadoop,spark

         亲测这4个装完完全够scala开发了。

        3. Metals环境安装

        此时点击Metals图标,会开始安装,等待就好 

vscode 容器开发,大数据,hadoop,spark

        里里外外会下载还几个G的东西,所以等待时间会比较长。下载完成后会警告说没有编译工具对象,大意就是指maven的那个pom文件。没关系,还没构建呢。

        4. 使用Mave创建一个Scala工程

vscode 容器开发,大数据,hadoop,spark

        构建scalameta/maven-scala-seed.g8,

vscode 容器开发,大数据,hadoop,spark

        点击program(之前我们建的文件夹),再点击ok后,会要求给一个名字,默认是maven-scala-seed。

        然后安静等待,视网络情况,可能得一会儿生成器才会开始工作…这个“一会儿”可能长达一分钟,所以一定要选择 相信 …注意看网络速率那里(如果有装相关的监视软件或者杀毒软件小插件之类的话)其实一直在下载,在没有下载完成之前,这个自动生成的操作是不会开始的。

        生成结束后,vscode会询问是否在新窗口中打开项目,选择yes(这样项目会跳到pom文件所在的目录下,从而不会导致生成可执行文件时遭遇no target错误)。

vscode 容器开发,大数据,hadoop,spark        关闭原来的窗口,使用新打开的窗口就好。此时项目的目录结构应该时这样的,可以看到pom文件已经出现了:

vscode 容器开发,大数据,hadoop,spark

        这时一般vscode会很贴心的问要不要import build一下。此时千万不要import!!!此时千万不要import!!!此时千万不要import!!!

         否则可能会由于java版本不匹配造成某些地方的配置错误,我也不知道错在哪(估计是在metals的java环境同步设置中,懒得找了)。pom文件中默认配置jdk是1.8版本的。由于我们使用的是jdk 11,所以pom文件的这里需要改动一下:

vscode 容器开发,大数据,hadoop,spark

         1即可:

vscode 容器开发,大数据,hadoop,spark

         然后这次VS从的仍然会询问是否import,这次就可以选择import build了

vscode 容器开发,大数据,hadoop,spark

import过程中,maven会运行bloopinstall,这个也需要花很长时间。 

vscode 容器开发,大数据,hadoop,spark

        最后在输出窗口中会看到SUCCESS。但是不要以为这就是结束。

vscode 容器开发,大数据,hadoop,spark

        然后就是需要继续观察下载速率,等待下载……总之装这个一路都很玄学,因为有些下载在输出窗口里面是能看到的(如果选择了观察logs),有些下载操作在窗口是什么都看不到的——如果你以为什么动静都看不到就是装完了而试图区执行代码的时候,一般会收到internal error。

        信则有,不信就没有。我们要坚定地相信vscode会帮助我们搞定一切的(=反正不信也没有别的什么办法……)。直到所有配置工作搞定之后,应该能发现示例代码可执行的类上方出现了“run | debug”这样的图标,代表vscode准备好了。

vscode 容器开发,大数据,hadoop,spark

         如果怎么都等不到这个结果,不妨直接运行一下代码,当然会收到Internal error的错误。不过这次运行似乎会刺激metals继续工作。因为不久后右下角会出现indexing以及scalafix工作的字样,完成后一般metals的packages窗口内会出现内容,而不是那两个找不到scala程序及要求创建的按钮:

vscode 容器开发,大数据,hadoop,spark

         如果是jdk11的环境(我也不知道1.8会不会有类似问题……因为下面这一步我没有配置的时候也能成功,但有时候不配就是不成功:)

        在帮助栏中执行run doctor:

vscode 容器开发,大数据,hadoop,spark

        得到如下界面:

vscode 容器开发,大数据,hadoop,spark

 错误的图木有了,正确的图示如上,总之这个semanticdb插件没有的话,就不行,还得手动安装一下。——到maven仓库里面找到,在pom文件中添加如下即可:

    <!-- https://mvnrepository.com/artifact/org.scalameta/semanticdb -->
    <dependency>
        <groupId>org.scalameta</groupId>
        <artifactId>semanticdb_2.12</artifactId>
        <version>4.1.6</version>
    </dependency>

         此时执行程序,应该结果就不会有问题了:

vscode 容器开发,大数据,hadoop,spark          

        5. 导入Spark依赖

vscode 容器开发,大数据,hadoop,spark

         对于Spark,需要向pom文件中导入对应的依赖,如上。

        最终的pom文件:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.example</groupId>
  <artifactId>testspark</artifactId>
  <version>1.0-SNAPSHOT</version>
  <name>testspark</name>
  <description>A minimal Scala project using the Maven build tool.</description>
  <licenses>
    <license>
      <name>My License</name>
      <url>http://....</url>
      <distribution>repo</distribution>
    </license>
  </licenses>

  <properties>
    <maven.compiler.source>11</maven.compiler.source>
    <maven.compiler.target>11</maven.compiler.target>
    <encoding>UTF-8</encoding>
    <scala.version>2.13.11</scala.version>
  </properties>

  <dependencies>
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.13.11</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.scalameta/semanticdb -->
    <dependency>
        <groupId>org.scalameta</groupId>
        <artifactId>semanticdb_2.12</artifactId>
        <version>4.1.6</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-core -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.13</artifactId>
        <version>3.4.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.13</artifactId>
        <version>3.4.0</version>
        <scope>provided</scope>
    </dependency>


    <!-- Test -->
    <dependency>
      <groupId>org.scalameta</groupId>
      <artifactId>munit_2.13</artifactId>
      <version>0.7.29</version>
      <scope>test</scope>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/scala</sourceDirectory>
    <testSourceDirectory>src/test/scala</testSourceDirectory>
    <plugins>
      <plugin>
        <!-- see http://davidb.github.com/scala-maven-plugin -->
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.3.2</version>
        <configuration></configuration>
        <executions>
          <execution>
            <goals>
              <goal>compile</goal>
              <goal>testCompile</goal>
            </goals>
            <configuration>
              <args></args>
            </configuration>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>
</project>

        然后就是继续等待vscode下载。当然,一般来说这些java包下载存储在 ~/.m2下。我们也可以直接从mave repository上下载java包放到对应目录下即可。

        放上经典的分词并且计数代码,执行:

vscode 容器开发,大数据,hadoop,spark

         结果可以到容器里面观察一下:

vscode 容器开发,大数据,hadoop,spark

        6. Scala 和 Java 混合编程

         Scala本身就是从Java生长出来的,所以在scala环境中使用java代码,或者在java中使用scala代码应该说是很直接的需求。要满足这样混合编程的条件,需要在pom文件的plugin中添加插件:

      <!--为混合编程添加-->
      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>build-helper-maven-plugin</artifactId>
        <executions>
          <execution>
            <id>add-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/main/scala</source>
              </sources>
            </configuration>
          </execution>
          <execution>
            <id>add-test-source</id>
            <phase>generate-sources</phase>
            <goals>
              <goal>add-test-source</goal>
            </goals>
            <configuration>
              <sources>
                <source>src/test/scala</source>
              </sources>
            </configuration>
          </execution>
        </executions>
      </plugin>

        然后直接在工作目录中增加java文件:

vscode 容器开发,大数据,hadoop,spark

         就可以在scala中调用了:

vscode 容器开发,大数据,hadoop,spark

         结果如下:       vscode 容器开发,大数据,hadoop,spark

        三、导出镜像

        1. 使用docker export导出当前的容器

[root@pighost share]# docker ps -a
CONTAINER ID   IMAGE        COMMAND                  CREATED         STATUS         PORTS                                   NAMES
76d0b875683b   pig/vscode   "/root/init-vscode.sh"   8 minutes ago   Up 8 minutes   0.0.0.0:9025->22/tcp, :::9025->22/tcp   pig
[root@pighost share]# docker export -o pigvscode.tar pig

        2. 使用dockers import构建镜像

[root@pighost share]# docker import pigvscode.tar pig/vscode:scala
sha256:ed572d045eba3c481dc557b2646958548a3e6f469ece866c5605aaef3cdccdc3
[root@pighost share]# docker images
REPOSITORY   TAG       IMAGE ID       CREATED         SIZE
pig/vscode   scala     ed572d045eba   9 seconds ago   4.49GB
centos       centos7   eeb6ee3f44bd   21 months ago   204MB

        3. 基于镜像启动容器

        由于我们是直接使用容器构建的镜像,所以初始化脚本不会在容器载入时执行了,需要我们在载入过程中显示指定。(当然,这样我们以后就无法在swarm中使用了。如果需要,应该基于该镜像,重新使用Dockerfile构建一个新的镜像并指明CMD就好了。)        

[root@pighost share]# docker run -itd --name pig -p 9025:22 pig/vscode:scala /root/init-vscode.sh
37b6651fe96fe58ae17eda584310e83778a6d3d012ba55584a53a42c1ecd7065
[root@pighost share]# 

         然后启动vscode直接进行远程连接就可以了。

        四、Spark-shell交互式分析

        使用scala编程,有很多事可做。当然,在开始编程之前,使用spark-shell交互式环境,体验一下scala的使用效果,也是不错的。直接在命令行中执行spark-shell就可以了:

        1. 读csv文件:

        val df = spark.read.option("header",true).csv("/sample/DOS/*.csv")

        或

        val df = spark.read.option("header",true).format("csv").load("/sample/DOS/*.csv")

scala> val df = spark.read.option("header",true).csv("/sample/DOS/*.csv")
df: org.apache.spark.sql.DataFrame = [sid: string, message_type: string ... 23 more fields]

        2. 查看读入文件的统计信息

        df.count

        df.columns

scala> df.count
res0: Long = 155395                                                             

scala> df.columns
res1: Array[String] = Array(sid, message_type, attack_type, src_ip, src_port, src_mac, src_country, src_province, src_city, src_location, dest_ip, dest_port, dest_mac, dest_country, dest_province, dest_city, dest_location, device_id, application_protocol, protocol, ip_version, timestamp, risk_level, length, message)

        3. 查看读入数据的模式信息        

        df.schema

scala> df.schema
res2: org.apache.spark.sql.types.StructType = StructType(StructField(sid,StringType,true),StructField(message_type,StringType,true),StructField(attack_type,StringType,true),StructField(src_ip,StringType,true),StructField(src_port,StringType,true),StructField(src_mac,StringType,true),StructField(src_country,StringType,true),StructField(src_province,StringType,true),StructField(src_city,StringType,true),StructField(src_location,StringType,true),StructField(dest_ip,StringType,true),StructField(dest_port,StringType,true),StructField(dest_mac,StringType,true),StructField(dest_country,StringType,true),StructField(dest_province,StringType,true),StructField(dest_city,StringType,true),StructField(dest_location,StringType,true),StructField(device_id,StringType,true),Stru...


scala> df.printSchema
root
 |-- sid: string (nullable = true)
 |-- message_type: string (nullable = true)
 |-- attack_type: string (nullable = true)
 |-- src_ip: string (nullable = true)
 |-- src_port: string (nullable = true)
 |-- src_mac: string (nullable = true)
……………………
 |-- device_id: string (nullable = true)
 |-- application_protocol: string (nullable = true)
 |-- protocol: string (nullable = true)
 |-- ip_version: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- risk_level: string (nullable = true)
 |-- length: string (nullable = true)
 |-- message: string (nullable = true)

        刚导入进来的全是string格式

        4. 列操作

        (1)选择列

scala> val smalldf = df.select(col("src_ip"),col("src_port"),col("dest_ip"),col("dest_port"),col("message"))
scala> smalldf.show()
+---------------+--------+---------------+---------+--------------------+
|         src_ip|src_port|        dest_ip|dest_port|             message|
+---------------+--------+---------------+---------+--------------------+
|  192.168.11.20|   38021|172.0.1.68     |      123|FRN DOS Possible ...|
………………………………
|  192.168.11.20|   47430|172.0.1.68     |      123|FRN DOS Possible ...|
+---------------+--------+---------------+---------+--------------------+
only showing top 20 rows

smalldf: Unit = ()

        选择列的简单方式:


scala> lines.select('src_ip).show()
+---------------+                                                               
|         src_ip|
+---------------+
|  192.168.1.46|
|  192.168.1.46|
|  192.168.1.46|
…………………………
only showing top 20 rows

         (2)增加列

scala> lines.select('src_ip).withColumn("count",lit(1)).show
+---------------+-----+
|         src_ip|count|
+---------------+-----+
|  192.168.1.111|    1|
|  192.168.1.112|    1|

        5. 去重

        去重只有distinct可用,如果是只正对一个字段的话,得把这个字段选出来

scala> lines.count
res9: Long = 155395                                                             

scala> lines.distinct.count
23/05/24 02:01:31 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
res10: Long = 154022                                                            

scala> lines.select('src_ip).distinct.count
res11: Long = 1454                                                              

scala> 

        可以看出效果截然不同

        6. 过滤

scala> lines.count
res16: Long = 155395                                                            


scala> lines.filter("dest_port='123'").count
res17: Long = 97147


scala> lines.filter("src_ip='192.168.0.11' and dest_port='123'").count
res18: Long = 3202

        where 就是sql语句where后面得表达式

scala> lines.where('dest_ip.like("192%")).select('src_ip,'src_port,'dest_ip,'dest_port).show
+---------------+--------+---------------+---------+
|         src_ip|src_port|        dest_ip|dest_port|
+---------------+--------+---------------+---------+
|  192.168.1.11 |   38021|    172.18.0.13|      123|

        列语法糖 $ === =!=

scala> lines.filter($"protocol"==="tcp").select('src_ip,'src_port,'dest_ip,'dest_port,'protocol).show
+---------------+--------+--------------+---------+--------+
|         src_ip|src_port|       dest_ip|dest_port|protocol|
+---------------+--------+--------------+---------+--------+
|  192.168.1.112|   61841|    172.17.0.1|     2710|     tcp|

        7. 统计

scala> lines.groupBy('dest_ip).count().orderBy('count.desc).show()
+---------------+-----+                                                         
|        dest_ip|count|
+---------------+-----+
|   192.168.1.11|  273|
|   192.168.1.12|  148|
|   192.168.1.13|  142|

          8. 类型转换

        spark有3中主要类型,RDD、Dataframe和Dataset,各有各的优势,所以经常需要转换使用。

        (1)DataFrame转Dataset

scala> val fivetuples = lines.select('src_ip,'src_port,'dest_ip,'dest_port,'protocol)
fivetuples: org.apache.spark.sql.DataFrame = [src_ip: string, src_port: string ... 3 more fields]

scala> case class FIVETUPLE(src_ip:String,src_port:String,dest_ip:String,dest_port:String,protocol:String)
defined class FIVETUPLE

scala> val fset = fivetuples.as[FIVETUPLE]
fset: org.apache.spark.sql.Dataset[FIVETUPLE] = [src_ip: string, src_port: string ... 3 more fields]

        (2)DataSet转DataFrame

scala> val fdf = fset.toDF
fdf: org.apache.spark.sql.DataFrame = [src_ip: string, src_port: string ... 3 more fields]

scala> fdf.columns
res31: Array[String] = Array(src_ip, src_port, dest_ip, dest_port, protocol)

        (3)DataFrame转RDD

scala> val frdd = fdf.rdd
frdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[124] at rdd at <console>:23

        (4)RDD转DataFrame 

        转回来需要map一下

scala> val fdf2 = frdd.map(l=>FIVETUPLE(l(0).toString(),l(1).toString(),l(2).toString(),l(3).toString(),l(4).toString())).toDF
fdf2: org.apache.spark.sql.DataFrame = [src_ip: string, src_port: string ... 3 more fields]

        (5)RDD和DataSet互转

scala> val fset2 = frdd.map(l=>FIVETUPLE(l(0).toString(),l(1).toString(),l(2).toString(),l(3).toString(),l(4).toString())).toDS
fset2: org.apache.spark.sql.Dataset[FIVETUPLE] = [src_ip: string, src_port: string ... 3 more fields]

scala> fset2.rdd
res32: org.apache.spark.rdd.RDD[FIVETUPLE] = MapPartitionsRDD[129] at rdd at <console>:24

        9. 碰撞

         在交互式环境中进行碰撞操作,可以将需要碰撞的一个集合通过array或者list导出来。比如:

     (1) 获取array

scala> val ip11 = lines.where('dest_ip.like("%11")).select('dest_ip).collect()
ip11: Array[org.apache.spark.sql.Row] = Array([192.168.………………

        (2)获取List

scala> val ip11 = lines.where('dest_ip.like("%11")).select('dest_ip).collectAsList()
ip11: java.util.List[org.apache.spark.sql.Row] = [[192.168.……

        (3)转为可用的数组或列表

        但以上两种获得的都是Row类型,在碰撞程序中不好用。要获得程序中好用的List,需要转化为rdd,然后使用map方法将row中的元素提取出来:

scala> val ip11 = lines.where('dest_ip.like("%111")).distinct.select('dest_ip).rdd.map(r=>r(0)).collect()
ip11: Array[Any] = Array(192.168.0.111,………………

        可再使用toList转为List 

        (4)碰撞发现ip名单内的数据

scala> lines.where('src_ip.isin(ip11:_*)).show

        (5)碰撞排除ip名单内的数据

scala> lines.where(!'dest_ip.isin(ip11:_*)).select('dest_ip).show

      10. SQL操作

scala> val lines = spark.read.option("header",true).csv("/sample/DOS/*.csv")
lines: org.apache.spark.sql.DataFrame = [sid: string, message_type: string ... 23 more fields]

scala> lines.createOrReplaceTempView("mytable")

scala> spark.sql("select src_ip from mytable").show
+---------------+
|         src_ip|
+---------------+
|    192.168.1.1|

        (1)分组统计排序

scala> spark.sql("select dest_ip,dest_port,count(*) as count from mytable group by dest_port,dest_ip order by count desc").show
+---------------+---------+-----+                                               
|        dest_ip|dest_port|count|
+---------------+---------+-----+
|  192.168.1.111|      123|  240|
|  192.168.1.112|      123|  122|

        (2)对分组排序条件进行限制

scala> spark.sql("select dest_ip,dest_port,count(*) as count from mytable group by dest_port,dest_ip having count<20 order by count desc").show
+---------------+---------+-----+                                               
|        dest_ip|dest_port|count|
+---------------+---------+-----+
| 192.168.21.222|      111|   19|
| 192.168.21.222|      111|   19|
scala> spark.sql("select dest_ip,dest_port,count(*) as count from mytable where dest_ip like '173%' group by dest_port,dest_ip having count<20 order by count desc").show
+---------------+---------+-----+                                               
|        dest_ip|dest_port|count|
+---------------+---------+-----+
|173.112.111.111|      111|   19|
|173.112.111.112|      111|   19|

        (3)嵌套碰撞

scala> spark.sql("select src_ip from mytable where src_ip in (select dest_ip from mytable)").show
+---------------+
|         src_ip|
+---------------+
|       10.0.0.1|

        (4)仅显示5行:

scala> spark.sql("select src_ip,count(*) as count from mytable group by src_ip having count>100 limit 5").show
+---------------+-----+
|         src_ip|count|
+---------------+-----+
|    192.168.1.1|  726|
|    192.168.1.2| 2030|
|    192.168.1.3|  328|
|    192.168.1.4|  204|
|    192.168.1.5|  190|

        (5)去重

scala> spark.sql("select protocol from mytable").show
+--------+
|protocol|
+--------+
|     udp|
|     udp|
|     udp|
…………
|     udp|
|     udp|
+--------+
only showing top 20 rows


scala> spark.sql("select distinct protocol from mytable").show
+--------+
|protocol|
+--------+
|     tcp|
|     udp|
+--------+

        11. DataSet操作

        Dataset操作和Dataframe操作类似:

        过滤 分组聚合 排序

scala> fset.select('src_ip,'dest_ip).groupBy('src_ip).count.orderBy('count.desc).show
+---------------+-----+
|         src_ip|count|
+---------------+-----+
|       10.0.0.1|19720|
|       10.0.0.2| 8037|
|       10.0.0.3| 5778|

        可以map

scala> fset.map(l => (l.src_ip,1)).show
+---------------+---+
|             _1| _2|
+---------------+---+
|  192.168.10.11|  1|
|  192.168.10.12|  1|
|  192.168.10.12|  1|
|  192.168.10.11|  1|

        reduce 统计行数

scala> fset.map(l=>1).reduce(_+_)
res44: Int = 155395

scala> fset.count
res45: Long = 155395

        12. RDD操作:

        RDD的最大好处就是可以支持灵活的map-reduce,最重要的事可以支持K-V操作。

        按src ip过滤

scala> frdd.count
res53: Long = 155395

scala> frdd.filter(l => l(0)=="192.168.0.1").count
res54: Long = 3202

        转KV操作

scala> frdd.keyBy(l=>l(0)).first

        countByKey文章来源地址https://www.toymoban.com/news/detail-639431.html

scala> frdd.keyBy(l=>l(0)).countByKey

到了这里,关于CENTO OS上的网络安全工具(二十三)VSCODE SPARK 容器式编程环境构建的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • CENTOS上的网络安全工具(二十五)SPARK+NetSA Security Tools容器化部署(1)

                 YAF(Yet Another Flowmeter)是作为CERT NetSA安全工具套件的传感器部分存在的,支持输入实时数据流和PCAP文件,解析并输出流数据,或针对特定协议的深包检测元数据。目前,YAF在整个系统的作用如下图所示:         其中Pipeline和SiLK都存在好多年了,用起来也算

    2024年02月11日
    浏览(42)
  • CENTOS上的网络安全工具(八)Scapy协议解析

            一般来说,使用诸如Arkima、Suricata等现成的开源网络安全工具已经可以满足大多数需求,但需求总是无止境的。当我们需要关注网络通信中一些奇奇怪怪的行为的时候,常规工具给出的数据特征常常无法满足我们特立独行的需求,这个时候往往需要我们自己进行网络

    2024年02月02日
    浏览(49)
  • 网络安全进阶学习第十三课——SQL注入Bypass姿势

    1、like,rlike语句,其中rlike是正则 2、大于号,小于号 3、符号:为不等于的意思相当于!= 4、采用regexp函数 5、in 6、between 1、 substring、substrB 2、 locate(str1,str2) 返回str1字符串在str2里第一次出现的位置,没有则返回0; locate(str1,str2, pos ) 返回str1字符串在str2里pos(起始位置)出现

    2024年02月13日
    浏览(49)
  • 网络安全进阶学习第二十一课——XXE

    XXE(XML External Entity,XML) 外部实体 注入攻击。 — — 攻击者通过构造 恶意的外部实体 ,当解析器解析了包含“恶意”外部实体的XML类型文件时,便会导致被XXE攻击。XXE漏洞主要由于危险的外部实体引用并且未对外部实体进行敏感字符的过滤,从而可以造成命令执行,目录遍

    2024年02月06日
    浏览(62)
  • 网络安全进阶学习第二十一课——XML介绍

    XML(eXtensible Markup Language),可扩展标记语言,是一种标记语言,使用简单标记描述数据;(另一种常见的标记语言是HTML) XML是一种非常灵活的语言, 没有固定的标签,所有标签都可以自定义 ; 通常 XML被用于信息的传递和记录 ,因此,xml经常被用于充当配置文件。如果把

    2024年02月06日
    浏览(48)
  • 网络安全进阶学习第二十课——CTF之文件操作与隐写

    ------ 当文件没有文件扩展名,或者具有文件扩展名但无法正常打开时,可以根据识别到的文件类型进行修改文件扩展名,从而使文件能够正常打开。 使用场景:不知道后缀名,无法打开文件。 格式: file myheart 这里就识别到是一个PCAP的流量包 ------ 通过WinHex程序可以查看文件

    2024年02月07日
    浏览(43)
  • 软考高级系统架构设计师系列论文九十三:论计算机网络的安全性设计

    软考高级系统架构设计师:计算机网络

    2024年02月10日
    浏览(70)
  • 开源安全测试工具 | 网络安全工具列表

    • AttackSurfaceMapper (https://github.com/superhedgy/AttackSurfaceMapper) - 自动化渗透测试工具, 使用手册/测试流程 (https://www.uedbox.com/post/59110/)。 • vajra (https://github.com/r3curs1v3-pr0xy/vajra) - 自动化渗透测试. • Savior (https://github.com/Mustard404/Savior) - 渗透测试报告自动生成工具!. • OneForAll (h

    2024年02月03日
    浏览(54)
  • 【网络安全-信息收集】网络安全之信息收集和信息收集工具讲解(提供工具)

    分享一个非常详细的网络安全笔记,是我学习网安过程中用心写的,可以点开以下链接获取: 超详细的网络安全笔记 工具下载百度网盘链接(包含所有用到的工具): 百度网盘 请输入提取码 百度网盘为您提供文件的网络备份、同步和分享服务。空间大、速度快、安全稳固,

    2024年02月08日
    浏览(53)
  • 网络安全工具合计(攻防工具)

    目录 All-Defense-Tool 免责声明 半/全自动化利用工具 信息收集工具 资产发现工具 子域名收集工具 目录扫描工具 指纹识别工具 端口扫描工具 Burp插件 浏览器插件 邮箱钓鱼 社工个人信息收集类 APP/公众号/小程序相关工具 常用小工具 漏洞利用工具 漏洞扫描框架/工具 中间件/应用

    2024年02月13日
    浏览(57)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包