From a754263d68dffeafcdc7d946a8329f4581c12710 Mon Sep 17 00:00:00 2001 From: Joachim Lusiardi Date: Wed, 1 May 2024 10:31:36 +0200 Subject: [PATCH] add support for influxdb 2 --- Cargo.lock | 295 +++++++++++++++++++++++++++++++++++++++++++++++++--- Cargo.toml | 4 +- src/main.rs | 78 ++++++++++---- 3 files changed, 337 insertions(+), 40 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e7cf74e..62e5a08 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -110,6 +110,12 @@ dependencies = [ "rustc-demangle", ] +[[package]] +name = "base64" +version = "0.13.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e1b586273c5702936fe7b7d6896644d8be71e6314cfe09d3167c95f712589e8" + [[package]] name = "base64" version = "0.21.7" @@ -216,7 +222,7 @@ dependencies = [ "heck", "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] @@ -266,6 +272,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac574ff4d437a7b5ad237ef331c17ccca63c46479e5b5453eb8e10bb99a759fe" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde", +] + +[[package]] +name = "csv-core" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5efa2b3d7902f4b634a20cae3c9c4e6209dc4779feb6863329607560143efa70" +dependencies = [ + "memchr", +] + [[package]] name = "digest" version = "0.10.7" @@ -276,6 +303,18 @@ dependencies = [ "crypto-common", ] +[[package]] +name = "doc-comment" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fea41bba32d969b513997752735605054bc0dfa92b4c56bf1189f2e174be7a10" + +[[package]] +name = "either" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a47c1c47d2f5964e29c61246e81db715514cd532db6b5116a25ea3c03d6780a2" + [[package]] name = "encoding_rs" version = "0.8.33" @@ -301,6 +340,12 @@ dependencies = [ "windows-sys 0.52.0", ] +[[package]] +name = "fallible-iterator" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4443176a9f2c162692bd3d352d745ef9413eec5782a80d8fd6f8a1ac692a07f7" + [[package]] name = "fastrand" version = "2.0.1" @@ -393,7 +438,7 @@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] @@ -453,6 +498,12 @@ version = "0.28.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4271d37baee1b8c7e4b708028c57d816cf9d2434acb33a549475f78c181f6253" +[[package]] +name = "go-parse-duration" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "558b88954871f5e5b2af0e62e2e176c8bde7a6c2c4ed41b13d138d96da2e2cbd" + [[package]] name = "h2" version = "0.3.24" @@ -649,6 +700,19 @@ dependencies = [ "tokio-rustls", ] +[[package]] +name = "hyper-tls" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d6183ddfa99b85da61a140bea0efc93fdf56ceaa041b37d553518030827f9905" +dependencies = [ + "bytes", + "hyper 0.14.28", + "native-tls", + "tokio", + "tokio-native-tls", +] + [[package]] name = "hyper-tls" version = "0.6.0" @@ -746,6 +810,55 @@ dependencies = [ "thiserror", ] +[[package]] +name = "influxdb2" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "823356ce3ed697fd640a7db4fbaaa6c2a00bf1eceec66ae0c57c01ef98f06270" +dependencies = [ + "base64 0.13.1", + "bytes", + "chrono", + "csv", + "fallible-iterator", + "futures", + "go-parse-duration", + "influxdb2-derive", + "influxdb2-structmap", + "ordered-float", + "parking_lot 0.11.2", + "reqwest 0.11.24", + "secrecy", + "serde", + "serde_json", + "snafu", + "url", +] + +[[package]] +name = "influxdb2-derive" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990f899841aa30130fc06f7938e3cc2cbc3d5b92c03fd4b5d79a965045abcf16" +dependencies = [ + "itertools", + "proc-macro2", + "quote", + "regex", + "syn 1.0.109", +] + +[[package]] +name = "influxdb2-structmap" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1408e712051787357e99ff732e44e8833e79cea0fabc9361018abfbff72b6265" +dependencies = [ + "chrono", + "num-traits", + "ordered-float", +] + [[package]] name = "influxdb_derive" version = "0.5.1" @@ -754,7 +867,16 @@ checksum = "6ac96b3660efd0cde32b0b20bc86cc93f33269cd9f6c97e759e0b0259b2133fb" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", +] + +[[package]] +name = "instant" +version = "0.1.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a5bbe824c507c5da5956355e86a746d82e0e1464f65d862cc5e71da70e94b2c" +dependencies = [ + "cfg-if", ] [[package]] @@ -763,6 +885,15 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8f518f335dce6725a761382244631d86cf0ccb2863413590b31338feb467f9c3" +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + [[package]] name = "itoa" version = "1.0.10" @@ -919,7 +1050,7 @@ checksum = "a948666b637a0f465e8564c73e89d4dde00d72d4d473cc972f390fc3dcee7d9c" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] @@ -950,6 +1081,26 @@ dependencies = [ "vcpkg", ] +[[package]] +name = "ordered-float" +version = "3.9.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1e1c390732d15f1d48471625cd92d154e66db2c56645e29a9cd26f4699f72dc" +dependencies = [ + "num-traits", +] + +[[package]] +name = "parking_lot" +version = "0.11.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d17b78036a60663b797adeaee46f5c9dfebb86948d1255007a1d6be0271ff99" +dependencies = [ + "instant", + "lock_api", + "parking_lot_core 0.8.6", +] + [[package]] name = "parking_lot" version = "0.12.1" @@ -957,7 +1108,21 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3742b2c103b9f06bc9fff0a37ff4912935851bee6d36f3c02bcc755bcfec228f" dependencies = [ "lock_api", - "parking_lot_core", + "parking_lot_core 0.9.9", +] + +[[package]] +name = "parking_lot_core" +version = "0.8.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60a2cfe6f0ad2bfc16aefa463b497d5c7a5ecd44a23efa72aa342d90177356dc" +dependencies = [ + "cfg-if", + "instant", + "libc", + "redox_syscall 0.2.16", + "smallvec", + "winapi", ] [[package]] @@ -968,7 +1133,7 @@ checksum = "4c42a9226546d68acdd9c0a280d17ce19bfe27a46bf68784e4066115788d008e" dependencies = [ "cfg-if", "libc", - "redox_syscall", + "redox_syscall 0.4.1", "smallvec", "windows-targets 0.48.5", ] @@ -1010,7 +1175,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] @@ -1041,7 +1206,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] @@ -1080,6 +1245,15 @@ dependencies = [ "proc-macro2", ] +[[package]] +name = "redox_syscall" +version = "0.2.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fb5a58c1855b4b6819d59012155603f0b22ad30cad752600aadfcb695265519a" +dependencies = [ + "bitflags 1.3.2", +] + [[package]] name = "redox_syscall" version = "0.4.1" @@ -1134,10 +1308,12 @@ dependencies = [ "http-body 0.4.6", "hyper 0.14.28", "hyper-rustls", + "hyper-tls 0.5.0", "ipnet", "js-sys", "log", "mime", + "native-tls", "once_cell", "percent-encoding", "pin-project-lite", @@ -1149,11 +1325,14 @@ dependencies = [ "sync_wrapper", "system-configuration", "tokio", + "tokio-native-tls", "tokio-rustls", + "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", + "wasm-streams", "web-sys", "webpki-roots", "winreg 0.50.0", @@ -1175,7 +1354,7 @@ dependencies = [ "http-body 1.0.0", "http-body-util", "hyper 1.3.1", - "hyper-tls", + "hyper-tls 0.6.0", "hyper-util", "ipnet", "js-sys", @@ -1290,10 +1469,12 @@ dependencies = [ "futures", "handlebars", "influxdb", + "influxdb2", "reqwest 0.12.4", "serde", "serde_json", "tokio", + "url", ] [[package]] @@ -1327,6 +1508,15 @@ dependencies = [ "untrusted", ] +[[package]] +name = "secrecy" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9bd1c54ea06cfd2f6b63219704de0b9b4f72dcc2b8fdef820be6cd799780e91e" +dependencies = [ + "zeroize", +] + [[package]] name = "security-framework" version = "2.9.2" @@ -1367,7 +1557,7 @@ checksum = "33c85360c95e7d137454dc81d9a4ed2b8efd8fbe19cee57357b32b9771fccb67" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] @@ -1428,6 +1618,27 @@ version = "1.13.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e6ecd384b10a64542d77071bd64bd7b231f4ed5940fba55e98c3de13824cf3d7" +[[package]] +name = "snafu" +version = "0.6.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eab12d3c261b2308b0d80c26fffb58d17eba81a4be97890101f416b478c79ca7" +dependencies = [ + "doc-comment", + "snafu-derive", +] + +[[package]] +name = "snafu-derive" +version = "0.6.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1508efa03c362e23817f96cde18abed596a25219a8b2c66e8db33c03543d315b" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "socket2" version = "0.5.5" @@ -1450,6 +1661,17 @@ version = "0.11.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7da8b5736845d9f2fcb837ea5d9e2628564b3b043a70948a3f0b778838c5fb4f" +[[package]] +name = "syn" +version = "1.0.109" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "72b64191b275b66ffe2469e8af2c1cfe3bafa67b529ead792a6d0160888b4237" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + [[package]] name = "syn" version = "2.0.48" @@ -1496,7 +1718,7 @@ checksum = "01ce4141aa927a6d1bd34a041795abd0db1cccba5d5f24b009f694bdf3a1f3fa" dependencies = [ "cfg-if", "fastrand", - "redox_syscall", + "redox_syscall 0.4.1", "rustix", "windows-sys 0.52.0", ] @@ -1518,7 +1740,7 @@ checksum = "fa0faa943b50f3db30a20aa7e265dbc66076993efed8463e8de414e5d06d3471" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] @@ -1547,7 +1769,7 @@ dependencies = [ "libc", "mio", "num_cpus", - "parking_lot", + "parking_lot 0.12.1", "pin-project-lite", "signal-hook-registry", "socket2", @@ -1563,7 +1785,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", ] [[package]] @@ -1758,7 +1980,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn", + "syn 2.0.48", "wasm-bindgen-shared", ] @@ -1792,7 +2014,7 @@ checksum = "bae1abb6806dc1ad9e560ed242107c0f6c84335f1749dd4e8ddb012ebd5e25a7" dependencies = [ "proc-macro2", "quote", - "syn", + "syn 2.0.48", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -1803,6 +2025,19 @@ version = "0.2.90" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4d91413b1c31d7539ba5ef2451af3f0b833a005eb27a631cec32bc0635a8602b" +[[package]] +name = "wasm-streams" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b65dc4c90b63b118468cf747d8bf3566c1913ef60be765b5730ead9e0a3ba129" +dependencies = [ + "futures-util", + "js-sys", + "wasm-bindgen", + "wasm-bindgen-futures", + "web-sys", +] + [[package]] name = "web-sys" version = "0.3.67" @@ -1819,6 +2054,28 @@ version = "0.25.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5f20c57d8d7db6d3b86154206ae5d8fba62dd39573114de97c2cb0578251f8e1" +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-core" version = "0.52.0" @@ -1979,3 +2236,9 @@ dependencies = [ "cfg-if", "windows-sys 0.48.0", ] + +[[package]] +name = "zeroize" +version = "1.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "525b4ec142c6b68a2d10f01f7bbf6755599ca3f81ea53b8431b7dd348f5fdb2d" diff --git a/Cargo.toml b/Cargo.toml index c7e9761..c67fe3e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -10,8 +10,10 @@ serde = { version = "1.0", features = ["derive"] } serde_json = { version = "1.0" } reqwest = { version = "0.12", features = ["json", "native-tls-vendored"] } influxdb = { version = "0.7", features = ["derive"] } +influxdb2 = { version = "0.5.0" } futures = { version = "0.3" } tokio = { version = "1.37", features = ["full"] } chrono = { version = "0.4" } clap = { version = "4.5", features = ["derive"] } -handlebars = { version = "5.1" } \ No newline at end of file +handlebars = { version = "5.1" } +url = "2.5.0" \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 7554700..751d778 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,11 +1,17 @@ -use chrono::{DateTime, Utc, Timelike, Days}; -use influxdb::{Client, Timestamp, WriteQuery}; -use influxdb::InfluxDbWriteable; -use clap::Parser; -use handlebars::Handlebars; use std::collections::HashMap; use std::fs::File; +use chrono::{DateTime, Days, Timelike, Utc}; +use clap::Parser; +use futures::stream; +use handlebars::Handlebars; +use influxdb::{Timestamp, WriteQuery}; +use influxdb2::Client as i2Client; +use influxdb2::models::DataPoint; +use influxdb::Client as i1Client; +use influxdb::InfluxDbWriteable; +use url::Url; + #[derive(InfluxDbWriteable)] struct InfluxData { time: DateTime, @@ -29,6 +35,13 @@ async fn main() -> Result<(), Box> { let config_data: serde_json::Value = serde_json::from_reader(File::open(cli_options.file).unwrap()) .expect("file should be proper JSON"); + let influx_conf = &config_data["influx"]; + let influx_version = influx_conf["version"].as_i64().unwrap(); + if influx_version != 1 && influx_version != 2 { + panic!("unsupported influx version: {}", influx_version) + } + println!("Handling influx version: {}", influx_version); + let mut data = HashMap::new(); for (key, value) in config_data["vars"].as_object().unwrap() { data.insert(key, value.as_str().unwrap().to_string()); @@ -44,6 +57,7 @@ async fn main() -> Result<(), Box> { let dt = Utc::now(); let mut write_queries: Vec = Vec::new(); + let mut data_points: Vec = Vec::new(); let http_client = reqwest::Client::builder() .timeout(core::time::Duration::from_secs(5)) @@ -100,32 +114,50 @@ async fn main() -> Result<(), Box> { let unit = value["tags"]["unit"].as_str().unwrap().to_string(); let measurement = value["measurement"].as_str().unwrap().to_string(); - write_queries.push( - InfluxData { - time: Timestamp::from(dt).into(), - value: res_value, - system, - metric, - unit, - }.into_query(measurement), - ); + if influx_version == 1 { + write_queries.push( + InfluxData { + time: Timestamp::from(dt).into(), + value: res_value, + system, + metric, + unit, + }.into_query(measurement), + ); + } else if influx_version == 2 { + data_points.push( + DataPoint::builder(measurement) + .tag("system", system) + .tag("metric", metric) + .tag("unit", unit) + .field("value", res_value) + .build()? + ); + } } } } } } - let influx_conf = &config_data["influx"]; let url = influx_conf["url"].as_str().unwrap().to_string(); let database = influx_conf["database"].as_str().unwrap().to_string(); - let username = influx_conf["username"].as_str().unwrap().to_string(); - let password = influx_conf["password"].as_str().unwrap().to_string(); - let client = Client::new(url, database) - .with_auth(username, password); - let write_result = client - .query(write_queries) - .await; - assert!(write_result.is_ok(), "{}", write_result.err().unwrap().to_string()); + if influx_version == 1 { + let username = influx_conf["i1_username"].as_str().unwrap().to_string(); + let password = influx_conf["i1_password"].as_str().unwrap().to_string(); + let client = i1Client::new(url, database) + .with_auth(username, password); + let write_result = client + .query(write_queries) + .await; + assert!(write_result.is_ok(), "{}", write_result.err().unwrap().to_string()); + } else if influx_version == 2 { + let host = Url::parse(&*url)?; + let org = influx_conf["i2_org"].as_str().unwrap().to_string(); + let token = influx_conf["i2_token"].as_str().unwrap().to_string(); + let client = i2Client::new(host, org, token); + client.write(&*database, stream::iter(data_points)).await?; + } Ok(()) }