activemq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jbono...@apache.org
Subject [activemq] branch activemq-5.16.x updated: Quick fix on InMemoryJobSchedulerTest
Date Wed, 02 Dec 2020 09:07:16 GMT
This is an automated email from the ASF dual-hosted git repository.

jbonofre pushed a commit to branch activemq-5.16.x
in repository https://gitbox.apache.org/repos/asf/activemq.git


The following commit(s) were added to refs/heads/activemq-5.16.x by this push:
     new c5622d6  Quick fix on InMemoryJobSchedulerTest
c5622d6 is described below

commit c5622d674d91a779159b8fb2109751fc76110dd6
Author: jbonofre <jbonofre@apache.org>
AuthorDate: Wed Dec 2 10:06:11 2020 +0100

    Quick fix on InMemoryJobSchedulerTest
---
 .../scheduler/memory/InMemoryJobSchedulerTest.java | 269 ++++++++++++++++++++-
 1 file changed, 261 insertions(+), 8 deletions(-)

diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java
b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java
index 36771b0..9b86693 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/broker/scheduler/memory/InMemoryJobSchedulerTest.java
@@ -16,21 +16,274 @@
  */
 package org.apache.activemq.broker.scheduler.memory;
 
-import org.apache.activemq.broker.scheduler.JobSchedulerStore;
-import org.apache.activemq.broker.scheduler.JobSchedulerTest;
+import org.apache.activemq.broker.scheduler.*;
+import org.apache.activemq.util.ByteSequence;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.util.Calendar;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 
 /**
  * In-Memory store based variation of the JobSchedulerTest
  */
-public class InMemoryJobSchedulerTest extends JobSchedulerTest {
+public class InMemoryJobSchedulerTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(JobSchedulerTest.class);
+
+    private JobSchedulerStore store;
+    private JobScheduler scheduler;
+
+    @Test(timeout = 60000)
+    public void testAddLongStringByteSequence() throws Exception {
+        final int COUNT = 10;
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        scheduler.addListener(new JobListener() {
+            @Override
+            public void scheduledJob(String id, ByteSequence job) {
+                latch.countDown();
+            }
+
+        });
+        for (int i = 0; i < COUNT; i++) {
+            String test = new String("test" + i);
+            scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), 1000);
+        }
+        latch.await(5, TimeUnit.SECONDS);
+        assertEquals(0, latch.getCount());
+    }
 
