From commits-return-1682-apmail-helix-commits-archive=helix.apache.org@helix.apache.org Tue May 7 03:13:57 2013 Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id CFC21FE3F for ; Tue, 7 May 2013 03:13:57 +0000 (UTC) Received: (qmail 80803 invoked by uid 500); 7 May 2013 03:13:57 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 80538 invoked by uid 500); 7 May 2013 03:13:56 -0000 Mailing-List: contact commits-help@helix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.apache.org Delivered-To: mailing list commits@helix.apache.org Received: (qmail 80491 invoked by uid 99); 7 May 2013 03:13:55 -0000 Received: from nike.apache.org (HELO nike.apache.org) (192.87.106.230) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 May 2013 03:13:55 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 07 May 2013 03:13:28 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id 4917E2388C3E; Tue, 7 May 2013 03:12:05 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1479756 [31/39] - in /incubator/helix/site-content: ./ apidocs/reference/org/apache/helix/ apidocs/reference/org/apache/helix/agent/ apidocs/reference/org/apache/helix/manager/zk/ apidocs/reference/org/apache/helix/messaging/handling/ apid... Date: Tue, 07 May 2013 03:11:00 -0000 To: commits@helix.incubator.apache.org From: kishoreg@apache.org X-Mailer: svnmailer-1.0.8-patched Message-Id: <20130507031205.4917E2388C3E@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Modified: incubator/helix/site-content/xref-test/org/apache/helix/integration/TestZkCallbackHandlerLeak.html URL: http://svn.apache.org/viewvc/incubator/helix/site-content/xref-test/org/apache/helix/integration/TestZkCallbackHandlerLeak.html?rev=1479756&r1=1479755&r2=1479756&view=diff ============================================================================== --- incubator/helix/site-content/xref-test/org/apache/helix/integration/TestZkCallbackHandlerLeak.html (original) +++ incubator/helix/site-content/xref-test/org/apache/helix/integration/TestZkCallbackHandlerLeak.html Tue May 7 03:10:53 2013 @@ -34,305 +34,460 @@ 24 import java.util.Map; 25 import java.util.Set; 26 -27 import org.apache.helix.TestHelper; -28 import org.apache.helix.ZkHelixTestManager; -29 import org.apache.helix.ZkTestHelper; -30 import org.apache.helix.ZkUnitTestBase; -31 import org.apache.helix.manager.zk.CallbackHandler; +27 import org.I0Itec.zkclient.IZkChildListener; +28 import org.I0Itec.zkclient.IZkDataListener; +29 import org.apache.helix.*; +30 import org.apache.helix.manager.zk.CallbackHandler; +31 import org.apache.helix.manager.zk.ZkClient; 32 import org.apache.helix.mock.controller.ClusterController; 33 import org.apache.helix.mock.participant.MockParticipant; -34 import org.apache.helix.tools.ClusterStateVerifier; -35 import org.testng.Assert; -36 import org.testng.annotations.Test; -37 -38 public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { -39 -40 @Test -41 public void testCbHandlerLeakOnParticipantSessionExpiry() throws Exception -42 { -43 // Logger.getRootLogger().setLevel(Level.INFO); -44 String className = TestHelper.getTestClassName(); -45 String methodName = TestHelper.getTestMethodName(); -46 String clusterName = className + "_" + methodName; -47 final int n = 2; -48 -49 System.out.println("START " + clusterName + " at " -50 + new Date(System.currentTimeMillis())); -51 -52 TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port -53 "localhost", // participant name prefix -54 "TestDB", // resource name prefix -55 1, // resources -56 32, // partitions per resource -57 n, // number of nodes -58 2, // replicas -59 "MasterSlave", -60 true); // do rebalance -61 +34 import org.apache.helix.model.CurrentState; +35 import org.apache.helix.tools.ClusterStateVerifier; +36 import org.testng.Assert; +37 import org.testng.annotations.Test; +38 +39 public class TestZkCallbackHandlerLeak extends ZkUnitTestBase { +40 +41 @Test +42 public void testCbHandlerLeakOnParticipantSessionExpiry() throws Exception +43 { +44 // Logger.getRootLogger().setLevel(Level.INFO); +45 String className = TestHelper.getTestClassName(); +46 String methodName = TestHelper.getTestMethodName(); +47 String clusterName = className + "_" + methodName; +48 final int n = 2; +49 +50 System.out.println("START " + clusterName + " at " +51 + new Date(System.currentTimeMillis())); +52 +53 TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port +54 "localhost", // participant name prefix +55 "TestDB", // resource name prefix +56 1, // resources +57 32, // partitions per resource +58 n, // number of nodes +59 2, // replicas +60 "MasterSlave", +61 true); // do rebalance 62 -63 ClusterController controller = -64 new ClusterController(clusterName, "controller_0", ZK_ADDR); -65 controller.syncStart(); -66 -67 // start participants -68 MockParticipant[] participants = new MockParticipant[n]; -69 for (int i = 0; i < n; i++) -70 { -71 String instanceName = "localhost_" + (12918 + i); -72 -73 participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); -74 participants[i].syncStart(); -75 } -76 -77 boolean result = -78 ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, -79 clusterName)); -80 Assert.assertTrue(result); -81 final ZkHelixTestManager controllerManager = controller.getManager(); -82 final ZkHelixTestManager participantManagerToExpire = (ZkHelixTestManager)participants[1].getManager(); -83 -84 // check controller zk-watchers -85 result = TestHelper.verify(new TestHelper.Verifier() { -86 -87 @Override -88 public boolean verify() throws Exception { -89 Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); -90 Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId()); -91 // System.out.println("controller watch paths: " + watchPaths); -92 -93 // controller should have 5 + 2n + m + (m+2)n zk-watchers -94 // where n is number of nodes and m is number of resources -95 return watchPaths.size() == (6 + 5 * n); -96 } -97 }, 500); -98 Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers."); -99 -100 // check participant zk-watchers -101 result = TestHelper.verify(new TestHelper.Verifier() { -102 -103 @Override -104 public boolean verify() throws Exception { -105 Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); -106 Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId()); -107 // System.out.println("participant watch paths: " + watchPaths); -108 -109 // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER -110 return watchPaths.size() == 2; -111 } -112 }, 500); -113 Assert.assertTrue(result, "Participant should have 2 zk-watchers."); -114 -115 -116 // check HelixManager#_handlers -117 // printHandlers(controllerManager); -118 // printHandlers(participantManagerToExpire); -119 int controllerHandlerNb = controllerManager.getHandlers().size(); -120 int particHandlerNb = participantManagerToExpire.getHandlers().size(); -121 Assert.assertEquals(controllerHandlerNb, 9, "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant"); -122 Assert.assertEquals(particHandlerNb, 2, "HelixParticipant should have 2 (msg+cur-state) callback handlers"); -123 -124 // expire the session of participant -125 System.out.println("Expiring participant session..."); -126 String oldSessionId = participantManagerToExpire.getSessionId(); -127 -128 ZkTestHelper.expireSession(participantManagerToExpire.getZkClient()); -129 String newSessionId = participantManagerToExpire.getSessionId(); -130 System.out.println("Expried participant session. oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId); -131 -132 result = -133 ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, -134 clusterName)); -135 Assert.assertTrue(result); -136 -137 // check controller zk-watchers -138 result = TestHelper.verify(new TestHelper.Verifier() { -139 -140 @Override -141 public boolean verify() throws Exception { -142 Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); -143 Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId()); -144 // System.out.println("controller watch paths after session expiry: " + watchPaths); -145 -146 // controller should have 5 + 2n + m + (m+2)n zk-watchers -147 // where n is number of nodes and m is number of resources -148 return watchPaths.size() == (6 + 5 * n); -149 } -150 }, 500); -151 Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry."); -152 -153 // check participant zk-watchers -154 result = TestHelper.verify(new TestHelper.Verifier() { -155 -156 @Override -157 public boolean verify() throws Exception { -158 Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); -159 Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId()); -160 // System.out.println("participant watch paths after session expiry: " + watchPaths); -161 -162 // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER -163 return watchPaths.size() == 2; -164 } -165 }, 500); -166 Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry."); -167 +63 +64 ClusterController controller = +65 new ClusterController(clusterName, "controller_0", ZK_ADDR); +66 controller.syncStart(); +67 +68 // start participants +69 MockParticipant[] participants = new MockParticipant[n]; +70 for (int i = 0; i < n; i++) +71 { +72 String instanceName = "localhost_" + (12918 + i); +73 +74 participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); +75 participants[i].syncStart(); +76 } +77 +78 boolean result = +79 ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, +80 clusterName)); +81 Assert.assertTrue(result); +82 final ZkHelixTestManager controllerManager = controller.getManager(); +83 final ZkHelixTestManager participantManagerToExpire = (ZkHelixTestManager)participants[1].getManager(); +84 +85 // check controller zk-watchers +86 result = TestHelper.verify(new TestHelper.Verifier() { +87 +88 @Override +89 public boolean verify() throws Exception { +90 Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); +91 Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId()); +92 // System.out.println("controller watch paths: " + watchPaths); +93 +94 // controller should have 5 + 2n + m + (m+2)n zk-watchers +95 // where n is number of nodes and m is number of resources +96 return watchPaths.size() == (6 + 5 * n); +97 } +98 }, 500); +99 Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers."); +100 +101 // check participant zk-watchers +102 result = TestHelper.verify(new TestHelper.Verifier() { +103 +104 @Override +105 public boolean verify() throws Exception { +106 Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); +107 Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId()); +108 // System.out.println("participant watch paths: " + watchPaths); +109 +110 // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER +111 return watchPaths.size() == 2; +112 } +113 }, 500); +114 Assert.assertTrue(result, "Participant should have 2 zk-watchers."); +115 +116 +117 // check HelixManager#_handlers +118 // printHandlers(controllerManager); +119 // printHandlers(participantManagerToExpire); +120 int controllerHandlerNb = controllerManager.getHandlers().size(); +121 int particHandlerNb = participantManagerToExpire.getHandlers().size(); +122 Assert.assertEquals(controllerHandlerNb, 9, "HelixController should have 9 (5+2n) callback handlers for 2 (n) participant"); +123 Assert.assertEquals(particHandlerNb, 2, "HelixParticipant should have 2 (msg+cur-state) callback handlers"); +124 +125 // expire the session of participant +126 System.out.println("Expiring participant session..."); +127 String oldSessionId = participantManagerToExpire.getSessionId(); +128 +129 ZkTestHelper.expireSession(participantManagerToExpire.getZkClient()); +130 String newSessionId = participantManagerToExpire.getSessionId(); +131 System.out.println("Expried participant session. oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId); +132 +133 result = +134 ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, +135 clusterName)); +136 Assert.assertTrue(result); +137 +138 // check controller zk-watchers +139 result = TestHelper.verify(new TestHelper.Verifier() { +140 +141 @Override +142 public boolean verify() throws Exception { +143 Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); +144 Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId()); +145 // System.out.println("controller watch paths after session expiry: " + watchPaths); +146 +147 // controller should have 5 + 2n + m + (m+2)n zk-watchers +148 // where n is number of nodes and m is number of resources +149 return watchPaths.size() == (6 + 5 * n); +150 } +151 }, 500); +152 Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry."); +153 +154 // check participant zk-watchers +155 result = TestHelper.verify(new TestHelper.Verifier() { +156 +157 @Override +158 public boolean verify() throws Exception { +159 Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); +160 Set<String> watchPaths = watchers.get("0x" + participantManagerToExpire.getSessionId()); +161 // System.out.println("participant watch paths after session expiry: " + watchPaths); +162 +163 // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER +164 return watchPaths.size() == 2; +165 } +166 }, 500); +167 Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry."); 168 -169 // check handlers -170 // printHandlers(controllerManager); -171 // printHandlers(participantManagerToExpire); -172 int handlerNb = controllerManager.getHandlers().size(); -173 Assert.assertEquals(handlerNb, controllerHandlerNb, "controller callback handlers should not increase after participant session expiry"); -174 handlerNb = participantManagerToExpire.getHandlers().size(); -175 Assert.assertEquals(handlerNb, particHandlerNb, "participant callback handlers should not increase after participant session expiry"); -176 -177 System.out.println("END " + clusterName + " at " -178 + new Date(System.currentTimeMillis())); -179 } -180 -181 @Test -182 public void testCbHandlerLeakOnControllerSessionExpiry() throws Exception -183 { -184 // Logger.getRootLogger().setLevel(Level.INFO); -185 String className = TestHelper.getTestClassName(); -186 String methodName = TestHelper.getTestMethodName(); -187 String clusterName = className + "_" + methodName; -188 final int n = 2; -189 -190 System.out.println("START " + clusterName + " at " -191 + new Date(System.currentTimeMillis())); -192 -193 TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port -194 "localhost", // participant name prefix -195 "TestDB", // resource name prefix -196 1, // resources -197 32, // partitions per resource -198 n, // number of nodes -199 2, // replicas -200 "MasterSlave", -201 true); // do rebalance -202 +169 +170 // check handlers +171 // printHandlers(controllerManager); +172 // printHandlers(participantManagerToExpire); +173 int handlerNb = controllerManager.getHandlers().size(); +174 Assert.assertEquals(handlerNb, controllerHandlerNb, "controller callback handlers should not increase after participant session expiry"); +175 handlerNb = participantManagerToExpire.getHandlers().size(); +176 Assert.assertEquals(handlerNb, particHandlerNb, "participant callback handlers should not increase after participant session expiry"); +177 +178 System.out.println("END " + clusterName + " at " +179 + new Date(System.currentTimeMillis())); +180 } +181 +182 @Test +183 public void testCbHandlerLeakOnControllerSessionExpiry() throws Exception +184 { +185 // Logger.getRootLogger().setLevel(Level.INFO); +186 String className = TestHelper.getTestClassName(); +187 String methodName = TestHelper.getTestMethodName(); +188 String clusterName = className + "_" + methodName; +189 final int n = 2; +190 +191 System.out.println("START " + clusterName + " at " +192 + new Date(System.currentTimeMillis())); +193 +194 TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port +195 "localhost", // participant name prefix +196 "TestDB", // resource name prefix +197 1, // resources +198 32, // partitions per resource +199 n, // number of nodes +200 2, // replicas +201 "MasterSlave", +202 true); // do rebalance 203 -204 ClusterController controller = -205 new ClusterController(clusterName, "controller_0", ZK_ADDR); -206 controller.syncStart(); -207 -208 // start participants -209 MockParticipant[] participants = new MockParticipant[n]; -210 for (int i = 0; i < n; i++) -211 { -212 String instanceName = "localhost_" + (12918 + i); -213 -214 participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); -215 participants[i].syncStart(); -216 } -217 -218 boolean result = -219 ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, -220 clusterName)); -221 Assert.assertTrue(result); -222 final ZkHelixTestManager controllerManager = controller.getManager(); -223 final ZkHelixTestManager participantManager = participants[0].getManager(); -224 -225 // wait until we get all the listeners registered -226 result = TestHelper.verify(new TestHelper.Verifier() { -227 -228 @Override -229 public boolean verify() throws Exception { -230 int controllerHandlerNb = controllerManager.getHandlers().size(); -231 int particHandlerNb = participantManager.getHandlers().size(); -232 if (controllerHandlerNb == 9 && particHandlerNb == 2) -233 return true; -234 else -235 return false; -236 } -237 }, 1000); -238 -239 int controllerHandlerNb = controllerManager.getHandlers().size(); -240 int particHandlerNb = participantManager.getHandlers().size(); -241 Assert.assertEquals(controllerHandlerNb, 9, "HelixController should have 9 (5+2n) callback handlers for 2 participant, but was " -242 + controllerHandlerNb + ", " -243 + printHandlers(controllerManager)); -244 Assert.assertEquals(particHandlerNb, 2, "HelixParticipant should have 2 (msg+cur-state) callback handlers, but was " -245 + particHandlerNb + ", " -246 + printHandlers(participantManager)); -247 -248 // expire controller -249 System.out.println("Expiring controller session..."); -250 String oldSessionId = controllerManager.getSessionId(); -251 -252 ZkTestHelper.expireSession(controllerManager.getZkClient()); -253 String newSessionId = controllerManager.getSessionId(); -254 System.out.println("Expired controller session. oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId); -255 -256 result = -257 ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, -258 clusterName)); -259 Assert.assertTrue(result); -260 -261 // check controller zk-watchers -262 result = TestHelper.verify(new TestHelper.Verifier() { -263 -264 @Override -265 public boolean verify() throws Exception { -266 Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); -267 Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId()); -268 // System.out.println("controller watch paths after session expiry: " + watchPaths); -269 -270 // controller should have 5 + 2n + m + (m+2)n zk-watchers -271 // where n is number of nodes and m is number of resources -272 return watchPaths.size() == (6 + 5 * n); -273 } -274 }, 500); -275 Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry."); -276 -277 // check participant zk-watchers -278 result = TestHelper.verify(new TestHelper.Verifier() { -279 -280 @Override -281 public boolean verify() throws Exception { -282 Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); -283 Set<String> watchPaths = watchers.get("0x" + participantManager.getSessionId()); -284 // System.out.println("participant watch paths after session expiry: " + watchPaths); -285 -286 // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER -287 return watchPaths.size() == 2; -288 } -289 }, 500); -290 Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry."); -291 -292 // check HelixManager#_handlers -293 // printHandlers(controllerManager); -294 int handlerNb = controllerManager.getHandlers().size(); -295 Assert.assertEquals(handlerNb, controllerHandlerNb, "controller callback handlers should not increase after participant session expiry, but was " -296 + printHandlers(controllerManager)); -297 handlerNb = participantManager.getHandlers().size(); -298 Assert.assertEquals(handlerNb, particHandlerNb, "participant callback handlers should not increase after participant session expiry, but was " -299 + printHandlers(participantManager)); -300 +204 +205 ClusterController controller = +206 new ClusterController(clusterName, "controller_0", ZK_ADDR); +207 controller.syncStart(); +208 +209 // start participants +210 MockParticipant[] participants = new MockParticipant[n]; +211 for (int i = 0; i < n; i++) +212 { +213 String instanceName = "localhost_" + (12918 + i); +214 +215 participants[i] = new MockParticipant(clusterName, instanceName, ZK_ADDR, null); +216 participants[i].syncStart(); +217 } +218 +219 boolean result = +220 ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, +221 clusterName)); +222 Assert.assertTrue(result); +223 final ZkHelixTestManager controllerManager = controller.getManager(); +224 final ZkHelixTestManager participantManager = participants[0].getManager(); +225 +226 // wait until we get all the listeners registered +227 result = TestHelper.verify(new TestHelper.Verifier() { +228 +229 @Override +230 public boolean verify() throws Exception { +231 int controllerHandlerNb = controllerManager.getHandlers().size(); +232 int particHandlerNb = participantManager.getHandlers().size(); +233 if (controllerHandlerNb == 9 && particHandlerNb == 2) +234 return true; +235 else +236 return false; +237 } +238 }, 1000); +239 +240 int controllerHandlerNb = controllerManager.getHandlers().size(); +241 int particHandlerNb = participantManager.getHandlers().size(); +242 Assert.assertEquals(controllerHandlerNb, 9, "HelixController should have 9 (5+2n) callback handlers for 2 participant, but was " +243 + controllerHandlerNb + ", " +244 + printHandlers(controllerManager)); +245 Assert.assertEquals(particHandlerNb, 2, "HelixParticipant should have 2 (msg+cur-state) callback handlers, but was " +246 + particHandlerNb + ", " +247 + printHandlers(participantManager)); +248 +249 // expire controller +250 System.out.println("Expiring controller session..."); +251 String oldSessionId = controllerManager.getSessionId(); +252 +253 ZkTestHelper.expireSession(controllerManager.getZkClient()); +254 String newSessionId = controllerManager.getSessionId(); +255 System.out.println("Expired controller session. oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId); +256 +257 result = +258 ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, +259 clusterName)); +260 Assert.assertTrue(result); +261 +262 // check controller zk-watchers +263 result = TestHelper.verify(new TestHelper.Verifier() { +264 +265 @Override +266 public boolean verify() throws Exception { +267 Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); +268 Set<String> watchPaths = watchers.get("0x" + controllerManager.getSessionId()); +269 // System.out.println("controller watch paths after session expiry: " + watchPaths); +270 +271 // controller should have 5 + 2n + m + (m+2)n zk-watchers +272 // where n is number of nodes and m is number of resources +273 return watchPaths.size() == (6 + 5 * n); +274 } +275 }, 500); +276 Assert.assertTrue(result, "Controller should have 6 + 5*n zk-watchers after session expiry."); +277 +278 // check participant zk-watchers +279 result = TestHelper.verify(new TestHelper.Verifier() { +280 +281 @Override +282 public boolean verify() throws Exception { +283 Map<String, Set<String>> watchers = ZkTestHelper.getListenersBySession(ZK_ADDR); +284 Set<String> watchPaths = watchers.get("0x" + participantManager.getSessionId()); +285 // System.out.println("participant watch paths after session expiry: " + watchPaths); +286 +287 // participant should have 2 zk-watchers: 1 for MESSAGE and 1 for CONTROLLER +288 return watchPaths.size() == 2; +289 } +290 }, 500); +291 Assert.assertTrue(result, "Participant should have 2 zk-watchers after session expiry."); +292 +293 // check HelixManager#_handlers +294 // printHandlers(controllerManager); +295 int handlerNb = controllerManager.getHandlers().size(); +296 Assert.assertEquals(handlerNb, controllerHandlerNb, "controller callback handlers should not increase after participant session expiry, but was " +297 + printHandlers(controllerManager)); +298 handlerNb = participantManager.getHandlers().size(); +299 Assert.assertEquals(handlerNb, particHandlerNb, "participant callback handlers should not increase after participant session expiry, but was " +300 + printHandlers(participantManager)); 301 -302 System.out.println("END " + clusterName + " at " -303 + new Date(System.currentTimeMillis())); -304 } -305 -306 // debug -307 static String printHandlers(ZkHelixTestManager manager) -308 { -309 StringBuilder sb = new StringBuilder(); -310 List<CallbackHandler> handlers = manager.getHandlers(); -311 sb.append(manager.getInstanceName() + " has " + handlers.size() + " cb-handlers. ["); -312 -313 for (int i = 0; i < handlers.size(); i++) { -314 CallbackHandler handler = handlers.get(i); -315 String path = handler.getPath(); -316 sb.append(path.substring(manager.getClusterName().length() + 1) + ": " + handler.getListener()); -317 if (i < (handlers.size() - 1) ) { -318 sb.append(", "); -319 } -320 } -321 sb.append("]"); -322 -323 return sb.toString(); -324 } -325 } +302 +303 System.out.println("END " + clusterName + " at " +304 + new Date(System.currentTimeMillis())); +305 } +306 +307 +308 @Test +309 public void testRemoveUserCbHandlerOnPathRemoval() throws Exception { +310 String className = TestHelper.getTestClassName(); +311 String methodName = TestHelper.getTestMethodName(); +312 String clusterName = className + "_" + methodName; +313 final int n = 3; +314 final String zkAddr = ZK_ADDR; +315 System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); +316 +317 TestHelper.setupCluster(clusterName, zkAddr, 12918, +318 "localhost", +319 "TestDB", +320 1, // resource +321 32, // partitions +322 n, // nodes +323 2, // replicas +324 "MasterSlave", +325 true); +326 +327 ClusterController controller = new ClusterController(clusterName, "controller_0", zkAddr); +328 controller.syncStart(); +329 +330 MockParticipant[] participants = new MockParticipant[n]; +331 for (int i = 0; i < n; i++) { +332 String instanceName = "localhost_" + (12918 + i); +333 participants[i] = new MockParticipant(clusterName, instanceName, zkAddr, null); +334 participants[i].syncStart(); +335 +336 // register a controller listener on participant_0 +337 if (i == 0) { +338 ZkHelixTestManager manager = participants[0].getManager(); +339 manager.addCurrentStateChangeListener(new CurrentStateChangeListener() { +340 @Override +341 public void onStateChange(String instanceName, List<CurrentState> statesInfo, NotificationContext changeContext) { +342 //To change body of implemented methods use File | Settings | File Templates. +343 // System.out.println(instanceName + " on current-state change, type: " + changeContext.getType()); +344 } +345 }, manager.getInstanceName(), manager.getSessionId()); +346 } +347 } +348 +349 Boolean result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr, +350 clusterName)); +351 Assert.assertTrue(result); +352 +353 ZkHelixTestManager participantToExpire = participants[0].getManager(); +354 String oldSessionId = participantToExpire.getSessionId(); +355 PropertyKey.Builder keyBuilder = new PropertyKey.Builder(clusterName); +356 +357 +358 // check manager#hanlders +359 Assert.assertEquals(participantToExpire.getHandlers().size(), 3, "Should have 3 handlers: CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES"); +360 +361 // check zkclient#listeners +362 Map<String, Set<IZkDataListener>> dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient()); +363 Map<String, Set<IZkChildListener>> childListeners = ZkTestHelper.getZkChildListener(participantToExpire.getZkClient()); +364 // printZkListeners(participantToExpire.getZkClient()); +365 Assert.assertEquals(dataListeners.size(), 1, "Should have 1 path (CURRENTSTATE/{sessionId}/TestDB0) which has 1 data-listeners"); +366 String path = keyBuilder.currentState(participantToExpire.getInstanceName(), oldSessionId, "TestDB0").getPath(); +367 Assert.assertEquals(dataListeners.get(path).size(), 1, "Should have 1 data-listeners on path: " + path); +368 Assert.assertEquals(childListeners.size(), 3, "Should have 3 paths (CURRENTSTATE/{sessionId}, CONTROLLER, and MESSAGES) each of which has 1 child-listener"); +369 path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath(); +370 Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path); +371 path = keyBuilder.messages(participantToExpire.getInstanceName()).getPath(); +372 Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path); +373 path = keyBuilder.controller().getPath(); +374 Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path); +375 +376 // check zookeeper#watches on client side +377 Map<String, List<String>> watchPaths = ZkTestHelper.getZkWatch(participantToExpire.getZkClient()); +378 // System.out.println("localhost_12918 zk-client side watchPaths: " + watchPaths + "\n"); +379 Assert.assertEquals(watchPaths.get("dataWatches").size(), 4, "Should have 4 data-watches: CURRENTSTATE/{sessionId}, CURRENTSTATE/{sessionId}/TestDB, CONTROLLER, MESSAGES"); +380 Assert.assertEquals(watchPaths.get("childWatches").size(), 3, "Should have 3 child-watches: CONTROLLER, MESSAGES, and CURRENTSTATE/{sessionId}"); +381 +382 +383 // expire localhost_12918 +384 System.out.println("Expire participant: " + participantToExpire.getInstanceName() + ", session: " + participantToExpire.getSessionId()); +385 ZkTestHelper.expireSession(participantToExpire.getZkClient()); +386 String newSessionId = participantToExpire.getSessionId(); +387 System.out.println(participantToExpire.getInstanceName() + " oldSessionId: " + oldSessionId + ", newSessionId: " + newSessionId); +388 result = ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(zkAddr, +389 clusterName)); +390 Assert.assertTrue(result); +391 +392 // check manager#hanlders +393 Assert.assertEquals(participantToExpire.getHandlers().size(), 2, "Should have 2 handlers: CONTROLLER and MESSAGES. CURRENTSTATE/{sessionId} handler should be removed by CallbackHandler#handleChildChange()"); +394 +395 // check zkclient#listeners +396 dataListeners = ZkTestHelper.getZkDataListener(participantToExpire.getZkClient()); +397 childListeners = ZkTestHelper.getZkChildListener(participantToExpire.getZkClient()); +398 // printZkListeners(participantToExpire.getZkClient()); +399 Assert.assertTrue(dataListeners.isEmpty(), "Should have no data-listeners"); +400 Assert.assertEquals(childListeners.size(), 3, "Should have 3 paths (CURRENTSTATE/{oldSessionId}, CONTROLLER, and MESSAGES). " +401 + "CONTROLLER and MESSAGE has 1 child-listener each. CURRENTSTATE/{oldSessionId} doesn't have listener (ZkClient doesn't remove empty childListener set. probably a ZkClient bug. see ZkClient#unsubscribeChildChange())"); +402 path = keyBuilder.currentStates(participantToExpire.getInstanceName(), oldSessionId).getPath(); +403 Assert.assertEquals(childListeners.get(path).size(), 0, "Should have no child-listener on path: " + path); +404 path = keyBuilder.messages(participantToExpire.getInstanceName()).getPath(); +405 Assert.assertEquals(childListeners.get(path).size(), 1, "Should have 1 child-listener on path: " + path); +406 path = keyBuilder.controller().getPath(); [... 78 lines stripped ...]