flink-user mailing list archives

From Mohit Anchlia <mohitanch...@gmail.com>
Subject Re: Odd flink behaviour
Date Tue, 01 Aug 2017 06:08:41 GMT
I didn't override open(). I am using the open() inherited from
FileInputFormat. Am I supposed to override open() explicitly?
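In the Flink class, a per-split reset would mean overriding `open(FileInputSplit split)`, calling `super.open(split)` first (the inherited open() is what sets `currentSplit`), and then setting `reached = false`. That requires the Flink dependency, so the sketch below models the same lifecycle in plain Java instead. It assumes, as Fabian's question implies, that one format instance is opened once per split and reused for every later split. `OneRecordPerSplitFormat`, `StickyFormat`, `ResettingFormat` and `readAll` are hypothetical names, not Flink's API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Plain-Java model (no Flink dependency) of an input format that is opened
 * once per split and reused across splits. All names are hypothetical.
 */
public class ReusedFormatDemo {

    /** Minimal lifecycle mirroring open / reachedEnd / nextRecord. */
    abstract static class OneRecordPerSplitFormat {
        protected String currentSplit;
        protected boolean reached = false;

        void open(String split) {
            currentSplit = split;
        }

        boolean reachedEnd() {
            return reached;
        }

        String nextRecord() {
            reached = true; // one record (the whole file) per split
            return "content of " + currentSplit;
        }
    }

    /** Mirrors the code in the thread: 'reached' is never reset. */
    static class StickyFormat extends OneRecordPerSplitFormat { }

    /** Resets per-split state in open(), like overriding open(FileInputSplit). */
    static class ResettingFormat extends OneRecordPerSplitFormat {
        @Override
        void open(String split) {
            super.open(split);
            reached = false;
        }
    }

    /** Drives one reused instance through every split, as a single task would. */
    static List<String> readAll(OneRecordPerSplitFormat format, List<String> splits) {
        List<String> records = new ArrayList<>();
        for (String split : splits) {
            format.open(split);
            while (!format.reachedEnd()) {
                records.add(format.nextRecord());
            }
        }
        return records;
    }

    public static void main(String[] args) {
        List<String> splits = Arrays.asList("a.pdf", "b.pdf", "c.pdf");
        System.out.println(readAll(new StickyFormat(), splits).size());    // 1
        System.out.println(readAll(new ResettingFormat(), splits).size()); // 3
    }
}
```

Without the reset, the first split flips `reached` to true and every later split reports end-of-input immediately after open().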

On Mon, Jul 31, 2017 at 9:49 PM, Fabian Hueske <fhueske@gmail.com> wrote:

> Do you set reached to false in open()?
>
>
> On 01.08.2017 at 2:44 AM, "Mohit Anchlia" <mohitanchlia@gmail.com> wrote:
>
> And here is the input format code:
>
> import java.io.IOException;
> import java.nio.file.Files;
> import java.nio.file.Paths;
>
> import org.apache.flink.api.common.io.FileInputFormat;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
>
> public class PDFFileInputFormat extends FileInputFormat<String> {
>  private static final long serialVersionUID = -4137283038479003711L;
>  private static final Logger logger = LoggerFactory
>    .getLogger(PDFFileInputFormat.class.getName());
>  // Set to true once nextRecord() has returned the file's content.
>  private boolean reached = false;
>
>  @Override
>  public boolean reachedEnd() throws IOException {
>   logger.info("called reached " + reached);
>   return reached;
>  }
>
>  @Override
>  public String nextRecord(String reuse) throws IOException {
>   // Placeholder: reads the raw file bytes; no PDF parsing happens yet.
>   logger.info("This is where you parse PDF");
>   String content = new String(
>     Files.readAllBytes(Paths.get(this.currentSplit.getPath().getPath())));
>   logger.info("Content " + content);
>   reached = true;
>   return content;
>  }
> }
>
> On Mon, Jul 31, 2017 at 5:09 PM, Mohit Anchlia <mohitanchlia@gmail.com>
> wrote:
>
>> I have a very simple program that just reads all the files in a path.
>> However, Flink is not working as expected.
>>
>> Every time I execute this job, I see Flink read only 2 files, even
>> though there are more in that directory. On closer look, it appears that
>> it might be related to:
>>
>> [flink-akka.actor.default-dispatcher-3] INFO
>> org.apache.flink.runtime.taskmanager.TaskManager - TaskManager has 2
>> task slot(s).
>>
>> My question is: isn't Flink supposed to iterate over the rest of the
>> directory once those 2 slots become free again? I am assuming this problem
>> is caused by there being only 2 slots.
>>
>>
>> Code ---
>>
>>   PDFFileInputFormat format = new PDFFileInputFormat();
>>   format.setFilePath(args[0]);
>>   format.setNestedFileEnumeration(true);
>>   logger.info("Number of splits " + format.getNumSplits());
>>
>>   // logger.info(Paths.get(".").toAbsolutePath().normalize().toString());
>>
>>   // The produced type must match the format's type parameter (String).
>>   env.createInput(format, TypeInformation.of(String.class)).print();
>>
>
>
>
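To illustrate why the job stops at exactly 2 files, here is a plain-Java model (no Flink dependency) of 2 parallel task instances. As a simplification, it assumes each instance keeps one reused format object and that the instances take turns pulling the next unread file from a shared queue; real Flink hands out splits on request rather than strictly in turn. `FileFormat`, `run` and `resetInOpen` are hypothetical names. Under this model the 2 slots are reused for the remaining files, but without a reset in `open()` each instance emits only its first file.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Deque;
import java.util.List;

public class SlotReuseDemo {

    /** Hypothetical one-record-per-file format; resetInOpen toggles the fix. */
    static class FileFormat {
        private final boolean resetInOpen;
        private boolean reached = false;
        private String current;

        FileFormat(boolean resetInOpen) {
            this.resetInOpen = resetInOpen;
        }

        void open(String split) {
            current = split;
            if (resetInOpen) {
                reached = false; // per-split state reset
            }
        }

        boolean reachedEnd() {
            return reached;
        }

        String nextRecord() {
            reached = true;
            return current;
        }
    }

    /**
     * Simulates `parallelism` task instances, each owning one reused format
     * object, pulling unread files from a shared queue until it is empty.
     * Returns the files that actually produced a record.
     */
    static List<String> run(List<String> files, int parallelism, boolean resetInOpen) {
        Deque<String> pending = new ArrayDeque<>(files);
        List<FileFormat> instances = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            instances.add(new FileFormat(resetInOpen));
        }
        List<String> read = new ArrayList<>();
        int turn = 0;
        while (!pending.isEmpty()) {
            // Simplification: instances take turns asking for the next split.
            FileFormat format = instances.get(turn % parallelism);
            format.open(pending.poll());
            while (!format.reachedEnd()) {
                read.add(format.nextRecord());
            }
            turn++;
        }
        return read;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList("1.pdf", "2.pdf", "3.pdf", "4.pdf", "5.pdf");
        System.out.println(run(files, 2, false)); // [1.pdf, 2.pdf]
        System.out.println(run(files, 2, true));  // [1.pdf, 2.pdf, 3.pdf, 4.pdf, 5.pdf]
    }
}
```

With five files and a parallelism of 2, the non-resetting variant reads only `1.pdf` and `2.pdf`, matching the symptom in the thread. In this model the number of files read without the reset tracks the parallelism, not the number of files in the directory.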
