关于Flink,TaskManager日志问题的一个记录

这篇具有很好参考价值的文章主要介绍了关于Flink,TaskManager日志问题的一个记录。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

关于Flink,TaskManager日志问题的一个记录

疑问:不知道大家有和我一样,开发完的flink代码推送到flink集群上执行的时候log.info(xxxx)打印的日志不会打印到task-manager节点上去(在IDEA上执行可以打印日志到控制台上),为此一直在困扰了,经过一系列的尝试,终于在java和scala中实现log.info(xxx)打印到task-manager节点上。

  • java代码demo:
@Slf4j
public class Test {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironmentUtil.getStreamExecutionEnvironment();

        DataStreamSource<String> source = env.socketTextStream("prod", 9999);
        SingleOutputStreamOperator<Tuple2<String, Integer>> result = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                String[] split = value.split(",");
                for (String s : split) {
                    out.collect(Tuple2.of(s, 1));
                }
                log.info("打印的日志信息");
            }
        }).keyBy(0)
                .sum(1);
        result.print(">>>>");

        env.execute("Test");
    }
}

这里主要是利用到了lombok的@Slf4j注解的方式来实现,这里观察到编译后的class文件如下

public class Test {
  //自动增加了一个static final类型的Logger对象,这样在下面使用过程中才会
  private static final Logger log = LoggerFactory.getLogger(Test.class);
  
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironmentUtil.getStreamExecutionEnvironment();
    DataStreamSource<String> source = env.socketTextStream("prod", 9999);
    SingleOutputStreamOperator<Tuple2<String, Integer>> result = source.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
          public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
            String[] split = value.split(",");
            for (String s : split)
              out.collect(Tuple2.of(s, Integer.valueOf(1))); 
              //通过类名.方法名进行调用
              Test.log.info("打印的日志信息");
          }
        }).keyBy(new int[] { 0 }).sum(1);
    result.print(">>>>");
    env.execute("Test");
  }
}

这样将代码提交到flink的集群环境,在task-manager日志中也会打印出来

  • sacal代码demo:

scala中不支持使用lombok的方式,所以这里通过伴生对象的方式实现编译后的文件创建static final类型的Logger对象

StreamWordCount.scala

object StreamWordCount {
  val log = LoggerFactory.getLogger(this.getClass.getName)

  def main(args: Array[String]): Unit = {
    val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    val stream: DataStream[String] = streamEnv.socketTextStream("prod", 9999)
    val result: DataStream[(String, Int)] = stream.flatMap(_.split(","))
      .map(new MapFunction[String, Tuple2[String, Int]] {
        override def map(value: String): Tuple2[String, Int] = {
          log.info("打印日志")
          Tuple2.apply(value, 1)
        }
      })
      .keyBy(0)
      .sum(1)
    result.print()

    streamEnv.execute("wordcount")
  }
}

编译后的class文件:

public final class StreamWordCount$ {
  public static StreamWordCount$ MODULE$;
  //  log创建
  private final Logger log;
  
  public Logger log() {
    return this.log;
  }
  
  public void main(String[] args) {
    StreamExecutionEnvironment streamEnv = org.apache.flink.streaming.api.scala.StreamExecutionEnvironment$.MODULE$.getExecutionEnvironment();
    DataStream stream = streamEnv.socketTextStream("prod", 9999, streamEnv.socketTextStream$default$3(), streamEnv.socketTextStream$default$4());
    DataStream result = stream.flatMap(StreamWordCount$::$anonfun$main$1$adapted, (TypeInformation)BasicTypeInfo.getInfoFor(String.class)).map(new StreamWordCount$$anon$3(), (TypeInformation)new StreamWordCount$$anon$2()).keyBy((Seq)scala.Predef$.MODULE$.wrapIntArray(new int[] { 0 })).sum(1);
    result.print();
    streamEnv.execute("wordcount");
  }
  
  public final class StreamWordCount$$anon$2 extends CaseClassTypeInfo<Tuple2<String, Object>> {
    public TypeSerializer<Tuple2<String, Object>> createSerializer(ExecutionConfig executionConfig) {
    }
    
    public StreamWordCount$$anon$2() {
      super(Tuple2.class, (TypeInformation[])(new scala.collection.immutable..colon.colon(BasicTypeInfo.getInfoFor(String.class), (List)new scala.collection.immutable..colon.colon(BasicTypeInfo.getInfoFor(int.class), (List)scala.collection.immutable.Nil$.MODULE$))).toArray((ClassTag)scala.Predef$.MODULE$.implicitly(scala.reflect.ClassTag$.MODULE$.apply(TypeInformation.class))), (Seq)new scala.collection.immutable..colon.colon(BasicTypeInfo.getInfoFor(String.class), (List)new scala.collection.immutable..colon.colon(BasicTypeInfo.getInfoFor(int.class), (List)scala.collection.immutable.Nil$.MODULE$)), (Seq)scala.collection.Seq$.MODULE$.apply((Seq)scala.Predef$.MODULE$.wrapRefArray((Object[])new String[2])));
    }
    
    public final class StreamWordCount$$anon$2$$anon$1 extends ScalaCaseClassSerializer<Tuple2<String, Object>> {
      public Tuple2<String, Object> createInstance(Object[] fields) {
        return new Tuple2(fields[0], BoxesRunTime.boxToInteger(BoxesRunTime.unboxToInt(fields[1])));
      }
      
      public StreamWordCount$$anon$2$$anon$1(StreamWordCount$$anon$2 $outer, TypeSerializer[] fieldSerializers$1) {
        super($outer.getTypeClass(), fieldSerializers$1);
      }
    }
  }
  
