add support for influxdb 2

This commit is contained in:
Joachim Lusiardi 2024-05-01 10:31:36 +02:00
parent 5cac182f5d
commit a754263d68
3 changed files with 337 additions and 40 deletions

295
Cargo.lock generated
View File

@ -110,6 +110,12 @@ dependencies = [
"rustc-demangle",
]
[[package]]
name = "base64"
version = "0.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8"
[[package]]
name = "base64"
version = "0.21.7"
@ -216,7 +222,7 @@ dependencies = [
"heck",
"proc-macro2",
"quote",
"syn",
"syn 2.0.48",
]
[[package]]
@ -266,6 +272,27 @@ dependencies = [
"typenum",
]
[[package]]
name = "csv"
version = "1.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe"
dependencies = [
"csv-core",
"itoa",
"ryu",
"serde",
]
[[package]]
name = "csv-core"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70"
dependencies = [
"memchr",
]
[[package]]
name = "digest"
version = "0.10.7"
@ -276,6 +303,18 @@ dependencies = [
"crypto-common",
]
[[package]]
name = "doc-comment"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10"
[[package]]
name = "either"
version = "1.11.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2"
[[package]]
name = "encoding_rs"
version = "0.8.33"
@ -301,6 +340,12 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "fallible-iterator"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7"
[[package]]
name = "fastrand"
version = "2.0.1"
@ -393,7 +438,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.48",
]
[[package]]
@ -453,6 +498,12 @@ version = "0.28.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253"
[[package]]
name = "go-parse-duration"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "558b88954871f5e5b2af0e62e2e176c8bde7a6c2c4ed41b13d138d96da2e2cbd"
[[package]]
name = "h2"
version = "0.3.24"
@ -649,6 +700,19 @@ dependencies = [
"tokio-rustls",
]
[[package]]
name = "hyper-tls"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905"
dependencies = [
"bytes",
"hyper 0.14.28",
"native-tls",
"tokio",
"tokio-native-tls",
]
[[package]]
name = "hyper-tls"
version = "0.6.0"
@ -746,6 +810,55 @@ dependencies = [
"thiserror",
]
[[package]]
name = "influxdb2"
version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "823356ce3ed697fd640a7db4fbaaa6c2a00bf1eceec66ae0c57c01ef98f06270"
dependencies = [
"base64 0.13.1",
"bytes",
"chrono",
"csv",
"fallible-iterator",
"futures",
"go-parse-duration",
"influxdb2-derive",
"influxdb2-structmap",
"ordered-float",
"parking_lot 0.11.2",
"reqwest 0.11.24",
"secrecy",
"serde",
"serde_json",
"snafu",
"url",
]
[[package]]
name = "influxdb2-derive"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "990f899841aa30130fc06f7938e3cc2cbc3d5b92c03fd4b5d79a965045abcf16"
dependencies = [
"itertools",
"proc-macro2",
"quote",
"regex",
"syn 1.0.109",
]
[[package]]
name = "influxdb2-structmap"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1408e712051787357e99ff732e44e8833e79cea0fabc9361018abfbff72b6265"
dependencies = [
"chrono",
"num-traits",
"ordered-float",
]
[[package]]
name = "influxdb_derive"
version = "0.5.1"
@ -754,7 +867,16 @@ checksum = "6ac96b3660efd0cde32b0b20bc86cc93f33269cd9f6c97e759e0b0259b2133fb"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.48",
]
[[package]]
name = "instant"
version = "0.1.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c"
dependencies = [
"cfg-if",
]
[[package]]
@ -763,6 +885,15 @@ version = "2.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3"
[[package]]
name = "itertools"
version = "0.10.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473"
dependencies = [
"either",
]
[[package]]
name = "itoa"
version = "1.0.10"
@ -919,7 +1050,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.48",
]
[[package]]
@ -950,6 +1081,26 @@ dependencies = [
"vcpkg",
]
[[package]]
name = "ordered-float"
version = "3.9.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f1e1c390732d15f1d48471625cd92d154e66db2c56645e29a9cd26f4699f72dc"
dependencies = [
"num-traits",
]
[[package]]
name = "parking_lot"
version = "0.11.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99"
dependencies = [
"instant",
"lock_api",
"parking_lot_core 0.8.6",
]
[[package]]
name = "parking_lot"
version = "0.12.1"
@ -957,7 +1108,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f"
dependencies = [
"lock_api",
"parking_lot_core",
"parking_lot_core 0.9.9",
]
[[package]]
name = "parking_lot_core"
version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc"
dependencies = [
"cfg-if",
"instant",
"libc",
"redox_syscall 0.2.16",
"smallvec",
"winapi",
]
[[package]]
@ -968,7 +1133,7 @@ checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e"
dependencies = [
"cfg-if",
"libc",
"redox_syscall",
"redox_syscall 0.4.1",
"smallvec",
"windows-targets 0.48.5",
]
@ -1010,7 +1175,7 @@ dependencies = [
"pest_meta",
"proc-macro2",
"quote",
"syn",
"syn 2.0.48",
]
[[package]]
@ -1041,7 +1206,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.48",
]
[[package]]
@ -1080,6 +1245,15 @@ dependencies = [
"proc-macro2",
]
[[package]]
name = "redox_syscall"
version = "0.2.16"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a"
dependencies = [
"bitflags 1.3.2",
]
[[package]]
name = "redox_syscall"
version = "0.4.1"
@ -1134,10 +1308,12 @@ dependencies = [
"http-body 0.4.6",
"hyper 0.14.28",
"hyper-rustls",
"hyper-tls 0.5.0",
"ipnet",
"js-sys",
"log",
"mime",
"native-tls",
"once_cell",
"percent-encoding",
"pin-project-lite",
@ -1149,11 +1325,14 @@ dependencies = [
"sync_wrapper",
"system-configuration",
"tokio",
"tokio-native-tls",
"tokio-rustls",
"tokio-util",
"tower-service",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots",
"winreg 0.50.0",
@ -1175,7 +1354,7 @@ dependencies = [
"http-body 1.0.0",
"http-body-util",
"hyper 1.3.1",
"hyper-tls",
"hyper-tls 0.6.0",
"hyper-util",
"ipnet",
"js-sys",
@ -1290,10 +1469,12 @@ dependencies = [
"futures",
"handlebars",
"influxdb",
"influxdb2",
"reqwest 0.12.4",
"serde",
"serde_json",
"tokio",
"url",
]
[[package]]
@ -1327,6 +1508,15 @@ dependencies = [
"untrusted",
]
[[package]]
name = "secrecy"
version = "0.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9bd1c54ea06cfd2f6b63219704de0b9b4f72dcc2b8fdef820be6cd799780e91e"
dependencies = [
"zeroize",
]
[[package]]
name = "security-framework"
version = "2.9.2"
@ -1367,7 +1557,7 @@ checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.48",
]
[[package]]
@ -1428,6 +1618,27 @@ version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7"
[[package]]
name = "snafu"
version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eab12d3c261b2308b0d80c26fffb58d17eba81a4be97890101f416b478c79ca7"
dependencies = [
"doc-comment",
"snafu-derive",
]
[[package]]
name = "snafu-derive"
version = "0.6.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1508efa03c362e23817f96cde18abed596a25219a8b2c66e8db33c03543d315b"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "socket2"
version = "0.5.5"
@ -1450,6 +1661,17 @@ version = "0.11.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f"
[[package]]
name = "syn"
version = "1.0.109"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237"
dependencies = [
"proc-macro2",
"quote",
"unicode-ident",
]
[[package]]
name = "syn"
version = "2.0.48"
@ -1496,7 +1718,7 @@ checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa"
dependencies = [
"cfg-if",
"fastrand",
"redox_syscall",
"redox_syscall 0.4.1",
"rustix",
"windows-sys 0.52.0",
]
@ -1518,7 +1740,7 @@ checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.48",
]
[[package]]
@ -1547,7 +1769,7 @@ dependencies = [
"libc",
"mio",
"num_cpus",
"parking_lot",
"parking_lot 0.12.1",
"pin-project-lite",
"signal-hook-registry",
"socket2",
@ -1563,7 +1785,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.48",
]
[[package]]
@ -1758,7 +1980,7 @@ dependencies = [
"once_cell",
"proc-macro2",
"quote",
"syn",
"syn 2.0.48",
"wasm-bindgen-shared",
]
@ -1792,7 +2014,7 @@ checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7"
dependencies = [
"proc-macro2",
"quote",
"syn",
"syn 2.0.48",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
@ -1803,6 +2025,19 @@ version = "0.2.90"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b"
[[package]]
name = "wasm-streams"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b65dc4c90b63b118468cf747d8bf3566c1913ef60be765b5730ead9e0a3ba129"
dependencies = [
"futures-util",
"js-sys",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
]
[[package]]
name = "web-sys"
version = "0.3.67"
@ -1819,6 +2054,28 @@ version = "0.25.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1"
[[package]]
name = "winapi"
version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
dependencies = [
"winapi-i686-pc-windows-gnu",
"winapi-x86_64-pc-windows-gnu",
]
[[package]]
name = "winapi-i686-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.52.0"
@ -1979,3 +2236,9 @@ dependencies = [
"cfg-if",
"windows-sys 0.48.0",
]
[[package]]
name = "zeroize"
version = "1.7.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d"

View File

@ -10,8 +10,10 @@ serde = { version = "1.0", features = ["derive"] }
serde_json = { version = "1.0" }
reqwest = { version = "0.12", features = ["json", "native-tls-vendored"] }
influxdb = { version = "0.7", features = ["derive"] }
influxdb2 = { version = "0.5.0" }
futures = { version = "0.3" }
tokio = { version = "1.37", features = ["full"] }
chrono = { version = "0.4" }
clap = { version = "4.5", features = ["derive"] }
handlebars = { version = "5.1" }
handlebars = { version = "5.1" }
url = "2.5.0"

View File

@ -1,11 +1,17 @@
use chrono::{DateTime, Utc, Timelike, Days};
use influxdb::{Client, Timestamp, WriteQuery};
use influxdb::InfluxDbWriteable;
use clap::Parser;
use handlebars::Handlebars;
use std::collections::HashMap;
use std::fs::File;
use chrono::{DateTime, Days, Timelike, Utc};
use clap::Parser;
use futures::stream;
use handlebars::Handlebars;
use influxdb::{Timestamp, WriteQuery};
use influxdb2::Client as i2Client;
use influxdb2::models::DataPoint;
use influxdb::Client as i1Client;
use influxdb::InfluxDbWriteable;
use url::Url;
#[derive(InfluxDbWriteable)]
struct InfluxData {
time: DateTime<Utc>,
@ -29,6 +35,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config_data: serde_json::Value = serde_json::from_reader(File::open(cli_options.file).unwrap())
.expect("file should be proper JSON");
let influx_conf = &config_data["influx"];
let influx_version = influx_conf["version"].as_i64().unwrap();
if influx_version != 1 && influx_version != 2 {
panic!("unsupported influx version: {}", influx_version)
}
println!("Handling influx version: {}", influx_version);
let mut data = HashMap::new();
for (key, value) in config_data["vars"].as_object().unwrap() {
data.insert(key, value.as_str().unwrap().to_string());
@ -44,6 +57,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let dt = Utc::now();
let mut write_queries: Vec<WriteQuery> = Vec::new();
let mut data_points: Vec<DataPoint> = Vec::new();
let http_client = reqwest::Client::builder()
.timeout(core::time::Duration::from_secs(5))
@ -100,32 +114,50 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
let unit = value["tags"]["unit"].as_str().unwrap().to_string();
let measurement = value["measurement"].as_str().unwrap().to_string();
write_queries.push(
InfluxData {
time: Timestamp::from(dt).into(),
value: res_value,
system,
metric,
unit,
}.into_query(measurement),
);
if influx_version == 1 {
write_queries.push(
InfluxData {
time: Timestamp::from(dt).into(),
value: res_value,
system,
metric,
unit,
}.into_query(measurement),
);
} else if influx_version == 2 {
data_points.push(
DataPoint::builder(measurement)
.tag("system", system)
.tag("metric", metric)
.tag("unit", unit)
.field("value", res_value)
.build()?
);
}
}
}
}
}
}
let influx_conf = &config_data["influx"];
let url = influx_conf["url"].as_str().unwrap().to_string();
let database = influx_conf["database"].as_str().unwrap().to_string();
let username = influx_conf["username"].as_str().unwrap().to_string();
let password = influx_conf["password"].as_str().unwrap().to_string();
let client = Client::new(url, database)
.with_auth(username, password);
let write_result = client
.query(write_queries)
.await;
assert!(write_result.is_ok(), "{}", write_result.err().unwrap().to_string());
if influx_version == 1 {
let username = influx_conf["i1_username"].as_str().unwrap().to_string();
let password = influx_conf["i1_password"].as_str().unwrap().to_string();
let client = i1Client::new(url, database)
.with_auth(username, password);
let write_result = client
.query(write_queries)
.await;
assert!(write_result.is_ok(), "{}", write_result.err().unwrap().to_string());
} else if influx_version == 2 {
let host = Url::parse(&*url)?;
let org = influx_conf["i2_org"].as_str().unwrap().to_string();
let token = influx_conf["i2_token"].as_str().unwrap().to_string();
let client = i2Client::new(host, org, token);
client.write(&*database, stream::iter(data_points)).await?;
}
Ok(())
}