GridDBでタイムバケットを行う

GridDBに時系列データを取り込み、データセット内の各時間帯(ウィンドウやバケットとも呼ばれる)の集計を確認したい場合、どうしたらよいでしょうか?。GridDBのTIME_SAMPLING関数では、時間加重平均を出すことは可能ですが、各時間帯のカウント値や最小値・最大値を出したい場合はどうでしょうか?例えば、あるカラムの最大値を毎日求めたり、毎時のサンプル数を求めたりしたい場合はどうでしょうか?GridDBのマルチクエリ機能と、プログラミング言語の時刻・日付・カレンダー機能を使えば、以上のことが可能になります。

このブログでは、GridDBのTQL関数TIME_SAMPLEを利用するだけでなく、NYCタクシーデータセットから複数のアグリゲートを取得できる汎用関数を構築する方法について紹介します。

その具体的なファイルは以下の通りです。このファイルを/tmpディレクトリにダウンロードしているのは、インジェストコードがそこから読み込むようにハードコーディングされているためです。他の場所にダウンロードしたい場合は、src/main/java/net/griddb/tstaxiblog/IngestParquet.javaのコード内のファイルパスも変更してください。

$ wget -O /tmp/yellow_tripdata_2021-01.parquet https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2021-01.parquet

前提条件

このプロジェクトを始める前に、GridDBをダウンロード、インストールします。 https://docs.griddb.net/latest/gettingstarted/using-apt/#install-with-deb

また、GridDBをjavaで使用できるようにしておく必要があります。その方法については以下を参照してください。 https://docs.griddb.net/latest/gettingstarted/java/

Parquet形式でのデータ取り込み

以前のNYC Taxi データに関するブログでは、データはCSVで公開されており、解析が容易でした。最近のNY Taxi CommissionはParquet形式でデータを公開しており、利点もある一方で、解析が難しくなっています。

まず、Apache Parquet Library を使用するために必要な多くのライブラリ、parqet-avro, parqet-hadoop, parqet-format, hadoop-common, hadoop-client を含める必要があります。Maven を使用しているので、以下を依存関係に追加します。

<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-format</artifactId>
    <version>2.9.0</version>
</dependency>
<dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
        <version>1.9.0</version>
</dependency>
<dependency>
  <groupId>org.apache.hadoop</groupId>
  <artifactId>hadoop-common</artifactId>
  <version>2.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>2.10.1</version>
    <scope>provided</scope>
</dependency>

これで、Javaソースでparquetファイルを読み込み、その行のグループを反復処理できるようになりました。

Path path = new Path("yellow_tripdata_2021-01.parquet");
Configuration conf = new Configuration();

try {
    ParquetMetadata readFooter = ParquetFileReader.readFooter(conf, path, ParquetMetadataConverter.NO_FILTER);
    MessageType schema = readFooter.getFileMetaData().getSchema();
    ParquetFileReader r = new ParquetFileReader(conf, path, readFooter);

    PageReadStore pages = null;
    try {
        while (null != (pages = r.readNextRowGroup())) {
            final long rows = pages.getRowCount();
            System.out.println("Number of rows: " + rows);

            final MessageColumnIO columnIO = new ColumnIOFactory().getColumnIO(schema);
            final RecordReader recordReader = columnIO.getRecordReader(pages, new GroupRecordConverter(schema));
            for (int i = 0; i < rows; i++) {
                final Group g = (Group)recordReader.read();
                writeGroup(ts, g);

            }
        }
    } finally {
        r.close();
    }
} catch (IOException e) {
    System.out.println("Error reading parquet file.");
    e.printStackTrace();
}

writeGroup関数は、行のグループごとに呼び出され、実際に書き込みを行い、各行のデータをTaxiTripクラスのインスタンスに取り込み、書き込みを行います。

private static void writeGroup(TimeSeries ts, Group g) throws GSException {

    int fieldCount = g.getType().getFieldCount();
    int valueCount = g.getFieldRepetitionCount(0);
    for (int index = 0; index < valueCount; index++) {
        TaxiTrip r = new TaxiTrip();
        for (int field = 0; field < fieldCount; field++) {
        
            try {
            Type fieldType = g.getType().getType(field);
            String fieldName = fieldType.getName();

            if (fieldType.isPrimitive()) {
                switch(fieldName) {
                    case "tpep_pickup_datetime":
                        r.tpep_pickup_datetime = new Date(g.getLong(field, index)/1000);
                        break;
                    /* .... snip ... */
                    case "fare_amount":
                        r.fare_amount = (float)g.getDouble(field, index);
                        break;
                    /* .... snip .... */
                    default:
                        System.out.println("Unknown field: "+fieldName+" value="+g.getValueToString(field, index));
                }
            } 
            } catch (Exception e) {
            }
        }
        ts.put(r);
    }
}

