flink学习之state

这篇具有很好参考价值的文章主要介绍了flink学习之state。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

state作用

保留当前key的历史状态。

state用法

ListState<Integer> vipList = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("vipList", TypeInformation.of(Integer.class)));

有valueState listState mapstate 。冒失没有setstate

state案例

比如起点的小说不能被下载。别人只能通过截屏,提取文字的方式盗版小说。

起点做了防爬措施。防爬措施如下。

1.如果一个用户1s翻1页,或者速度更快,连续10次,那么就认为用户是机器人。

上述两种情况,用户 不断的发起点击事件 

userId=1 click_time=2023-09-07 00:00:00

userId=1 click_time=2023-09-07 00:00:01

我们如何判断1呢?

lastClickState 保留用户上一次的点击时间。

clickcntState  保留用户1s1页连续点击次数。

来一条数据就与上一次的lastClickState去对比,

如果间隔<1s clickcntState就+1

如果>1s  clickcntState就置于0

同时判断clickcntState次数是否>=10如果大于就将该userid 输出到sink

来个demo直接说话。

package com.chenchi.pojo;

public  class User {
    public Integer userId;
    public Integer vip;

    public long clickTime=System.currentTimeMillis();
    public User() {
    }

    public User(Integer userId, Integer vip) {
        this.userId = userId;
        this.vip = vip;
    }

    public Integer getUserId() {
        return userId;
    }

    public void setUserId(Integer userId) {
        this.userId = userId;
    }

    public Integer getVip() {
        return vip;
    }

    public void setVip(Integer vip) {
        this.vip = vip;
    }

    @Override
    public String toString() {
        return "User{" +
                "userId=" + userId +
                ", vip=" + vip +
                ", clickTime=" + clickTime +
                '}';
    }

    public long getClickTime() {
        return clickTime;
    }

    public void setClickTime(long clickTime) {
        this.clickTime = clickTime;
    }
}
package com.chenchi.pojo;

import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.util.Random;

public class UserSource implements SourceFunction<User> {
    boolean run = true;
    public UserSource(){}
    int randomBound=1000;
    int interval=1000;
    public UserSource(Integer randomBound){
        this.randomBound=randomBound;
    }
    public UserSource(Integer randomBound,int interval){
        this.randomBound=randomBound;
        this.interval=interval;
    }
    @Override
    public void run(SourceContext<User> sourceContext) throws Exception {
        while (true) {
            Integer userId = new Random().nextInt(randomBound);
            Integer vip = new Random().nextInt(10);
            sourceContext.collect(new User(userId, vip));
            Thread.sleep(interval);
        }
    }

    @Override
    public void cancel() {
        run = false;
    }
}
package com.chenchi.state;

import com.chenchi.pojo.User;
import com.chenchi.pojo.UserSource;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class ValueStateDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        CustomProcess customProcess = new CustomProcess();
        DataStreamSource<User> userDataStreamSource = env.addSource(new UserSource(4,100));
        userDataStreamSource.print();
        userDataStreamSource
                .keyBy(user->user.userId)
                .process(customProcess)
                        .print();
        env.execute();
    }

     static class CustomProcess extends KeyedProcessFunction<Integer,User,String> {
        ValueState<Long> lastClickTime;
        ValueState<Integer> cnt;
        @Override
        public void open(Configuration parameters) throws Exception {
            lastClickTime= getRuntimeContext().getState(new ValueStateDescriptor<Long>("click", TypeInformation.of(Long.class)));
            cnt= getRuntimeContext().getState(new ValueStateDescriptor<Integer>("cnt",Integer.class));
            super.open(parameters);
        }

        @Override
        public void processElement(User user, KeyedProcessFunction<Integer, User, String>.Context context, Collector<String> collector) throws Exception {
            Integer userId = user.getUserId();
            long clickTime = user.getClickTime();
            Long last = lastClickTime.value();
            Integer value = cnt.value();
            if (value==null){
                value=0;
            }
            if (last!=null&&clickTime-last<1000){
                //点击太快
                cnt.update(value+1);
            }else {
                //之前可能是不喜欢的页数突然点快了 点击慢就置0
                cnt.update(0);
            }
            //保存当前的listState
            lastClickTime.update(clickTime);
            if (cnt.value()>10){
                collector.collect("userId="+userId+",clickCnt="+cnt.value()+",click太快 注意注意");
            }
        }
    }
}

打印日志

10> User{userId=0, vip=0, clickTime=1694083167883}
11> User{userId=0, vip=4, clickTime=1694083167994}
12> User{userId=2, vip=4, clickTime=1694083168102}
1> User{userId=2, vip=7, clickTime=1694083168212}
2> User{userId=2, vip=3, clickTime=1694083168320}
3> User{userId=1, vip=0, clickTime=1694083168428}
4> User{userId=0, vip=7, clickTime=1694083168537}
5> User{userId=2, vip=3, clickTime=1694083168646}
6> User{userId=2, vip=0, clickTime=1694083168757}
7> User{userId=2, vip=3, clickTime=1694083168866}
8> User{userId=0, vip=9, clickTime=1694083168975}
9> User{userId=0, vip=3, clickTime=1694083169084}
10> User{userId=2, vip=7, clickTime=1694083169191}
11> User{userId=0, vip=7, clickTime=1694083169298}
12> User{userId=0, vip=6, clickTime=1694083169406}
1> User{userId=3, vip=9, clickTime=1694083169513}
2> User{userId=0, vip=4, clickTime=1694083169618}
3> User{userId=3, vip=3, clickTime=1694083169726}
4> User{userId=1, vip=8, clickTime=1694083169833}
5> User{userId=2, vip=1, clickTime=1694083169942}
6> User{userId=3, vip=2, clickTime=1694083170050}
7> User{userId=2, vip=8, clickTime=1694083170158}
8> User{userId=1, vip=4, clickTime=1694083170267}
9> User{userId=1, vip=2, clickTime=1694083170374}
10> User{userId=0, vip=1, clickTime=1694083170481}
11> User{userId=3, vip=6, clickTime=1694083170589}
12> User{userId=0, vip=9, clickTime=1694083170696}
1> User{userId=3, vip=1, clickTime=1694083170804}
2> User{userId=1, vip=8, clickTime=1694083170911}
3> User{userId=1, vip=3, clickTime=1694083171018}
4> User{userId=0, vip=7, clickTime=1694083171126}
5> User{userId=1, vip=8, clickTime=1694083171233}
6> User{userId=3, vip=5, clickTime=1694083171341}
7> User{userId=0, vip=8, clickTime=1694083171448}
9> userId=0,clickCnt=11,click太快 注意注意

