This is an automated email from the ASF dual-hosted git repository.
jbonofre pushed a commit to branch activemq-5.15.x
in repository https://gitbox.apache.org/repos/asf/activemq.git
The following commit(s) were added to refs/heads/activemq-5.15.x by this push:
new d57ba38 Quick fix on InMemoryJobSchedulerTest
d57ba38 is described below
commit d57ba38d20cf21ae9cda049c72fa3cf034fdf412
Author: jbonofre <jbonofre@apache.org>
AuthorDate: Wed Dec 2 10:06:11 2020 +0100
Quick fix on InMemoryJobSchedulerTest
---
.../scheduler/memory/InMemoryJobSchedulerTest.java | 269 ++++++++++++++++++++-
1 file changed, 261 insertions(+), 8 deletions(-)
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java
index 36771b0..9b86693 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java
@@ -16,21 +16,274 @@
*/
package org.apache.activemq.broker.scheduler.memory;
-import org.apache.activemq.broker.scheduler.JobSchedulerStore;
-import org.apache.activemq.broker.scheduler.JobSchedulerTest;
+import org.apache.activemq.broker.scheduler.*;
+import org.apache.activemq.util.ByteSequence;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Calendar;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
/**
* In-Memory store based variation of the JobSchedulerTest
*/
-public class InMemoryJobSchedulerTest extends JobSchedulerTest {
+public class InMemoryJobSchedulerTest {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerTest.class);
+
+ private JobSchedulerStore store;
+ private JobScheduler scheduler;
+
+ @Test(timeout = 60000)
+ public void testAddLongStringByteSequence() throws Exception {
+ final int COUNT = 10;
+ final CountDownLatch latch = new CountDownLatch(COUNT);
+ scheduler.addListener(new JobListener() {
+ @Override
+ public void scheduledJob(String id, ByteSequence job) {
+ latch.countDown();
+ }
+
+ });
+ for (int i = 0; i < COUNT; i++) {
+ String test = new String("test" + i);
+ scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), 1000);
+ }
+ latch.await(5, TimeUnit.SECONDS);
+ assertEquals(0, latch.getCount());
+ }
- @Override
+ @Test(timeout = 60000)
+ public void testAddCronAndByteSequence() throws Exception {
+
+ final CountDownLatch latch = new CountDownLatch(1);
+ scheduler.addListener(new JobListener() {
+ @Override
+ public void scheduledJob(String id, ByteSequence job) {
+ latch.countDown();
+ }
+ });
+
+ Calendar current = Calendar.getInstance();
+ current.add(Calendar.MINUTE, 1);
+ int minutes = current.get(Calendar.MINUTE);
+ int hour = current.get(Calendar.HOUR_OF_DAY);
+ int day = current.get(Calendar.DAY_OF_WEEK) - 1;
+
+ String cronTab = String.format("%d %d * * %d", minutes, hour, day);
+
+ String str = new String("test1");
+ scheduler.schedule("id:1", new ByteSequence(str.getBytes()), cronTab, 0, 0, 0);
+
+ // need a little slack so go over 60 seconds
+ assertTrue(latch.await(70, TimeUnit.SECONDS));
+ assertEquals(0, latch.getCount());
+ }
+
+ @Test(timeout = 60000)
+ public void testAddLongLongIntStringByteSequence() throws Exception {
+ final int COUNT = 10;
+ final CountDownLatch latch = new CountDownLatch(COUNT);
+ scheduler.addListener(new JobListener() {
+ @Override
+ public void scheduledJob(String id, ByteSequence job) {
+ latch.countDown();
+ }
+ });
+ long time = 2000;
+ for (int i = 0; i < COUNT; i++) {
+ String test = new String("test" + i);
+ scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), "", time, 10,
-1);
+ }
+ assertTrue(latch.getCount() == COUNT);
+ latch.await(3000, TimeUnit.SECONDS);
+ assertTrue(latch.getCount() == 0);
+ }
+
+ @Test(timeout = 60000)
+ @Ignore
public void testAddStopThenDeliver() throws Exception {
- // In Memory store that's stopped doesn't retain the jobs.
+ final int COUNT = 10;
+ final CountDownLatch latch = new CountDownLatch(COUNT);
+ long time = 2000;
+ for (int i = 0; i < COUNT; i++) {
+ String test = new String("test" + i);
+ scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), "", time, 1000,
-1);
+ }
+ File directory = store.getDirectory();
+ tearDown();
+ setUp();
+ scheduler.addListener(new JobListener() {
+ @Override
+ public void scheduledJob(String id, ByteSequence job) {
+ latch.countDown();
+ }
+ });
+ assertTrue(latch.getCount() == COUNT);
+ latch.await(3000, TimeUnit.SECONDS);
+ assertTrue(latch.getCount() == 0);
+ }
+
+ @Test(timeout = 60000)
+ public void testRemoveLong() throws Exception {
+ final int COUNT = 10;
+
+ long time = 60000;
+ for (int i = 0; i < COUNT; i++) {
+ String str = new String("test" + i);
+ scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), "", time, 1000,
-1);
+ }
+
+ int size = scheduler.getAllJobs().size();
+ assertEquals(size, COUNT);
+
+ long removeTime = scheduler.getNextScheduleTime();
+ scheduler.remove(removeTime);
+
+ // If all jobs are not started within the same second we need to call remove again
+ if (size != 0) {
+ removeTime = scheduler.getNextScheduleTime();
+ scheduler.remove(removeTime);
+ }
+
+ size = scheduler.getAllJobs().size();
+ assertEquals(0, size);
+ }
+
+ @Test(timeout = 60000)
+ public void testRemoveString() throws Exception {
+ final int COUNT = 10;
+ final String test = "TESTREMOVE";
+ long time = 20000;
+
+ for (int i = 0; i < COUNT; i++) {
+ String str = new String("test" + i);
+ scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), "", time, 1000,
-1);
+ if (i == COUNT / 2) {
+ scheduler.schedule(test, new ByteSequence(test.getBytes()), "", time, 1000,
-1);
+ }
+ }
+
+ int size = scheduler.getAllJobs().size();
+ assertEquals(size, COUNT + 1);
+ scheduler.remove(test);
+ size = scheduler.getAllJobs().size();
+ assertEquals(size, COUNT);
+ }
+
+ @Test(timeout = 60000)
+ public void testGetExecutionCount() throws Exception {
+ final String jobId = "Job-1";
+ long time = 10000;
+ final CountDownLatch done = new CountDownLatch(10);
+
+ String str = new String("test");
+ scheduler.schedule(jobId, new ByteSequence(str.getBytes()), "", time, 1000, 10);
+
+ int size = scheduler.getAllJobs().size();
+ assertEquals(size, 1);
+
+ scheduler.addListener(new JobListener() {
+ @Override
+ public void scheduledJob(String id, ByteSequence job) {
+ LOG.info("Job exectued: {}", 11 - done.getCount());
+ done.countDown();
+ }
+ });
+
+ List<Job> jobs = scheduler.getNextScheduleJobs();
+ assertEquals(1, jobs.size());
+ Job job = jobs.get(0);
+ assertEquals(jobId, job.getJobId());
+ assertEquals(0, job.getExecutionCount());
+ assertTrue("Should have fired ten times.", done.await(60, TimeUnit.SECONDS));
+ // The job is not updated on the last firing as it is removed from the store following
+ // it's last execution so the count will always be one less than the max firings.
+ assertTrue(job.getExecutionCount() >= 9);
+ }
+
+ @Test(timeout = 60000)
+ public void testgetAllJobs() throws Exception {
+ final int COUNT = 10;
+ final String ID = "id:";
+ long time = 20000;
+
+ for (int i = 0; i < COUNT; i++) {
+ String str = new String("test" + i);
+ scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), "", time, 10 + i,
-1);
+ }
+
+ List<Job> list = scheduler.getAllJobs();
+
+ assertEquals(list.size(), COUNT);
+ int count = 0;
+ for (Job job : list) {
+ assertEquals(job.getJobId(), ID + count);
+ count++;
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testgetAllJobsInRange() throws Exception {
+ final int COUNT = 10;
+ final String ID = "id:";
+ long start = 10000;
+
+ for (int i = 0; i < COUNT; i++) {
+ String str = new String("test" + i);
+ scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), "", start + (i *
1000), 10000 + i, 0);
+ }
+
+ start = System.currentTimeMillis();
+ long finish = start + 12000 + (COUNT * 1000);
+ List<Job> list = scheduler.getAllJobs(start, finish);
+
+ assertEquals(COUNT, list.size());
+ int count = 0;
+ for (Job job : list) {
+ assertEquals(job.getJobId(), ID + count);
+ count++;
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testRemoveAllJobsInRange() throws Exception {
+ final int COUNT = 10;
+ final String ID = "id:";
+ long start = 10000;
+
+ for (int i = 0; i < COUNT; i++) {
+ String str = new String("test" + i);
+ scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), "", start + (i *
1000), 10000 + i, 0);
+ }
+ start = System.currentTimeMillis();
+ long finish = start + 12000 + (COUNT * 1000);
+ scheduler.removeAllJobs(start, finish);
+
+ assertTrue(scheduler.getAllJobs().isEmpty());
+ }
+
+ @Before
+ public void setUp() throws Exception {
+ store = new InMemoryJobSchedulerStore();
+ store.start();
+ scheduler = store.getJobScheduler("test");
+ scheduler.startDispatching();
}
- @Override
- protected JobSchedulerStore createJobSchedulerStore() throws Exception {
- return new InMemoryJobSchedulerStore();
+ @After
+ public void tearDown() throws Exception {
+ scheduler.stopDispatching();
+ store.stop();
}
}
|