kafka-users mailing list archives

From Guozhang Wang <wangg...@gmail.com>
Subject Re: Can I config the consumer auto-offset-reset to latest of kafka stream
Date Fri, 08 Dec 2017 02:44:01 GMT
Hello,

I think you are hitting https://issues.apache.org/jira/browse/KAFKA-4361,
which is fixed in 0.10.1.1 and beyond.


Guozhang




On Thu, Dec 7, 2017 at 12:49 AM, zhaoiwei@163.com <zhaoiwei@163.com> wrote:

> Hello,
> I am working with Kafka 0.10.1.0 and use the following configuration:
>
> val props = new Properties
> props.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId)
> props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, broker)
> props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String.getClass)
> props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.Integer.getClass)
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest")
>
> However, this line
> props.put(StreamsConfig.consumerPrefix(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG), "latest")
> has no effect. What is the reason?
>
>
> I read the source of org.apache.kafka.streams.StreamsConfig and found the
> following code:
>
> private static final Map<String, Object> CONSUMER_DEFAULT_OVERRIDES;
>
> static {
>     Map<String, Object> tempConsumerDefaultOverrides = new HashMap<>();
>     tempConsumerDefaultOverrides.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1000");
>     tempConsumerDefaultOverrides.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>     tempConsumerDefaultOverrides.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>
>     CONSUMER_DEFAULT_OVERRIDES = Collections.unmodifiableMap(tempConsumerDefaultOverrides);
> }
>
>
>
> public Map<String, Object> getConsumerConfigs(StreamThread streamThread, String groupId, String clientId) throws ConfigException {
>     final Map<String, Object> consumerProps = getClientPropsWithPrefix(CONSUMER_PREFIX, ConsumerConfig.configNames());
>
>     // disable auto commit and throw exception if there is user overridden values,
>     // this is necessary for streams commit semantics
>     if (consumerProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
>         throw new ConfigException("Unexpected user-specified consumer config " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
>                 + ", as the streams client will always turn off auto committing.");
>     }
>
>     consumerProps.putAll(CONSUMER_DEFAULT_OVERRIDES);
>
>     // bootstrap.servers should be from StreamsConfig
>     consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, this.originals().get(BOOTSTRAP_SERVERS_CONFIG));
>     // add client id with stream client id prefix, and group id
>     consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
>     consumerProps.put(CommonClientConfigs.CLIENT_ID_CONFIG, clientId + "-consumer");
>
>     // add configs required for stream partition assignor
>     consumerProps.put(StreamsConfig.InternalConfig.STREAM_THREAD_INSTANCE, streamThread);
>     consumerProps.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, getInt(REPLICATION_FACTOR_CONFIG));
>     consumerProps.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, getInt(NUM_STANDBY_REPLICAS_CONFIG));
>     consumerProps.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, StreamPartitionAssignor.class.getName());
>     consumerProps.put(StreamsConfig.WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG, getLong(WINDOW_STORE_CHANGE_LOG_ADDITIONAL_RETENTION_MS_CONFIG));
>     if (!getString(ZOOKEEPER_CONNECT_CONFIG).equals("")) {
>         consumerProps.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, getString(ZOOKEEPER_CONNECT_CONFIG));
>     }
>
>     consumerProps.put(APPLICATION_SERVER_CONFIG, getString(APPLICATION_SERVER_CONFIG));
>     return consumerProps;
> }
>
>
>
>
> Does this mean that CONSUMER_DEFAULT_OVERRIDES overrides the value that I
> set?
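
To illustrate the ordering in the quoted getConsumerConfigs: the user-supplied
consumer properties are collected first, and CONSUMER_DEFAULT_OVERRIDES is
applied afterwards with putAll, so a user value for auto.offset.reset is
replaced by "earliest". The sketch below models only that merge order with
plain maps. It is not the Kafka source: the class and method names
(OverrideOrderSketch, defaultsLast, defaultsFirst) are hypothetical, and the
defaults-first variant is just one assumed way to let user values win, not
necessarily how KAFKA-4361 was resolved.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Simplified model of the merge order discussed above; NOT the actual Kafka source.
final class OverrideOrderSketch {

    // Stand-in for CONSUMER_DEFAULT_OVERRIDES, keyed by the plain consumer config names.
    static final Map<String, Object> DEFAULTS;

    static {
        Map<String, Object> tmp = new HashMap<>();
        tmp.put("max.poll.records", "1000");
        tmp.put("auto.offset.reset", "earliest");
        tmp.put("enable.auto.commit", "false");
        DEFAULTS = Collections.unmodifiableMap(tmp);
    }

    // Same order as the quoted 0.10.1.0 method: user values first, defaults last.
    static Map<String, Object> defaultsLast(Map<String, Object> userProps) {
        Map<String, Object> merged = new HashMap<>(userProps);
        merged.putAll(DEFAULTS); // overwrites any user value that shares a key
        return merged;
    }

    // Hypothetical alternative order: defaults first, user values last.
    static Map<String, Object> defaultsFirst(Map<String, Object> userProps) {
        Map<String, Object> merged = new HashMap<>(DEFAULTS);
        merged.putAll(userProps); // user values replace the defaults
        return merged;
    }

    public static void main(String[] args) {
        Map<String, Object> userProps = new HashMap<>();
        userProps.put("auto.offset.reset", "latest");

        System.out.println(defaultsLast(userProps).get("auto.offset.reset"));  // prints "earliest"
        System.out.println(defaultsFirst(userProps).get("auto.offset.reset")); // prints "latest"
    }
}
```

In both variants the untouched defaults (max.poll.records, enable.auto.commit)
are still present in the result; only the winner for a shared key changes.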




-- 
-- Guozhang
