james-server-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From matth...@apache.org
Subject [james-project] 02/04: JAMES-2873 Add Hostname into Started event
Date Mon, 02 Sep 2019 08:15:59 GMT
This is an automated email from the ASF dual-hosted git repository.

matthieu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit cc8f433ad7c5c39a207b00621fada3d66998b4fd
Author: Gautier DI FOLCO <gdifolco@linagora.com>
AuthorDate: Fri Aug 23 16:21:03 2019 +0200

    JAMES-2873 Add Hostname into Started event
---
 ...{TaskManagerModule.java => HostnameModule.java} | 37 ++++++++++++++++------
 .../james/modules/server/TaskManagerModule.java    |  1 +
 ...assandraTaskExecutionDetailsProjectionDAO.scala |  3 ++
 ...andraTaskExecutionDetailsProjectionModule.scala |  2 ++
 .../eventsourcing/distributed/TaskEventDTO.scala   |  7 ++--
 .../distributed/TaskEventsSerializationTest.java   |  4 +--
 .../org/apache/james/task/MemoryTaskManager.java   |  8 +++--
 .../apache/james/task/TaskExecutionDetails.scala   | 26 ++++++++++-----
 .../james/task/eventsourcing/CommandHandlers.scala |  7 ++--
 .../eventsourcing/EventSourcingTaskManager.scala   |  4 +--
 .../apache/james/task/eventsourcing/Events.scala   |  2 +-
 .../james/task/eventsourcing/TaskAggregate.scala   |  4 +--
 .../TaskExecutionDetailsProjection.scala           |  4 +--
 .../EventSourcingTaskManagerTest.java              | 19 +++++++++++
 14 files changed, 92 insertions(+), 36 deletions(-)

diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/TaskManagerModule.java b/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/HostnameModule.java
similarity index 52%
copy from server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/TaskManagerModule.java
copy to server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/HostnameModule.java
index b3d1e4f..6607a74 100644
--- a/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/TaskManagerModule.java
+++ b/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/HostnameModule.java
@@ -1,4 +1,5 @@
-/****************************************************************
+/**
+ * *************************************************************
  * Licensed to the Apache Software Foundation (ASF) under one   *
  * or more contributor license agreements.  See the NOTICE file *
  * distributed with this work for additional information        *
@@ -6,29 +7,45 @@
  * to you under the Apache License, Version 2.0 (the            *
  * "License"); you may not use this file except in compliance   *
  * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0                   *
+ * *
  * Unless required by applicable law or agreed to in writing,   *
  * software distributed under the License is distributed on an  *
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
  * KIND, either express or implied.  See the License for the    *
  * specific language governing permissions and limitations      *
  * under the License.                                           *
- ****************************************************************/
+ ***************************************************************/
 
 package org.apache.james.modules.server;
 
-import org.apache.james.task.MemoryTaskManager;
-import org.apache.james.task.TaskManager;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+
+import org.apache.james.task.eventsourcing.Hostname;
 
 import com.google.inject.AbstractModule;
 import com.google.inject.Scopes;
 
