この記事はZOZO Advent Calendar 21日目の記事です。
DataflowでCloud Pub/Sub からKinesisへ書き込む処理とKinesisからBigQueryへ書き込む処理を作りました。本記事ではDataflowでKinesisを扱う際の注意点をご紹介できたらと思います。
プロデューサーとコンシューマーの言語がKPL対応しているか
Kinesisへのデータ送信側をプロデューサー、データ処理側をコンシューマーと呼びます。KPLはAmazon Kinesis Streamsにデータ送信するためのライブラリで、スループットの向上や複数レコードの書き込みが可能となります。
docs.aws.amazon.com
プロデューサーをJavaで作っている場合KPLを利用できますが、Java以外ではKPLを利用できません。
プロデューサーもコンシューマーもJavaの場合、KCLのコードでKPLでaggregationされたレコードをdeaggregationする処理が含まれており、KPLでaggregationされたレコードの参照が可能です。一方で、プロデューサーがJavaでコンシューマーがJava以外の言語だと集約を無効にする必要があります。Javaで実装したDataflowのプロデューサーはKPL対応しているため、複数レコードをaggregationしてKinesisに書き込みます。
Dataflow(java)で作ったプロデューサー(Cloud Pub/Sub to Kinesis)とコンシューマー(Kinesis To BigQuery)の動作確認はしていました。しかし、Kinesis利用者がJava以外のコンシューマーを利用していたため、次のようにKPLでaggregationされたデータを参照できませんでした。
[{'SequenceNumber': '49622473968815949086389767786279735597341342139024408578', 'ApproximateArrivalTimestamp': datetime.datetime(2021, 9, 29, 1, 6, 54, 337000, tzinfo=tzlocal()), 'Data': b'\xf3\x89\x9a\xc2\n\x011\x1a\xbf\x04\x08\x00\x1a\xba\x04{"massage_unique_id":"xxxxx"}\n\x1a\xc0\x04\x08\x00\x1a\xbb\x04{"massage_unique_id":"xxxxx"}\n\xa2)W\x93f\xd5\x1d`\xa9\x84d\xae\x19\x08q\xdc', 'PartitionKey': 'a'}]
>The Amazon Kinesis Producer Library (KPL) aggregates multiple logical user records into a >single Amazon Kinesis record for efficient puts.
github.com
プロデューサーとコンシューマーの言語がKPL対応しているか注意する必要があります。
検証用にPythonでコンシューマを作ったところdeaggregationできないことも再現できました。対応としては利用者のコンシューマーをJavaで実装してもらいました。
検証用のpythonコンシューマー
import boto3 import time client = boto3.client('kinesis') stream_name = '' stream = client.describe_stream(StreamName=stream_name) print(f'stream: {stream}') shards = stream['StreamDescription']['Shards'][0]['ShardId'] print(f'shards: {shards}') kinesis_iterator = client.get_shard_iterator(StreamName=stream_name,ShardId=shards,ShardIteratorType='LATEST') print(f'kinesis_iterator: {kinesis_iterator}') next_iterator = None while True: if next_iterator is None: next_iterator = kinesis_iterator['ShardIterator'] else: next_iterator = responce['NextShardIterator'] responce = None responce = client.get_records(ShardIterator=next_iterator,Limit=1) print(responce['Records']) time.sleep(1)
DataflowプロデューサーのデフォルトRecordTtlは30秒
Kinesisで「UserRecordExpired」が発生して書き込みに失敗してしまうことがありました。書き込みに失敗した場合Dataflowはリトライしますが、Cloud Pub/Subのメッセージ保持期間を過ぎてしまいデータが欠損していました。
if (ur->attempts().back().error_code() == "Expired") { put(Names::UserRecordExpired, 1); }
原因としてはKPL の RecordTtl の設定値に達した User Record が expire していたためでした。DataflowプロデューサーのデフォルトRecordTtlは30秒に設定されています。
# Default: 30000 # Minimum: 100 # Maximum (inclusive): 9223372036854775807 RecordTtl = 30000
対応としてDataflowで書き込む際、RecordTtlを設定するようにしました。
Properties properties = new Properties(); properties.setProperty("RecordTtl", "3600000"); Pipeline pipeline = Pipeline.create(options); pipeline .apply( "Read PubSub Events", PubsubIO.readMessagesWithAttributes().fromSubscription(options.getInputSubscription())) .apply( "Prepare Kinesis input records", ParDo.of(new ConvertToBytes())) .apply( "Write Kinesis Events", KinesisIO.write() .withStreamName(options.getAwsKinesisStream().get()) .withPartitionKey(options.getAwsKinesisPartitionKey().get()) .withAWSClientsProvider( options.getAwsAccessKey().get(), options.getAwsSecretKey().get(), Regions.fromName(options.getAwsKinesisRegion().get() ) ) .withProducerProperties(properties));
以上DataflowでKinesisを扱う際の注意点となります。同じようなことを検討している方の参考になれば幸いです。