usergrid-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From toddn...@apache.org
Subject [1/3] git commit: Upgrade JobFactory to only return 1 job, not many for a descriptor since we're not using it in this way.
Date Tue, 29 Jul 2014 21:47:36 GMT
Repository: incubator-usergrid
Updated Branches:
  refs/heads/add-metrics acdfcd76d -> 530d0e987


Upgrade JobFactory to only return 1 job, not many for a descriptor since we're not using it
in this way.

Updated the scheduler semaphore logic and reporting to be clearer.

Added a test to ensure scheduler pool times are correct.


Project: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/commit/00a5032d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/tree/00a5032d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-usergrid/diff/00a5032d

Branch: refs/heads/add-metrics
Commit: 00a5032d92121537331057f30a03c75bc36d7673
Parents: 6c8416d
Author: Todd Nine <toddnine@apache.org>
Authored: Tue Jul 29 15:31:15 2014 -0600
Committer: Todd Nine <toddnine@apache.org>
Committed: Tue Jul 29 15:31:15 2014 -0600

----------------------------------------------------------------------
 .../org/apache/usergrid/batch/JobFactory.java   |   2 +-
 .../usergrid/batch/UsergridJobFactory.java      |   6 +-
 .../batch/service/JobSchedulerService.java      | 289 +++++++++++++------
 .../main/resources/usergrid-core-context.xml    |   1 +
 .../apache/usergrid/batch/BulkTestUtils.java    |   4 +-
 .../usergrid/batch/UsergridJobFactoryTest.java  |   3 +-
 .../batch/job/SchedulerRuntimeIntervalIT.java   | 115 ++++++++
 7 files changed, 317 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00a5032d/stack/core/src/main/java/org/apache/usergrid/batch/JobFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/batch/JobFactory.java b/stack/core/src/main/java/org/apache/usergrid/batch/JobFactory.java
