flink-user mailing list archives

From Fabian Hueske <fhue...@gmail.com>
Subject Re: KafkaJsonTableSource purpose
Date Tue, 24 Apr 2018 10:18:53 GMT
Hi Sebastien,

I think you can do that with Flink's Table API / SQL and the
KafkaJsonTableSource.
Note that in Flink 1.4.2, the KafkaJsonTableSource does not support nested
JSON yet.
You'd also need a table-valued UDF to parse the message and join the
result with the original row. Depending on what you want to do, you might
need additional UDFs.
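For illustration, the core parsing such a UDF would wrap can be sketched in plain Java (class and method names here are hypothetical, not part of Flink): the firewall message is a list of semicolon-separated `key: value` pairs, with a leading action token.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: the core logic a table-valued parsing UDF could wrap.
// Splits "action key: value; key: value; ..." firewall messages into a field map.
public class MessageSplitter {

    public static Map<String, String> split(String message) {
        Map<String, String> fields = new LinkedHashMap<>();
        // The first token ("accept") has no key; treat it as the action.
        int firstSep = message.indexOf(' ');
        fields.put("action", firstSep > 0 ? message.substring(0, firstSep) : message);
        String rest = firstSep > 0 ? message.substring(firstSep + 1) : "";
        for (String pair : rest.split(";")) {
            int colon = pair.indexOf(':');
            if (colon > 0) {
                fields.put(pair.substring(0, colon).trim(),
                           pair.substring(colon + 1).trim());
            }
        }
        return fields;
    }

    public static void main(String[] args) {
        Map<String, String> f = split(
            "accept service_id: domain-udp; src: 1.1.1.1; dst: 2.2.2.2; proto: udp");
        System.out.println(f);
    }
}
```

A real table-valued UDF would emit these fields as columns so they can be joined back to the original row.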

Best,
Fabian

2018-04-24 8:48 GMT+02:00 miki haiat <miko5054@gmail.com>:

> Hi,
> Assuming that you're looking for a streaming use case, I think this is a
> better approach:
>
>    1. Send Avro from Logstash (better performance).
>    2. Deserialize it to a POJO.
>    3. Do your logic...
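For step 2, the target POJO could look roughly like this (field names are illustrative, mirroring the event shown later in the thread; an Avro-generated class would expose similar accessors):

```java
// Hypothetical POJO for the firewall events; an Avro-generated record
// class would carry the same kind of fields and accessors.
public class FirewallEvent {
    private String host;       // e.g. "FW"
    private String timestamp;  // e.g. "2018-04-20T14:47:35.285Z"
    private String message;    // the raw firewall log line

    // Flink treats this as a POJO type: public no-arg constructor
    // plus public getters/setters for every field.
    public FirewallEvent() {}

    public String getHost() { return host; }
    public void setHost(String host) { this.host = host; }
    public String getTimestamp() { return timestamp; }
    public void setTimestamp(String timestamp) { this.timestamp = timestamp; }
    public String getMessage() { return message; }
    public void setMessage(String message) { this.message = message; }
}
```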
>
>
>
>
> On Mon, Apr 23, 2018 at 4:03 PM, Lehuede sebastien <lehuede.s@gmail.com>
> wrote:
>
>> Hi Guys,
>>
>> I'm currently trying to understand the purpose of the Table API and in
>> particular the KafkaJsonTableSource, to see whether it can be useful for
>> my use case.
>>
>> Here is my context:

>> I send logs to Logstash, where I add some information (type, tags).
>> Logstash sends the logs to Kafka in JSON format, and finally I use
>> flink-connector-kafka to read from Kafka and parse the logs.
>>
>>
>> Before any processing events from Kafka to Flink look like this :
>>
>> {"message":"accept service_id: domain-udp; src: 1.1.1.1; dst: 2.2.2.2;
>> proto: udp; product: VPN-1 & FireWall-1; service: 53; s_port:
>> 32769","@timestamp":"2018-04-20T14:47:35.285Z","host":"FW"}
>>
>> Then I use "JSONDeserializationSchema" to deserialize the events:
>>
>> FlinkKafkaConsumer011<ObjectNode> kafkaConsumer = new
>>     FlinkKafkaConsumer011<>("Firewall", new JSONDeserializationSchema(), properties);
>>
>> I take the value of the key "message":
>>
>> public String map(ObjectNode value) throws Exception {
>>     String message = value.get("message").asText();
>>     // ... parse "message" here ...
>>     return message;
>> }
>>
>> Then I parse it with a Java regex and put each match group into a
>> String/Int/...:
>>
>> action : accept
>> service_id : domain-udp
>> src_ip : 1.1.1.1
>> dst_ip : 2.2.2.2
>> .....
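The regex step described above can be sketched with Java's named capture groups (the pattern and class name are illustrative, written against the sample message earlier in the thread):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical regex for the Check Point-style message shown above;
// named groups map directly to the extracted fields.
public class FirewallRegex {

    public static final String SAMPLE =
        "accept service_id: domain-udp; src: 1.1.1.1; dst: 2.2.2.2; "
      + "proto: udp; product: VPN-1 & FireWall-1; service: 53; s_port: 32769";

    private static final Pattern LOG = Pattern.compile(
        "(?<action>\\w+) service_id: (?<serviceId>[\\w-]+); "
      + "src: (?<src>[\\d.]+); dst: (?<dst>[\\d.]+); proto: (?<proto>\\w+);.*");

    // Returns the named group's value, or null if the message doesn't match.
    public static String field(String message, String group) {
        Matcher m = LOG.matcher(message);
        return m.matches() ? m.group(group) : null;
    }

    public static void main(String[] args) {
        System.out.println(field(SAMPLE, "action"));     // accept
        System.out.println(field(SAMPLE, "serviceId"));  // domain-udp
        System.out.println(field(SAMPLE, "src"));        // 1.1.1.1
    }
}
```

Precompiling the `Pattern` once (as a static field) matters in a streaming job, since `map` runs per record.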
>>
>> Now I want to replace the "message" key with "rawMessage" and put each
>> match group into the JSON object to obtain the final result:
>>
>> {"rawMessage":"accept service_id: domain-udp; src: 1.1.1.1; dst:
>> 2.2.2.2; proto: udp; product: VPN-1 & FireWall-1; service: 53; s_port:
>> 32769",
>> "@timestamp":"2018-04-20T14:47:35.285Z",
>> "host":"FW",
>> "type":"firewall",
>> "tags":["Checkpoint"],
>> "action":"accept",
>> "service_id":"domain-udp",
>> "src_ip":"1.1.1.1",
>> "dst_ip":"2.2.2.2",
>> ...}
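That last restructuring step can be sketched with plain Java maps (a `Map` stands in for Jackson's `ObjectNode`, whose `set`/`remove` methods would do the same; names are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch of the restructuring step: rename "message" to "rawMessage"
// and merge the fields extracted by the regex into the event.
public class EventRewriter {

    public static Map<String, Object> rewrite(Map<String, Object> event,
                                              Map<String, String> parsedFields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (Map.Entry<String, Object> e : event.entrySet()) {
            // Keep the original payload under the new key "rawMessage".
            String key = "message".equals(e.getKey()) ? "rawMessage" : e.getKey();
            out.put(key, e.getValue());
        }
        out.putAll(parsedFields);  // action, service_id, src_ip, dst_ip, ...
        return out;
    }

    public static void main(String[] args) {
        Map<String, Object> event = new LinkedHashMap<>();
        event.put("message", "accept service_id: domain-udp; ...");
        event.put("host", "FW");
        Map<String, String> parsed = new LinkedHashMap<>();
        parsed.put("action", "accept");
        System.out.println(rewrite(event, parsed));
    }
}
```

The rewritten map can then be serialized back to JSON and written to the output Kafka topic.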
>>
>> I'm a newbie with streaming technologies and with Flink, and for the
>> moment I'm still discovering how it works and what the different
>> functionalities are. But while looking for a solution to obtain my final
>> result, I came across the KafkaJsonTableSource.
>>
>> Does anyone think this can be a good solution for my use case?
>>
>> I think I would be able to read the JSON from Kafka, process the data,
>> modify the table, and send the data to another Kafka topic. Is that correct?
>>
>> Regards,
>> Sebastien
>>
>>
>>
>
