kafka-users mailing list archives

From Jay Kreps <jay.kr...@gmail.com>
Subject Re: How to use the hadoop consumer in distributed mode?
Date Tue, 18 Oct 2011 21:03:05 GMT
Is it possible that this is due to a Hadoop version mismatch? Typically, if
the client jar you pick up does not match the Hadoop version of your
cluster, you get an EOFException.
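
One quick way to check (a rough sketch, nothing specific to the contrib
code) is to print the version reported by the Hadoop client jar on your
classpath and compare it with what "hadoop version" prints on the cluster
nodes:

    // Prints the Hadoop version of whatever client jar is on the classpath.
    // Compare this against what the JobTracker / cluster nodes report.
    import org.apache.hadoop.util.VersionInfo;

    public class PrintHadoopClientVersion {
        public static void main(String[] args) {
            System.out.println("Hadoop client version: " + VersionInfo.getVersion());
        }
    }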

-Jay

On Tue, Oct 18, 2011 at 9:01 AM, Felix Giguere Villegas <
felix.giguere@mate1inc.com> wrote:

> Hello everyone :) !
>
> I am having trouble using the Kafka Hadoop consumer included in
> contrib/hadoop-consumer, and I'd like to know if/how it is used at LinkedIn
> or elsewhere. I would also appreciate it if someone could confirm or
> correct the assumptions I make below.
>
> Here's what I have so far:
>
> It works when pulling from one Kafka broker, but not when pulling from
> many. There are two problems:
>
> The first problem concerns the offset files that the Map/Reduce job takes
> as its input. From what I understand, these offset files represent the
> offset to start reading from on each of the Kafka brokers.
>
> To generate those files the first time (and thus start reading from offset
> -1), we can go into contrib/hadoop-consumer/ and run:
>
> ./run-class.sh kafka.etl.impl.DataGenerator my-properties-file.properties
>
> The problem is that this DataGenerator class can take only one Kafka
> broker in its parameters (the properties file) and thus generates only one
> offset file.
>
> The Map/Reduce job will then spawn one map task for each offset file it
> finds in its input directory, and each of these map tasks will connect to
> a different Kafka broker. Since the DataGenerator can only generate one
> offset file, the Map/Reduce job only spawns one map task, which queries
> only one Kafka broker.
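>
> (As a sanity check, the sketch below counts the offset files in the job's
> input directory, which should match the number of map tasks that get
> spawned. The input path is made up.)
>
>     // Sketch: count the offset files under the M/R job's input directory;
>     // the job should spawn one map task per file found here.
>     import org.apache.hadoop.conf.Configuration;
>     import org.apache.hadoop.fs.FileStatus;
>     import org.apache.hadoop.fs.FileSystem;
>     import org.apache.hadoop.fs.Path;
>
>     public class CountOffsetFiles {
>         public static void main(String[] args) throws Exception {
>             Path input = new Path("/tmp/kafka-etl/offsets"); // hypothetical path
>             FileSystem fs = FileSystem.get(new Configuration());
>             FileStatus[] files = fs.listStatus(input);
>             System.out.println(files.length + " offset file(s) => "
>                     + files.length + " map task(s)");
>         }
>     }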
>
> Unless my assumptions are wrong or someone else provides a nice alternative
> solution, I was planning to modify the DataGenerator class so that it can
> generate multiple offset files, but for now, as a manual work-around, I
> just duplicated the offset files and specified a different Kafka broker in
> each.
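>
> Here is roughly what I have in mind, assuming DataGenerator can simply be
> invoked once per broker with a broker-specific properties file (the file
> names are made up, and each file would also need to point at a distinct
> offset/output directory):
>
>     // Sketch of the work-around: run DataGenerator once per broker, each
>     // time with a properties file pointing at a single, different broker.
>     public class GenerateOffsetsPerBroker {
>         public static void main(String[] args) throws Exception {
>             String[] perBrokerProps = {
>                 "broker-1.properties",  // hypothetical file names
>                 "broker-2.properties",
>                 "broker-3.properties"
>             };
>             for (String props : perBrokerProps) {
>                 // Same class that run-class.sh invokes on the command line.
>                 kafka.etl.impl.DataGenerator.main(new String[] { props });
>             }
>         }
>     }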
>
> Other than that, I am thinking that perhaps a more robust solution would
> be to have ZK-based discovery of the available brokers. Again, I'm curious
> to find out how this is done at LinkedIn or elsewhere.
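>
> Something along these lines is what I have in mind, assuming the brokers
> register themselves under /brokers/ids in ZooKeeper (I have not verified
> the exact znode layout or payload format):
>
>     // Sketch of ZK-based broker discovery: list the broker ids registered
>     // in ZooKeeper and read each one's registration payload.
>     import java.util.List;
>     import java.util.concurrent.CountDownLatch;
>     import org.apache.zookeeper.WatchedEvent;
>     import org.apache.zookeeper.Watcher;
>     import org.apache.zookeeper.ZooKeeper;
>
>     public class ListBrokersFromZk {
>         public static void main(String[] args) throws Exception {
>             final CountDownLatch connected = new CountDownLatch(1);
>             // "zkhost:2181" is a placeholder for the real ZK connect string.
>             ZooKeeper zk = new ZooKeeper("zkhost:2181", 30000, new Watcher() {
>                 public void process(WatchedEvent event) {
>                     if (event.getState() == Event.KeeperState.SyncConnected) {
>                         connected.countDown();
>                     }
>                 }
>             });
>             connected.await(); // wait until the session is established
>             List<String> ids = zk.getChildren("/brokers/ids", false);
>             for (String id : ids) {
>                 byte[] data = zk.getData("/brokers/ids/" + id, false, null);
>                 System.out.println("broker " + id + " -> "
>                         + new String(data, "UTF-8"));
>             }
>             zk.close();
>         }
>     }
>
> Each broker's registration payload should contain its host and port, which
> could then be used to write one offset file per discovered broker.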
>
> The second problem occurs when I run the M/R job. If I run it with the
> multiple offset files I manually generated as its input, it does spawn
> three map tasks, as expected, but it then fails with the following error:
>
> java.io.IOException: java.io.EOFException
>        at
> kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:166)
>        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:30)
>        at
> org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
>        at
> org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
>        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
>        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
>        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
>        at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
>        at java.security.AccessController.doPrivileged(Native Method)
>        at javax.security.auth.Subject.doAs(Subject.java:396)
>        at
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
>        at org.apache.hadoop.mapred.Child.main(Child.java:264)
> Caused by: java.io.EOFException
>        at java.io.DataInputStream.readFully(DataInputStream.java:180)
>        at
> org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
>        at
> org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
>        at
> org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1945)
>        at
> org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2077)
>        at
> org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:76)
>        at
> kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:128)
>        ... 11 more
>
>
> It fails before writing anything whatsoever, and it fails repeatedly for
> each map task until the JobTracker reaches the maximum number of failures
> per task and marks the job as failed.
>
> I haven't figured this one out yet...
>
> Any help would be greatly appreciated :) !
>
> Thanks :) !!!!
>
> --
> Felix
>
