I tested Apache Beam's Python SQL Server to BigQuery path. I had already confirmed that the query cannot be overridden, so this time I checked whether any other functionality is missing.
www.case-k.jp
For SQL Server, an additional jar file is required. PostgreSQL apparently ships with its driver bundled, so no extra jar is needed, but the SQL Server driver is not included.
Caused by: java.sql.SQLException: Cannot load JDBC driver class 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
    at org.apache.commons.dbcp2.DriverFactory.createDriver(DriverFactory.java:54)
    at org.apache.commons.dbcp2.BasicDataSource.createConnectionFactory(BasicDataSource.java:462)
    at org.apache.commons.dbcp2.BasicDataSource.createDataSource(BasicDataSource.java:528)
    at org.apache.commons.dbcp2.BasicDataSource.getConnection(BasicDataSource.java:734)
    at org.apache.beam.sdk.io.jdbc.JdbcIO$ReadRows.inferBeamSchema(JdbcIO.java:651)
    ... 21 more
Caused by: java.lang.ClassNotFoundException: com.microsoft.sqlserver.jdbc.SQLServerDriver
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
    at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
    at org.apache.commons.dbcp2.DriverFactory.createDriver(DriverFactory.java:49)
    ... 25 more
The jar file that contains the SQLServerDriver class needs to be added.
$ jar -tf mssql-jdbc-8.4.1.jre8.jar | grep SQLServerDriver
com/microsoft/sqlserver/jdbc/SQLServerDriver.class
com/microsoft/sqlserver/jdbc/SQLServerDriverPropertyInfo.class
com/microsoft/sqlserver/jdbc/SQLServerDriverIntProperty.class
com/microsoft/sqlserver/jdbc/SQLServerDriverBooleanProperty.class
com/microsoft/sqlserver/jdbc/SQLServerDriverObjectProperty.class
com/microsoft/sqlserver/jdbc/SQLServerDriverStringProperty.class
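If the JDK's jar tool is not at hand, the same check can be done from Python, since a jar is just a zip archive. A minimal sketch; the jar path is simply wherever the driver was downloaded to:

import zipfile

# Path to the locally downloaded driver jar (assumption: fetched from Maven Central).
JAR_PATH = 'mssql-jdbc-8.4.1.jre8.jar'

with zipfile.ZipFile(JAR_PATH) as jar:
    # List the archive entries and look for the driver class.
    hits = [name for name in jar.namelist() if 'SQLServerDriver' in name]

print('\n'.join(hits))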
Looking at pipeline_options.py, it is possible to pass jar files to the Dataflow workers.
github.com
I also found an article that connects to SQL Server over JDBC directly from Python and runs it on Dataflow. That article likewise uses pipeline_options.py to pass the jar file to the Dataflow workers.
yomon.hatenablog.com
From what I can tell from pipeline_options.py, jars cannot be passed when using the Direct Runner, so testing looks painful unless I run a Dataflow job every time.
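One route I have not verified here: ReadFromJdbc also accepts an expansion_service argument, so in principle it can be pointed at a locally started Java expansion service whose classpath already contains the SQL Server driver. A rough sketch, assuming a fat jar (built separately, e.g. with the Maven shade plugin) that bundles Beam's schemaio expansion service together with mssql-jdbc:

from apache_beam.io.jdbc import ReadFromJdbc
from apache_beam.transforms.external import JavaJarExpansionService

# Hypothetical fat jar bundling the Beam schemaio expansion service and
# mssql-jdbc-8.4.1.jre8.jar (building it is outside the scope of this post).
expansion_service = JavaJarExpansionService('expansion-service-with-mssql-jdbc.jar')

read_transform = ReadFromJdbc(
    table_name='test_table',  # hypothetical table name
    driver_class_name='com.microsoft.sqlserver.jdbc.SQLServerDriver',
    jdbc_url='jdbc:sqlserver://localhost:1433;database=beam;encrypt=true;',
    username='<username>',
    password='<password>',
    expansion_service=expansion_service,  # use the driver-bundled jar instead of the default
)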
from apache_beam.io.jdbc import ReadFromJdbc
import apache_beam as beam
# from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions, GoogleCloudOptions, StandardOptions, WorkerOptions, DebugOptions
from apache_beam.io.gcp.bigquery import WriteToBigQuery
from apache_beam.io.gcp.bigquery import BigQuerySource
from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
import json


class ToBigQueryJsonRows(beam.DoFn):
    """Converts rows read by ReadFromJdbc into BigQuery-loadable dicts."""

    def __init__(self, schema):
        self._schema = schema

    # x : BeamSchema_232f6906_dfc6_48a8_873d_f7b5bbc2b88f(id=3)
    # https://github.com/apache/beam/blob/master/sdks/python/apache_beam/typehints/schemas.py#L246
    def process(self, x):
        from apache_beam.io.gcp.bigquery import TableRowJsonCoder
        from apache_beam.io.gcp.bigquery_tools import parse_table_schema_from_json
        import json
        from apache_beam.io.gcp.internal.clients import bigquery
        from apache_beam.internal.gcp.json_value import to_json_value
        schema = parse_table_schema_from_json(json.dumps({"fields": self._schema}))
        coder = TableRowJsonCoder(schema)
        x = bigquery.TableRow(f=[bigquery.TableCell(v=to_json_value(e)) for e in x])
        row = json.loads(coder.encode(x))  # {'id': 3}
        print(row)
        yield row


class JdbcToBigQuery(beam.DoFn):

    def __init__(self, username, password, driver_class_name, query, jdbc_url, project, dataset, table, options):
        self._options = options
        self._username = username
        self._password = password
        self._driver_class_name = driver_class_name
        self._query = query
        self._jdbc_url = jdbc_url
        self._project = project
        self._dataset = dataset
        self._table = table
        self._json_schema = [
            {"name": "id", "type": "integer", "mode": "NULLABLE"},
            {"name": "id2", "type": "integer", "mode": "NULLABLE"},
        ]

    def run(self):
        with beam.Pipeline(options=self._options) as p:
            # TODO: filter columns used by self._json_schema until the query override is fixed
            # https://issues.apache.org/jira/browse/BEAM-10750
            jdbc_results = (
                p
                | 'Read from jdbc' >> ReadFromJdbc(
                    table_name=self._table,
                    driver_class_name=self._driver_class_name,
                    jdbc_url=self._jdbc_url,
                    username=self._username,
                    password=self._password,
                    query=self._query,
                ))
            bq_rows = (
                jdbc_results
                | 'To Json With Schema' >> beam.ParDo(ToBigQueryJsonRows(schema=self._json_schema)))
            bq_rows | 'Load To BigQuery' >> beam.io.WriteToBigQuery(
                f'{self._project}:{self._dataset}.{self._table}',
                schema=parse_table_schema_from_json(json.dumps({"fields": self._json_schema})),
                write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE,
                create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED)


class SQLServerToBigQuery():

    def __init__(self):
        self._username = ''
        self._password = ''
        self._driver_class_name = 'com.microsoft.sqlserver.jdbc.SQLServerDriver'
        self._query = ""
        self._jdbc_url = 'jdbc:sqlserver://localhost:1433;database=beam;encrypt=true;'
        self._project = ''
        self._dataset = ''
        self._table = ''
        self._options = PipelineOptions([
            "--runner=DirectRunner",
            "--project=<project_id>",
            "--temp_location=<gcs path>",
        ])

    def test(self):
        JdbcToBigQuery(
            self._username, self._password, self._driver_class_name, self._query,
            self._jdbc_url, self._project, self._dataset, self._table, self._options).run()


if __name__ == '__main__':
    SQLServerToBigQuery().test()
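When moving the same pipeline onto Dataflow for the actual runs, the options would change roughly as follows. A sketch with placeholder values, not the exact configuration I used:

from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder values; region and staging_location are assumptions, not a verified setup.
dataflow_options = PipelineOptions([
    "--runner=DataflowRunner",
    "--project=<project_id>",
    "--region=<region>",
    "--temp_location=gs://<bucket>/tmp",
    "--staging_location=gs://<bucket>/staging",
])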