GridDB’s TIME_SAMPLING 機能

GridDB の TIME_SAMPLING 関数は、各ウィンドウ内のカラムの値を線形補間して返します。この関数のシグネチャは以下の通りです。

TIME_SAMPLING(column, start, end, window_count, window_size)

この関数は、各リクエストのタイムスタンプに対して、その前後の行を使用して補間値を求めることで動作します。次の図は、その概念を視覚化したものです。

詳細は、GridDB プログラミングガイドに記載されています。

このコードでは、クエリを実行し、返された各ウィンドウを繰り返し処理します。

public static void timeSampling(GridStore store, String column, Date start, Date end, String windowstr) throws GSException {

    TimeZone tz = TimeZone.getTimeZone("UTC");
    DateFormat df = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:00'Z'"); 
    df.setTimeZone(tz);

    Container ts = store.getContainer("NYC_TaxiTrips");
    String querystr = "select time_sampling("+column+", TIMESTAMP('"+df.format(start)+"'), TIMESTAMP('"+df.format(end) +"') , 1, "+windowstr+") "; 

    Query<row> q = ts.query(querystr);
    RowSet</row><row> rs = q.fetch();
    while (rs.hasNext()) {
        Row result = rs.next();
        System.out.println(result.getTimestamp(0)+"="+result.getDouble(10));
    }

}</row>

timeSampling(store, "fare_amount", start, end, "DAY");を呼び出すと、次のような値が得られます。

Window Sample
Fri Jan 01 00:00:00 GMT 2021 14.833333015441895
Sat Jan 02 00:00:00 GMT 2021 8.55555534362793
Sun Jan 03 00:00:00 GMT 2021 15.710000038146973
Mon Jan 04 00:00:00 GMT 2021 25.31818199157715
Tue Jan 05 00:00:00 GMT 2021 22.024999618530273
Wed Jan 06 00:00:00 GMT 2021 9.0
Thu Jan 07 00:00:00 GMT 2021 43.5
Fri Jan 08 00:00:00 GMT 2021 6.0
Sat Jan 09 00:00:00 GMT 2021 28.950000762939453
Sun Jan 10 00:00:00 GMT 2021 18.450000762939453
Mon Jan 11 00:00:00 GMT 2021 41.75
Tue Jan 12 00:00:00 GMT 2021 6.5
Wed Jan 13 00:00:00 GMT 2021 17.710525512695312
Thu Jan 14 00:00:00 GMT 2021 10.8125
Fri Jan 15 00:00:00 GMT 2021 52.0
Sat Jan 16 00:00:00 GMT 2021 28.200000762939453
Sun Jan 17 00:00:00 GMT 2021 9.977272987365723
Mon Jan 18 00:00:00 GMT 2021 9.100000381469727
Tue Jan 19 00:00:00 GMT 2021 25.66666603088379
Wed Jan 20 00:00:00 GMT 2021 34.349998474121094
Thu Jan 21 00:00:00 GMT 2021 5.5
Fri Jan 22 00:00:00 GMT 2021 7.472727298736572
Sat Jan 23 00:00:00 GMT 2021 10.5
Sun Jan 24 00:00:00 GMT 2021 17.0
Mon Jan 25 00:00:00 GMT 2021 21.0
Tue Jan 26 00:00:00 GMT 2021 15.852941513061523
Wed Jan 27 00:00:00 GMT 2021 7.984375
Thu Jan 28 00:00:00 GMT 2021 42.83333206176758
Fri Jan 29 00:00:00 GMT 2021 9.083333015441895
Sat Jan 30 00:00:00 GMT 2021 11.892857551574707
Sun Jan 31 00:00:00 GMT 2021 14.371428489685059

TIME_SAMPLINGは便利ですが、各ウィンドウの線形補間を返すだけで、それぞれの平均値は一般的に(中央値が似ているように)似ているものの真の平均ではないので、使用にはいくつかの制限があります。同様に、時間ウィンドウの行数、最小値、最大値を求めることはできません。

マルチクエリによるウィンドウズ集計

時系列ウィンドウのセットに対して任意の集計を見つけるには、GridDB のマルチクエリ関数を使用することができます。これは、各タイムウィンドウに対して指定された集計を個別に実行するものです。次の図は、各ウィンドウに対するクエリを示しています。

