[时序数据库]:InfluxDB进阶

这篇具有很好参考价值的文章主要介绍了[时序数据库]:InfluxDB进阶。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

1 摘要

摘要:InfluxQL;InfluxQL工具类;influxdb.java客户端

2 背景

2.1 问题一:针对Influx V2.0工具

针对新版Influx V2.0 版本数据库:

  • 其一,influx支持两种查询语言,flux和InfluxQL,然后InfluxQL在高版本中已没有得到较好维护,因而,在后续开发中,笔者采用Influx V1.x 版本来进行开发。

2.2 问题二:针对Influx查询语言

针对使用新版Influx数据库,目前在其上做操做有两种方法,

  • 其一,使用官方的UI工具,缺点:由于是可视化拼购操作,对操作有所限制。
  • 其二,使用flux语言,flux是一种查询语言,其语法格式类似于R语言,具有管道符这些的形式,其也是官方所推荐的,然而由于没太使用过,此处不做展开,如有兴趣自己查询。
  • 最后,也就是笔者所推荐的,InfluxQL其语法格式,高度切合于SQL语言,因而作为influx快速使用所推荐。

3 需求分析

正如,上述所描述问题,此处选择环境:

  • 版本:influx v1.x
  • 查询实现语言:InfluxQL

4 快速入门

4.1 客户端驱动版本选择

        <!--influxDB-->
        <dependency>
            <groupId>org.influxdb</groupId>
            <artifactId>influxdb-java</artifactId>
            <version>2.19</version>
        </dependency>

4.2 连接influx

4.2.1 influx配置信息

示例如下,可以优化:

spring:
  influxdb:
    url: yourURL
    databaseApply: databaseName1
    databaseTemp: databaseName2

4.2.2 influx连接配置

/**
 * influx配置读取
 */
@Data
@Component
@ConfigurationProperties(prefix = "spring.influxdb")
public class InfluxProperties {

    private String url; //influx访问URL
//    private String user; //用户名
//    private String password; //密码
    private String databaseApply; //应用数据库
    private String databaseTemp; //备份用数据库

    public InfluxDB getConnectionDatabaseApply() {
        return InfluxDBFactory.connect(url).setDatabase(databaseApply);
    }

    public InfluxDB getConnectionDatabaseTemp() {
        return InfluxDBFactory.connect(url).setDatabase(databaseTemp);
    }
}

4.2.3 测试连通情况

    @Test
    public void testInfluxQLWithMoreTerm() {
        InfluxDB influxDB = influxProperties.getConnectionDatabaseTemp();

        System.out.println("influxDB.ping() = " + influxDB.ping());
    }
}

5 Influx工具类

5.1 InfluxQL工具类

5.1.1 出现背景

目前针对InfluxQL暂无诸如mybatis样的,持久层框架,因而此处提供一种工具类的解决方案。
核心思想:InfluxQL高度类似于SQL,因而我们把诸如select、group等着关键字,封装成工具类方法,以实现链式InfluxQL,便于后续开发于维护。

5.1.2 InfluxQL工具类

/**
 * InfluxQL构造器
 *
 * @author jx
 * @date 2023/7/23 21:24
 */

public class InfluxQLQuery {

    private final StringBuilder queryBuilder;
    private String database;


    private InfluxQLQuery(String query) {
        this.queryBuilder = new StringBuilder(query);
    }

    public static InfluxQLQuery select(String... fields) {
        StringJoiner joiner = new StringJoiner(", ");
        for (String field : fields) {
            if (!field.isEmpty()) {
                joiner.add(field);
            }
        }
        return new InfluxQLQuery("SELECT " + joiner);
    }

    //    SELECT max("A_Cond_Temp"), min("A_Feed_Pump_Power_Cons_Rate") FROM "JJjTEStljx" GROUP BY "A_Cond_Temp", "A_Feed_Pump_Power_Cons_Rate"
    public InfluxQLQuery select(List<AggregationType> aggregationTypes, String... fields) {

        //得到类型
        List<String> list = new ArrayList<>();
        for (AggregationType aggregationType : aggregationTypes) {
            list.add(aggregationType +"(\""+ fields +"\"");
        }
        return new InfluxQLQuery("SELECT " + list);
    }

    public InfluxQLQuery from(String measurement) {
        queryBuilder.append(" FROM ").append(measurement);
        return this;
    }

    public InfluxQLQuery where(String condition) {
        queryBuilder.append(" WHERE ").append(condition);
        return this;
    }

    public InfluxQLQuery groupBy(String grouping) {
        queryBuilder.append(" GROUP BY ").append(grouping);
        return this;
    }

    /**
     * 设置查询时,时间戳为上海时间
     *
     * @return 上海时区
     */
    public InfluxQLQuery setShanghaiTimeZone() {
        queryBuilder.append(" tz('Asia/Shanghai') ");
        return this;
    }

