flink-user mailing list archives

From Niclas Hedhman <nic...@apache.org>
Subject Re: Only a single message processed
Date Mon, 19 Feb 2018 08:29:32 GMT
(Sorry for the incoherent order and ramblings. I am writing this as I am
trying to sort out what is going on...)

1. It is the first message to be processed in the Kafka topic. If I set the
offset manually, it will pick up the message at that point, process it, and
ignore all following messages.

2. Yes, the Kafka console consumer tool is spitting out the messages
without problem. Btw, they are plain Strings, well, serialized JSON objects.

3. Code is a bit messy, but I have copied out the relevant parts below.

I also noticed that a LOT of exceptions are thrown ("breakpoint on any
exception"), mostly ClassCastException, ClassNotFoundException and
NoSuchMethodException, but nothing that bubbles up out of the internals. Is
this part of Scala hammering the JVM, or just the normal JVM class-loading
sequence (no wonder it is so slow)? Is that expected?

I have tried to use both the ObjectMapper from Jackson proper, as well as
the shaded ObjectMapper in Flink. No difference.

Recap: positioning the Kafka consumer at the 8th message from the end. Only
that message is consumed; the remaining 7 are ignored/swallowed.


Ok, so I think I have traced this down to something happening in the
CassandraSink. There is an Exception being thrown somewhere, which I can
see because Kafka09Fetcher.runFetchLoop()'s finally clause is called.

Found it (hours later in debugging), on this line (Flink 1.4.1)

org/apache/flink/cassandra/shaded/com/google/common/util/concurrent/Futures.class:258

which contains

    future.addListener( callbackListener, executor );  // IDEA says 'future' is of type DefaultResultSetFuture

throws an Exception without stepping into the addListener() method. There
is nothing catching the Exception (and I don't want to go down the rabbit
hole of building from source), so I can't really say what Exception is
being thrown. IDEA doesn't seem to report it, and the catch clauses in
OperatorChain.pushToOperator() (ClassCastException and Exception) are in
the call stack but don't catch it, which could suggest a java.lang.Error;
NoClassDefFoundError comes to mind, since there are SO MANY classloading
exceptions going on all the time.
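
(For my own sanity, here is a tiny stand-alone check that a java.lang.Error
really does sail past a catch( Exception ) clause, which would explain why
OperatorChain's handlers never see it. The NoClassDefFoundError message
below is made up to mirror my situation:)

```java
public class ErrorEscapesCatch {

    static String attempt() {
        try {
            // Simulate what a missing/duplicated class would trigger at link time.
            throw new NoClassDefFoundError( "com/datastax/driver/core/DefaultResultSetFuture" );
        }
        catch( Exception e ) {
            // Never entered: Error is NOT a subclass of Exception.
            return "caught as Exception";
        }
        catch( Throwable t ) {
            // Only a catch of Throwable (or Error) sees it.
            return "caught as Throwable: " + t.getClass().getSimpleName();
        }
    }

    public static void main( String[] args ) {
        System.out.println( attempt() );  // prints "caught as Throwable: NoClassDefFoundError"
    }
}
```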

Hold on a second... There are TWO
com.datastax.driver.core.DefaultResultSetFuture types in the classpath. One
from the Cassandra client that I declared, and one from inside the
flink-connector-cassandra_2.11 artifact...
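
(Side note: a quick stand-alone way to check which jar a class was actually
loaded from; in my case I would pass
com.datastax.driver.core.DefaultResultSetFuture.class instead of the
example classes below:)

```java
import java.security.CodeSource;

public class ClassOrigin {

    // Returns the jar/directory a class was loaded from, or a placeholder
    // for classes on the JDK's boot classpath (which have no CodeSource).
    static String originOf( Class<?> c ) {
        CodeSource src = c.getProtectionDomain().getCodeSource();
        return src == null ? "<jdk>" : src.getLocation().toString();
    }

    public static void main( String[] args ) {
        System.out.println( originOf( String.class ) );      // "<jdk>"
        System.out.println( originOf( ClassOrigin.class ) ); // path to this class's jar/dir
    }
}
```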

So will it work if I remove my own dependency declaration and that's it?


YEEEEESSSSS!!! Finally.....


SOLVED!

-o-o-o-o-o-

public static void main( String[] args )
    throws Exception
{
    cli = CommandLine.populateCommand( new ServerCliOptions(), args );
    initializeCassandra( cli );
    StreamExecutionEnvironment env = StreamExecutionEnvironment
        .getExecutionEnvironment()
        .setMaxParallelism( 32768 );
//    createPollDocPipeline( env );
    createAdminPipeline( env );
    env.execute( "schedule.poll" );
}



private static void createAdminPipeline( StreamExecutionEnvironment env )
{
    try
    {
        FlinkKafkaConsumer011<String> adminSource = createKafkaAdminSource();
        SplitStream<AdminCommand> adminStream =
            env.addSource( adminSource )
               .name( "scheduler.admin" )
               .map( value -> {
                   try
                   {
                       return mapper.readValue( value, AdminCommand.class );
                   }
                   catch( Throwable e )
                   {
                       LOG.error( "Unexpected error deserializing AdminCommand", e );
                       return null;
                   }
               } )
               .name( "admin.command.read" )
               .split( value -> singletonList( value.action() ) );

        SingleOutputStreamOperator<Tuple3<List<String>, String, String>> insertStream =
            adminStream.select( AdminCommand.CMD_SCHEDULE_INSERT )
                       .map( new GetPollDeclaration() )
                       .name( "scheduler.admin.insert" )
                       .map( new PollDeclarationToTuple3Map() )
                       .name( "scheduler.pollDeclToTuple3" )
                       .filter( tuple -> tuple != null );

        SingleOutputStreamOperator<Tuple3<List<String>, String, String>> deleteStream =
            adminStream.select( AdminCommand.CMD_SCHEDULE_DELETE )
                       .map( new GetPollDeclaration() )
                       .name( "scheduler.admin.delete" )
                       .map( new PollDeclarationToTuple3Map() )
                       .name( "scheduler.pollDeclToTuple3" )
                       .filter( tuple -> tuple != null );

        CassandraSink.addSink( insertStream )
                     .setHost( cli.primaryCassandraHost(), cli.primaryCassandraPort() )
                     .setQuery( String.format( INSERT_SCHEDULE, cli.cassandraKeyspace ) )
                     .build();

        CassandraSink.addSink( deleteStream )
                     .setHost( cli.primaryCassandraHost(), cli.primaryCassandraPort() )
                     .setQuery( String.format( DELETE_SCHEDULE, cli.cassandraKeyspace ) )
                     .build();
    }
    catch( Throwable e )
    {
        String message = "Unable to start Scheduling Admin";
        LOG.error( message, e );
        throw new RuntimeException( message, e );
    }
}


private static class GetPollDeclaration
    implements MapFunction<AdminCommand, PollDeclaration>
{
    private static final Logger LOG = LoggerFactory.getLogger( GetPollDeclaration.class );

    @Override
    public PollDeclaration map( AdminCommand command )
        throws Exception
    {
        try
        {
            if( command == null )
            {
                return null;
            }
            return (PollDeclaration) command.value();
        }
        catch( Throwable e )
        {
            LOG.error( "Unable to cast command data to PollDeclaration", e );
            return null;
        }
    }
}


private static class PollDeclarationToTuple3Map
    implements MapFunction<PollDeclaration, Tuple3<List<String>, String, String>>
{
    @Override
    public Tuple3<List<String>, String, String> map( PollDeclaration decl )
        throws Exception
    {
        try
        {
            if( decl == null )
            {
                return null;
            }
            return new Tuple3<>( singletonList( mapper.writeValueAsString( decl ) ),
                                 decl.zoneId + ":" + decl.schedule, decl.url );
        }
        catch( Throwable e )
        {
            LOG.error( "Unable to convert PollDeclaration to Tuple3", e );
            return null;
        }
    }
}

Flink Dependencies;

flink         : [
        [group: "org.apache.flink", name: "flink-core", version: flinkVersion],
        [group: "org.apache.flink", name: "flink-java", version: flinkVersion],
        [group: "org.apache.flink", name: "flink-connector-cassandra_2.11", version: flinkVersion],
        [group: "org.apache.flink", name: "flink-connector-kafka-0.11_2.11", version: flinkVersion],
        [group: "org.apache.flink", name: "flink-queryable-state-runtime_2.11", version: flinkVersion],
        [group: "org.apache.flink", name: "flink-streaming-java_2.11", version: flinkVersion],
        [group: "org.apache.flink", name: "flink-streaming-scala_2.11", version: flinkVersion]
],
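
The fix, for the record: I removed my own Cassandra driver dependency,
since flink-connector-cassandra_2.11 already bundles the driver classes.
Roughly like this (the exact coordinates of the removed line are from
memory):

```
// Removed from my own dependency list - flink-connector-cassandra_2.11
// already ships the com.datastax.driver.core classes:
// [group: "com.datastax.cassandra", name: "cassandra-driver-core", version: "3.x"],
```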





On Sun, Feb 18, 2018 at 8:11 PM, Xingcan Cui <xingcanc@gmail.com> wrote:

> Hi Niclas,
>
> About the second point you mentioned, was the processed message a random
> one or a fixed one?
>
> The default startup mode for FlinkKafkaConsumer is
> StartupMode.GROUP_OFFSETS; maybe you could try StartupMode.EARLIEST while
> debugging. Also, before that, you may try fetching the messages with the
> Kafka console consumer tool to see whether they can be consumed completely.
>
> Besides, I wonder if you could provide the code for your Flink pipeline.
> That’ll be helpful.
>
> Best,
> Xingcan
>
>
>
> On 18 Feb 2018, at 7:52 PM, Niclas Hedhman <niclas@apache.org> wrote:
>
>
> So, the producer is run (at the moment) manually (command-line) one
> message at a time.
> Kafka's tooling (different consumer group) shows that a message is added
> each time.
>
> Since my last post, I have also added a UUID as the key, and that didn't
> make a difference, so you are likely correct about de-dup.
>
>
> There is only a single partition on the topic, so it shouldn't be a
> partitioning issue.
>
> I also noticed;
> 1. If I send a message while the consumer topology is running (after the
> first message), that message will only be processed after a restart.
>
> 2. Sending many messages while the consumer is running, and then doing
> many restarts, will only process one of those. No idea what happens to the
> others.
>
> I am utterly confused.
>
> And digging into the internals is not for the faint-hearted, but
> kafka.poll() returns frequently with empty records.
>
> Will continue debugging that tomorrow...
>
>
> Niclas
>
> On Feb 18, 2018 18:50, "Fabian Hueske" <fhueske@gmail.com> wrote:
>
>> Hi Niclas,
>>
>> Flink's Kafka consumer should not apply any deduplication. AFAIK, such a
>> "feature" is not implemented.
>> Do you produce into the topic that you want to read or is the data in the
>> topic static?
>> If you do not produce in the topic while the consuming application is
>> running, this might be an issue with the start position of the consumer
>> [1].
>>
>> Best, Fabian
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.4/
>> dev/connectors/kafka.html#kafka-consumers-start-position-configuration
>>
>> 2018-02-18 8:14 GMT+01:00 Niclas Hedhman <niclas@apache.org>:
>>
>>> Hi,
>>> I am pretty new to Flink, and I like what I see and have started to
>>> build my first application using it.
>>> I must be missing something very fundamental. I have a
>>> FlinkKafkaConsumer011, followed by a handful of filter, map and flatMap
>>> functions, terminated with the standard CassandraSink. I have try..catch
>>> in all my own maps/filters. The first message in the queue is processed
>>> after start-up, but any additional messages are swallowed (i.e. consumed
>>> but never reach the first map()).
>>>
>>> I suspect that some type of de-duplication is going on. The (test)
>>> producer provides different values in each message, but there is no "key"
>>> being passed to the KafkaProducer.
>>>
>>> Is that required? And if so, why? Can I tell Flink or Flink's
>>> KafkaConsumer to ingest all messages, and not try to de-duplicate them?
>>>
>>> Thanks
>>>
>>> --
>>> Niclas Hedhman, Software Developer
>>> http://zest.apache.org - New Energy for Java
>>>
>>
>>
>


-- 
Niclas Hedhman, Software Developer
http://zest.apache.org - New Energy for Java
