Integrating Flink with RocketMQ


The following code lives under source -> rocketmq -> common -> selector.
DefaultTopicSelector.java

public class DefaultTopicSelector<T> implements TopicSelector<T> {

    private final String topicName;
    private final String tagName;

    public DefaultTopicSelector(final String topicName) {
        this(topicName, "");
    }

    public DefaultTopicSelector(String topicName, String tagName) {
        this.topicName = topicName;
        this.tagName = tagName;
    }

    @Override
    public String getTopic(T tuple) {
        return topicName;
    }

    @Override
    public String getTag(T tuple) {
        return tagName;
    }
}

TopicSelector.java

import java.io.Serializable;


public interface TopicSelector<T> extends Serializable {

    String getTopic(T tuple);

    String getTag(T tuple);

}
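
DefaultTopicSelector always returns the fixed topic and tag it was constructed with. When records need per-record routing, a custom TopicSelector is enough; the following is only a hypothetical sketch (AlarmEvent#getAlarmLevel() and #getGroupId() are assumed accessors, not part of the code in this article):

// Hypothetical example: route alarms to different topics by alarm level.
public class AlarmLevelTopicSelector implements TopicSelector<AlarmEvent> {

    @Override
    public String getTopic(AlarmEvent event) {
        // assumed accessor: send high-level alarms to a dedicated topic
        return event.getAlarmLevel() >= 3 ? "alarm_critical" : "alarm_normal";
    }

    @Override
    public String getTag(AlarmEvent event) {
        // assumed accessor: tag by group so consumers can filter with a tag expression
        return String.valueOf(event.getGroupId());
    }
}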

The following code lives under source -> rocketmq -> common -> serialization.
AlarmEventSerializationSchema.java

import com.alibaba.fastjson.JSON;
import com.bsj.flinkRisk.model.AlarmEvent;
import com.bsj.flinkRisk.utils.NumberToByteUtil;

import java.nio.charset.StandardCharsets;


public class AlarmEventSerializationSchema implements KeyValueSerializationSchema<AlarmEvent>  {
	public static final String DEFAULT_KEY_FIELD = "key";
    public static final String DEFAULT_VALUE_FIELD = "value";

    public String keyField;
    public String valueField;

    public AlarmEventSerializationSchema() {
        this(DEFAULT_KEY_FIELD, DEFAULT_VALUE_FIELD);
    }

    public AlarmEventSerializationSchema(String keyField, String valueField) {
        this.keyField = keyField;
        this.valueField = valueField;
    }
    @Override
    public byte[] serializeKey(AlarmEvent event) {
        return NumberToByteUtil.long2Bytes(event.getVehicleId());
    }

    @Override
    public byte[] serializeValue(AlarmEvent event) {

        String data = JSON.toJSONString(event);
        return data.getBytes(StandardCharsets.UTF_8);
    }
}

KeyValueDeserializationSchema.java

import org.apache.flink.api.java.typeutils.ResultTypeQueryable;

import java.io.Serializable;

public interface KeyValueDeserializationSchema<T> extends ResultTypeQueryable<T>, Serializable {
    T deserializeKeyAndValue(byte[] key, byte[] value);
}

KeyValueSerializationSchema.java

import java.io.Serializable;


public interface KeyValueSerializationSchema<T> extends Serializable {
    byte[] serializeKey(T tuple);

    byte[] serializeValue(T tuple);
}
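
AlarmEventSerializationSchema above is tied to one type. If a dedicated schema per type is not needed, a generic fastjson-based implementation is a minimal sketch (an illustrative class, not part of the original project, assumed to live in the same package as the interface):

import com.alibaba.fastjson.JSON;

import java.nio.charset.StandardCharsets;

// Illustrative sketch: serialize any POJO as its JSON string, without a message key.
public class JsonSerializationSchema<T> implements KeyValueSerializationSchema<T> {

    @Override
    public byte[] serializeKey(T tuple) {
        // no key; RocketMQSink in this article maps a null key to an empty string
        return null;
    }

    @Override
    public byte[] serializeValue(T tuple) {
        return JSON.toJSONString(tuple).getBytes(StandardCharsets.UTF_8);
    }
}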

VehiclePosInfoDeserializationSchema.java

import lombok.extern.slf4j.Slf4j;
import org.apache.flink.api.common.typeinfo.TypeInformation;



@Slf4j
public class VehiclePosInfoDeserializationSchema implements KeyValueDeserializationSchema<VehiclePosInfo> {
	public VehiclePosInfoDeserializationSchema() {
    }
    @Override
    public VehiclePosInfo deserializeKeyAndValue(byte[] key, byte[] value) {
        SpeedInfo speedInfo = null;
        YunDataSerialize yunDataSerialize = new YunDataSerialize();
        try {
            speedInfo = yunDataSerialize.SpeedinfoFromArray(value);

            // source == 2 marks 1402 position data; such records are not processed here
            if (speedInfo.getSource() == 2) {
                return null;
            }
        } catch (Exception e) {
            log.error("Failed to parse position data, payload: {}", HexStr.toStr(value), e);
        }
        return convertToVehicleState(speedInfo);
    }
    /**
     * Convert a SpeedInfo record into a VehiclePosInfo.
     *
     * @param speedInfo the decoded position record
     * @return the converted record, or null if speedInfo is null
     */
    private VehiclePosInfo convertToVehicleState(SpeedInfo speedInfo) {
        if(speedInfo==null){
            return null;
        }
        VehiclePosInfo vehicleState = new VehiclePosInfo();
        vehicleState.setVehicleId(speedInfo.getVehicleId());
        vehicleState.setGroupId(speedInfo.getGroupId());
        vehicleState.setPlateColor(speedInfo.getPlateColor());
        vehicleState.setPlate(speedInfo.getPlate());
        vehicleState.setVehicleShape(speedInfo.getVehicleShape());
        vehicleState.setVehicleState(speedInfo.getVehicleState());
        vehicleState.setTerminalNo(speedInfo.getTerminalNo());
        vehicleState.setProtocolType(speedInfo.getProtocolType());
        vehicleState.setTerminalType(speedInfo.getTerminalType());
        vehicleState.setDevTime(speedInfo.getPos().getDevTime());
        vehicleState.setRecvTime(speedInfo.getPos().getRecvTime());
        vehicleState.setLon(speedInfo.getPos().getLon());
        vehicleState.setLat(speedInfo.getPos().getLat());
        vehicleState.setSpeed(speedInfo.getPos().getSpeed());
        vehicleState.setDirect(speedInfo.getPos().getDirect());
        vehicleState.setHigh(speedInfo.getPos().getHigh());
        vehicleState.setMileage(speedInfo.getPos().getMileage());
        vehicleState.setIsAcc(speedInfo.getPos().getIsAcc());
        vehicleState.setExtend(speedInfo.getPos().getExtend());
        vehicleState.setIsPos(speedInfo.getPos().getIsPos());
        vehicleState.setPosSource(speedInfo.getSource());
        vehicleState.setCarhrough(speedInfo.getCarhrough());
        return vehicleState;
    }

    @Override
    public TypeInformation<VehiclePosInfo> getProducedType() {
        return TypeInformation.of(VehiclePosInfo.class);
    }
}

The following code lives under source -> rocketmq -> example. These are only usage examples and are not required.

RocketMQFlinkExample.java

public class RocketMQFlinkExample {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.enableCheckpointing(3000);

        Properties consumerProps = new Properties();
        consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "c002");
        consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "gg");
        Properties producerProps = new Properties();
        producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876");
        int msgDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00;
        producerProps.setProperty(RocketMQConfig.MSG_DELAY_LEVEL, String.valueOf(msgDelayLevel));
        // TimeDelayLevel is not supported for batching
        boolean batchFlag = msgDelayLevel <= 0;
        DataStream<VehiclePosInfo> stream = env
                .addSource(new RocketMQSource<VehiclePosInfo>(new VehiclePosInfoDeserializationSchema(), consumerProps))
                .name("transactions");

        stream.keyBy(VehiclePosInfo::getVehicleId)
                .process(new KeyedProcessFunction<Long, VehiclePosInfo, AlarmEvent>() {
                    @Override
                    public void processElement(VehiclePosInfo in, Context ctx, Collector<AlarmEvent> out) throws Exception {

                        AlarmEvent event=new AlarmEvent();
                        out.collect(event);

                    }
                })
                .name("upper-processor")
                .setParallelism(2)
                .addSink(new RocketMQSink<AlarmEvent>(new AlarmEventSerializationSchema("id", "province"),
                        new DefaultTopicSelector<AlarmEvent>("zhisheng"), producerProps).withBatchFlushOnCheckpoint(batchFlag))
                .name("rocketmq-sink")
                .setParallelism(2);

        env.execute("rocketmq-flink-example");
	}
}

SimpleConsumer.java

public class SimpleConsumer {
	 public static void main(String[] args) {
		 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("g00003");
        consumer.setNamesrvAddr("localhost:9876");
        try {
            consumer.subscribe("zhisheng", "*");
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    System.out.println(msg.getKeys() + ":" + new String(msg.getBody()));
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
         try {
            consumer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
	}
}

SimpleProducer.java

public class SimpleProducer {
    public static void main(String[] args) {
    	DefaultMQProducer producer = new DefaultMQProducer("p001");
        producer.setNamesrvAddr("localhost:9876");
        try {
            producer.start();
        } catch (MQClientException e) {
            e.printStackTrace();
        }
        for (int i = 0; i < 10000; i++) {
            Message msg = new Message("zhisheng", "", "id_" + i, ("country_X province_" + i).getBytes());
            try {
                producer.send(msg);
            } catch (Exception e) {
                e.printStackTrace();
            }
            System.out.println("send " + i);
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

The following code lives under source -> rocketmq.

RocketMQConfig.java

public class RocketMQConfig {
	// Server Config
    public static final String NAME_SERVER_ADDR = "nameserver.address"; // required

    public static final String NAME_SERVER_POLL_INTERVAL = "nameserver.poll.interval";
    public static final int DEFAULT_NAME_SERVER_POLL_INTERVAL = 30000; // 30 seconds

    public static final String BROKER_HEART_BEAT_INTERVAL = "brokerserver.heartbeat.interval";
    public static final int DEFAULT_BROKER_HEART_BEAT_INTERVAL = 30000; // 30 seconds


    // Producer related config
    public static final String PRODUCER_GROUP = "producer.group";

    public static final String PRODUCER_RETRY_TIMES = "producer.retry.times";
    public static final int DEFAULT_PRODUCER_RETRY_TIMES = 3;

    public static final String PRODUCER_TIMEOUT = "producer.timeout";
    public static final int DEFAULT_PRODUCER_TIMEOUT = 3000; // 3 seconds
    // Consumer related config
    public static final String CONSUMER_GROUP = "consumer.group"; // required

    public static final String CONSUMER_TOPIC = "consumer.topic"; // required

    public static final String CONSUMER_TAG = "consumer.tag";
    public static final String DEFAULT_CONSUMER_TAG = "*";

    public static final String CONSUMER_OFFSET_RESET_TO = "consumer.offset.reset.to"; // where to reset the consumer offset to
    public static final String CONSUMER_OFFSET_LATEST = "latest";
    public static final String CONSUMER_OFFSET_EARLIEST = "earliest";
    public static final String CONSUMER_OFFSET_TIMESTAMP = "timestamp";
    public static final String CONSUMER_OFFSET_FROM_TIMESTAMP = "consumer.offset.from.timestamp"; // reset the offset to a given timestamp
    public static final String CONSUMER_OFFSET_PERSIST_INTERVAL = "consumer.offset.persist.interval";
    public static final int DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL = 5000; // 5 seconds

    public static final String CONSUMER_PULL_POOL_SIZE = "consumer.pull.thread.pool.size";
    public static final int DEFAULT_CONSUMER_PULL_POOL_SIZE = 20;

    public static final String CONSUMER_BATCH_SIZE = "consumer.batch.size";
    public static final int DEFAULT_CONSUMER_BATCH_SIZE = 500;

    public static final String CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = "consumer.delay.when.message.not.found";
    public static final int DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND = 10;

    public static final String MSG_DELAY_LEVEL = "msg.delay.level";
    public static final int MSG_DELAY_LEVEL00 = 0; // no delay
    public static final int MSG_DELAY_LEVEL01 = 1; // 1s
    public static final int MSG_DELAY_LEVEL02 = 2; // 5s
    public static final int MSG_DELAY_LEVEL03 = 3; // 10s
    public static final int MSG_DELAY_LEVEL04 = 4; // 30s
    public static final int MSG_DELAY_LEVEL05 = 5; // 1min
    public static final int MSG_DELAY_LEVEL06 = 6; // 2min
    public static final int MSG_DELAY_LEVEL07 = 7; // 3min
    public static final int MSG_DELAY_LEVEL08 = 8; // 4min
    public static final int MSG_DELAY_LEVEL09 = 9; // 5min
    public static final int MSG_DELAY_LEVEL10 = 10; // 6min
    public static final int MSG_DELAY_LEVEL11 = 11; // 7min
    public static final int MSG_DELAY_LEVEL12 = 12; // 8min
    public static final int MSG_DELAY_LEVEL13 = 13; // 9min
    public static final int MSG_DELAY_LEVEL14 = 14; // 10min
    public static final int MSG_DELAY_LEVEL15 = 15; // 20min
    public static final int MSG_DELAY_LEVEL16 = 16; // 30min
    public static final int MSG_DELAY_LEVEL17 = 17; // 1h
    public static final int MSG_DELAY_LEVEL18 = 18; // 2h
    /**
     * Build the producer configuration.
     *
     * @param props    Properties
     * @param producer DefaultMQProducer
     */
    public static void buildProducerConfigs(Properties props, DefaultMQProducer producer) {
        buildCommonConfigs(props, producer);

        // fall back to a random group name if none was configured
        String group = props.getProperty(PRODUCER_GROUP);
        if (StringUtils.isEmpty(group)) {
            group = UUID.randomUUID().toString();
        }
        producer.setProducerGroup(group);

        producer.setRetryTimesWhenSendFailed(getInteger(props,
                PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
        producer.setRetryTimesWhenSendAsyncFailed(getInteger(props,
                PRODUCER_RETRY_TIMES, DEFAULT_PRODUCER_RETRY_TIMES));
        producer.setSendMsgTimeout(getInteger(props,
                PRODUCER_TIMEOUT, DEFAULT_PRODUCER_TIMEOUT));
	}
    /**
     * Build the consumer configuration.
     *
     * @param props    Properties
     * @param consumer DefaultMQPullConsumer
     */
    public static void buildConsumerConfigs(Properties props, DefaultMQPullConsumer consumer) {
        buildCommonConfigs(props, consumer);
        // consumption mode: clustering (each message is handled by one instance in the group)
        consumer.setMessageModel(MessageModel.CLUSTERING);
        // consumer.setMessageModel(MessageModel.BROADCASTING);
        consumer.setPersistConsumerOffsetInterval(getInteger(props,
                CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
	}
    /**
     * Build the configuration common to producers and consumers.
     *
     * @param props  Properties
     * @param client ClientConfig
     */
    private static void buildCommonConfigs(Properties props, ClientConfig client) {
        String nameServers = props.getProperty(NAME_SERVER_ADDR);
        Validate.notEmpty(nameServers);
        client.setNamesrvAddr(nameServers);

        client.setPollNameServerInterval(getInteger(props,
                NAME_SERVER_POLL_INTERVAL, DEFAULT_NAME_SERVER_POLL_INTERVAL));
        client.setHeartbeatBrokerInterval(getInteger(props,
                BROKER_HEART_BEAT_INTERVAL, DEFAULT_BROKER_HEART_BEAT_INTERVAL));
    }
}
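
buildProducerConfigs and buildConsumerConfigs are called by the sink and source below right after the client object is created and before it is started; a minimal standalone sketch (the group name and retry count are placeholder values):

Properties props = new Properties();
props.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876");
props.setProperty(RocketMQConfig.PRODUCER_GROUP, "p001");
props.setProperty(RocketMQConfig.PRODUCER_RETRY_TIMES, "5");

DefaultMQProducer producer = new DefaultMQProducer();
// applies the name server address, group, retry times and send timeout from the Properties
RocketMQConfig.buildProducerConfigs(props, producer);
try {
    producer.start();
} catch (MQClientException e) {
    throw new RuntimeException(e);
}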

RocketMQSink.java

public class RocketMQSink<IN> extends RichSinkFunction<IN> implements CheckpointedFunction {
	private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(RocketMQSink.class);

    private transient DefaultMQProducer producer;
    private boolean async; // false by default

    private Properties props;
    private TopicSelector<IN> topicSelector;
    private KeyValueSerializationSchema<IN> serializationSchema;

    private boolean batchFlushOnCheckpoint; // false by default
    private int batchSize = 1000;
    private List<Message> batchList;

    private int messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00;
    
    public RocketMQSink(KeyValueSerializationSchema<IN> schema, TopicSelector<IN> topicSelector, Properties props) {
        this.serializationSchema = schema;
        this.topicSelector = topicSelector;
        this.props = props;

        if (this.props != null) {
            this.messageDeliveryDelayLevel = RocketMQUtils.getInteger(this.props, RocketMQConfig.MSG_DELAY_LEVEL,
                    RocketMQConfig.MSG_DELAY_LEVEL00);
            if (this.messageDeliveryDelayLevel < RocketMQConfig.MSG_DELAY_LEVEL00) {
                this.messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL00;
            } else if (this.messageDeliveryDelayLevel > RocketMQConfig.MSG_DELAY_LEVEL18) {
                this.messageDeliveryDelayLevel = RocketMQConfig.MSG_DELAY_LEVEL18;
            }
        }
    }
    @Override
    public void open(Configuration parameters) throws Exception {
		Validate.notEmpty(props, "Producer properties can not be empty");
        Validate.notNull(topicSelector, "TopicSelector can not be null");
        Validate.notNull(serializationSchema, "KeyValueSerializationSchema can not be null");

        producer = new DefaultMQProducer();
        producer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
        RocketMQConfig.buildProducerConfigs(props, producer);
        batchList = new LinkedList<>();

        if (batchFlushOnCheckpoint && !((StreamingRuntimeContext) getRuntimeContext()).isCheckpointingEnabled()) {
            LOG.warn("Flushing on checkpoint is enabled, but checkpointing is not enabled. Disabling flushing.");
            batchFlushOnCheckpoint = false;
        }

        try {
            producer.start();
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }
	}
    @Override
    public void invoke(IN input, Context context) throws Exception {
        Message msg = prepareMessage(input);

        if (batchFlushOnCheckpoint) {
            batchList.add(msg);
            if (batchList.size() >= batchSize) {
                flushSync();
            }
            return;
        }
        if (async) {
            try {
                producer.send(msg, new MessageQueueSelector() {
                    // distribute across queues by vehicleId, so the same vehicle always maps to the same queue
                    @Override
                    public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
                        // arg is the third argument passed to send(): the message key, i.e. the vehicleId as a string
                        long vehicleId = Long.parseLong((String) arg);
                        return mqs.get((int) (vehicleId % mqs.size()));
                    }
                }, msg.getKeys(), new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        LOG.debug("Async send message success! result: {}", sendResult);
                    }
                       @Override
                    public void onException(Throwable throwable) {
                        if (throwable != null) {
                            LOG.error("Async send message failure!", throwable);
                        }
                    }
                });
            } catch (Exception e) {
                LOG.error("Async send message failure!", e);
            }
        } else {
            try {
                SendResult result = producer.send(msg);
                LOG.debug("Sync send message result: {}", result);
            } catch (Exception e) {
                LOG.error("Sync send message failure!", e);
            }
        }
     }
    /**
     * Build the RocketMQ message for one input record.
     *
     * @param input the record to serialize
     * @return the prepared message
     */
    private Message prepareMessage(IN input) {
        String topic = topicSelector.getTopic(input);
        String tag = topicSelector.getTag(input) != null ? topicSelector.getTag(input) : "";

        byte[] k = serializationSchema.serializeKey(input);
        String key = k != null ? String.valueOf(NumberToByteUtil.bytes2Long(k)) : "";
        byte[] value = serializationSchema.serializeValue(input);

        Validate.notNull(topic, "the message topic is null");
        Validate.notNull(value, "the message body is null");

        Message msg = new Message(topic, tag, key, value);
        if (this.messageDeliveryDelayLevel > RocketMQConfig.MSG_DELAY_LEVEL00) {
            msg.setDelayTimeLevel(this.messageDeliveryDelayLevel);
        }
        return msg;
    }
	public RocketMQSink<IN> withAsync(boolean async) {
        this.async = async;
        return this;
    }

    public RocketMQSink<IN> withBatchFlushOnCheckpoint(boolean batchFlushOnCheckpoint) {
        this.batchFlushOnCheckpoint = batchFlushOnCheckpoint;
        return this;
    }

    public RocketMQSink<IN> withBatchSize(int batchSize) {
        this.batchSize = batchSize;
        return this;
    }
    @Override
    public void close() throws Exception {
        if (producer != null) {
            flushSync();
            producer.shutdown();
        }
    }
    private void flushSync() throws Exception {
        if (batchFlushOnCheckpoint) {
            synchronized (batchList) {
                if (batchList.size() > 0) {
                    producer.send(batchList);
                    batchList.clear();
                }
            }
        }
    }
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        flushSync();
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // Nothing to do
    }

}
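
The with* setters return the sink itself, so the sink can be configured fluently when it is constructed; a minimal sketch with a placeholder topic name:

Properties producerProps = new Properties();
producerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876");

// Buffer messages and flush them on every checkpoint (or once 500 are queued).
// withAsync(true) only takes effect when batch flushing is disabled, because
// invoke() returns right after buffering a message in batch mode.
RocketMQSink<AlarmEvent> alarmSink = new RocketMQSink<AlarmEvent>(
        new AlarmEventSerializationSchema(),
        new DefaultTopicSelector<AlarmEvent>("alarm_topic"),
        producerProps)
        .withBatchFlushOnCheckpoint(true)
        .withBatchSize(500);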

RocketMQSource.java

public class RocketMQSource<OUT> extends RichParallelSourceFunction<OUT>
        implements CheckpointedFunction, ResultTypeQueryable<OUT> {

    private static final long serialVersionUID = 1L;

    private static final Logger LOG = LoggerFactory.getLogger(RocketMQSource.class);


    private transient MQPullConsumerScheduleService pullConsumerScheduleService;
    private DefaultMQPullConsumer consumer;

    private KeyValueDeserializationSchema<OUT> schema;

    private RunningChecker runningChecker;

    private transient ListState<Tuple2<MessageQueue, Long>> unionOffsetStates;
    private Map<MessageQueue, Long> offsetTable;
    private Map<MessageQueue, Long> restoredOffsets;

    private Properties props;
    private String topic;
    private String group;

    private static final String OFFSETS_STATE_NAME = "topic-partition-offset-states";

    private transient volatile boolean restored;
    public RocketMQSource(KeyValueDeserializationSchema<OUT> schema, Properties props) {
        this.schema = schema;
        this.props = props;
    }
    @Override
    public void open(Configuration parameters) throws Exception {
        LOG.info("启动定位数据消费Mq");
        Validate.notEmpty(props, "Consumer properties can not be empty");
        Validate.notNull(schema, "KeyValueDeserializationSchema can not be null");

        this.topic = props.getProperty(RocketMQConfig.CONSUMER_TOPIC);
        this.group = props.getProperty(RocketMQConfig.CONSUMER_GROUP);

        Validate.notEmpty(topic, "Consumer topic can not be empty");
        Validate.notEmpty(group, "Consumer group can not be empty");
        if (offsetTable == null) {
            offsetTable = new ConcurrentHashMap<>();
        }
        if (restoredOffsets == null) {
            restoredOffsets = new ConcurrentHashMap<>();
        }

        runningChecker = new RunningChecker();

        pullConsumerScheduleService = new MQPullConsumerScheduleService(group);
        consumer = pullConsumerScheduleService.getDefaultMQPullConsumer();

        consumer.setInstanceName(String.valueOf(getRuntimeContext().getIndexOfThisSubtask()));
        RocketMQConfig.buildConsumerConfigs(props, consumer);
     }
     @Override
    public void run(SourceContext<OUT> context) throws Exception {
        LOG.info("开始发送数据");
    	// The lock that guarantees that record emission and state updates are atomic,
        // from the view of taking a checkpoint.
        final Object lock = context.getCheckpointLock();

        int delayWhenMessageNotFound = getInteger(props, RocketMQConfig.CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND,
                RocketMQConfig.DEFAULT_CONSUMER_DELAY_WHEN_MESSAGE_NOT_FOUND);

        String tag = props.getProperty(RocketMQConfig.CONSUMER_TAG, RocketMQConfig.DEFAULT_CONSUMER_TAG);

        int pullPoolSize = getInteger(props, RocketMQConfig.CONSUMER_PULL_POOL_SIZE,
                RocketMQConfig.DEFAULT_CONSUMER_PULL_POOL_SIZE);

        int pullBatchSize = getInteger(props, RocketMQConfig.CONSUMER_BATCH_SIZE,
                RocketMQConfig.DEFAULT_CONSUMER_BATCH_SIZE);
        pullConsumerScheduleService.setPullThreadNums(pullPoolSize);
        pullConsumerScheduleService.registerPullTaskCallback(topic, new PullTaskCallback() {
			 @Override
            public void doPullTask(MessageQueue mq, PullTaskContext pullTaskContext) {
				try {
					long offset = getMessageQueueOffset(mq);
                    if (offset < 0) {
                        return;
                    }
                    PullResult pullResult = consumer.pull(mq, tag, offset, pullBatchSize);
                    boolean found = false;
                    switch (pullResult.getPullStatus()) {
						case FOUND:
                            List<MessageExt> messages = pullResult.getMsgFoundList();
                             for (MessageExt msg : messages) {
                                byte[] key = msg.getKeys() != null ? msg.getKeys().getBytes(StandardCharsets.UTF_8) : null;
                                byte[] value = msg.getBody();

                                OUT data = schema.deserializeKeyAndValue(key, value);

                                // output and state update are atomic
                                synchronized (lock) {
                                    if(data!=null){
                                        context.collectWithTimestamp(data, msg.getBornTimestamp());
                                    }
                                }
                            }
                            found = true;
                            break;
                        case NO_MATCHED_MSG:
                            LOG.info("No matched message after offset {} for queue {}", offset, mq);
                            break;
                        case NO_NEW_MSG:
                            break;
                        case OFFSET_ILLEGAL:
                            LOG.info("Offset {} is illegal for queue {}", offset, mq);
                            break;
                        default:
                            break;
					}
					synchronized (lock) {
                        putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    }

                    if (found) {
                        pullTaskContext.setPullNextDelayTimeMillis(0); // no delay when messages were found
                    } else {
                        pullTaskContext.setPullNextDelayTimeMillis(delayWhenMessageNotFound);
                    }
				}catch (Exception e) {
                    throw new RuntimeException(e);
                }
			}
		});
		try {
            pullConsumerScheduleService.start();
        } catch (MQClientException e) {
            throw new RuntimeException(e);
        }

        runningChecker.setRunning(true);

        awaitTermination();
    }
    private void awaitTermination() throws InterruptedException {
        while (runningChecker.isRunning()) {
            Thread.sleep(50);
        }
    }
    private long getMessageQueueOffset(MessageQueue mq) throws MQClientException {
    	Long offset = offsetTable.get(mq);
        // restoredOffsets(unionOffsetStates) is the restored global union state;
        // should only snapshot mqs that actually belong to us
        if (restored && offset == null) {
            offset = restoredOffsets.get(mq);
        }
        if (offset == null) {
            offset = consumer.fetchConsumeOffset(mq, false);
            if (offset < 0) {
                String initialOffset = props.getProperty(RocketMQConfig.CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
                switch (initialOffset) {
                    case CONSUMER_OFFSET_EARLIEST:
                        offset = consumer.minOffset(mq);
                        break;
                    case CONSUMER_OFFSET_LATEST:
                        offset = consumer.maxOffset(mq);
                        break;
                    case CONSUMER_OFFSET_TIMESTAMP:
                        offset = consumer.searchOffset(mq, getLong(props,
                                RocketMQConfig.CONSUMER_OFFSET_FROM_TIMESTAMP, System.currentTimeMillis()));
                        break;
                    default:
                        throw new IllegalArgumentException("Unknown value for CONSUMER_OFFSET_RESET_TO.");
                }
            }
        }
        offsetTable.put(mq, offset);
        return offsetTable.get(mq);
    }
    private void putMessageQueueOffset(MessageQueue mq, long offset) throws MQClientException {
        offsetTable.put(mq, offset);
        consumer.updateConsumeOffset(mq, offset);
    }
    @Override
    public void cancel() {
        LOG.info("cancel ...");
        runningChecker.setRunning(false);

        if (pullConsumerScheduleService != null) {
            pullConsumerScheduleService.shutdown();
        }

        offsetTable.clear();
        restoredOffsets.clear();
    }
    @Override
    public void close() throws Exception {
        LOG.info("close ...");
        try {
            cancel();
        } finally {
            super.close();
        }
    }
    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        if (!runningChecker.isRunning()) {
            LOG.info("snapshotState() called on closed source; skipping snapshot.");
            return;
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Snapshotting state {} ...", context.getCheckpointId());
        }

        unionOffsetStates.clear();

        // remove the unassigned queues in order to avoid reading the wrong offset when the source restarts
        Set<MessageQueue> assignedQueues = consumer.fetchMessageQueuesInBalance(topic);
        offsetTable.entrySet().removeIf(item -> !assignedQueues.contains(item.getKey()));

        for (Map.Entry<MessageQueue, Long> entry : offsetTable.entrySet()) {
            unionOffsetStates.add(Tuple2.of(entry.getKey(), entry.getValue()));
        }

        if (LOG.isDebugEnabled()) {
            LOG.debug("Snapshotted state, last processed offsets: {}, checkpoint id: {}, timestamp: {}",
                    offsetTable, context.getCheckpointId(), context.getCheckpointTimestamp());
        }
    }

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
    	// called every time the user-defined function is initialized,
        // be that when the function is first initialized or be that
        // when the function is actually recovering from an earlier checkpoint.
        // Given this, initializeState() is not only the place where different types of state are initialized,
        // but also where state recovery logic is included.
        LOG.info("initialize State ...");
        this.unionOffsetStates = context.getOperatorStateStore().getUnionListState(new ListStateDescriptor<>(
                OFFSETS_STATE_NAME, TypeInformation.of(new TypeHint<Tuple2<MessageQueue, Long>>() {
        })));

        this.restored = context.isRestored();
        if (restored) {
            if (restoredOffsets == null) {
                restoredOffsets = new ConcurrentHashMap<>();
            }
            for (Tuple2<MessageQueue, Long> mqOffsets : unionOffsetStates.get()) {
                if (!restoredOffsets.containsKey(mqOffsets.f0) || restoredOffsets.get(mqOffsets.f0) < mqOffsets.f1) {
                    restoredOffsets.put(mqOffsets.f0, mqOffsets.f1);
                }
            }
            LOG.info("Setting restore state in the consumer. Using the following offsets: {}", restoredOffsets);
        } else {
            LOG.info("No restore state for the consumer.");
        }
    }
	@Override
    public TypeInformation<OUT> getProducedType() {
        return schema.getProducedType();
    }
}

RocketMQUtils.java

public final class RocketMQUtils {

    public static int getInteger(Properties props, String key, int defaultValue) {
        return Integer.parseInt(props.getProperty(key, String.valueOf(defaultValue)));
    }

    public static long getLong(Properties props, String key, long defaultValue) {
        return Long.parseLong(props.getProperty(key, String.valueOf(defaultValue)));
    }

    public static boolean getBoolean(Properties props, String key, boolean defaultValue) {
        return Boolean.parseBoolean(props.getProperty(key, String.valueOf(defaultValue)));
    }

}

RunningChecker.java

public class RunningChecker implements Serializable {
    private volatile boolean isRunning = false;

    public boolean isRunning() {
        return isRunning;
    }

    public void setRunning(boolean running) {
        isRunning = running;
    }
}

The following code lives under source.
RocketMQPosInfoSource.java


/**
 * RocketMQ source for vehicle position data.
 */
public class RocketMQPosInfoSource extends RocketMQSource<VehiclePosInfo> implements ResultTypeQueryable<VehiclePosInfo> {
    public RocketMQPosInfoSource(KeyValueDeserializationSchema<VehiclePosInfo> schema, Properties props) {
        super(schema, props);
    }
}
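
Wiring this source into a job mirrors RocketMQFlinkExample above; a minimal sketch with placeholder group and topic names:

Properties consumerProps = new Properties();
consumerProps.setProperty(RocketMQConfig.NAME_SERVER_ADDR, "localhost:9876");
consumerProps.setProperty(RocketMQConfig.CONSUMER_GROUP, "pos_consumer_group");
consumerProps.setProperty(RocketMQConfig.CONSUMER_TOPIC, "pos_topic");

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// checkpointing is required so the source can snapshot its queue offsets
env.enableCheckpointing(3000);

DataStream<VehiclePosInfo> posStream = env
        .addSource(new RocketMQPosInfoSource(new VehiclePosInfoDeserializationSchema(), consumerProps))
        .name("rocketmq-pos-source");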

That concludes this walkthrough of integrating Flink with RocketMQ.
