Storm 集群的搭建及其Java编程进行简单统计计算

这篇具有很好参考价值的文章主要介绍了Storm 集群的搭建及其Java编程进行简单统计计算。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

一、Storm集群构建

编写storm 与 zookeeper的yml文件

 Storm 集群的搭建及其Java编程进行简单统计计算

storm yml文件的编写

具体如下:

version: '2'

services:

  zookeeper1:

    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8

    container_name: zk1.cloud

    environment:

      - SERVER_ID=1

      - ADDITIONAL_ZOOKEEPER_1=server.1=0.0.0.0:2888:3888

      - ADDITIONAL_ZOOKEEPER_2=server.2=zk2.cloud:2888:3888

      - ADDITIONAL_ZOOKEEPER_3=server.3=zk3.cloud:2888:3888

  zookeeper2:

    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8

    container_name: zk2.cloud

    environment:

      - SERVER_ID=2

      - ADDITIONAL_ZOOKEEPER_1=server.1=zk1.cloud:2888:3888

      - ADDITIONAL_ZOOKEEPER_2=server.2=0.0.0.0:2888:3888

      - ADDITIONAL_ZOOKEEPER_3=server.3=zk3.cloud:2888:3888

  zookeeper3:

    image: registry.aliyuncs.com/denverdino/zookeeper:3.4.8

    container_name: zk3.cloud

    environment:

      - SERVER_ID=3

      - ADDITIONAL_ZOOKEEPER_1=server.1=zk1.cloud:2888:3888

      - ADDITIONAL_ZOOKEEPER_2=server.2=zk2.cloud:2888:3888

      - ADDITIONAL_ZOOKEEPER_3=server.3=0.0.0.0:2888:3888

  ui:

    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0

    command: ui -c nimbus.host=nimbus

    environment:

      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud

    restart: always

    container_name: ui

    ports:

      - 8080:8080

    depends_on:

      - nimbus

  nimbus:

    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0

    command: nimbus -c nimbus.host=nimbus

    restart: always

    environment:

      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud

    container_name: nimbus

    ports:

      - 6627:6627

  supervisor:

    image: registry.aliyuncs.com/denverdino/baqend-storm:1.0.0

    command: supervisor -c nimbus.host=nimbus -c supervisor.slots.ports=[6700,6701,6702,6703]

    restart: always

    environment:

      - affinity:role!=supervisor

      - STORM_ZOOKEEPER_SERVERS=zk1.cloud,zk2.cloud,zk3.cloud

    depends_on:

      - nimbus

networks:

  default:

    external:

      name: zk-net

 

 

拉取Storm搭建需要的镜像,这里我选择镜像版本为 zookeeper:3.4.8  storm:1.0.0

键入命令:

docker pull zookeeper:3.4.8  docker pull storm:1.0.0

Storm 集群的搭建及其Java编程进行简单统计计算 

storm镜像 获取

使用docker-compose 构建集群

在power shell中执行以下命令:

 

docker-compose -f storm.yml up -d

 Storm 集群的搭建及其Java编程进行简单统计计算

                                                                              docker-compose 构建集群

在浏览器中打开localhost:8080 可以看到storm集群的详细情况

 Storm 集群的搭建及其Java编程进行简单统计计算

storm UI 展示

二、Storm统计任务

统计股票交易情况交易量和交易总金额   (数据文件存储在csv文件中)

编写DataSourceSpout类

Storm 集群的搭建及其Java编程进行简单统计计算 

DataSourceSpout类

编写bolt类

 

 

 Storm 集群的搭建及其Java编程进行简单统计计算

编写topology类

 

 Storm 集群的搭建及其Java编程进行简单统计计算

需要注意的是 Storm Java API 下有本地模型和远端模式

在本地模式下的调试不依赖于集群环境,可以进行简单的调试

如果需要使用生产模式,则需要将

1、 编写和自身业务相关的spout和bolt类,并将其打包成一个jar包

 

2、将上述的jar包放到客户端代码能读到的任何位置,

 

3、使用如下方式定义一个拓扑(Topology)

 Storm 集群的搭建及其Java编程进行简单统计计算

 

演示结果:

本地模式下的调试:

 

正在执行:

 Storm 集群的搭建及其Java编程进行简单统计计算

根据24小时

 

 Storm 集群的搭建及其Java编程进行简单统计计算

根据股票种类

 

 

生产模式:

 Storm 集群的搭建及其Java编程进行简单统计计算

向集群提交topology

                                                       

 

 

 

