GridDBに時系列データを取り込み、データセット内の各時間帯(ウィンドウやバケットとも呼ばれる)の集計を確認したい場合、どうしたらよいでしょうか?。GridDBのTIME_SAMPLING関数では、時間加重平均を出すことは可能ですが、各時間帯のカウント値や最小値・最大値を出したい場合はどうでしょうか?例えば、あるカラムの最大値を毎日求めたり、毎時のサンプル数を求めたりしたい場合はどうでしょうか?GridDBのマルチクエリ機能と、プログラミング言語の時刻・日付・カレンダー機能を使えば、以上のことが可能になります。
このブログでは、GridDBのTQL関数TIME_SAMPLEを利用するだけでなく、NYCタクシーデータセットから複数のアグリゲートを取得できる汎用関数を構築する方法について紹介します。
その具体的なファイルは以下の通りです。このファイルを/tmpディレクトリにダウンロードしているのは、インジェストコードがそこから読み込むようにハードコーディングされているためです。他の場所にダウンロードしたい場合は、src/main/java/net/griddb/tstaxiblog/IngestParquet.java
のコード内のファイルパスも変更してください。
$ wget -O /tmp/yellow_tripdata_2021-01.parquet https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet
前提条件
このプロジェクトを始める前に、GridDBをダウンロード、インストールします。 https://docs.griddb.net/latest/gettingstarted/using-apt/#install-with-deb
また、GridDBをjavaで使用できるようにしておく必要があります。その方法については以下を参照してください。 https://docs.griddb.net/latest/gettingstarted/java/
Parquet形式でのデータ取り込み
以前のNYC Taxi データに関するブログでは、データはCSVで公開されており、解析が容易でした。最近のNY Taxi CommissionはParquet形式でデータを公開しており、利点もある一方で、解析が難しくなっています。
まず、Apache Parquet Library を使用するために必要な多くのライブラリ、parqet-avro, parqet-hadoop, parqet-format, hadoop-common, hadoop-client を含める必要があります。Maven を使用しているので、以下を依存関係に追加します。
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-hadoop</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-format</artifactId>
<version>2.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.parquet</groupId>
<artifactId>parquet-avro</artifactId>
<version>1.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.10.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.10.1</version>
<scope>provided</scope>
</dependency>
これで、Javaソースでparquetファイルを読み込み、その行のグループを反復処理できるようになりました。
Path path = new Path("yellow_tripdata_2021-01.parquet");
Configuration conf = new Configuration();
try {
ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER);
MessageType schema = readFooter.getFileMetaData().getSchema();
ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);
PageReadStore pages = null;
try {
while (null != (pages = r.readNextRowGroup())) {
final long rows = pages.getRowCount();
System.out.println("Number of rows: " + rows);
final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
final RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
for (int i = 0; i < rows; i++) {
final Group g = (Group)recordReader.read();
writeGroup(ts, g);
}
}
} finally {
r.close();
}
} catch (IOException e) {
System.out.println("Error reading parquet file.");
e.printStackTrace();
}
writeGroup関数は、行のグループごとに呼び出され、実際に書き込みを行い、各行のデータをTaxiTripクラスのインスタンスに取り込み、書き込みを行います。
private static void writeGroup(TimeSeries ts, Group g) throws GSException {
int fieldCount = g.getType().getFieldCount();
int valueCount = g.getFieldRepetitionCount(0);
for (int index = 0; index < valueCount; index++) {
TaxiTrip r = new TaxiTrip();
for (int field = 0; field < fieldCount; field++) {
try {
Type fieldType = g.getType().getType(field);
String fieldName = fieldType.getName();
if (fieldType.isPrimitive()) {
switch(fieldName) {
case "tpep_pickup_datetime":
r.tpep_pickup_datetime = new Date(g.getLong(field, index)/1000);
break;
/* .... snip ... */
case "fare_amount":
r.fare_amount = (float)g.getDouble(field, index);
break;
/* .... snip .... */
default:
System.out.println("Unknown field: "+fieldName+" value="+g.getValueToString(field, index));
}
}
} catch (Exception e) {
}
}
ts.put(r);
}
}
GridDB’s TIME_SAMPLING 機能
GridDB の TIME_SAMPLING 関数は、各ウィンドウ内のカラムの値を線形補間して返します。この関数のシグネチャは以下の通りです。
TIME_SAMPLING(column, start, end, window_count, window_size)
この関数は、各リクエストのタイムスタンプに対して、その前後の行を使用して補間値を求めることで動作します。次の図は、その概念を視覚化したものです。
詳細は、GridDB プログラミングガイドに記載されています。
このコードでは、クエリを実行し、返された各ウィンドウを繰り返し処理します。
public static void timeSampling(GridStore store, String column, Date start, Date end, String windowstr) throws GSException {
TimeZone tz = TimeZone.getTimeZone("UTC");
DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:00'Z'");
df.setTimeZone(tz);
Container, Row> ts = store.getContainer("NYC_TaxiTrips");
String querystr = "select time_sampling("+column+", TIMESTAMP('"+df.format(start)+"'), TIMESTAMP('"+df.format(end) +"') , 1, "+windowstr+") ";
Query<row> q = ts.query(querystr);
RowSet</row><row> rs = q.fetch();
while (rs.hasNext()) {
Row result = rs.next();
System.out.println(result.getTimestamp(0)+"="+result.getDouble(10));
}
}</row>
timeSampling(store, "fare_amount", start, end, "DAY");
を呼び出すと、次のような値が得られます。
Window | Sample |
---|---|
Fri Jan 01 00:00:00 GMT 2021 | 14.833333015441895 |
Sat Jan 02 00:00:00 GMT 2021 | 8.55555534362793 |
Sun Jan 03 00:00:00 GMT 2021 | 15.710000038146973 |
Mon Jan 04 00:00:00 GMT 2021 | 25.31818199157715 |
Tue Jan 05 00:00:00 GMT 2021 | 22.024999618530273 |
Wed Jan 06 00:00:00 GMT 2021 | 9.0 |
Thu Jan 07 00:00:00 GMT 2021 | 43.5 |
Fri Jan 08 00:00:00 GMT 2021 | 6.0 |
Sat Jan 09 00:00:00 GMT 2021 | 28.950000762939453 |
Sun Jan 10 00:00:00 GMT 2021 | 18.450000762939453 |
Mon Jan 11 00:00:00 GMT 2021 | 41.75 |
Tue Jan 12 00:00:00 GMT 2021 | 6.5 |
Wed Jan 13 00:00:00 GMT 2021 | 17.710525512695312 |
Thu Jan 14 00:00:00 GMT 2021 | 10.8125 |
Fri Jan 15 00:00:00 GMT 2021 | 52.0 |
Sat Jan 16 00:00:00 GMT 2021 | 28.200000762939453 |
Sun Jan 17 00:00:00 GMT 2021 | 9.977272987365723 |
Mon Jan 18 00:00:00 GMT 2021 | 9.100000381469727 |
Tue Jan 19 00:00:00 GMT 2021 | 25.66666603088379 |
Wed Jan 20 00:00:00 GMT 2021 | 34.349998474121094 |
Thu Jan 21 00:00:00 GMT 2021 | 5.5 |
Fri Jan 22 00:00:00 GMT 2021 | 7.472727298736572 |
Sat Jan 23 00:00:00 GMT 2021 | 10.5 |
Sun Jan 24 00:00:00 GMT 2021 | 17.0 |
Mon Jan 25 00:00:00 GMT 2021 | 21.0 |
Tue Jan 26 00:00:00 GMT 2021 | 15.852941513061523 |
Wed Jan 27 00:00:00 GMT 2021 | 7.984375 |
Thu Jan 28 00:00:00 GMT 2021 | 42.83333206176758 |
Fri Jan 29 00:00:00 GMT 2021 | 9.083333015441895 |
Sat Jan 30 00:00:00 GMT 2021 | 11.892857551574707 |
Sun Jan 31 00:00:00 GMT 2021 | 14.371428489685059 |
TIME_SAMPLINGは便利ですが、各ウィンドウの線形補間を返すだけで、それぞれの平均値は一般的に(中央値が似ているように)似ているものの真の平均ではないので、使用にはいくつかの制限があります。同様に、時間ウィンドウの行数、最小値、最大値を求めることはできません。
マルチクエリによるウィンドウズ集計
時系列ウィンドウのセットに対して任意の集計を見つけるには、GridDB のマルチクエリ関数を使用することができます。これは、各タイムウィンドウに対して指定された集計を個別に実行するものです。次の図は、各ウィンドウに対するクエリを示しています。
このような処理を行うと遅くなるように思えるかもしれませんが、マルチクエリを使用することで、GridDBへのクエリの送信とデータ取得を最適化することができます。データベースは、この関数がビルトインされている場合と同じ処理を実行します。
汎用ウィンドウ集約機能の最初のステップとして、コンテナを開いて各ウィンドウのクエリを作成し、GridDBで実行されるクエリリストに追加します。各クエリは、Java の Calendar クラスを使用し、Calendar.HOUR や Calendar.DATE などの指定された Calendar ENUM 値で間隔をインクリメントすることで構築されています。
public static void windowAggregation(GridStore store, String aggregation, Date start, Date end, int windowsize) throws GSException {
Calendar c = Calendar.getInstance();
ArrayList<Query<aggregationresult>> queryList = new ArrayList<Query</aggregationresult><aggregationresult>>();
ArrayList<date> dates = new ArrayList</date><date>();
Container, Row> ts = store.getContainer("NYC_TaxiTrips");
c.setTime(start);
Date interval = c.getTime();
do {
c.add(windowsize, 1);
System.out.println("interval="+interval);
String windowquery = "select "+aggregation+" where tpep_pickup_datetime > TO_TIMESTAMP_MS("+interval.getTime()+") and tpep_pickup_datetime < TO_TIMESTAMP_MS("+c.getTime().getTime()+")";
Query<aggregationresult> q = ts.query(windowquery, AggregationResult.class);
dates.add(interval);
queryList.add(q);
interval = c.getTime();
} while (interval.getTime() <= end.getTime());
</aggregationresult></date></aggregationresult>
クエリのリストができたら、実行し、結果を繰り返し表示することができます。
store.fetchAll(queryList);
for (int i = 0; i < queryList.size(); i++) {
Query<aggregationresult> query = queryList.get(i);
RowSet</aggregationresult><aggregationresult> rs = query.getRowSet();
while (rs.hasNext()) {
AggregationResult result = rs.next();
double value = result.getDouble();
if (value != 0)
System.out.println(dates.get(i)+"= "+ value);
}
}
}
</aggregationresult>
windowAggregate(store, "avg(fare_amount)", start, end, Calendar.DATE);
を呼び出すと、以下のような結果が得られます。
Window | avg(fare_amount) |
---|---|
Fri Jan 01 00:00:00 GMT 2021 | 13.851391098873778 |
Sat Jan 02 00:00:00 GMT 2021 | 13.770499047323876 |
Sun Jan 03 00:00:00 GMT 2021 | 15.249551398785178 |
Mon Jan 04 00:00:00 GMT 2021 | 13.473628681590766 |
Tue Jan 05 00:00:00 GMT 2021 | 12.676972567160476 |
Wed Jan 06 00:00:00 GMT 2021 | 12.243332028212064 |
Thu Jan 07 00:00:00 GMT 2021 | 12.464050631310585 |
Fri Jan 08 00:00:00 GMT 2021 | 12.091422930601727 |
Sat Jan 09 00:00:00 GMT 2021 | 12.537748556275371 |
Sun Jan 10 00:00:00 GMT 2021 | 13.496787364093032 |
Mon Jan 11 00:00:00 GMT 2021 | 12.350597800281259 |
Tue Jan 12 00:00:00 GMT 2021 | 12.189052448708432 |
Wed Jan 13 00:00:00 GMT 2021 | 11.970959482137403 |
Thu Jan 14 00:00:00 GMT 2021 | 12.338237139653735 |
Fri Jan 15 00:00:00 GMT 2021 | 12.24978189382601 |
Sat Jan 16 00:00:00 GMT 2021 | 12.348960280360648 |
Sun Jan 17 00:00:00 GMT 2021 | 12.987822185285506 |
Mon Jan 18 00:00:00 GMT 2021 | 12.66304436182826 |
Tue Jan 19 00:00:00 GMT 2021 | 12.19362357336725 |
Wed Jan 20 00:00:00 GMT 2021 | 11.760170207452589 |
Thu Jan 21 00:00:00 GMT 2021 | 12.153647415269047 |
Fri Jan 22 00:00:00 GMT 2021 | 12.026541242751565 |
Sat Jan 23 00:00:00 GMT 2021 | 11.73361113486781 |
Sun Jan 24 00:00:00 GMT 2021 | 12.876742517934895 |
Mon Jan 25 00:00:00 GMT 2021 | 12.229045667216711 |
Tue Jan 26 00:00:00 GMT 2021 | 11.716670845330835 |
Wed Jan 27 00:00:00 GMT 2021 | 11.823579132291002 |
Thu Jan 28 00:00:00 GMT 2021 | 11.842199614451049 |
Fri Jan 29 00:00:00 GMT 2021 | 11.784085302551507 |
Sat Jan 30 00:00:00 GMT 2021 | 11.843539460445053 |
Sun Jan 31 00:00:00 GMT 2021 | 12.406643581985854 |
また、windowAggregate(store, "count(*)", start, end, Calendar.DATE)
を呼ぶことで、行数を確認することができます。
Window | COUNT(*) |
---|---|
Fri Jan 01 00:00:00 GMT 2021 | 20430.0 |
Sat Jan 02 00:00:00 GMT 2021 | 25429.0 |
Sun Jan 03 00:00:00 GMT 2021 | 20909.0 |
Mon Jan 04 00:00:00 GMT 2021 | 30733.0 |
Tue Jan 05 00:00:00 GMT 2021 | 31667.0 |
Wed Jan 06 00:00:00 GMT 2021 | 32965.0 |
Thu Jan 07 00:00:00 GMT 2021 | 33306.0 |
Fri Jan 08 00:00:00 GMT 2021 | 33628.0 |
Sat Jan 09 00:00:00 GMT 2021 | 28626.0 |
Sun Jan 10 00:00:00 GMT 2021 | 23034.0 |
Mon Jan 11 00:00:00 GMT 2021 | 31583.0 |
Tue Jan 12 00:00:00 GMT 2021 | 33201.0 |
Wed Jan 13 00:00:00 GMT 2021 | 33633.0 |
Thu Jan 14 00:00:00 GMT 2021 | 34580.0 |
Fri Jan 15 00:00:00 GMT 2021 | 34706.0 |
Sat Jan 16 00:00:00 GMT 2021 | 28094.0 |
Sun Jan 17 00:00:00 GMT 2021 | 24373.0 |
Mon Jan 18 00:00:00 GMT 2021 | 26830.0 |
Tue Jan 19 00:00:00 GMT 2021 | 33525.0 |
Wed Jan 20 00:00:00 GMT 2021 | 32902.0 |
Thu Jan 21 00:00:00 GMT 2021 | 34652.0 |
Fri Jan 22 00:00:00 GMT 2021 | 35478.0 |
Sat Jan 23 00:00:00 GMT 2021 | 29830.0 |
Sun Jan 24 00:00:00 GMT 2021 | 23776.0 |
Mon Jan 25 00:00:00 GMT 2021 | 32064.0 |
Tue Jan 26 00:00:00 GMT 2021 | 31969.0 |
Wed Jan 27 00:00:00 GMT 2021 | 34148.0 |
Thu Jan 28 00:00:00 GMT 2021 | 35670.0 |
Fri Jan 29 00:00:00 GMT 2021 | 35219.0 |
Sat Jan 30 00:00:00 GMT 2021 | 28349.0 |
Sun Jan 31 00:00:00 GMT 2021 | 23674.0 |
もし、毎日ウィンドウズクエリを実行して時間ごとの集計をしたい場合は、cronjobをセットアップするか、sleep()でプログラムをループさせることができます。
Calendar c = Calendar.getInstance();
Date now = new Date();
Date start = c.add(Calendar.DATE, -1).getTIme();
c.add(windowsize, 1);
windowAggregate(store, "avg(*)", start, now, Calendar.HOUR);
まとめ
ここまでで紹介したように、GridDBを使えば簡単に時間ウィンドウの集計を行うことができます。このブログの全ソースコードは、GridDB.netのGitHubページに掲載されています。
git clone --branch time-bucketing https://github.com/griddbnet/Blogs.git
このプロジェクトを実行するには、mavenを使用し、まずはingestを行います。
mvn package exec:java -Dexec.mainClass="net.griddb.tstaxiblog.IngestParquet"
インジェストが完了すると、インジェストされたタクシー利用の数か、パーケットファイルが見つからなかったというエラーが表示されます。また、Parquet Librariesが原因でこのエラーが発生することもあります。
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
at org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner.run(FileSystem.java:3693)
at java.lang.Thread.run(Thread.java:748)
gs_sh と TQL の “select count(*) NYC_TaxiTrips” で、データが正しく取り込まれたことを確認することができます。
そして、分析を実行する方法は以下の通りです。
mvn package exec:java -Dexec.mainClass="net.griddb.tstaxiblog.TaxiQuery"
ブログの内容について疑問や質問がある場合は Q&A サイトである Stack Overflow に質問を投稿しましょう。 GridDB 開発者やエンジニアから速やかな回答が得られるようにするためにも "griddb" タグをつけることをお忘れなく。 https://stackoverflow.com/questions/ask?tags=griddb