kafka-users mailing list archives

From Sam William <sa...@stumbleupon.com>
Subject Re: jobtracker / hadoop comsumer
Date Wed, 31 Aug 2011 23:40:13 GMT
Richard,
     I read somewhere that the mappers write out the offset to the output dir, so that
further attempts (after a task failure) can start from the right offset.
I see that the offset is generated. But where is the logic to read it back and adjust the
offset for the next read? I wasn't able to find it.
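
To make the question concrete, this is roughly the kind of logic I expected to find somewhere. It is only a sketch under my own assumptions: the class name, the method name, and the idea of keying offsets by a "broker/topic/partition" string are all hypothetical and are not taken from the kafka.etl code.

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetResumeSketch {

    // Hypothetical helper, not part of kafka.etl.
    // requested: the start offset listed in each input request file.
    // writtenByLastRun: the offset a previous run wrote to its output dir.
    // Both maps use an assumed "broker/topic/partition" string as the key.
    public static Map<String, Long> resumeOffsets(Map<String, Long> requested,
                                                  Map<String, Long> writtenByLastRun) {
        Map<String, Long> next = new HashMap<>();
        for (Map.Entry<String, Long> e : requested.entrySet()) {
            Long written = writtenByLastRun.get(e.getKey());
            // Resume from the written offset when there is one, and never
            // move backwards past the offset that was originally requested.
            long start = (written == null)
                    ? e.getValue()
                    : Math.max(written, e.getValue());
            next.put(e.getKey(), start);
        }
        return next;
    }

    public static void main(String[] args) {
        Map<String, Long> requested = new HashMap<>();
        requested.put("hostA:9092/test/0", 0L);
        requested.put("hostA:9092/test/1", 0L);

        Map<String, Long> written = new HashMap<>();
        written.put("hostA:9092/test/0", 4096L); // only partition 0 made progress

        System.out.println(resumeOffsets(requested, written));
    }
}
```

In this sketch a partition with no written offset simply starts again from the offset in its request, which is the behaviour I would expect after a task failure.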

Sam

On Aug 31, 2011, at 12:48 PM, Richard Park wrote:

> It really looks like your mapper tasks may be failing to connect to your
> kafka server.
> 
> Here's a brief overview of what that demo job is doing so you can understand
> where the example may have gone wrong.
> DataGenerator:
> 
>   1. When DataGenerator is run, it needs the properties 'kafka.etl.topic'
>   and 'kafka.server.uri' set in the properties file. When you run
>   ./run-class.sh kafka.etl.impl.DataGenerator test/test.properties, you can
>   tell that they're properly set by the output 'topic=<blah>' and
>   'server uri=<kafka server url>'.
>   2. The DataGenerator will create a bunch of dummy messages and pump them to
>   that kafka server. Afterwards, it will write a file to HDFS at path 'input'
>   which you also set in the properties file. The file that is created will be
>   named something like 1.dat.
>   3. 1.dat is a sequence file, so if it isn't compressed, you should be
>   able to see its contents in plain text. The contents will essentially list
>   the kafka server url, the partition number and the topic as well as the
>   offset.
>   4. In a real scenario, you'll probably create several of these files for
>   each broker and possibly partition, but for this example, you only need one
>   file. Each file will spawn a mapper during the mapred step.
> 
> CopyJars:
> 
>   1. This should copy the necessary jars for kafka hadoop, and push them
>   into HDFS for the distributed cache. If the jars are copied locally instead
>   of to a remote cluster, most likely HADOOP_CONF_DIR hasn't been set up
>   correctly. The environment should probably be set by the script, so someone
>   can change that.
> 
> SimpleKafkaETLJob
> 
>   1. This job will then set up the distributed classpath, and the input path
>   should be the directory that 1.dat was written to.
>   2. Internally, the mappers will then load 1.dat and use the connection
>   data contained in it to connect to kafka. If it's trying to connect to
>   anything but your kafka server, then this file was incorrectly written.
>   3. The RecordReader wraps all of this and hides all the connection stuff
>   so that your Mapper should see a stream of Kafka messages rather than the
>   contents of 1.dat.
> 
> So please see if you can figure out what is wrong with your example and feel
> free to beef up the README instructions to take into account your pitfalls.
> 
> Thanks,
> -Richard
> 
> 
> 
> On Wed, Aug 31, 2011 at 12:02 PM, Ben Ciceron <ben@triggit.com> wrote:
> 
>> ok i could live with setting mapred.job.tracker manually for the code for
>> now.
>> This way it can connect now to the proper jobtracker.
>> 
>>> The hadoop map tasks will need to connect to the kafka server port (the
>>> broker uri/port).
>> 
>> i run the hadoop consumer on the same hostA where the kafka-server is
>> running.
>> 
>> each of the hosts in the hadoop cluster can telnet/nmap to port 9092 on
>> hostA where the kafka-server is running.
>> also hostA can connect to port 5181 on any host in the cluster.
>> 
>> but each task fails with a similar connection issue :
>> java.io.IOException: java.net.ConnectException: Connection refused
>>       at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:155)
>>       at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:14)
>>       at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:210)
>>       at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:195)
>>       at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
>>       at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:393)
>>       at org.apache.hadoop.mapred.MapTask.run(MapTask.java:326)
>>       at org.apache.hadoop.mapred.Child$4.run(Child.java:268)
>>       at java.security.AccessController.doPrivileged(Native Method)
>>       at javax.security.auth.Subject.doAs(Subject.java:396)
>>       at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1074)
>>       at org.apache.hadoop.mapred.Child.main(Child.java:262)
>> Caused by: java.net.ConnectException: Connection refused
>>       at sun.nio.ch.Net.connect(Native Method)
>>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:500)
>>       at kafka.consumer.SimpleConsumer.connect(SimpleConsumer.scala:54)
>>       at kafka.consumer.SimpleConsumer.getOrMakeConnection(SimpleConsumer.scala:193)
>>       at kafka.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:156)
>>       at kafka.javaapi.consumer.SimpleConsumer.getOffsetsBefore(SimpleConsumer.scala:65)
>>       at kafka.etl.KafkaETLContext.getOffsetRange(KafkaETLContext.java:209)
>>       at kafka.etl.KafkaETLContext.<init>(KafkaETLContext.java:97)
>>       at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:115)
>>       ... 11 more
>> 

Sam William
sampd@stumbleupon.com



