flink-user mailing list archives

From Aljoscha Krettek <aljos...@apache.org>
Subject Re: Queries regarding Historical Reprocessing
Date Thu, 04 May 2017 09:12:35 GMT
Sorry for the longer wait, it’s a longer answer and I had to sort my thoughts. I’ll try
to answer each question separately, though the solution for some of the issues is the same.

1. I think the problem here is that Flink will not perform any checkpoints if some operators
have finished. The Streaming File Sources are implemented as a combination of two operators:
a file monitor/split generator and a file reader. The first one is responsible for enumerating
available files and generating input splits. The second one is responsible for reading the
actual contents. In a Flink job it will thus look like this: File Monitor -> File Reader.
You should see this in the JobManager dashboard.

Now, by default env.createInput(InputFormat) creates a file monitor that scans the directory
only once and then finishes, which is why you don’t see any checkpoints being performed. You
can work around this by using
env.readFile(FileInputFormat<T> format, String filePath, FileProcessingMode watchType, long interval)
with a watch type of PROCESS_CONTINUOUSLY. With this, the file monitor stays active and
continuously sends input splits downstream for newly created files.

2. The File Monitor should always have parallelism=1 while the read operator will have the
parallelism configured by the user.

3. With separate sources there would be a separate file monitor/file reader combination for
each of them. How they are spread across the TaskManagers depends on the parallelism and on how
the cluster, especially the TaskManager slots, is configured.

4. See 1. and 2. If you set PROCESS_CONTINUOUSLY it will pick up new files that are added
to the folder.
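
To make the difference between a one-time scan and a continuously running monitor (points 1 and 4) more concrete, here is a toy model in plain Java. It is not Flink code and does not use any Flink API: the class name FileMonitorSketch and the "remember every path already seen" bookkeeping are assumptions made purely for illustration, and Flink's real file monitor may track new files differently. Calling scan() once corresponds roughly to the default behaviour, while calling it repeatedly corresponds roughly to PROCESS_CONTINUOUSLY.

```java
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical toy model of a directory monitor; NOT Flink's implementation.
class FileMonitorSketch {
    private final Path dir;
    // Assumption for illustration: remember every path already handed out.
    private final Set<Path> seen = new HashSet<>();

    FileMonitorSketch(Path dir) {
        this.dir = dir;
    }

    // One scan of the directory: returns the regular files that no earlier
    // scan has returned, sorted by path.
    List<Path> scan() throws IOException {
        List<Path> fresh = new ArrayList<>();
        try (DirectoryStream<Path> stream = Files.newDirectoryStream(dir)) {
            for (Path p : stream) {
                if (Files.isRegularFile(p) && seen.add(p)) {
                    fresh.add(p);
                }
            }
        }
        Collections.sort(fresh);
        return fresh;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("monitor-sketch");
        FileMonitorSketch monitor = new FileMonitorSketch(dir);

        Files.createFile(dir.resolve("a.avro"));
        // A monitor that stops after this first scan never learns about b.avro.
        System.out.println("first scan:  " + monitor.scan().size() + " new file(s)");

        Files.createFile(dir.resolve("b.avro"));
        // A monitor that keeps scanning picks up the file added later.
        System.out.println("second scan: " + monitor.scan().size() + " new file(s)");
    }
}
```

In this sketch each scan reports one new file. A monitor that finishes after the first scan corresponds to the source going to FINISHED, whereas one that keeps scanning stays alive.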


> On 3. May 2017, at 16:27, Vinay Patil <vinay18.patil@gmail.com> wrote:
> Hi Guys,
> Can someone please help me in understanding this ?
> Regards,
> Vinay Patil
> On Thu, Apr 27, 2017 at 12:36 PM, Vinay Patil <vinay18.patil@gmail.com> wrote:
> Hi Guys, 
> For historical reprocessing, I am reading the Avro data from S3 and passing these records
> to the same pipeline for processing.
> I have the following queries: 
> 1. I am running this pipeline as a stream application with checkpointing enabled. The
> records are successfully written to S3; however, they remain in the pending state because
> checkpointing is not triggered when I am doing re-processing. Why does this happen? (I kept
> the checkpointing interval at 1 minute; the pipeline ran for 10 minutes.)
> This is the code I am using for reading Avro data from S3:
> AvroInputFormat<SomeAvroClass> avroInputFormat = new AvroInputFormat<>(
>         new org.apache.flink.core.fs.Path(s3Path), SomeAvroClass.class);
>
> sourceStream = env.createInput(avroInputFormat).map(...);
> 2. For the source stream Flink sets the parallelism to 1, and for the rest of the operators
> the user-specified parallelism is used. How does Flink read the data? Does it bring in the
> entire file from S3, one at a time, and then split it according to the parallelism?
> 3. I am reading from two different S3 folders and treating them as separate source streams.
> How does Flink read data in this case? Does it pick one file from each S3 folder, split
> the data and pass it downstream? Does Flink read the data sequentially? I am confused here,
> as only one TaskManager is reading the data from S3 and then all TMs are getting the data.
> 4. Although I am running this as a stream application, the operators go into the FINISHED
> state after processing. Is this because Flink treats the S3 source as finite data? What
> will happen if data is continuously written to S3 from one pipeline while I am doing
> historical re-processing from a second pipeline?
> Regards, 
> Vinay Patil
