Problem gelöst ! Ich kann nicht glauben, dass ich zwei volle Tage damit verbracht habe ... Ich habe komplett in die falsche Richtung geschaut.
Das Problem lag nicht an einer Dataflow- oder GCP-Netzwerkkonfiguration, und soweit ich das beurteilen kann...
ist wahr.
Das Problem lag natürlich in meinem Code:nur das Problem wurde nur in einer verteilten Umgebung aufgedeckt. Ich hatte den Fehler gemacht, den Tunnel vom Hauptprozessor der Pipeline aus zu öffnen, anstatt von den Arbeitern. Der SSH-Tunnel war also aktiv, aber nicht zwischen den Arbeitern und dem Zielserver, sondern nur zwischen der Hauptpipeline und dem Ziel!
Um dies zu beheben, musste ich mein anforderndes DoFn ändern, um die Abfrageausführung mit dem Tunnel :
zu umschließenclass TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""
def __init__(self, *args, **kwargs):
self.dbport = kwargs["port"]
self.dbhost = kwargs["host"]
self.args = args
self.kwargs = kwargs
super().__init__(*args, **kwargs)
def process(self, query, *args, **kwargs):
# Remote side of the SSH Tunnel
remote_address = (self.dbhost, self.dbport)
ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
with open_tunnel(
ssh_tunnel,
ssh_username=self.kwargs["ssh_user"],
ssh_password=self.kwargs["ssh_password"],
remote_bind_address=remote_address,
set_keepalive=10.0
) as tunnel:
forwarded_port = tunnel.local_bind_port
self.kwargs["port"] = forwarded_port
source = sql.SQLSource(*self.args, **self.kwargs)
sql.SQLSouceInput._build_value(source, source.runtime_params)
logging.info("Processing - {}".format(query))
for records, schema in source.client.read(query):
for row in records:
yield source.client.row_as_dict(row, schema)
Wie Sie sehen, musste ich einige Teile der pysql_beam-Bibliothek überschreiben.
Schließlich öffnet jeder Worker für jede Anfrage seinen eigenen Tunnel. Es ist wahrscheinlich möglich, dieses Verhalten zu optimieren, aber für meine Bedürfnisse reicht es aus.