flink-issues mailing list archives

From "Jark Wu (Jira)" <j...@apache.org>
Subject [jira] [Comment Edited] (FLINK-14567) Aggregate query with more than two group fields can't be write into HBase sink
Date Sun, 17 Nov 2019 03:08:00 GMT

    [ https://issues.apache.org/jira/browse/FLINK-14567?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16975898#comment-16975898 ]

Jark Wu edited comment on FLINK-14567 at 11/17/19 3:07 AM:
-----------------------------------------------------------

Another solution is to let the sink concatenate the keys. For example, an HBase sink can have
more than one key field, say k1, k2, k3. A {{key-delimiter}} option is then required to join the
key fields into a rowkey, and the concatenated rowkey will always be of varchar type. The HBase
sink will insert the concatenated rowkey into the HBase table.


{code:sql}
create table my_table (
  k1 int,
  k2 varchar,
  k3 timestamp(3),
  f1 row<q1 bigint, q2 bigint>
) with (
  'connector.type' = 'hbase',
  'connector.key-delimiter' = '-'
);

insert into my_table
select k1, k2, k3, ROW(count(*), count(distinct user))
from my_source -- hypothetical source table
group by k1, k2, k3;
{code}
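
To make the idea concrete, here is a minimal sketch of how a sink could join the key fields into a varchar rowkey. The class {{RowKeyConcat}} and the method {{toRowKey}} are hypothetical names used for illustration only; they are not part of the HBase connector.

{code:java}
import java.util.Arrays;
import java.util.stream.Collectors;

public class RowKeyConcat {

    // Hypothetical helper: converts every key field to a string and joins
    // the results with the configured key delimiter.
    static String toRowKey(String delimiter, Object... keyFields) {
        return Arrays.stream(keyFields)
                .map(field -> String.valueOf(field))
                .collect(Collectors.joining(delimiter));
    }

    public static void main(String[] args) {
        // k1 int, k2 varchar, k3 timestamp(3) rendered as a string
        System.out.println(toRowKey("-", 1, "a", "2019-11-17 03:07:00.000"));
        // prints "1-a-2019-11-17 03:07:00.000"
    }
}
{code}

Note that the rowkey is only unique if the delimiter never appears inside a key value: with this sketch, {{toRowKey("-", "a-b", "c")}} and {{toRowKey("-", "a", "b-c")}} both yield {{a-b-c}}.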

This is very similar to the [Elasticsearch Connector|https://ci.apache.org/projects/flink/flink-docs-master/dev/table/connect.html#elasticsearch-connector].
In this way, {{UpsertStreamTableSink#setKeyFields}} still works for the sink, because the sink
pretends it has a composite key with 3 fields.


However, this can't solve all the problems. For example, it does not help if one of the 3 fields
is derived from a group key by a transformation that loses key information.
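
As an illustration of such a lossy transformation, assume the query groups by the timestamp k3 but writes only its date part to the sink. The class {{LossyKeyTransform}} and the method {{truncateToDay}} are hypothetical and exist only for this sketch.

{code:java}
import java.time.LocalDate;
import java.time.LocalDateTime;

public class LossyKeyTransform {

    // Hypothetical transformation applied to the group key k3 before it
    // reaches the sink: the time of day is dropped.
    static LocalDate truncateToDay(LocalDateTime k3) {
        return k3.toLocalDate();
    }

    public static void main(String[] args) {
        LocalDateTime first = LocalDateTime.parse("2019-11-17T03:07:00");
        LocalDateTime second = LocalDateTime.parse("2019-11-17T08:30:00");

        // The two group keys are distinct ...
        System.out.println(first.equals(second)); // prints "false"
        // ... but the transformed values are equal, so the sink cannot tell them apart.
        System.out.println(truncateToDay(first).equals(truncateToDay(second))); // prints "true"
    }
}
{code}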


> Aggregate query with more than two group fields can't be write into HBase sink
> ------------------------------------------------------------------------------
>
>                 Key: FLINK-14567
>                 URL: https://issues.apache.org/jira/browse/FLINK-14567
>             Project: Flink
>          Issue Type: Bug
>          Components: Connectors / HBase, Table SQL / Legacy Planner, Table SQL / Planner
>            Reporter: Jark Wu
>            Priority: Critical
>             Fix For: 1.10.0
>
>
> If we have an HBase table sink with a varchar rowkey (which is also the primary key) and a
> bigint column, we want to write the result of the following query into the sink in upsert
> mode. However, it fails the primary key check with the exception "UpsertStreamTableSink
> requires that Table has a full primary keys if it is updated."
> {code:sql}
> select concat(f0, '-', f1) as key, sum(f2)
> from T1
> group by f0, f1
> {code}
> This happens in both the Blink planner and the old planner. If the query works in update
> mode, a primary key must exist so that it can be extracted and passed to
> {{UpsertStreamTableSink#setKeyFields}}.
> That's why we wanted to derive a primary key for concat in FLINK-14539. However, we found
> that the primary key is not preserved after concatenation. For example, suppose the primary
> key is (f0, f1, f2), all of varchar type, and there are two unique records ('a', 'b', 'c')
> and ('ab', '', 'c'). The results of concat(f0, f1, f2) are the same for both records, which
> means the concat result is no longer a primary key.
> So here comes the problem: how can we properly support the HBase sink or similar use cases?
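
The collision described in the issue can be reproduced in a few lines of plain Java. This is only a sketch: the class {{ConcatCollision}} and its {{concat}} helper are hypothetical stand-ins for the SQL {{concat}} function and do not use any Flink API.

{code:java}
public class ConcatCollision {

    // Stand-in for SQL concat(f0, f1, f2): joins the fields without a separator.
    static String concat(String... fields) {
        return String.join("", fields);
    }

    public static void main(String[] args) {
        String first = concat("a", "b", "c");
        String second = concat("ab", "", "c");

        System.out.println(first);                // prints "abc"
        System.out.println(second);               // prints "abc"
        System.out.println(first.equals(second)); // prints "true"
    }
}
{code}

Both records map to the same value, so the concatenated column cannot serve as a primary key even though (f0, f1, f2) is unique.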



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
