kafka-users mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Felix GV <fe...@mate1inc.com>
Subject Re: How to use the hadoop consumer in distributed mode?
Date Wed, 26 Oct 2011 20:10:01 GMT
Hi,

I wanted to give a little update on this topic.

I was able to make hadoop-consumer work with a kafka cluster.

What I did is:

   1. I generated a .properties file for one of the kafka brokers I wanted
   to connect to.
   2. I ran the DataGenerator program by passing the .properties file as a
   parameter.
   3. I moved the 1.dat offset file generated in HDFS so that it has another
   name (so that it's not overwritten the next time I run the DataGenerator).
   4. I changed the the broker's address in the .properties file to the next
   server I wanted to connect to.
   5. I repeated step 2 to 4 for every kafka server in the cluster.
   6. I then ran SimpleKafkaETLJob and it was able to spawn one map task per
   broker and pull all the data from each.

This is almost exactly what I was trying before, except that before, I had
manually modified the .dat offset files instead of generating each one with
the DataGenerator, and I think vim didn't play nice with the SEQ files or
something like that... I don't know.

Anyhow, what I'm doing now is a little convoluted but at least it works... I
will create a script that does all this repetitive stuff for me. Ideally, I
would also like to pull the brokers list from ZK, like you guys do.

The Kafka/Hadoop ETL tools you mentioned are no doubt more mature and
complete than the stuff I will create, so it would be really nice if you
could release it.

I think releasing those tools would help drive the adoption of Kafka,
because in the state it's in now, Kafka is not really plug and play. That
is, it works (which is already better than a lot of open source projects out
there ;) !) but it seems a rather important part is missing.

--
Felix



On Tue, Oct 18, 2011 at 7:31 PM, Hisham Mardam-Bey <hisham@mate1inc.com>wrote:

