kafka-users mailing list archives

From Sam William <sa...@stumbleupon.com>
Subject Re: Kafka Sample Hadoop consumer.
Date Mon, 15 Aug 2011 21:56:32 GMT
I was using the kafka-v0.6 branch. It seems this null pointer issue has been fixed on
master.

However, I see that the _response.iterator() method is being invoked multiple times on the
same response object. This is what was causing the loops. I'd keep the iterator object
as the class attribute instead of the _response object.
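
To illustrate the idea, here is a minimal sketch, not the actual KafkaETLContext code.
The Reader class, setResponse(), drain() and the use of List<String> in place of
ByteBufferMessageSet are all hypothetical stand-ins. The point is only that the
response iterator is created once per fetch and kept as a field, with the null check
done before the iterator is used.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

public class IteratorFieldSketch {

    /** Hypothetical stand-in for the ETL context; List<String> plays the message set. */
    static class Reader {
        // Kept as fields so iteration resumes where it left off on each call.
        private Iterator<List<String>> responseIt;
        private Iterator<String> messageIt;

        /** Called once per fetch; the iterator is created here and nowhere else. */
        void setResponse(Iterable<List<String>> response) {
            responseIt = (response == null) ? null : response.iterator();
            messageIt = null;
        }

        /** Returns the next message, or null when the current response is exhausted. */
        String getNext() {
            while (true) {
                if (messageIt != null && messageIt.hasNext()) {
                    return messageIt.next();
                }
                // Null check happens before the iterator is touched.
                if (responseIt == null || !responseIt.hasNext()) {
                    return null;
                }
                messageIt = responseIt.next().iterator();
            }
        }
    }

    /** Reads every message from one response, in order. */
    static List<String> drain(Iterable<List<String>> response) {
        Reader reader = new Reader();
        reader.setResponse(response);
        List<String> out = new ArrayList<>();
        for (String m = reader.getNext(); m != null; m = reader.getNext()) {
            out.add(m);
        }
        return out;
    }

    public static void main(String[] args) {
        List<List<String>> response =
            Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c"));
        System.out.println(drain(response)); // prints [a, b, c]
    }
}
```

If getNext() called response.iterator() on every invocation instead, each call would
start again from the first message set, which matches the looping I was seeing.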

Sam


On Aug 11, 2011, at 2:07 PM, Sam William wrote:

> Neha,
>    I followed the procedure described in
> 
> https://github.com/kafka-dev/kafka/tree/kafka-v0.6/contrib/hadoop-consumer . I couldn't
> get the distributed cache working. So, for the time being, I went ahead with copying the
> jars to the task nodes to get it to run.
> 
> 
> I'm not sure if this particular portion of the examples is completely tested or if
> I have a wrong version of the file.
> 
> For example, in the file KafkaETLContext.java:
> 
> 
> public boolean getNext(KafkaETLKey key, BytesWritable value) throws IOException {
>        if ( !hasMore() ) return false;
> 
>        boolean gotNext = get(key, value);
> 
>        Iterator<ByteBufferMessageSet> iter = _response.iterator();
> 
>        while ( !gotNext && _response != null && iter.hasNext()) {
>            ByteBufferMessageSet msgSet = iter.next();
>            if ( hasError(msgSet)) return false;
>            _messageIt =  (Iterator<Message>) msgSet.iterator();
>            gotNext = get(key, value);
>        }
> 
>        return gotNext;
>    }
> 
> 
> 
> 
> 
> The null check for _response comes after Iterator<ByteBufferMessageSet> iter
> = _response.iterator(), and the first time this function is called, the _response object is
> null.
> 
> Another question:
> If I generate, say, 2000 events and set kafka.fetch.limit = 3000 or (-1), I'd
> expect the job to stop after reading the 2000 events, but it seems to keep looping from
> the beginning. Am I missing something here?
> 
> Sam
> 
> 
> 
> 
> 
> 
> 
> 
> 
> 
> On Aug 9, 2011, at 4:29 PM, Neha Narkhede wrote:
> 
>> Sam,
>> 
>> I tried this on a downloaded copy of kafka
>> v0.6<http://sna-projects.com/kafka/downloads/kafka-0.6.zip>-
>> 
>> nnarkhed-md:kafka nnarkhed$ jar tvf kafka-0.6.jar | grep
>> "kafka.javaapi.consumer.SimpleConsumer"
>> 3501 Tue May 24 10:23:24 PDT 2011
>> kafka/javaapi/consumer/SimpleConsumer.class
>> 
>> I suspect that the kafka-0.6.jar is not registered correctly with the
>> DistributedCache.
>> 
>> Thanks,
>> Neha
>> 
>> On Tue, Aug 9, 2011 at 3:32 PM, Sam William <sampd@stumbleupon.com> wrote:
>> 
>>> I'm trying to run this sample Hadoop consumer in the 0.6 version. I see
>>> that the jar files (including kafka-0.6.jar) are correct when being copied to
>>> the DistributedCache, but I get the exception
>>> 
>>> Error: java.lang.ClassNotFoundException:
>>> kafka.javaapi.consumer.SimpleConsumer
>>>      at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
>>>      at java.security.AccessController.doPrivileged(Native Method)
>>>      at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
>>>      at java.lang.ClassLoader.loadClass(ClassLoader.java:307)
>>>      at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
>>>      at java.lang.ClassLoader.loadClass(ClassLoader.java:248)
>>>      at kafka.etl.KafkaETLContext.<init>(KafkaETLContext.java:93)
>>>      at
>>> kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:115)
>>>      at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:14)
>>>      at
>>> org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:192)
>>>      at
>>> org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:176)
>>>      at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
>>>      at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
>>>      at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
>>>      at org.apache.hadoop.mapred.Child.main(Child.java:170)
>>> 
>>> 
>>> 
>>> Is there something I'm missing here?
>>> 
>>> 
>>> Sam William
>>> sampd@stumbleupon.com
>>> 
>>> 
>>> 
>>> 
> 
> Sam William
> sampd@stumbleupon.com
> 
> 
> 

Sam William
sampd@stumbleupon.com



