kafka-users mailing list archives

From Felix Giguere Villegas <felix.gigu...@mate1inc.com>
Subject Re: How to use the hadoop consumer in distributed mode?
Date Tue, 18 Oct 2011 22:18:47 GMT
Thanks for your replies, guys :)

@Jay: I thought about the Hadoop version mismatch too, because I've had the
same problem before. I'll double-check to make sure I have the same versions
of Hadoop everywhere, as the Kafka distributed cluster I was testing on is a
new setup and I might have forgotten to put the Hadoop jars we use on it...
I'm working part-time for now, so I probably won't touch this again until
next week, but I'll keep you guys posted ASAP :)
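
When I get back to it, I'll probably start by printing the Hadoop version the
client-side jars report and comparing it to what `hadoop version` says on the
cluster nodes. Something like this little throwaway class (just a sketch; the
class name is made up, but VersionInfo is the standard Hadoop utility):

    import org.apache.hadoop.util.VersionInfo;

    // Run this with the same client jars the Kafka ETL job picks up, then
    // compare the output against what the cluster reports.
    public class PrintHadoopVersion {
        public static void main(String[] args) {
            System.out.println("Version:  " + VersionInfo.getVersion());
            System.out.println("Revision: " + VersionInfo.getRevision());
        }
    }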

@Richard: Thanks a lot for your description. That clears up the inaccuracies
in my understanding. Is there any chance you guys might release the code you
use to query ZK and create the appropriate offset files for each
broker/partition pair? The Hadoop consumer provided in the source works with
the setup we get from the quickstart guide, but the process you describe
seems more appropriate for production use.
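
In the meantime, here is roughly what I had in mind, in case it helps the
discussion. It's only a sketch: I'm assuming the 0.7-era registry layout
under /brokers/ids/<brokerId> and /brokers/topics/<topic>/<brokerId> in
ZooKeeper, and I'm writing plain-text placeholder files rather than whatever
format the real DataGenerator produces, so the class name and the file format
here are made up:

    import java.io.FileWriter;
    import java.util.List;
    import org.apache.zookeeper.ZooKeeper;

    public class OffsetFileGenerator {
        public static void main(String[] args) throws Exception {
            String topic = args[0];
            // One-shot read, so no watcher is registered.
            ZooKeeper zk = new ZooKeeper("zk-host:2181", 30000, null);

            // Assumed layout: /brokers/topics/<topic> lists the ids of the
            // brokers hosting the topic, each child node holding that broker's
            // partition count, and /brokers/ids/<id> holds "creatorId:host:port".
            List<String> ids = zk.getChildren("/brokers/topics/" + topic, false);
            for (String id : ids) {
                String reg = new String(
                    zk.getData("/brokers/ids/" + id, false, null), "UTF-8");
                String hostPort = reg.substring(reg.indexOf(':') + 1);
                int partitions = Integer.parseInt(new String(
                    zk.getData("/brokers/topics/" + topic + "/" + id, false, null),
                    "UTF-8"));

                // One file per broker/partition pair, seeded with a default offset.
                for (int p = 0; p < partitions; p++) {
                    FileWriter out = new FileWriter("offset-" + id + "-" + p + ".txt");
                    out.write(hostPort + "\t" + topic + "\t" + p + "\t-1\n");
                    out.close();
                }
            }
            zk.close();
        }
    }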

Thanks again :)

--
Felix



On Tue, Oct 18, 2011 at 5:52 PM, Richard Park <richard.b.park@gmail.com> wrote:

> Does the version in contrib contain the fixes for KAFKA-131? The offsets
> were incorrectly computed prior to this patch.
>
> At LinkedIn, this is what we do, in a nutshell:
> 1. We connect to the ZooKeeper instance. With this we are able to discover
> the topics, the brokers, and the partitions of each broker.
>
> 2. For a topic we want to pull, we create files that contain the offset for
> each broker and partition. Each individual file covers a unique
> broker/partition pair. This is essentially what the DataGenerator does,
> except that we use values from ZooKeeper. We take the output of the previous
> run of the Kafka job (the new offsets) and use them as the new offset files.
> If the old offset doesn't exist, we set a default starting offset.
>
> 3. We run the pull Hadoop job. One mapper per broker/partition pulls, using
> the SimpleConsumer, into HDFS (the KafkaETLRecordReader handles most of
> this). We query Kafka for the latest offset, and the mapper fetches from the
> Kafka broker until that latest offset is reached (a rough sketch of this
> loop follows the list).
>
> 4. We group the data by hourly partition with a reduce step.
>
> 5. The Kafka Hadoop job's mapper writes out the new offsets for the next
> time we decide to pull the data. The pull runs quite frequently, at
> regularly scheduled intervals.
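>
> For illustration, a minimal sketch of that per-mapper loop. This is not our
> actual code; it assumes the 0.7-era SimpleConsumer API from memory (the
> constructor arguments, method names, and the -1L "latest" sentinel may
> differ in your version), and in the real job this logic lives inside
> KafkaETLRecordReader:
>
>     import java.nio.ByteBuffer;
>     import kafka.api.FetchRequest;
>     import kafka.javaapi.consumer.SimpleConsumer;
>     import kafka.javaapi.message.ByteBufferMessageSet;
>     import kafka.message.MessageAndOffset;
>
>     public class PullSketch {
>         public static void main(String[] args) throws Exception {
>             String topic = "my-topic";
>             int partition = 0;
>             long offset = 0L; // from the offset file of the previous run
>
>             // host, port, socket timeout (ms), fetch buffer size (bytes)
>             SimpleConsumer consumer =
>                 new SimpleConsumer("broker-host", 9092, 30000, 1024 * 1024);
>
>             // Ask the broker for its latest offset; -1L meant "latest" in 0.7.
>             long stopAt = consumer.getOffsetsBefore(topic, partition, -1L, 1)[0];
>
>             // Fetch until we reach the offset that was latest at job start.
>             while (offset < stopAt) {
>                 ByteBufferMessageSet messages = consumer.fetch(
>                     new FetchRequest(topic, partition, offset, 1024 * 1024));
>                 for (MessageAndOffset mo : messages) {
>                     ByteBuffer payload = mo.message().payload(); // goes to HDFS
>                     offset = mo.offset(); // starting offset for the next run
>                 }
>             }
>             consumer.close();
>         }
>     }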
>
> That's the gist of it. There are a few additional modifications we made to
> the Kafka job, including the ability to handle unavailable nodes, Avro
> schema resolution, and auditing.
>
> Thanks,
> -Richard
>
>
>
> On Tue, Oct 18, 2011 at 2:03 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
>
> > Is it possible that this is due to a Hadoop version mismatch? Typically,
> > if the client jar you pick up does not match the Hadoop version of your
> > cluster, you get an EOFException.
> >
> > -Jay
> >
> > On Tue, Oct 18, 2011 at 9:01 AM, Felix Giguere Villegas <
> > felix.giguere@mate1inc.com> wrote:
> >
> > > Hello everyone :) !
> > >
> > > I have trouble using the Kafka Hadoop consumer included in
> > > contrib/hadoop-consumer, and I'd like to know if/how it is used at
> > > LinkedIn or elsewhere. I would also appreciate it if someone could
> > > confirm or correct the assumptions I make below.
> > >
> > > Here's what I have so far:
> > >
> > > It works when pulling from one Kafka broker, but not when pulling from
> > > many. There are two problems:
> > >
> > > The first problem concerns the offset files that the Map/Reduce job
> > > takes as its input. From what I understand, these offset files represent
> > > the offset to start reading from on each of the Kafka brokers.
> > >
> > > To generate those files the first time (and thus start from offset -1),
> > > we can go in contrib/hadoop-consumer/ and run:
> > >
> > > ./run-class.sh kafka.etl.impl.DataGenerator my-properties-file.properties
> > >
> > > The problem is that this DataGenerator class can take only one Kafka
> > > broker in its parameters (the properties file) and thus generates only
> > > one offset file.
> > >
> > > The Map/Reduce job will then spawn one map task for each offset file it
> > > finds in its input directory, and each of these map tasks will connect
> > > to a different Kafka broker. Since the DataGenerator can only generate
> > > one offset file, the Map/Reduce job only spawns one map task, which
> > > queries only one Kafka broker.
> > >
> > > Unless my assumptions are wrong or someone else provides a nice
> > > alternative solution, I was planning to modify the DataGenerator class
> > > so that it can generate multiple offset files, but for now, as a manual
> > > work-around, I just duplicated the offset files and specified a
> > > different Kafka broker in each.
> > >
> > > Other than that, I am thinking perhaps a more robust solution would be
> > > to have ZK-based discovery of the available brokers. Again, I'm curious
> > > to find out how this is done at LinkedIn or elsewhere?
> > >
> > > The second problem occurs when I run the M/R job. If I run it with the
> > > multiple offset files I manually generated as its input, it does spawn
> > > three map tasks, as expected, but it then fails with the following error:
> > >
> > > java.io.IOException: java.io.EOFException
> > >     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:166)
> > >     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:30)
> > >     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
> > >     at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
> > >     at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
> > >     at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
> > >     at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
> > >     at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
> > >     at java.security.AccessController.doPrivileged(Native Method)
> > >     at javax.security.auth.Subject.doAs(Subject.java:396)
> > >     at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
> > >     at org.apache.hadoop.mapred.Child.main(Child.java:264)
> > > Caused by: java.io.EOFException
> > >     at java.io.DataInputStream.readFully(DataInputStream.java:180)
> > >     at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
> > >     at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
> > >     at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1945)
> > >     at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2077)
> > >     at org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:76)
> > >     at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:128)
> > >     ... 11 more
> > >
> > >
> > > It fails before writing anything whatsoever, and it fails repeatedly for
> > > each map task until the JobTracker reaches the maximum number of
> > > failures per task and marks the job as failed.
> > >
> > > I haven't figured this one out yet...
> > >
> > > Any help would be greatly appreciated :) !
> > >
> > > Thanks :) !!!!
> > >
> > > --
> > > Felix
> > >
> >
>
