flink-user mailing list archives

From Ted Yu <yuzhih...@gmail.com>
Subject Re: Periodic flush sink?
Date Wed, 03 May 2017 09:11:59 GMT
bq. is the mutator thread safe?

See HBASE-17361

On Wed, May 3, 2017 at 1:52 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi Niels,
> With any kind of buffering you need to be careful when it comes to fault
> tolerance. In your case, you should make sure to flush the buffers when
> checkpointing; otherwise you might lose data, because those elements will
> not be resent after a failure.
>
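A minimal sketch of the flush-on-checkpoint rule, using plain-Java stand-ins so it runs without Flink or HBase on the classpath. WriteBuffer and CheckpointFlushingSink are hypothetical names. In a real job the hook would be Flink's CheckpointedFunction#snapshotState and the buffer a BufferedMutator.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for HBase's BufferedMutator: records are held
// client-side until flush() writes them to the (simulated) table.
class WriteBuffer {
    final List<String> pending = new ArrayList<>();
    final List<String> persisted = new ArrayList<>();

    void add(String record) {
        pending.add(record);
    }

    void flush() {
        persisted.addAll(pending);
        pending.clear();
    }
}

// Hypothetical sink. In a real Flink job the checkpoint hook would be
// CheckpointedFunction#snapshotState and the buffer a BufferedMutator.
class CheckpointFlushingSink {
    final WriteBuffer buffer = new WriteBuffer();

    void invoke(String record) {
        buffer.add(record); // buffered only, not yet durable
    }

    // Invoked when a checkpoint is taken. Flushing synchronously before
    // returning means every record covered by this checkpoint is already
    // in the table, so a restart from the checkpoint loses nothing. A
    // failing flush should throw, which fails the checkpoint instead of
    // silently dropping data.
    void snapshotState(long checkpointId) {
        buffer.flush();
    }
}
```

The important property is the ordering: the flush finishes before the checkpoint is acknowledged, so no record can be both "covered by the checkpoint" and "still only in the buffer".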
> With the periodic timer, my only concern would be concurrency issues, i.e.
> is the mutator thread-safe?
>
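A minimal sketch of one conservative answer to the thread-safety question: assume the buffer is not safe to share, and route both the writing thread and the periodic flush through a single lock. The class and its methods are hypothetical, a plain list stands in for the BufferedMutator, and it uses ScheduledExecutorService instead of java.util.Timer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical writer modelling a sink whose buffer is touched by two
// threads: the task thread calling write() and a timer thread calling
// flush(). Assumption: the real buffer is not known to tolerate that, so
// every access goes through a single lock.
class PeriodicFlushingWriter {
    private final Object lock = new Object();
    private final List<String> pending = new ArrayList<>();
    private final List<String> persisted = new ArrayList<>();
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "periodic-flush");
                t.setDaemon(true); // do not keep the JVM alive on its own
                return t;
            });

    void start(long periodMillis) {
        // A task that throws is never run again by scheduleAtFixedRate,
        // so a real flush() should catch and report its own exceptions.
        scheduler.scheduleAtFixedRate(this::flush, periodMillis, periodMillis,
                TimeUnit.MILLISECONDS);
    }

    void write(String record) {
        synchronized (lock) {
            pending.add(record);
        }
    }

    void flush() {
        synchronized (lock) {
            persisted.addAll(pending);
            pending.clear();
        }
    }

    int persistedCount() {
        synchronized (lock) {
            return persisted.size();
        }
    }

    // Stop the timer, wait for an in-flight flush to finish, then flush
    // once more so nothing buffered is dropped on shutdown.
    void close() throws InterruptedException {
        scheduler.shutdown();
        scheduler.awaitTermination(5, TimeUnit.SECONDS);
        flush();
    }
}
```

A single lock costs some contention under high load, but a flush is rare compared with writes, so the writing thread is only blocked while a flush is actually running.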
> Best,
> Aljoscha
>
> On 30. Apr 2017, at 09:24, Kamil Dziublinski <kamil.dziublinski@gmail.com>
> wrote:
>
> Hi Niels,
>
> This sounds to me like a great use case for window functions. You
> could partition your data (using keyBy) based on website and then hold your
> window for a certain amount of time. After that you could give your sink
> an already batched object and store it directly. On top of that, if you are
> worried that the data might become too big within a fixed window time, you
> could use a trigger that fires based on both time and size. Although IMO it's
> no problem to have bigger puts for HBase. But you need to test.
> I have a very similar use case with Kafka and HBase, and I solved it like
> that.
> Hope that helps.
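The size-or-time firing rule can be sketched without Flink as follows. SizeOrTimeBatcher is a hypothetical name; in a real job this logic would sit in a custom Trigger on a keyed window, and the clock would be Flink's processing time rather than an explicit argument.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, framework-free model of a window trigger that fires on
// whichever comes first: maxSize buffered elements, or maxDelayMillis since
// the first element of the current batch. In Flink this rule would live in
// a custom Trigger; here time is passed in explicitly so it is testable.
class SizeOrTimeBatcher<T> {
    private final int maxSize;
    private final long maxDelayMillis;
    private final Consumer<List<T>> sink; // receives each completed batch
    private List<T> batch = new ArrayList<>();
    private long batchStartMillis;

    SizeOrTimeBatcher(int maxSize, long maxDelayMillis, Consumer<List<T>> sink) {
        this.maxSize = maxSize;
        this.maxDelayMillis = maxDelayMillis;
        this.sink = sink;
    }

    void add(T element, long nowMillis) {
        if (batch.isEmpty()) {
            batchStartMillis = nowMillis; // the timeout is measured from here
        }
        batch.add(element);
        if (batch.size() >= maxSize) {
            fire(); // size condition: typical under high load
        }
    }

    // Called periodically, like a processing-time timer.
    void onTimer(long nowMillis) {
        if (!batch.isEmpty() && nowMillis - batchStartMillis >= maxDelayMillis) {
            fire(); // time condition: typical under low load
        }
    }

    private void fire() {
        sink.accept(batch);
        batch = new ArrayList<>();
    }
}
```

With keyBy on the website, each key would get its own batch, so a quiet site is bounded by maxDelayMillis while a busy one is bounded by maxSize.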
> On Sat, 29 Apr 2017 at 18:05, Niels Basjes <Niels@basjes.nl> wrote:
>
>> Thanks.
>>
>> The specific table I have here is used for debugging purposes, so at the
>> HBase level I set a TTL of 12 hours on the data.
>> So I'm not worried about the HFiles.
>> Doing a lot of 'small' calls has an impact on HBase as a whole (not just
>> this table), so I want buffering.
>> With a buffer that can hold 1000 events, if I create 10 events with a
>> single page view and I'm the only one on the site (at that moment), the
>> events will be buffered for much too long.
>>
>> I did a quick test and this seems to work for my case.
>> In what situations do you guys expect this code construct to fail? Any
>> edge cases I missed?
>>
>> Niels
>>
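>> // Members of MySink (a Flink RichSinkFunction). tableName,
>> // HBASE_BUFFER_SIZE, HBASE_BUFFER_AUTO_FLUSH_INTERVAL and
>> // getDefaultExecutor(...) are not shown.
>> private transient Connection connection = null;
>> private transient BufferedMutator mutator = null;
>> private transient Timer timer = null;
>>
>> @Override
>> public void open(Configuration parameters) throws Exception {
>>   org.apache.hadoop.conf.Configuration hbaseConfig = HBaseConfiguration.create();
>>   // Kept in a field so that close() can release it.
>>   connection = ConnectionFactory.createConnection(hbaseConfig);
>>
>>   // Client-side write buffer: mutations are sent to HBase in batches
>>   // once about HBASE_BUFFER_SIZE bytes have accumulated (or on flush()).
>>   mutator = connection.getBufferedMutator(
>>     new BufferedMutatorParams(TableName.valueOf(tableName))
>>       .pool(getDefaultExecutor(hbaseConfig))
>>       .writeBufferSize(HBASE_BUFFER_SIZE)
>>   );
>>
>>   // Also flush every HBASE_BUFFER_AUTO_FLUSH_INTERVAL ms so records do not
>>   // wait for a full buffer under low load. run() executes on the Timer's
>>   // own thread, concurrently with the sink's invoke().
>>   timer = new Timer();
>>   timer.schedule(new TimerTask() {
>>     @Override
>>     public void run() {
>>       try {
>>         MySink.this.mutator.flush();
>>       } catch (Exception e) {
>>         // Ignore
>>       }
>>     }
>>   }, HBASE_BUFFER_AUTO_FLUSH_INTERVAL, HBASE_BUFFER_AUTO_FLUSH_INTERVAL);
>> }
>>
>> @Override
>> public void close() throws IOException {
>>   // Null checks: close() may be called even if open() failed part-way.
>>   if (timer != null) {
>>     timer.cancel();
>>   }
>>   if (mutator != null) {
>>     mutator.close(); // flushes remaining mutations, then releases resources
>>   }
>>   if (connection != null) {
>>     connection.close();
>>   }
>> }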
>>
>>
>>
>>
>>
>> On Sat, Apr 29, 2017 at 4:57 PM, Ted Yu <yuzhihong@gmail.com> wrote:
>>
>>> I expect a Flink expert to answer your question.
>>>
>>> bq. I get a flush of the buffers at least every few seconds
>>>
>>> From the HBase point of view, during low-traffic periods the above may
>>> result in many small HFiles, leading to more work for compaction.
>>>
>>> FYI
>>>
>>> On Sat, Apr 29, 2017 at 7:32 AM, Niels Basjes <Niels@basjes.nl> wrote:
>>>
>>>> Hi,
>>>>
>>>> I have a sink that writes my records into HBase.
>>>>
>>>> The data stream carries measurements from an internal testing
>>>> instance of the website.
>>>> As a consequence there are periods of really high load (someone is
>>>> doing a load test) and really low load (only a handful of people are
>>>> testing stuff).
>>>>
>>>> I read the records from Kafka and I want to write the records into
>>>> HBase.
>>>> Because under high load it is more efficient to buffer the writes
>>>> between the client and the server, I use a BufferedMutator, as
>>>> recommended by HBase.
>>>>
>>>> This BufferedMutator works with a 'fixed size' buffer, and under high
>>>> load setting it to a few MiB greatly improves write performance to
>>>> HBase.
>>>> However, under low load you have to wait until the buffer is full, and
>>>> that can take a LONG time (hours) when the load is really low.
>>>>
>>>> I want to fire a periodic event into my sink to ensure I get a flush of
>>>> the buffers at least every few seconds.
>>>>
>>>> Should I simply implement a standard Java TimerTask and fire it using a
>>>> Timer? Or is there a better way of doing that in Flink?
>>>>
>>>>
>>>> --
>>>> Best regards / Met vriendelijke groeten,
>>>>
>>>> Niels Basjes
>>>>
>>>
>>>
>>
>>
>> --
>> Best regards / Met vriendelijke groeten,
>>
>> Niels Basjes
>>
>
>