index 0b61ae6..dabd1f1 100644
--- a/stack/core/src/main/java/org/apache/usergrid/batch/JobFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/batch/JobFactory.java
@@ -31,5 +31,5 @@ import org.apache.usergrid.batch.repository.JobDescriptor;
 public interface JobFactory {
 
     /** Return one or more BulkJob ready for execution by a worker thread */
-    List<Job> jobsFrom( JobDescriptor descriptor ) throws JobNotFoundException;
+    Job jobsFrom( JobDescriptor descriptor ) throws JobNotFoundException;
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00a5032d/stack/core/src/main/java/org/apache/usergrid/batch/UsergridJobFactory.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/batch/UsergridJobFactory.java b/stack/core/src/main/java/org/apache/usergrid/batch/UsergridJobFactory.java
index 907f6f9..ccbc761 100644
--- a/stack/core/src/main/java/org/apache/usergrid/batch/UsergridJobFactory.java
+++ b/stack/core/src/main/java/org/apache/usergrid/batch/UsergridJobFactory.java
@@ -37,17 +37,17 @@ public class UsergridJobFactory implements JobFactory {
 
 
     @Override
-    public List<Job> jobsFrom( JobDescriptor descriptor ) throws JobNotFoundException
{
+    public Job jobsFrom( JobDescriptor descriptor ) throws JobNotFoundException {
 
         Job job = context.getBean( descriptor.getJobName(), Job.class );
 
         if ( job == null ) {
             String error =
-                    String.format( "Could not find job impelmentation for job name %s", descriptor.getJobName()
);
+                    String.format( "Could not find job implementation for job name %s", descriptor.getJobName()
);
             logger.error( error );
             throw new JobNotFoundException( error );
         }
 
-        return Collections.singletonList( job );
+        return job;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00a5032d/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
----------------------------------------------------------------------
diff --git a/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
b/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
index b29f84d..cd4d4d4 100644
--- a/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
+++ b/stack/core/src/main/java/org/apache/usergrid/batch/service/JobSchedulerService.java
@@ -21,18 +21,12 @@ import java.util.List;
 import java.util.concurrent.Callable;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Semaphore;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
 
-import com.google.common.util.concurrent.AbstractScheduledService;
-import com.google.common.util.concurrent.FutureCallback;
-import com.google.common.util.concurrent.Futures;
-import com.google.common.util.concurrent.ListenableFuture;
-import com.google.common.util.concurrent.ListeningScheduledExecutorService;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.yammer.metrics.annotation.ExceptionMetered;
-import com.yammer.metrics.annotation.Timed;
-import java.util.HashMap;
-import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import org.apache.usergrid.batch.Job;
 import org.apache.usergrid.batch.JobExecution;
@@ -42,9 +36,18 @@ import org.apache.usergrid.batch.JobFactory;
 import org.apache.usergrid.batch.JobNotFoundException;
 import org.apache.usergrid.batch.repository.JobAccessor;
 import org.apache.usergrid.batch.repository.JobDescriptor;
+import org.apache.usergrid.metrics.MetricsFactory;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import com.codahale.metrics.Counter;
+import com.codahale.metrics.Timer;
+import com.google.common.util.concurrent.AbstractScheduledService;
+import com.google.common.util.concurrent.FutureCallback;
+import com.google.common.util.concurrent.Futures;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.ListeningScheduledExecutorService;
+import com.google.common.util.concurrent.MoreExecutors;
+import com.yammer.metrics.annotation.ExceptionMetered;
+import com.yammer.metrics.annotation.Timed;
 
 
 /**
@@ -55,9 +58,6 @@ public class JobSchedulerService extends AbstractScheduledService {
 
     private static final Logger LOG = LoggerFactory.getLogger( JobSchedulerService.class
);
 
-    // keep track of exceptions thrown in scheduler so we can reduce noise in logs
-    private Map<String, Integer> schedulerRunFailures = new HashMap<String, Integer>();
-
     private long interval = DEFAULT_DELAY;
     private int workerSize = 1;
     private int maxFailCount = 10;
@@ -70,16 +70,24 @@ public class JobSchedulerService extends AbstractScheduledService {
     private ListeningScheduledExecutorService service;
     private JobListener jobListener;
 
+    private Timer jobTimer;
+    private Counter runCounter;
+    private Counter successCounter;
+    private Counter failCounter;
+
+    //TODO Add meters for throughput of start and stop
+
+
     public JobSchedulerService() { }
 
 
-    @Timed(name = "BulkJobScheduledService_runOneIteration", group = "scheduler", durationUnit
= TimeUnit.MILLISECONDS,
-            rateUnit = TimeUnit.MINUTES)
+    @Timed( name = "BulkJobScheduledService_runOneIteration", group = "scheduler", durationUnit
= TimeUnit.MILLISECONDS,
+            rateUnit = TimeUnit.MINUTES )
     @Override
     protected void runOneIteration() throws Exception {
 
         try {
-            LOG.info( "running iteration..." );
+            LOG.info( "Running one check iteration ..." );
             List<JobDescriptor> activeJobs;
 
             // run until there are no more active jobs
@@ -116,22 +124,7 @@ public class JobSchedulerService extends AbstractScheduledService {
             }
         }
         catch ( Throwable t ) {
-
-            // errors here happen a lot on shutdown, don't fill the logs with them
-            String error = t.getClass().getCanonicalName();
-            if (schedulerRunFailures.get( error ) == null) {
-                LOG.error( "Scheduler run failed, first instance of this exception", t );
-                schedulerRunFailures.put( error, 1);
-
-            } else {
-                int count = schedulerRunFailures.get(error) + 1; 
-                schedulerRunFailures.put(error, count);
-                if (LOG.isDebugEnabled()) {
-                    LOG.debug( error + " caused scheduler run failure, count =  " + count,
t );
-                } else {
-                    LOG.error( error + " caused scheduler run failure, count =  " + count
);
-                }
-            }
+            LOG.error( "Scheduler run failed, error is", t );
         }
     }
 
@@ -147,124 +140,197 @@ public class JobSchedulerService extends AbstractScheduledService
{
     }
 
 
-    /** Use the provided BulkJobFactory to build and submit BulkJob items as ListenableFuture
objects */
-    @ExceptionMetered(name = "BulkJobScheduledService_submitWork_exceptions", group = "scheduler")
+    /**
+     * Use the provided BulkJobFactory to build and submit BulkJob items as ListenableFuture
objects
+     */
+    @ExceptionMetered( name = "BulkJobScheduledService_submitWork_exceptions", group = "scheduler"
)
     private void submitWork( final JobDescriptor jobDescriptor ) {
-        List<Job> jobs;
+        final Job job;
 
         try {
-            jobs = jobFactory.jobsFrom( jobDescriptor );
+            job = jobFactory.jobsFrom( jobDescriptor );
         }
         catch ( JobNotFoundException e ) {
             LOG.error( "Could not create jobs", e );
             return;
         }
 
-        for ( final Job job : jobs ) {
 
-            // job execution needs to be external to both the callback and the task.
-            // This way regardless of any error we can
-            // mark a job as failed if required
-            final JobExecution execution = new JobExecutionImpl( jobDescriptor );
+        // job execution needs to be external to both the callback and the task.
+        // This way regardless of any error we can
+        // mark a job as failed if required
+        final JobExecution execution = new JobExecutionImpl( jobDescriptor );
+
+        // We don't care if this is atomic (not worth using a lock object)
+        // we just need to prevent NPEs from ever occurring
+        final JobListener currentListener = this.jobListener;
+
+        /**
+         * Acquire the semaphore before we schedule.  This way we won't take things from
the Q that end up
+         * stuck in the queue for the scheduler and then time out their distributed heartbeat
+         */
+        try {
+            capacitySemaphore.acquire();
+        }
+        catch ( InterruptedException e ) {
+            LOG.error( "Unable to acquire semaphore capacity before submitting job", e );
+            //just return, they'll get picked up again later
+            return;
+        }
 
-            // We don't care if this is atomic (not worth using a lock object)
-            // we just need to prevent NPEs from ever occurring
-            final JobListener currentListener = this.jobListener;
 
-            ListenableFuture<Void> future = service.submit( new Callable<Void>()
{
-                @Override
-                public Void call() throws Exception {
-                    capacitySemaphore.acquire();
+        final Timer.Context timer = jobTimer.time();
 
-                    execution.start( maxFailCount );
 
-                    jobAccessor.save( execution );
+        ListenableFuture<Void> future = service.submit( new Callable<Void>()
{
+            @Override
+            public Void call() throws Exception {
 
-                    //this job is dead, treat it as such
-                    if ( execution.getStatus() == Status.DEAD ) {
-                        return null;
-                    }
+                LOG.debug( "Starting the job with job id {}", execution.getJobId() );
+                runCounter.inc();
 
-                    // TODO wrap and throw specifically typed exception for onFailure,
-                    // needs jobId
-                    job.execute( execution );
+                execution.start( maxFailCount );
 
-                    if ( currentListener != null ) {
-                        currentListener.onSubmit( execution );
-                    }
+                jobAccessor.save( execution );
 
+                //this job is dead, treat it as such
+                if ( execution.getStatus() == Status.DEAD ) {
                     return null;
                 }
-            } );
 
-            Futures.addCallback( future, new FutureCallback<Void>() {
-                @Override
-                public void onSuccess( Void param ) {
+                // TODO wrap and throw specifically typed exception for onFailure,
+                // needs jobId
+                job.execute( execution );
 
-                    if ( execution.getStatus() == Status.IN_PROGRESS ) {
-                        LOG.info( "Successful completion of bulkJob {}", execution );
-                        execution.completed();
-                    }
+                if ( currentListener != null ) {
+                    currentListener.onSubmit( execution );
+                }
+
+                return null;
+            }
+        } );
+
+        Futures.addCallback( future, new FutureCallback<Void>() {
+            @Override
+            public void onSuccess( Void param ) {
+
+                /**
+                 * Release semaphore first in case there are other problems with communicating
with Cassandra
+                 */
+
+                LOG.debug( "Job succeeded with the job id {}", execution.getJobId() );
+                capacitySemaphore.release();
+                timer.stop();
+                runCounter.dec();
+                successCounter.inc();
 
-                    jobAccessor.save( execution );
-                    capacitySemaphore.release();
 
-                    if ( currentListener != null ) {
-                        currentListener.onSuccess( execution );
-                    }
+                //TODO, refactor into the execution itself for checking if done
+                if ( execution.getStatus() == Status.IN_PROGRESS ) {
+                    LOG.info( "Successful completion of bulkJob {}", execution );
+                    execution.completed();
                 }
 
+                jobAccessor.save( execution );
+
+
+                if ( currentListener != null ) {
+                    currentListener.onSuccess( execution );
+                }
+            }
+
 
-                @Override
-                public void onFailure( Throwable throwable ) {
-                    LOG.error( "Failed execution for bulkJob", throwable );
-                    // mark it as failed
-                    if ( execution.getStatus() == Status.IN_PROGRESS ) {
-                        execution.failed();
-                    }
+            @Override
+            public void onFailure( Throwable throwable ) {
+
+                /**
+                 * Release semaphore first in case there are other problems with communicating
with Cassandra
+                 */
+                LOG.error( "Job failed with the job id {}", execution.getJobId() );
+                capacitySemaphore.release();
+                timer.stop();
+                runCounter.dec();
+                failCounter.inc();
 
-                    jobAccessor.save( execution );
-                    capacitySemaphore.release();
 
-                    if ( currentListener != null ) {
-                        currentListener.onFailure( execution );
-                    }
+                LOG.error( "Failed execution for bulkJob", throwable );
+                // mark it as failed
+                if ( execution.getStatus() == Status.IN_PROGRESS ) {
+                    execution.failed();
                 }
-            } );
-        }
+
+                jobAccessor.save( execution );
+
+
+                if ( currentListener != null ) {
+                    currentListener.onFailure( execution );
+                }
+            }
+        } );
     }
 
 
-    /** @param milliseconds the milliseconds to set to wait if we didn't receive a job to
run */
+    /**
+     * @param milliseconds the milliseconds to set to wait if we didn't receive a job to
run
+     */
     public void setInterval( long milliseconds ) {
         this.interval = milliseconds;
     }
 
 
-    /** @param listeners the listeners to set */
+    public long getInterval() {
+        return interval;
+    }
+
+
+    /**
+     * @param listeners the listeners to set
+     */
     public void setWorkerSize( int listeners ) {
         this.workerSize = listeners;
     }
 
 
-    /** @param jobAccessor the jobAccessor to set */
+    public int getWorkerSize() {
+        return workerSize;
+    }
+
+
+    /**
+     * @param jobAccessor the jobAccessor to set
+     */
     public void setJobAccessor( JobAccessor jobAccessor ) {
         this.jobAccessor = jobAccessor;
     }
 
 
-    /** @param jobFactory the jobFactory to set */
+    /**
+     * @param jobFactory the jobFactory to set
+     */
     public void setJobFactory( JobFactory jobFactory ) {
         this.jobFactory = jobFactory;
     }
 
 
-    /** @param maxFailCount the maxFailCount to set */
+    /**
+     * @param maxFailCount the maxFailCount to set
+     */
     public void setMaxFailCount( int maxFailCount ) {
         this.maxFailCount = maxFailCount;
     }
 
 
+    /**
+     * Set the metrics factory
+     */
+    public void setMetricsFactory( MetricsFactory metricsFactory ) {
+        jobTimer = metricsFactory.getTimer( JobSchedulerService.class, "job_execution_timer"
);
+        runCounter = metricsFactory.getCounter( JobSchedulerService.class, "running_workers"
);
+        successCounter = metricsFactory.getCounter( JobSchedulerService.class, "successful_jobs"
);
+        failCounter = metricsFactory.getCounter( JobSchedulerService.class, "failed_jobs"
);
+    }
+
+
     /*
      * (non-Javadoc)
      *
@@ -272,9 +338,15 @@ public class JobSchedulerService extends AbstractScheduledService {
      */
     @Override
     protected void startUp() throws Exception {
-        service = MoreExecutors.listeningDecorator( Executors.newScheduledThreadPool( workerSize
) );
+        service = MoreExecutors
+                .listeningDecorator( Executors.newScheduledThreadPool( workerSize, JobThreadFactory.INSTANCE
) );
         capacitySemaphore = new Semaphore( workerSize );
+
+        LOG.info( "Starting executor pool.  Capacity is {}", workerSize );
+
         super.startUp();
+
+        LOG.info( "Job Scheduler started" );
     }
 
 
@@ -285,7 +357,11 @@ public class JobSchedulerService extends AbstractScheduledService {
      */
     @Override
     protected void shutDown() throws Exception {
+        LOG.info( "Shutting down job scheduler" );
+
         service.shutdown();
+
+        LOG.info( "Job scheduler shut down" );
         super.shutDown();
     }
 
@@ -294,6 +370,7 @@ public class JobSchedulerService extends AbstractScheduledService {
      * Sets the JobListener notified of Job events on this SchedulerService.
      *
      * @param jobListener the listener to receive Job events
+     *
      * @return the previous listener if set, or null if none was set
      */
     public JobListener setJobListener( JobListener jobListener ) {
@@ -311,4 +388,26 @@ public class JobSchedulerService extends AbstractScheduledService {
     public JobListener getJobListener() {
         return jobListener;
     }
+
+
+    /**
+     * Simple factory for labeling job worker threads for easier debugging
+     */
+    private static final class JobThreadFactory implements ThreadFactory {
+
+        public static final JobThreadFactory INSTANCE = new JobThreadFactory();
+
+        private static final String NAME = "JobWorker-";
+        private final AtomicLong counter = new AtomicLong();
+
+
+        @Override
+        public Thread newThread( final Runnable r ) {
+
+            Thread newThread = new Thread( r, NAME + counter.incrementAndGet() );
+            newThread.setDaemon( true );
+
+            return newThread;
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00a5032d/stack/core/src/main/resources/usergrid-core-context.xml
----------------------------------------------------------------------
diff --git a/stack/core/src/main/resources/usergrid-core-context.xml b/stack/core/src/main/resources/usergrid-core-context.xml
index 0c66cf2..515d642 100644
--- a/stack/core/src/main/resources/usergrid-core-context.xml
+++ b/stack/core/src/main/resources/usergrid-core-context.xml
@@ -181,6 +181,7 @@
     <bean id="jobSchedulerBackgroundService" class="org.apache.usergrid.batch.service.JobSchedulerService">
       <property name="jobFactory" ref="jobFactory" />
       <property name="jobAccessor" ref="schedulerService" />
+      <property name="metricsFactory" ref="metricsFactory"/>
       <property name="workerSize" value="${usergrid.scheduler.job.workers}" />
       <property name="interval" value="${usergrid.scheduler.job.interval}" />
       <property name="maxFailCount" value="${usergrid.scheduler.job.maxfail}" />

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00a5032d/stack/core/src/test/java/org/apache/usergrid/batch/BulkTestUtils.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/batch/BulkTestUtils.java b/stack/core/src/test/java/org/apache/usergrid/batch/BulkTestUtils.java
index 28cadb3..3b3347b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/batch/BulkTestUtils.java
+++ b/stack/core/src/test/java/org/apache/usergrid/batch/BulkTestUtils.java
@@ -38,8 +38,8 @@ public class BulkTestUtils {
          * @see org.apache.usergrid.batch.JobFactory#jobsFrom(org.apache.usergrid.batch.repository.JobDescriptor)
          */
         @Override
-        public List<Job> jobsFrom( JobDescriptor descriptor ) {
-            return Arrays.asList( new Job[] { new MyBulkJob() } );
+        public Job jobsFrom( JobDescriptor descriptor ) {
+            return  new MyBulkJob();
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00a5032d/stack/core/src/test/java/org/apache/usergrid/batch/UsergridJobFactoryTest.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/batch/UsergridJobFactoryTest.java
b/stack/core/src/test/java/org/apache/usergrid/batch/UsergridJobFactoryTest.java
index 26306e3..44d767b 100644
--- a/stack/core/src/test/java/org/apache/usergrid/batch/UsergridJobFactoryTest.java
+++ b/stack/core/src/test/java/org/apache/usergrid/batch/UsergridJobFactoryTest.java
@@ -40,8 +40,7 @@ public class UsergridJobFactoryTest {
         JobDescriptor jobDescriptor = new JobDescriptor( "", jobId, UUID.randomUUID(), null,
null, null );
 
 
-        List<Job> bulkJobs = BulkTestUtils.getBulkJobFactory().jobsFrom( jobDescriptor
);
+        Job bulkJobs = BulkTestUtils.getBulkJobFactory().jobsFrom( jobDescriptor );
         assertNotNull( bulkJobs );
-        assertEquals( 1, bulkJobs.size() );
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-usergrid/blob/00a5032d/stack/core/src/test/java/org/apache/usergrid/batch/job/SchedulerRuntimeIntervalIT.java
----------------------------------------------------------------------
diff --git a/stack/core/src/test/java/org/apache/usergrid/batch/job/SchedulerRuntimeIntervalIT.java
b/stack/core/src/test/java/org/apache/usergrid/batch/job/SchedulerRuntimeIntervalIT.java
new file mode 100644
index 0000000..351ca7b
--- /dev/null
+++ b/stack/core/src/test/java/org/apache/usergrid/batch/job/SchedulerRuntimeIntervalIT.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.usergrid.batch.job;
+
+
+import java.util.concurrent.TimeUnit;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.usergrid.batch.service.JobSchedulerService;
+import org.apache.usergrid.cassandra.Concurrent;
+import org.apache.usergrid.persistence.entities.JobData;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+
+/**
+ * Class to test that the job run loop executes in the time expected when there are no jobs
to run.  Tests
+ * saturation at each point of the runtime as well
+ */
+@Concurrent
+public class SchedulerRuntimeIntervalIT extends AbstractSchedulerRuntimeIT {
+	
+	private static final Logger logger = LoggerFactory.getLogger(SchedulerRuntimeIntervalIT.class.getName());
+
+    private static final long EXPECTED_RUNTIME = 60000;
+
+
+//    private static final long EXPECTED_RUNTIME = 3000000;
+
+
+    /**
+     * This is a combination of ( count+1 ) * interval*2.  If this test takes longer than
this to run, we have a bug in how
+     * often the run loop is executing
+     * @throws InterruptedException
+     */
+    @Test(timeout = EXPECTED_RUNTIME)
+    public void runLoopTest() throws InterruptedException {
+
+        /**
+         * the number of iterations we should run
+         *
+         */
+        final int pollCount = 5;
+        final int expectedInterval = 5000;
+
+
+
+
+        JobSchedulerService schedulerService = cassandraResource.getBean( JobSchedulerService.class
);
+
+        final long interval = schedulerService.getInterval();
+
+        final int numberOfWorkers = schedulerService.getWorkerSize();
+
+        final int expectedExecutions = numberOfWorkers * pollCount;
+
+
+        assertEquals("Interval must be set to "+ expectedInterval + " for test to work properly",
expectedInterval, interval);
+
+
+        CountdownLatchJob counterJob = cassandraResource.getBean( CountdownLatchJob.class
);
+            // set the counter job latch size
+        counterJob.setLatch( expectedExecutions );
+
+
+        getJobListener().setExpected(expectedExecutions );
+
+
+        long fireTime = System.currentTimeMillis();
+
+        /**
+         * We want to space the jobs out so there will most likely be an empty poll phase.
 For each run where we do
+         * get jobs, we want to saturate the worker pool to ensure the semaphore is released
properly
+         */
+        for ( int i = 0; i < pollCount; i++ ) {
+
+            for(int j = 0; j < numberOfWorkers; j ++){
+                scheduler.createJob( "countdownLatch", fireTime, new JobData() );
+            }
+
+
+            fireTime += expectedInterval*2;
+        }
+
+
+        boolean waited = counterJob.waitForCount(EXPECTED_RUNTIME, TimeUnit.MILLISECONDS);
+
+        assertTrue( "Ran" + getCount() + " number of jobs", waited);
+
+        while (!getJobListener().blockTilDone(EXPECTED_RUNTIME)) {
+        	logger.warn("Jobs not yet finished after waited {}, block again" , waitTime);
+        }
+
+        //If we get to here without timing out, the test ran correctly.  The assertion is
implicit in the timeout
+        
+    }
+}


Mime
View raw message