flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Jörn Franke <jornfra...@gmail.com>
Subject Re: Different result on running Flink in local mode and Yarn cluster
Date Thu, 26 Apr 2018 05:31:15 GMT
The problem maybe that it is still static. How will the parser use this HashMap?

> On 26. Apr 2018, at 06:42, Soheil Pourbafrani <soheil.ir08@gmail.com> wrote:
> 
> I run a code using Flink Java API that gets some bytes from Kafka and parses it following
by inserting into Cassandra database using another library static method (both parsing and
inserting results is done by the library). Running code on local in IDE, I get the desired
answer, but running on YARN cluster the parse method didn't work as expected!
> 
> public class Test {
>     static HashMap<Integer, Object> ConfigHashMap = new HashMap<>();
> 
>     public static void main(String[] args) throws Exception {
> 
>         CassandraConnection.connect();
>         Parser.setInsert(true);
> 
>         stream.flatMap(new FlatMapFunction<byte[], Void>() {
>             @Override
>             public void flatMap(byte[] value, Collector<Void> out) throws Exception
{
>                 Parser.parse(ByteBuffer.wrap(value), ConfigHashMap);
>                 // Parser.parse(ByteBuffer.wrap(value));
>             }
>         });
>         env.execute();
>     }
> }
> 
> There is a static HashMap field in the classParser that configuration of parsing data
is based on its information, and data will insert it during the execution. The problem running
on YARN was this data was not available for taskmanagers and they just print config is not
available!
> 
> So I redefine that HashMap as a parameter for the methodparse, but no differences in
results!
> 
> How can I fix the problem?
> 
> 

Mime
View raw message