From commits-return-5586-apmail-helix-commits-archive=helix.apache.org@helix.apache.org Mon Jun 22 04:17:29 2015 Return-Path: X-Original-To: apmail-helix-commits-archive@minotaur.apache.org Delivered-To: apmail-helix-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DA73318F21 for ; Mon, 22 Jun 2015 04:17:29 +0000 (UTC) Received: (qmail 43491 invoked by uid 500); 22 Jun 2015 04:17:29 -0000 Delivered-To: apmail-helix-commits-archive@helix.apache.org Received: (qmail 43456 invoked by uid 500); 22 Jun 2015 04:17:29 -0000 Mailing-List: contact commits-help@helix.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@helix.apache.org Delivered-To: mailing list commits@helix.apache.org Received: (qmail 43447 invoked by uid 99); 22 Jun 2015 04:17:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 22 Jun 2015 04:17:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8E3A2DFA25; Mon, 22 Jun 2015 04:17:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: kanak@apache.org To: commits@helix.apache.org Message-Id: <52284e6bd377412f871797acbe82f752@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: helix git commit: [HELIX-600] Task scheduler fails to schedule a recurring workflow if the startTime is set to a future timestamp. Date: Mon, 22 Jun 2015 04:17:29 +0000 (UTC) Repository: helix Updated Branches: refs/heads/helix-0.6.x 589c96cb0 -> 2775e1566 [HELIX-600] Task scheduler fails to schedule a recurring workflow if the startTime is set to a future timestamp. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/2775e156 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/2775e156 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/2775e156 Branch: refs/heads/helix-0.6.x Commit: 2775e1566df7ceb67edd7587b81d4c245aedab38 Parents: 589c96c Author: Lei Xia Authored: Tue Jun 9 14:40:32 2015 -0700 Committer: Lei Xia Committed: Wed Jun 17 11:10:26 2015 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/task/TaskDriver.java | 10 +- .../org/apache/helix/task/TaskRebalancer.java | 15 +- .../integration/task/TestRecurringJobQueue.java | 429 +++++++++++++++++++ 3 files changed, 442 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/2775e156/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index cb079cc..dcd13f2 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -580,10 +580,12 @@ public class TaskDriver { DataUpdater updater = new DataUpdater() { @Override public ZNRecord update(ZNRecord currentData) { - // Only update target state for non-completed workflows - String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME); - if (finishTime == null || finishTime.equals(WorkflowContext.UNFINISHED)) { - currentData.setSimpleField(WorkflowConfig.TARGET_STATE, state.name()); + if (currentData != null){ + // Only update target state for non-completed workflows + String finishTime = currentData.getSimpleField(WorkflowContext.FINISH_TIME); + if (finishTime == null || finishTime.equals(WorkflowContext.UNFINISHED)) { + currentData.setSimpleField(WorkflowConfig.TARGET_STATE, state.name()); + } } return currentData; } http://git-wip-us.apache.org/repos/asf/helix/blob/2775e156/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java index fe3f496..98a74c3 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskRebalancer.java @@ -544,19 +544,14 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { } } - // No need to schedule the same runnable at the same time - if (SCHEDULED_TIMES.containsKey(workflowResource) - || SCHEDULED_TIMES.inverse().containsKey(startTime)) { - return false; - } - scheduleRebalance(workflowResource, jobResource, startTime, delayFromStart); return false; } private void scheduleRebalance(String id, String jobResource, Date startTime, long delayFromStart) { - // No need to schedule the same runnable at the same time - if (SCHEDULED_TIMES.containsKey(id) || SCHEDULED_TIMES.inverse().containsKey(startTime)) { + // Do nothing if there is already a timer set for the this workflow with the same start time. + if ((SCHEDULED_TIMES.containsKey(id) && SCHEDULED_TIMES.get(id).equals(startTime)) + || SCHEDULED_TIMES.inverse().containsKey(startTime)) { return; } LOG.info("Schedule rebalance with id: " + id + "and job: " + jobResource); @@ -756,6 +751,10 @@ public abstract class TaskRebalancer implements Rebalancer, MappingCalculator { "Error occurred while trying to clean up workflow %s. Failed to remove node %s from Helix. Aborting further clean up steps.", workflowResource, workflowPropStoreKey)); } + // Remove pending timer for this workflow if exists + if (SCHEDULED_TIMES.containsKey(workflowResource)) { + SCHEDULED_TIMES.remove(workflowResource); + } } } http://git-wip-us.apache.org/repos/asf/helix/blob/2775e156/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java new file mode 100644 index 0000000..011ed81 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TestRecurringJobQueue.java @@ -0,0 +1,429 @@ +package org.apache.helix.integration.task; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Calendar; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map;; + +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.PropertyKey; +import org.apache.helix.TestHelper; +import org.apache.helix.integration.ZkIntegrationTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobContext; +import org.apache.helix.task.JobQueue; +import org.apache.helix.task.Task; +import org.apache.helix.task.TaskCallbackContext; +import org.apache.helix.task.TaskDriver; +import org.apache.helix.task.TaskFactory; +import org.apache.helix.task.TaskResult; +import org.apache.helix.task.TaskState; +import org.apache.helix.task.TaskStateModelFactory; +import org.apache.helix.task.TaskUtil; +import org.apache.helix.task.WorkflowConfig; +import org.apache.helix.task.WorkflowContext; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.tools.ClusterStateVerifier; +import org.apache.log4j.Logger; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Sets; + +public class TestRecurringJobQueue extends ZkIntegrationTestBase { + private static final Logger LOG = Logger.getLogger(TestRecurringJobQueue.class); + private static final int n = 5; + private static final int START_PORT = 12918; + private static final String MASTER_SLAVE_STATE_MODEL = "MasterSlave"; + private static final String TIMEOUT_CONFIG = "Timeout"; + private static final String TGT_DB = "TestDB"; + private static final int NUM_PARTITIONS = 20; + private static final int NUM_REPLICAS = 3; + private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName(); + private final MockParticipantManager[] _participants = new MockParticipantManager[n]; + private ClusterControllerManager _controller; + + private HelixManager _manager; + private TaskDriver _driver; + + @BeforeClass + public void beforeClass() throws Exception { + String namespace = "/" + CLUSTER_NAME; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursive(namespace); + } + + ClusterSetup setupTool = new ClusterSetup(ZK_ADDR); + setupTool.addCluster(CLUSTER_NAME, true); + for (int i = 0; i < n; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + } + + // Set up target db + setupTool.addResourceToCluster(CLUSTER_NAME, TGT_DB, NUM_PARTITIONS, MASTER_SLAVE_STATE_MODEL); + setupTool.rebalanceStorageCluster(CLUSTER_NAME, TGT_DB, NUM_REPLICAS); + + Map taskFactoryReg = new HashMap(); + taskFactoryReg.put("Reindex", new TaskFactory() { + @Override + public Task createNewTask(TaskCallbackContext context) { + return new ReindexTask(context); + } + }); + + // start dummy participants + for (int i = 0; i < n; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + + // Register a Task state model factory. + StateMachineEngine stateMachine = _participants[i].getStateMachineEngine(); + stateMachine.registerStateModelFactory("Task", new TaskStateModelFactory(_participants[i], + taskFactoryReg)); + + _participants[i].syncStart(); + } + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + // create cluster manager + _manager = + HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, + ZK_ADDR); + _manager.connect(); + + _driver = new TaskDriver(_manager); + + boolean result = + ClusterStateVerifier.verifyByZkCallback(new ClusterStateVerifier.MasterNbInExtViewVerifier( + ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); + + result = + ClusterStateVerifier + .verifyByZkCallback(new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, + CLUSTER_NAME)); + Assert.assertTrue(result); + } + + @AfterClass + public void afterClass() throws Exception { + _controller.syncStop(); + for (int i = 0; i < n; i++) { + _participants[i].syncStop(); + } + _manager.disconnect(); + } + + private JobQueue buildRecurrentJobQueue(String jobQueueName, int delayStart) { + Map cfgMap = new HashMap(); + cfgMap.put(WorkflowConfig.EXPIRY, String.valueOf(50000)); + cfgMap.put(WorkflowConfig.RECURRENCE_INTERVAL, String.valueOf(120)); + cfgMap.put(WorkflowConfig.RECURRENCE_UNIT, "SECONDS"); + Calendar cal = Calendar.getInstance(); + cal.set(Calendar.MINUTE, cal.get(Calendar.MINUTE) + delayStart / 60); + cal.set(Calendar.SECOND, cal.get(Calendar.SECOND) + delayStart % 60); + cal.set(Calendar.MILLISECOND, 0); + cfgMap.put(WorkflowConfig.START_TIME, + WorkflowConfig.getDefaultDateFormat().format(cal.getTime())); + return (new JobQueue.Builder(jobQueueName).fromMap(cfgMap)).build(); + } + + private JobQueue buildRecurrentJobQueue(String jobQueueName) { + return buildRecurrentJobQueue(jobQueueName, 0); + } + + @Test + public void deleteRecreateRecurrentQueue() throws Exception { + String queueName = TestHelper.getTestMethodName(); + + // Create a queue + LOG.info("Starting job-queue: " + queueName); + JobQueue queue = buildRecurrentJobQueue(queueName); + _driver.createQueue(queue); + + // Create and Enqueue jobs + List currentJobNames = new ArrayList(); + for (int i = 0; i <= 2; i++) { + String targetPartition = (i == 0) ? "MASTER" : "SLAVE"; + + JobConfig.Builder job = + new JobConfig.Builder().setCommand("Reindex") + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) + .setTargetPartitionStates(Sets.newHashSet(targetPartition)); + String jobName = targetPartition.toLowerCase() + "Job" + i; + _driver.enqueueJob(queueName, jobName, job); + currentJobNames.add(jobName); + } + + WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName); + + // ensure job 1 is started before stop it + String scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); + String namedSpaceJob1 = String.format("%s_%s", scheduledQueue, currentJobNames.get(0)); + TestUtil + .pollForJobState(_manager, scheduledQueue, namedSpaceJob1, TaskState.IN_PROGRESS); + + _driver.stop(queueName); + _driver.delete(queueName); + Thread.sleep(500); + + queue = buildRecurrentJobQueue(queueName, 5); + _driver.createQueue(queue); + currentJobNames.clear(); + for (int i = 0; i <= 2; i++) { + String targetPartition = (i == 0) ? "MASTER" : "SLAVE"; + + JobConfig.Builder job = + new JobConfig.Builder().setCommand("Reindex") + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) + .setTargetPartitionStates(Sets.newHashSet(targetPartition)); + String jobName = targetPartition.toLowerCase() + "Job" + i; + _driver.enqueueJob(queueName, jobName, job); + currentJobNames.add(jobName); + } + + wCtx = TestUtil.pollForWorkflowContext(_manager, queueName); + + // ensure jobs are started and completed + scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); + namedSpaceJob1 = String.format("%s_%s", scheduledQueue, currentJobNames.get(0)); + TestUtil + .pollForJobState(_manager, scheduledQueue, namedSpaceJob1, TaskState.COMPLETED); + + scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); + String namedSpaceJob2 = String.format("%s_%s", scheduledQueue, currentJobNames.get(1)); + TestUtil + .pollForJobState(_manager, scheduledQueue, namedSpaceJob2, TaskState.COMPLETED); + } + + @Test + public void stopDeleteJobAndResumeRecurrentQueue() throws Exception { + String queueName = TestHelper.getTestMethodName(); + + // Create a queue + LOG.info("Starting job-queue: " + queueName); + JobQueue queue = buildRecurrentJobQueue(queueName); + _driver.createQueue(queue); + + // Create and Enqueue jobs + List currentJobNames = new ArrayList(); + Map commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500)); + for (int i = 0; i <= 4; i++) { + String targetPartition = (i == 0) ? "MASTER" : "SLAVE"; + + JobConfig.Builder job = + new JobConfig.Builder().setCommand("Reindex") + .setJobCommandConfigMap(commandConfig) + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) + .setTargetPartitionStates(Sets.newHashSet(targetPartition)); + String jobName = targetPartition.toLowerCase() + "Job" + i; + LOG.info("Enqueuing job: " + jobName); + _driver.enqueueJob(queueName, jobName, job); + currentJobNames.add(i, jobName); + } + + WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName); + String scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); + + // ensure job 1 is started before deleting it + String deletedJob1 = currentJobNames.get(0); + String namedSpaceDeletedJob1 = String.format("%s_%s", scheduledQueue, deletedJob1); + TestUtil + .pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.IN_PROGRESS); + + // stop the queue + LOG.info("Pausing job-queue: " + scheduledQueue); + _driver.stop(queueName); + TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceDeletedJob1, TaskState.STOPPED); + TestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED); + + // delete the in-progress job (job 1) and verify it being deleted + _driver.deleteJob(queueName, deletedJob1); + verifyJobDeleted(queueName, namedSpaceDeletedJob1); + verifyJobDeleted(scheduledQueue, namedSpaceDeletedJob1); + + LOG.info("Resuming job-queue: " + queueName); + _driver.resume(queueName); + + // ensure job 2 is started + TestUtil.pollForJobState(_manager, scheduledQueue, + String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.IN_PROGRESS); + + // stop the queue + LOG.info("Pausing job-queue: " + queueName); + _driver.stop(queueName); + TestUtil.pollForJobState(_manager, scheduledQueue, + String.format("%s_%s", scheduledQueue, currentJobNames.get(1)), TaskState.STOPPED); + TestUtil.pollForWorkflowState(_manager, scheduledQueue, TaskState.STOPPED); + + // Ensure job 3 is not started before deleting it + String deletedJob2 = currentJobNames.get(2); + String namedSpaceDeletedJob2 = String.format("%s_%s", scheduledQueue, deletedJob2); + TestUtil.pollForEmptyJobState(_manager, scheduledQueue, namedSpaceDeletedJob2); + + // delete not-started job (job 3) and verify it being deleted + _driver.deleteJob(queueName, deletedJob2); + verifyJobDeleted(queueName, namedSpaceDeletedJob2); + verifyJobDeleted(scheduledQueue, namedSpaceDeletedJob2); + + LOG.info("Resuming job-queue: " + queueName); + _driver.resume(queueName); + + // Ensure the jobs left are successful completed in the correct order + currentJobNames.remove(deletedJob1); + currentJobNames.remove(deletedJob2); + long preJobFinish = 0; + for (int i = 0; i < currentJobNames.size(); i++) { + String namedSpaceJobName = String.format("%s_%s", scheduledQueue, currentJobNames.get(i)); + TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJobName, TaskState.COMPLETED); + + JobContext jobContext = TaskUtil.getJobContext(_manager, namedSpaceJobName); + long jobStart = jobContext.getStartTime(); + Assert.assertTrue(jobStart >= preJobFinish); + preJobFinish = jobContext.getFinishTime(); + } + // verify the job is not there for the next recurrence of queue schedule + } + + @Test + public void deleteJobFromRecurrentQueueNotStarted() throws Exception { + String queueName = TestHelper.getTestMethodName(); + + // Create a queue + LOG.info("Starting job-queue: " + queueName); + JobQueue queue = buildRecurrentJobQueue(queueName); + _driver.createQueue(queue); + + // create jobs + List jobs = new ArrayList(); + List jobNames = new ArrayList(); + Map commandConfig = ImmutableMap.of(TIMEOUT_CONFIG, String.valueOf(500)); + + final int JOB_COUNTS = 3; + + for (int i = 0; i < JOB_COUNTS; i++) { + String targetPartition = (i == 0) ? "MASTER" : "SLAVE"; + + JobConfig.Builder job = + new JobConfig.Builder().setCommand("Reindex").setJobCommandConfigMap(commandConfig) + .setTargetResource(WorkflowGenerator.DEFAULT_TGT_DB) + .setTargetPartitionStates(Sets.newHashSet(targetPartition)); + jobs.add(job); + jobNames.add(targetPartition.toLowerCase() + "Job" + i); + } + + // enqueue all jobs except last one + for (int i = 0; i < JOB_COUNTS - 1; ++i) { + LOG.info("Enqueuing job: " + jobNames.get(i)); + _driver.enqueueJob(queueName, jobNames.get(i), jobs.get(i)); + } + String currentLastJob = jobNames.get(JOB_COUNTS - 2); + + WorkflowContext wCtx = TestUtil.pollForWorkflowContext(_manager, queueName); + String scheduledQueue = wCtx.getLastScheduledSingleWorkflow(); + + // ensure all jobs are finished + String namedSpaceJob = String.format("%s_%s", scheduledQueue, currentLastJob); + TestUtil.pollForJobState(_manager, scheduledQueue, namedSpaceJob, TaskState.COMPLETED); + + // enqueue the last job + LOG.info("Enqueuing job: " + jobNames.get(JOB_COUNTS - 1)); + _driver.enqueueJob(queueName, jobNames.get(JOB_COUNTS - 1), jobs.get(JOB_COUNTS - 1)); + + // remove the last job + _driver.deleteJob(queueName, jobNames.get(JOB_COUNTS - 1)); + + // verify + verifyJobDeleted(queueName, + String.format("%s_%s", scheduledQueue, jobNames.get(JOB_COUNTS - 1))); + } + + private void verifyJobDeleted(String queueName, String jobName) throws Exception { + HelixDataAccessor accessor = _manager.getHelixDataAccessor(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + Assert.assertNull(accessor.getProperty(keyBuilder.idealStates(jobName))); + Assert.assertNull(accessor.getProperty(keyBuilder.resourceConfig(jobName))); + TestUtil.pollForEmptyJobState(_manager, queueName, jobName); + } + + public static class ReindexTask implements Task { + private final long _delay; + private volatile boolean _canceled; + + public ReindexTask(TaskCallbackContext context) { + JobConfig jobCfg = context.getJobConfig(); + Map cfg = jobCfg.getJobCommandConfigMap(); + if (cfg == null) { + cfg = Collections.emptyMap(); + } + _delay = cfg.containsKey(TIMEOUT_CONFIG) ? Long.parseLong(cfg.get(TIMEOUT_CONFIG)) : 200L; + } + + @Override + public TaskResult run() { + long expiry = System.currentTimeMillis() + _delay; + long timeLeft; + while (System.currentTimeMillis() < expiry) { + if (_canceled) { + timeLeft = expiry - System.currentTimeMillis(); + return new TaskResult(TaskResult.Status.CANCELED, String.valueOf(timeLeft < 0 ? 0 + : timeLeft)); + } + sleep(10L); + } + timeLeft = expiry - System.currentTimeMillis(); + return new TaskResult(TaskResult.Status.COMPLETED, + String.valueOf(timeLeft < 0 ? 0 : timeLeft)); + } + + @Override + public void cancel() { + _canceled = true; + } + + private static void sleep(long d) { + try { + Thread.sleep(d); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + } +} +