このような処理を行うと遅くなるように思えるかもしれませんが、マルチクエリを使用することで、GridDBへのクエリの送信とデータ取得を最適化することができます。データベースは、この関数がビルトインされている場合と同じ処理を実行します。

汎用ウィンドウ集約機能の最初のステップとして、コンテナを開いて各ウィンドウのクエリを作成し、GridDBで実行されるクエリリストに追加します。各クエリは、Java の Calendar クラスを使用し、Calendar.HOUR や Calendar.DATE などの指定された Calendar ENUM 値で間隔をインクリメントすることで構築されています。

public static void windowAggregation(GridStore store, String aggregation, Date start, Date end, int windowsize) throws GSException {
    Calendar c = Calendar.getInstance();
    ArrayList<Query<aggregationresult>> queryList = new ArrayList<Query</aggregationresult><aggregationresult>>();
    ArrayList<date> dates = new ArrayList</date><date>();
    Container ts = store.getContainer("NYC_TaxiTrips");

    c.setTime(start);
    Date interval = c.getTime();

    do {
        c.add(windowsize, 1);
        System.out.println("interval="+interval);
        String windowquery = "select "+aggregation+" where tpep_pickup_datetime > TO_TIMESTAMP_MS("+interval.getTime()+") and tpep_pickup_datetime < TO_TIMESTAMP_MS("+c.getTime().getTime()+")"; 
        Query<aggregationresult> q = ts.query(windowquery, AggregationResult.class);
        dates.add(interval);
        queryList.add(q);
        interval = c.getTime();
    } while (interval.getTime() <= end.getTime());
</aggregationresult></date></aggregationresult>

クエリのリストができたら、実行し、結果を繰り返し表示することができます。

    store.fetchAll(queryList);

    for (int i = 0; i < queryList.size(); i++) {
        Query<aggregationresult> query = queryList.get(i);
        RowSet</aggregationresult><aggregationresult> rs = query.getRowSet();
        while (rs.hasNext()) {
            AggregationResult result = rs.next();
            double value = result.getDouble();
            if (value != 0)
                System.out.println(dates.get(i)+"= "+ value);
        }
    }


}
</aggregationresult>

windowAggregate(store, "avg(fare_amount)", start, end, Calendar.DATE); を呼び出すと、以下のような結果が得られます。

Window avg(fare_amount)
Fri Jan 01 00:00:00 GMT 2021 13.851391098873778
Sat Jan 02 00:00:00 GMT 2021 13.770499047323876
Sun Jan 03 00:00:00 GMT 2021 15.249551398785178
Mon Jan 04 00:00:00 GMT 2021 13.473628681590766
Tue Jan 05 00:00:00 GMT 2021 12.676972567160476
Wed Jan 06 00:00:00 GMT 2021 12.243332028212064
Thu Jan 07 00:00:00 GMT 2021 12.464050631310585
Fri Jan 08 00:00:00 GMT 2021 12.091422930601727
Sat Jan 09 00:00:00 GMT 2021 12.537748556275371
Sun Jan 10 00:00:00 GMT 2021 13.496787364093032
Mon Jan 11 00:00:00 GMT 2021 12.350597800281259
Tue Jan 12 00:00:00 GMT 2021 12.189052448708432
Wed Jan 13 00:00:00 GMT 2021 11.970959482137403
Thu Jan 14 00:00:00 GMT 2021 12.338237139653735
Fri Jan 15 00:00:00 GMT 2021 12.24978189382601
Sat Jan 16 00:00:00 GMT 2021 12.348960280360648
Sun Jan 17 00:00:00 GMT 2021 12.987822185285506
Mon Jan 18 00:00:00 GMT 2021 12.66304436182826
Tue Jan 19 00:00:00 GMT 2021 12.19362357336725
Wed Jan 20 00:00:00 GMT 2021 11.760170207452589
Thu Jan 21 00:00:00 GMT 2021 12.153647415269047
Fri Jan 22 00:00:00 GMT 2021 12.026541242751565
Sat Jan 23 00:00:00 GMT 2021 11.73361113486781
Sun Jan 24 00:00:00 GMT 2021 12.876742517934895
Mon Jan 25 00:00:00 GMT 2021 12.229045667216711
Tue Jan 26 00:00:00 GMT 2021 11.716670845330835
Wed Jan 27 00:00:00 GMT 2021 11.823579132291002
Thu Jan 28 00:00:00 GMT 2021 11.842199614451049
Fri Jan 29 00:00:00 GMT 2021 11.784085302551507
Sat Jan 30 00:00:00 GMT 2021 11.843539460445053
Sun Jan 31 00:00:00 GMT 2021 12.406643581985854

