kafka-users mailing list archives

From Sam William <sa...@stumbleupon.com>
Subject Re: Kafka Sample Hadoop consumer.
Date Thu, 11 Aug 2011 21:07:45 GMT
Neha,

I followed the procedure described in
https://github.com/kafka-dev/kafka/tree/kafka-v0.6/contrib/hadoop-consumer
but I couldn't get the distributed cache working. So, for the time being, I copied the jars
to the task nodes to get the job to run.
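Before copying jars by hand, it can help to confirm which classes a task node's JVM can actually see. The following is a minimal diagnostic sketch. ClasspathCheck is a hypothetical name and is not part of Kafka or Hadoop. It reports whether a class such as kafka.javaapi.consumer.SimpleConsumer can be loaded, without running the class's static initializers. Running it with a hand-built -cp only approximates a map task's classpath, because Hadoop assembles that classpath itself.

```java
// Hypothetical diagnostic (not part of Kafka or Hadoop): reports whether a
// class can be loaded by this JVM's application classloader.
public class ClasspathCheck {
    public static boolean isLoadable(String className) {
        try {
            // initialize=false: load the class without running static initializers.
            Class.forName(className, false, ClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            // Class is missing, or one of its dependencies cannot be resolved.
            return false;
        }
    }

    public static void main(String[] args) {
        String name = args.length > 0 ? args[0] : "kafka.javaapi.consumer.SimpleConsumer";
        System.out.println(name + " loadable: " + isLoadable(name));
    }
}
```

For example: java -cp kafka-0.6.jar:. ClasspathCheck kafka.javaapi.consumer.SimpleConsumer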

I'm not sure whether this part of the examples has been fully tested, or whether I have
the wrong version of the file.

For example, take getNext() in KafkaETLContext.java:


public boolean getNext(KafkaETLKey key, BytesWritable value) throws IOException {
    if ( !hasMore() ) return false;

    boolean gotNext = get(key, value);

    // _response is dereferenced here, before the null check in the loop below.
    Iterator<ByteBufferMessageSet> iter = _response.iterator();

    while ( !gotNext && _response != null && iter.hasNext()) {
        ByteBufferMessageSet msgSet = iter.next();
        if ( hasError(msgSet)) return false;
        _messageIt = (Iterator<Message>) msgSet.iterator();
        gotNext = get(key, value);
    }

    return gotNext;
}

The null check for _response comes after
Iterator<ByteBufferMessageSet> iter = _response.iterator(), and the first time this function
is called, _response is null, so that line throws a NullPointerException.
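Here is a minimal sketch of the reordering, which checks _response for null before asking it for an iterator. It is self-contained and runnable, so it uses hypothetical stand-in types: String messages grouped into List "message sets" take the place of KafkaETLKey, BytesWritable and ByteBufferMessageSet. It also omits the hasMore() and hasError() checks. Keeping the response iterator in a field is my own assumption. If iterator() returns a new iterator positioned at the start, as collection iterators normally do, then obtaining a fresh one on every call would start again at the first message set.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for KafkaETLContext: messages are Strings and a
// "message set" is a List<String>, replacing the real Kafka/Hadoop types.
public class NullSafeContext {
    private List<List<String>> response;          // null until a fetch has happened
    private Iterator<List<String>> respIterator;  // kept in a field across calls
    private Iterator<String> messageIt;           // position in the current message set

    // Hypothetical substitute for the real fetch step.
    public void setResponse(List<List<String>> fetched) {
        response = fetched;
        respIterator = fetched.iterator();
        messageIt = null;
    }

    // Appends the next message to out; returns false when none is available.
    public boolean getNext(List<String> out) {
        boolean gotNext = get(out);

        // Check for null BEFORE dereferencing the response.
        if (response == null) {
            return gotNext;
        }

        // Move on to later message sets until one yields a message.
        while (!gotNext && respIterator.hasNext()) {
            messageIt = respIterator.next().iterator();
            gotNext = get(out);
        }
        return gotNext;
    }

    // Reads one message from the current message set, if there is one.
    private boolean get(List<String> out) {
        if (messageIt != null && messageIt.hasNext()) {
            out.add(messageIt.next());
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        NullSafeContext ctx = new NullSafeContext();
        List<String> out = new ArrayList<>();

        // First call before any fetch: returns false instead of throwing an NPE.
        System.out.println(ctx.getNext(out)); // false

        ctx.setResponse(Arrays.asList(Arrays.asList("a", "b"), Arrays.asList("c")));
        while (ctx.getNext(out)) {
            // drain all messages
        }
        System.out.println(out); // [a, b, c]
    }
}
```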

Another question: if I generate, say, 2000 events and set kafka.fetch.limit = 3000 (or -1),
I'd expect the job to stop after reading those 2000 events, but it seems to keep looping from
the beginning. Am I missing something here?
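The following hedged sketch shows the stopping rule the question expects. Reading starts at a start offset and runs up to the end offset observed when the job starts, capped by a fetch limit, where -1 means no limit. The class and method names are hypothetical. Treating offsets and the limit as simple event counts is an assumption for illustration, and none of this is taken from the KafkaETL contrib code.

```java
// Hypothetical illustration of the expected stopping rule: never read past the
// end offset seen at job start, and never restart from the beginning.
public class BoundedReadSketch {
    // Offset at which reading stops; a negative fetchLimit means "no limit".
    public static long stopOffset(long startOffset, long endOffset, long fetchLimit) {
        if (fetchLimit < 0) {
            return endOffset;
        }
        return Math.min(endOffset, startOffset + fetchLimit);
    }

    // Counts how many events a single pass reads under that rule.
    public static long countEventsRead(long startOffset, long endOffset, long fetchLimit) {
        long stop = stopOffset(startOffset, endOffset, fetchLimit);
        long read = 0;
        for (long offset = startOffset; offset < stop; offset++) {
            read++; // stand-in for processing the event at this offset
        }
        return read;
    }

    public static void main(String[] args) {
        System.out.println(countEventsRead(0, 2000, 3000)); // 2000
        System.out.println(countEventsRead(0, 2000, -1));   // 2000
        System.out.println(countEventsRead(0, 2000, 500));  // 500
    }
}
```

With 2000 available events, a limit of 3000 or -1 both stop at 2000 under this rule, which is the behavior described in the question.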

Sam

On Aug 9, 2011, at 4:29 PM, Neha Narkhede wrote:

> Sam,
> 
> I tried this on a downloaded copy of Kafka v0.6
> (http://sna-projects.com/kafka/downloads/kafka-0.6.zip):
> 
> nnarkhed-md:kafka nnarkhed$ jar tvf kafka-0.6.jar | grep "kafka.javaapi.consumer.SimpleConsumer"
>  3501 Tue May 24 10:23:24 PDT 2011 kafka/javaapi/consumer/SimpleConsumer.class
> 
> I suspect that the kafka-0.6.jar is not registered correctly with the
> DistributedCache.
> 
> Thanks,
> Neha
> 
> On Tue, Aug 9, 2011 at 3:32 PM, Sam William <sampd@stumbleupon.com> wrote:
> 
>> I'm trying to run this sample Hadoop consumer with version 0.6. I can see
>> that the jar files (including kafka-0.6.jar) are copied to the
>> DistributedCache correctly, but I get this exception:
>> 
>> Error: java.lang.ClassNotFoundException: kafka.javaapi.consumer.SimpleConsumer
>>       at java.net.URLClassLoader$1.run(URLClassLoader.java:202)
>>       at java.security.AccessController.doPrivileged(Native Method)
>>       at java.net.URLClassLoader.findClass(URLClassLoader.java:190)
>>       at java.lang.ClassLoader.loadClass(ClassLoader.java:307)
>>       at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:301)
>>       at java.lang.ClassLoader.loadClass(ClassLoader.java:248)
>>       at kafka.etl.KafkaETLContext.<init>(KafkaETLContext.java:93)
>>       at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:115)
>>       at kafka.etl.KafkaETLRecordReader.next(KafkaETLRecordReader.java:14)
>>       at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.moveToNext(MapTask.java:192)
>>       at org.apache.hadoop.mapred.MapTask$TrackedRecordReader.next(MapTask.java:176)
>>       at org.apache.hadoop.mapred.MapRunner.run(MapRunner.java:48)
>>       at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:358)
>>       at org.apache.hadoop.mapred.MapTask.run(MapTask.java:307)
>>       at org.apache.hadoop.mapred.Child.main(Child.java:170)
>> 
>> Is there something I'm missing here?
>> 
>> 
>> Sam William
>> sampd@stumbleupon.com

Sam William
sampd@stumbleupon.com