calcite-dev mailing list archives

From 陈江枫 <kan...@gmail.com>
Subject Re: Stream sql problem on Kafka
Date Fri, 24 Feb 2017 09:30:54 GMT
Hi, sorry to bother again


 @Override
    public Statistic getStatistic() {
        return Statistics.of(100d,
                ImmutableList.<ImmutableBitSet>of(),
                RelCollations.createSingleton(0));
    }

    @Override
    public TableType getJdbcTableType() {
        return TableType.STREAM;
    }

I declared the rowtime column increasing using the code above and executed
the GROUP BY SQL again. It no longer threw an exception, but no results came out.
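
In case it helps to see why nothing comes out: a streaming GROUP BY can only emit a group once the monotonic expression has advanced past it, so the result for hour H appears only after a row with rowtime in hour H+1 (or later) arrives. A minimal stdlib sketch of that emit-on-advance behavior (class and field names are mine, not Calcite's):

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration: per-hour counts are emitted only when the
// (monotonic) hour value advances past the group being accumulated.
class HourlyCounter {
    private long currentHour = -1;
    private long count = 0;
    final List<String> emitted = new ArrayList<>();

    void accept(long rowtimeMillis) {
        long hour = rowtimeMillis / 3_600_000L; // FLOOR(rowtime TO HOUR)
        if (hour != currentHour && currentHour != -1) {
            // The previous hour is now complete, so its group can be emitted.
            emitted.add("hour=" + currentHour + " c=" + count);
            count = 0;
        }
        currentHour = hour;
        count++;
    }
}

public class Demo {
    public static void main(String[] args) {
        HourlyCounter c = new HourlyCounter();
        c.accept(0L);          // hour 0
        c.accept(1_000L);      // still hour 0, nothing emitted yet
        System.out.println(c.emitted);       // []
        c.accept(3_600_000L);  // hour 1 arrives, so hour 0 is emitted
        System.out.println(c.emitted);       // [hour=0 c=2]
    }
}
```

So if the stream never crosses an hour boundary during the test, no rows are produced even though the query is valid.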


Another question: if I change the rowCount to
Double.POSITIVE_INFINITY, I get the following exception:

java.sql.SQLException: Error while executing SQL "SELECT STREAM
FLOOR(rowtime TO HOUR) AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks
GROUP BY FLOOR(rowtime TO HOUR ), ip": Node
[rel#24:Subset#3.ENUMERABLE.[]] could not be implemented; planner
state:

Root: rel#24:Subset#3.ENUMERABLE.[]
Original rel:

Sets:
Set#0, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip, BIGINT
timestamp, VARCHAR(1) url, VARCHAR(1) referrer, VARCHAR(1) useragent,
VARCHAR(1) more)
	rel#5:Subset#0.NONE.[0], best=null, importance=0.6561
		rel#0:LogicalTableScan.NONE.[[0]](table=[KAFKA, clicks]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
	rel#38:Subset#0.ENUMERABLE.[0], best=null, importance=0.49499999999999994
		rel#46:EnumerableTableScan.ENUMERABLE.[[0]](table=[KAFKA, clicks]),
rowcount=1.7976931348623157E308, cumulative cost={Infinity rows,
Infinity cpu, 0.0 io}
		rel#53:EnumerableInterpreter.ENUMERABLE.[0](input=rel#45:Subset#0.BINDABLE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
		rel#55:EnumerableInterpreter.ENUMERABLE.[[0]](input=rel#45:Subset#0.BINDABLE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
	rel#45:Subset#0.BINDABLE.[0], best=null, importance=0.49499999999999994
		rel#44:BindableTableScan.BINDABLE.[[0]](table=[KAFKA, clicks]),
rowcount=1.7976931348623157E308, cumulative cost={Infinity rows,
Infinity cpu, 0.0 io}
Set#1, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip)
	rel#7:Subset#1.NONE.[0], best=null, importance=0.7290000000000001
		rel#6:LogicalProject.NONE.[[0]](input=rel#5:Subset#0.NONE.[0],rowtime=FLOOR($0,
FLAG(HOUR)),ip=$1), rowcount=1.7976931348623157E308, cumulative
cost={inf}
	rel#40:Subset#1.ENUMERABLE.[0], best=null, importance=0.49499999999999994
		rel#39:EnumerableProject.ENUMERABLE.[[0]](input=rel#38:Subset#0.ENUMERABLE.[0],rowtime=FLOOR($0,
FLAG(HOUR)),ip=$1), rowcount=1.7976931348623157E308, cumulative
cost={inf}
Set#2, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip, BIGINT c)
	rel#9:Subset#2.NONE.[], best=null, importance=0.81
		rel#8:LogicalAggregate.NONE.[](input=rel#7:Subset#1.NONE.[0],group={0,
1},c=COUNT()), rowcount=1.7976931348623158E307, cumulative cost={inf}
	rel#42:Subset#2.ENUMERABLE.[], best=null, importance=0.405
		rel#41:EnumerableAggregate.ENUMERABLE.[](input=rel#40:Subset#1.ENUMERABLE.[0],group={0,
1},c=COUNT()), rowcount=1.7976931348623158E307, cumulative cost={inf}
Set#3, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip, BIGINT c)
	rel#11:Subset#3.NONE.[], best=null, importance=0.9
		rel#10:LogicalDelta.NONE.[](input=rel#9:Subset#2.NONE.[]),
rowcount=1.7976931348623158E307, cumulative cost={inf}
		rel#12:LogicalProject.NONE.[](input=rel#11:Subset#3.NONE.[],rowtime=$0,ip=$1,c=$2),
rowcount=1.7976931348623158E307, cumulative cost={inf}
		rel#30:LogicalAggregate.NONE.[](input=rel#29:Subset#5.NONE.[0],group={0,
1},c=COUNT()), rowcount=1.7976931348623158E307, cumulative cost={inf}
	rel#24:Subset#3.ENUMERABLE.[], best=null, importance=1.0
		rel#25:AbstractConverter.ENUMERABLE.[](input=rel#11:Subset#3.NONE.[],convention=ENUMERABLE,sort=[]),
rowcount=1.7976931348623158E307, cumulative cost={inf}
		rel#26:EnumerableProject.ENUMERABLE.[](input=rel#24:Subset#3.ENUMERABLE.[],rowtime=$0,ip=$1,c=$2),
rowcount=1.7976931348623158E307, cumulative cost={inf}
		rel#32:EnumerableAggregate.ENUMERABLE.[](input=rel#31:Subset#5.ENUMERABLE.[0],group={0,
1},c=COUNT()), rowcount=1.7976931348623158E307, cumulative cost={inf}
Set#5, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip)
	rel#29:Subset#5.NONE.[0], best=null, importance=0.81
		rel#27:LogicalDelta.NONE.[0](input=rel#7:Subset#1.NONE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
	rel#31:Subset#5.ENUMERABLE.[0], best=null, importance=0.9
		rel#49:EnumerableProject.ENUMERABLE.[[0]](input=rel#48:Subset#6.ENUMERABLE.[0],rowtime=FLOOR($0,
FLAG(HOUR)),ip=$1), rowcount=1.7976931348623157E308, cumulative
cost={inf}
	rel#37:Subset#5.NONE.[], best=null, importance=0.45
		rel#27:LogicalDelta.NONE.[0](input=rel#7:Subset#1.NONE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
		rel#36:LogicalProject.NONE.[](input=rel#35:Subset#6.NONE.[0],rowtime=FLOOR($0,
FLAG(HOUR)),ip=$1), rowcount=1.7976931348623157E308, cumulative
cost={inf}
Set#6, type: RecordType(TIMESTAMP(0) rowtime, VARCHAR(1) ip, BIGINT
timestamp, VARCHAR(1) url, VARCHAR(1) referrer, VARCHAR(1) useragent,
VARCHAR(1) more)
	rel#35:Subset#6.NONE.[0], best=null, importance=0.6187499999999999
		rel#33:LogicalDelta.NONE.[0](input=rel#5:Subset#0.NONE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
		rel#47:LogicalTableScan.NONE.[[0]](table=[KAFKA, clicks, (STREAM)]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
	rel#48:Subset#6.ENUMERABLE.[0], best=null, importance=0.81
		rel#56:EnumerableInterpreter.ENUMERABLE.[0](input=rel#52:Subset#6.BINDABLE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
		rel#58:EnumerableInterpreter.ENUMERABLE.[[0]](input=rel#52:Subset#6.BINDABLE.[0]),
rowcount=1.7976931348623157E308, cumulative cost={inf}
	rel#52:Subset#6.BINDABLE.[0], best=null, importance=0.7290000000000001
		rel#51:BindableTableScan.BINDABLE.[[0]](table=[KAFKA, clicks,
(STREAM)]), rowcount=1.7976931348623157E308, cumulative cost={Infinity
rows, Infinity cpu, 0.0 io}


	at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
	at org.apache.calcite.avatica.Helper.createException(Helper.java:41)
	at org.apache.calcite.avatica.AvaticaStatement.executeInternal(AvaticaStatement.java:156)
	at org.apache.calcite.avatica.AvaticaStatement.executeQuery(AvaticaStatement.java:218)
	at TestQuery.main(TestQuery.java:43)
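
For what it's worth, the "could not be implemented" failure looks consistent with the infinite row count: every candidate plan ends up with cumulative cost {inf}, and an infinite cost can never be strictly improved upon, so the planner never registers a "best" plan for the root subset. A tiny stdlib illustration of why infinite costs cannot be ordered (my own sketch, not Calcite's cost model):

```java
public class InfCost {
    public static void main(String[] args) {
        double planA = Double.POSITIVE_INFINITY; // cost of one candidate plan
        double planB = Double.POSITIVE_INFINITY; // cost of another candidate
        // Neither plan is strictly cheaper than the other, so a search that
        // requires strict improvement to record a "best" plan never finds one.
        System.out.println(planA < planB); // false
        System.out.println(planB < planA); // false
        // The difference between two infinite costs is not even defined.
        System.out.println(planA - planB); // NaN
    }
}
```

A large finite row count (as in the 100d you used) keeps the costs comparable.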


2017-02-24 8:53 GMT+08:00 Julian Hyde <jhyde@apache.org>:

> You need to declare that the ROWTIME column is monotonic. In
> MockCatalogReader, note how ROWTIME is declared monotonic in the ORDERS and
> SHIPMENTS streams. That’s why some queries in SqlValidatorTest.testStreamGroupBy
> are valid and others are not. The key method is SqlValidatorTable.getMonotonicity(String
> columnName).
>
> Julian
>
> > On Feb 23, 2017, at 7:23 AM, 陈江枫 <kanppa@gmail.com> wrote:
> >
> > Hi all, I'm trying to integrate Calcite with Kafka; I referenced
> > CsvStreamableTable.
> >
> > Each ConsumerRecord is converted to an Object[] using the following code:
> >
> > static class ArrayRowConverter extends RowConverter<Object[]> {
> >     private final List<Schema.Field> fields;
> >
> >     public ArrayRowConverter(List<Schema.Field> fields) {
> >         this.fields = fields;
> >     }
> >
> >     @Override
> >     Object[] convertRow(
> >             ConsumerRecord<String, GenericRecord> consumerRecord) {
> >         // First slot holds the Kafka record timestamp (rowtime).
> >         Object[] objects = new Object[fields.size() + 1];
> >         int i = 0;
> >         objects[i++] = consumerRecord.timestamp();
> >         for (Schema.Field field : this.fields) {
> >             Object obj = consumerRecord.value().get(field.name());
> >             if (obj instanceof Utf8) {
> >                 objects[i++] = obj.toString();
> >             } else {
> >                 objects[i++] = obj;
> >             }
> >         }
> >         return objects;
> >     }
> > }
> >
> > Enumerator is implemented as following:
> >
> > public E current() {
> >    return current;
> > }
> >
> > public boolean moveNext() {
> >     for (;;) {
> >         if (cancelFlag.get()) {
> >             return false;
> >         }
> >         ConsumerRecord<String, GenericRecord> record = getRecord();
> >         if (record == null) {
> >             try {
> >                 // No record available yet; back off briefly and retry.
> >                 Thread.sleep(200L);
> >             } catch (InterruptedException e) {
> >                 // Restore the interrupt flag and stop the enumeration.
> >                 Thread.currentThread().interrupt();
> >                 return false;
> >             }
> >             continue;
> >         }
> >         current = rowConvert.convertRow(record);
> >         return true;
> >     }
> > }
> >
> > I tested "SELECT STREAM * FROM Kafka.clicks", and it works fine.
> >
> > rowtime is the first column, added explicitly; its value is the Kafka
> > record timestamp.
> >
> > But when I tried "SELECT STREAM FLOOR(rowtime TO HOUR)
> >
> > AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks  GROUP BY FLOOR(rowtime
> > TO HOUR), ip"
> >
> > It threw this exception:
> >
> > java.sql.SQLException: Error while executing SQL "SELECT STREAM
> > FLOOR(rowtime TO HOUR) AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks
> > GROUP BY FLOOR(rowtime TO HOUR), ip": From line 1, column 85 to line
> > 1, column 119: Streaming aggregation requires at least one monotonic
> > expression in GROUP BY clause
> >       at org.apache.calcite.avatica.Helper.createException(Helper.
> java:56)
> >       at org.apache.calcite.avatica.Helper.createException(Helper.
> java:41)
> >
> >
> > Could someone help?
>
>
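
As an aside on the quoted query: with rowtime carried as a Kafka record timestamp (epoch millis), FLOOR(rowtime TO HOUR) corresponds to truncating the instant to the hour. A stdlib sketch of that grouping key (names are mine, for illustration only):

```java
import java.time.Instant;
import java.time.temporal.ChronoUnit;

public class FloorToHour {
    // Grouping key equivalent to FLOOR(rowtime TO HOUR) for epoch millis.
    static Instant floorToHour(long epochMillis) {
        return Instant.ofEpochMilli(epochMillis).truncatedTo(ChronoUnit.HOURS);
    }

    public static void main(String[] args) {
        // 09:30:54Z and 09:59:59Z fall into the same hourly bucket.
        long a = Instant.parse("2017-02-24T09:30:54Z").toEpochMilli();
        long b = Instant.parse("2017-02-24T09:59:59Z").toEpochMilli();
        System.out.println(floorToHour(a)); // 2017-02-24T09:00:00Z
        System.out.println(floorToHour(a).equals(floorToHour(b))); // true
    }
}
```

All rows in the same hour share one key, which is why the validator insists the key expression be monotonic before it will stream the aggregate.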
