基础架构部文档
基础架构部文件格式标准参考
技术文档
mr_doc 接入ucenter 认证登录
loki日志收集
https证书与ssl/tls 加密
FTP 主动模式和被动模式的区别
Hadoop-windows10安装部署Hadoop2.7.3
JKS和PFX证书文件格式相互转换方法
KVM 基础操作
k8s nginx ingress日志收集到ELK并分析
Django基础
clash http代理 socks代理服务器搭建 配置
Ubuntu 22.04 安装 FFmpeg v7.0
Office正版化项目的个人体验和心得
重置jenkins构建历史
K8S实施方案
k8s的yaml文件语法
Docker的优势与虚拟机的区别
问题处理文档
HR推送数据问题处理报
Nginx从入门到放弃01-nginx基础安装
Nginx从入门到放弃02-Nginx基本命令和新建WEB站点
Nginx从入门到放弃03-Nginx调优
Nginx从入门到放弃04-Nginx的N种特别实用示例
JMeter教程
01-mariadb编译安装
02-mariadb二进制安装
Docker修改默认的存储路径
01-influxdb2时序数据库简介及安装
02-influxdb2时序数据库核心概念
03-influxdb2时序数据库flux查询语言
04-influxdb2--Python客户端influxdb-client
05-Spring boot 集成influxdb2
06-influxdb2其他知识
OA添加waf后相关问题的解决过程
排除java应用cpu使用率过高
exsi迁移文档
视频测试
阿里云产品试题
超融合服务器和传统服务器的区别
Serv-U问题集锦
文件夹共享操作手册
磁盘脱机处理方案
Office内存或磁盘空间不足处理方法
Cmd中ping不是内部或外部命令的解决方法
ELK 搭建文档
限制用户的远程桌面会话数量
Docker快速安装rocketmq、redis、zookeeper
超融合建设方案
git 入门
HR系统写入ES数据报错403
ELK搭建文档
KVM 安装和基础使用文档
helm 安装 rancher
访问共享提示禁用当前用户解决方法
K8S StorageClass搭建
KVM 扩展磁盘
借助sasl构建基于AD用户验证的SVN服务器
fastdfs编译安装并迁移数据
关闭系统保护的必要性
SCF 前置机部署
阿里云OSS学习文档
阿里云学习文档-VPC
(k8s踩坑)namespace无法删除
rancher-helm安装
zookeeper集群安装
批量替换K8s secrets 中某个特定域名的tls证书
kibana 批量创建索引模式
centos7 恢复Yum使用
ACP云计算部分知识点总结
Loki 日志系统搭建文档
自动更新k8s集群中所有名称空间中特定证书
AI分享
(AI)函数调用与MCP调用的区别
安装戴尔DELL Optilex 7040 USB驱动时提示无法定位程序输入点 kernel32\.dll
新华三服务器EXSI 显卡直通
本文档使用「觅思文档专业版」发布
-
+
首页
05-Spring boot 集成influxdb2
官方Client仓库地址](https://links.jianshu.com/go?to=https%3A%2F%2Fgithub.com%2Finfluxdata%2Finfluxdb-client-java) ### Spring boot 集成 引入influxdb-client-java依赖 **maven** ```xml <dependency> <groupId>com.influxdb</groupId> <artifactId>influxdb-client-java</artifactId> <version>4.0.0</version> </dependency> ``` **gradle** ```groovy dependencies { compile "com.influxdb:influxdb-client-java:4.0.0" } ``` #### 集成方式一 在配置文件中指定*influxdb* *url*,*token*,*org*,*bucket*信息 ```yaml influx: url: http://172.21.220.141:8086 token: 58s6gl9hD8lk-AS_i6mUaYMMCGe6N1vIfVpJUo2xJ2HkWMlWx2yp7r7IKZsyF6h8vQdTPfIpGyHtbALayLgUQw== bucket: wxm-influx org: wxm ``` 添加influx配置类 ```java //InfluxConfig.java @Configuration @ConfigurationProperties(prefix = "influx") public class InfluxConfig { private String url; private String token; private String org; private String bucket; @Bean public InfluxDBClient influxDBClient(){ return InfluxDBClientFactory.create(url, token.toCharArray(),org,bucket); } public void setUrl(String url) { this.url = url; } public void setToken(String token) { this.token = token; } public void setOrg(String org) { this.org = org; } public void setBucket(String bucket) { this.bucket = bucket; } } ``` 随后便可以在其他位置注入并使用`InfluxDBClient`了 #### 集成方式二 再resource目录下创建名为`influx2.properties`的配置文件并添加如下配置 ```properties influx2.url=http://172.23.64.25:8086 influx2.org=wxm influx2.token=58s6gl9hD8lk-AS_i6mUaYMMCGe6N1vIfVpJUo2xJ2HkWMlWx2yp7r7IKZsyF6h8vQdTPfIpGyHtbALayLgUQw== influx2.bucket=wxm-influx ``` 添加influx配置类 ```java @Configuration public class InfluxConfigProperties { @Bean public InfluxDBClient influxDBClient() { return InfluxDBClientFactory.create(); } } ``` 个人推荐使用第二种方式,influx2.properties还可用于配置默认`tag`等 ### Write Data - Write by Line Protocl ```java try(WriteApi writeApi = influxDBClient.makeWriteApi()) { String val = "temperature,location=north wxm=69.0"; writeApi.writeRecord(WritePrecision.NS,val); } ``` - Write by POJO ```java //Temperature.java @Measurement(name = "temperature") public class Temperature { @Column(tag = true) private String location; @Column private Double value; @Column(timestamp = true) Instant time; getterAndSeter..... } ``` ```java try (WriteApi writeApi = influxDBClient.makeWriteApi()) { Temperature temperature = new Temperature(); temperature.setLocation("east"); temperature.setValue(106.2D); temperature.setTime(Instant.now()); writeApi.writeMeasurement(WritePrecision.NS,temperature); } ``` - Write Data by Data Point ```java try (WriteApi writeApi = influxDBClient.makeWriteApi()) { Point point = Point.measurement("temperature") .addTag("location","south") .addTag("owner","wxm") .addField("wxm",230.8); writeApi.writePoint(point); } ``` **写数据时为所有数据指定默认标签** 指定默认标签有两种方式,通过配置文件和通过编码配置 1. 通过配置文件 在influx2.properties中指定默认标签及标签值 ```properties # 自定义默认标签 influx2.tags.customer=ALMING ``` 1. 通过编码配置 首先编写配置 ```java private InfluxDBClientOptions influxDBClientOptions() { return InfluxDBClientOptions.builder() .url("http://172.23.64.25:8086") .org("wxm") .bucket("wxm-influx") .authenticateToken("58s6gl9hD8lk-AS_i6mUaYMMCGe6N1vIfVpJUo2xJ2HkWMlWx2yp7r7IKZsyF6h8vQdTPfIpGyHtbALayLgUQw==".toCharArray()) .addDefaultTag("id", "wxm-id") .build(); } ``` 随后在创建客户端时指定配置 ```java @Bean public InfluxDBClient influxDBClient() { return InfluxDBClientFactory.create(influxDBClientOptions()); } ``` 需要注意的是使用编码配置会覆盖默认从配置文件读取的`InfluxDBClientOptions`,所以使用编码方式指定配置文件会失效 - 同步阻塞写入 以上写入都是异步写入,influxdb同时支持同步阻塞写入。使用如下方式获取阻塞写入API ```java WriteApiBlocking writeApiBlocking = influxDBClient.getWriteApiBlocking(); ``` ### Query Data - 查询返回`FluxTable` ```java String flux = "from(bucket: \"wxm-influx\")\n" + " |> range(start: 2022-03-12T08:18:04Z,stop:2022-03-13T08:19:05Z)\n" + " |> filter(fn:(r)=>(r.owner==\"wxm\"))"; List<FluxTable> tables = influxDBClient.getQueryApi().query(flux); for (FluxTable fluxTable : tables) { List<FluxRecord> records = fluxTable.getRecords(); for (FluxRecord fluxRecord : records) { System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getValueByKey("_value")); } } ``` - 查询结果影射成为POJO ```java String flux = "from(bucket: \"wxm-influx\")\n" + " |> range(start: 2022-03-12T08:18:04Z,stop:2022-03-13T08:19:05Z)\n" + " |> filter(fn:(r)=>(r.owner==\"wxm\"))"; List<Temperature> query = influxDBClient.getQueryApi().query(flux, Temperature.class); query.forEach(System.out::println); ``` - 异步查询 ```java String flux = "from(bucket: \"wxm-influx\")\n" + " |> range(start: 2022-03-12T08:18:04Z,stop:2022-03-13T08:19:05Z)\n" + " |> filter(fn:(r)=>(r.owner==\"wxm\"))"; influxDBClient.getQueryApi().query(flux, (cancellable, fluxRecord) -> { System.out.println(fluxRecord.getTime() + ": " + fluxRecord.getValueByKey("_value")); }, throwable -> { //invoke when exception System.out.println("Error"); }, () -> { //complete System.out.println("complete"); }); System.out.println("Query invoke done"); TimeUnit.SECONDS.sleep(3); /** * 输出结果: * Query invoke done * 2022-03-12T08:18:16.543834Z: 62.0 * 2022-03-12T08:19:44.833253500Z: 65.0 * 2022-03-12T09:25:43.388488100Z: 81.5 * complete */ ``` - 异步查询并映射成为POJO ```java String flux = "from(bucket: \"wxm-influx\")\n" + " |> range(start: 2022-03-12T08:18:04Z,stop:2022-03-13T08:19:05Z)\n" + " |> filter(fn:(r)=>(r.owner==\"wxm\"))"; influxDBClient.getQueryApi().query(flux,Temperature.class, (cancellable, fluxRecord) -> { System.out.println(fluxRecord); }, throwable -> { //invoke when exception System.out.println("Error"); }, () -> { //complete System.out.println("complete"); }); System.out.println("Query invoke done"); TimeUnit.SECONDS.sleep(3); /** * 输出结果: * Query invoke done * Temperature(location=north, value=62.0, time=2022-03-12T08:18:16.543834Z) * Temperature(location=north, value=65.0, time=2022-03-12T08:19:44.833253500Z) * Temperature(location=north, value=81.5, time=2022-03-12T09:25:43.388488100Z) * complete */ ``` - 查询并返回源CSV格式数据 返回一个字符串,其内容是以逗号为分隔的CSV格式。使用Postman进行测试时返回就是该格式。 ```java String flux = "from(bucket: \"wxm-influx\")\n" + " |> range(start: 2022-03-12T08:18:04Z,stop:2022-03-13T08:19:05Z)\n" + " |> filter(fn:(r)=>(r.owner==\"wxm\"))"; String raw = influxDBClient.getQueryApi().queryRaw(flux); System.out.println(raw); ``` - 源数据异步查询 ```java String flux = "from(bucket: \"wxm-influx\")\n" + " |> range(start: 2022-03-12T08:18:04Z,stop:2022-03-13T08:19:05Z)\n" + " |> filter(fn:(r)=>(r.owner==\"wxm\"))"; influxDBClient.getQueryApi().queryRaw(flux, (cancellable, fluxRecord) -> { System.out.println(fluxRecord); }, throwable -> { //invoke when exception System.out.println("Error"); }, () -> { //complete System.out.println("complete"); }); System.out.println("Query invoke done"); TimeUnit.SECONDS.sleep(3); /** * 输出结果: * Query invoke done * ,result,table,_start,_stop,_time,_value,_field,_measurement,location,owner * ,_result,0,2022-03-12T08:18:04Z,2022-03-13T08:19:05Z,2022-03-12T08:18:16.543834Z,62,wxm,temperature,north,wxm * ,_result,0,2022-03-12T08:18:04Z,2022-03-13T08:19:05Z,2022-03-12T08:19:44.8332535Z,65,wxm,temperature,north,wxm * ,_result,0,2022-03-12T08:18:04Z,2022-03-13T08:19:05Z,2022-03-12T09:25:43.3884881Z,81.5,wxm,temperature,north,wxm * * complete */ ``` ### Delete Data ```java OffsetDateTime start = OffsetDateTime.now().minus(24, ChronoUnit.HOURS); OffsetDateTime stop = OffsetDateTime.now(); influxDBClient.getDeleteApi().delete(start,stop,"","wxm-influx","wxm"); ``` ### flux-dsl 官方文档中有一章节是描写如何使用flux-dsl来构建flux语句,但在使用过程中发现官方使用的Flux类在当前情况下并不能被引入,实际`influxdb-client-java`这个依赖也确实不包含Flux。所以需要手动构建 flux-dsl module 并在项目中引用。 #### flux-dsl 模块构建 1. clone 官方仓库地址,并在idea中引入。 2. 进入命令行在项目根目录执行`mvn install` 如果时windows系统该指令会失败但不用担心,flux-dsl模块需要的module已经构建成功。 然后`cd`到flux-dsl目录下执行`mvn clean install` 3. 成功执行后在本台电脑上其他项目中以如下GAV引入maven依赖。  图片截取自`flux-dsl`模块pom.xml,版本号(V)根据下载的项目版本进行填写 在其他项目中引用如下 ```xml <dependency> <groupId>com.influxdb</groupId> <artifactId>flux-dsl</artifactId> <version>3.5.0-SNAPSHOT</version> </dependency> ``` gradle用户需先在`repositories`下添加 `mavenLocal()`然后在依赖中引入如下 ```groovy repositories { ... mavenLocal() } dependencies { ... implementation 'com.influxdb:flux-dsl:4.3.0-SNAPSHOT' } ``` #### flux-dsl 使用 - 使用内建函数 ```java Flux flux = Flux .from("wxm-influx") .window(15L, ChronoUnit.MINUTES, 20L, ChronoUnit.SECONDS) .sum(); /** * from(bucket:"wxm-influx") * |> window(every:15m, period:20s) * |> sum() */ ``` - 使用内建函数属性 ```java SumFlux flux = Flux.from("wxm-influx") .window() .withEvery(15L, ChronoUnit.MINUTES) .withPeriod(20L, ChronoUnit.SECONDS) .sum(); /** * from(bucket:"wxm-influx") * |> window(every:15m, period:20s) * |> sum() */ ``` - 手动声明属性名称 ```java SumFlux flux = Flux.from("wxm-influx") .window() .withPropertyValue("every", 15L, ChronoUnit.MINUTES) .withPropertyValue("period", 20L, ChronoUnit.SECONDS) .sum(); /** * from(bucket:"wxm-influx") * |> window(every:15m, period:20s) * |> sum() */ ``` - 声明一个可重用的属性集合 ```java Map<String, Object> properties = new HashMap<>(); properties.put("every", new TimeInterval(15L, ChronoUnit.MINUTES)); properties.put("period", new TimeInterval(20L, ChronoUnit.SECONDS)); Flux flux = Flux .from("telegraf") .window() .withPropertyNamed("every") .withPropertyNamed("period") .sum(); //使用时指定使用的属性集 System.out.println(flux.toString(properties)); /** * from(bucket:"telegraf") * |> window(every:15m, period:20s) * |> sum() */ ``` - 自定义表达式 ```java Flux flux = Flux.from("wxm-influx") .expression("filter(fn:(r)=>(r.owner==\"wxm\"))") .expression("sum()"); /** * from(bucket:"wxm-influx") * |> filter(fn:(r)=>(r.owner=="wxm")) * |> sum() */ ``` 其余函数都比较类似,详见[flux-dsl README
张文
2022年8月25日 10:54
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
Word文件
PDF文档
PDF文档(打印)
分享
链接
类型
密码
更新密码
有效期