    public QueryResult execute(InfluxDB influxDB, String database) {
        Query queryObject = new Query(queryBuilder.toString(), database);
        return influxDB.query(queryObject);
    }

    public InfluxQLQuery selectMean(String field) {
        queryBuilder.append("MEAN(").append(field).append(")");
        return this;
    }

    public InfluxQLQuery selectMin(String field) {
        queryBuilder.append("min(").append(field).append(")");
        return this;
    }

    public InfluxQLQuery selectMax(String field) {
        queryBuilder.append("max(").append(field).append(")");
        return this;
    }

    public InfluxQLQuery selectSum(String field) {
        queryBuilder.append(" sum(").append(field).append(")");
        return this;
    }

    public InfluxQLQuery selectCount(String field) {
        queryBuilder.append(" count(").append(field).append(")");
        return this;
    }

    /**
     * 查询结果的最大数量
     *
     * @param limit
     * @return
     */
    public InfluxQLQuery limit(int limit) {
        queryBuilder.append(" LIMIT ").append(limit);
        return this;
    }

    /**
     * 设置当前操作的值。
     *
     * @param interval
     * @return
     */
    public InfluxQLQuery interval(String interval) {
        queryBuilder.append(" INTERVAL ").append(interval);
        return this;
    }

    /**
     * 设置查询条件中的标签。
     *
     * @param tags
     * @return
     */
    public InfluxQLQuery whereTags(Map<String, String> tags) {
        StringBuilder tagsBuilder = new StringBuilder();
        for (Map.Entry<String, String> entry : tags.entrySet()) {
            String tagKey = entry.getKey();
            String tagValue = entry.getValue();
            tagsBuilder.append(" \"").append(tagKey).append("\"='").append(tagValue).append("' AND");
        }
        // 删除最后的 AND
        if (tagsBuilder.length() > 0) {
            tagsBuilder.setLength(tagsBuilder.length() - 4);
        }
        queryBuilder.append(" WHERE").append(tagsBuilder);
        return this;
    }

    /**
     * 设置填充策略。
     *
     * @param value
     * @return
     */
    public InfluxQLQuery fill(String value) {
        queryBuilder.append(" fill(").append(value).append(")");
        return this;
    }

    /**
     * 查询结果的排序方式。
     *
     * @param field
     * @param direction
     * @return
     */
    public InfluxQLQuery orderBy(String field, String direction) {
        queryBuilder.append(" ORDER BY ").append(field).append(" ").append(direction);
        return this;
    }

    /**
     * 设置结果集的返回数量上限。
     *
     * @param limit
     * @return
     */
    public InfluxQLQuery sLimit(int limit) {
        queryBuilder.append(" SLIMIT ").append(limit);
        return this;
    }

    /**
     * 设置结果集的偏移量。
     *
     * @param offset
     * @return
     */
    public InfluxQLQuery sOffset(int offset) {
        queryBuilder.append(" SOFFSET ").append(offset);
        return this;
    }

    /**
     * 指定写入的目标measurement
     *
     * @param targetMeasurement
     * @return
     */
    public InfluxQLQuery into(String targetMeasurement) {
        queryBuilder.append(" INTO ").append(targetMeasurement);
        return this;
    }

    /**
     * 指定写入数据时的标签
     *
     * @param tagKey
     * @param tagValue
     * @return
     */
    public InfluxQLQuery withTag(String tagKey, String tagValue) {
        queryBuilder.append(" WITH ").append(tagKey).append("=").append(tagValue);
        return this;
    }

    /**
     * 创建数据保留策略
     *
     * @param policyName
     * @param duration
     * @param replication
     * @param isDefault
     * @return
     */
    public InfluxQLQuery createRetentionPolicy(String policyName, String duration, String replication,
                                               boolean isDefault) {
        queryBuilder.append(" CREATE RETENTION POLICY ").append("\"").append(policyName).append("\"")
                .append(" ON ").append(database)
                .append(" DURATION ").append(duration)
                .append(" REPLICATION ").append(replication);
        if (isDefault) {
            queryBuilder.append(" DEFAULT");
        }
        return this;
    }

    /**
     * 展示当前数据库的所有保留策略。
     *
     * @return
     */
    public InfluxQLQuery showRetentionPolicies() {
        queryBuilder.append(" SHOW RETENTION POLICIES").append(" ON ").append(database);
        return this;
    }

    /**
     * 删除指定的数据保留策略。
     *
     * @param policyName
     * @return
     */
    public InfluxQLQuery dropRetentionPolicy(String policyName) {
        queryBuilder.append(" DROP RETENTION POLICY ").append("\"").append(policyName).append("\"")
                .append(" ON ").append(database);
        return this;
    }

