プログラミング言語Rustは、「パフォーマンス、型安全性、並行性を重視した 」静的コンパイル言語です。使われ始めてすぐ急速に成長し、stackoverflowの年間開発者調査では常に最も人気のある言語としてランクインしています。
Rustの人気をうけて、GridDB開発チームはRustをデータベースと連携するための GridDB rust クライアントを作成しました。他のGridDBコネクタと同様に、これを利用することで、実行中のGridDBサーバに対して直接読み書きできるプログラムを書くことができるようになります。
このブログでは、Rustのインストール、Rustクライアント、そしてこのクライアントの基本的な機能を紹介するいくつかの簡単なCRUDコマンドについて説明します。この中には、コンテナへのクエリなどのアクションが含まれます。
こちらからソースコードの全容を確認することができます。
$ git clone --branch blog_1_rust https://github.com/griddbnet/Blogs.git
最近のブログで使用しているデータセットは、Kaggleから取得したものを使用しています。詳しくはPostgreSQLからGridDBへ移行するのブログで紹介しています。
はじめに
まずは、GridDBを直接インストール、またはDockerで稼働させておく必要があります。
また、GridDB c_clientも必要です。(注:aptやyumでGridDBをインストールした場合、c_clientは既にインストールされています。)そして最後に、プログラミング言語Rustもあらかじめインストールしておく必要があります。
$ curl https://sh.rustup.rs -sSf | sh
もう一つ、コンパイラのclangをインストールしておきます。
# Ubuntu
$ sudo apt-get install clang-10 libclang-10-dev
# CentOS
$ sudo yum install llvm-toolset-7.0
$ scl enable llvm-toolset-7.0 bash
マシンにこれらの前提条件が揃ったら、rust_client
ディレクトリに移動して、Rust buildコマンドを実行します。
$ cargo build
このコマンドはRustツールチェーンの一部で、Cargo.toml
ファイルを読み込んでプロジェクトをビルドアウトしてくれるものです。ここから、公式クライアントのレポに含まれるサンプルコードを実行して、すべてが意図したとおりに動作しているかどうかを確認することができます。このプロジェクトのレポをクローンしている場合は、もう少しお待ちください。このレポに含まれるコードを実行する方法について、先に説明します。
Rustソースコードを書く
GridDBサーバと連携するためのRustソースコードを書き始めるために、まず適切なGridDBクレートをインポートし、サーバに接続することから始めます。
そこで、まずはライブラリをインポートし、コードで使用することを目的とした関数をインポートしてみましょう。
現在、このブログの上部と下部に含まれるソースコードで作業しています。
use std::time::Duration;
use griddb::get_value;
use griddb::griddb::ContainerInfo::*;
use griddb::griddb::StoreFactory::*;
use griddb::griddb::Type::*;
use griddb::griddb::Value::*;
use griddb::gsvec;
use chrono:: Utc;
[package]
name = "griddb_rust_client_blog"
version = "0.1.0"
edition = "2021"
各ファイルの上部にあるimportは、他の言語(Pythonなど)と同様に動作します。ここでは、GridDB Rustクライアントが存在するkonector_dbライブラリーを呼び出しています。
パッケージ名は、好きな名前をつけることができます。ここでは、このブログなので一般的な名前をつけています。
GridDB Rustクライアントを使う
GridDB Rustクライアントを自分のリポジトリ・プロジェクトに追加するには、以下の内容を Cargo.toml
ファイルに追加します。
[dependencies]
griddb = "0.5.0"
chrono = "0.4"
convert_case = "^0.3.0"
これは、Rustツールチェーンが、コンパイル時に cargo build
や cargo run
を実行すると、GridDB rust connector のソースコードがビルドされてプロジェクトに含まれることを確認することを意味します。
このプロジェクトを実行するには、リポジトリをクローンして、公式レポのサンプルコードと同様に各例を実行します。
$ cargo run --example connect
Successfully Connected to GridDB
筆者のサーバーの値は、examples
ディレクトリのサンプルファイルにハードコードされているので、コマンドが失敗した場合は、GridDBが起動していることを確認し、必要に応じてDBの接続詳細を変更してください。
GridDBに接続する
このブログのソースコードでは、使いやすいように、すべてのコードを main
関数の中に配置することにします。
他のGridDBコネクタと同様に、ファクトリーストアを使用して、接続の詳細を入力することでデータベースに接続することになります。
// get default factory
let factory = StoreFactory::get_instance();
let properties = vec![
("notification_member", "127.0.0.1:10001"),
("cluster_name", "myCluster"),
("user", "admin"),
("password", "admin"),
];
GridDBのソースコードの例とは少し異なりますが、実行とデバッグをスムーズに行うために、GridDBの接続の詳細をコード内にハードコードしています。
適切な接続の詳細が得られたら、接続を確立して gridstore
関数を取得することができます。
// get gridstore function
let store = match factory.get_store(properties) {
Ok(result) => result,
Err(error) => panic!("Error factory get_store() with error code: {:?}", error),
};
これは C
や JavaScript
にある古典的な switch
ステートメントに少し似た働きをします。うまくいけば Ok
を返し、 store
に result
を返します。失敗すると、プログラムはエラーを返してパニックになり、エラーを出力します。
このストア変数が入力されると、データベースに接続され、コンテナの作成を開始することができます。
GridDBを使ってCRUDを作成、読み込み、更新、削除する
Rustクライアントを紹介するために、典型的なCRUDコマンドを通して、SQLやその他のデータベースと連携する基本的な機能を紹介します。コンテナの作成、コンテナの削除、行の追加、行の削除、TQLとAPIによるクエリ、そして最後にTQLによる単純な集計機能を実行する大きな関数を用意しました。
完全なソースコードに入る前に、もう一度 Cargo.toml
ファイルを見てみましょう。今回は、ファイル全体を表示し、使用している依存関係をすべて紹介します。
[package]
name = "griddb_rust_client_blog"
version = "0.1.0"
edition = "2021"
[dependencies]
griddb = "0.5.0"
chrono = "0.4"
convert_case = "^0.3.0"
作成、削除する
GridDBコンテナスキーマを格納するcolinfo
変数を作成する際には、データ型がRustのものと一致するように留意してください。例えば、GridDBのDOUBLEのデータ型はf64
でなければなりません。
何が何に変換されるかについては、APIドキュメントのData-Type Mappingセクションで確認することができます。
// Creating Time Series Container
let tsinfo = ContainerInfo::ContainerInfo(
"device13",
vec![
("ts", Type::Timestamp),
("co", Type::Double),
("humidity", Type::Double),
("light", Type::Bool),
("lpg", Type::Double),
("motion", Type::Bool),
("smoke", Type::Double),
("temp", Type::Double),
],
ContainerType::TimeSeries,
true,
);
そして、スキーマとすべての情報が設定されたら、put_container
でさらに通常のGridDBを実行します。
しかし、実際にコンテナ(とそのスキーマ)をデータベース内に作成する前に、まずコンテナに対して drop_container
を呼び出すことにします。これにより、この例のソースコードを実行するたびに、新しい状態から開始されるようになります。存在しないコンテナを削除してもエラーは発生しないので、SQL コマンドの DROP TABLE IF EXISTS
に似ていることに気付いたかもしれません。
store.drop_container("device13");
match store.put_container(&tsinfo, false) {
Ok(result) => result,
Err(error) => panic!("Error store put_container() with error code: {:?}", error),
};
また、同様の設定で Collection
コンテナを作成することもできます。このコンテナは、センサーコンテナと記録管理コンテナのような、現実世界のタイプスキーマを模倣しています。
// Creating Collection Container
let colinfo = ContainerInfo::ContainerInfo(
"deviceMaster2",
vec![
("sensorID", Type::String),
("location", Type::String),
],
ContainerType::Collection,
true,
);
store.drop_container("deviceMaster2");
let con = match store.put_container(&colinfo, false) {
Ok(result) => result,
Err(error) => panic!("Error store put_container() with error code: {:?}", error),
};
con.set_auto_commit(false);
con.create_index("sensorID", IndexType::Default);
con.commit();
println!("Successfully created Collection container: deviceMaster2");
時系列コンテナとコレクションの唯一の違いは、ここではロウキーのインデックスを手動で作成しましたが、時系列コンテナでは、コンテナタイプの定義により自動作成されるという点です。
そしてまた、このサンプルコードを実行します。
$ cargo run --example create_containers
これを実行すると、実行中のGridDBサーバにこの2つのコンテナが作成されることになります。GridDB CLI ツールで確認することができます。以下のように使います。
$ sudo su gsadm
$ gs_sh
gs> showcontainer device13
Database : public
Name : device13
Type : TIME_SERIES
Partition ID: 8
DataAffinity: -
Compression Method : NO
Compression Window : -
Row Expiration Time: -
Row Expiration Division Count: -
Columns:
No Name Type CSTR RowKey Compression
------------------------------------------------------------------------------
0 ts TIMESTAMP NN [RowKey]
1 co DOUBLE
2 humidity DOUBLE
3 light BOOL
4 lpg DOUBLE
5 motion BOOL
6 smoke DOUBLE
7 temp DOUBLE
データを挿入、作成する
次に、いくつかのデータをコンテナにプッシュしてみましょう。これは、次のように .put
というシンプルな API 呼び出しで実現できます。
// Grab current time to use as time value for container
let timestamp: Timestamp = Timestamp {
value: Utc::now().timestamp_millis(),
};
// following the schema laid out in the create_container.rs file
ts.put(gsvec![timestamp, 0.004342, 49.0, false, 0.00753242, false, 0.0212323, 23.2]);
let timestamp_second: Timestamp = Timestamp {
value: Utc::now().timestamp_millis() + 1000,
};
ts.put(gsvec![timestamp_second, 0.0065342, 31.0, false, 0.753242, false, 0.02653323, 27.2]);
// rows aren't pushed until the commit is called
ts.commit();
ここでは、デバイスコンテナを表す変数 ts
を使って、そこに直接データを入れています。GridDB クライアントから gsvec
を使って、スキーマが期待する適切なデータ型をすべて含むベクトルを作成します。このように、適切なデータを直接コンテナに入力することができます。ここでは、2つの行をコンテナに配置していますが、それぞれ異なる時刻を行キーとしています。(2番目の行キーについては、新しい行が作られることを保証するために、元のタイムスタンプに1000msを追加します。)
$ cargo run --example insert_data
ここでも、実際にデータが挿入されたことを確認することができます。
$ sudo su gsadm
$ gs_sh
gs[public]> select * from device13;
2 results. (5 ms)
gs[public]> get
ts,co,humidity,light,lpg,motion,smoke,temp
2022-11-28T21:40:12.250Z,0.004342,49.0,false,0.00753242,false,0.0212323,23.2
2022-11-28T21:40:13.251Z,0.0065342,31.0,false,0.753242,false,0.02653323,27.2
The 2 results had been acquired.
TQLで読み込む
次に、より複雑なデータの読み取りを試してみましょう。rowkeyとapiを使って直接行を呼び出す代わりに、.query
apiコールを使って実際のTQLクエリを実行します。
ソースコードは以下です。
let query = match ts.query("select *") {
Ok(result) => result,
Err(error) => panic!("Error container query data with error code: {:?}", error),
};
let row_set = match query.fetch() {
Ok(result) => result,
Err(error) => panic!("Error query fetch() data with error code: {:?}", error),
};
while row_set.has_next() {
let row_data = match row_set.next() {
Ok(result) => result,
Err(error) => panic!("Error row set next() row with error code: {:?}", error),
};
let ts: Timestamp = get_value![row_data[0]];
let timestamp_number: i64 = ts.value;
let co: f64 = get_value![row_data[1]];
let humidity: f64 = get_value![row_data[2]];
let light: bool = get_value![row_data[3]];
let lpg: f64 = get_value![row_data[4]];
let motion: bool = get_value![row_data[5]];
let smoke: f64 = get_value![row_data[6]];
let temp: f64 = get_value![row_data[7]];
let tup_query = (timestamp_number, co, humidity, light, lpg, motion, smoke, temp);
println!(
"Device13:
ts={0} co={1} humidity={2} light={3} lpg={4} motion={5} smoke={6} temp={7}",
tup_query.0,
tup_query.1,
tup_query.2,
tup_query.3,
tup_query.4,
tup_query.5,
tup_query.6,
tup_query.7
);
}
ここでもcon変数を使用してAPIコールを行っています。どのコンテナがターゲットになっているかはすでに分かっているので、クエリでコンテナを指定する必要はなく、単に必要なカラムを修飾語なしで選択しています。そこから、query変数を受け取り、fetchを実行して検索を実行します。結果はrow_setの中に保存されます。
このrow_setが入力されたら、返された各行をループして、単純に行の各カラムの値を取得し、結果をプリントアウトすることができます。この処理は、GridDBで利用できるプログラミングコネクタと同様です。
これを実行すれば、デバイスコンテナに2行が表示されることになります。このプログラムを使ってクエリーを行うこともでき、GridDB CLIを使って結果を表示することもできます。
$ cargo run --example read_data
アップデートする
次に、データベースからデータを読み込んでみましょう。この時点ではコンテナ内に2つの行しかありませんが、特定の行のデータを素早く検索してタイムスタンプ(rowkey)を取得し、その後に行の更新を行います。
let query = match ts.query("select * where temp = 23.2") {
Ok(result) => result,
Err(error) => panic!("Error container query data with error code: {:?}", error),
};
let row_set = match query.fetch() {
Ok(result) => result,
Err(error) => panic!("Error query fetch() data with error code: {:?}", error),
};
// Init timestamp to current time
let mut timestamp: Timestamp = Timestamp {
value: Utc::now().timestamp_millis(),
};
while row_set.has_next() {
let row_data = match row_set.next() {
Ok(result) => result,
Err(error) => panic!("Error row set next() row with error code: {:?}", error),
};
timestamp = get_value![row_data[0]];
}
ts.put(gsvec![timestamp, 0.214342, 43.32, true, 0.00753242, true, 0.0212323, 23.2]);
ts.commit();
ここでは、かなり多くのことが行われているので、順を追って説明します。まず、再び query
を使用して、TQL検索を行い、あるデータポイントを検索しています。これは、更新しようとする行の行キーを取得するためです。
この場合、行のルックアップを行い、クエリが成功したら、ルックアップしたクエリに一致するすべての行を繰り返し処理します。そして、クエリから返されたタイムスタンプを timestamp
変数に保存し、その情報を使用して行を更新します。
一度ロウキーを取得すれば、すでに存在する行に対して .put
を使用しても、エラーを発生させるのではなく、単にその行を新しい値で更新するだけになります。
$ cargo run --example update_data
これが実行されると、温度が23.2ちょうどの行を探すというクエリにマッチした最初の行の湿度が更新されます。つまり、この更新により湿度は49から43.32に変更されます。これは、CLIツールまたは read_data
の例で確認することができます。
$ cargo run --example read_data
Finished dev [unoptimized + debuginfo] target(s) in 0.08s
Running `target/debug/examples/read_data`
Device13:
ts=1669671612250 co=0.214342 *humidity=43.32* light=true lpg=0.00753242 motion=true smoke=0.0212323 temp=23.2
Device13:
ts=1669671613251 co=0.0065342 humidity=31 light=false lpg=0.753242 motion=false smoke=0.02653323 temp=27.2
削除する
行を削除するには、.remove
APIコールを使用すると簡単です。updateと同様に、対象となる行のrowkeyが必要です。今回のケースでは、削除する行の正確なタイムスタンプが必要です。この情報が得られれば(おそらくクエリのルックアップによって)、簡単に行を削除することができます。
while row_set.has_next() {
let row_data = match row_set.next() {
Ok(result) => result,
Err(error) => panic!("Error row set next() row with error code: {:?}", error),
};
timestamp = get_value![row_data[0]];
}
ts.remove(timestamp);
ts.commit();
$ cargo run --example delete_data
確認するには、CLIかread_data
を使用します。今回はCLIを使用してみましょう。
$ sudo su gsadm
$ gs_sh
gs[public]> select * from device13;
1 results. (2 ms)
gs[public]> get
ts,co,humidity,light,lpg,motion,smoke,temp
2022-11-28T21:40:13.251Z,0.0065342,31.0,false,0.753242,false,0.02653323,27.2
The 1 results had been acquired.
これで、device13
コンテナの中に1行のデータだけが挿入されたことになります。
集計クエリ
最後に、TQLを使った簡単な集計クエリについて説明したいと思います。TQLについては、こちらのTQLのドキュメントで紹介しています。
この例では、あるしきい値以上の気温を検索し、その時間帯の平均気温を求めます。この例では2行のデータしかないので、正確には有用なデータとは言えませんが、可能性とこの関数がどのように機能するのかを明らかにすることができます。
この例では、TIME_AVG
とTIME_SAMPLING
の両方について説明します。どちらも上のリンクでより詳しく説明しています。
TIME_AVG
は、重量時間平均を取得したい列を入力すると、コンテナ全体の関連するすべての行について計算が行われ、単一の値が出力されます。
そして、以前の作業例のデータは非常に小さく、2行以下のデータでは面白い分析はできないため、今回はこのKaggleデータセットからコンテナの1つを使用します。このデータセットのインジェストについては、以前のブログを参照してください。
この場合、単純にdevice1
とします。
let ts = match store.get_container("device1") {
Ok(result) => result,
Err(error) => panic!("Error store put_container() with error code: {:?}", error),
};
// Aggregation Time Series Time Avg: https://www.toshiba-sol.co.jp/en/pro/griddb/docs-en/v4_0_3/GridDB_API_Reference.html#sec-3-3-3
let average_query = format!("select TIME_AVG(humidity)");
let agg_query = match ts.query(&average_query) {
Ok(result) => result,
Err(error) => panic!(
"Error container query aggregation data with error code: {}",
error
),
};
let agg_result = match agg_query.fetch() {
Ok(result) => result,
Err(error) => panic!(
"Error query fetch() aggregation data with error code: {}",
error
),
};
let agg_data = match agg_result.next_aggregation() {
Ok(result) => result,
Err(error) => panic!(
"Error row set next() aggregation row with error code: {}",
error
),
};
println!(" humidity time avg = {:}",agg_data.get_as_f64().1);
上記のソースコードは長く見えますが、これまでやってきたことと何ら変わりはありません。単純にTQLクエリを取り込んで、その結果をプリントアウトしています。まず結果を取得し、次に next_aggregation
APIコールを使って値を取得し、コンソールに出力しています。
次に、TIME_SAMPLE
関数を使用してみることにします。
// TIME SAMPLE from Time Series Aggregation functionality using TQL
let time_sample = format!("select TIME_SAMPLING(humidity, TIMESTAMP('2020-07-18T11:22:33.444Z'), TIMESTAMP('2020-07-22T11:22:33.444Z'), 1, HOUR)");
println!("Running Query: {}", time_sample);
let agg_query_two = match ts.query(&time_sample) {
Ok(result) => result,
Err(error) => panic!(
"Error container query aggregation data with error code: {}",
error
),
};
let agg_result_two = match agg_query_two.fetch() {
Ok(result) => result,
Err(error) => panic!(
"Error query fetch() aggregation data with error code: {}",
error
),
};
while agg_result_two.has_next() {
let row_data = match agg_result_two.next() {
Ok(result) => result,
Err(error) => panic!("Error row set next() row with error code: {:?}", error),
};
let ts: Timestamp = get_value![row_data[0]];
let timestamp_number: i64 = ts.value;
let co: f64 = get_value![row_data[1]];
let humidity: f64 = get_value![row_data[2]];
let light: bool = get_value![row_data[3]];
let lpg: f64 = get_value![row_data[4]];
let motion: bool = get_value![row_data[5]];
let smoke: f64 = get_value![row_data[6]];
let temp: f64 = get_value![row_data[7]];
let tup_query = (timestamp_number, co, humidity, light, lpg, motion, smoke, temp);
println!(
"Device13:
ts={0} co={1} humidity={2} light={3} lpg={4} motion={5} smoke={6} temp={7}",
tup_query.0,
tup_query.1,
tup_query.2,
tup_query.3,
tup_query.4,
tup_query.5,
tup_query.6,
tup_query.7
);
}
ここでは、基本的に read_data
関数と同じコードを実行していますが、より興味深く、複雑なクエリを使用しています。ここでは device1
データセットの TIME_SAMPLING
を検索しています。実際のクエリ文字列は次のようなものです。let time_sample = format!("select TIME_SAMPLING(humidity, TIMESTAMP('2020-07-18T11:22:33.444Z'), TIMESTAMP('2020-07-22T11:22:33.444Z'), 1, HOUR)");
これは、クエリで指定した明示的な値(この場合は4日分のデータ)の間のデータセットを、1時間間隔でサンプリングするようプログラムに指示しています。
そして、返ってきた結果を繰り返し、コンソールに出力します。
$ cargo run --example timeseries_aggregation
結果はこのようになります。(ただし、これが列の全てではありません。)
humidity time avg = 50.81422729710963
Running Query: select TIME_SAMPLING(humidity, TIMESTAMP('2020-07-18T11:22:33.444Z'), TIMESTAMP('2020-07-22T11:22:33.444Z'), 1, HOUR)
Device13:
ts=1595071353444 co=0.0057382469167573434 humidity=52.29999923706055 light=false lpg=0.008506583886712459 motion=false smoke=0.022858971137213444 temp=22
Device13:
ts=1595074953444 co=0.0056719601223669276 humidity=52.70000076293945 light=false lpg=0.008435383766205812 motion=false smoke=0.022654654779928257 temp=22
Device13:
ts=1595078553444 co=0.0056825985719608325 humidity=53.5235710144043 light=false lpg=0.008446826202354823 motion=false smoke=0.022687482170072902 temp=22.3
Device13:
ts=1595082153444 co=0.005747325011217979 humidity=51.92147445678711 light=false lpg=0.008516317111357271 motion=false smoke=0.022886910748059562 temp=21.7
Device13:
ts=1595085753444 co=0.0062171975352361755 humidity=54.908409118652344 light=false lpg=0.009014482964305905 motion=false smoke=0.024319772878645333 temp=21.8
Device13:
ts=1595089353444 co=0.006172104950567534 humidity=51.70000076293945 light=false lpg=0.008967138511929578 motion=false smoke=0.02418336009283044 temp=21.3
さらに、GridDB CLIでこれらのクエリを見てみましょう。
$ sudo su gsadm
$ gs_sh
gs[public]> tql device1 select TIME_AVG(humidity);
1 results. (2 ms)
gs[public]> get
Result
50.81422729710963
The 1 results had been acquired.
gs[public]> tql device1 select TIME_SAMPLING(humidity, TIMESTAMP('2020-07-18T11:22:33.444Z'), TIMESTAMP('2020-07-22T11:22:33.444Z'), 1, HOUR);
37 results. (0 ms)
gs[public]> get
ts,co,humidity,light,lpg,motion,smoke,temp
2020-07-18T11:22:33.444Z,0.0030488793379940217,77.0,false,0.005383691572564569,false,0.014022828989540865,19.799999237060547
2020-07-18T12:22:33.444Z,0.0029050147565559603,76.65528106689453,false,0.005198697479294309,false,0.013508733329556249,19.799999237060547
2020-07-18T13:22:33.444Z,0.002872341154862943,76.61731719970703,false,0.005156332935627952,false,0.013391176782176004,19.799999237060547
2020-07-18T14:22:33.444Z,0.002612589347788125,75.37261199951172,false,0.004814621044662395,false,0.012445419108693902,19.5
2020-07-18T15:22:33.444Z,0.003655886924967606,75.85926818847656,false,0.006139350359130843,false,0.016134931599636852,19.299999237060547
2020-07-18T16:22:33.444Z,0.003655886924967606,74.72727966308594,false,0.006139350359130843,false,0.016134931599636852,19.0
まとめ
ここまでで、GridDB rustクライアントをインストールし、使ってみました。もっと詳しく知りたい方は、公式リポジトリにある他のサンプルコードを見て、自分のアプリケーションを作ってみてください。
全ソースはこちらのGitHubでご覧いただけます。
ブログの内容について疑問や質問がある場合は Q&A サイトである Stack Overflow に質問を投稿しましょう。 GridDB 開発者やエンジニアから速やかな回答が得られるようにするためにも "griddb" タグをつけることをお忘れなく。 https://stackoverflow.com/questions/ask?tags=griddb