hive-issues mailing list archives

From "Gopal V (JIRA)" <j...@apache.org>
Subject [jira] [Updated] (HIVE-20377) Hive Kafka Storage Handler
Date Wed, 15 Aug 2018 16:45:00 GMT

     [ https://issues.apache.org/jira/browse/HIVE-20377?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Gopal V updated HIVE-20377:
---------------------------
    Description: 
h1. Goal
* Read streaming data from a Kafka topic as an external table.
* Allow streaming navigation by pushing down filters on the Kafka record partition id, offset, and timestamp.
* Insert streaming data from Kafka into an actual Hive internal table, using a CTAS statement (see the sketch after this list).
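A minimal sketch of that last goal, assuming the kafka_table definition from the example below (the target table name and the ORC format here are illustrative assumptions, not part of this issue):
{code}
-- Hypothetical sketch: snapshot the Kafka-backed external table into a
-- managed Hive table; wikipedia_hive and ORC are assumptions.
CREATE TABLE wikipedia_hive STORED AS ORC AS
SELECT * FROM kafka_table;
{code}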
h1. Example
h2. Create the external table
{code}
CREATE EXTERNAL TABLE kafka_table (
  `timestamp` timestamp, page string, `user` string, language string,
  added int, deleted int, flags string, comment string, namespace string)
STORED BY 'org.apache.hadoop.hive.kafka.KafkaStorageHandler'
TBLPROPERTIES (
  "kafka.topic" = "wikipedia",
  "kafka.bootstrap.servers" = "brokeraddress:9092",
  "kafka.serde.class" = "org.apache.hadoop.hive.serde2.JsonSerDe");
{code}
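Once created, the table can be queried like any other Hive table; a minimal sketch using the columns declared above:
{code}
-- Sketch: read a few records straight off the stream.
SELECT page, `user`, added, deleted
FROM kafka_table
LIMIT 10;
{code}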
h2. Kafka Metadata
In order to keep track of Kafka records, the storage handler will automatically add the Kafka row metadata, namely the partition id, the record offset, and the record timestamp.
{code}
DESCRIBE EXTENDED kafka_table

timestamp              	timestamp           	from deserializer   
page                	string              	from deserializer   
user                	string              	from deserializer   
language            	string              	from deserializer   
country             	string              	from deserializer   
continent           	string              	from deserializer   
namespace           	string              	from deserializer   
newpage             	boolean             	from deserializer   
unpatrolled         	boolean             	from deserializer   
anonymous           	boolean             	from deserializer   
robot               	boolean             	from deserializer   
added               	int                 	from deserializer   
deleted             	int                 	from deserializer   
delta               	bigint              	from deserializer   
__partition         	int                 	from deserializer   
__offset            	bigint              	from deserializer   
__timestamp         	bigint              	from deserializer   

{code}
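These metadata columns can be queried directly; for instance, a small sketch (assuming the table above) that reports how far each partition has been read:
{code}
-- Sketch: latest offset observed per Kafka partition.
SELECT `__partition`, MAX(`__offset`) AS max_offset
FROM kafka_table
GROUP BY `__partition`;
{code}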

h2. Filter push down
Newer Kafka consumers (0.11.0 and higher) allow seeking in the stream to a given offset. The proposed storage handler will be able to leverage this API by pushing down filters over the metadata columns, namely __partition (int), __offset (long), and __timestamp (long).
For instance, a query like:
{code}
select `__offset` from kafka_table
where (`__offset` < 10 and `__offset` > 3 and `__partition` = 0)
   or (`__partition` = 0 and `__offset` < 105 and `__offset` > 99)
   or (`__offset` = 109);
{code}
will result in a scan of partition 0 only, reading only the records with offsets between 4 and 109.
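The same pushdown makes cheap point lookups possible; a hypothetical sketch (the coordinate values are illustrative):
{code}
-- Sketch: fetch a single record by its Kafka coordinates; both predicates
-- should be pushed down so only one seek and read is needed.
SELECT * FROM kafka_table
WHERE `__partition` = 0 AND `__offset` = 42;
{code}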


h2. With timestamp seeks

Seeking based on the internal timestamps allows the handler to run over only recently arrived data, for example:

{code}
select count(*) from kafka_table
where `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '20' hours);
{code}

This allows implicit relationships between event timestamps and Kafka timestamps to be expressed in queries (e.g. the event timestamp is always less than the Kafka __timestamp, and the Kafka __timestamp is never more than 15 minutes after the event, etc.).
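A sketch of exploiting that relationship (the one-hour window is illustrative): since the event timestamp is always below the Kafka __timestamp, a seek bound on __timestamp cannot exclude qualifying rows, and it spares the handler a full scan of the topic:
{code}
-- Sketch: bound the Kafka seek with __timestamp, then filter precisely on the
-- event-time column `timestamp`.
select count(*) from kafka_table
where `__timestamp` > 1000 * to_unix_timestamp(CURRENT_TIMESTAMP - interval '1' hours)
  and `timestamp` > CURRENT_TIMESTAMP - interval '1' hours;
{code}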


> Hive Kafka Storage Handler
> --------------------------
>
>                 Key: HIVE-20377
>                 URL: https://issues.apache.org/jira/browse/HIVE-20377
>             Project: Hive
>          Issue Type: New Feature
>    Affects Versions: 4.0.0
>            Reporter: slim bouguerra
>            Assignee: slim bouguerra
>            Priority: Major
>         Attachments: HIVE-20377.4.patch, HIVE-20377.5.patch, HIVE-20377.6.patch, HIVE-20377.patch
>



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
