case-kの備忘録

日々の備忘録です。データ分析とか基盤系に興味あります。

Apache Beam Python JDBCを使いDataflowを動かすには、ジョブの実行環境からもコネクションを張れる必要があった

Apache BeamのPython jdbcコネクタを使いDataflowでジョブを実行してみました。Cloud SQLとDataflowを同一サブネット内に作りプライベートIPで接続を試みました。検証したところジョブ実行時に実行環境からPostgresにコネクションを張ろうとしていることがわかりました。

class PostgresToBigQueryDataflow():

    def __init__(self):
        self._username = '<username>'
        self._password = '<password>'
        self._driver_class_name = 'org.postgresql.Driver'
        self._query = "select id from beam_table;"
        self._jdbc_url = 'jdbc:postgresql://<private_IP>:5432/beam'
        self._project = '<project id>'
        self._dataset = '<dataset>'
        self._table = '<table>'
        self._options = DebugOptions([
            "--runner=DataflowRunner",
            "--project=<project id>",
            "--job_name=<job name>",
            "--temp_location=gs://<project id>/tmp/",
            "--region=us-central1",
            "--experiments=use_runner_v2",
            "--subnetwork=regions/us-central1/subnetworks/<subnet>",
        ])
    def test(self):
        JdbcToBigQuery(self._username, self._password, self._driver_class_name, self._query, self._jdbc_url, self._project, self._dataset,self._table, self._options).run()

ローカル環境からDataflowジョブを実行した際、不可解なことにコネクションが張れずJOBの実行に失敗しました。WiresharkでパケットをみたところローカルPCからコネクションを試みていました。

f:id:casekblog:20220214124507p:plain

ジョブを実行する場合、実行環境からもDBに接続できる必要がありそうです。

GCPの同一サブネット内からは問題なくJOBを実行できました。テンプレート化することで回避できるかもしれませんが未検証。
クエリの上書きできない問題もあるので、現時点でPython版を使うのはやめた方がよさそう。