kafka-users mailing list archives

From: Jay Kreps <jay.kr...@gmail.com>
Subject: Re: How to use the hadoop consumer in distributed mode?
Date: Tue, 18 Oct 2011 22:28:13 GMT
I would actually love for us to release the full ETL system we have for
Kafka/Hadoop; it is just a matter of finding the time to get that code into
releasable shape.

The Hadoop team that maintains that code is pretty busy right now, but I am
hoping we can find a way.

-Jay

On Tue, Oct 18, 2011 at 3:18 PM, Felix Giguere Villegas <felix.giguere@mate1inc.com> wrote:

> Thanks for your replies guys :)
>
> @Jay: I thought about the Hadoop version mismatch too, because I've had the
> same problem before. I'll double-check to make sure I have the same versions
> of Hadoop everywhere, as the Kafka distributed cluster I was testing on is a
> new setup and I might have forgotten to put the Hadoop jars we use on it...
> I'm working part-time for now, so I probably won't touch this again until
> next week, but I'll keep you guys posted ASAP :)
>
> @Richard: Thanks a lot for your description. That clears up the
> inaccuracies in my understanding. Is there any chance you guys might
> release the code you use to query ZK and create the appropriate offset
> files for each broker/partition pair? The hadoop consumer provided in the
> source works with the setup we get from the quickstart guide, but the
> process you describe seems more appropriate for production use.
>
> Thanks again :)
>
> --
> Felix
>
>
>
> On Tue, Oct 18, 2011 at 5:52 PM, Richard Park <richard.b.park@gmail.com> wrote:
>
> > Does the version in contrib contain the fix for KAFKA-131? The offsets
> > were computed incorrectly prior to that patch.
> >
> > At LinkedIn, this is what we do in a nutshell:
> > 1. We connect to the ZooKeeper instance. With this we are able to
> > discover the topics, the brokers and the partitions of each broker.
> >
> > 2. For each topic we want to pull, we create files that contain the
> > offset for each broker and partition. Each individual file corresponds
> > to a unique broker/partition pair. This is essentially what the
> > DataGenerator does, except we use values from ZooKeeper. We take the
> > output of the previous run of the Kafka job (the new offsets) and use
> > them as the new offset files. If the old offset doesn't exist, we set
> > a default starting offset. (There is a sketch of these two steps after
> > the list below.)
> >
> > 3. We run the pull Hadoop job. One mapper per broker/partition pulls,
> > using the simple consumer, into HDFS (the KafkaETLRecordReader handles
> > most of this). We query Kafka for the latest offset, and the mapper
> > fetches from the Kafka broker until that latest offset is reached. (A
> > sketch of this loop also appears below.)
> >
> > 4. We group the data by hourly partition with a reduce step.
> >
> > 5. The Kafka Hadoop job's mapper spits out new offsets for the next
> > time we decide to pull the data. The pull runs frequently, at
> > regularly scheduled intervals.
> >
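> > A minimal sketch of what steps 1 and 2 boil down to (this is not our
> > actual code: the ZooKeeper paths follow Kafka 0.7's registry layout,
> > the node value formats vary by version, and the offset-file writer is
> > elided):
> >
> > import java.util.List;
> > import org.apache.zookeeper.ZooKeeper;
> >
> > public class OffsetFileGenerator {
> >     public static void main(String[] args) throws Exception {
> >         String topic = args[0];
> >         ZooKeeper zk = new ZooKeeper("zk-host:2181", 30000, null);
> >         // Each child of /brokers/topics/<topic> is a broker id; its
> >         // value is that broker's partition count for the topic.
> >         String topicPath = "/brokers/topics/" + topic;
> >         List<String> brokerIds = zk.getChildren(topicPath, false);
> >         for (String brokerId : brokerIds) {
> >             // /brokers/ids/<id> holds the broker's connection info
> >             // (e.g. creator:host:port in 0.7).
> >             String hostPort = new String(
> >                 zk.getData("/brokers/ids/" + brokerId, false, null));
> >             int numParts = Integer.parseInt(new String(
> >                 zk.getData(topicPath + "/" + brokerId, false, null)));
> >             for (int p = 0; p < numParts; p++) {
> >                 // One file per broker/partition pair, so the pull job
> >                 // spawns one mapper per pair. A previous run's output
> >                 // offset would be reused here; -1 means "use default".
> >                 writeOffsetFile(brokerId, hostPort, topic, p, -1L);
> >             }
> >         }
> >         zk.close();
> >     }
> >
> >     // Writes one small file naming the broker, partition and offset
> >     // into the pull job's input directory (elided).
> >     static void writeOffsetFile(String brokerId, String hostPort,
> >                                 String topic, int part, long offset) {}
> > }
> >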
> > That's the gist of it. There are a few additional modifications we
> > made to the Kafka job, including the ability to handle unavailable
> > nodes, Avro schema resolution, and auditing.
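> >
> > The per-mapper fetch loop in step 3 has roughly this shape (again a
> > sketch from memory against the 0.7-era SimpleConsumer API; the HDFS
> > sink is hypothetical):
> >
> > import kafka.api.FetchRequest;
> > import kafka.javaapi.consumer.SimpleConsumer;
> > import kafka.javaapi.message.ByteBufferMessageSet;
> > import kafka.message.MessageAndOffset;
> >
> > public class PartitionPuller {
> >     public static long pull(String host, int port, String topic,
> >                             int partition, long offset) {
> >         SimpleConsumer consumer =
> >             new SimpleConsumer(host, port, 30000, 1024 * 1024);
> >         // Ask the broker for its current latest offset (-1L means
> >         // "latest"); this high-water mark bounds the batch, which is
> >         // why each scheduled pull terminates.
> >         long latest =
> >             consumer.getOffsetsBefore(topic, partition, -1L, 1)[0];
> >         while (offset < latest) {
> >             ByteBufferMessageSet messages = consumer.fetch(
> >                 new FetchRequest(topic, partition, offset, 1024 * 1024));
> >             // A real mapper also guards against empty fetches/retries.
> >             for (MessageAndOffset mo : messages) {
> >                 emitToHdfs(mo.message());  // hypothetical HDFS sink
> >                 offset = mo.offset();      // next offset to fetch from
> >             }
> >         }
> >         consumer.close();
> >         // The returned offset becomes the offset file for the next
> >         // run, which is what step 5 describes.
> >         return offset;
> >     }
> >
> >     static void emitToHdfs(kafka.message.Message m) {}
> > }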
> >
> > Thanks,
> > -Richard
> >
> >
> >
> > On Tue, Oct 18, 2011 at 2:03 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> >
> > > Is it possible that this is due to a Hadoop version mismatch?
> > > Typically, if the client jar you pick up does not match the Hadoop
> > > version of your Hadoop cluster, you get an EOFException.
> > >
> > > -Jay
> > >
> > > On Tue, Oct 18, 2011 at 9:01 AM, Felix Giguere Villegas <felix.giguere@mate1inc.com> wrote:
> > >
> > > > Hello everyone :) !
> > > >
> > > > I am having trouble using the Kafka hadoop consumer included in
> > > > contrib/hadoop-consumer, and I'd like to know if/how it is used at
> > > > LinkedIn or elsewhere. I would also appreciate it if someone could
> > > > confirm or correct the assumptions I make below.
> > > >
> > > > Here's what I have so far:
> > > >
> > > > It works when pulling from one Kafka broker, but not when pulling
> > > > from many. There are two problems:
> > > >
> > > > The first problem concerns the offset files that the Map/Reduce job
> > > > takes as its input. From what I understand, these offset files
> > > > represent the offset to start reading from on each of the Kafka
> > > > brokers.
> > > >
> > > > To generate those files the first time (and thus start from offset
> > > > -1), we can go in contrib/hadoop-consumer/ and run:
> > > >
> > > > ./run-class.sh kafka.etl.impl.DataGenerator my-properties-file.properties
> > > >
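> > > > For reference, the properties file passed to the DataGenerator
> > > > looks roughly like this (key names as best I recall from the
> > > > contrib example's test.properties; values are illustrative):
> > > >
> > > > kafka.etl.topic=MyTopic
> > > > kafka.server.uri=tcp://broker1:9092
> > > > input=/tmp/kafka/offsets
> > > > output=/tmp/kafka/output
> > > >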
> > > > The problem is that this DataGenerator class can take only one
> > > > Kafka broker in its parameters (the properties file) and thus
> > > > generates only one offset file.
> > > >
> > > > The Map/Reduce job will then spawn one map task for each offset
> > > > file it finds in its input directory, and each of these map tasks
> > > > will connect to a different Kafka broker. Since the DataGenerator
> > > > can only generate one offset file, the Map/Reduce job only spawns
> > > > one map task, which queries only one Kafka broker.
> > > >
> > > > Unless my assumptions are wrong or someone else provides a nice
> > > > alternative solution, I was planning to modify the DataGenerator
> > > > class so that it can generate multiple offset files, but for now,
> > > > as a manual work-around, I just duplicated the offset files and
> > > > specified a different Kafka broker in each.
> > > >
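> > > > The change I have in mind would look roughly like this (a
> > > > hypothetical sketch; the writer method stands in for whatever the
> > > > stock DataGenerator does to emit its single offset file):
> > > >
> > > > import java.util.Properties;
> > > >
> > > > public class MultiBrokerDataGenerator {
> > > >     // Accept a comma-separated broker list instead of a single
> > > >     // kafka.server.uri, and emit one offset file per broker.
> > > >     public static void generate(Properties props, String outputDir) {
> > > >         String[] uris = props.getProperty("kafka.server.uri").split(",");
> > > >         for (int i = 0; i < uris.length; i++) {
> > > >             // Same content the stock DataGenerator writes, once
> > > >             // per broker, each starting at the default offset -1.
> > > >             writeOffsetFile(outputDir + "/offset-" + i, uris[i], -1L);
> > > >         }
> > > >     }
> > > >
> > > >     static void writeOffsetFile(String path, String uri, long off) {}
> > > > }
> > > >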
> > > > Other than that, I am thinking perhaps a more robust solution
> > > > would be to have ZK-based discovery of the available brokers.
> > > > Again, I'm curious to find out how this is done at LinkedIn or
> > > > elsewhere?
> > > >
> > > > The second problem is when I run the M/R job. If I run it with the
> > > > multiple offset files I manually generated as its input, it does
> > > > spawn three map tasks, as expected, but it then fails with the
> > > > following error:
> > > >
> > > > java.io.IOException: java.io.EOFException
> > > >         at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:166)
> > > >         at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:30)
> > > >         at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
> > > >         at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
> > > >         at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
> > > >         at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
> > > >         at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
> > > >         at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
> > > >         at java.security.AccessController.doPrivileged(Native Method)
> > > >         at javax.security.auth.Subject.doAs(Subject.java:396)
> > > >         at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
> > > >         at org.apache.hadoop.mapred.Child.main(Child.java:264)
> > > > Caused by: java.io.EOFException
> > > >         at java.io.DataInputStream.readFully(DataInputStream.java:180)
> > > >         at org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
> > > >         at org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
> > > >         at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1945)
> > > >         at org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2077)
> > > >         at org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:76)
> > > >         at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:128)
> > > >         ... 11 more
> > > >
> > > >
> > > > It fails before writing anything whatsoever, and it fails
> > > > repeatedly for each map task until the JobTracker reaches the
> > > > maximum number of failures per task and marks the job as failed.
> > > >
> > > > I haven't figured this one out yet...
> > > >
> > > > Any help would be greatly appreciated :) !
> > > >
> > > > Thanks :) !!!!
> > > >
> > > > --
> > > > Felix
> > > >
> > >
> >
>
