case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

Apache Beam Python PostgreSQL To BigQuery検証

Apache BeamのPython SDKでPostgreSQLからBigQueryに書き込めるか検証。検証したところJDBCをつかいPostgreSQLからBigQueryに書き込むことはできた。ただし、クエリの上書きはできなかった。
すべて「SELECT * FROM TABLE」で実行されてしまいます。以下の条件に該当せず、デフォルト値が渡されているもよう。
github.com


BigQueryへの書き込みはうまくいったがまだ実運用は厳しそうだった。PostgreSQLからBigQueryへはPythonのラッパークラスをつかい内部的にはJavaで実装している。
beam.apache.org
ぱっと見は動きそうなので、デバッグできる環境をつくる必要がありそう。PythonからJavaのコードを実行しなくてはいけないので検証がやや大変そう。


カラムの絞りこみだけであればBQに書き込む際スキーマをしぼることは可能だが、クエリでSETオプション等使いたい場合は厳しそうだ。

beam.io.WriteToBigQuery(..., schema=...)
from apache_beam.io.jdbc import ReadFromJdbc
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.gcp.bigquery import BigQuerySource
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
import json

class ToBigQueryJsonRows(beam.DoFn):
    """Convert schema rows read from JDBC into BigQuery-compatible JSON dicts.

    Input elements are NamedTuple-like rows produced by ReadFromJdbc
    (e.g. ``BeamSchema_...(id=3)``); each output element is a plain dict
    such as ``{'id': 3}``, suitable for ``beam.io.WriteToBigQuery``.
    """

    def __init__(self, schema):
        # schema: list of BigQuery field dicts, e.g.
        # [{"name": "id", "type": "integer", "mode": "NULLABLE"}, ...]
        self._schema = schema
        # Coder is built lazily on the worker (see _get_coder): creating it
        # here would require importing Beam internals on the driver and may
        # not survive pickling when the DoFn is shipped to remote workers.
        self._coder = None

    def _get_coder(self):
        """Build the TableRow->JSON coder once per DoFn instance.

        The original code re-parsed the schema and constructed a new
        TableRowJsonCoder for every element; this caches it instead.
        """
        if self._coder is None:
            # Imports are deferred so the DoFn itself stays picklable.
            from apache_beam.io.gcp.bigquery import TableRowJsonCoder
            from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
            import json
            table_schema = parse_table_schema_from_json(
                json.dumps({"fields": self._schema}))
            self._coder = TableRowJsonCoder(table_schema)
        return self._coder

    def process(self, x):
        import json
        from apache_beam.io.gcp.internal.clients import bigquery
        from apache_beam.internal.gcp.json_value import to_json_value
        # Wrap each field value of the row in a TableCell so the coder can
        # pair values positionally with the schema's field names.
        table_row = bigquery.TableRow(
            f=[bigquery.TableCell(v=to_json_value(value)) for value in x])
        # encode() yields a JSON string like '{"id": 3}'; decode to a dict.
        yield json.loads(self._get_coder().encode(table_row))

class JdbcToBigQuery(beam.DoFn):
    """Copy rows from a JDBC source table into a BigQuery table.

    NOTE(review): this class only orchestrates a pipeline via ``run()`` and
    never implements ``process()``, so the ``beam.DoFn`` base class looks
    unnecessary; it is kept here for backward compatibility.
    """

    def __init__(self, username, password, driver_class_name, query, jdbc_url, project, dataset, table):
        """Store connection/destination settings and build pipeline options.

        :param username: JDBC user name.
        :param password: JDBC password.
        :param driver_class_name: JDBC driver class, e.g. 'org.postgresql.Driver'.
        :param query: SQL to run against the source (currently ignored by
            ReadFromJdbc -- see note in run()).
        :param jdbc_url: JDBC connection URL.
        :param project: GCP project id of the destination BigQuery table.
        :param dataset: BigQuery dataset id.
        :param table: source table name, also used as the destination table id.
        """
        self._options = PipelineOptions([
            "--runner=DirectRunner",
            # Fixed: the original hard-coded the "<project_id>" placeholder
            # here and ignored the `project` constructor argument.
            f"--project={project}",
            "--temp_location=<gcs_path>",  # TODO: parameterize the GCS temp path
        ])
        self._username = username
        self._password = password
        self._driver_class_name = driver_class_name
        self._query = query
        self._jdbc_url = jdbc_url

        self._project = project
        self._dataset = dataset
        self._table = table
        # Destination schema as BigQuery JSON field definitions.
        self._json_schema = [
            {"name": "id", "type": "integer", "mode": "NULLABLE"},
            {"name": "id2", "type": "integer", "mode": "NULLABLE"},
        ]

    def run(self):
        """Execute the JDBC -> BigQuery pipeline synchronously."""
        with beam.Pipeline(options=self._options) as p:
            # NOTE: at the time of writing, ReadFromJdbc ignores `query` and
            # always issues "SELECT * FROM <table>".
            # TODO: filter columns used by self._json_schema until the query
            # override is fixed.
            # https://issues.apache.org/jira/browse/BEAM-10750
            rows = p | 'Read from jdbc' >> ReadFromJdbc(
                table_name=self._table,
                driver_class_name=self._driver_class_name,
                jdbc_url=self._jdbc_url,
                username=self._username,
                password=self._password,
                query=self._query,
            )
            bq_rows = rows | 'To Json With Schema' >> beam.ParDo(
                ToBigQueryJsonRows(schema=self._json_schema))
            bq_rows | 'Load To BigQuery' >> beam.io.WriteToBigQuery(
                f'{self._project}:{self._dataset}.{self._table}',
                schema=parse_table_schema_from_json(
                    json.dumps({"fields": self._json_schema})),
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            )

class PostgresToBigQuery():
    """Sample driver: fixed local-test settings for the PostgreSQL -> BigQuery job."""

    def __init__(self):
        # Hard-coded settings for a local verification run.
        self._username = 'admin'
        self._password = 'admin'
        self._driver_class_name = 'org.postgresql.Driver'
        self._query = "select id from table_id;"
        self._jdbc_url = 'jdbc:postgresql://localhost:5432/beam'
        self._project = 'project_id'
        self._dataset = 'datasest_id'
        self._table = 'table_id'

    def test(self):
        """Build the JDBC -> BigQuery job from the settings above and run it."""
        job = JdbcToBigQuery(
            self._username,
            self._password,
            self._driver_class_name,
            self._query,
            self._jdbc_url,
            self._project,
            self._dataset,
            self._table,
        )
        job.run()

# Script entry point: run the sample PostgreSQL -> BigQuery copy job.
if __name__ == '__main__':
    PostgresToBigQuery().test()