三、核心计算bolt的代码

1.统计不同类型的股票交易量和交易总金额:

package bolt;

 

import java.io.IOException;

import java.util.HashMap;

import java.util.Map;

import java.util.Set;

 

import org.apache.storm.task.OutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.topology.base.BaseRichBolt;

import org.apache.storm.tuple.Tuple;

import org.apache.storm.tuple.Values;

 

@SuppressWarnings("serial")

public class TypeCountBolt extends BaseRichBolt {

 

    OutputCollector collector;

 

    Map<String,Integer> map = new HashMap<String, Integer>();

 

    Map<String,Float> map2 = new HashMap<String, Float>();

 

 

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        this.collector = collector;

 

    }

 

    public void execute(Tuple input) {

        String line = input.getStringByField("line");

        String[] data = line.split(",");

        Integer count = map.get(data[2]);

        Float total_amount = map2.get(data[2]);

        if(count==null){

            count = 0;

        }

        if(total_amount==null){

            total_amount = 0.0f;

        }

        count++;

        total_amount+=Float.parseFloat(data[3]) * Integer.parseInt(data[4]);

        map.put(data[2],count);

        map2.put(data[2],total_amount);

 

        System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");

        Set<Map.Entry<String,Integer>> entrySet = map.entrySet();

        for(Map.Entry<String,Integer> entry :entrySet){

            System.out.println("交易量:");

            System.out.println(entry);

        }

        System.out.println();

        Set<Map.Entry<String,Float>> entrySet2 = map2.entrySet();

        for(Map.Entry<String,Float> entry :entrySet2){

            System.out.println("交易总金额:");

            System.out.println(entry);

        }

    }

 

 

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

 

}

 

 

2. 统计不同每个小时的交易量和交易总金额

package bolt;

 

import org.apache.storm.task.OutputCollector;

import org.apache.storm.task.TopologyContext;

import org.apache.storm.topology.OutputFieldsDeclarer;

import org.apache.storm.topology.base.BaseRichBolt;

import org.apache.storm.tuple.Tuple;

 

import java.text.ParseException;

import java.text.SimpleDateFormat;

import java.util.Date;

import java.util.HashMap;

import java.util.Map;

import java.util.Set;

 

public  class TimeCountBolt extends BaseRichBolt {

    OutputCollector collector;

 

    Map<Integer,Integer> map = new HashMap<Integer, Integer>();

 

    Map<Integer,Float> map2 = new HashMap<Integer, Float>();

 

 

    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {

        this.collector = collector;

 

    }

 

    public void execute(Tuple input) {

        String line = input.getStringByField("line");

        String[] data = line.split(",");

 

        Date date = new Date();

        SimpleDateFormat dateFormat= new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");

        try {

            date = dateFormat.parse(data[0]);

        } catch (ParseException e) {

            e.printStackTrace();

        }

 

        Integer count = map.get(date.getHours());

        Float total_amount = map2.get(date.getHours());

        if(count==null){

            count = 0;

        }

        if(total_amount==null){

            total_amount = 0.0f;

        }

        count++;

        total_amount+=Float.parseFloat(data[3]) * Integer.parseInt(data[4]);

        map.put(date.getHours(),count);

        map2.put(date.getHours(),total_amount);

 

        System.out.println("~~~~~~~~~~~~~~~~~~~~~~~");

        Set<Map.Entry<Integer,Integer>> entrySet = map.entrySet();

        for(Map.Entry<Integer,Integer> entry :entrySet){

            System.out.println("交易量:");

            System.out.println(entry);

        }

        System.out.println();

        Set<Map.Entry<Integer,Float>> entrySet2 = map2.entrySet();

        for(Map.Entry<Integer,Float> entry :entrySet2){

            System.out.println("交易总金额:");

            System.out.println(entry);

        }

    }

 

 

    public void declareOutputFields(OutputFieldsDeclarer declarer) {

    }

}

 文章来源地址https://www.toymoban.com/news/detail-760110.html

