spark-user mailing list archives

From Cody Koeninger <c...@koeninger.org>
Subject Re: Providing Kafka configuration as Map of Strings
Date Wed, 24 Jan 2018 22:31:12 GMT
Have you tried passing in a Map<String,Object> that happens to have
strings for all the values?  I haven't tested this, but the underlying
Kafka consumer constructor is documented to take either strings or
objects as values, despite the static type.
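
A minimal sketch of that idea, untested against a real consumer: copy the String-valued settings into a Map<String,Object> without converting any of them. The class and method names (KafkaParamsSketch, fromStringMap, fromProperties) are hypothetical and not part of Spark or Kafka. The sketch assumes the Kafka consumer parses String values according to each key's expected type, as its constructor documentation suggests.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper for illustration only; not part of Spark or Kafka.
final class KafkaParamsSketch {
    private KafkaParamsSketch() {}

    // Copies a Map<String, String> into a Map<String, Object>.
    // Values stay Strings; no per-key type conversion is attempted.
    static Map<String, Object> fromStringMap(Map<String, String> stringParams) {
        return new HashMap<String, Object>(stringParams);
    }

    // Same idea for settings loaded from a Properties file.
    // stringPropertyNames() returns only keys whose values are Strings.
    static Map<String, Object> fromProperties(Properties props) {
        Map<String, Object> params = new HashMap<>();
        for (String name : props.stringPropertyNames()) {
            params.put(name, props.getProperty(name));
        }
        return params;
    }
}
```

The resulting map could then be passed as kafkaParams to ConsumerStrategies.Subscribe(topics, kafkaParams), as in the documentation example quoted below. Whether every setting is accepted as a String is the assumption that still needs testing.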

On Wed, Jan 24, 2018 at 2:48 PM, Tecno Brain
<cerebrotecnologico@gmail.com> wrote:
> Basically, I am trying to avoid writing code like:
>
>       switch (key) {
>           case "key.deserializer":          result.put(key, Class.forName(value)); break;
>           case "key.serializer":            result.put(key, Class.forName(value)); break;
>           case "value.deserializer":        result.put(key, Class.forName(value)); break;
>           case "value.serializer":          result.put(key, Class.forName(value)); break;
>           case "max.partition.fetch.bytes": result.put(key, Long.valueOf(value)); break;
>           case "max.poll.interval.ms":      result.put(key, Long.valueOf(value)); break;
>           case "enable.auto.commit":        result.put(key, Boolean.valueOf(value)); break;
>           default:
>               result.put(key, value);
>               break;
>       }
>
> since I would need to go over all possible Kafka properties that are not
> expected as a String.
>
> On Wed, Jan 24, 2018 at 12:32 PM, Tecno Brain <cerebrotecnologico@gmail.com>
> wrote:
>>
>> On page
>> https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
>> there is this Java example:
>>
>> Map<String, Object> kafkaParams = new HashMap<>();
>> kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
>> kafkaParams.put("key.deserializer", StringDeserializer.class);
>> kafkaParams.put("value.deserializer", StringDeserializer.class);
>> kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
>> kafkaParams.put("auto.offset.reset", "latest");
>> kafkaParams.put("enable.auto.commit", false);
>>
>> Collection<String> topics = Arrays.asList("topicA", "topicB");
>>
>> JavaInputDStream<ConsumerRecord<String, String>> stream =
>>   KafkaUtils.createDirectStream(
>>     streamingContext,
>>     LocationStrategies.PreferConsistent(),
>>     ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
>>   );
>>
>> I would like to configure Kafka from properties loaded from a Properties
>> file or a Map<String, String>.
>>
>> Is there any API that takes a Map<String, String> and produces the
>> Map<String, Object> required to set the Kafka parameters? Such code would
>> convert "true" to a boolean, or a class name to the Class, depending on the
>> key.
>>
>> Seems to me that I would need to know ALL possible Kafka parameters and
>> what data type they should be converted to in order to produce the
>> Map<String, Object> kafkaParams.
>>
>> The older API used a Map<String, String> passed to
>> KafkaUtils.createDirectStream.
>>
>> Thanks
>>
>