-    @Override
+    @Test(timeout = 60000)
+    public void testAddCronAndByteSequence() throws Exception {
+
+        final CountDownLatch latch = new CountDownLatch(1);
+        scheduler.addListener(new JobListener() {
+            @Override
+            public void scheduledJob(String id, ByteSequence job) {
+                latch.countDown();
+            }
+        });
+
+        Calendar current = Calendar.getInstance();
+        current.add(Calendar.MINUTE, 1);
+        int minutes = current.get(Calendar.MINUTE);
+        int hour = current.get(Calendar.HOUR_OF_DAY);
+        int day = current.get(Calendar.DAY_OF_WEEK) - 1;
+
+        String cronTab = String.format("%d %d * * %d", minutes, hour, day);
+
+        String str = new String("test1");
+        scheduler.schedule("id:1", new ByteSequence(str.getBytes()), cronTab, 0, 0, 0);
+
+        // need a little slack so go over 60 seconds
+        assertTrue(latch.await(70, TimeUnit.SECONDS));
+        assertEquals(0, latch.getCount());
+    }
+
+    @Test(timeout = 60000)
+    public void testAddLongLongIntStringByteSequence() throws Exception {
+        final int COUNT = 10;
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        scheduler.addListener(new JobListener() {
+            @Override
+            public void scheduledJob(String id, ByteSequence job) {
+                latch.countDown();
+            }
+        });
+        long time = 2000;
+        for (int i = 0; i < COUNT; i++) {
+            String test = new String("test" + i);
+            scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), "", time, 10,
-1);
+        }
+        assertTrue(latch.getCount() == COUNT);
+        latch.await(3000, TimeUnit.SECONDS);
+        assertTrue(latch.getCount() == 0);
+    }
+
+    @Test(timeout = 60000)
+    @Ignore
     public void testAddStopThenDeliver() throws Exception {
-        // In Memory store that's stopped doesn't retain the jobs.
+        final int COUNT = 10;
+        final CountDownLatch latch = new CountDownLatch(COUNT);
+        long time = 2000;
+        for (int i = 0; i < COUNT; i++) {
+            String test = new String("test" + i);
+            scheduler.schedule("id" + i, new ByteSequence(test.getBytes()), "", time, 1000,
-1);
+        }
+        File directory = store.getDirectory();
+        tearDown();
+        setUp();
+        scheduler.addListener(new JobListener() {
+            @Override
+            public void scheduledJob(String id, ByteSequence job) {
+                latch.countDown();
+            }
+        });
+        assertTrue(latch.getCount() == COUNT);
+        latch.await(3000, TimeUnit.SECONDS);
+        assertTrue(latch.getCount() == 0);
+    }
+
+    @Test(timeout = 60000)
+    public void testRemoveLong() throws Exception {
+        final int COUNT = 10;
+
+        long time = 60000;
+        for (int i = 0; i < COUNT; i++) {
+            String str = new String("test" + i);
+            scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), "", time, 1000,
-1);
+        }
+
+        int size = scheduler.getAllJobs().size();
+        assertEquals(size, COUNT);
+
+        long removeTime = scheduler.getNextScheduleTime();
+        scheduler.remove(removeTime);
+
+        // If all jobs are not started within the same second we need to call remove again
+        if (size != 0) {
+            removeTime = scheduler.getNextScheduleTime();
+            scheduler.remove(removeTime);
+        }
+
+        size = scheduler.getAllJobs().size();
+        assertEquals(0, size);
+    }
+
+    @Test(timeout = 60000)
+    public void testRemoveString() throws Exception {
+        final int COUNT = 10;
+        final String test = "TESTREMOVE";
+        long time = 20000;
+
+        for (int i = 0; i < COUNT; i++) {
+            String str = new String("test" + i);
+            scheduler.schedule("id" + i, new ByteSequence(str.getBytes()), "", time, 1000,
-1);
+            if (i == COUNT / 2) {
+                scheduler.schedule(test, new ByteSequence(test.getBytes()), "", time, 1000,
-1);
+            }
+        }
+
+        int size = scheduler.getAllJobs().size();
+        assertEquals(size, COUNT + 1);
+        scheduler.remove(test);
+        size = scheduler.getAllJobs().size();
+        assertEquals(size, COUNT);
+    }
+
+    @Test(timeout = 60000)
+    public void testGetExecutionCount() throws Exception {
+        final String jobId = "Job-1";
+        long time = 10000;
+        final CountDownLatch done = new CountDownLatch(10);
+
+        String str = new String("test");
+        scheduler.schedule(jobId, new ByteSequence(str.getBytes()), "", time, 1000, 10);
+
+        int size = scheduler.getAllJobs().size();
+        assertEquals(size, 1);
+
+        scheduler.addListener(new JobListener() {
+            @Override
+            public void scheduledJob(String id, ByteSequence job) {
+                LOG.info("Job exectued: {}", 11 - done.getCount());
+                done.countDown();
+            }
+        });
+
+        List<Job> jobs = scheduler.getNextScheduleJobs();
+        assertEquals(1, jobs.size());
+        Job job = jobs.get(0);
+        assertEquals(jobId, job.getJobId());
+        assertEquals(0, job.getExecutionCount());
+        assertTrue("Should have fired ten times.", done.await(60, TimeUnit.SECONDS));
+        // The job is not updated on the last firing as it is removed from the store following
+        // it's last execution so the count will always be one less than the max firings.
+        assertTrue(job.getExecutionCount() >= 9);
+    }
+
+    @Test(timeout = 60000)
+    public void testgetAllJobs() throws Exception {
+        final int COUNT = 10;
+        final String ID = "id:";
+        long time = 20000;
+
+        for (int i = 0; i < COUNT; i++) {
+            String str = new String("test" + i);
+            scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), "", time, 10 + i,
-1);
+        }
+
+        List<Job> list = scheduler.getAllJobs();
+
+        assertEquals(list.size(), COUNT);
+        int count = 0;
+        for (Job job : list) {
+            assertEquals(job.getJobId(), ID + count);
+            count++;
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testgetAllJobsInRange() throws Exception {
+        final int COUNT = 10;
+        final String ID = "id:";
+        long start = 10000;
+
+        for (int i = 0; i < COUNT; i++) {
+            String str = new String("test" + i);
+            scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), "", start + (i *
1000), 10000 + i, 0);
+        }
+
+        start = System.currentTimeMillis();
+        long finish = start + 12000 + (COUNT * 1000);
+        List<Job> list = scheduler.getAllJobs(start, finish);
+
+        assertEquals(COUNT, list.size());
+        int count = 0;
+        for (Job job : list) {
+            assertEquals(job.getJobId(), ID + count);
+            count++;
+        }
+    }
+
+    @Test(timeout = 60000)
+    public void testRemoveAllJobsInRange() throws Exception {
+        final int COUNT = 10;
+        final String ID = "id:";
+        long start = 10000;
+
+        for (int i = 0; i < COUNT; i++) {
+            String str = new String("test" + i);
+            scheduler.schedule(ID + i, new ByteSequence(str.getBytes()), "", start + (i *
1000), 10000 + i, 0);
+        }
+        start = System.currentTimeMillis();
+        long finish = start + 12000 + (COUNT * 1000);
+        scheduler.removeAllJobs(start, finish);
+
+        assertTrue(scheduler.getAllJobs().isEmpty());
+    }
+
+    @Before
+    public void setUp() throws Exception {
+        store = new InMemoryJobSchedulerStore();
+        store.start();
+        scheduler = store.getJobScheduler("test");
+        scheduler.startDispatching();
     }
 
-    @Override
-    protected JobSchedulerStore createJobSchedulerStore() throws Exception {
-        return new InMemoryJobSchedulerStore();
+    @After
+    public void tearDown() throws Exception {
+        scheduler.stopDispatching();
+        store.stop();
     }
 }


Mime
View raw message