flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Patrick Lucas (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-6369) Better support for overlay networks
Date Wed, 06 Sep 2017 14:46:01 GMT

    [ https://issues.apache.org/jira/browse/FLINK-6369?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16155461#comment-16155461

Patrick Lucas commented on FLINK-6369:

[~till.rohrmann]: yeah that's what I was getting at. It doesn't really solve our problem for
that reason---though if you just Googled around a bit you might come across that documentation
and think it does. I definitely agree that an HTTP control mechanism is the way to go.

> Better support for overlay networks
> -----------------------------------
>                 Key: FLINK-6369
>                 URL: https://issues.apache.org/jira/browse/FLINK-6369
>             Project: Flink
>          Issue Type: Improvement
>          Components: Docker, Network
>    Affects Versions: 1.2.0
>            Reporter: Patrick Lucas
>             Fix For: 1.4.0
> Running Flink in an environment that utilizes an overlay network (containerized environments
like Kubernetes or Docker Compose, or cloud platforms like AWS VPC) poses various challenges
related to networking.
> The core problem is that in these environments, applications are frequently addressed
by a name different from that with which the application sees itself.
> For instance, it is plausible that the Flink UI (served by the Jobmanager) is accessed
via an ELB, which poses a problem in HA mode when the non-leader UI returns an HTTP redirect
to the leader—but the user may not be able to connect directly to the leader.
> Or, if a user is using [Docker Compose|https://github.com/apache/flink/blob/aa21f853ab0380ec1f68ae1d0b7c8d9268da4533/flink-contrib/docker-flink/docker-compose.yml],
they cannot submit a job via the CLI since there is a mismatch between the name used to address
the Jobmanager and what the Jobmanager perceives as its hostname. (see \[1] below for more
> ----
> h3. Problems and proposed solutions
> There are four instances of this issue that I've run into so far:
> h4. Jobmanagers must be addressed by the same name they are configured with due to limitations
of Akka
> Akka enforces that messages it receives are addressed with the hostname it is configured
with. Newer versions of Akka (>= 2.4) than what Flink uses (2.3-custom) have support for
accepting messages with the "wrong" hostname, but it limited to a single "external" hostname.
> In many environments, it is likely that not all parties that want to connect to the Jobmanager
have the same way of addressing it (e.g. the ELB example above). Other similarly-used protocols
like HTTP generally don't have this restriction: if you connect on a socket and send a well-formed
message, the system assumes that it is the desired recipient.
> One solution is to not use Akka at all when communicating with the cluster from the outside,
perhaps using an HTTP API instead. This would be somewhat involved, and probabyl best left
as a longer-term goal.
> A more immediate solution would be to override this behavior within Flakka, the custom
fork of Akka currently in use by Flink. I'm not sure how much effort this would take.
> h4. The Jobmanager needs to be able to address the Taskmanagers for e.g. metrics collection
> Having the Taskmanagers register themselves by IP is probably the best solution here.
It's a reasonable assumption that IPs can always be used for communication between the nodes
of a single cluster. Asking that each Taskmanager container have a resolvable hostname is
> h4. Jobmanagers in HA mode send HTTP redirects to URLs that aren't externally resolvable/routable
> If multiple Jobmanagers are used in HA mode, HTTP requests to non-leaders (such as if
you put a Kubernetes Service in front of all Jobmanagers in a cluster) get redirected to the
(supposed) hostname of the leader, but this is potentially unresolvable/unroutable externally.
> Enabling non-leader Jobmanagers to proxy API calls to the leader would solve this. The
non-leaders could even serve static asset requests (e.g. for css or js files) directly.
> h4. Queryable state requests involve direct communication with Taskmanagers
> Currently, queryable state requests involve communication between the client and the
Jobmanager (for key partitioning lookups) and between the client and all Taskmanagers.
> If the client is inside the network (as would be common in production use-cases where
high-volume lookups are required) this is a non-issue, but problems crop up if the client
is outside the network.
> For the communication with the Jobmanager, a similar solution as above can be used: if
all Jobmanagers can service all key partitioning lookup requests (e.g. by proxying) then a
simple Service can be used.
> The story is a bit different for the Taskmanagers. The partitioning lookup to the Jobmanager
would return the name of the particular Taskmanager that owned the desired data, but that
name (likely an IP, as proposed in the second section above) is not necessarily resolvable/routable
from the client.
> In the context of Kubernetes, where individual containers are generally not addressible,
a very ugly solution would involve creating a Service for each Taskmanager, then cleverly
configuring things such that the same name could be used to address a specific Taskmanager
both inside and outside the network. \[2]
> A much nicer solution would be, like in the previous section, to enable Taskmanagers
to proxy any queryable state lookup to the appropriate member of the cluster. Once again,
the principle is for every node to be able to fulfill every request.
> This is of course less efficient than addressing the "correct" Taskmanager directly,
but it greatly simplifies the situation for users that want to make queryable state requests
from outside the network.
> ----
> h3. Subtasks
> Once there has been some discussion about the proposed solutions above, this issue can
be used as umbrella ticket for any relevant subtasks.
> ----
> h3. Footnotes
> \[1] In this example, the Jobmanager may be configured with {{jobmanager.rpc.address:
jobmanager}} and indeed, within the Docker network containing the nodes of the cluster, the
name {{jobmanager}} is resolveable. But outside the Docker network, the port is mapped to
{{localhost}}. When the user runs {{$ flink run -m localhost:6123 ...}}, the CLI connects
to the Jobmanager using Akka, but Akka enforces that received messages are addressed with
the same name it is configured with. The result is that the CLI hangs until a timeout is reached,
and warning messages appear in the Jobmanager's log like:
> {noformat}dropping message [class akka.actor.ActorSelectionMessage] for non-local recipient
[Actor[akka.tcp://flink@localhost:6123/]] arriving at [akka.tcp://flink@localhost:6123] inbound
addresses are [akka.tcp://flink@jobmanager:6123]
> 2017-04-24 09:47:52,560 WARN  akka.remote.ReliableDeliverySupervisor{noformat}
> \[2] Another option is to use a Kubernetes StatefulSet, which gives you per-pod addressability.
The downside is that currently all scaling operations on a StatefulSet (including initial
creation) always create or delete pods in sequence instead of concurrently, making cluster
launch time linear with the number of nodes in the cluster.

This message was sent by Atlassian JIRA

View raw message