DataflowでBigQueryのインサート時刻を書き込みたい場合は以下のようにして付与します。
.withFormatFunction((TableRow elem) -> elem.set("bigquery_insert_time", Instant.now().toString()))
Dataflowのdynamic destinationを使う場合
WriteResult writeResult = convertedTableRows.get(TRANSFORM_OUT) .apply( // Out put each table depends on the table name BigQueryIO.<TableRow>write() .to( new DynamicDestinations<TableRow, String>() { @Override public String getDestination(ValueInSingleWindow<TableRow> elem) { return elem.getValue().get("table_name").toString(); } @Override public TableDestination getTable(String destination) { return new TableDestination( new TableReference() .setProjectId(projectID) .setDatasetId("<dataset name>") .setTableId("<table name>" + "_" + destination), "destination table" + destination); } @Override public TableSchema getSchema(String destination) { TableSchema schema = schemaCreater(new TableSchema(), destination); return schema; } }) .withFormatFunction((TableRow elem) -> elem.set("bigquery_insert_time", Instant.now().toString())) .withoutValidation() .withCreateDisposition(CreateDisposition.CREATE_NEVER) .withWriteDisposition(WriteDisposition.WRITE_APPEND) .withExtendedErrorInfo() .withMethod(BigQueryIO.Write.Method.STREAMING_INSERTS) .withFailedInsertRetryPolicy(InsertRetryPolicy.retryTransientErrors()));