calcite-dev mailing list archives

From 陈江枫 <kan...@gmail.com>
Subject Fwd: Stream sql problem on Kafka
Date Thu, 23 Feb 2017 15:23:18 GMT
Hi all, I'm trying to integrate Calcite with Kafka, using
CsvStreamableTable as a reference.

Each ConsumerRecord is converted to an Object[] using the following code:

static class ArrayRowConverter extends RowConverter<Object[]> {
    private List<Schema.Field> fields;

    public ArrayRowConverter(List<Schema.Field> fields) {
        this.fields = fields;
    }

    @Override
    Object[] convertRow(ConsumerRecord<String, GenericRecord> consumerRecord) {
        // Slot 0 holds the Kafka record timestamp (rowtime); the Avro fields follow.
        Object[] objects = new Object[fields.size() + 1];
        int i = 0;
        objects[i++] = consumerRecord.timestamp();
        for (Schema.Field field : this.fields) {
            Object obj = consumerRecord.value().get(field.name());
            if (obj instanceof Utf8) {
                // Convert Avro Utf8 to java.lang.String.
                objects[i++] = obj.toString();
            } else {
                objects[i++] = obj;
            }
        }
        return objects;
    }
}

The Enumerator is implemented as follows:

public E current() {
    return current;
}

public boolean moveNext() {
    for (;;) {
        if (cancelFlag.get()) {
            return false;
        }
        ConsumerRecord<String, GenericRecord> record = getRecord();
        if (record == null) {
            try {
                Thread.sleep(200L);
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop iterating.
                Thread.currentThread().interrupt();
                return false;
            }
            continue;
        }
        current = rowConvert.convertRow(record);
        return true;
    }
}
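
For anyone who wants to try the polling loop without a Kafka broker, here is a minimal, self-contained sketch of the same idea. It is not the code I am running: PollingEnumerator is a hypothetical class, and a java.util.concurrent.BlockingQueue stands in for the Kafka consumer behind getRecord(). The timed poll replaces the Thread.sleep(200L) wait, and an interrupt ends the iteration.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

/** Hypothetical stand-in for the enumerator; the queue replaces the Kafka consumer. */
class PollingEnumerator<E> {
    private final BlockingQueue<E> source;
    private final AtomicBoolean cancelFlag;
    private E current;

    PollingEnumerator(BlockingQueue<E> source, AtomicBoolean cancelFlag) {
        this.source = source;
        this.cancelFlag = cancelFlag;
    }

    /** Returns the row fetched by the last successful moveNext(). */
    public E current() {
        if (current == null) {
            throw new NoSuchElementException("moveNext() has not returned true yet");
        }
        return current;
    }

    /** Blocks until a row arrives, the cancel flag is set, or the thread is interrupted. */
    public boolean moveNext() {
        while (!cancelFlag.get()) {
            try {
                // Wait up to 200 ms for a row, then re-check the cancel flag.
                E next = source.poll(200L, TimeUnit.MILLISECONDS);
                if (next != null) {
                    current = next;
                    return true;
                }
            } catch (InterruptedException e) {
                // Restore the interrupt flag and stop iterating.
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return false;
    }
}
```

In this sketch a stream never ends by itself: moveNext() only returns false after cancellation or an interrupt.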

I tested "SELECT STREAM * FROM Kafka.clicks" and it works fine.

rowtime is the first column, which I add explicitly; its value is the
Kafka record timestamp.

But when I tried

"SELECT STREAM FLOOR(rowtime TO HOUR) AS rowtime,ip,COUNT(*) AS c
FROM KAFKA.clicks GROUP BY FLOOR(rowtime TO HOUR), ip"

it threw the following exception:

java.sql.SQLException: Error while executing SQL "SELECT STREAM
FLOOR(rowtime TO HOUR) AS rowtime,ip,COUNT(*) AS c FROM KAFKA.clicks
GROUP BY FLOOR(rowtime TO HOUR), ip": From line 1, column 85 to line
1, column 119: Streaming aggregation requires at least one monotonic
expression in GROUP BY clause
	at org.apache.calcite.avatica.Helper.createException(Helper.java:56)
	at org.apache.calcite.avatica.Helper.createException(Helper.java:41)
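
To make clear what I mean by the GROUP BY expression: as I read the message, the validator wants at least one GROUP BY expression that it can treat as monotonic. The sketch below only illustrates that property on plain epoch-millisecond timestamps (which is what the Kafka record timestamp is). It is not Calcite code; floorToHour and isNonDecreasing are hypothetical helpers, and the sketch assumes the timestamps arrive in non-decreasing order. It does not show how a table tells Calcite that rowtime is monotonic.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical helpers for illustration; not part of Calcite or Kafka. */
final class RowtimeSketch {
    private static final long HOUR_MS = TimeUnit.HOURS.toMillis(1);

    private RowtimeSketch() {}

    /** Rounds an epoch-millisecond timestamp down to the start of its UTC hour. */
    static long floorToHour(long epochMillis) {
        return Math.floorDiv(epochMillis, HOUR_MS) * HOUR_MS;
    }

    /** Returns true if every value is greater than or equal to the one before it. */
    static boolean isNonDecreasing(long[] values) {
        for (int i = 1; i < values.length; i++) {
            if (values[i] < values[i - 1]) {
                return false;
            }
        }
        return true;
    }
}
```

If the input timestamps are non-decreasing, the floored values are too, because rounding down to the hour never reorders two timestamps.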


Could someone help?