> Hi folks, been following this thread, Felix and I are working together
> on this project, we really like Kafka and are moving it into
> production very soon.
>
> Jay, question, would you guys consider releasing the code in a "not so
> clean state" and have the community (we would like to help) shore it
> up so it becomes usable by the masses or are there other issues
> (legal?) you have to sort out first?
>
> Thanks!
>
> hisham.
>
> On Tue, Oct 18, 2011 at 6:28 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
> > I would actually love for us to release the full ETL system we have for
> > Kafka/Hadoop, it is just a matter of finding the time to get this code
> into
> > that shape.
> >
> > The hadoop team that maintains that code is pretty busy right now, but i
> am
> > hoping we can find a way.
> >
> > -Jay
> >
> > On Tue, Oct 18, 2011 at 3:18 PM, Felix Giguere Villegas <
> > felix.giguere@mate1inc.com> wrote:
> >
> >> Thanks for your replies guys :)
> >>
> >> @Jay: I thought about the Hadoop version mismatch too, because I've had
> the
> >> same problem before. I'll double check again to make sure I have the
> same
> >> versions of hadoop everywhere, as the Kafka distributed cluster I was
> >> testing on is a new setup and I might have forgotten to put the hadoop
> jars
> >> we use in it... I'm working part-time for now so I probably won't touch
> >> this
> >> again until next week but I'll keep you guys posted ASAP :)
> >>
> >> @Richard: Thanks a lot for your description. That clears out the
> >> inaccuracies in my understanding. Is there any chance you guys might
> >> release
> >> the code you use to query ZK and create appropriate offset files for
> each
> >> broker/partition pair? The hadoop consumer provided in the source works
> >> with
> >> the setup we get from the quickstart guide, but the process you describe
> >> seems more appropriate for production use.
> >>
> >> Thanks again :)
> >>
> >> --
> >> Felix
> >>
> >>
> >>
> >> On Tue, Oct 18, 2011 at 5:52 PM, Richard Park <richard.b.park@gmail.com
> >> >wrote:
> >>
> >> > Does the version in contrib contain the fixes for Kafka-131? The
> offsets
> >> > were incorrectly computed prior to this patch.
> >> >
> >> > At LinkedIn, this is what we do in a nutshell.
> >> > 1. We connect to the zookeeper instance. With this we are able to
> >> discover
> >> > the topics, the brokers and the partitions of a broker.
> >> >
> >> > 2. For a topic we want to pull, we create files that contains the
> offset
> >> > for
> >> > each broker and partition.  Each individual file contains a unique
> >> > broker/partition pair. This is essentially what data generator does,
> >> except
> >> > we use values from zookeeper. We take the output of the previous run
> of
> >> > kafka (the new offsets) and use them as the new offset files. If the
> old
> >> > offset doesn't exist, we set a default starting offset.
> >> >
> >> > 3. We run the pull hadoop job. One mapper per broker/partition pulls
> >> using
> >> > the simple consumer into hdfs (the KafkaETLRecordReader handles most
> of
> >> > this). We query kafka for the latest offset. The mapper fetches from
> the
> >> > kafka broker until the latest offset is reached.
> >> >
> >> > 4. We group the data by hourly partition with a reduce step.
> >> >
> >> > 5. The kafka hadoop job's mapper spits out new offsets for the next
> time
> >> we
> >> > decide to pull the data. The pull occurs at regular scheduled
> intervals
> >> > quite frequently.
> >> >
> >> > That's the gist of it. There are a few additional modification we made
> to
> >> > the kafka job including the ability to handle unavailable nodes, avro
> >> > schema
> >> > resolution and auditing.
> >> >
> >> > Thanks,
> >> > -Richard
> >> >
> >> >
> >> >
> >> > On Tue, Oct 18, 2011 at 2:03 PM, Jay Kreps <jay.kreps@gmail.com>
> wrote:
> >> >
> >> > > Is it possible that this is due to a hadoop version mismatch?
> Typically
> >> > if
> >> > > the client jar you pick up does not match the hadoop version of your
> >> > hadoop
> >> > > cluster you get EOFException.
> >> > >
> >> > > -Jay
> >> > >
> >> > > On Tue, Oct 18, 2011 at 9:01 AM, Felix Giguere Villegas <
> >> > > felix.giguere@mate1inc.com> wrote:
> >> > >
> >> > > > Hello everyone :) !
> >> > > >
> >> > > > I have trouble using the Kafka hadoop consumer included in
> >> > > > contrib/hadoop-consumer and I'd like to know if/how it is used
at
> >> > > LinkedIn
> >> > > > or elsewhere? I would also like if someone could confirm or
> correct
> >> the
> >> > > > assumptions I make below.
> >> > > >
> >> > > > Here's what I have so far:
> >> > > >
> >> > > > It works when pulling from one Kafka broker, but not when pulling
> >> from
> >> > > > many. There are two problems:
> >> > > >
> >> > > > The first problem concerns the offset files that the Map/Reduce
> job
> >> > takes
> >> > > > as
> >> > > > its input. From what I understand, these offset files represent
> the
> >> > > offset
> >> > > > to start reading from on each of the Kafka brokers.
> >> > > >
> >> > > > To generate those files the first time (and thus start from offset
> >> -1),
> >> > > we
> >> > > > can go in contrib/hadoop-consumer/ and run:
> >> > > >
> >> > > > ./run-class.sh kafka.etl.impl.DataGenerator
> >> > my-properties-file.properties
> >> > > >
> >> > > > The problem is that this DataGenerator class can take only one
> Kafka
> >> > > broker
> >> > > > in its parameters (the properties file) and thus generates only
> one
> >> > > offset
> >> > > > file.
> >> > > >
> >> > > > The Map/Reduce job will then spawn one map task for each offset
> file
> >> it
> >> > > > finds in its input directory, and each of these map tasks will
> >> connect
> >> > to
> >> > > a
> >> > > > different Kafka broker. Since the DataGenerator can only generate
> one
> >> > > > offset
> >> > > > file, the Map/Reduce job only spawns one map task which queries
> only
> >> > one
> >> > > > Kafka broker.
> >> > > >
> >> > > > Unless my assumptions are wrong or someone else provides a nice
> >> > > alternative
> >> > > > solution, I was planning to modify the DataGenerator class so
that
> it
> >> > can
> >> > > > generate multiple offset files, but for now, as a manual
> work-around,
> >> I
> >> > > > just
> >> > > > duplicated the offset files and specified a different Kafka broker
> in
> >> > > each.
> >> > > >
> >> > > > Other than that, I am thinking perhaps a more robust solution
> would
> >> be
> >> > to
> >> > > > have ZK-based discovery of the available brokers. Again, I'm
> curious
> >> to
> >> > > > find
> >> > > > out how this is done at LinkedIn or elsewhere?
> >> > > >
> >> > > > The second problem is when I run the M/R job. If I run it with
the
> >> > > multiple
> >> > > > offset files I manually generated as its input, it does spawn
> three
> >> map
> >> > > > tasks, as expected, but it then fails with the following error:
> >> > > >
> >> > > > java.io.IOException: java.io.EOFException
> >> > > >        at
> >> > > > kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:166)
> >> > > >        at
> >> > > kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:30)
> >> > > >        at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:208)
> >> > > >        at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:193)
> >> > > >        at
> org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
> >> > > >        at
> >> > org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:391)
> >> > > >        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:325)
> >> > > >        at org.apache.hadoop.mapred.Child$4.run(Child.java:270)
> >> > > >        at java.security.AccessController.doPrivileged(Native
> Method)
> >> > > >        at javax.security.auth.Subject.doAs(Subject.java:396)
> >> > > >        at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1127)
> >> > > >        at
> org.apache.hadoop.mapred.Child.main(Child.java:264)Caused
> >> by:
> >> > > > java.io.EOFException
> >> > > >        at
> java.io.DataInputStream.readFully(DataInputStream.java:180)
> >> > > >        at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.hadoop.io.DataOutputBuffer$Buffer.write(DataOutputBuffer.java:63)
> >> > > >        at
> >> > > >
> >> org.apache.hadoop.io.DataOutputBuffer.write(DataOutputBuffer.java:101)
> >> > > >        at
> >> > > >
> org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:1945)
> >> > > >        at
> >> > > >
> org.apache.hadoop.io.SequenceFile$Reader.next(SequenceFile.java:2077)
> >> > > >        at
> >> > > >
> >> > >
> >> >
> >>
> org.apache.hadoop.mapred.SequenceFileRecordReader.next(SequenceFileRecordReader.java:76)
> >> > > >        at
> >> > > > kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:128)
> >> > > >        ... 11 more
> >> > > >
> >> > > >
> >> > > > It fails before writing anything whatsoever, and it fails
> repeatedly
> >> > for
> >> > > > each Map task until the JobTracker reaches the maximum amount
of
> >> > failures
> >> > > > per task and marks the job as failed.
> >> > > >
> >> > > > I haven't figured this one out yet...
> >> > > >
> >> > > > Any help would be greatly appreciated :) !
> >> > > >
> >> > > > Thanks :) !!!!
> >> > > >
> >> > > > --
> >> > > > Felix
> >> > > >
> >> > >
> >> >
> >>
> >
>
> --
> Hisham Mardam Bey
>
> A: Because it messes up the order in which people normally read text.
> Q: Why is top-posting such a bad thing?
> A: Top-posting.
> Q: What is the most annoying thing in e-mail?
>
> -=[ Codito Ergo Sum ]=-
>

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message