Apache BeamのPython jdbcコネクタを使いDataflowでジョブを実行してみました。Cloud SQLとDataflowを同一サブネット内に作りプライベートIPで接続を試みました。検証したところジョブ実行時に実行環境からPostgresにコネクションを張ろうとしていることがわかりました。
class PostgresToBigQueryDataflow(): def __init__(self): self._username = '<username>' self._password = '<password>' self._driver_class_name = 'org.postgresql.Driver' self._query = "select id from beam_table;" self._jdbc_url = 'jdbc:postgresql://<private_IP>:5432/beam' self._project = '<project id>' self._dataset = '<dataset>' self._table = '<table>' self._options = DebugOptions([ "--runner=DataflowRunner", "--project=<project id>", "--job_name=<job name>", "--temp_location=gs://<project id>/tmp/", "--region=us-central1", "--experiments=use_runner_v2", "--subnetwork=regions/us-central1/subnetworks/<subnet>", ]) def test(self): JdbcToBigQuery(self._username, self._password, self._driver_class_name, self._query, self._jdbc_url, self._project, self._dataset,self._table, self._options).run()
ローカル環境からDataflowジョブを実行した際、不可解なことにコネクションが張れずJOBの実行に失敗しました。WiresharkでパケットをみたところローカルPCからコネクションを試みていました。
ジョブを実行する場合、実行環境からもDBに接続できる必要がありそうです。
GCPの同一サブネット内からは問題なくJOBを実行できました。テンプレート化することで回避できるかもしれませんが未検証。
クエリの上書きできない問題もあるので、現時点でPython版を使うのはやめた方がよさそう。