-public class TaskManagerModule extends AbstractModule {
+public class HostnameModule extends AbstractModule {
+    private static class UnconfigurableHostnameException extends RuntimeException {
+        UnconfigurableHostnameException(String message, Exception originException) {
+            super(message, originException);
+        }
+    }
+
     @Override
     protected void configure() {
-        bind(MemoryTaskManager.class).in(Scopes.SINGLETON);
-        bind(TaskManager.class).to(MemoryTaskManager.class);
+        bind(Hostname.class).in(Scopes.SINGLETON);
+        bind(Hostname.class).toInstance(getHostname());
+    }
+
+    private Hostname getHostname() {
+        try {
+            return new Hostname(InetAddress.getLocalHost().getHostName());
+        } catch (UnknownHostException e) {
+            throw new UnconfigurableHostnameException("Hostname can not be retrieved, unable to initialize the distributed task manager", e);
+        }
     }
 }
diff --git a/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/TaskManagerModule.java b/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/TaskManagerModule.java
index b3d1e4f..484ad4c 100644
--- a/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/TaskManagerModule.java
+++ b/server/container/guice/guice-common/src/main/java/org/apache/james/modules/server/TaskManagerModule.java
@@ -28,6 +28,7 @@ import com.google.inject.Scopes;
 public class TaskManagerModule extends AbstractModule {
     @Override
     protected void configure() {
+        install(new HostnameModule());
         bind(MemoryTaskManager.class).in(Scopes.SINGLETON);
         bind(TaskManager.class).to(MemoryTaskManager.class);
     }
diff --git a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
index e6b7aef..e7af442 100644
--- a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
+++ b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionDAO.scala
@@ -44,6 +44,7 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: Session, typesProvider
     .value(SUBMITTED_DATE, bindMarker(SUBMITTED_DATE))
     .value(SUBMITTED_NODE, bindMarker(SUBMITTED_NODE))
     .value(STARTED_DATE, bindMarker(STARTED_DATE))
+    .value(RAN_NODE, bindMarker(RAN_NODE))
     .value(COMPLETED_DATE, bindMarker(COMPLETED_DATE))
     .value(CANCELED_DATE, bindMarker(CANCELED_DATE))
     .value(FAILED_DATE, bindMarker(FAILED_DATE)))
@@ -61,6 +62,7 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: Session, typesProvider
       .setUDTValue(SUBMITTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getSubmitDate))
       .setString(SUBMITTED_NODE, details.getSubmittedNode.asString)
       .setUDTValue(STARTED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate).orElse(null))
+      .setString(RAN_NODE, details.getRanNode.map[String](_.asString).orElse(null))
       .setUDTValue(COMPLETED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCompletedDate).orElse(null))
       .setUDTValue(CANCELED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getCanceledDate).orElse(null))
       .setUDTValue(FAILED_DATE, CassandraZonedDateTimeModule.toUDT(dateType, details.getStartedDate).orElse(null)))
@@ -80,6 +82,7 @@ class CassandraTaskExecutionDetailsProjectionDAO(session: Session, typesProvider
     submittedDate = CassandraZonedDateTimeModule.fromUDT(row.getUDTValue(SUBMITTED_DATE)),
     submittedNode = Hostname(row.getString(SUBMITTED_NODE)),
     startedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(STARTED_DATE)),
+    ranNode = Optional.ofNullable(row.getString(RAN_NODE)).map(Hostname(_)),
     completedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(COMPLETED_DATE)),
     canceledDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(CANCELED_DATE)),
     failedDate = CassandraZonedDateTimeModule.fromUDTOptional(row.getUDTValue(FAILED_DATE)),
diff --git a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
index 2f7300d..1ee0554 100644
--- a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
+++ b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/cassandra/CassandraTaskExecutionDetailsProjectionModule.scala
@@ -33,6 +33,7 @@ object CassandraTaskExecutionDetailsProjectionTable {
   val SUBMITTED_DATE: String = "submittedDate"
   val SUBMITTED_NODE: String = "submittedNode"
   val STARTED_DATE: String = "startedDate"
+  val RAN_NODE: String = "ranNode"
   val COMPLETED_DATE: String = "completedDate"
   val CANCELED_DATE: String = "canceledDate"
   val FAILED_DATE: String = "failedDate"
@@ -54,6 +55,7 @@ object CassandraTaskExecutionDetailsProjectionModule {
       .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
       .addColumn(CassandraTaskExecutionDetailsProjectionTable.SUBMITTED_NODE, text)
       .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.STARTED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
+      .addColumn(CassandraTaskExecutionDetailsProjectionTable.RAN_NODE, text)
       .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.COMPLETED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
       .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.CANCELED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME))
       .addUDTColumn(CassandraTaskExecutionDetailsProjectionTable.FAILED_DATE, SchemaBuilder.frozen(CassandraZonedDateTimeModule.ZONED_DATE_TIME)))
