case-k's notes

Day-to-day notes. I'm interested in data analysis and data infrastructure.

Adding the BigQuery insert time with Dataflow

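To record the BigQuery insert time from Dataflow, add it to each row in the format function as follows. The timestamp is taken on the Dataflow worker when the row is formatted, just before it is sent to BigQuery. It therefore approximates the time BigQuery stores the row but may not match it exactly.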

    // Copy the row before adding the timestamp so the input element is not mutated
    .withFormatFunction((TableRow elem) -> elem.clone().set("bigquery_insert_time", Instant.now().toString()))
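TableRow is a Map<String, Object> because it extends GenericJson. That means you can try out the idea behind this format function with plain Java collections. The sketch below uses only the standard library and is not Beam code. A LinkedHashMap stands in for TableRow, and `withInsertTime` is a hypothetical helper name. A `Clock` parameter is added so the timestamp can be fixed in tests. The helper copies the incoming row instead of modifying it, then adds the time as an ISO-8601 UTC string, which BigQuery can parse for a TIMESTAMP column.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneOffset;
import java.util.LinkedHashMap;
import java.util.Map;

class InsertTimeSketch {

    // Hypothetical helper mirroring the format function: returns a copy of the row
    // with "bigquery_insert_time" added, leaving the input row untouched.
    static Map<String, Object> withInsertTime(Map<String, Object> row, Clock clock) {
        Map<String, Object> copy = new LinkedHashMap<>(row);
        // Instant.toString() produces ISO-8601 in UTC, e.g. 2024-01-01T00:00:00Z
        copy.put("bigquery_insert_time", Instant.now(clock).toString());
        return copy;
    }

    public static void main(String[] args) {
        // A fixed clock makes the output deterministic
        Clock fixed = Clock.fixed(Instant.parse("2024-01-01T00:00:00Z"), ZoneOffset.UTC);
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("table_name", "events");

        Map<String, Object> out = withInsertTime(row, fixed);
        System.out.println(out); // {table_name=events, bigquery_insert_time=2024-01-01T00:00:00Z}
        System.out.println(row); // {table_name=events}
    }
}
```

In a real pipeline the clock would simply be the system clock. The copy is the important part: the original row stays as it was, and only the returned row carries the timestamp.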

When using dynamic destinations in Dataflow

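    // Imports used by this snippet (top of the file)
    import com.google.api.services.bigquery.model.TableReference;
    import com.google.api.services.bigquery.model.TableRow;
    import com.google.api.services.bigquery.model.TableSchema;
    import java.time.Instant;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.CreateDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.Write.WriteDisposition;
    import org.apache.beam.sdk.io.gcp.bigquery.DynamicDestinations;
    import org.apache.beam.sdk.io.gcp.bigquery.InsertRetryPolicy;
    import org.apache.beam.sdk.io.gcp.bigquery.TableDestination;
    import org.apache.beam.sdk.io.gcp.bigquery.WriteResult;
    import org.apache.beam.sdk.values.ValueInSingleWindow;

    // projectID, schemaCreater, convertedTableRows and TRANSFORM_OUT are defined elsewhere in the pipeline.
    WriteResult writeResult = convertedTableRows.get(TRANSFORM_OUT)
        .apply(
            // Write each row to its own table, chosen from the row's "table_name" field
            BigQueryIO.<TableRow>write()
                .to(
                    new DynamicDestinations<TableRow, String>() {
                        // Destination key: the value of the row's "table_name" field
                        @Override
                        public String getDestination(ValueInSingleWindow<TableRow> elem) {
                            return elem.getValue().get("table_name").toString();
                        }

                        // Map the key to the table <table name>_<destination> in the dataset
                        @Override
                        public TableDestination getTable(String destination) {
                            return new TableDestination(
                                new TableReference()
                                    .setProjectId(projectID)
                                    .setDatasetId("<dataset name>")
                                    .setTableId("<table name>" + "_" + destination),
                                "destination table " + destination);
                        }

                        @Override
                        public TableSchema getSchema(String destination) {
                            return schemaCreater(new TableSchema(), destination);
                        }
                    })
                // Copy the row before adding the timestamp so the input element is not mutated
                .withFormatFunction((TableRow elem) -> elem.clone().set("bigquery_insert_time", Instant.now().toString()))
                .withoutValidation()
                // CREATE_NEVER: every destination table must already exist
                .withCreateDisposition(CreateDisposition.CREATE_NEVER)
                .withWriteDisposition(WriteDisposition.WRITE_APPEND)
                // Failed rows, with error details, are available via writeResult.getFailedInsertsWithErr()
                .withExtendedErrorInfo()
                .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS)
                // Retry only transient errors; other failures go to the failed-inserts output
                .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));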
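You can check the routing done by getDestination and getTable without running a pipeline. The sketch below uses plain Java, with maps standing in for TableRow. It groups rows by their table_name value and builds the same `<table name>_<destination>` table ID, written as a project:dataset.table spec. PROJECT_ID, DATASET_ID and TABLE_PREFIX are hypothetical placeholders. The sketch covers only the naming logic, not how BigQueryIO batches or retries writes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class DestinationRoutingSketch {

    // Hypothetical placeholder values; substitute your own project, dataset and table prefix.
    static final String PROJECT_ID = "my-project";
    static final String DATASET_ID = "my_dataset";
    static final String TABLE_PREFIX = "events";

    // Mirrors getDestination(): the destination key is the row's "table_name" value.
    static String destinationOf(Map<String, Object> row) {
        return row.get("table_name").toString();
    }

    // Mirrors getTable(): builds "project:dataset.prefix_destination".
    static String tableSpecFor(String destination) {
        return PROJECT_ID + ":" + DATASET_ID + "." + TABLE_PREFIX + "_" + destination;
    }

    // Groups rows by the table they would be written to (TreeMap keeps output sorted).
    static Map<String, List<Map<String, Object>>> route(List<Map<String, Object>> rows) {
        Map<String, List<Map<String, Object>>> byTable = new TreeMap<>();
        for (Map<String, Object> row : rows) {
            String spec = tableSpecFor(destinationOf(row));
            byTable.computeIfAbsent(spec, k -> new ArrayList<>()).add(row);
        }
        return byTable;
    }

    public static void main(String[] args) {
        List<Map<String, Object>> rows = List.of(
            Map.<String, Object>of("table_name", "click", "id", 1),
            Map.<String, Object>of("table_name", "view", "id", 2),
            Map.<String, Object>of("table_name", "click", "id", 3));
        route(rows).forEach((table, rs) -> System.out.println(table + ": " + rs.size()));
    }
}
```

Putting the naming rule in small static methods like these makes table names easy to unit-test before deployment. getDestination and getTable could then delegate to them.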