case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

FluentdのInputプラグインを作って、定期的にSQLServerからPubSubにデータを転送してみる

備忘録としてFluentdのInputプラグインの作り方を残しておきます。

作るプラグインについて

SQL Serverからデータを取り出し、取得結果をGCPのCloud PubSubに送ります。今回はデータを取り出す際必要となるSQL ServerのInputプラグインを作ってみました。
Cloud PubSubのOutputプラグインは次のプラグインを使いました。
GitHub - mia-0032/fluent-plugin-gcloud-pubsub-custom: Google Cloud Pub/Sub input/output plugin for Fluentd event collector

Dockerfile

Dockerfileは次のようにしています。

# Dockerfile
FROM ruby:2.7
USER root
RUN apt-get update && apt-get install -y \
  freetds-dev 
RUN tiny_tds -v 2.1.2
RUN gem install  \
  freetds-dev  \
  fluentd \
  luent-plugin-gcloud-pubsub-custom

自作プラグインをつくる

SQLServerからデータを取得するインプットプラグインをつくります。fluent-plugin-generateコマンドを使いプラグインのテンプレートを自動生成します。

fluent-plugin-generate  input sql-server

実行すると次のディレクトリ構成のファイルが生成されます。

├── fluent-plugin-sql-server
│   ├── Gemfile
│   ├── LICENSE
│   ├── README.md
│   ├── Rakefile
│   ├── fluent-plugin-sql-server.gemspec
│   ├── lib
│   │   └── fluent
│   │       └── plugin
│   │           └── in_sql_server.rb
│   └── test
│       ├── helper.rb
│       └── plugin
│           └── test_in_sql_server.rb

プラグインの実装サンプル

Inputプラグインの実装サンプルは次のようにしてます。生成された「 in_sql_server.rb」に処理を書きます。60秒ごとに定期的に実行させます。

#  in_sql_server.rb
require 'fluent/plugin/input'
require 'tiny_tds'

module Fluent
  module Plugin
    class SqlServerInput < Input
      Fluent::Plugin.register_input('sql_server', self)

      def configure(conf)
        super
        @username = conf['username']
        @password = conf['password']
        @host = conf['host']
        @port = conf['port']
        @databasename = conf['databasename']
        @tablename = conf['tablename']
        @time_interval = conf['time_interval'].to_i
      end

       def start
        super
        client = TinyTds::Client.new username: @username, password: @password, host: @host, port: @port, database: @databasename, azure: false
        tag = "example.publish"
        time = Fluent::Engine.now
        es = MultiEventStream.new
        loop do
          results = client.execute("SELECT TOP 10 * FROM #{@tablename}")
          results.each do |row|
            es.add(time, row)
            puts row
          end
          router.emit_stream(tag, es)
          sleep @time_interval
        end
      end
    end
  end
end

参考
RubyでSQL Serverに接続する(TinyTDS) - Qiita
https://docs.fluentd.org/plugin-development/api-plugin-input
fluentdのpluginを作成する (fluent-plugin-generateコマンド利用) - Qiita

設定ファイル

設定ファイルをつくります。今回はSQLServerにクエリを投げて取得結果をPubSubに投げたいので次のようにします。
Inputプラグインはsourceで囲まれた箇所となります。

# fluent/fluent.conf 
<source>
  @type sql_server
  username ''
  password ''
  host 'sqlserver'
  port 1433
  databasename 'TestDB'
  tablename 'TestTb'
  time_interval 60
</source>
<match example.publish>
  @type gcloud_pubsub
  project <project id>
  key /usr/src/app/fluent/<service account \>.json
  topic projects/<project id >/topics/fluentd-topic
  autocreate_topic false
  max_messages 1000
  max_total_size 9800000
  max_message_size 4000000
  <buffer>
    @type memory
    flush_interval 1s
  </buffer>
  <format>
    @type json
  </format>
</match>

起動

fluentdを起動させます。

fluentd -c ./fluent/fluent.conf -p /usr/src/app/fluent-plugin-sql-server/lib/fluent/plugin

動作確認

PubSubにメッセージが届いていることを確認します。
f:id:casekblog:20200528185546p:plain

所感

プラグイン結構簡単に作れてしまうもんなんですね。既存のもの使う想定でしたが、自作の方がメンテもできて良い気がしました。
Cloud Pub/SubのOutputプラグインもつくりたいと思います。