calcite-dev mailing list archives

From Julian Hyde <jh...@apache.org>
Subject Re: Stream sql problem on Kafka
Date Fri, 24 Feb 2017 00:53:12 GMT
You need to declare that the ROWTIME column is monotonic. In MockCatalogReader, note how ROWTIME
is declared monotonic in the ORDERS and SHIPMENTS streams. That’s why some queries in SqlValidatorTest.testStreamGroupBy
are valid and others are not. The key method is SqlValidatorTable.getMonotonicity(String columnName).
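To see why the declaration matters, here is a toy, JDK-only model of the validator rule. It is not Calcite's code: `Monotonicity`, `GroupKey`, `keyMonotonicity` and `isValidStreamingGroupBy` are hypothetical names. The map lookup plays the role of `SqlValidatorTable.getMonotonicity(String columnName)`. In this model an undeclared column counts as not monotonic, so `FLOOR(rowtime TO HOUR)` only qualifies once ROWTIME itself is declared increasing.

```java
import java.util.List;
import java.util.Map;

// Toy model of "streaming aggregation requires at least one monotonic
// expression in GROUP BY". NOT Calcite's API; all names are hypothetical.
class MonotonicGroupByCheck {

  // Simplified stand-in for a monotonicity enum.
  enum Monotonicity { STRICTLY_INCREASING, INCREASING, NOT_MONOTONIC }

  // A GROUP BY key: either a bare column or FLOOR(column TO <unit>).
  static final class GroupKey {
    final String column;
    final boolean floored;

    GroupKey(String column, boolean floored) {
      this.column = column;
      this.floored = floored;
    }
  }

  // Role of getMonotonicity(columnName): undeclared columns are not monotonic.
  static Monotonicity columnMonotonicity(Map<String, Monotonicity> declared, String column) {
    return declared.getOrDefault(column, Monotonicity.NOT_MONOTONIC);
  }

  // FLOOR keeps "increasing" but loses strictness: many rows share one hour.
  static Monotonicity keyMonotonicity(GroupKey key, Map<String, Monotonicity> declared) {
    Monotonicity m = columnMonotonicity(declared, key.column);
    if (key.floored && m == Monotonicity.STRICTLY_INCREASING) {
      return Monotonicity.INCREASING;
    }
    return m;
  }

  // Accept the streaming GROUP BY only if at least one key is monotonic.
  static boolean isValidStreamingGroupBy(List<GroupKey> keys, Map<String, Monotonicity> declared) {
    for (GroupKey key : keys) {
      if (keyMonotonicity(key, declared) != Monotonicity.NOT_MONOTONIC) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // GROUP BY FLOOR(rowtime TO HOUR), ip
    List<GroupKey> keys = List.of(new GroupKey("ROWTIME", true), new GroupKey("IP", false));
    // Nothing declared: rejected, like the quoted error.
    System.out.println(isValidStreamingGroupBy(keys, Map.of()));  // prints false
    // ROWTIME declared increasing: accepted.
    System.out.println(isValidStreamingGroupBy(keys, Map.of("ROWTIME", Monotonicity.INCREASING)));  // prints true
  }
}
```

Note that `ip` never helps: only the key derived from a declared-monotonic column can satisfy the rule.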

Julian

> On Feb 23, 2017, at 7:23 AM, 陈江枫 <kanppa@gmail.com> wrote:
> 
> Hi all, I'm trying to integrate Calcite with Kafka. I referenced
> CsvStreamableTable.
> 
> Each ConsumerRecord is converted to an Object[] using the following code:
> 
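> import java.util.List;
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.avro.util.Utf8;
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> 
> // Converts a Kafka record whose value is an Avro GenericRecord into a row.
> static class ArrayRowConverter extends RowConverter<Object[]> {
>    private final List<Schema.Field> fields;
> 
>    public ArrayRowConverter(List<Schema.Field> fields) {
>        this.fields = fields;
>    }
> 
>    @Override
>    Object[] convertRow(ConsumerRecord<String, GenericRecord> consumerRecord) {
>        // Column 0 is rowtime (the Kafka record timestamp, in epoch
>        // milliseconds); the Avro fields follow in schema order.
>        Object[] objects = new Object[fields.size() + 1];
>        int i = 0;
>        objects[i++] = consumerRecord.timestamp();
>        for (Schema.Field field : this.fields) {
>            Object obj = consumerRecord.value().get(field.name());
>            // Avro returns strings as Utf8; convert them to java.lang.String.
>            if (obj instanceof Utf8) {
>                objects[i++] = obj.toString();
>            } else {
>                objects[i++] = obj;
>            }
>        }
>        return objects;
>    }
> }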
> 
> The Enumerator is implemented as follows:
> 
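> // cancelFlag, current, rowConvert and getRecord() are members of the
> // enclosing enumerator (not shown).
> 
> // Returns the row produced by the last successful moveNext().
> public E current() {
>    return current;
> }
> 
> // Polls Kafka until a record arrives or the query is cancelled.
> public boolean moveNext() {
>    for (;;) {
>        if (cancelFlag.get()) {
>            return false;
>        }
>        ConsumerRecord<String, GenericRecord> record = getRecord();
>        if (record == null) {
>            // No record available yet; back off briefly and retry.
>            try {
>                Thread.sleep(200L);
>            } catch (InterruptedException e) {
>                // Restore the interrupt flag and end the stream instead
>                // of swallowing the interrupt.
>                Thread.currentThread().interrupt();
>                return false;
>            }
>            continue;
>        }
>        current = rowConvert.convertRow(record);
>        return true;
>    }
> }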
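One possible variation on the polling loop above, sketched with only the JDK. `QueueEnumerator` is a hypothetical class that mirrors the `current()`/`moveNext()` shape but does not implement Calcite's `Enumerator` interface. It assumes a separate consumer thread (not shown) pushes converted rows into a `BlockingQueue`; `poll` with a timeout replaces the fixed `Thread.sleep`, and an interrupt ends the stream rather than being swallowed.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for a streaming enumerator; not Calcite's Enumerator.
class QueueEnumerator<E> {
  private final BlockingQueue<E> queue;    // filled by an assumed consumer thread
  private final AtomicBoolean cancelFlag;  // set when the query is cancelled
  private E current;

  QueueEnumerator(BlockingQueue<E> queue, AtomicBoolean cancelFlag) {
    this.queue = queue;
    this.cancelFlag = cancelFlag;
  }

  public E current() {
    return current;
  }

  // Waits until a row arrives, the query is cancelled, or the thread is interrupted.
  public boolean moveNext() {
    while (!cancelFlag.get()) {
      try {
        E next = queue.poll(200L, TimeUnit.MILLISECONDS);  // wait up to 200 ms
        if (next != null) {
          current = next;
          return true;
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();  // preserve the interrupt status
        return false;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    BlockingQueue<Object[]> queue = new LinkedBlockingQueue<>();
    AtomicBoolean cancel = new AtomicBoolean(false);
    queue.add(new Object[] {0L, "10.0.0.1"});  // sample row: rowtime, ip
    QueueEnumerator<Object[]> en = new QueueEnumerator<>(queue, cancel);
    System.out.println(en.moveNext() + " " + en.current()[1]);  // prints "true 10.0.0.1"
    cancel.set(true);
    System.out.println(en.moveNext());  // prints "false"
  }
}
```

The design choice here is to block on the queue rather than sleep unconditionally, so a row is handed over as soon as it arrives instead of after up to 200 ms.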
> 
> I tested "SELECT STREAM * FROM Kafka.clicks" and it works fine.
> 
> rowtime is the first column, added explicitly, and its value is the
> Kafka record timestamp.
> 
> But when I tried
> 
> SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks  GROUP BY FLOOR(rowtime TO HOUR), ip
> 
> it threw an exception:
> 
> java.sql.SQLException: Error while executing SQL "SELECT STREAM
> FLOOR(rowtime TO HOUR) AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks
> GROUP BY FLOOR(rowtime TO HOUR), ip": From line 1, column 85 to line
> 1, column 119: Streaming aggregation requires at least one monotonic
> expression in GROUP BY clause
> 	at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
> 	at org.apache.calcite.avatica.Helper.createException(Helper.java:41)
> 
> 
> Could someone help?
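Why a streaming GROUP BY needs a monotonic key: if `FLOOR(rowtime TO HOUR)` never decreases, an hour's groups are complete as soon as a row from a later hour arrives, so they can be emitted; grouping by `ip` alone would never let a group close. A minimal JDK-only sketch of that idea, assuming rows arrive in non-decreasing rowtime order (epoch milliseconds). `HourlyCounter` and its methods are hypothetical and are not Calcite's implementation of streaming aggregation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: COUNT(*) per (hour, ip), emitting each hour once it is closed.
class HourlyCounter {
  static final long HOUR_MS = 3_600_000L;

  private long currentHour = Long.MIN_VALUE;                // FLOOR(rowtime TO HOUR) of the open bucket
  private final Map<String, Long> counts = new TreeMap<>(); // ip -> count within the open hour
  final List<String> emitted = new ArrayList<>();           // "hour,ip,count" rows flushed so far

  // Relies on rowtime never decreasing: that is the monotonicity promise.
  void onRow(long rowtime, String ip) {
    long hour = Math.floorDiv(rowtime, HOUR_MS) * HOUR_MS;  // FLOOR(rowtime TO HOUR)
    if (hour < currentHour) {
      throw new IllegalStateException("rowtime went backwards");
    }
    if (hour > currentHour) {
      flush();  // a later hour arrived, so the previous hour can no longer change
      currentHour = hour;
    }
    counts.merge(ip, 1L, Long::sum);
  }

  private void flush() {
    for (Map.Entry<String, Long> e : counts.entrySet()) {
      emitted.add(currentHour + "," + e.getKey() + "," + e.getValue());
    }
    counts.clear();
  }

  public static void main(String[] args) {
    HourlyCounter c = new HourlyCounter();
    c.onRow(0L, "a");
    c.onRow(1_000L, "a");
    c.onRow(2_000L, "b");
    c.onRow(HOUR_MS + 5L, "a");     // first row of hour 1 closes hour 0
    System.out.println(c.emitted);  // prints [0,a,2, 0,b,1]
  }
}
```

Without a declared monotonic column there is no point at which such a bucket could safely be flushed, which is the situation the validator error guards against.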

