kafka-users mailing list archives

From Joe Stein <crypt...@gmail.com>
Subject Re: Unable to send and consume compressed events.
Date Fri, 30 Aug 2013 16:57:22 GMT
I think it is still good to have this one in the FAQ even if it is a bug;
sometimes folks need to know where and how to work around things until they're fixed

here is the JIRA for the defect

https://issues.apache.org/jira/browse/KAFKA-1037 <== great place I think
for someone looking to jump in and start contributing to the code base =8^)
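
Until the defect is fixed, one possible workaround is to check at producer startup that the compression library can actually be loaded, so a missing jar is reported loudly instead of being swallowed. The following is only a minimal sketch under stated assumptions: `CodecClasspathCheck` and `isClassAvailable` are hypothetical names that are not part of Kafka, and `org.xerial.snappy.Snappy` is assumed to be the class shipped in the snappy-java jar mentioned later in this thread.

```java
// Hypothetical startup check; not part of Kafka itself.
class CodecClasspathCheck {

    // Returns true if the named class can be found on the classpath.
    // The class is located but not initialized, so no static
    // initializers (or native library loading) run as a side effect.
    public static boolean isClassAvailable(String className) {
        try {
            Class.forName(className, false, CodecClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // Assumption: the snappy-java jar provides this class.
        String snappyClass = "org.xerial.snappy.Snappy";
        if (isClassAvailable(snappyClass)) {
            System.out.println(snappyClass + " found on the classpath.");
        } else {
            System.err.println("WARNING: " + snappyClass
                    + " not found; add the snappy jar before enabling compression.codec.");
        }
    }
}
```

Calling a check like this before constructing the producer does not fix the swallowed exception, but it should make the missing dependency visible without having to enable DEBUG logging.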


On Fri, Aug 30, 2013 at 12:52 PM, Joe Stein <cryptcom@gmail.com> wrote:

> Yeah, the error should have shown up; will create a JIRA.
>
> On Fri, Aug 30, 2013 at 12:32 PM, Jay Kreps <jay.kreps@gmail.com> wrote:
>
>> This seems like more of a bug than a FAQ, no? We are swallowing the
>> exception...
>>
>> -Jay
>>
>>
>> On Thu, Aug 29, 2013 at 11:30 PM, Lu Xuechao <luxuec@gmail.com> wrote:
>>
>> > Hi Jun,
>> >
>> > Thanks for your help. Finally, I found the reason by enabling producer-side
>> > DEBUG output. The snappy jar was not included in the classpath. I added
>> > it and it worked.
>> >
>> > Thanks again.
>> >
>> >
>> >
>> >
>> > On Fri, Aug 30, 2013 at 12:53 PM, Lu Xuechao <luxuec@gmail.com> wrote:
>> >
>> > > No.
>> > >
>> > >
>> > > On Fri, Aug 30, 2013 at 11:57 AM, Jun Rao <junrao@gmail.com> wrote:
>> > >
>> > >> These are the metadata requests. Do you see Producer requests from your
>> > >> client?
>> > >>
>> > >> Thanks,
>> > >>
>> > >> Jun
>> > >>
>> > >>
>> > >> On Thu, Aug 29, 2013 at 5:40 PM, Lu Xuechao <luxuec@gmail.com> wrote:
>> > >>
>> > >> > After I sent 1,000 compressed events, I saw these messages in the broker's
>> > >> > log files:
>> > >> >
>> > >> > in kafka-request.log
>> > >> >
>> > >> > [2013-08-30 08:38:18,713] TRACE Processor 6 received request : Name: TopicMetadataRequest; Version: 0; CorrelationId: 0; ClientId: ; Topics: topic1 (kafka.network.RequestChannel$)
>> > >> > [2013-08-30 08:38:18,718] TRACE Completed request:Name: TopicMetadataRequest; Version: 0; CorrelationId: 0; ClientId: ; Topics: topic1 from client /127.0.0.1:64238;totalTime:5,queueTime:3,localTime:1,remoteTime:0,sendTime:1 (kafka.request.logger)
>> > >> >
>> > >> >
>> > >> > in server.log
>> > >> >
>> > >> > [2013-08-30 08:38:18,759] INFO Closing socket connection to /127.0.0.1. (kafka.network.Processor)
>> > >> >
>> > >> >
>> > >> > any ideas?  Thanks.
>> > >> >
>> > >> >
>> > >> > On Thu, Aug 29, 2013 at 10:28 PM, Jun Rao <junrao@gmail.com> wrote:
>> > >> >
>> > >> > > Did you see any error in the producer log? Did the broker receive the
>> > >> > > produce request (you can look at the request log in the broker)?
>> > >> > >
>> > >> > > Thanks,
>> > >> > >
>> > >> > > Jun
>> > >> > >
>> > >> > >
>> > >> > > On Thu, Aug 29, 2013 at 6:29 AM, Lu Xuechao <luxuec@gmail.com> wrote:
>> > >> > >
>> > >> > > > Let me post my test code here. I could see that producer.send(data)
>> > >> > > > returned with no error.
>> > >> > > >
>> > >> > > > public class TestProducer extends Thread {
>> > >> > > >     private final Producer<String, String> producer;
>> > >> > > >
>> > >> > > >     private final int m_events;
>> > >> > > >     private final int m_threadNumber;
>> > >> > > >
>> > >> > > >     private static String msg = StringUtils.rightPad("", 1000, '*');
>> > >> > > >
>> > >> > > >     public TestProducer(int threadNumber, int events) {
>> > >> > > >         m_threadNumber = threadNumber;
>> > >> > > >         m_events = events;
>> > >> > > >
>> > >> > > >         Properties props = new Properties();
>> > >> > > >         props.put("serializer.class", KafkaProperties.p_serializer_class);
>> > >> > > >         props.put("metadata.broker.list", KafkaProperties.p_metadata_broker_list);
>> > >> > > >         props.put("partitioner.class", KafkaProperties.p_partitioner_class);
>> > >> > > >         props.put("queue.enqueue.timeout.ms", KafkaProperties.p_queue_enqueue_timeout);
>> > >> > > >         props.put("request.required.acks", KafkaProperties.p_request_required_acks);
>> > >> > > >         props.put("producer.type", KafkaProperties.p_producer_type);
>> > >> > > >
>> > >> > > >         props.put("batch.num.messages", KafkaProperties.p_batch_num);
>> > >> > > >
>> > >> > > >         props.put("compression.codec", KafkaProperties.p_compression_codec);
>> > >> > > >
>> > >> > > >         ProducerConfig config = new ProducerConfig(props);
>> > >> > > >         producer = new Producer<String, String>(config);
>> > >> > > >     }
>> > >> > > >
>> > >> > > >     @Override
>> > >> > > >     public void run() {
>> > >> > > >         long start;
>> > >> > > >         long num = 0;
>> > >> > > >         System.out.println(new Date() + " - Message sent thread "
>> > >> > > >                 + m_threadNumber + " started.");
>> > >> > > >         while (true) {
>> > >> > > >             start = System.currentTimeMillis();
>> > >> > > >             String messageStr = new String(num + "_" + start);
>> > >> > > >             KeyedMessage<String, String> data = new KeyedMessage<String, String>(
>> > >> > > >                     KafkaProperties.topic, messageStr, start + "_" + msg);
>> > >> > > >             producer.send(data);
>> > >> > > >             num++;
>> > >> > > >             if (num == m_events) {
>> > >> > > >                 break;
>> > >> > > >             }
>> > >> > > >         }
>> > >> > > >         producer.close();
>> > >> > > >         System.out.println(new Date() + " - Message sent thread "
>> > >> > > >                 + m_threadNumber + " end.   " + num + " messages sent.");
>> > >> > > >     }
>> > >> > > > }
>> > >> > > >
>> > >> > > >
>> > >> > > > public interface KafkaProperties {
>> > >> > > >     final static String zookeeper_connect = "127.0.0.1:2181";
>> > >> > > >     final static String group_id = "group1";
>> > >> > > >     final static String topic = "topic1";
>> > >> > > >
>> > >> > > >     final static String p_serializer_class = "kafka.serializer.StringEncoder";
>> > >> > > >     final static String p_metadata_broker_list = "127.0.0.1:9092";
>> > >> > > >     final static String p_partitioner_class = "kafka.producer.DefaultPartitioner";
>> > >> > > >
>> > >> > > >     final static String p_queue_enqueue_timeout = "-1";
>> > >> > > >     final static String p_request_required_acks = "1";
>> > >> > > >     final static String p_producer_type = "async";
>> > >> > > >     final static String p_batch_num = "100";
>> > >> > > >     final static String p_compression_codec = "1";
>> > >> > > >     final static String p_message_send_retries = "3";
>> > >> > > >     final static String p_retry_backoff_ms = "200";
>> > >> > > >     final static String p_topic_metadata_refresh = "600000";
>> > >> > > > }
>> > >> > > >
>> > >> > > >
>> > >> > > > On Thu, Aug 29, 2013 at 9:24 PM, Lu Xuechao <luxuec@gmail.com> wrote:
>> > >> > > >
>> > >> > > > > Thanks Paul. Yes, I am using 0.8 beta1. I followed your suggestion to set
>> > >> > > > > request.required.acks=1 and got the same result. No error message was seen
>> > >> > > > > in the broker logs. After sending 1,000,000 events of 1 KB each, the sizes
>> > >> > > > > of the partition files were:
>> > >> > > > >
>> > >> > > > > 00000000000000000000.index  10240 KB
>> > >> > > > > 00000000000000000000.log  0KB
>> > >> > > > >
>> > >> > > > > The broker configurations:
>> > >> > > > >
>> > >> > > > > num.partitions=5
>> > >> > > > > log.flush.interval.messages=20000
>> > >> > > > > log.flush.interval.ms=5000
>> > >> > > > >
>> > >> > > > > log.flush.scheduler.interval.ms=1000
>> > >> > > > > log.retention.hours=1
>> > >> > > > > log.segment.bytes=1073741824
>> > >> > > > > log.cleanup.interval.mins=30
>> > >> > > > >
>> > >> > > > > queued.max.requests=16
>> > >> > > > > fetch.purgatory.purge.interval.requests=100
>> > >> > > > > producer.purgatory.purge.interval.requests=100
>> > >> > > > >
>> > >> > > > > It works if I change the code to props.put("compression.codec", "0");
>> > >> > > > >
>> > >> > > > > thanks,
>> > >> > > > > xlu
>> > >> > > > >
>> > >> > > > > On Thu, Aug 29, 2013 at 6:48 PM, Paul Mackles <pmackles@adobe.com> wrote:
>> > >> > > > >
>> > >> > > > >> I assume this is kafka 0.8, right? Are there any corresponding errors in
>> > >> > > > >> the broker logs? With the configuration below, I don't think any errors
>> > >> > > > >> will be reported back to the producer.
>> > >> > > > >>
>> > >> > > > >> You could also try setting request.required.acks=1 to see if errors are
>> > >> > > > >> reported back to the client.
>> > >> > > > >>
>> > >> > > > >> On 8/29/13 4:40 AM, "Lu Xuechao" <luxuec@gmail.com> wrote:
>> > >> > > > >>
>> > >> > > > >> >Hi ,
>> > >> > > > >> >
>> > >> > > > >> >I am trying to enable gzip compression for my events. But after I switched
>> > >> > > > >> >compression.codec to "1" I found the produced events were not even
>> > >> > > > >> >persisted to the disk log file. Of course, the consumer could not receive
>> > >> > > > >> >any compressed events. I sent 10,000 or more events but the broker's log
>> > >> > > > >> >file did not change. It seems no events were actually sent to the broker?
>> > >> > > > >> >Below is my producer's code:
>> > >> > > > >> >
>> > >> > > > >> >        Properties props = new Properties();
>> > >> > > > >> >        props.put("serializer.class", "kafka.serializer.StringEncoder");
>> > >> > > > >> >        props.put("metadata.broker.list", "127.0.0.1:9092");
>> > >> > > > >> >        props.put("partitioner.class", "kafka.producer.DefaultPartitioner");
>> > >> > > > >> >        props.put("queue.enqueue.timeout.ms", "-1");
>> > >> > > > >> >        props.put("request.required.acks", "0");
>> > >> > > > >> >        props.put("producer.type", "async");
>> > >> > > > >> >
>> > >> > > > >> >        props.put("batch.num.messages", "100");
>> > >> > > > >> >
>> > >> > > > >> >        props.put("compression.codec", "1");
>> > >> > > > >> >
>> > >> > > > >> >        ProducerConfig config = new ProducerConfig(props);
>> > >> > > > >> >        producer = new Producer<String, String>(config);
>> > >> > > > >> >
>> > >> > > > >> >        KeyedMessage<String, String> data = new KeyedMessage<String, String>("topic1", messageStr, msg);
>> > >> > > > >> >        producer.send(data);
>> > >> > > > >> >
>> > >> > > > >> >
>> > >> > > > >> >If I comment out this line of code: props.put("compression.codec", "1");
>> > >> > > > >> >then everything works fine. Did I miss something?
>> > >> > > > >> >
>> > >> > > > >> >thanks,
>> > >> > > > >> >xlu
>> > >> > > > >>
>> > >> > > > >>
>> > >> > > > >
>> > >> > > >
>> > >> > >
>> > >> >
>> > >>
>> > >
>> > >
>> >
>>
>
>
