flink-user mailing list archives

From JP de Vooght ...@vooght.de>
Subject Re: logging question
Date Sat, 07 Apr 2018 16:31:04 GMT
Just to recap

I use Flink 1.4.2 with Docker compose which launches a jobmanager and a
taskmanager.
My hope is to learn another library on top of Flink, so logging is
important to me.
I start the cluster and deploy the following job (I dropped all calls
to that library so I can focus on plain Flink and Docker):

import java.util.List;

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class ParallelMLExample {
    private static final Logger LOG = LoggerFactory.getLogger(ParallelMLExample.class);

    public static void main(String[] args) throws Exception {
        // Set up the Flink session
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        env.getConfig().disableSysoutLogging();

        DataSet<Integer> amounts = env.fromElements(1, 29, 40, 50);

        LOG.info("########## BEFORE UPDATEMODEL ##########");
        List<Integer> collect = amounts.filter(a -> a > 30)
                .reduce((integer, t1) -> integer + t1)
                .collect();
        LOG.info("########## AFTER UPDATEMODEL ##########");

        LOG.info(collect.get(0).toString());
    }
}
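As a side note, the mechanism Nico points at further down, a global/root log level change silencing loggers that inherit their level, can be reproduced with plain java.util.logging. This is only an analogy (Flink 1.4 itself logs through log4j, and the class name below is made up), but the level-inheritance rule is the same in both frameworks:

```java
import java.util.logging.Level;
import java.util.logging.Logger;

public class RootLevelDemo {

    // Returns whether an INFO message from a child logger would pass,
    // given the level set on the root logger. The child logger sets no
    // level of its own, so it inherits the root's effective level.
    static boolean isInfoLoggable(Level rootLevel) {
        Logger root = Logger.getLogger("");
        root.setLevel(rootLevel);

        // Hypothetical user-code logger, analogous to ParallelMLExample's.
        Logger child = Logger.getLogger("com.example.ParallelMLExample");
        child.setLevel(null); // explicitly inherit from the parent/root

        return child.isLoggable(Level.INFO);
    }

    public static void main(String[] args) {
        System.out.println(isInfoLoggable(Level.INFO));    // INFO passes through
        System.out.println(isInfoLoggable(Level.WARNING)); // INFO is filtered out
    }
}
```

In log4j 1.x terms, the analogous situation is log4j.rootLogger being (re)configured to a more restrictive level while the user-code logger has no level of its own.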

The log output of the jobmanager does not show anything after "########## BEFORE UPDATEMODEL ##########":

$ docker-compose up
Starting flink_jobmanager_1
Starting flink_taskmanager_1
Attaching to flink_jobmanager_1, flink_taskmanager_1
jobmanager_1   | Starting Job Manager
jobmanager_1   | config file:
jobmanager_1   | jobmanager.rpc.address: jobmanager
jobmanager_1   | jobmanager.rpc.port: 6123
jobmanager_1   | jobmanager.heap.mb: 1024
jobmanager_1   | taskmanager.heap.mb: 1024
jobmanager_1   | taskmanager.numberOfTaskSlots: 1
jobmanager_1   | taskmanager.memory.preallocate: false
jobmanager_1   | parallelism.default: 1
jobmanager_1   | web.port: 8081
jobmanager_1   | blob.server.port: 6124
jobmanager_1   | query.server.port: 6125
jobmanager_1   | blob.server.port: 6124
jobmanager_1   | query.server.port: 6125
jobmanager_1   | blob.server.port: 6124
jobmanager_1   | query.server.port: 6125
taskmanager_1  | Starting Task Manager
taskmanager_1  | config file:
taskmanager_1  | jobmanager.rpc.address: jobmanager
taskmanager_1  | jobmanager.rpc.port: 6123
taskmanager_1  | jobmanager.heap.mb: 1024
taskmanager_1  | taskmanager.heap.mb: 1024
taskmanager_1  | taskmanager.numberOfTaskSlots: 4
taskmanager_1  | taskmanager.memory.preallocate: false
taskmanager_1  | parallelism.default: 1
taskmanager_1  | web.port: 8081
taskmanager_1  | blob.server.port: 6124
taskmanager_1  | query.server.port: 6125
taskmanager_1  | blob.server.port: 6124
taskmanager_1  | query.server.port: 6125
taskmanager_1  | blob.server.port: 6124
taskmanager_1  | query.server.port: 6125
jobmanager_1   | Starting jobmanager as a console application on host e207d6ad4a1a.
taskmanager_1  | Starting taskmanager as a console application on host 1d724ce8ae5e.
jobmanager_1   | Slf4jLogger started
taskmanager_1  | Slf4jLogger started
taskmanager_1  | Could not load Queryable State Client Proxy. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder.
taskmanager_1  | Could not load Queryable State Server. Probable reason: flink-queryable-state-runtime is not in the classpath. To enable Queryable State, please move the flink-queryable-state-runtime jar from the opt to the lib folder.
jobmanager_1   | ########## BEFORE UPDATEMODEL ##########
taskmanager_1  | The operator name DataSource (at main(ParallelMLExample.java:30) (org.apache.flink.api.java.io.CollectionInputFormat)) exceeded the 80 characters length limit and was truncated.
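One thing worth double-checking in the Docker setup: the containers start the daemons in the foreground, and as far as I can tell that start mode reads conf/log4j-console.properties rather than conf/log4j.properties, so level changes may need to be made in the console variant of the file. For reference, a minimal log4j 1.x console configuration looks roughly like this (a sketch, not necessarily the exact file the image ships):

```properties
# Route everything to stdout so `docker-compose up` shows it
log4j.rootLogger=INFO, console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.layout=org.apache.log4j.PatternLayout
log4j.appender.console.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
```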



On 04/07/2018 02:46 PM, JP de Vooght wrote:
>
> Nico, all,
>
> I am still stuck with this. Upgraded the docker image to 1.4.2 and the
> AMIDST library to 0.7.0
>
> Just noticed this issue which signals logging issues outside
> transforms: https://issues.apache.org/jira/browse/FLINK-7990
>
> Could this be related? Although I don't see the relation to logback.
>
> Below is the library code invoked after "BEFORE updateModel"
>
> try {
>     Configuration config = new Configuration();
>     config.setString(BN_NAME, this.dag.getName());
>     config.setBytes(EFBN_NAME, Serialization.serializeObject(efBayesianNetwork));
>
>     DataSet<DataInstance> dataset = dataUpdate.getDataSet();
>     this.sumSS = dataset.map(new SufficientSatisticsMAP())
>             .withParameters(config)
>             .reduce(new SufficientSatisticsReduce())
>             .collect().get(0);
>
>     // Add the prior
>     sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());
>
>     JobExecutionResult result = dataset.getExecutionEnvironment().getLastJobExecutionResult();
>
>     numInstances = result.getAccumulatorResult(ParallelMaximumLikelihood.COUNTER_NAME + "_" + this.dag.getName());
>     numInstances++; // Initial counts
> } catch (Exception ex) {
>     throw new UndeclaredThrowableException(ex);
> }
> JP
>
> On 01/16/2018 10:50 AM, Nico Kruber wrote:
>> Just a guess, but probably our logging initialisation changes the global
>> log level (see conf/log4j.properties). DataStream.collect() executes the
>> program along with creating a local Flink "cluster" (if you are testing
>> locally / in an IDE) and initializing logging, among other things.
>>
>> Please comment the first line out and uncomment the following one to
>> read like this:
>> ==========
>> # This affects logging for both user code and Flink
>> #log4j.rootLogger=INFO, file
>>
>> # Uncomment this if you want to _only_ change Flink's logging
>> log4j.logger.org.apache.flink=INFO
>> ==========
>>
>>
>> Nico
>>
>> On 13/01/18 13:52, jp@vooght.de wrote:
>>> Hello,
>>> I am learning Flink and using the docker image along with the AMIDST
>>> library for this.
>>> Below is a sample task from AMIDST which provides INFO output up until I
>>> reach updateModel(). I pasted the short method as well and wonder what
>>> prevents the Logger from producing any further output.
>>>
>>>         //Set-up Flink session
>>>         env = ExecutionEnvironment.getExecutionEnvironment();
>>>         env.getConfig().disableSysoutLogging();
>>>         Logger LOG = LoggerFactory.getLogger(">>>>> ParallelMLExample");
>>>
>>>         //generate a random dataset
>>>         DataFlink<DataInstance> dataFlink = new
>>> DataSetGenerator().generate(env, 1234, 1000, 5, 0);
>>>
>>>         //Creates a DAG with the NaiveBayes structure for the random
>>> dataset
>>>         DAG dag =
>>> DAGGenerator.getNaiveBayesStructure(dataFlink.getAttributes(),
>>> "DiscreteVar4");
>>>         LOG.info(dag.toString());
>>>
>>>         //Create the Learner object
>>>         ParameterLearningAlgorithm learningAlgorithmFlink = new
>>> ParallelMaximumLikelihood();
>>>
>>>         //Learning parameters
>>>         learningAlgorithmFlink.setBatchSize(10);
>>>         learningAlgorithmFlink.setDAG(dag);
>>>
>>>         //Initialize the learning process
>>>         learningAlgorithmFlink.initLearning();
>>>
>>>         //Learn from the flink data
>>>         LOG.info("BEFORE UPDATEMODEL");
>>>         learningAlgorithmFlink.updateModel(dataFlink);
>>>         LOG.info("AFTER UPDATEMODEL");
>>>
>>>         //Print the learnt Bayes Net
>>>         BayesianNetwork bn =
>>> learningAlgorithmFlink.getLearntBayesianNetwork();
>>>         LOG.info(bn.toString());
>>>
>>>
>>> Below is the updateModel method.
>>>
>>>     public double updateModel(DataFlink<DataInstance> dataUpdate) {
>>>         try {
>>>             Configuration config = new Configuration();
>>>             config.setString(BN_NAME, this.dag.getName());
>>>             config.setBytes(EFBN_NAME,
>>> Serialization.serializeObject(efBayesianNetwork));
>>>
>>>             DataSet<DataInstance> dataset = dataUpdate.getDataSet();
>>>             this.sumSS = dataset.map(new SufficientSatisticsMAP())
>>>                     .withParameters(config)
>>>                     .reduce(new SufficientSatisticsReduce())
>>>                     .collect().get(0);
>>>
>>>             //Add the prior
>>>             sumSS.sum(efBayesianNetwork.createInitSufficientStatistics());
>>>
>>>             JobExecutionResult result =
>>> dataset.getExecutionEnvironment().getLastJobExecutionResult();
>>>
>>>             numInstances =
>>> result.getAccumulatorResult(ParallelMaximumLikelihood.COUNTER_NAME+"_"+this.dag.getName());
>>>
>>>             numInstances++;//Initial counts
>>>
>>>         }catch(Exception ex){
>>>             throw new UndeclaredThrowableException(ex);
>>>         }
>>>
>>>         return this.getLogMarginalProbability();
>>>     }
>>>
>>>
>>> Not sure why the LOG.info calls past that method are not output to the console.
>>> TIA
>>> JP
>

