case-k's Notes

My day-to-day notes. I'm interested in data analysis and data infrastructure.

Verifying Apache Beam Python SQLServer To BigQuery

I tested the Apache Beam Python SQLServer-to-BigQuery path. I had already verified that the query cannot be overridden, so this time I checked whether any other functionality is missing.
www.case-k.jp

With SQL Server, an extra jar file is required. PostgreSQL's JDBC driver appears to be bundled, so no additional jar is needed, but the SQL Server driver is not included.

Caused by: java.sql.SQLException: Cannot load JDBC driver class 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
	at org.apache.commons.dbcp2.DriverFactory.createDriver(DriverFactory.java:54)
	at org.apache.commons.dbcp2.BasicDataSource.createConnectionFactory(BasicDataSource.java:462)
	at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:528)
	at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:734)
	at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadRows.inferBeamSchema(JdbcIO.java:651)
	... 21 more
Caused by: java.lang.ClassNotFoundException: com.microsoft.sqlserver.jdbc.SQLServerDriver
	at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	at org.apache.commons.dbcp2.DriverFactory.createDriver(DriverFactory.java:49)
	... 25 more

You need to add the jar file that contains the SQLServerDriver class.

jar -tf mssql-jdbc-8.4.1.jre8.jar | grep SQLServerDriver
com/microsoft/sqlserver/jdbc/SQLServerDriver.class
com/microsoft/sqlserver/jdbc/SQLServerDriverPropertyInfo.class
com/microsoft/sqlserver/jdbc/SQLServerDriverIntProperty.class
com/microsoft/sqlserver/jdbc/SQLServerDriverBooleanProperty.class
com/microsoft/sqlserver/jdbc/SQLServerDriverObjectProperty.class
com/microsoft/sqlserver/jdbc/SQLServerDriverStringProperty.class
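
For reference, the driver jar can be pulled straight from Maven Central (the 8.4.1.jre8 version below matches the jar inspected above; the repo1.maven.org path follows the standard Maven Central layout). A minimal sketch:

import urllib.request

# groupId com.microsoft.sqlserver, artifactId mssql-jdbc, version 8.4.1.jre8
JAR_URL = ('https://repo1.maven.org/maven2/com/microsoft/sqlserver/'
           'mssql-jdbc/8.4.1.jre8/mssql-jdbc-8.4.1.jre8.jar')
urllib.request.urlretrieve(JAR_URL, 'mssql-jdbc-8.4.1.jre8.jar')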

Looking at pipeline_options.py, it turns out you can pass jars to the Dataflow workers.
github.com

I also found an article that independently connects to SQL Server from Python over JDBC and runs it on Dataflow. That article likewise uses pipeline_options.py to hand the jar files to the Dataflow workers.
yomon.hatenablog.com
As far as I can tell from pipeline_options.py, jars cannot be passed to the DirectRunner, so it looks like I would have to run Dataflow every single time to verify anything.
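
One workaround worth trying for local runs (a hedged sketch, not verified here): start a Java expansion service yourself with the driver jar on its classpath, and point ReadFromJdbc at it via its expansion_service argument. The jar names and port below are assumptions.

# Assumed invocation of a self-hosted expansion service, run in a separate
# terminal (artifact names and port 8097 are placeholders):
#   java -cp beam-sdks-java-io-expansion-service-2.25.0.jar:mssql-jdbc-8.4.1.jre8.jar \
#       org.apache.beam.sdk.expansion.service.ExpansionService 8097
import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc

with beam.Pipeline() as p:
    rows = p | 'Read from jdbc' >> ReadFromJdbc(
        table_name='<table>',
        driver_class_name='com.microsoft.sqlserver.jdbc.SQLServerDriver',
        jdbc_url='jdbc:sqlserver://localhost:1433;database=beam;encrypt=true;',
        username='<user>',
        password='<password>',
        expansion_service='localhost:8097',  # assumed local service from above
    )

Below is the pipeline I used for the verification itself: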

import json

import apache_beam as beam
from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
from apache_beam.options.pipeline_options import PipelineOptions

class ToBigQueryJsonRows(beam.DoFn):
    def __init__(self, schema):
        self._schema = schema

    # x : BeamSchema_232f6906_dfc6_48a8_873d_f7b5bbc2b88f(id=3) https://github.com/apache/beam/blob/master/sdks/python/apache_beam/typehints/schemas.py#L246
    def process(self, x):
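        # Imports are kept local to process() so the DoFn pickles cleanly
        # when shipped to remote workers.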
        from apache_beam.io.gcp.bigquery import TableRowJsonCoder
        from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
        import json
        from apache_beam.io.gcp.internal.clients import bigquery
        from apache_beam.internal.gcp.json_value import to_json_value
        schema = parse_table_schema_from_json(json.dumps({"fields": self._schema}))
        coder = TableRowJsonCoder(schema)
        x = bigquery.TableRow(f=[bigquery.TableCell(v=to_json_value(e)) for e in x])
        row = json.loads(coder.encode(x)) # {'id': 3}
        print(row)
        yield row

class JdbcToBigQuery:

    def __init__(self, username, password, driver_class_name, query, jdbc_url, project, dataset, table, options):
        self._options = options
        self._username = username
        self._password = password
        self._driver_class_name = driver_class_name
        self._query = query
        self._jdbc_url = jdbc_url

        self._project = project
        self._dataset = dataset
        self._table = table
        self._json_schema = [
            {"name": "id", "type": "integer", "mode": "NULLABLE"},
            {"name": "id2", "type": "integer", "mode": "NULLABLE"},
        ]

    def run(self):
        with beam.Pipeline(options=self._options) as p:
            # TODO: filter columns down to self._json_schema until the query-override fix lands
            # https://issues.apache.org/jira/browse/BEAM-10750
            jdbc_results = ( p | 'Read from jdbc' >> ReadFromJdbc(
                table_name=self._table,
                driver_class_name=self._driver_class_name,
                jdbc_url=self._jdbc_url,
                username=self._username,
                password=self._password,
                query=self._query,
            ))
            bq_rows = (jdbc_results | 'To Json With Schema' >> beam.ParDo(ToBigQueryJsonRows(schema=self._json_schema)))
            bq_rows | 'Load To BigQuery' >> beam.io.WriteToBigQuery(
                f'{self._project}:{self._dataset}.{self._table}',
                schema=parse_table_schema_from_json(json.dumps({"fields": self._json_schema})),
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)


class SQLServerToBigQuery:

    def __init__(self):
        self._username = ''
        self._password = ''
        self._driver_class_name = 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
        self._query = ""
        self._jdbc_url = 'jdbc:sqlserver://localhost:1433;database=beam;encrypt=true;'
        self._project = ''
        self._dataset = ''
        self._table = ''
        self._options = PipelineOptions([
            "--runner=DirectRunner",
            "--project=<project_id",
            "--temp_location=<gcs path>",
        ])
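        # NOTE: per pipeline_options.py, the driver jar apparently cannot be
        # staged with DirectRunner, so this local config only exercises the
        # pipeline wiring; switch to DataflowRunner to actually reach SQL Server.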


    def test(self):
        JdbcToBigQuery(self._username, self._password, self._driver_class_name, self._query, self._jdbc_url, self._project, self._dataset,self._table, self._options).run()


if __name__ == '__main__':
    SQLServerToBigQuery().test()