到了这里,关于Storm 集群的搭建及其Java编程进行简单统计计算的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Storm学习之使用官方Docker镜像快速搭建Storm运行环境

    Apache Storm 官方也出了Docker 镜像 https://hub.docker.com/_/storm/ 本文我们就基于官方镜像搭建一个 Apache Storm 2.4 版本的运行环境,供大家后续学习。 有问题可以参考issue 解决, 我的安装过程一路都很顺畅。所以基本上没有看下面是我的详细操作和截图 。 可以说网上的乱七八糟的教

    2024年02月14日
    浏览(59)
  • Redis持久化说明及其单台Linux服务器搭建Redis集群架构

    说明:RDB快照主要以二进制文件的形式进行存储数据,主要以文件名dump.rdb进行存储,主要设置redis.conf里面设置’save 60 1000’命令可以开启, 表示在60秒内操作1000次进行一次备份数据。在客户端执行save(同步)和bgsave(异步操作)。 redis.conf 启动redis相关命令 说明:主要把文件生

    2024年02月10日
    浏览(59)
  • es 集群简单介绍及搭建

    Cluster :代表一个集群,集群中有多个节点,其中有一个为主节点,这个主节点是可以通过选举产生的,主从节点是对于集群内部来说的。es 的一个概念就是去中心化,字面上理解就是无中心节点,这是对于集群外部来说的,因为从外部来看 es 集群,在逻辑上是个整体,你与

    2024年04月15日
    浏览(30)
  • Windows kafka 简单集群搭建

    项目 版本 操作系统环境 Windows 11 / 64 位操作系统 Zookeeper环境 zookeeper-3.4.8 kafka环境 kafka_2.12-3.7.0 要搭建kafka集群首先要搭建Zookeeper集群 2.1 ZooKeeper下载 ZooKeeper下载地址 本文选用的是3.4.8版本 2.2 ZooKeeper安装 拷贝多份zookeeper程序,此处设置三个server,分别创建目录Server-A、Serve

    2024年03月14日
    浏览(49)
  • 快速搭建k8s集群,使用kubekey搭建简单搭建

    1. 安装环境依赖 2. 下载KubeKey工具,下载好后就会出现 kk 这个可执行文件 3. 使用KubeKey生成一个k8s集群启动模板k8s.yaml,编辑好账号密码后保存 模板如下:  需要调整地方: 4.  执行刚才的文件 此时界面会安装下载各种组件并安装,大概等待5-10分钟即可完成安装 6. 安装完

    2024年02月09日
    浏览(63)
  • pytorch搭建squeezenet网络的整套工程,及其转tensorrt进行cuda加速

    本来,前辈们用caffe搭建了一个squeezenet的工程,用起来也还行,但考虑到caffe的停更后续转trt应用在工程上时可能会有版本的问题所以搭建了一个pytorch版本的。 以下的环境搭建不再细说,主要就是pyorch,其余的需要什么pip install什么。 squeezenet的网络结构及其具体的参数如下

    2024年02月09日
    浏览(41)
  • 【免费题库】华为OD题库C卷 - API集群负载统计(Java 代码+解析)

    哈喽,本题库完全免费,收费是为了防止被爬,大家订阅专栏后可以私信联系退款。感谢支持 某个产品的RESTful API集合部署在服务器集群的多个节点上,近期对客户端访问日志进行了采集,需要统计各个API的访问频次,根据热点信息在服务器节点之间做负载均衡,现在需要实

    2024年04月13日
    浏览(47)
  • 【免费题库】华为OD机试 - API集群负载统计(Java & JS & Python & C & C++)

    哈喽,本题库完全免费,收费是为了防止被爬,大家订阅专栏后可以私信联系退款。感谢支持 某个产品的RESTful API集合部署在服务器集群的多个节点上,近期对客户端访问日志进行了采集,需要统计各个API的访问频次,根据热点信息在服务器节点之间做负载均衡,现在需要实

    2024年04月10日
    浏览(107)
  • 无需编程,简单易上手的家具小程序搭建方法分享

    想要开设一家家具店的小程序吗?现在,我将为大家介绍如何使用乔拓云平台搭建一个家具小程序,帮助您方便快捷地开展线上家具销售业务。 第一步,登录乔拓云平台进入商城后台管理页面。 第二步,在乔拓云平台的后台管理页面中,找到并点击【小程序商城】模块,进

    2024年01月16日
    浏览(47)
  • 【华为机试真题详解JAVA实现】—识别有效的IP地址和掩码并进行分类统计

        目录 一、题目描述 二、解题代码 请解析IP地址和对应的掩码,进行分类识别。要求按照A/B/C/D/E类地址归类,不合法的地址和掩码单独归类。 所有的IP地址划分为 A,B,C,D,E五类 A类地址从1.0.0.0到126.255.255.255; B类地址从128.0.0.0到191.255.255.255; C类地址从192.0.0.0到223.255.255.255;

    2023年04月09日
    浏览(42)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包