flink-issues mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "vinoyang (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (FLINK-10785) Update FlinkKinesisConsumerMigrationTest for 1.7
Date Sun, 14 Apr 2019 07:08:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-10785?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16817209#comment-16817209
] 

vinoyang commented on FLINK-10785:
----------------------------------

When I update this migration test for Flink 1.8. I encounter the exception like before (my
first comments). I fixed this problem with this code snippet in {{FlinkKinesisConsumerMigrationTest
to generate snapshot file}}: 

 
{code:java}
private void writeSnapshot(String path, HashMap<StreamShardMetadata, SequenceNumber>
state) throws Exception {
    final List<StreamShardHandle> initialDiscoveryShards = new ArrayList<>(state.size());
    for (StreamShardMetadata shardMetadata : state.keySet()) {
        Shard shard = new Shard();
        shard.setShardId(shardMetadata.getShardId());

        SequenceNumberRange sequenceNumberRange = new SequenceNumberRange();
        sequenceNumberRange.withStartingSequenceNumber("1");
        shard.setSequenceNumberRange(sequenceNumberRange);

        initialDiscoveryShards.add(new StreamShardHandle(shardMetadata.getStreamName(), shard));
    }

    final TestFetcher<String> fetcher = new TestFetcher<>(
        Collections.singletonList(TEST_STREAM_NAME),
        new TestSourceContext<>(),
        new TestRuntimeContext(true, 1, 0),
        TestUtils.getStandardProperties(),                                   //one change
point
        new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
        state,
        initialDiscoveryShards);                                             //another change
point
{code}
Currently, the code is:

 

 
{code:java}
private void writeSnapshot(String path, HashMap<StreamShardMetadata, SequenceNumber>
state) throws Exception {
   final TestFetcher<String> fetcher = new TestFetcher<>(
      Collections.singletonList(TEST_STREAM_NAME),
      new TestSourceContext<>(),
      new TestRuntimeContext(true, 1, 0),
      new Properties(),
      new KinesisDeserializationSchemaWrapper<>(new SimpleStringSchema()),
      state,
      null);
{code}
[~tzulitai] Do you think my fix is correct?

 

 

> Update FlinkKinesisConsumerMigrationTest for 1.7
> ------------------------------------------------
>
>                 Key: FLINK-10785
>                 URL: https://issues.apache.org/jira/browse/FLINK-10785
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Connectors / Kinesis, Tests
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Tzu-Li (Gordon) Tai
>            Priority: Major
>             Fix For: 1.8.0
>
>
> Update {{FlinkKinesisConsumerMigrationTest}} so that it covers restoring from 1.7.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

Mime
View raw message