kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Hisham Mardam-Bey <his...@mate1inc.com>
Subject Re: How to use the hadoop consumer in distributed mode?
Date Tue, 18 Oct 2011 23:31:39 GMT
Hi folks, been following this thread, Felix and I are working together
on this project, we really like Kafka and are moving it into
production very soon.

Jay, question, would you guys consider releasing the code in a "not so
clean state" and have the community (we would like to help) shore it
up so it becomes usable by the masses or are there other issues
(legal?) you have to sort out first?

Thanks!

hisham.

On Tue, Oct 18, 2011 at 6:28 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> I would actually love for us to release the full ETL system we have for
> Kafka/Hadoop, it is just a matter of finding the time to get this code into
> that shape.
>
> The hadoop team that maintains that code is pretty busy right now, but i am
> hoping we can find a way.
>
> -Jay
>
> On Tue, Oct 18, 2011 at 3:18 PM, Felix Giguere Villegas <
> felix.giguere@mate1inc.com> wrote:
>
>> Thanks for your replies guys :)
>>
>> @Jay: I thought about the Hadoop version mismatch too, because I've had the
>> same problem before. I'll double check again to make sure I have the same
>> versions of hadoop everywhere, as the Kafka distributed cluster I was
>> testing on is a new setup and I might have forgotten to put the hadoop jars
>> we use in it... I'm working part-time for now so I probably won't touch
>> this
>> again until next week but I'll keep you guys posted ASAP :)
>>
>> @Richard: Thanks a lot for your description. That clears out the
>> inaccuracies in my understanding. Is there any chance you guys might
>> release
>> the code you use to query ZK and create appropriate offset files for each
>> broker/partition pair? The hadoop consumer provided in the source works
>> with
>> the setup we get from the quickstart guide, but the process you describe
>> seems more appropriate for production use.
>>
>> Thanks again :)
>>
>> --
>> Felix
>>
>>
>>
>> On Tue, Oct 18, 2011 at 5:52 PM, Richard Park <richard.b.park@gmail.com
>> >wrote:
>>
>> > Does the version in contrib contain the fixes for Kafka-131? The offsets
>> > were incorrectly computed prior to this patch.
>> >
>> > At LinkedIn, this is what we do in a nutshell.
>> > 1. We connect to the zookeeper instance. With this we are able to
>> discover
>> > the topics, the brokers and the partitions of a broker.
>> >
>> > 2. For a topic we want to pull, we create files that contains the offset
>> > for
>> > each broker and partition.  Each individual file contains a unique
>> > broker/partition pair. This is essentially what data generator does,
>> except
>> > we use values from zookeeper. We take the output of the previous run of
>> > kafka (the new offsets) and use them as the new offset files. If the old
>> > offset doesn't exist, we set a default starting offset.
>> >
>> > 3. We run the pull hadoop job. One mapper per broker/partition pulls
>> using
>> > the simple consumer into hdfs (the KafkaETLRecordReader handles most of
>> > this). We query kafka for the latest offset. The mapper fetches from the
>> > kafka broker until the latest offset is reached.
>> >
>> > 4. We group the data by hourly partition with a reduce step.
>> >
>> > 5. The kafka hadoop job's mapper spits out new offsets for the next time
>> we
>> > decide to pull the data. The pull occurs at regular scheduled intervals
>> > quite frequently.
>> >
>> > That's the gist of it. There are a few additional modification we made to
>> > the kafka job including the ability to handle unavailable nodes, avro
>> > schema
>> > resolution and auditing.
>> >
>> > Thanks,
>> > -Richard
>> >
>> >
>> >
>> > On Tue, Oct 18, 2011 at 2:03 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
>> >
>> > > Is it possible that this is due to a hadoop version mismatch? Typically
>> > if
>> > > the client jar you pick up does not match the hadoop version of your
>> > hadoop
>> > > cluster you get EOFException.
>> > >
>> > > -Jay
>> > >
>> > > On Tue, Oct 18, 2011 at 9:01 AM, Felix Giguere Villegas <
>> > > felix.giguere@mate1inc.com> wrote:
>> > >
>> > > > Hello everyone :) !
>> > > >
>> > > > I have trouble using the Kafka hadoop consumer included in
>> > > > contrib/hadoop-consumer and I'd like to know if/how it is used at
>> > > LinkedIn
>> > > > or elsewhere? I would also like if someone could confirm or correct
>> the
>> > > > assumptions I make below.
>> > > >
>> > > > Here's what I have so far:
>> > > >
>> > > > It works when pulling from one Kafka broker, but not when pulling
>> from
>> > > > many. There are two problems:
>> > > >
>> > > > The first problem concerns the offset files that the Map/Reduce job
>> > takes
>> > > > as
>> > > > its input. From what I understand, these offset files represent the
>> > > offset
>> > > > to start reading from on each of the Kafka brokers.
>> > > >
>> > > > To generate those files the first time (and thus start from offset
>> -1),
>> > > we
>> > > > can go in contrib/hadoop-consumer/ and run:
>> > > >
>> > > > ./run-class.sh kafka.etl.impl.DataGenerator
>> > my-properties-file.properties
>> > > >
>> > > > The problem is that this DataGenerator class can take only one Kafka
>> > > broker
>> > > > in its parameters (the properties file) and thus generates only one
>> > > offset
>> > > > file.
>> > > >
>> > > > The Map/Reduce job will then spawn one map task for each offset file
>> it
>> > > > finds in its input directory, and each of these map tasks will
>> connect
>> > to
>> > > a
>> > > > different Kafka broker. Since the DataGenerator can only generate
one
>> > > > offset
>> > > > file, the Map/Reduce job only spawns one map task which queries only
>> > one
>> > > > Kafka broker.
>> > > >
>> > > > Unless my assumptions are wrong or someone else provides a nice
>> > > alternative
>> > > > solution, I was planning to modify the DataGenerator class so that
it
>> > can
>> > > > generate multiple offset files, but for now, as a manual work-around,
>> I
>> > > > just
>> > > > duplicated the offset files and specified a different Kafka broker
in
>> > > each.
>> > > >
>> > > > Other than that, I am thinking perhaps a more robust solution would
>> be
>> > to
>> > > > have ZK-based discovery of the available brokers. Again, I'm curious
>> to
>> > > > find
>> > > > out how this is done at LinkedIn or elsewhere?
>> > > >
>> > > > The second problem is when I run the M/R job. If I run it with the
>> > > multiple
>> > > > offset files I manually generated as its input, it does spawn three
>> map
>> > > > tasks, as expected, but it then fails with the following error:
>> > > >
>> > > > java.io.IOException: java.io.EOFException
>> > > >        at
>> > > > kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:166)
>> > > >        at
>> > > kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:30)
>> > > >        at
>> > > >
>> > >
>> >
>> org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
>> > > >        at
>> > > >
>> > >
>> >
>> org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
>> > > >        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
>> > > >        at
>> > org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
>> > > >        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
>> > > >        at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
>> > > >        at java.security.AccessController.doPrivileged(Native Method)
>> > > >        at javax.security.auth.Subject.doAs(Subject.java:396)
>> > > >        at
>> > > >
>> > >
>> >
>> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
>> > > >        at org.apache.hadoop.mapred.Child.main(Child.java:264)Caused
>> by:
>> > > > java.io.EOFException
>> > > >        at java.io.DataInputStream.readFully(DataInputStream.java:180)
>> > > >        at
>> > > >
>> > >
>> >
>> org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
>> > > >        at
>> > > >
>> org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
>> > > >        at
>> > > > org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1945)
>> > > >        at
>> > > > org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2077)
>> > > >        at
>> > > >
>> > >
>> >
>> org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:76)
>> > > >        at
>> > > > kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:128)
>> > > >        ... 11 more
>> > > >
>> > > >
>> > > > It fails before writing anything whatsoever, and it fails repeatedly
>> > for
>> > > > each Map task until the JobTracker reaches the maximum amount of
>> > failures
>> > > > per task and marks the job as failed.
>> > > >
>> > > > I haven't figured this one out yet...
>> > > >
>> > > > Any help would be greatly appreciated :) !
>> > > >
>> > > > Thanks :) !!!!
>> > > >
>> > > > --
>> > > > Felix
>> > > >
>> > >
>> >
>>
>

-- 
Hisham Mardam Bey

A: Because it messes up the order in which people normally read text.
Q: Why is top-posting such a bad thing?
A: Top-posting.
Q: What is the most annoying thing in e-mail?

-=[ Codito Ergo Sum ]=-

Mime
View raw message