備忘録としてFluentdのInputプラグインの作り方を残しておきます。
作るプラグインについて
SQL Serverからデータを取り出し、取得結果をGCPのCloud PubSubに送ります。今回はデータを取り出す際必要となるSQL ServerのInputプラグインを作ってみました。
Cloud PubSubのOutputプラグインは次のプラグインを使いました。
GitHub - mia-0032/fluent-plugin-gcloud-pubsub-custom: Google Cloud Pub/Sub input/output plugin for Fluentd event collector
Dockerfile
Dockerfileは次のようにしています。
# Dockerfile FROM ruby:2.7 USER root RUN apt-get update && apt-get install -y \ freetds-dev RUN tiny_tds -v 2.1.2 RUN gem install \ freetds-dev \ fluentd \ luent-plugin-gcloud-pubsub-custom
自作プラグインをつくる
SQLServerからデータを取得するインプットプラグインをつくります。fluent-plugin-generateコマンドを使いプラグインのテンプレートを自動生成します。
fluent-plugin-generate input sql-server
実行すると次のディレクトリ構成のファイルが生成されます。
├── fluent-plugin-sql-server
│ ├── Gemfile
│ ├── LICENSE
│ ├── README.md
│ ├── Rakefile
│ ├── fluent-plugin-sql-server.gemspec
│ ├── lib
│ │ └── fluent
│ │ └── plugin
│ │ └── in_sql_server.rb
│ └── test
│ ├── helper.rb
│ └── plugin
│ └── test_in_sql_server.rb
プラグインの実装サンプル
Inputプラグインの実装サンプルは次のようにしてます。生成された「 in_sql_server.rb」に処理を書きます。60秒ごとに定期的に実行させます。
# in_sql_server.rb require 'fluent/plugin/input' require 'tiny_tds' module Fluent module Plugin class SqlServerInput < Input Fluent::Plugin.register_input('sql_server', self) def configure(conf) super @username = conf['username'] @password = conf['password'] @host = conf['host'] @port = conf['port'] @databasename = conf['databasename'] @tablename = conf['tablename'] @time_interval = conf['time_interval'].to_i end def start super client = TinyTds::Client.new username: @username, password: @password, host: @host, port: @port, database: @databasename, azure: false tag = "example.publish" time = Fluent::Engine.now es = MultiEventStream.new loop do results = client.execute("SELECT TOP 10 * FROM #{@tablename}") results.each do |row| es.add(time, row) puts row end router.emit_stream(tag, es) sleep @time_interval end end end end end
参考
RubyでSQL Serverに接続する(TinyTDS) - Qiita
https://docs.fluentd.org/plugin-development/api-plugin-input
fluentdのpluginを作成する (fluent-plugin-generateコマンド利用) - Qiita
設定ファイル
設定ファイルをつくります。今回はSQLServerにクエリを投げて取得結果をPubSubに投げたいので次のようにします。
Inputプラグインはsourceで囲まれた箇所となります。
# fluent/fluent.conf <source> @type sql_server username '' password '' host 'sqlserver' port 1433 databasename 'TestDB' tablename 'TestTb' time_interval 60 </source> <match example.publish> @type gcloud_pubsub project <project id> key /usr/src/app/fluent/<service account \>.json topic projects/<project id >/topics/fluentd-topic autocreate_topic false max_messages 1000 max_total_size 9800000 max_message_size 4000000 <buffer> @type memory flush_interval 1s </buffer> <format> @type json </format> </match>
起動
fluentdを起動させます。
fluentd -c ./fluent/fluent.conf -p /usr/src/app/fluent-plugin-sql-server/lib/fluent/plugin
動作確認
PubSubにメッセージが届いていることを確認します。