spark-user mailing list archives

From Mich Talebzadeh <mich.talebza...@gmail.com>
Subject Re: Flume integration
Date Sun, 20 Nov 2016 21:29:51 GMT
Agreed, but imagine a situation where your input stream is IBM/MQ and you
want to use Spark to do some form of near-real-time calculation (the speed
layer).

Setting aside ingesting directly from IBM/MQ, you could use Flume to get
the data from IBM/MQ onto HDFS. However, that is by definition the batch
layer.

The other alternative is to use Flume to feed into Kafka and do the
calculation in Spark itself on the raw data.
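
On the Spark side that could look roughly like the sketch below, using the
Spark 1.6 direct Kafka API. The broker address, topic name and class name
are placeholders, and I have not tested this exact snippet:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import scala.Tuple2;

public class KafkaSpeedLayer {

    public static void main(String[] args) throws Exception {
        SparkConf conf = new SparkConf(true).setAppName("KafkaSpeedLayer");
        JavaStreamingContext ssc = new JavaStreamingContext(conf, new Duration(10000));

        // Kafka broker and topic are placeholders
        Map<String, String> kafkaParams = new HashMap<String, String>();
        kafkaParams.put("metadata.broker.list", "broker1:9092");
        Set<String> topics = Collections.singleton("mq-events");

        // Direct (receiver-less) Kafka stream
        JavaPairInputDStream<String, String> stream = KafkaUtils.createDirectStream(
            ssc, String.class, String.class, StringDecoder.class, StringDecoder.class,
            kafkaParams, topics);

        // Pull out the raw message values
        JavaDStream<String> values = stream.map(new Function<Tuple2<String, String>, String>() {
            @Override
            public String call(Tuple2<String, String> record) {
                return record._2();
            }
        });

        // Speed-layer calculation on the raw data; here just a count per batch
        values.count().print();

        ssc.start();
        ssc.awaitTermination();
    }
}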


I searched the Spark forum to see how many examples there are of Flume
integration with Spark, and found this example from Ian. So I suppose you
can argue that there is no need for Kafka. However, Kafka integration with
Spark Streaming is well established; at least I have used it in anger and
feel more comfortable that it is tried and tested. The issue is not that
Flume to Spark Streaming is not doable, but more one of risk aversion.

Let me know your thoughts.

HTH



Dr Mich Talebzadeh



LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw



http://talebzadehmich.wordpress.com


*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.



On 20 November 2016 at 20:59, ayan guha <guha.ayan@gmail.com> wrote:

> Hi
>
> While I am following this discussion with interest, I am trying to
> understand the architectural benefit of a Spark sink.
>
> Is there any feature in Flume that makes it more suitable for ingesting
> stream data than Spark Streaming, so that we should chain them? For
> example, does it help with durability or reliability of the source?
>
> Or is it a more tactical choice based on connector availability and the like?
>
> To me, Flume is an important component for ingesting streams into HDFS or
> Hive directly, i.e. it plays on the batch side of the lambda architecture
> pattern.
> On 20 Nov 2016 22:30, "Mich Talebzadeh" <mich.talebzadeh@gmail.com> wrote:
>
>> Hi Ian,
>>
>> Has this been resolved?
>>
>> How about sending data to Flume, then into Kafka, and streaming from
>> Kafka into Spark?
>>
>> Thanks
>>
>> Dr Mich Talebzadeh
>>
>>
>>
>> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>>
>>
>>
>> http://talebzadehmich.wordpress.com
>>
>>
>> *Disclaimer:* Use it at your own risk. Any and all responsibility for
>> any loss, damage or destruction of data or any other property which may
>> arise from relying on this email's technical content is explicitly
>> disclaimed. The author will in no case be liable for any monetary damages
>> arising from such loss, damage or destruction.
>>
>>
>>
>> On 13 July 2016 at 11:13, Ian Brooks <i.brooks@sensewhere.com> wrote:
>>
>>> Hi,
>>>
>>>
>>>
>>> I'm currently trying to implement a prototype Spark application that
>>> gets data from Flume and processes it. I'm using the pull-based method
>>> described in
>>> https://spark.apache.org/docs/1.6.1/streaming-flume-integration.html
>>>
>>>
>>>
>>> This is initially working fine for getting data from Flume; however, the
>>> Spark client doesn't appear to be letting Flume know that the data has
>>> been received, so Flume doesn't remove it from the batch.
>>>
>>>
>>>
>>> After 100 requests Flume stops allowing any new data and logs
>>>
>>>
>>>
>>> 08 Jul 2016 14:59:00,265 WARN  [Spark Sink Processor Thread - 5]
>>> (org.apache.spark.streaming.flume.sink.Logging$class.logWarning:80)  -
>>> Error while processing transaction.
>>> org.apache.flume.ChannelException: Take list for MemoryTransaction,
>>> capacity 100 full, consider committing more frequently, increasing
>>> capacity, or increasing thread count
>>>        at org.apache.flume.channel.MemoryChannel$MemoryTransaction.doTake(MemoryChannel.java:96)
>>>
>>>
>>>
>>> My code to pull the data from Flume is
>>>
>>>
>>>
>>> SparkConf sparkConf = new SparkConf(true).setAppName("SLAMSpark");
>>> Duration batchInterval = new Duration(10000);
>>> final String checkpointDir = "/tmp/";
>>>
>>> final JavaStreamingContext ssc = new JavaStreamingContext(sparkConf, batchInterval);
>>> ssc.checkpoint(checkpointDir);
>>> JavaReceiverInputDStream<SparkFlumeEvent> flumeStream = FlumeUtils.createPollingStream(ssc, host, port);
>>>
>>> // Transform each flume avro event to a process-able format
>>> JavaDStream<String> transformedEvents = flumeStream.map(new Function<SparkFlumeEvent, String>() {
>>>
>>>     @Override
>>>     public String call(SparkFlumeEvent flumeEvent) throws Exception {
>>>         String flumeEventStr = flumeEvent.event().toString();
>>>         avroData avroData = new avroData();
>>>         Gson gson = new GsonBuilder().create();
>>>         avroData = gson.fromJson(flumeEventStr, avroData.class);
>>>         HashMap<String,String> body = avroData.getBody();
>>>         String data = body.get("bytes");
>>>         return data;
>>>     }
>>> });
>>>
>>> ...
>>>
>>> ssc.start();
>>> ssc.awaitTermination();
>>> ssc.close();
>>> }
>>>
>>>
>>>
>>> Is there something specific I should be doing to let the Flume server
>>> know the batch has been received and processed?
>>>
>>>
>>> --
>>>
>>> Ian Brooks
>>>
>>>
>>>
>>
>>
