flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "Maximilian Michels (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-3964) Job submission times out with recursive.file.enumeration
Date Mon, 27 Jun 2016 10:23:52 GMT

    [ https://issues.apache.org/jira/browse/FLINK-3964?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15350759#comment-15350759
] 

Maximilian Michels commented on FLINK-3964:
-------------------------------------------

I see where you're going. That's currently not possible and wouldn't help in your case. First,
the client would still have to wait until all vertices have been configured. Second, it is
actually a single vertex (we're at a {{JobGraph}} level at submission time). Your Hadoop input
format corresponds to one JobGraph vertex. Only later, the {{ExecutionGraph}} is created where
the single vertex will be split up in the number of parallelism vertices.

+1 to add a notice in the timeout exception for increasing {{akka.client.timeout}}.

> Job submission times out with recursive.file.enumeration
> --------------------------------------------------------
>
>                 Key: FLINK-3964
>                 URL: https://issues.apache.org/jira/browse/FLINK-3964
>             Project: Flink
>          Issue Type: Bug
>          Components: Batch Connectors and Input/Output Formats, DataSet API
>    Affects Versions: 1.0.0
>            Reporter: Juho Autio
>
> When using "recursive.file.enumeration" with a big enough folder structure to list, flink
batch job fails right at the beginning because of a timeout.
> h2. Problem details
> We get this error: {{Communication with JobManager failed: Job submission to the JobManager
timed out}}.
> The code we have is basically this:
> {code}
> val env = ExecutionEnvironment.getExecutionEnvironment
> val parameters = new Configuration
> // set the recursive enumeration parameter
> parameters.setBoolean("recursive.file.enumeration", true)
> val parameter = ParameterTool.fromArgs(args)
> val input_data_path : String = parameter.get("input_data_path", null )
> val data : DataSet[(Text,Text)] = env.readSequenceFile(classOf[Text], classOf[Text],
input_data_path)
> .withParameters(parameters)
> data.first(10).print
> {code}
> If we set {{input_data_path}} parameter to {{s3n://bucket/path/date=*/}} it times out.
If we use a more restrictive pattern like {{s3n://bucket/path/date=20160523/}}, it doesn't
time out.
> To me it seems that time taken to list files shouldn't cause any timeouts on job submission
level.
> For us this was "fixed" by adding {{akka.client.timeout: 600 s}} in {{flink-conf.yaml}},
but I wonder if the timeout would still occur if we have even more files to list?
> ----
> P.S. Is there any way to set {{akka.client.timeout}} when calling {{bin/flink run}} instead
of editing {{flink-conf.yaml}}. I tried to add it as a {{-yD}} flag but couldn't get it working.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Mime
View raw message