case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

DataflowでKinesisを扱う際の注意点

この記事はZOZO Advent Calendar 21日目の記事です。

qiita.com


DataflowでCloud Pub/Sub からKinesisへ書き込む処理とKinesisからBigQueryへ書き込む処理を作りました。本記事ではDataflowでKinesisを扱う際の注意点をご紹介できたらと思います。

github.com

プロデューサーとコンシューマーの言語がKPL対応しているか

Kinesisへのデータ送信側をプロデューサー、データ処理側をコンシューマーと呼びます。KPLはAmazon Kinesis Streamsにデータ送信するためのライブラリで、スループットの向上や複数レコードの書き込みが可能となります。
docs.aws.amazon.com
プロデューサーをJavaで作っている場合KPLを利用できますが、Java以外ではKPLを利用できません。
プロデューサーもコンシューマーもJavaの場合、KCLのコードでKPLでaggregationされたレコードをdeaggregationする処理が含まれており、KPLでaggregationされたレコードの参照が可能です。一方で、プロデューサーがJavaでコンシューマーがJava以外の言語だと集約を無効にする必要があります。Javaで実装したDataflowのプロデューサーはKPL対応しているため、複数レコードをaggregationしてKinesisに書き込みます。

docs.aws.amazon.com

Dataflow(java)で作ったプロデューサー(Cloud Pub/Sub to Kinesis)とコンシューマー(Kinesis To BigQuery)の動作確認はしていました。しかし、Kinesis利用者がJava以外のコンシューマーを利用していたため、次のようにKPLでaggregationされたデータを参照できませんでした。

[{'SequenceNumber': '49622473968815949086389767786279735597341342139024408578', 'ApproximateArrivalTimestamp': datetime.datetime(2021, 9, 29, 1, 6, 54, 337000, tzinfo=tzlocal()), 'Data': b'\xf3\x89\x9a\xc2\n\x011\x1a\xbf\x04\x08\x00\x1a\xba\x04{"massage_unique_id":"xxxxx"}\n\x1a\xc0\x04\x08\x00\x1a\xbb\x04{"massage_unique_id":"xxxxx"}\n\xa2)W\x93f\xd5\x1d`\xa9\x84d\xae\x19\x08q\xdc', 'PartitionKey': 'a'}]

>The Amazon Kinesis Producer Library (KPL) aggregates multiple logical user records into a >single Amazon Kinesis record for efficient puts.
github.com

プロデューサーとコンシューマーの言語がKPL対応しているか注意する必要があります。

検証用にPythonでコンシューマを作ったところdeaggregationできないことも再現できました。対応としては利用者のコンシューマーをJavaで実装してもらいました。

検証用のpythonコンシューマー

import boto3
import time

client = boto3.client('kinesis')

stream_name = ''
stream = client.describe_stream(StreamName=stream_name)
print(f'stream: {stream}')
shards = stream['StreamDescription']['Shards'][0]['ShardId']
print(f'shards: {shards}')

kinesis_iterator = client.get_shard_iterator(StreamName=stream_name,ShardId=shards,ShardIteratorType='LATEST')
print(f'kinesis_iterator: {kinesis_iterator}')

next_iterator = None
while True:
    if next_iterator is None:
        next_iterator = kinesis_iterator['ShardIterator']
    else:
        next_iterator = responce['NextShardIterator']
    responce = None
    responce = client.get_records(ShardIterator=next_iterator,Limit=1)
    print(responce['Records'])
    time.sleep(1)

DataflowプロデューサーのデフォルトRecordTtlは30秒

Kinesisで「UserRecordExpired」が発生して書き込みに失敗してしまうことがありました。書き込みに失敗した場合Dataflowはリトライしますが、Cloud Pub/Subのメッセージ保持期間を過ぎてしまいデータが欠損していました。

if (ur->attempts().back().error_code() == "Expired") {
  put(Names::UserRecordExpired, 1);
}

github.com

原因としてはKPL の RecordTtl の設定値に達した User Record が expire していたためでした。DataflowプロデューサーのデフォルトRecordTtlは30秒に設定されています。

# Default: 30000
# Minimum: 100
# Maximum (inclusive): 9223372036854775807
RecordTtl = 30000

github.com

対応としてDataflowで書き込む際、RecordTtlを設定するようにしました。

Properties properties = new Properties();
properties.setProperty("RecordTtl", "3600000");
Pipeline pipeline = Pipeline.create(options);
pipeline
    .apply(
        "Read PubSub Events",
        PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription()))
    .apply(
        "Prepare Kinesis input records",
        ParDo.of(new ConvertToBytes()))
    .apply(
        "Write Kinesis Events",
        KinesisIO.write()
        .withStreamName(options.getAwsKinesisStream().get())
        .withPartitionKey(options.getAwsKinesisPartitionKey().get())
        .withAWSClientsProvider(
          options.getAwsAccessKey().get(),
          options.getAwsSecretKey().get(),
          Regions.fromName(options.getAwsKinesisRegion().get()
          )
        )
        .withProducerProperties(properties));

以上DataflowでKinesisを扱う際の注意点となります。同じようなことを検討している方の参考になれば幸いです。