kafka-users mailing list archives

From Richard Park <richard.b.p...@gmail.com>
Subject Re: jobtracker / hadoop comsumer
Date Wed, 31 Aug 2011 19:48:47 GMT
It really looks like your mapper tasks may be failing to connect to your
kafka server.

Here's a brief overview of what that demo job is doing so you can understand
where the example may have gone wrong.

   1. When DataGenerator is run, it needs the properties 'kafka.etl.topic'
   and 'kafka.server.uri' set in the properties file. When you run
   kafka.etl.impl.DataGenerator test/test.properties, you can tell that
   they're properly set by the output 'topic=<blah>' and 'server uri=<kafka
   server url>'.
   2. The DataGenerator will create a bunch of dummy messages and pump them to
   that kafka server. Afterwards, it will write a file to HDFS at the path
   'input', which you also set in the properties file. The file that is
   created will be named something like 1.dat.
   3. 1.dat is a sequence file, so if it isn't compressed, you should be
   able to see its contents in plain text. The contents will essentially list
   the kafka server url, the partition number and the topic as well as the
   4. In a real scenario, you'll probably create several of these files for
   each broker and possibly partition, but for this example, you only need one
   file. Each file will spawn a mapper during the mapred step.
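As a quick sanity check before running DataGenerator, something like the sketch below can confirm that the properties file actually contains the three settings mentioned above. The class EtlPropsCheck and its method names are hypothetical and not part of the Kafka contrib code; the property names are the ones from this thread, and the default path test/test.properties is only the one used in the example command.

```java
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of the Kafka hadoop consumer.
class EtlPropsCheck {
    // Property names mentioned in this thread.
    static final String[] REQUIRED = {"kafka.etl.topic", "kafka.server.uri", "input"};

    // Returns the required keys that are absent or blank in the given properties.
    static List<String> missingKeys(Properties props) {
        List<String> missing = new ArrayList<>();
        for (String key : REQUIRED) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) throws IOException {
        // Assumed default path, taken from the example command above.
        String path = args.length > 0 ? args[0] : "test/test.properties";
        Properties props = new Properties();
        try (InputStream in = new FileInputStream(path)) {
            props.load(in);
        }
        List<String> missing = missingKeys(props);
        if (missing.isEmpty()) {
            System.out.println("All required properties are set in " + path);
        } else {
            System.out.println("Missing or empty in " + path + ": " + missing);
        }
    }
}
```

This only checks that the keys are present; it does not verify that the server uri points at a broker the hadoop nodes can reach.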


   1. This should copy the necessary jars for kafka hadoop, and push them
   into HDFS for the distributed cache. If the jars are copied locally instead
   of to a remote cluster, most likely HADOOP_CONF_DIR hasn't been set up
   correctly. The script should probably set the environment itself, so
   feel free to change that.


   1. This job will then set up the distributed classpath, and the input path
   should be the directory that 1.dat was written to.
   2. Internally, the mappers will then load 1.dat and use the connection
   data contained in it to connect to kafka. If a mapper is trying to connect
   to anything but your kafka server, then this file was incorrectly written.
   3. The RecordReader wraps all of this and hides all the connection stuff
   so that your Mapper should see a stream of Kafka messages rather than the
   contents of 1.dat.
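To check the connection step in isolation, a small probe like the one below can be run on one of the hadoop task nodes with the server uri that ended up in 1.dat. This is only a sketch: the class BrokerReachability is hypothetical, and it assumes the uri has a scheme://host:port shape such as tcp://hostA:9092, which may not match your setup exactly.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.URI;

// Hypothetical probe for illustration; not part of the Kafka hadoop consumer.
class BrokerReachability {
    // Extracts host and port from a uri of the assumed form scheme://host:port.
    static InetSocketAddress parse(String serverUri) {
        URI uri = URI.create(serverUri);
        if (uri.getHost() == null || uri.getPort() < 0) {
            throw new IllegalArgumentException("Expected scheme://host:port, got: " + serverUri);
        }
        return InetSocketAddress.createUnresolved(uri.getHost(), uri.getPort());
    }

    // Returns true if a plain TCP connection succeeds within the timeout.
    static boolean canConnect(String host, int port, int timeoutMillis) {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress(host, port), timeoutMillis);
            return true;
        } catch (IOException e) {
            // Covers "Connection refused", timeouts and unresolvable hosts.
            return false;
        }
    }

    public static void main(String[] args) {
        // Pass the server uri exactly as it appears in 1.dat.
        InetSocketAddress target = parse(args[0]);
        boolean ok = canConnect(target.getHostString(), target.getPort(), 5000);
        System.out.println(target.getHostString() + ":" + target.getPort()
                + (ok ? " is reachable" : " is NOT reachable") + " from this host");
    }
}
```

If the probe fails from a task node but works on the kafka host itself, the address written into 1.dat is probably one that only resolves correctly on the machine where DataGenerator was run.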

So please see if you can figure out what is wrong with your example, and feel
free to beef up the README instructions to take your pitfalls into account.


On Wed, Aug 31, 2011 at 12:02 PM, Ben Ciceron <ben@triggit.com> wrote:

> ok i could live with setting mapred.job.tracker manually for the code for
> now.
> This way it can connect now to the proper jobtracker.
> > The hadoop map tasks will need to connect to the kafka server port (the
> > broker uri/port).
> i run the hadoop consumer on the same hostA where the kafka-server is
> running.
> each of the hosts in the hadoop cluster can telnet/nmap to port 9092 on
> hostA where the kafka-server is running.
> also hostA can connect to port 5181 on any host in the cluster.
> but each task fails with a similar connection issue:
> java.io.IOException: java.net.ConnectException: Connection refused
>        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:155)
>        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:14)
>        at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:210)
>        at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:195)
>        at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
>        at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:393)
>        at org.apache.hadoop.mapred.MapTask.run(MapTask.java:326)
>        at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
>        at java.security.AccessController.doPrivileged(Native Method)
>        at javax.security.auth.Subject.doAs(Subject.java:396)
>        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1074)
>        at org.apache.hadoop.mapred.Child.main(Child.java:262)
> Caused by: java.net.ConnectException: Connection refused
>        at sun.nio.ch.Net.connect(Native Method)
>        at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
>        at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:54)
>        at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:193)
>        at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:156)
>        at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:65)
>        at kafka.etl.KafkaETLContext.getOffsetRange(KafkaETLContext.java:209)
>        at kafka.etl.KafkaETLContext.<init>(KafkaETLContext.java:97)
>        at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:115)
>        ... 11 more