效果符合预期。 文章来源地址https://www.toymoban.com/news/detail-705849.html

到了这里,关于flink学习之state的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • flink 的 State

    目录 一、前言 二、什么是State 2.1:什么时候需要历史数据 2.2:为什么要容错,以及checkpoint如何进行容错 2.3:state basckend 又是什么 三、有哪些常见的是 State 四、 State的使用 五、State backend 5.1  MemoryStateBackend: 5.2  FsStatebackend: 5.3  RocksDBStateBackend: 六、Checkpoint 七、 Deep

    2023年04月18日
    浏览(23)
  • Flink State 状态管理

    状态在Flink中叫做State,用来保存中间计算结果或者缓存数据。要做到比较好的状态管理,需要考虑以下几点内容: 状态数据的存储和访问 在Task内部,如何高效地保存状态数据和使用状态数据。 状态数据的备份和恢复 作业失败是无法避免的,那么就要考虑如何高效地将状态

    2024年01月17日
    浏览(35)
  • Flink State 状态原理解析

    State 用于记录 Flink 应用在运行过程中,算子的中间计算结果或者元数据信息。运行中的 Flink 应用如果需要上次计算结果进行处理的,则需要使用状态存储中间计算结果。如 Join、窗口聚合场景。 Flink 应用运行中会保存状态信息到 State 对象实例中,State 对象实例通过 StateBac

    2024年02月05日
    浏览(31)
  • Flink_state 的优化与 remote_state 的探索

    摘要:本文整理自 bilibili 资深开发工程师张杨,在 Flink Forward Asia 2022 核心技术专场的分享。本篇内容主要分为四个部分: 相关背景 state 压缩优化 Remote state 探索 未来规划 点击查看原文视频 演讲PPT 1.1 业务概况 从业务规模来讲,B 站目前大约是 4000+的 Flink 任务,其中 95%是

    2024年02月11日
    浏览(35)
  • Flink源码之State创建流程

    StreamOperatorStateHandler 在StreamTask启动初始化时通过StreamTaskStateInitializerImpl::streamOperatorStateContext会为每个StreamOperator 创建keyedStatedBackend和operatorStateBackend,在AbstractStreamOperator中有个StreamOperatorStateHandler成员变量,调用AbstractStreamOperator::initializeState方法中会初始化StreamOperatorStateH

    2024年02月12日
    浏览(80)
  • Flink State backend状态后端

    Flink在v1.12到v1.14的改进当中,其状态后端也发生了变化。老版本的状态后端有三个,分别是MemoryStateBackend、FsStateBackend、RocksDBStateBackend,在flink1.14中,这些状态已经被废弃了,新版本的状态后端是 HashMapStateBackend、EmbeddedRocksDBStateBackend。 有状态流应用中的检查点(checkpoint),

    2024年01月25日
    浏览(32)
  • 【状态管理|概述】Flink的状态管理:为什么需要state、怎么保存state、对于state过大怎么处理

    按照数据的划分和扩张方式,Flink中大致分为2类: Keyed States:记录每个Key对应的状态值 因为一个任务的并行度有多少,就会有多少个子任务,当key的范围大于并行度时,就会出现一个subTask上可能包含多个Key(),但不同Task上不会出现相同的Key(解决了shuffle的问题?)   常

    2024年02月01日
    浏览(42)
  • Flink State 和 Fault Tolerance详解

    有状态操作或者操作算子在处理DataStream的元素或者事件的时候需要存储计算的中间状态,这就使得状态在整个Flink的精细化计算中有着非常重要的地位: 记录数据从某一个过去时间点到当前时间的状态信息。 以每分钟/小时/天汇总事件时,状态将保留待处理的汇总记录。 在

    2024年02月14日
    浏览(28)
  • Flink理论—容错之状态后端(State Backends)

    Flink 使用流重放 和 检查点 的组合来实现容错。检查点标记每个输入流中的特定点以及每个运算符的相应状态。通过恢复运算符的状态并从检查点点重放记录,可以从检查点恢复流数据流,同时保持一致性 容错机制不断地绘制分布式流数据流的快照。对于小状态的流式应用程

    2024年02月20日
    浏览(28)
  • 【flink番外篇】13、Broadcast State 模式示例(完整版)

    一、Flink 专栏 Flink 专栏系统介绍某一知识点,并辅以具体的示例进行说明。 1、Flink 部署系列 本部分介绍Flink的部署、配置相关基础内容。 2、Flink基础系列 本部分介绍Flink 的基础部分,比如术语、架构、编程模型、编程指南、基本的datastream api用法、四大基石等内容。 3、

    2024年01月17日
    浏览(41)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包