diff --git a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
index 63d9afe..4e13454 100644
--- a/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
+++ b/server/task-distributed/src/main/scala/org/apache/james/task/eventsourcing/distributed/TaskEventDTO.scala
@@ -47,14 +47,15 @@ object CreatedDTO {
 
 case class StartedDTO(@JsonProperty("type") typeName: String,
                       @JsonProperty("aggregate") aggregateId: String,
-                      @JsonProperty("event") eventId: Int)
+                      @JsonProperty("event") eventId: Int,
+                      @JsonProperty("hostname") getHostname: String)
   extends TaskEventDTO(typeName, aggregateId, eventId) {
-  def toDomainObject: Started = Started(domainAggregateId, domainEventId)
+  def toDomainObject: Started = Started(domainAggregateId, domainEventId, Hostname(getHostname))
 }
 
 object StartedDTO {
   def fromDomainObject(event: Started, typeName: String): StartedDTO =
-    StartedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize())
+    StartedDTO(typeName, event.aggregateId.taskId.asString(), event.eventId.serialize(), event.hostname.asString)
 }
 
 case class CancelRequestedDTO(@JsonProperty("type") typeName: String,
diff --git a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java
index 183e10e..b41c762 100644
--- a/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java
+++ b/server/task-distributed/src/test/java/org/apache/james/task/eventsourcing/distributed/TaskEventsSerializationTest.java
@@ -80,7 +80,7 @@ class TaskEventsSerializationTest {
     private static Stream<Arguments> validTasks() throws Exception {
         return Stream.of(
             Arguments.of(new Created(AGGREGATE_ID, EVENT_ID, TASK, HOSTNAME), "{\"task\":\"{\\\"type\\\":\\\"completed-task\\\"}\",\"type\":\"task-manager-created\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"hostname\":\"foo\"}\n"),
-            Arguments.of(new Started(AGGREGATE_ID, EVENT_ID), "{\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-started\"}"),
+            Arguments.of(new Started(AGGREGATE_ID, EVENT_ID, HOSTNAME), "{\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-started\",\"hostname\":\"foo\"}"),
             Arguments.of(new CancelRequested(AGGREGATE_ID, EVENT_ID), "{\"type\":\"task-manager-cancel-requested\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42}\n"),
             Arguments.of(new Completed(AGGREGATE_ID, EVENT_ID, Task.Result.COMPLETED), "{\"result\":\"COMPLETED\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-completed\"}"),
             Arguments.of(new Completed(AGGREGATE_ID, EVENT_ID, Task.Result.PARTIAL), "{\"result\":\"PARTIAL\",\"aggregate\":\"2c7f4081-aa30-11e9-bf6c-2d3b9e84aafd\",\"event\":42,\"type\":\"task-manager-completed\"}"),
@@ -89,4 +89,4 @@ class TaskEventsSerializationTest {
         );
     }
 
-}
\ No newline at end of file
+}
diff --git a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
index 2e71938..b065edd 100644
--- a/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
+++ b/server/task/src/main/java/org/apache/james/task/MemoryTaskManager.java
@@ -47,14 +47,16 @@ public class MemoryTaskManager implements TaskManager {
     private static class DetailsUpdater implements TaskManagerWorker.Listener {
 
         private final TaskExecutionDetailsUpdaterFactory updaterFactory;
+        private final Hostname hostname;
 
-        DetailsUpdater(TaskExecutionDetailsUpdaterFactory updaterFactory) {
+        DetailsUpdater(TaskExecutionDetailsUpdaterFactory updaterFactory, Hostname hostname) {
             this.updaterFactory = updaterFactory;
+            this.hostname = hostname;
         }
 
         @Override
         public void started(TaskId taskId) {
-            updaterFactory.apply(taskId).accept(TaskExecutionDetails::started);
+            updaterFactory.apply(taskId).accept(details -> details.started(hostname));
         }
 
         @Override
@@ -162,7 +164,7 @@ public class MemoryTaskManager implements TaskManager {
     }
 
     private DetailsUpdater updater() {
-        return new DetailsUpdater(this::updateDetails);
+        return new DetailsUpdater(this::updateDetails, hostname);
     }
 
     private Consumer<TaskExecutionDetailsUpdater> updateDetails(TaskId taskId) {
diff --git a/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala b/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala
index 4e9f20d..95b0d99 100644
--- a/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala
+++ b/server/task/src/main/scala/org/apache/james/task/TaskExecutionDetails.scala
@@ -41,6 +41,7 @@ class TaskExecutionDetails(val taskId: TaskId,
                            private val submittedNode: Hostname,
                            private val additionalInformation: () => Optional[TaskExecutionDetails.AdditionalInformation],
                            private val startedDate: Optional[ZonedDateTime] = Optional.empty(),
+                           private val ranNode: Optional[Hostname] = Optional.empty(),
                            private val completedDate: Optional[ZonedDateTime] = Optional.empty(),
                            private val canceledDate: Optional[ZonedDateTime] = Optional.empty(),
                            private val failedDate: Optional[ZonedDateTime] = Optional.empty()) {
@@ -58,14 +59,16 @@ class TaskExecutionDetails(val taskId: TaskId,
 
   def getStartedDate: Optional[ZonedDateTime] = startedDate
 
+  def getRanNode: Optional[Hostname] = ranNode
+
   def getCompletedDate: Optional[ZonedDateTime] = completedDate
 
   def getCanceledDate: Optional[ZonedDateTime] = canceledDate
 
   def getFailedDate: Optional[ZonedDateTime] = failedDate
 
-  def started: TaskExecutionDetails = status match {
-    case WAITING => start
+  def started(hostname: Hostname): TaskExecutionDetails = status match {
+    case WAITING => start(hostname)
     case _ => this
   }
 
@@ -82,7 +85,7 @@ class TaskExecutionDetails(val taskId: TaskId,
     case _ => this
   }
 
-  def cancelRequested: TaskExecutionDetails = status match {
+  def cancelRequested(hostname: Hostname): TaskExecutionDetails = status match {
     case IN_PROGRESS => requestCancel
     case WAITING => requestCancel
     case _ => this
@@ -107,6 +110,7 @@ class TaskExecutionDetails(val taskId: TaskId,
         Objects.equals(submittedDate, that.submittedDate) &&
         Objects.equals(submittedNode, that.submittedNode) &&
         Objects.equals(startedDate, that.startedDate) &&
+        Objects.equals(ranNode, that.ranNode) &&
         Objects.equals(completedDate, that.completedDate) &&
         Objects.equals(canceledDate, that.canceledDate) &&
         Objects.equals(failedDate, that.failedDate)
@@ -114,7 +118,7 @@ class TaskExecutionDetails(val taskId: TaskId,
   }
 
   override def hashCode(): Int =
-    Objects.hash(taskId, `type`, additionalInformation(), status, submittedDate, submittedNode, startedDate, completedDate, canceledDate, failedDate)
+    Objects.hash(taskId, `type`, additionalInformation(), status, submittedDate, submittedNode, startedDate, ranNode, completedDate, canceledDate, failedDate)
 
   override def toString: String =
     MoreObjects.toStringHelper(this)
@@ -125,38 +129,44 @@ class TaskExecutionDetails(val taskId: TaskId,
       .add("submittedDate", submittedDate)
       .add("submittedNode", submittedNode)
       .add("startedDate", startedDate)
+      .add("ranNode", ranNode)
       .add("completedDate", completedDate)
       .add("canceledDate", canceledDate)
       .add("failedDate", failedDate)
       .toString
 
-  private def start = new TaskExecutionDetails(taskId, `type`, IN_PROGRESS,
+  private def start(hostname: Hostname) = new TaskExecutionDetails(taskId, `type`, IN_PROGRESS,
     submittedDate = submittedDate,
     submittedNode = submittedNode,
     additionalInformation = additionalInformation,
-    startedDate = Optional.of(ZonedDateTime.now))
+    startedDate = Optional.of(ZonedDateTime.now),
+    ranNode = Optional.of(hostname))
   private def complete = new TaskExecutionDetails(taskId, `type`, TaskManager.Status.COMPLETED,
     submittedDate = submittedDate,
     submittedNode = submittedNode,
-    startedDate = startedDate,
     additionalInformation = additionalInformation,
+    startedDate = startedDate,
+    ranNode = ranNode,
     completedDate = Optional.of(ZonedDateTime.now))
   private def fail = new TaskExecutionDetails(taskId, `type`, TaskManager.Status.FAILED,
     submittedDate = submittedDate,
     submittedNode = submittedNode,
-    startedDate = startedDate,
     additionalInformation = additionalInformation,
+    startedDate = startedDate,
+    ranNode = ranNode,
     failedDate = Optional.of(ZonedDateTime.now))
   private def requestCancel = new TaskExecutionDetails(taskId, `type`, TaskManager.Status.CANCEL_REQUESTED,
     submittedDate = submittedDate,
     submittedNode = submittedNode,
     additionalInformation = additionalInformation,
     startedDate = startedDate,
+    ranNode = ranNode,
     canceledDate = Optional.of(ZonedDateTime.now))
   private def cancel = new TaskExecutionDetails(taskId, `type`, TaskManager.Status.CANCELLED,
     submittedDate = submittedDate,
     submittedNode = submittedNode,
     additionalInformation = additionalInformation,
     startedDate = startedDate,
+    ranNode = ranNode,
     canceledDate = Optional.of(ZonedDateTime.now))
 }
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
index 905f1ec..cdc241e 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/CommandHandlers.scala
@@ -41,11 +41,12 @@ class CreateCommandHandler(private val loadHistory: TaskAggregateId => History,
   }
 }
 
-class StartCommandHandler(private val loadHistory: TaskAggregateId => History) extends TaskCommandHandler[Start] {
+class StartCommandHandler(private val loadHistory: TaskAggregateId => History,
+                          private val hostname: Hostname) extends TaskCommandHandler[Start] {
   override def handledClass: Class[Start] = classOf[Start]
 
   override def handle(command: Start): util.List[_ <: Event] = {
-    loadAggregate(loadHistory, command.id).start()
+    loadAggregate(loadHistory, command.id).start(hostname)
   }
 }
 
@@ -79,4 +80,4 @@ class FailCommandHandler(private val loadHistory: TaskAggregateId => History) ex
   override def handle(command: Fail): util.List[_ <: Event] = {
     loadAggregate(loadHistory, command.id).fail()
   }
-}
\ No newline at end of file
+}
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
index d091197..7302b4b 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/EventSourcingTaskManager.scala
@@ -53,13 +53,13 @@ class EventSourcingTaskManager @Inject @VisibleForTesting private[eventsourcing]
   private val eventSourcingSystem = ScalaEventSourcingSystem(
     handlers = Set(
       new CreateCommandHandler(loadHistory, hostname),
-      new StartCommandHandler(loadHistory),
+      new StartCommandHandler(loadHistory, hostname),
       new RequestCancelCommandHandler(loadHistory),
       new CompleteCommandHandler(loadHistory),
       new CancelCommandHandler(loadHistory),
       new FailCommandHandler(loadHistory)),
     subscribers = Set(
-      executionDetailsProjection.asSubscriber,
+      executionDetailsProjection.asSubscriber(hostname),
       workDispatcher),
     eventStore = eventStore)
 
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala
index 8f8bb40..61d7d8f 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/Events.scala
@@ -32,7 +32,7 @@ case class Hostname(private val value: String) {
 
 case class Created(aggregateId: TaskAggregateId, override val eventId: EventId, task: Task, hostname: Hostname) extends TaskEvent(aggregateId, eventId)
 
-case class Started(aggregateId: TaskAggregateId, override val eventId: EventId) extends TaskEvent(aggregateId, eventId)
+case class Started(aggregateId: TaskAggregateId, override val eventId: EventId, hostname: Hostname) extends TaskEvent(aggregateId, eventId)
 
 case class CancelRequested(aggregateId: TaskAggregateId, override val eventId: EventId) extends TaskEvent(aggregateId, eventId)
 
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
index 2013d91..24aef85 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskAggregate.scala
@@ -43,9 +43,9 @@ class TaskAggregate private(val aggregateId: TaskAggregateId, private val histor
     } else Nil.asJava
   }
 
-  private[eventsourcing] def start(): util.List[Event] = {
+  private[eventsourcing] def start(hostname: Hostname): util.List[Event] = {
     currentStatus match {
-      case Some(Status.WAITING) => createEventWithId(Started(aggregateId, _))
+      case Some(Status.WAITING) => createEventWithId(Started(aggregateId, _, hostname))
       case _ => Nil.asJava
     }
   }
diff --git a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
index 65f0aca..5654e0f 100644
--- a/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
+++ b/server/task/src/main/scala/org/apache/james/task/eventsourcing/TaskExecutionDetailsProjection.scala
@@ -25,13 +25,13 @@ import org.apache.james.task.{TaskExecutionDetails, TaskId}
 import collection.JavaConverters._
 
 trait TaskExecutionDetailsProjection {
-  val asSubscriber: Subscriber = {
+  def asSubscriber(hostname: Hostname): Subscriber = {
     case created: Created =>
       update(TaskExecutionDetails.from(created.task, created.aggregateId.taskId, created.hostname))
     case cancelRequested: CancelRequested =>
       update(cancelRequested.aggregateId.taskId)(_.cancelRequested)
     case started: Started =>
-      update(started.aggregateId.taskId)(_.started)
+      update(started.aggregateId.taskId)(_.started(hostname))
     case completed: Completed =>
       update(completed.aggregateId.taskId)(_.completed)
     case failed: Failed =>
diff --git a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
index 867665f..b8773a8 100644
--- a/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
+++ b/server/task/src/test/java/org/apache/james/task/eventsourcing/EventSourcingTaskManagerTest.java
@@ -20,6 +20,7 @@
 package org.apache.james.task.eventsourcing;
 
 import static org.assertj.core.api.Assertions.assertThat;
+import static org.awaitility.Duration.ONE_HUNDRED_MILLISECONDS;
 
 import org.apache.james.eventsourcing.EventSourcingSystem;
 import org.apache.james.eventsourcing.eventstore.EventStore;
@@ -32,6 +33,9 @@ import org.apache.james.task.TaskId;
 import org.apache.james.task.TaskManager;
 import org.apache.james.task.TaskManagerContract;
 import org.apache.james.task.TaskManagerWorker;
+
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -39,6 +43,10 @@ import org.junit.jupiter.api.extension.ExtendWith;
 
 @ExtendWith(CountDownLatchExtension.class)
 class EventSourcingTaskManagerTest implements TaskManagerContract {
+    ConditionFactory CALMLY_AWAIT = Awaitility
+        .with().pollInterval(ONE_HUNDRED_MILLISECONDS)
+        .and().pollDelay(ONE_HUNDRED_MILLISECONDS)
+        .await();
 
     private static final Hostname HOSTNAME = new Hostname("foo");
     private EventSourcingTaskManager taskManager;
@@ -76,4 +84,15 @@ class EventSourcingTaskManagerTest implements TaskManagerContract {
                 .containsOnly(HOSTNAME);
     }
 
+    @Test
+    void startedTaskShouldKeepOriginHostname() {
+        TaskId taskId = taskManager.submit(() -> Task.Result.COMPLETED);
+        TaskAggregateId aggregateId = new TaskAggregateId(taskId);
+
+        CALMLY_AWAIT.untilAsserted(() ->
+            assertThat(eventStore.getEventsOfAggregate(aggregateId).getEvents())
+                .filteredOn(event -> event instanceof Started)
+                .extracting("hostname")
+                .containsOnly(HOSTNAME));
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org


Mime
View raw message