I tested whether Apache Beam's Python SDK can read from PostgreSQL and write to BigQuery. The verdict: writing from PostgreSQL to BigQuery via JDBC does work, but the query could not be overridden.
Every read ends up being executed as `SELECT * FROM <table>`. It looks like the input never matches the relevant condition in the Beam source, so the default value is passed through instead.
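For reference, the symptom in minimal form (the connection settings are the same placeholders as in the full script further down): even with `query=` set, the statement issued against PostgreSQL was still `SELECT * FROM table_id`.

import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc

with beam.Pipeline() as p:
    (p
     # `query` is accepted here, but in this verification the read was
     # still issued as `SELECT * FROM table_id` on the PostgreSQL side.
     | 'Read from jdbc' >> ReadFromJdbc(
         table_name='table_id',
         driver_class_name='org.postgresql.Driver',
         jdbc_url='jdbc:postgresql://localhost:5432/beam',
         username='admin',
         password='admin',
         query='SELECT id FROM table_id')
     | beam.Map(print))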
The write to BigQuery itself succeeded, but this still seems too rough for production use. The PostgreSQL-to-BigQuery path goes through a Python wrapper class whose actual implementation is in Java (a cross-language transform).
At first glance it looks like it should work, so the next step is to build an environment where this can be debugged. Since Python has to invoke Java code under the hood, that verification looks somewhat painful.
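One likely entry point for that debugging is the `expansion_service` argument: `ReadFromJdbc` normally downloads and starts the Beam Java expansion service on its own, but it can be pointed at one you run yourself. A sketch under assumptions: the jar path is a placeholder for a locally built expansion-service jar, and I have not verified this flow end to end.

import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.transforms.external import JavaJarExpansionService

# Hypothetical path: a locally built expansion-service jar bundling JdbcIO,
# e.g. one launched under a Java debugger.
expansion_service = JavaJarExpansionService('/path/to/expansion-service.jar')

with beam.Pipeline() as p:
    rows = p | ReadFromJdbc(
        table_name='table_id',
        driver_class_name='org.postgresql.Driver',
        jdbc_url='jdbc:postgresql://localhost:5432/beam',
        username='admin',
        password='admin',
        # Point the Python wrapper at our own Java process instead of the
        # auto-downloaded jar; an address string like 'localhost:8097' should
        # also work if the service was started manually.
        expansion_service=expansion_service)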
If all you need is to drop columns, you can narrow the schema when writing to BQ, but anything that requires the query itself (SET options and the like) looks out of reach for now.
beam.io.WriteToBigQuery(..., schema=parse_table_schema_from_json(...))
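A sketch of that column narrowing, assuming the rows from `ReadFromJdbc` expose their columns as attributes (the schema'd rows in this Beam version do) and reusing the placeholder project/dataset/table names from the script below; `narrow` and `wanted` are names I made up for illustration:

import json
import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json

# Only the columns named here survive into BigQuery.
json_schema = [{"name": "id", "type": "integer", "mode": "NULLABLE"}]
wanted = [f["name"] for f in json_schema]

def narrow(row):
    # Keep only the fields the BigQuery schema declares.
    return {name: getattr(row, name) for name in wanted}

with beam.Pipeline() as p:
    (p
     | 'Read from jdbc' >> ReadFromJdbc(
         table_name='table_id',
         driver_class_name='org.postgresql.Driver',
         jdbc_url='jdbc:postgresql://localhost:5432/beam',
         username='admin',
         password='admin')
     | 'Narrow columns' >> beam.Map(narrow)
     | 'Load To BigQuery' >> beam.io.WriteToBigQuery(
         'project_id:dataset_id.table_id',
         schema=parse_table_schema_from_json(json.dumps({"fields": json_schema})),
         write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
         create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED))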
from apache_beam.io.jdbc import ReadFromJdbc
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
import json


class ToBigQueryJsonRows(beam.DoFn):
    """Converts the schema'd rows from ReadFromJdbc into JSON dicts
    that WriteToBigQuery accepts."""

    def __init__(self, schema):
        self._schema = schema

    # x: BeamSchema_232f6906_dfc6_48a8_873d_f7b5bbc2b88f(id=3)
    # https://github.com/apache/beam/blob/master/sdks/python/apache_beam/typehints/schemas.py#L246
    def process(self, x):
        from apache_beam.io.gcp.bigquery import TableRowJsonCoder
        from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
        import json
        from apache_beam.io.gcp.internal.clients import bigquery
        from apache_beam.internal.gcp.json_value import to_json_value

        schema = parse_table_schema_from_json(json.dumps({"fields": self._schema}))
        coder = TableRowJsonCoder(schema)
        # Map the row's values positionally onto the BigQuery schema.
        x = bigquery.TableRow(f=[bigquery.TableCell(v=to_json_value(e)) for e in x])
        row = json.loads(coder.encode(x))
        # e.g. {'id': 3}
        print(row)
        yield row


class JdbcToBigQuery:
    def __init__(self, username, password, driver_class_name, query,
                 jdbc_url, project, dataset, table):
        self._options = PipelineOptions([
            "--runner=DirectRunner",
            "--project=<project_id>",
            "--temp_location=<gcs_path>",
        ])
        self._username = username
        self._password = password
        self._driver_class_name = driver_class_name
        self._query = query
        self._jdbc_url = jdbc_url
        self._project = project
        self._dataset = dataset
        self._table = table
        self._json_schema = [
            {"name": "id", "type": "integer", "mode": "NULLABLE"},
            {"name": "id2", "type": "integer", "mode": "NULLABLE"},
        ]

    def run(self):
        with beam.Pipeline(options=self._options) as p:
            # TODO: filter columns down to self._json_schema until the query
            # override is fixed.
            # https://issues.apache.org/jira/browse/BEAM-10750
            jdbc_results = (
                p | 'Read from jdbc' >> ReadFromJdbc(
                    table_name=self._table,
                    driver_class_name=self._driver_class_name,
                    jdbc_url=self._jdbc_url,
                    username=self._username,
                    password=self._password,
                    query=self._query,  # currently ignored; see BEAM-10750
                ))
            bq_rows = (
                jdbc_results
                | 'To Json With Schema' >> beam.ParDo(
                    ToBigQueryJsonRows(schema=self._json_schema)))
            bq_rows | 'Load To BigQuery' >> beam.io.WriteToBigQuery(
                f'{self._project}:{self._dataset}.{self._table}',
                schema=parse_table_schema_from_json(
                    json.dumps({"fields": self._json_schema})),
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)


class PostgresToBigQuery:
    def __init__(self):
        self._username = 'admin'
        self._password = 'admin'
        self._driver_class_name = 'org.postgresql.Driver'
        self._query = "select id from table_id;"
        self._jdbc_url = 'jdbc:postgresql://localhost:5432/beam'
        self._project = 'project_id'
        self._dataset = 'dataset_id'
        self._table = 'table_id'

    def test(self):
        JdbcToBigQuery(self._username, self._password, self._driver_class_name,
                       self._query, self._jdbc_url, self._project,
                       self._dataset, self._table).run()


if __name__ == '__main__':
    PostgresToBigQuery().test()