giraph-dev mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (GIRAPH-1137) Remove channel probing from Netty worker thread for credit-based flow-control
Date Tue, 21 Mar 2017 16:15:41 GMT

    [ https://issues.apache.org/jira/browse/GIRAPH-1137?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15934816#comment-15934816 ]

ASF GitHub Bot commented on GIRAPH-1137:
----------------------------------------

Github user majakabiljo commented on a diff in the pull request:

    https://github.com/apache/giraph/pull/26#discussion_r107200544
  
    --- Diff: giraph-core/src/main/java/org/apache/giraph/comm/flow_control/CreditBasedFlowControl.java ---
    @@ -215,10 +231,38 @@ public void run() {
             }
           }
         });
    -    thread.setUncaughtExceptionHandler(exceptionHandler);
    -    thread.setName("resume-sender");
    -    thread.setDaemon(true);
    -    thread.start();
    +    resumeHandlerThread.setUncaughtExceptionHandler(exceptionHandler);
    +    resumeHandlerThread.setName("resume-sender");
    +    resumeHandlerThread.setDaemon(true);
    +    resumeHandlerThread.start();
    +
    +    // Thread to handle/send cached requests
    +    Thread cachedRequestHandlerThread = new Thread(new Runnable() {
    +      @Override
    +      public void run() {
    +        while (true) {
    +          Pair<Integer, WritableRequest> pair = null;
    +          try {
    +            pair = toBeSent.take();
    +          } catch (InterruptedException e) {
    +            throw new IllegalStateException("run: failed while waiting to " +
    +                "take an element from the request queue!", e);
    +          }
    +          int taskId = pair.getLeft();
    +          WritableRequest request = pair.getRight();
    +          nettyClient.doSend(taskId, request);
    +          if (aggregateUnsentRequests.decrementAndGet() == 0) {
    +            synchronized (aggregateUnsentRequests) {
    +              aggregateUnsentRequests.notifyAll();
    +            }
    +          }
    +        }
    +      }
    +    });
    +    cachedRequestHandlerThread.setUncaughtExceptionHandler(exceptionHandler);
    +    cachedRequestHandlerThread.setName("cached-req-sender");
    +    cachedRequestHandlerThread.setDaemon(true);
    +    cachedRequestHandlerThread.start();
    --- End diff --
    
    You can create a utility method like ThreadUtils.startThread that takes an exception handler.
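
A minimal sketch of what such a helper might look like, so that the set-handler / set-name / set-daemon / start sequence repeated for both threads in the diff lives in one place. The class name follows the review comment, but the signature, the package-private visibility, and the choice to always create daemon threads are assumptions made for illustration, not the API that ended up in Giraph.

```java
/**
 * Hypothetical helper sketched from the review comment; the signature
 * is an assumption, not Giraph's actual API.
 */
final class ThreadUtils {
  private ThreadUtils() { }

  /**
   * Creates a daemon thread with the given name and uncaught-exception
   * handler, starts it, and returns it to the caller.
   */
  static Thread startThread(Runnable runnable, String threadName,
      Thread.UncaughtExceptionHandler handler) {
    Thread thread = new Thread(runnable, threadName);
    thread.setUncaughtExceptionHandler(handler);
    // Daemon threads do not keep the JVM alive, matching setDaemon(true) in the diff.
    thread.setDaemon(true);
    thread.start();
    return thread;
  }
}
```

Under these assumptions, each four-line block in the diff would shrink to a single call, for example `ThreadUtils.startThread(runnable, "cached-req-sender", exceptionHandler)`, where `runnable` is the anonymous `Runnable` currently passed to the `Thread` constructor.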


> Remove channel probing from Netty worker thread for credit-based flow-control
> -----------------------------------------------------------------------------
>
>                 Key: GIRAPH-1137
>                 URL: https://issues.apache.org/jira/browse/GIRAPH-1137
>             Project: Giraph
>          Issue Type: Bug
>            Reporter: Hassan Eslami
>            Assignee: Hassan Eslami
>
> In credit-based flow control, client threads (one type of Netty worker thread
> used in Giraph) sometimes try to send requests to other workers. This is bad practice in Netty and
> can cause Netty to flag the execution as deadlock-prone (an example exception is shown below).
> Client threads should only be responsible for sending ACK/NACK messages in response to requests,
> and they should do so by reusing the channel on which they received the request. In the
> current implementation, client threads may try to send unsent/cached requests in credit-based
> flow control. Sending such requests should be delegated to other threads.
>
> WARN 2017-03-08 06:06:22,104 [netty-client-worker-3] ....
> io.netty.util.concurrent.BlockingOperationException: DefaultChannelPromise@2c455378(incomplete)
> at io.netty.util.concurrent.DefaultPromise.checkDeadLock(DefaultPromise.java:383)
> at io.netty.channel.DefaultChannelPromise.checkDeadLock(DefaultChannelPromise.java:157)
> at io.netty.util.concurrent.DefaultPromise.await0(DefaultPromise.java:343)
> at io.netty.util.concurrent.DefaultPromise.await(DefaultPromise.java:259)
> at org.apache.giraph.utils.ProgressableUtils$ChannelFutureWaitable.waitFor(ProgressableUtils.java:461)
> at org.apache.giraph.utils.ProgressableUtils.waitFor(ProgressableUtils.java:214)
> at org.apache.giraph.utils.ProgressableUtils.waitForever(ProgressableUtils.java:180)
> at org.apache.giraph.utils.ProgressableUtils.waitForever(ProgressableUtils.java:165)
> at org.apache.giraph.utils.ProgressableUtils.awaitChannelFuture(ProgressableUtils.java:132)
> at org.apache.giraph.comm.netty.NettyClient.getNextChannel(NettyClient.java:715)
> at org.apache.giraph.comm.netty.NettyClient.writeRequestToChannel(NettyClient.java:799)
> at org.apache.giraph.comm.netty.NettyClient.doSend(NettyClient.java:789)
> at org.apache.giraph.comm.flow_control.CreditBasedFlowControl.trySendCachedRequests(CreditBasedFlowControl.java:515)
> at org.apache.giraph.comm.flow_control.CreditBasedFlowControl.messageAckReceived(CreditBasedFlowControl.java:485)
> at org.apache.giraph.comm.netty.NettyClient.messageReceived(NettyClient.java:840)
> at org.apache.giraph.comm.netty.handler.ResponseClientHandler.channelRead(ResponseClientHandler.java:87)
> at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338)
> at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324)
> at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:153)
> at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338)
> at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324)
> at org.apache.giraph.comm.netty.InboundByteCounter.channelRead(InboundByteCounter.java:89)
> at io.netty.channel.DefaultChannelHandlerContext.invokeChannelRead(DefaultChannelHandlerContext.java:338)
> at io.netty.channel.DefaultChannelHandlerContext.fireChannelRead(DefaultChannelHandlerContext.java:324)
> at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:785)
> at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:126)
> at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:485)
> at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:452)
> at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:346)
> at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:101)
> at java.lang.Thread.run(Thread.java:745)
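
The delegation described in the issue can be sketched without Netty: the event-loop thread only enqueues a cached request, and a dedicated thread performs the potentially blocking send. This is an illustration under stated assumptions, not the code from the pull request. `CachedRequestSender`, `Sender`, `enqueue` and `waitUntilDrained` are hypothetical names, a `String` stands in for `WritableRequest`, and `Sender.send` stands in for `NettyClient.doSend`. Unlike the diff, the loop here exits on interruption instead of throwing.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

/** Illustrative stand-in for the hand-off in the diff; not Giraph code. */
final class CachedRequestSender {
  /** Stand-in for the blocking send (nettyClient.doSend in the diff). */
  interface Sender {
    void send(int taskId, String request);
  }

  /** A (taskId, request) pair waiting to be sent. */
  private static final class Entry {
    final int taskId;
    final String request;

    Entry(int taskId, String request) {
      this.taskId = taskId;
      this.request = request;
    }
  }

  private final BlockingQueue<Entry> toBeSent = new LinkedBlockingQueue<>();
  private final AtomicInteger unsent = new AtomicInteger();

  CachedRequestSender(final Sender sender) {
    Thread thread = new Thread(() -> {
      try {
        while (true) {
          Entry entry = toBeSent.take();       // blocks off the event loop
          sender.send(entry.taskId, entry.request);
          if (unsent.decrementAndGet() == 0) {
            synchronized (unsent) {
              unsent.notifyAll();              // wake threads waiting for a drain
            }
          }
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();    // assumption: exit quietly
      }
    }, "cached-req-sender");
    thread.setDaemon(true);
    thread.start();
  }

  /** Safe to call from an event-loop thread: it never blocks. */
  void enqueue(int taskId, String request) {
    unsent.incrementAndGet();                  // count first, then publish
    toBeSent.add(new Entry(taskId, request));
  }

  /** Blocks the caller until every queued request has been sent. */
  void waitUntilDrained() throws InterruptedException {
    synchronized (unsent) {
      while (unsent.get() != 0) {
        unsent.wait();
      }
    }
  }
}
```

The point of the design is that `enqueue` uses an unbounded queue and returns immediately, so the thread that receives an ACK never waits on a channel future. `waitUntilDrained` blocks, so it should be called only from threads that are not Netty event-loop threads.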



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
