flink-user mailing list archives

From Chesnay Schepler <ches...@apache.org>
Subject Re: Is that possible for flink to dynamically read and change configuration?
Date Mon, 24 Jul 2017 13:01:28 GMT

That's an error in the documentation, only the ValueStateDescriptor has 
a defaultValue constructor argument.
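Because ListStateDescriptor takes no default value, one way to get what the documentation example was aiming for is to build the descriptor from just a name and a type, e.g. `new ListStateDescriptor<>("buffered-elements", TypeInformation.of(new TypeHint<Tuple2<String, Integer>>() {}))`, and substitute a default when the restored state turns out to be empty. The helper below is a minimal plain-Java sketch of that fallback, kept free of Flink dependencies so it runs on its own. `StateDefaults` and `valueOrDefault` are hypothetical names, not Flink API, and the null check is a defensive assumption in case a state backend returns null for empty state.

```java
import java.util.Iterator;

/** Hypothetical helper, not part of Flink. */
final class StateDefaults {

    private StateDefaults() {}

    /**
     * Returns the first element of an operator-state iterable (for example
     * the result of ListState.get()), or the given default when the state
     * is empty or null.
     */
    static <T> T valueOrDefault(Iterable<T> state, T defaultValue) {
        if (state == null) {
            return defaultValue;
        }
        Iterator<T> it = state.iterator();
        return it.hasNext() ? it.next() : defaultValue;
    }
}
```

Inside initializeState() it might be called as `StateDefaults.valueOrDefault(checkpointedState.get(), Tuple2.of("", 0))`, with the default chosen to match the descriptor's element type.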


On 24.07.2017 14:56, ZalaCheung wrote:
> Hi Martin,
> Thanks for your advice. That’s really helpful. I am using the push 
> scenario, and I am now having some trouble with the state I want to 
> maintain. The simplest approach for me would be to keep a ValueState in a 
> CoFlatMapFunction (actually a RichCoFlatMapFunction). But keyed state 
> such as ValueState can only be used on a keyed stream, and for a connected 
> stream, at least in my scenario, I should not use keyBy() (it actually 
> seems that keyBy() cannot be applied to a connected stream).
> So instead of keyed managed state in a rich function, I tried 
> to use CheckpointedFunction for my non-keyed state. However, in 
> CheckpointedFunction I can only use ListState, which only has add() 
> and a method returning an iterator over its elements. I am not sure 
> whether I can simply replace the element in the ListState. What really 
> has me stuck is that I cannot initialize my ListState with a 
> ListStateDescriptor: it says there is no constructor that takes an 
> initial value, even though I saw one in the official documentation:
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
> @Override
> public void initializeState(FunctionInitializationContext context) throws Exception {
>     ListStateDescriptor<Tuple2<String, Integer>> descriptor =
>         new ListStateDescriptor<>(
>             "buffered-elements",
>             TypeInformation.of(new TypeHint<Tuple2<Long, Long>>() {}),
>             Tuple2.of(0L, 0L));
>     checkpointedState = context.getOperatorStateStore().getListState(descriptor);
>     if (context.isRestored()) {
>         for (Tuple2<String, Integer> element : checkpointedState.get()) {
>             bufferedElements.add(element);
>         }
>     }
> }
> But in my code (Flink 1.3.1), it says there is no three-argument 
> constructor (the third argument in the example above is the default 
> value). I am really confused.
> How can I maintain state in my CoFlatMapFunction?
> Thanks
>  Desheng Zhang
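A minimal sketch of one way to keep the config as operator state, under stated assumptions: hold the current config in an ordinary field and, when snapshotting, *replace* the single list element by calling clear() before add() (ListState inherits clear() from Flink's State interface). To stay runnable without Flink on the classpath, the class below models the relevant methods of a RichCoFlatMapFunction that also implements CheckpointedFunction: `flatMap1` receives data, `flatMap2` receives config updates, a `Consumer` stands in for the Collector, and a plain `List<String>` stands in for the ListState. The class name, the String types, the `"value|config"` output format and the `restore` method are all hypothetical.

```java
import java.util.List;
import java.util.function.Consumer;

/** Plain-Java model of a config-tagging CoFlatMap; names are illustrative. */
final class ConfigTaggingCoFlatMap {

    // Config currently in effect, kept in a regular field between checkpoints.
    private String currentConfig;

    ConfigTaggingCoFlatMap(String defaultConfig) {
        this.currentConfig = defaultConfig;
    }

    // Data side: emit each element together with the config in effect.
    void flatMap1(String value, Consumer<String> out) {
        out.accept(value + "|" + currentConfig);
    }

    // Config side: replace the current config; nothing is emitted.
    void flatMap2(String newConfig, Consumer<String> out) {
        currentConfig = newConfig;
    }

    // Mirrors snapshotState(): "replace" means clear() followed by add().
    void snapshotState(List<String> checkpointedState) {
        checkpointedState.clear();
        checkpointedState.add(currentConfig);
    }

    // Mirrors the isRestored() branch of initializeState(): adopt the
    // checkpointed config if one exists, otherwise keep the default.
    void restore(List<String> checkpointedState) {
        if (!checkpointedState.isEmpty()) {
            currentConfig = checkpointedState.get(0);
        }
    }
}
```

In a real job the config stream would typically be broadcast before connecting, e.g. `dataStream.connect(configStream.broadcast())`, so every parallel instance sees each update. If the job may be rescaled, `getUnionListState` rather than `getListState` is worth considering so every instance receives the restored config.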
>> On Jul 24, 2017, at 19:44, Martin Eden <martineden131@gmail.com> wrote:
>> Hey Desheng,
>> Some options that come to mind:
>> - Caveman style: stop and restart the job with the new config.
>> - Poll scenario: build your own thread that periodically loads the 
>> config from the DB into a cache accessible to each worker.
>> - Push scenario: have a config stream (based on some queue) which 
>> you connect to your data stream via the connect operator. In the 
>> CoFlatMapFunction you provide, you update Flink state in the config 
>> flatMap and read that state in the data flatMap, passing it along 
>> with the data. Any downstream operator that uses the config can then 
>> always get it from the tuple that travels alongside the data, say in 
>> the invoke() method of a sink. Example here 
>> <https://image.slidesharecdn.com/flinkstreambasics-160909223620/95/apache-flink-training-datastream-api-basics-34-638.jpg?cb=1497888680>.
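For the poll scenario, a minimal sketch under assumptions: a cache that a single-threaded scheduler reloads at a fixed interval, while readers always see the last successfully loaded value. In Flink it could be created in a RichMapFunction's open() and shut down in close(), so each parallel worker keeps its own copy. `PollingConfigCache` and the `loader` (for example a MongoDB query) are hypothetical, and a failed reload simply keeps the previous value.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

/** Hypothetical per-worker config cache that reloads itself periodically. */
final class PollingConfigCache<T> implements AutoCloseable {

    private final AtomicReference<T> current = new AtomicReference<>();
    private final Supplier<T> loader;
    private final ScheduledExecutorService scheduler;

    PollingConfigCache(Supplier<T> loader, long periodMillis) {
        this.loader = loader;
        // Load once synchronously so get() has a value right away.
        current.set(loader.get());
        scheduler = Executors.newSingleThreadScheduledExecutor(r -> {
            Thread t = new Thread(r, "config-poller");
            t.setDaemon(true); // do not keep the JVM alive
            return t;
        });
        scheduler.scheduleAtFixedRate(
                this::refresh, periodMillis, periodMillis, TimeUnit.MILLISECONDS);
    }

    private void refresh() {
        try {
            T fresh = loader.get();
            if (fresh != null) {
                current.set(fresh);
            }
        } catch (RuntimeException e) {
            // Keep the previous value; an uncaught exception would
            // suppress all later runs of this scheduled task.
        }
    }

    /** Latest successfully loaded config. */
    T get() {
        return current.get();
    }

    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

A map() call would then read `cache.get()` on every record, so a config change becomes visible within roughly one polling period, without restarting the job.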
>> Hope that gives you some ideas,
>> M
>> On Mon, Jul 24, 2017 at 12:16 PM, ZalaCheung <gzzhangdesheng@corp.netease.com> wrote:
>>     Hi all,
>>     I am now trying to implement an anomaly detection algorithm on
>>     Flink, which is essentially a Map operator that performs anomaly
>>     detection on time series.
>>     First, I want to read the configuration (such as which Kafka source
>>     host to read the data stream from and which sink address to write
>>     data to) from MongoDB. It also contains the system metrics I want
>>     to monitor.
>>     What I did was read the configuration from MongoDB and set it as
>>     the Flink configuration:
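>>     import org.apache.flink.configuration.Configuration;
>>     import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
>>
>>     StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
>>     Configuration conf = new Configuration();
>>     // readConfiguration() is my own helper that loads the config from MongoDB
>>     JSONObject jsonConfiguration = readConfiguration();
>>     conf.setInteger("period", jsonConfiguration.getInt("period"));
>>     conf.setDouble("percentage", jsonConfiguration.getDouble("percentage"));
>>     conf.setDouble("metric", jsonConfiguration.getDouble("metric"));
>>     // Configuration is a GlobalJobParameters, so every operator can read it
>>     see.getConfig().setGlobalJobParameters(conf);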
>>     The readConfiguration() method reads the configuration from MongoDB,
>>     as in the code above. I set globalJobParameters so that all my
>>     workers share these parameters, including the metric I want to
>>     monitor. But at some point I may want to change the metric being
>>     monitored. One possible way, I think, is to read the configuration
>>     dynamically (or periodically) and reset globalJobParameters so that
>>     the Flink program switches to the new metric. Is that possible?
>>     Thanks
>>     Desheng Zhang