また、windowAggregate(store, "count(*)", start, end, Calendar.DATE) を呼ぶことで、行数を確認することができます。

Window COUNT(*)
Fri Jan 01 00:00:00 GMT 2021 20430.0
Sat Jan 02 00:00:00 GMT 2021 25429.0
Sun Jan 03 00:00:00 GMT 2021 20909.0
Mon Jan 04 00:00:00 GMT 2021 30733.0
Tue Jan 05 00:00:00 GMT 2021 31667.0
Wed Jan 06 00:00:00 GMT 2021 32965.0
Thu Jan 07 00:00:00 GMT 2021 33306.0
Fri Jan 08 00:00:00 GMT 2021 33628.0
Sat Jan 09 00:00:00 GMT 2021 28626.0
Sun Jan 10 00:00:00 GMT 2021 23034.0
Mon Jan 11 00:00:00 GMT 2021 31583.0
Tue Jan 12 00:00:00 GMT 2021 33201.0
Wed Jan 13 00:00:00 GMT 2021 33633.0
Thu Jan 14 00:00:00 GMT 2021 34580.0
Fri Jan 15 00:00:00 GMT 2021 34706.0
Sat Jan 16 00:00:00 GMT 2021 28094.0
Sun Jan 17 00:00:00 GMT 2021 24373.0
Mon Jan 18 00:00:00 GMT 2021 26830.0
Tue Jan 19 00:00:00 GMT 2021 33525.0
Wed Jan 20 00:00:00 GMT 2021 32902.0
Thu Jan 21 00:00:00 GMT 2021 34652.0
Fri Jan 22 00:00:00 GMT 2021 35478.0
Sat Jan 23 00:00:00 GMT 2021 29830.0
Sun Jan 24 00:00:00 GMT 2021 23776.0
Mon Jan 25 00:00:00 GMT 2021 32064.0
Tue Jan 26 00:00:00 GMT 2021 31969.0
Wed Jan 27 00:00:00 GMT 2021 34148.0
Thu Jan 28 00:00:00 GMT 2021 35670.0
Fri Jan 29 00:00:00 GMT 2021 35219.0
Sat Jan 30 00:00:00 GMT 2021 28349.0
Sun Jan 31 00:00:00 GMT 2021 23674.0

もし、毎日ウィンドウズクエリを実行して時間ごとの集計をしたい場合は、cronjobをセットアップするか、sleep()でプログラムをループさせることができます。

Calendar c = Calendar.getInstance();
Date now = new Date();
Date start = c.add(Calendar.DATE, -1).getTIme(); 
c.add(windowsize, 1);
windowAggregate(store, "avg(*)", start, now, Calendar.HOUR);

まとめ

ここまでで紹介したように、GridDBを使えば簡単に時間ウィンドウの集計を行うことができます。このブログの全ソースコードは、GridDB.netのGitHubページに掲載されています。

git clone --branch time-bucketing https://github.com/griddbnet/Blogs.git

このプロジェクトを実行するには、mavenを使用し、まずはingestを行います。

mvn package exec:java -Dexec.mainClass="net.griddb.tstaxiblog.IngestParquet"

インジェストが完了すると、インジェストされたタクシー利用の数か、パーケットファイルが見つからなかったというエラーが表示されます。また、Parquet Librariesが原因でこのエラーが発生することもあります。

java.lang.InterruptedException
    at java.lang.Object.wait(Native Method)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:144)
    at java.lang.ref.ReferenceQueue.remove(ReferenceQueue.java:165)
    at org.apache.hadoop.fs.FileSystem$Statistics$StatisticsDataReferenceCleaner.run(FileSystem.java:3693)
    at java.lang.Thread.run(Thread.java:748)

gs_sh と TQL の “select count(*) NYC_TaxiTrips” で、データが正しく取り込まれたことを確認することができます。

そして、分析を実行する方法は以下の通りです。

mvn package exec:java -Dexec.mainClass="net.griddb.tstaxiblog.TaxiQuery"

ブログの内容について疑問や質問がある場合は Q&A サイトである Stack Overflow に質問を投稿しましょう。 GridDB 開発者やエンジニアから速やかな回答が得られるようにするためにも "griddb" タグをつけることをお忘れなく。 https://stackoverflow.com/questions/ask?tags=griddb

Leave a Reply

Your email address will not be published. Required fields are marked *