    /**
     * 创建用户
     *
     * @param username
     * @param password
     * @return
     */
    public InfluxQLQuery createUser(String username, String password) {
        queryBuilder.append(" CREATE USER ").append("\"").append(username).append("\"")
                .append(" WITH PASSWORD ").append("'").append(password).append("'");
        return this;
    }

    /**
     * 设置用户密码
     *
     * @param username
     * @param password
     * @return
     */
    public InfluxQLQuery setUserPassword(String username, String password) {
        queryBuilder.append(" SET PASSWORD FOR ").append("\"").append(username).append("\"")
                .append(" = ").append("'").append(password).append("'");
        return this;
    }

    /**
     * 设置当前操作的数据库。
     *
     * @param database
     * @return
     */
    public InfluxQLQuery setDatabase(String database) {
        this.database = database;
        return this;
    }

    /**
     * 设置当前操作的measurement。
     *
     * @param measurement
     * @return
     */
    public InfluxQLQuery setMeasurement(String measurement) {
        queryBuilder.append(" ").append(measurement);
        return this;
    }

    /**
     * 设置当前操作的字段。
     *
     * @param field
     * @return
     */
    public InfluxQLQuery setField(String field) {
        queryBuilder.append(" ").append(field);
        return this;
    }

    /**
     * 设置当前操作的值。
     *
     * @param value
     * @return
     */
    public InfluxQLQuery setValue(String value) {
        queryBuilder.append(" = ").append(value);
        return this;
    }

    public InfluxQLQuery aggFunction(InfluxQLQuery influxQLQuery, TimeType timeType, String field) {

        if (influxQLQuery == null || timeType == null || field == null) {
            return null;
        }

        switch (timeType) {
            case MAX:
                influxQLQuery.selectMax(field);
                break;
            case AVG:
                influxQLQuery.selectMean(field);
                break;
            case MIN:
                influxQLQuery.selectMin(field);
                break;
            case SUM:
                influxQLQuery.selectSum(field);
                break;
            default:
                return null;
        }

        return influxQLQuery;
    }

    public static InfluxQLQuery groupBy(String... fields) {
        StringJoiner joiner = new StringJoiner(", ");
        for (String field : fields) {
            if (!field.isEmpty()) {
                joiner.add(field);
            }
        }
        return new InfluxQLQuery("SELECT " + joiner);
    }
}

5.2 插入数据工具类

注意:此处相关逻辑,需要依据入库实际需求而适应!文章来源地址https://www.toymoban.com/news/detail-654451.html

 @Resource
    private InfluxProperties influxProperties;

    /**
     * 插入influx应用数据库
     *
     * @param assetId    表名
     * @param pointId    测点名
     * @param pointValue 测点值
     * @param time       消息时间戳
     */
    public void intoInfluxApply(String assetId, String pointId, Double pointValue, Long time) {

        InfluxDB influxDBApply = influxProperties.getConnectionDatabaseApply();

        //创建要写入的数据点
        Map<String, Object> fields = new HashMap<>();
        fields.put(pointId, pointValue); //测单标识符,值
        // 写入数据
        Point point = Point.measurement(assetId)//表名
                .time(time, TimeUnit.MILLISECONDS)//时间戳
                .fields(fields)//添加一个字段的多个属性值
                .build();

        influxDBApply.write(influxProperties.getDatabaseApply(), "autogen", point);
    }

    /**
     * 插入influx备份数据库
     *
     * @param assetId    表名
     * @param pointId    测点名
     * @param pointValue 测点值
     * @param time       消息时间戳
     */
    public void intoInfluxTemp(String assetId, String pointId, Double pointValue, Long time) {

        InfluxDB influxDBApply = influxProperties.getConnectionDatabaseTemp();

        //创建要写入的数据点
        Map<String, Object> fields = new HashMap<>();
        fields.put(pointId, pointValue); //测单标识符,值
        // 写入数据
        Point point = Point.measurement(assetId)//表名
                .time(time, TimeUnit.MILLISECONDS)//时间戳
                .fields(fields)//添加一个字段的多个属性值
                .build();

        try {
            influxDBApply.write(influxProperties.getDatabaseTemp(), "autogen", point);
        } catch (Exception e) {
            log.info("{}", "写入influx临时数据库出现错误" + e.getMessage());
        }
    }

