基础架构部文档
基础架构部文件格式标准参考
技术文档
mr_doc 接入ucenter 认证登录
loki日志收集
https证书与ssl/tls 加密
FTP 主动模式和被动模式的区别
Hadoop-windows10安装部署Hadoop2.7.3
JKS和PFX证书文件格式相互转换方法
KVM 基础操作
k8s nginx ingress日志收集到ELK并分析
Django基础
clash http代理 socks代理服务器搭建 配置
Ubuntu 22.04 安装 FFmpeg v7.0
Office正版化项目的个人体验和心得
重置jenkins构建历史
K8S实施方案
k8s的yaml文件语法
Docker的优势与虚拟机的区别
问题处理文档
HR推送数据问题处理报
Nginx从入门到放弃01-nginx基础安装
Nginx从入门到放弃02-Nginx基本命令和新建WEB站点
Nginx从入门到放弃03-Nginx调优
Nginx从入门到放弃04-Nginx的N种特别实用示例
JMeter教程
01-mariadb编译安装
02-mariadb二进制安装
Docker修改默认的存储路径
01-influxdb2时序数据库简介及安装
02-influxdb2时序数据库核心概念
03-influxdb2时序数据库flux查询语言
04-influxdb2--Python客户端influxdb-client
05-Spring boot 集成influxdb2
06-influxdb2其他知识
OA添加waf后相关问题的解决过程
排除java应用cpu使用率过高
exsi迁移文档
视频测试
阿里云产品试题
超融合服务器和传统服务器的区别
Serv-U问题集锦
文件夹共享操作手册
磁盘脱机处理方案
Office内存或磁盘空间不足处理方法
Cmd中ping不是内部或外部命令的解决方法
ELK 搭建文档
限制用户的远程桌面会话数量
Docker快速安装rocketmq、redis、zookeeper
超融合建设方案
git 入门
HR系统写入ES数据报错403
ELK搭建文档
KVM 安装和基础使用文档
helm 安装 rancher
访问共享提示禁用当前用户解决方法
K8S StorageClass搭建
KVM 扩展磁盘
借助sasl构建基于AD用户验证的SVN服务器
fastdfs编译安装并迁移数据
关闭系统保护的必要性
SCF 前置机部署
阿里云OSS学习文档
阿里云学习文档-VPC
(k8s踩坑)namespace无法删除
rancher-helm安装
zookeeper集群安装
批量替换K8s secrets 中某个特定域名的tls证书
kibana 批量创建索引模式
centos7 恢复Yum使用
ACP云计算部分知识点总结
Loki 日志系统搭建文档
自动更新k8s集群中所有名称空间中特定证书
AI分享
(AI)函数调用与MCP调用的区别
安装戴尔DELL Optilex 7040 USB驱动时提示无法定位程序输入点 kernel32\.dll
新华三服务器EXSI 显卡直通
本文档使用「觅思文档专业版」发布
-
+
首页
04-influxdb2--Python客户端influxdb-client
# 一、Python客户端influxdb-client 参考文档:https://docs.influxdata.com/influxdb/v2.4/api-guide/client-libraries/python/ 参考文档:https://github.com/influxdata/influxdb-client-python ## 1.1在你开始之前 1. 安装 InfluxDB Python 库: ```sh pip install influxdb-client ``` 2. 确保 InfluxDB 正在运行。如果在本地运行 InfluxDB,请访问 http://localhost:8086。(如果使用 InfluxDB Cloud,请访问您的 InfluxDB Cloud UI 的 URL。例如:https ://us-west-2-1.aws.cloud2.influxdata.com 。) ## 1.2使用 Python 将数据写入 InfluxDB 我们将使用 Python 库以线路协议编写一些数据。 1. 在您的 Python 程序中,导入 InfluxDB 客户端库并使用它将数据写入 InfluxDB。 ```python import influxdb_client from influxdb_client.client.write_api import SYNCHRONOUS ``` 2. 使用您的存储桶、组织和令牌的名称定义一些变量。 ```python bucket = "<my-bucket>" org = "<my-org>" token = "<my-token>" # Store the URL of your InfluxDB instance url="http://localhost:8086" ``` 更改 InfluxDB URL 3. 实例化客户端。该InfluxDBClient对象采用三个命名参数:url、org和token。传入命名参数。 ```python client = influxdb_client.InfluxDBClient( url=url, token=token, org=org ) ``` 该InfluxDBClient对象有一个write_api用于配置的方法。 4. 使用对象和方法实例化写入客户端。使用方法配置 writer 对象。client``write_api``write_api ```python write_api = client.write_api(write_options=SYNCHRONOUS) ``` 5. 创建一个点write对象,并使用API writer 对象的方法将其写入 InfluxDB 。write 方法需要三个参数:bucket、org和record。 ```python p = influxdb_client.Point("my_measurement").tag("location", "Prague").field("temperature", 25.3) write_api.write(bucket=bucket, org=org, record=p) ``` ## 1.3完整的示例编写脚本 官方python示例: ```python import influxdb_client from influxdb_client.client.write_api import SYNCHRONOUS bucket = "<my-bucket>" org = "<my-org>" token = "<my-token>" # Store the URL of your InfluxDB instance url="http://localhost:8086" client = influxdb_client.InfluxDBClient( url=url, token=token, org=org ) write_api = client.write_api(write_options=SYNCHRONOUS) p = influxdb_client.Point("my_measurement").tag("location", "Prague").field("temperature", 25.3) write_api.write(bucket=bucket, org=org, record=p) ``` 自己的脚本示例: ```python from datetime import datetime import influxdb_client from influxdb_client.client.write_api import SYNCHRONOUS token = "s9-YTkaURbSb8xUeJ3QbXtZLzkcTD2LPcA0OYLKBC0dlhM_yMduRHTk_bry6lLoEtcg4Nz88G9LOkBaQ_zZhPA==" org = "study" bucket = "study-bucket" url="http://10.2.8.216:8086" client = influxdb_client.InfluxDBClient( url=url, token=token, org=org, ) write_api = client.write_api(write_options=SYNCHRONOUS) p = influxdb_client.Point("mem") \ .tag("host", "10.0.0.10") \ .field("used_percent", 24.43234543) \ .time(datetime.utcnow()) write_api.write(bucket=bucket, org=org, record=p) ``` ## 1.4使用 Python 从 InfluxDB 查询数据 1. 实例化查询客户端。 ```python query_api = client.query_api() ``` 2. 创建一个 Flux 查询,然后将其格式化为 Python 字符串。 ```python query = ' from(bucket:"my-bucket")\ |> range(start: -10m)\ |> filter(fn:(r) => r._measurement == "my_measurement")\ |> filter(fn: (r) => r.location == "Prague")\ |> filter(fn:(r) => r._field == "temperature" ) ' ``` 查询客户端将 Flux 查询发送到 InfluxDB,并返回一个带有表结构的 Flux 对象。 3. 向方法传递query()两个命名参数:org和query. ```python result = query_api.query(org=org, query=query) ``` 4. 遍历 Flux 对象中的表和记录。 - 使用get_value()方法返回值。 - 使用get_field()方法返回字段。 ```python results = [] for table in result: for record in table.records: results.append((record.get_field(), record.get_value())) print(results) [(temperature, 25.3)] ``` Flux 对象提供以下方法来访问您的数据: - get_measurement():返回记录的测量名称。 - get_field():返回字段名称。 - get_value():返回实际的字段值。 - values:返回列值的映射。 - values.get("<your tag>"):从给定列的记录中返回一个值。 - get_time():返回记录的时间。 - get_start():返回当前表中所有记录的包含下限。 - get_stop():返回当前表中所有记录的独占时间上限。 ## 1.5完整的示例查询脚本 官网示例: ```python query_api = client.query_api() query = ‘ from(bucket:"my-bucket")\ |> range(start: -10m)\ |> filter(fn:(r) => r._measurement == "my_measurement")\ |> filter(fn: (r) => r.location == "Prague")\ |> filter(fn:(r) => r._field == "temperature" )‘ result = query_api.query(org=org, query=query) results = [] for table in result: for record in table.records: results.append((record.get_field(), record.get_value())) print(results) [(temperature, 25.3)] ``` 有关更多信息,请参阅GitHub 上的 Python 客户端自述文件。 # 二、Python操作influxdb 代码一: ```python from datetime import datetime import influxdb_client from influxdb_client.client.write_api import SYNCHRONOUS token = "s9-YTkaURbSb8xUeJ3QbXtZLzkcTD2LPcA0OYLKBC0dlhM_yMduRHTk_bry6lLoEtcg4Nz88G9LOkBaQ_zZhPA==" org = "study" bucket = "study-bucket" url="http://10.2.8.216:8086" client = influxdb_client.InfluxDBClient( url=url, token=token, org=org, ) write_api = client.write_api(write_options=SYNCHRONOUS) p = influxdb_client.Point("mem") \ .tag("host", "10.0.0.10") \ .field("used_percent", 24.43234543) \ .time(datetime.utcnow()) write_api.write(bucket=bucket, org=org, record=p) query_api = client.query_api() ## using Table structure tables = query_api.query('from(bucket:"study-bucket") |> range(start: -1000m)') for table in tables: print(table) for row in table.records: print(row.values) ## using csv library # csv_result = query_api.query_csv('from(bucket:"study-bucket") |> range(start: -10m)') # val_count = 0 # for row in csv_result: # for cell in row: # val_count += 1 ``` 几个重要的名词介绍: buket:相当于数据库 measurement:数据库中的表; point:表里面的一行数据。 每个行记录由time(纳秒时间戳)、字段(fields)和tags组成。 time:每条数据记录的时间,也是数据库自动生成的主索引; fields:记录各个字段的值; tags:各种有索引的属性,一般用于where查询条件。 输出结果: ```shell FluxTable() columns: 9, records: 5 {'result': '_result', 'table': 0, '_start': datetime.datetime(2022, 8, 21, 23, 38, 30, 996789, tzinfo=tzutc()), '_stop': datetime.datetime(2022, 8, 22, 16, 18, 30, 996789, tzinfo=tzutc()), '_time': datetime.datetime(2022, 8, 22, 6, 53, 23, 599244, tzinfo=tzutc()), '_value': 24.43234543, '_field': 'used_percent', '_measurement': 'mem', 'host': '10.0.0.10'} {'result': '_result', 'table': 0, '_start': datetime.datetime(2022, 8, 21, 23, 38, 30, 996789, tzinfo=tzutc()), '_stop': datetime.datetime(2022, 8, 22, 16, 18, 30, 996789, tzinfo=tzutc()), '_time': datetime.datetime(2022, 8, 22, 6, 53, 58, 70643, tzinfo=tzutc()), '_value': 24.43234543, '_field': 'used_percent', '_measurement': 'mem', 'host': '10.0.0.10'} {'result': '_result', 'table': 0, '_start': datetime.datetime(2022, 8, 21, 23, 38, 30, 996789, tzinfo=tzutc()), '_stop': datetime.datetime(2022, 8, 22, 16, 18, 30, 996789, tzinfo=tzutc()), '_time': datetime.datetime(2022, 8, 22, 8, 16, 5, 706262, tzinfo=tzutc()), '_value': 24.43234543, '_field': 'used_percent', '_measurement': 'mem', 'host': '10.0.0.10'} {'result': '_result', 'table': 0, '_start': datetime.datetime(2022, 8, 21, 23, 38, 30, 996789, tzinfo=tzutc()), '_stop': datetime.datetime(2022, 8, 22, 16, 18, 30, 996789, tzinfo=tzutc()), '_time': datetime.datetime(2022, 8, 22, 8, 16, 59, 203703, tzinfo=tzutc()), '_value': 24.43234543, '_field': 'used_percent', '_measurement': 'mem', 'host': '10.0.0.10'} {'result': '_result', 'table': 0, '_start': datetime.datetime(2022, 8, 21, 23, 38, 30, 996789, tzinfo=tzutc()), '_stop': datetime.datetime(2022, 8, 22, 16, 18, 30, 996789, tzinfo=tzutc()), '_time': datetime.datetime(2022, 8, 22, 8, 17, 28, 529356, tzinfo=tzutc()), '_value': 24.43234543, '_field': 'used_percent', '_measurement': 'mem', 'host': '10.0.0.10'} ``` 代码二: ```python from datetime import datetime from influxdb_client import InfluxDBClient, Point, WritePrecision from influxdb_client.client.write_api import SYNCHRONOUS token = "s9-YTkaURbSb8xUeJ3QbXtZLzkcTD2LPcA0OYLKBC0dlhM_yMduRHTk_bry6lLoEtcg4Nz88G9LOkBaQ_zZhPA==" org = "study" bucket = "study-bucket" with InfluxDBClient(url="http://10.2.8.216:8086", token=token, org=org) as client: write_api = client.write_api(write_options=SYNCHRONOUS) # 写入方法1 data = "mem,host=host1,used_percent=23.43234543" write_api.write(bucket, org, data) # 写入方法2 (推荐这种) point = Point("mem") \ .tag("host", "host1") \ .field("used_percent", 24.43234543) \ .time(datetime.utcnow(), WritePrecision.NS) write_api.write(bucket, org, point) # 写入方法3 sequence = ["mem,host=host1 used_percent=25.43234543", "mem,host=host1 available_percent=15.856523"] write_api.write(bucket, org, sequence) # TODO 这里写查询 # 用完关闭 client.close() ```
张文
2022年8月25日 10:51
转发文档
收藏文档
上一篇
下一篇
手机扫码
复制链接
手机扫一扫转发分享
复制链接
Markdown文件
Word文件
PDF文档
PDF文档(打印)
分享
链接
类型
密码
更新密码
有效期