kudu-user mailing list archives

From: Ravi Kanth <ravikanth....@gmail.com>
Subject: Re: Spark Streaming + Kudu
Date: Thu, 08 Mar 2018 18:11:09 GMT
Mike,

I am attaching the jstack output of the executor PIDs from when Kudu is not available.

Thanks,
Ravi

On 6 March 2018 at 20:46, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:

> Mike - I actually got hold of the PIDs for the Spark executors, but I am
> facing issues running jstack; it fails with some VM exceptions. I will
> figure it out and attach the jstack output. Thanks for your patience.
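>
> (By the way, if jstack keeps hitting VM errors, a programmatic dump from
> inside the executor JVM may work as a fallback. A minimal sketch using the
> standard java.lang.management API -- the class name here is made up:)
>
> import java.lang.management.ManagementFactory;
> import java.lang.management.ThreadInfo;
>
> public class ThreadDumper {
>     // Print a jstack-like dump of every live thread to stderr.
>     public static void dumpAllThreads() {
>         for (ThreadInfo info :
>                 ManagementFactory.getThreadMXBean().dumpAllThreads(true, true)) {
>             System.err.println("\"" + info.getThreadName() + "\" state="
>                     + info.getThreadState());
>             for (StackTraceElement frame : info.getStackTrace()) {
>                 System.err.println("    at " + frame);
>             }
>         }
>     }
> }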
>
> On 6 March 2018 at 20:42, Mike Percy <mpercy@apache.org> wrote:
>
>> Hmm, could you try in Spark local mode? i.e. https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-local.html
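>>
>> (For reference, local mode can also be forced from code rather than via
>> spark-submit -- a minimal sketch; the app name and batch interval are
>> placeholders:)
>>
>> import org.apache.spark.SparkConf;
>> import org.apache.spark.streaming.Durations;
>> import org.apache.spark.streaming.api.java.JavaStreamingContext;
>>
>> public class LocalModeExample {
>>     public static void main(String[] args) {
>>         // Driver and executors share one JVM: a single PID to jstack.
>>         SparkConf conf = new SparkConf()
>>                 .setAppName("kudu-streaming-debug")
>>                 .setMaster("local[2]");
>>         JavaStreamingContext ssc =
>>                 new JavaStreamingContext(conf, Durations.seconds(5));
>>         // ... build the stream, then ssc.start() / ssc.awaitTermination()
>>     }
>> }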
>>
>> Mike
>>
>>> On Tue, Mar 6, 2018 at 7:14 PM, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:
>>
>>> Mike,
>>>
>>> Can you clarify a bit on grabbing the jstack for the process? I launched
>>> my Spark application and tried to get the PID, with which I thought I
>>> could grab a jstack trace during the hang. Unfortunately, I am not able
>>> to figure out how to get the PID of the Spark application.
>>>
>>> Thanks,
>>> Ravi
>>>
>>> On 6 March 2018 at 18:36, Mike Percy <mpercy@apache.org> wrote:
>>>
>>>> Thanks Ravi. Would you mind attaching the output of jstack on the
>>>> process during this hang? That would show what the Kudu client threads
>>>> are doing, since what we are seeing here is just the Netty boss thread.
>>>>
>>>> Mike
>>>>
>>>> On Tue, Mar 6, 2018 at 8:52 AM, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:
>>>>
>>>>>
>>>>> Yes, I have debugged to find the root cause. Every log statement before
>>>>> "table = client.openTable(tableName);" executes fine; exactly at the
>>>>> point of opening the table, it throws the exception below and nothing
>>>>> after that is executed. The Spark batches are still being processed,
>>>>> but opening the table keeps failing. I tried catching the exception
>>>>> with no luck. Please find the exception below.
>>>>>
>>>>> 8/02/23 00:16:30 ERROR client.TabletClient: [Peer bd91f34d456a4eccaae50003c90f0fb2] Unexpected exception from downstream on [id: 0x6e13b01f]
>>>>> java.net.ConnectException: Connection refused: kudu102.dev.sac.int.threatmetrix.com/10.112.3.12:7050
>>>>>     at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
>>>>>     at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
>>>>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.connect(NioClientBoss.java:152)
>>>>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.processSelectedKeys(NioClientBoss.java:105)
>>>>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.process(NioClientBoss.java:79)
>>>>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.AbstractNioSelector.run(AbstractNioSelector.java:337)
>>>>>     at org.apache.kudu.client.shaded.org.jboss.netty.channel.socket.nio.NioClientBoss.run(NioClientBoss.java:42)
>>>>>     at org.apache.kudu.client.shaded.org.jboss.netty.util.ThreadRenamingRunnable.run(ThreadRenamingRunnable.java:108)
>>>>>     at org.apache.kudu.client.shaded.org.jboss.netty.util.internal.DeadLockProofWorker$1.run(DeadLockProofWorker.java:42)
>>>>>     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>>>>>     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
>>>>>     at java.lang.Thread.run(Thread.java:745)
>>>>>
>>>>>
>>>>> Thanks,
>>>>> Ravi
>>>>>
>>>>> On 5 March 2018 at 23:52, Mike Percy <mpercy@apache.org> wrote:
>>>>>
>>>>>> Have you considered checking your session error count or pending
>>>>>> errors in your while loop every so often? Can you identify where your
>>>>>> code is hanging when the connection is lost (what line)?
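>>>>>>
>>>>>> Something along these lines inside the write loop is what I mean --
>>>>>> just a sketch, using the names from your snippet; adjust as needed:
>>>>>>
>>>>>> while (iter.hasNext()) {
>>>>>>     upsertOp(iter.next());
>>>>>>     // With AUTO_FLUSH_BACKGROUND, apply() returns immediately and
>>>>>>     // failures surface asynchronously, so poll the error count here.
>>>>>>     if (session.countPendingErrors() > 0) {
>>>>>>         RowErrorsAndOverflowStatus status = session.getPendingErrors();
>>>>>>         for (RowError error : status.getRowErrors()) {
>>>>>>             logger.error("Row error: {}", error);
>>>>>>         }
>>>>>>         throw new RuntimeException("Kudu write errors detected");
>>>>>>     }
>>>>>> }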
>>>>>>
>>>>>> Mike
>>>>>>
>>>>>> On Mon, Mar 5, 2018 at 9:08 PM, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:
>>>>>>
>>>>>>> In addition to my previous comment: I raised a support ticket for
>>>>>>> this issue with Cloudera, and one of the support engineers mentioned
>>>>>>> the following:
>>>>>>>
>>>>>>> *"Thank you for clarifying. The exceptions are logged but not
>>>>>>> re-thrown to an upper layer, so that explains why the Spark
>>>>>>> application is not aware of the underlying error."*
>>>>>>>
>>>>>>> On 5 March 2018 at 21:02, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:
>>>>>>>
>>>>>>>> Mike,
>>>>>>>>
>>>>>>>> Thanks for the information. But once the connection to any of the
>>>>>>>> Kudu servers is lost, there is no way I can get control of the
>>>>>>>> KuduSession object, and hence of getPendingErrors(). The KuduClient
>>>>>>>> in this case becomes a zombie and never returns until the connection
>>>>>>>> is properly re-established. I tried doing everything you suggested,
>>>>>>>> with no luck. Attaching my KuduClient code.
>>>>>>>>
>>>>>>>> package org.dwh.streaming.kudu.sparkkudustreaming;
>>>>>>>>
>>>>>>>> import java.util.HashMap;
>>>>>>>> import java.util.Iterator;
>>>>>>>> import java.util.Map;
>>>>>>>> import org.apache.hadoop.util.ShutdownHookManager;
>>>>>>>> import org.apache.kudu.client.*;
>>>>>>>> import org.apache.spark.api.java.JavaRDD;
>>>>>>>> import org.slf4j.Logger;
>>>>>>>> import org.slf4j.LoggerFactory;
>>>>>>>> import org.dwh.streaming.kudu.sparkkudustreaming.constants.SpecialNullConstants;
>>>>>>>>
>>>>>>>> public class KuduProcess {
>>>>>>>>   private static Logger logger = LoggerFactory.getLogger(KuduProcess.class);
>>>>>>>>   // Shared by the static methods below.
>>>>>>>>   private static KuduTable table;
>>>>>>>>   private static KuduSession session;
>>>>>>>>
>>>>>>>>   public static void upsertKudu(JavaRDD<Map<String, Object>> rdd,
>>>>>>>>       String host, String tableName) {
>>>>>>>>     rdd.foreachPartition(iterator -> {
>>>>>>>>       RowErrorsAndOverflowStatus errors =
>>>>>>>>           upsertOpIterator(iterator, tableName, host);
>>>>>>>>       int errorCount = errors.getRowErrors().length;
>>>>>>>>       if (errorCount > 0) {
>>>>>>>>         throw new RuntimeException(
>>>>>>>>             "Failed to write " + errorCount + " messages into Kudu");
>>>>>>>>       }
>>>>>>>>     });
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   private static RowErrorsAndOverflowStatus upsertOpIterator(
>>>>>>>>       Iterator<Map<String, Object>> iter, String tableName, String host) {
>>>>>>>>     try {
>>>>>>>>       AsyncKuduClient asyncClient = KuduConnection.getAsyncClient(host);
>>>>>>>>       KuduClient client = asyncClient.syncClient();
>>>>>>>>       table = client.openTable(tableName);
>>>>>>>>       session = client.newSession();
>>>>>>>>       session.setFlushMode(SessionConfiguration.FlushMode.AUTO_FLUSH_BACKGROUND);
>>>>>>>>       while (iter.hasNext()) {
>>>>>>>>         upsertOp(iter.next());
>>>>>>>>       }
>>>>>>>>     } catch (KuduException e) {
>>>>>>>>       logger.error("Exception in upsertOpIterator method", e);
>>>>>>>>     } finally {
>>>>>>>>       try {
>>>>>>>>         session.close();
>>>>>>>>       } catch (KuduException e) {
>>>>>>>>         logger.error("Exception in Connection close", e);
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>     // ---------------------> Once the connection is lost, this line
>>>>>>>>     // never gets called: the Spark job keeps running and processing
>>>>>>>>     // the records while the KuduClient is trying to connect to Kudu.
>>>>>>>>     // Meanwhile, we are losing all the records.
>>>>>>>>     return session.getPendingErrors();
>>>>>>>>   }
>>>>>>>>
>>>>>>>>   public static void upsertOp(Map<String, Object> formattedMap) {
>>>>>>>>     if (formattedMap.size() != 0) {
>>>>>>>>       try {
>>>>>>>>         Upsert upsert = table.newUpsert();
>>>>>>>>         PartialRow row = upsert.getRow();
>>>>>>>>         for (Map.Entry<String, Object> entry : formattedMap.entrySet()) {
>>>>>>>>           if (entry.getValue().getClass().equals(String.class)) {
>>>>>>>>             if (entry.getValue().equals(SpecialNullConstants.specialStringNull))
>>>>>>>>               row.setNull(entry.getKey());
>>>>>>>>             else
>>>>>>>>               row.addString(entry.getKey(), (String) entry.getValue());
>>>>>>>>           } else if (entry.getValue().getClass().equals(Long.class)) {
>>>>>>>>             if (entry.getValue().equals(SpecialNullConstants.specialLongNull))
>>>>>>>>               row.setNull(entry.getKey());
>>>>>>>>             else
>>>>>>>>               row.addLong(entry.getKey(), (Long) entry.getValue());
>>>>>>>>           } else if (entry.getValue().getClass().equals(Integer.class)) {
>>>>>>>>             if (entry.getValue().equals(SpecialNullConstants.specialIntNull))
>>>>>>>>               row.setNull(entry.getKey());
>>>>>>>>             else
>>>>>>>>               row.addInt(entry.getKey(), (Integer) entry.getValue());
>>>>>>>>           }
>>>>>>>>         }
>>>>>>>>
>>>>>>>>         session.apply(upsert);
>>>>>>>>       } catch (Exception e) {
>>>>>>>>         logger.error("Exception during upsert:", e);
>>>>>>>>       }
>>>>>>>>     }
>>>>>>>>   }
>>>>>>>> }
>>>>>>>>
>>>>>>>> class KuduConnection {
>>>>>>>>   private static Logger logger =
>>>>>>>>       LoggerFactory.getLogger(KuduConnection.class);
>>>>>>>>   private static Map<String, AsyncKuduClient> asyncCache = new HashMap<>();
>>>>>>>>   private static int ShutdownHookPriority = 100;
>>>>>>>>
>>>>>>>>   static AsyncKuduClient getAsyncClient(String kuduMaster) {
>>>>>>>>     if (!asyncCache.containsKey(kuduMaster)) {
>>>>>>>>       AsyncKuduClient asyncClient =
>>>>>>>>           new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster).build();
>>>>>>>>       ShutdownHookManager.get().addShutdownHook(new Runnable() {
>>>>>>>>         @Override
>>>>>>>>         public void run() {
>>>>>>>>           try {
>>>>>>>>             asyncClient.close();
>>>>>>>>           } catch (Exception e) {
>>>>>>>>             logger.error("Exception closing async client", e);
>>>>>>>>           }
>>>>>>>>         }
>>>>>>>>       }, ShutdownHookPriority);
>>>>>>>>       asyncCache.put(kuduMaster, asyncClient);
>>>>>>>>     }
>>>>>>>>     return asyncCache.get(kuduMaster);
>>>>>>>>   }
>>>>>>>> }
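>>>>>>>>
>>>>>>>> (One idea I still want to try: giving the client explicit operation
>>>>>>>> timeouts when building it, so that openTable() fails fast instead of
>>>>>>>> retrying indefinitely. A minimal sketch against the builder above --
>>>>>>>> the 30-second values are arbitrary:)
>>>>>>>>
>>>>>>>> AsyncKuduClient asyncClient =
>>>>>>>>     new AsyncKuduClient.AsyncKuduClientBuilder(kuduMaster)
>>>>>>>>         // Bound data operations such as apply()/flush().
>>>>>>>>         .defaultOperationTimeoutMs(30000)
>>>>>>>>         // Bound admin/metadata calls such as openTable().
>>>>>>>>         .defaultAdminOperationTimeoutMs(30000)
>>>>>>>>         .build();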
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> Thanks,
>>>>>>>> Ravi
>>>>>>>>
>>>>>>>> On 5 March 2018 at 16:20, Mike Percy <mpercy@apache.org> wrote:
>>>>>>>>
>>>>>>>>> Hi Ravi, it would be helpful if you could attach what you are
>>>>>>>>> getting back from getPendingErrors() -- perhaps from dumping
>>>>>>>>> RowError.toString() from items in the returned array -- and indicate
>>>>>>>>> what you were hoping to get back. Note that a RowError can also
>>>>>>>>> return to you the Operation
>>>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/RowError.html#getOperation-->
>>>>>>>>> that you used to generate the write. From the Operation, you can get
>>>>>>>>> the original PartialRow
>>>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/PartialRow.html>
>>>>>>>>> object, which should be able to identify the affected row that the
>>>>>>>>> write failed for. Does that help?
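>>>>>>>>>
>>>>>>>>> In code, that dump might look something like this (just a sketch --
>>>>>>>>> adapt it to your session object):
>>>>>>>>>
>>>>>>>>> RowErrorsAndOverflowStatus status = session.getPendingErrors();
>>>>>>>>> if (status.isOverflowed()) {
>>>>>>>>>     // The error buffer filled up and some row errors were dropped.
>>>>>>>>>     logger.warn("Pending-error buffer overflowed");
>>>>>>>>> }
>>>>>>>>> for (RowError error : status.getRowErrors()) {
>>>>>>>>>     logger.error("Row error: {}", error);
>>>>>>>>>     // The Operation that produced the error carries the original row.
>>>>>>>>>     PartialRow failedRow = error.getOperation().getRow();
>>>>>>>>>     logger.error("Failed row: {}", failedRow);
>>>>>>>>> }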
>>>>>>>>>
>>>>>>>>> Since you are using the Kudu client directly, Spark is not involved
>>>>>>>>> from the Kudu perspective, so you will need to deal with Spark on
>>>>>>>>> your own in that case.
>>>>>>>>>
>>>>>>>>> Mike
>>>>>>>>>
>>>>>>>>> On Mon, Mar 5, 2018 at 1:59 PM, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:
>>>>>>>>>
>>>>>>>>>> Hi Mike,
>>>>>>>>>>
>>>>>>>>>> Thanks for the reply. Yes, I am using AUTO_FLUSH_BACKGROUND.
>>>>>>>>>>
>>>>>>>>>> So, I am trying to use the Kudu client API to perform UPSERTs into
>>>>>>>>>> Kudu, and I integrated this with Spark. I am trying to test the
>>>>>>>>>> case where one of the Kudu servers fails. In that case, if there is
>>>>>>>>>> any problem in writing, getPendingErrors() should give me a way to
>>>>>>>>>> handle the errors so that I can successfully terminate my Spark
>>>>>>>>>> job. This is what I am trying to do.
>>>>>>>>>>
>>>>>>>>>> But I am not able to get hold of the exceptions thrown from within
>>>>>>>>>> the KuduClient when it retries connecting to the tablet server. My
>>>>>>>>>> getPendingErrors() is not catching these exceptions.
>>>>>>>>>>
>>>>>>>>>> Let me know if you need more clarification. I can post some
>>>>>>>>>> snippets.
>>>>>>>>>>
>>>>>>>>>> Thanks,
>>>>>>>>>> Ravi
>>>>>>>>>>
>>>>>>>>>> On 5 March 2018 at 13:18, Mike Percy <mpercy@apache.org> wrote:
>>>>>>>>>>
>>>>>>>>>>> Hi Ravi, are you using AUTO_FLUSH_BACKGROUND
>>>>>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/SessionConfiguration.FlushMode.html>?
>>>>>>>>>>> You mention that you are trying to use getPendingErrors()
>>>>>>>>>>> <https://kudu.apache.org/releases/1.6.0/apidocs/org/apache/kudu/client/KuduSession.html#getPendingErrors-->
>>>>>>>>>>> but it sounds like it's not working for you -- can you be more
>>>>>>>>>>> specific about what you expect and what you are observing?
>>>>>>>>>>>
>>>>>>>>>>> Thanks,
>>>>>>>>>>> Mike
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> On Mon, Feb 26, 2018 at 8:04 PM, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:
>>>>>>>>>>>
>>>>>>>>>>>> Thanks, Clifford. We are running Kudu version 1.4. To date we
>>>>>>>>>>>> haven't seen any issues in production, and we are not losing
>>>>>>>>>>>> tablet servers. But as part of testing I have to generate a few
>>>>>>>>>>>> unforeseen cases to analyse the application's performance. One of
>>>>>>>>>>>> those is bringing down the tablet server or master server
>>>>>>>>>>>> intentionally, during which I observed the loss of records. I
>>>>>>>>>>>> just wanted to test cases outside the happy path here. Once
>>>>>>>>>>>> again, thanks for taking the time to respond to me.
>>>>>>>>>>>>
>>>>>>>>>>>> - Ravi
>>>>>>>>>>>>
>>>>>>>>>>>> On 26 February 2018 at 19:58, Clifford Resnick <cresnick@mediamath.com> wrote:
>>>>>>>>>>>>
>>>>>>>>>>>>> I'll have to get back to you on the code bits, but I'm pretty
>>>>>>>>>>>>> sure we're doing simple sync batching. We're not in production
>>>>>>>>>>>>> yet, but after some months of development I haven't seen any
>>>>>>>>>>>>> failures, even when pushing load doing multiple years' backfill.
>>>>>>>>>>>>> I think the real question is why you are losing tablet servers.
>>>>>>>>>>>>> The only instability we ever had with Kudu was that weird NTP
>>>>>>>>>>>>> sync issue, which I think was fixed in 1.6. What version are you
>>>>>>>>>>>>> running?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Anyway, I would think that infinite loop should be catchable
>>>>>>>>>>>>> somewhere. Our pipeline is set to fail/retry with Flink
>>>>>>>>>>>>> snapshots. I imagine there is something similar in Spark. Sorry
>>>>>>>>>>>>> I can't be of more help!
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Feb 26, 2018 9:10 PM, Ravi Kanth <ravikanth.4b0@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Cliff,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks for the response. Well, I do agree that it's simple and
>>>>>>>>>>>>> seamless. In my case, I am able to upsert ~25,000 events/sec
>>>>>>>>>>>>> into Kudu. But I am facing a problem when any of the Kudu tablet
>>>>>>>>>>>>> or master servers is down: I am not able to get hold of the
>>>>>>>>>>>>> exception from the client. The client goes into an infinite loop
>>>>>>>>>>>>> trying to connect to Kudu, and meanwhile I am losing my records.
>>>>>>>>>>>>> I tried handling the errors through getPendingErrors(), but it
>>>>>>>>>>>>> didn't help. I am using AsyncKuduClient to establish the
>>>>>>>>>>>>> connection and retrieving the syncClient from the async client
>>>>>>>>>>>>> to open the session and table. Any help?
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Ravi
>>>>>>>>>>>>>
>>>>>>>>>>>>> On 26 February 2018 at 18:00, Cliff Resnick <cresny@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> While I can't speak for Spark, we do use the client API from
>>>>>>>>>>>>> Flink streaming and it's simple and seamless. It's especially
>>>>>>>>>>>>> nice if you require an upsert semantic.
>>>>>>>>>>>>>
>>>>>>>>>>>>> On Feb 26, 2018 7:51 PM, "Ravi Kanth" <ravikanth.4b0@gmail.com> wrote:
>>>>>>>>>>>>>
>>>>>>>>>>>>> Hi,
>>>>>>>>>>>>>
>>>>>>>>>>>>> Is anyone using Spark Streaming to ingest data into Kudu, using
>>>>>>>>>>>>> the Kudu client API to do so rather than the traditional
>>>>>>>>>>>>> KuduContext API? I am stuck at a point and couldn't find a
>>>>>>>>>>>>> solution.
>>>>>>>>>>>>>
>>>>>>>>>>>>> Thanks,
>>>>>>>>>>>>> Ravi
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>>
>>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
