flink-user mailing list archives

From PedroMrChaves <pedro.mr.cha...@gmail.com>
Subject FlinkKafkaConsumer010 - Memory Issue
Date Wed, 19 Jul 2017 18:23:36 GMT
Hello,

Whenever I submit a Flink job that reads data from Kafka, memory consumption
increases continuously. I've raised the maximum heap size from 2 GB to 4 GB,
and even to 6 GB, but memory consumption keeps reaching the limit.

Below is an example of a simple job that shows this behavior.

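    // Imports, assuming Flink 1.2.x package names
    import java.util.Properties;
    import java.util.UUID;

    import org.apache.flink.api.common.functions.FilterFunction;
    import org.apache.flink.api.java.utils.ParameterTool;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema;

    // getGlobalJobConfiguration, ConfigKeys, configDir and configurations
    // come from the rest of the job and are not shown here.

    /*
     * Execution environment setup
     */
    final StreamExecutionEnvironment environment =
            getGlobalJobConfiguration(configDir, configurations);

    /*
     * Collect event data from Kafka
     */
    DataStreamSource<String> s = environment.addSource(new FlinkKafkaConsumer010<String>(
            configurations.get(ConfigKeys.KAFKA_INPUT_TOPIC),
            new SimpleStringSchema(),
            getKafkaConfiguration(configurations)));

    // Discard every record, so nothing reaches print()
    s.filter(new FilterFunction<String>() {
        @Override
        public boolean filter(String value) throws Exception {
            return false;
        }
    }).print();

    // Submit the job (the job name is arbitrary)
    environment.execute("FlinkKafkaConsumer010 memory test");

    private static Properties getKafkaConfiguration(ParameterTool configurations) {
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", configurations.get(ConfigKeys.KAFKA_HOSTS));
        // A fresh consumer group on every submission
        properties.setProperty("group.id", "flink-consumer-" + UUID.randomUUID().toString());
        // Likely ignored: Flink's Kafka consumer installs its own byte-array
        // deserializer and decodes values with the SimpleStringSchema above
        properties.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        properties.setProperty("security.protocol",
                configurations.get(ConfigKeys.KAFKA_SECURITY_PROTOCOL));
        properties.setProperty("ssl.truststore.location",
                configurations.get(ConfigKeys.KAFKA_SSL_TRUSTSTORE_LOCATION));
        properties.setProperty("ssl.truststore.password",
                configurations.get(ConfigKeys.KAFKA_SSL_TRUSTSTORE_PASSWORD));
        properties.setProperty("ssl.keystore.location",
                configurations.get(ConfigKeys.KAFKA_SSL_KEYSTORE_LOCATION));
        properties.setProperty("ssl.keystore.password",
                configurations.get(ConfigKeys.KAFKA_SSL_KEYSTORE_PASSWORD));
        return properties;
    }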
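For anyone reproducing this without the author's ConfigKeys class and Flink's ParameterTool, the getKafkaConfiguration helper can be sketched with the JDK alone. This is a minimal sketch, assuming a plain Map<String, String> and hypothetical key names such as kafka.hosts. Unlike the original, it fails fast with a clear message when a key is missing, instead of letting Properties.setProperty throw a bare NullPointerException on the null value that ParameterTool.get returns. It leaves out value.deserializer on the understanding that Flink's Kafka consumer replaces it with its own byte-array deserializer.

```java
import java.util.Map;
import java.util.Properties;
import java.util.UUID;

class KafkaProps {
    // Hypothetical config keys (left) mapped to Kafka client property names (right).
    // The original code reads these through the author's ConfigKeys constants.
    static final String[][] MAPPING = {
        {"kafka.hosts", "bootstrap.servers"},
        {"kafka.security.protocol", "security.protocol"},
        {"kafka.ssl.truststore.location", "ssl.truststore.location"},
        {"kafka.ssl.truststore.password", "ssl.truststore.password"},
        {"kafka.ssl.keystore.location", "ssl.keystore.location"},
        {"kafka.ssl.keystore.password", "ssl.keystore.password"},
    };

    static Properties kafkaProperties(Map<String, String> config) {
        Properties props = new Properties();
        for (String[] entry : MAPPING) {
            String value = config.get(entry[0]);
            if (value == null) {
                // Without this check, setProperty would throw a NullPointerException.
                throw new IllegalArgumentException("Missing configuration key: " + entry[0]);
            }
            props.setProperty(entry[1], value);
        }
        // Same behavior as the original: a new consumer group on every submission,
        // so offsets committed by earlier runs are never picked up.
        props.setProperty("group.id", "flink-consumer-" + UUID.randomUUID());
        return props;
    }
}
```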


Moreover, when I cancel the job, the TaskManager does not close the Kafka
connection, and the memory stays allocated. The only way I have found to
release it is to kill the TaskManager process.
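For reference, a Flink source is expected to shut down on cancellation: the runtime calls the source's cancel() method, run() should then return, and any client the source opened should be closed. The JDK-only sketch below shows that pattern; FakeClient and PollingSource are hypothetical stand-ins and not Flink's Kafka connector code.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a Kafka client; it only records whether it was closed.
class FakeClient implements AutoCloseable {
    final AtomicBoolean closed = new AtomicBoolean(false);

    String poll() throws InterruptedException {
        Thread.sleep(10); // simulate waiting for records
        return "record";
    }

    @Override
    public void close() {
        closed.set(true);
    }
}

// Same shape as Flink's SourceFunction contract: run() loops until cancel() is called.
class PollingSource {
    private volatile boolean running = true;
    private final FakeClient client;

    PollingSource(FakeClient client) {
        this.client = client;
    }

    void run() {
        // try-with-resources closes the client however the loop exits
        try (FakeClient c = client) {
            while (running) {
                c.poll();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    void cancel() {
        running = false;
    }
}
```

If a connection survives cancellation, as described in the message, the place to look is whichever step of this run/cancel/close sequence the real consumer does not complete.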

Flink version: 1.2.1
Kafka consumer: FlinkKafkaConsumer010
Kafka version: 2_11_0.10.1.0-2

I've enabled the taskmanager.debug.memory.startLogThread property so that
memory statistics are logged every 5 seconds, and attached the resulting log.
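As far as I know, the log thread enabled above reports heap, non-heap and direct (off-heap) buffer usage separately. That split matters here because free -m counts memory for the whole machine, while the maximum heap setting only bounds the Java heap. A JDK-only sketch of a similar periodic sampler using the standard java.lang.management beans; MemorySampler is a hypothetical name and this is not Flink's implementation. An interval of 5000 ms would match the setting described in the message.

```java
import java.lang.management.BufferPoolMXBean;
import java.lang.management.ManagementFactory;
import java.lang.management.MemoryMXBean;
import java.lang.management.MemoryUsage;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

class MemorySampler {
    private static long mb(long bytes) {
        return bytes >> 20; // bytes to MiB (an unavailable -1 estimate stays -1)
    }

    // One line with heap, non-heap and NIO buffer pool usage, in MiB.
    static String snapshot() {
        MemoryMXBean memory = ManagementFactory.getMemoryMXBean();
        MemoryUsage heap = memory.getHeapMemoryUsage();
        MemoryUsage nonHeap = memory.getNonHeapMemoryUsage();
        StringBuilder sb = new StringBuilder();
        sb.append("heap used=").append(mb(heap.getUsed()))
          .append(" committed=").append(mb(heap.getCommitted()))
          .append(" | non-heap used=").append(mb(nonHeap.getUsed()));
        // The "direct" and "mapped" pools cover NIO buffers allocated outside the heap
        for (BufferPoolMXBean pool : ManagementFactory.getPlatformMXBeans(BufferPoolMXBean.class)) {
            sb.append(" | ").append(pool.getName())
              .append(" used=").append(mb(pool.getMemoryUsed()));
        }
        return sb.toString();
    }

    // Prints a snapshot every intervalMs on a daemon thread; shut down the executor to stop.
    static ScheduledExecutorService start(long intervalMs) {
        ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "memory-sampler");
            t.setDaemon(true);
            return t;
        });
        exec.scheduleAtFixedRate(() -> System.out.println(snapshot()),
                0, intervalMs, TimeUnit.MILLISECONDS);
        return exec;
    }
}
```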

The output of free -m before submitting the job:

              total        used        free      shared  buff/cache   available
Mem:          15817         245       14755          24         816       15121
Swap:             0           0           0

After the job had been running for about 5 minutes:

free -m
              total        used        free      shared  buff/cache   available
Mem:          15817        9819        5150          24         847        5547
Swap:             0           0           0
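The two snapshots can be compared directly: used grows from 245 to 9819 MiB (free -m reports mebibytes), an increase of 9574 MiB, which matches the drop in available from 15121 to 5547 MiB. Taking "about 5 min" as 300 s, that is roughly 32 MiB per second. A small JDK-only sketch that extracts the used column and repeats the arithmetic; FreeDelta is a hypothetical helper name.

```java
import java.util.Locale;

class FreeDelta {
    // Returns the "used" column (MiB) of the "Mem:" row in `free -m` output.
    static long usedMiB(String freeOutput) {
        for (String line : freeOutput.split("\n")) {
            String trimmed = line.trim();
            if (trimmed.startsWith("Mem:")) {
                // Columns: Mem:, total, used, free, shared, buff/cache, available
                String[] cols = trimmed.split("\\s+");
                return Long.parseLong(cols[2]);
            }
        }
        throw new IllegalArgumentException("no Mem: row found");
    }

    public static void main(String[] args) {
        String before = "Mem:          15817         245       14755          24         816       15121";
        String after  = "Mem:          15817        9819        5150          24         847        5547";
        long grown = usedMiB(after) - usedMiB(before);
        // "about 5 min" from the message, taken as 300 s
        double perSecond = grown / 300.0;
        System.out.println(String.format(Locale.ROOT,
                "used grew by %d MiB, ~%.1f MiB/s", grown, perSecond));
        // prints: used grew by 9574 MiB, ~31.9 MiB/s
    }
}
```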

taskmanager.log
<http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/file/n14342/taskmanager.log>
 





-----
Best Regards,
Pedro Chaves
--
View this message in context: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/FlinkKafkaConsumer010-Memory-Issue-tp14342.html
Sent from the Apache Flink User Mailing List archive. mailing list archive at Nabble.com.
