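Flink is a very widely used stream processing system. Some of our customers currently use Flink for data synchronization, and its efficiency there is rather poor.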
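I had used Spark Streaming before, but Flink and Spark Streaming still differ somewhat in practice. Take word count as an example: in Spark it comes down to a single reduceByKey, whereas in Flink you first groupBy the word and then apply sum to the counts.
Source: https://www.toymoban.com/news/detail-515929.html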
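To make the two shapes concrete without a cluster, here is a minimal sketch in plain Scala collections. It does not use the Spark or Flink APIs, and the names CountStyles, pairs, reduceByKeyStyle and groupThenSum are hypothetical. The reduceByKey-style version folds each (word, 1) pair into a running total per key; the groupBy-then-sum version first gathers all pairs for a word and then adds up their counts. It is written to compile under Scala 2.12, the version used in this post.

```scala
object CountStyles {
  // Split lines into (word, 1) pairs, mirroring the flatMap/map steps
  def pairs(lines: Seq[String]): Seq[(String, Int)] =
    lines.flatMap(_.split(" ")).map(word => (word, 1))

  // reduceByKey shape: merge each pair into a running total for its key
  def reduceByKeyStyle(lines: Seq[String]): Map[String, Int] =
    pairs(lines).foldLeft(Map.empty[String, Int]) { case (acc, (word, n)) =>
      acc.updated(word, acc.getOrElse(word, 0) + n)
    }

  // groupBy + sum shape: collect each word's pairs first, then sum the counts
  def groupThenSum(lines: Seq[String]): Map[String, Int] =
    pairs(lines).groupBy(_._1).map { case (word, ps) => (word, ps.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    val lines = Seq("hello world", "how are you")
    println(reduceByKeyStyle(lines) == groupThenSum(lines)) // prints true
  }
}
```

Both produce the same counts; the difference lies only in how the aggregation is expressed, which is the distinction the API names reflect.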
Program code
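package cn.jihui.flink

import org.apache.flink.api.scala.ExecutionEnvironment
import org.apache.flink.api.scala._ // implicit TypeInformation for the Scala DataSet API

object wc2 {
  def main(args: Array[String]): Unit = {
    // Batch (DataSet API) execution environment
    val env = ExecutionEnvironment.getExecutionEnvironment
    // Use a single parallel instance (convenient for local testing)
    env.setParallelism(1)
    val fileName = "C:\\Users\\32985\\IdeaProjects\\flink_demo1\\resources\\wc.txt"
    val data = env.readTextFile(fileName)
      .flatMap(line => line.split(" ")) // split each line into words
      .map(word => (word, 1))           // pair each word with a count of 1
      .groupBy(0)                       // group by the word (tuple field 0)
      .sum(1)                           // sum the counts (tuple field 1)
    // In the DataSet API, print() triggers execution itself, so no env.execute() is needed
    data.print()
  }
}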
Test file
The file is located at C:\Users\32985\IdeaProjects\flink_demo1\resources\wc.txt and contains the following:
hello world
how are you
I am fine
how old are you
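If you want to run the same job without the hard-coded Windows path, a minimal sketch is to embed these four lines with fromCollection and bring the result back with collect(). This variant also swaps sum(1) for groupBy(0).reduce, which is closer in spirit to Spark's reduceByKey. It assumes the Flink 1.10 Scala DataSet API (flink-scala) is on the classpath; the object name WordCountInline and the helper countWords are hypothetical.

```scala
import org.apache.flink.api.scala._

object WordCountInline {
  // Hypothetical helper: word count over in-memory lines with the DataSet API
  def countWords(lines: Seq[String]): Seq[(String, Int)] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.fromCollection(lines)                 // in-memory source instead of readTextFile
      .flatMap(line => line.split(" "))       // split each line into words
      .map(word => (word, 1))                 // pair each word with a count of 1
      .groupBy(0)                             // group by the word
      .reduce((a, b) => (a._1, a._2 + b._2))  // merge pairs per key, like reduceByKey
      .collect()                              // runs the job and returns results to the client
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq("hello world", "how are you", "I am fine", "how old are you")
    // Sort by word so the printed order is deterministic
    countWords(lines).sortBy(_._1).foreach(println)
  }
}
```

Like print(), collect() is an eager DataSet operation that triggers execution, so no env.execute() call appears here either. Sorting by word should yield the same nine pairs as the program output below.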
Program output:
"C:\Program Files\Java\jdk-11\bin\java.exe" "-javaagent:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\lib\idea_rt.jar=60578:D:\Program Files\JetBrains\IntelliJ IDEA Community Edition 2019.3.3\bin" -Dfile.encoding=UTF-8 -classpath C:\Users\32985\IdeaProjects\flink_demo1\out\production\flink_demo1;D:\Bigdata\scala-2.12.18\lib\scala-library.jar;D:\Bigdata\scala-2.12.18\lib\scala-parser-combinators_2.12-1.0.7.jar;D:\Bigdata\scala-2.12.18\lib\scala-reflect.jar;D:\Bigdata\scala-2.12.18\lib\scala-swing_2.12-2.0.3.jar;D:\Bigdata\scala-2.12.18\lib\scala-xml_2.12-2.1.0.jar;D:\Bigdata\flink-1.10.1\lib\log4j-1.2.17.jar;D:\Bigdata\flink-1.10.1\lib\slf4j-log4j12-1.7.15.jar;D:\Bigdata\flink-1.10.1\lib\flink-dist_2.12-1.10.1.jar;D:\Bigdata\flink-1.10.1\lib\flink-table_2.12-1.10.1.jar;D:\Bigdata\flink-1.10.1\lib\flink-table-blink_2.12-1.10.1.jar cn.jihui.flink.wc2
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
(I,1)
(am,1)
(are,2)
(fine,1)
(hello,1)
(how,2)
(old,1)
(world,1)
(you,2)
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil (file:/D:/Bigdata/flink-1.10.1/lib/flink-dist_2.12-1.10.1.jar) to method java.nio.DirectByteBuffer.cleaner()
WARNING: Please consider reporting this to the maintainers of org.apache.flink.shaded.akka.org.jboss.netty.util.internal.ByteBufferUtil
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access operations
WARNING: All illegal access operations will be denied in a future release
Process finished with exit code 0