PostgreSQL
 sql >> Datenbank >  >> RDS >> PostgreSQL

Wie richte ich einen SSH-Tunnel in Google Cloud Dataflow zu einem externen Datenbankserver ein?

Problem gelöst ! Ich kann nicht glauben, dass ich zwei volle Tage damit verbracht habe ... Ich habe komplett in die falsche Richtung geschaut.

Das Problem lag nicht an einer Dataflow- oder GCP-Netzwerkkonfiguration, und soweit ich das beurteilen kann...

ist wahr.

Das Problem lag natürlich in meinem Code:nur das Problem wurde nur in einer verteilten Umgebung aufgedeckt. Ich hatte den Fehler gemacht, den Tunnel vom Hauptprozessor der Pipeline aus zu öffnen, anstatt von den Arbeitern. Der SSH-Tunnel war also aktiv, aber nicht zwischen den Arbeitern und dem Zielserver, sondern nur zwischen der Hauptpipeline und dem Ziel!

Um dies zu beheben, musste ich mein anforderndes DoFn ändern, um die Abfrageausführung mit dem Tunnel :

zu umschließen
class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""

def __init__(self, *args, **kwargs):
    self.dbport = kwargs["port"]
    self.dbhost = kwargs["host"]
    self.args = args
    self.kwargs = kwargs
    super().__init__(*args, **kwargs)

def process(self, query, *args, **kwargs):
    # Remote side of the SSH Tunnel
    remote_address = (self.dbhost, self.dbport)
    ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
    with open_tunnel(
        ssh_tunnel,
        ssh_username=self.kwargs["ssh_user"],
        ssh_password=self.kwargs["ssh_password"],
        remote_bind_address=remote_address,
        set_keepalive=10.0
    ) as tunnel:
        forwarded_port = tunnel.local_bind_port
        self.kwargs["port"] = forwarded_port
        source = sql.SQLSource(*self.args, **self.kwargs)
        sql.SQLSouceInput._build_value(source, source.runtime_params)
        logging.info("Processing - {}".format(query))
        for records, schema in source.client.read(query):
            for row in records:
                yield source.client.row_as_dict(row, schema)

Wie Sie sehen, musste ich einige Teile der pysql_beam-Bibliothek überschreiben.

Schließlich öffnet jeder Worker für jede Anfrage seinen eigenen Tunnel. Es ist wahrscheinlich möglich, dieses Verhalten zu optimieren, aber für meine Bedürfnisse reicht es aus.