  public final class StreamWordCount$$anon$3 implements MapFunction<String, Tuple2<String, Object>> {
    public Tuple2<String, Object> map(String value) {
    	//通过静态方法调用
      StreamWordCount$.MODULE$.log().info("打印日志");
      return new Tuple2(value, BoxesRunTime.boxToInteger(1));
    }
  }
  
  private StreamWordCount$() {
    MODULE$ = this;
    this.log = LoggerFactory.getLogger(getClass().getName());
  }
}

结论:

log对象的是 private static final 修饰的文章来源地址https://www.toymoban.com/news/detail-578329.html

到了这里,关于关于Flink,TaskManager日志问题的一个记录的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • Flink|《Flink 官方文档 - 部署 - 内存配置 - 配置 TaskManager 内存》学习笔记

    Flink|《Flink 官方文档 - 部署 - 内存配置 - 配置 TaskManager 内存》学习笔记

    学习文档:Flink|《Flink 官方文档 - 部署 - 内存配置 - 配置 TaskManager 内存》学习笔记 学习笔记如下: Flink JVM 进程的进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。其中,Flink 总内存(Total Flink Memory)包括 JV

    2024年03月15日
    浏览(6)
  • Flink源码之TaskManager启动流程

    Flink源码之TaskManager启动流程

    从启动命令flink-daemon.sh可以看出TaskManger入口类为org.apache.flink.runtime.taskexecutor.TaskManagerRunner 在TaskManagerRunner构造函数中,可以看出与JobManger类似,也是先构造出一些公共服务: 这些服务在构造TaskExecutor时作为构造函数参数传入 构造TaskExecutor前会先构造TaskManagerServices辅助Task

    2024年02月13日
    浏览(7)
  • flink任务的taskmanager失败Heartbeat of TaskManager with id taskmanager-1-1 timed out.

    flink任务的taskmanager失败Heartbeat of TaskManager with id taskmanager-1-1 timed out.

    flink任务上线运行 问题java.util.concurrent.TimeoutException: Heartbeat of TaskManager with id job-af2f94d0-59d7-4e51-aa55-dc91d1a264a8-taskmanager-1-1 timed out. 报错原因 分布式物理机网络失联。 Failover的节点对应Taskmanager的内存设置太小,垃圾回收机制会导致心跳超时。 解决方案 如果此问题出现的

    2024年02月14日
    浏览(8)
  • flink任务内存调优,TaskManager、JobManager内存配置

    flink任务内存调优,TaskManager、JobManager内存配置

            Flink是基于java的JVM运行,拥有高效的数据处理能力,但是考虑到用户在 Flink 上运行的应用的多样性,尽管flink框架已经为所有配置项提供合理的默认值,仍无法满足所有情况下的需求。 为了给用户生产提供最大化的价值, Flink 允许用户在整体上以及细粒度上对集

    2024年02月06日
    浏览(11)
  • 【大数据】Flink 内存管理(四):TaskManager 内存分配(实战篇)

    【大数据】Flink 内存管理(四):TaskManager 内存分配(实战篇)

    《 Flink 内存管理 》系列(已完结),共包含以下 4 篇文章: Flink 内存管理(一):设置 Flink 进程内存 Flink 内存管理(二):JobManager 内存分配(含实际计算案例) Flink 内存管理(三):TaskManager 内存分配(理论篇) Flink 内存管理(四):TaskManager 内存分配(实战篇) 😊

    2024年03月13日
    浏览(13)
  • 【Flink】Flink任务缺失Jobmanager日志的问题排查

    【Flink】Flink任务缺失Jobmanager日志的问题排查

    问题不是大问题,不是什么代码级别的高深问题,也没有影响任务运行,纯粹因为人员粗心导致,记录一下排查的过程。 一个生产环境的奇怪问题,环境是flink1.15.0 on yarn3.2.2的,研发人员反馈业务正常运行,但是最近变更算法替换新包的时候有业务异常,然后需要排查日志的

    2024年01月19日
    浏览(8)
  • Flink本地运行WebUI日志问题

    Flink本地运行WebUI日志问题

    前几天在本地开发调试Flink程序时,在WebUI页面无法查看jobManager日志或者taskManager日志,点击会在控制台报如下错误: 解决办法如下: 1、引入日志配置,包括pom文件中的依赖和 src/main/resources 目录下的日志文职文件。下面以log4j2.xml为例展示日志配置: 2、在flink程序开始初始

    2024年02月12日
    浏览(8)
  • flink的异常concurrent.TimeoutException: Heartbeat of TaskManager with id的解决

    在使用flink进行集成测试时,我们会使用MiniClusterWithClientResource类,但是当我们断点导致在某个方法执行的时间比较长时,会有错误发生,那么该如何解决这个错误呢? 其实关键的配置是heartbeat.timeout,这个错误是JobManager抛出的,意思是和某个TaskManager的心跳中断超过了指定的

    2024年02月03日
    浏览(7)
  • 关于flink滚动窗口下数据乱序+倾斜,allowedLateness的一个坑

    关于flink滚动窗口下数据乱序+倾斜,allowedLateness的一个坑

    目录 前言         滚动窗口(Tumbling Windows)         allowedLateness 场景描述 数据倾斜问题解决 输出结果偏差问题         思考 输出结果偏差解决 扩展         滚动窗口的 assigner 分发元素到指定大小的窗口。滚动窗口的大小是固定的,且各自范围之间不重叠。

    2024年02月21日
    浏览(12)
  • flink mysql cdc调试问题记录

    最近需要用到flink cdc作为数据流处理框架,在demo运行中发现一些问题,特此记录问题和解决过程。 Caused by: java.lang.IllegalArgumentException: Can\\\'t find any matched tables, please check your configured database-name: [localdb] and table-name: [flink_cdc_message] at com.ververica.cdc.connectors.mysql.debezium.DebeziumUtils.

    2023年04月17日
    浏览(10)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包