Problem løst ! Jeg kan ikke fatte, at jeg har brugt to hele dage på det her... Jeg kiggede fuldstændig i den forkerte retning.
Problemet var ikke med en eller anden Dataflow- eller GCP-netværkskonfiguration, og så vidt jeg kan se...
er sandt.
Problemet lå selvfølgelig i min kode:kun problemet blev kun afsløret i et distribueret miljø. Jeg havde begået den fejl at åbne tunnelen fra hovedrørledningsprocessoren i stedet for arbejderne. Så SSH-tunnelen var oppe, men ikke mellem arbejderne og målserveren, kun mellem hovedrørledningen og målet!
For at rette op på dette var jeg nødt til at ændre min anmodende DoFn for at omslutte forespørgselsudførelsen med tunnelen:
class TunnelledSQLSourceDoFn(sql.SQLSourceDoFn):
"""Wraps SQLSourceDoFn in a ssh tunnel"""
def __init__(self, *args, **kwargs):
self.dbport = kwargs["port"]
self.dbhost = kwargs["host"]
self.args = args
self.kwargs = kwargs
super().__init__(*args, **kwargs)
def process(self, query, *args, **kwargs):
# Remote side of the SSH Tunnel
remote_address = (self.dbhost, self.dbport)
ssh_tunnel = (self.kwargs['ssh_host'], self.kwargs['ssh_port'])
with open_tunnel(
ssh_tunnel,
ssh_username=self.kwargs["ssh_user"],
ssh_password=self.kwargs["ssh_password"],
remote_bind_address=remote_address,
set_keepalive=10.0
) as tunnel:
forwarded_port = tunnel.local_bind_port
self.kwargs["port"] = forwarded_port
source = sql.SQLSource(*self.args, **self.kwargs)
sql.SQLSouceInput._build_value(source, source.runtime_params)
logging.info("Processing - {}".format(query))
for records, schema in source.client.read(query):
for row in records:
yield source.client.row_as_dict(row, schema)
som du kan se, var jeg nødt til at tilsidesætte nogle stykker af pysql_beam-biblioteket.
Endelig åbner hver arbejder sin egen tunnel for hver anmodning. Det er nok muligt at optimere denne adfærd, men det er nok til mine behov.