Logstash Output Pluginを使ってGridDBデータベースにSyslogメッセージを送信する

このブログでは、GridDB Cloudコンテナにsyslog(やその他)のログレコードを書き込めるLogstash用GridDB Outputプラグインの使い方について紹介します。Logstashは、サーバーサイドのログファイルパーサで、任意のデータベースにメトリクスを格納することができます。ELK-stackの一部として、Elasticsearch(データをためる)、Kibana(データを可視化する)と一緒に使われることが多いようです。ElasticsearchをGridDBに置き換えることで、GridDBの高いパフォーマンスを活用し、ログ監視インフラの運用コストを下げることができます。

GridDB Logstash プラグインの機能を紹介するために、このブログではローカルサーバの Syslog デーモンによって生成された失敗した SSH ログインメッセージを Logstash が GridDB Cloud に格納するためのフィルターをインストールし設定します。その後に、再びGridDB Cloudを使用して、シンプルなSSH侵入検知システムのクエリを構築します。

インストール

GridDB Logstash プラグインを使用するには、CentOS 7.9 (x64)、Ruby v2.4+、Logstash 7.15+が必要となります。

まず、RVMとRubyをインストールします。

$ curl -sSL https://rvm.io/mpapis.asc | gpg --import -
$ curl -sSL https://rvm.io/pkuczynski.asc | gpg --import -
$ curl -L get.rvm.io | bash -s stable
$ source ~/.rvm/scripts/rvm
$ rvm reload
$ rvm install 2.5.3

次に、LogstashのYUM Repoをセットアップして、Logstashをインストールします。

$ cat << EOF | tee /etc/yum.repos.d/logstash.repo
[logstash-7.x] 
name=Elastic repository for 7.x packages 
baseurl=https://artifacts.elastic.co/packages/7.x/yum 
gpgcheck=1 
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch 
enabled=1 
autorefresh=1 
type=rpm-md
EOF

$ yum install logstash

次に、GridDB Logstashプラグインのビルドとインストールを行います。

$ cd /path/to/logstash-output-griddb
$ gem build logstash-output-griddb.gemspec
$ /usr/share/logstash/bin/logstash-plugin install logstash-output-griddb-1.0.0.gem\

これで、GridDB outputプラグインでLogstashを使う準備が整いました。

基本的な使い方

まず、inputとoutputの両方のセクションを持つ設定ファイル /etc/logstash/mylogstash.conf を作成します。Inputセクションでは、Logstashが単に/var/log/secureを読み、それをsyslog出力としてパースするように設定します。これにより、すべてのログインを追跡することができるようになります。

input { file { path => “/var/log/secure” type => “syslog” } }

output { griddb { host => “https://cloud1.griddb.com/trial1234” cluster => “gs_clustertrial1234” database => “public” container => “secure_syslog”
username => “logstash” password => “” insert_mode => “append” } }. }  Outputセクションでは、GridDB出力プラグインを設定し、GridDBクラウドアカウントに書き込むようにします。既存のコンテナがある場合は、それに追加する append モードを使用します。また、Logstashの再起動毎にコンテナを削除・作成するReplaceモードもあります。

GridDB CloudポータルでLogstashユーザを作成します。

ここで、ConfigファイルからLogstashを起動します。 $ sudo /usr/share/logstash/bin/logstash -f /etc/logstash/mylogstash.conf –path.settings /etc/logstash

これで、ユーザがログインしたり、sudoを使用した時に、セキュアログメッセージがGridDBに書き込まれるようになりました。

フィルタリング、Grok、 Mutateについて

セキュアログにはSSHDのメッセージ以外にも、以下のようなフィルタを追加することで、SSH以外のメッセージをフィルタリングすることができます。

filter {
    if ([message] !~ "sshd") {
        drop { }
    }
} 

ログインに失敗したときだけ追跡したい場合は次のように行います。ログインに失敗すると、”Connection closed by $remotehost port $port [preauth]” というエントリがセキュアログに追加されます。そのため、まずは “Connection closed by” と “[preauth]” を含んでいないメッセージをすべてフィルタリングします。

    if ([message] !~ "[preauth]") {
        drop { }
    }
    if ([message] !~ "Connection closed by") {
        drop { }
    }
 

それから、remhost フィールドに追加されたリモートホストを取得するために、ログエントリをGrokします。このブロックもフィルターブロックに到達します。Grokは非常に強力なので、その機能をこちらで知っておくと良いでしょう。

    grok { 
        match => { "message" => "closed by %{IPORHOST:remhost} port "  }
    }

ここで、不要なフィールドを削除するためにmutateを使用します。Mutateはこのように様々な方法でフィールドを変更することができます。

    mutate {
        remove_field => ["path", "type", "version"]
    }

ここで、クエリに集約関数を使用して、異なるリモートホストからのログイン失敗回数を数えることができるようになります。

このクエリのoutputを取ると、ログインに失敗した回数が最も多いリモートホストがわかります。ファイアウォールにルールを追加して、このアタッカーがSSHサーバーに接続できないように設定することができます。ぜひこの優れた機能を使ってみてください。

ブログの内容について疑問や質問がある場合は Q&A サイトである Stack Overflow に質問を投稿しましょう。 GridDB 開発者やエンジニアから速やかな回答が得られるようにするためにも "griddb" タグをつけることをお忘れなく。 https://stackoverflow.com/questions/ask?tags=griddb

Leave a Reply

Your email address will not be published. Required fields are marked *