到了这里,关于[时序数据库]:InfluxDB进阶的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • Windows下 influxdb 数据库安装和简单使用

    你可以从 InfluxDB 的 InfluxDB官网winndows二进制安装包下载适用于不同操作系统的 InfluxDB 安装包。在本教程中,我们将介绍在 Windows上安装 InfluxDB 的步骤。 如果所示,可以点击下载windows版本的安卓版,右上角还可以切换其他版本的安装包。 下载后解压,里面有个influxd.e

    2024年02月08日
    浏览(27)
  • 数据库信息速递: Apache Arrow 如何加速 InfluxDB (翻译)

    开头还是介绍一下群,如果感兴趣PolarDB ,MongoDB ,MySQL ,PostgreSQL ,Redis, Oceanbase, Sql Server等有问题,有需求都可以加群群内,可以解决你的问题。加群请联系 liuaustin3 ,(共1760人左右 1 + 2 + 3 + 4 +5) 4群(260+),另欢迎 OpenGauss 的技术人员加入。 最近是百业萧条,本地前十的新能源

    2024年02月03日
    浏览(41)
  • 数据库安全-H2 database&Elasticsearch&CouchDB&Influxdb漏洞复现

    参考:influxdb CVE-2019-20933 靶场环境:vulhub 打开靶场进入环境: 访问: 端口扫描: 默认端口: 8086:用于客户端和服务端交互的HTTP API 8088 :用于提供备份和恢复的RPC服务 influxdb 是一款著名的时序数据库,其使用 jwt 作为鉴权方式。在用户开启了认证, 但未设置参数 shared-s

    2024年02月06日
    浏览(29)
  • 服务攻防-数据库安全-Influxdb&H2database&CouchDB&ElasticSearch数据库漏洞复现

    目录 一、Influxdb-未授权访问-Jwt 验证不当 1、Infuxdb简介 2、安全问题 3、漏洞复现  二、H2database-未授权访问-配置不当 1、H2database简介 2、安全问题 3、漏洞复现  三、CouchDB-权限绕过配合RCE-漏洞 1、CouchDB简介 2、安全问题 3、漏洞复现  四 、ElasticSearch-文件写入RCE-漏洞 1、Ela

    2024年02月16日
    浏览(33)
  • 小迪安全 第56天 服务攻防-数据库安全&H2&Elasticsearch&CouchDB&Influxdb 复现

    1.端口扫描 2.报错回显 时序数据库是近几年一个新的概念,与传统的Mysql关系型数据库相比,它的最大的特点是:数据按照时间顺序存储。 举例来说,日志数据,是以时间顺序存储的,所以用时序数据库存储是一种很好的选择。使用Mysql在存储的过程中,不是对这种基于时间

    2024年02月03日
    浏览(35)
  • 网络安全全栈培训笔记(56-服务攻防-数据库安全&H2&Elasticsearch&CouchDB&Influxdb复现)

    知识点: 1、服务攻防数据库类型安全 2、influxdb,.未授权访问wt验证 3、H2 database-未授权访问-配置不当 4、CouchDB-权限绕过配合RCE-漏洞 5、ElasticSearch-文件写入RCE-漏洞 #章节内容: 常见服务应用的安全测试: 1、配置不当-未授权访问 2、安全机制特定安全漏洞 3、安全机制弱口令

    2024年01月23日
    浏览(39)
  • Docker进阶:Docker轻量级可视化工具Portainer与容器监控3剑客CAdvisor+InfluxDB+Granfana

    💖The Begin💖点点关注,收藏不迷路💖 在开始之前,确保已经安装了Docker。 Portainer是一个开源的Docker轻量级可视化工具,它提供了一个直观的Web界面,让你轻松管理和监控Docker容器、镜像和网络等。本文将为你介绍如何安装和使用Portainer,并提供详细的步骤指导,帮助你快

    2024年02月08日
    浏览(28)
  • 【Influxdb数据迁移,从windos移到linux】

    前提——保证两边的版本不要相差太多 1、windows的导出G:influxdb2为暂存的目录 导出之后会有一堆文件 全部上传到/var/lib/influxdb这个目录下。这个应该是默认的linux的存储地址 然后就可以导出 注意 1、influxdb需要是启动状态: 会在屏幕上打印日志,方便看输出内容 后台启动

    2024年02月12日
    浏览(35)
  • InfluxDB 2 介绍与使用 flux查询 数据可视化

    相比V1 移除了database 和 RP,增加了bucket。 V2具有以下几个概念: timestamp、field key、field value、field set、tag key、tag value、tag set、measurement、series、point、bucket、bucket schema、organization 新增的概念: bucket:所有 InfluxDB 数据都存储在一个存储桶中。一个桶结合了数据库的概念和存储

    2024年02月02日
    浏览(38)
  • 解决JMeter+Grafana+influxdb 配置出现transaction无数据情形

            JMeter+Grafana+influxdb 配置时,Darren洋发现jmeter中明明已经配置好了事务条件以及接口实例信息,但就是在grafana的头部导航栏中的transaction按钮下来没有相应事务数据信息,经过相关资料查询,Darren洋发现执行以下两个步骤即可解决该问题。         第一步我们在仪

    2024年02月16日
    浏览(28)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包