hama-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chl...@apache.org
Subject [hama] 01/01: Intialize commit for componentization.
Date Sun, 24 Feb 2019 15:35:38 GMT
This is an automated email from the ASF dual-hosted git repository.

chl501 pushed a commit to branch componentization
in repository https://gitbox.apache.org/repos/asf/hama.git

commit 0a24dc62593a84f0ad1243c789c406f2fb8b3265
Author: Chiahung Lin <chl501@apache.org>
AuthorDate: Sun Feb 24 16:34:24 2019 +0100

    Intialize commit for componentization.
---
 .gitignore                                         |   1 +
 bsp/build.sbt                                      |  24 +++
 bsp/src/main/scala/org/apache/hama/bsp/Step.scala  |  51 +++++
 .../main/scala/org/apache/hama/bsp/package.scala   |  62 +++++++
 bsp/src/test/resources/log4j.properties            |  13 ++
 .../test/scala/org/apache/hama/bsp/StepSpec.scala  |  48 +++++
 build.sbt                                          |  40 ++++
 commons.v2/build.sbt                               |  24 +++
 .../main/scala/org/apache/hama/conf/Setting.scala  |  57 ++++++
 .../scala/org/apache/hama/logging/Logging.scala    |  26 +++
 .../main/scala/org/apache/hama/util/Utils.scala    |  30 +++
 commons.v2/src/test/resources/log4j.properties     |  13 ++
 .../scala/org/apache/hama/conf/SettingSpec.scala   |  34 ++++
 membership/build.sbt                               |  27 +++
 .../org/apache/hama/membership/BSPMaster.scala     | 192 +++++++++++++++++++
 .../org/apache/hama/membership/Communicator.scala  | 206 +++++++++++++++++++++
 .../scala/org/apache/hama/membership/Driver.scala  |  43 +++++
 .../scala/org/apache/hama/membership/Job.scala     |  29 +++
 .../scala/org/apache/hama/membership/Role.scala    |  22 +++
 .../org/apache/hama/membership/Scheduler.scala     |  28 +++
 .../scala/org/apache/hama/membership/package.scala |  28 +++
 membership/src/test/resources/log4j.properties     |  13 ++
 .../org/apache/hama/membership/BSPMasterSpec.scala |  62 +++++++
 .../apache/hama/membership/CommunicatorSpec.scala  | 135 ++++++++++++++
 project/assembly.sbt                               |   1 +
 project/build.properties                           |   1 +
 26 files changed, 1210 insertions(+)

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..eb5a316
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1 @@
+target
diff --git a/bsp/build.sbt b/bsp/build.sbt
new file mode 100644
index 0000000..7e66536
--- /dev/null
+++ b/bsp/build.sbt
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+name := "bsp"
+
+scalacOptions += "-Ypartial-unification"
+
+libraryDependencies ++= Seq (
+  "org.typelevel" %% "cats-core" % "1.2.0"
+)
diff --git a/bsp/src/main/scala/org/apache/hama/bsp/Step.scala b/bsp/src/main/scala/org/apache/hama/bsp/Step.scala
new file mode 100644
index 0000000..7b256d7
--- /dev/null
+++ b/bsp/src/main/scala/org/apache/hama/bsp/Step.scala
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp
+
+import cats.data.Kleisli
+import cats.data.NonEmptyList
+import cats.implicits._
+import cats.data._
+import scala.util.Either
+
+// TODO: replace with zio.KleisliIO instead.
+object Step {
+  /** Lifts a plain function returning a Result into a composable Step. */
+  def apply[I, O](f: I => Result[O]): Step[I, O] = new Step[I, O](Kleisli(f))
+}
+class Step[I, O](k: Kleisli[Result, I, O]) {
+
+  /**
+   * Form a new computation that takes input I, runs this step, passes the
+   * value through sync(), then runs `next` to produce output O1.
+   */
+  // TODO: rename to prevent a name collision (`then` is a keyword in Scala 3).
+  def then[O1](next: Step[O, O1]): Step[I, O1] = new Step[I, O1](
+    k andThen sync() andThen next.kleisli
+  )
+  /** The wrapped Kleisli arrow, visible inside package bsp for composition. */
+  protected[bsp] def kleisli: Kleisli[Result, I, O] = k
+  /** Runs this step against the given input. */
+  def run(in: I): Result[O] = k.run(in)
+  /** Inserted between composed steps; currently passes the value through as Success. */
+  protected[bsp] def sync(): Kleisli[Result, O, O] = Kleisli[Result, O, O]( (o: O) => {
+    /* TODO: barrier sync() operation */
+    Success(o)
+  })
+
+}
diff --git a/bsp/src/main/scala/org/apache/hama/bsp/package.scala b/bsp/src/main/scala/org/apache/hama/bsp/package.scala
new file mode 100644
index 0000000..e1ee846
--- /dev/null
+++ b/bsp/src/main/scala/org/apache/hama/bsp/package.scala
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama
+
+import cats.FlatMap
+import cats.data.NonEmptyList
+import scala.util.Either
+import scala.annotation.tailrec
+
+package object bsp {
+  // Either a non-empty list of failure causes (left) or the computed value (right).
+  type WithSideEffect[A] = Either[NonEmptyList[Seq[Throwable]], A]
+  // Outcome of a computation; abstract so Success and Failure are the only cases.
+  sealed abstract class Result[A](withSideEffect: WithSideEffect[A]) {
+    // Applies f to a successful value; a failure keeps its causes unchanged.
+    def map[B](f: A => B): Result[B] = this match {
+      case Success(s) => Success(f(s))
+      case Failure(exs @ _*) => Failure(exs: _*)
+    }
+    // Chains a dependent computation; a failure short-circuits without calling f.
+    def flatMap[B](f: A => Result[B]): Result[B] = this match {
+      case Success(s) => f(s)
+      case Failure(exs @ _*) => Failure(exs: _*)
+    }
+  }
+  // Failed outcome carrying its causes.
+  final case class Failure[A](exs: Throwable*) extends Result[A](Left(NonEmptyList.of(exs)))
+  // Successful outcome carrying the computed value.
+  final case class Success[A](value: A) extends Result(Right(value))
+  // FlatMap instance that lets Result act as the effect type of a cats Kleisli.
+  implicit val resultFlatMap: FlatMap[Result] = new FlatMap[Result] {
+
+    def map[A, B](fa: Result[A])(f: A => B): Result[B] = fa.map(f)
+
+    def flatMap[A, B](fa: Result[A])(f: A => Result[B]): Result[B] = fa.flatMap(f)
+    // `exs @ _*` matches any number of causes; a bare `Failure(exs)` matches exactly one.
+    @tailrec
+    def tailRecM[A, B](a: A)(f: A => Result[Either[A, B]]): Result[B] = f(a) match {
+      case Failure(exs @ _*) => Failure(exs: _*)
+      case Success(Left(a1)) => tailRecM(a1)(f)
+      case Success(Right(b)) => Success(b)
+    }
+
+  }
+
+}
+
diff --git a/bsp/src/test/resources/log4j.properties b/bsp/src/test/resources/log4j.properties
new file mode 100644
index 0000000..2695454
--- /dev/null
+++ b/bsp/src/test/resources/log4j.properties
@@ -0,0 +1,13 @@
+log4j.rootLogger=INFO, file, stdout
+
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=/tmp/hama-core/bsp-test.log
+log4j.appender.file.MaxFileSize=10MB
+log4j.appender.file.MaxBackupIndex=10
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+ 
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
diff --git a/bsp/src/test/scala/org/apache/hama/bsp/StepSpec.scala b/bsp/src/test/scala/org/apache/hama/bsp/StepSpec.scala
new file mode 100644
index 0000000..4fd458b
--- /dev/null
+++ b/bsp/src/test/scala/org/apache/hama/bsp/StepSpec.scala
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.bsp
+
+import org.scalatest._
+import scala.collection.mutable.WrappedArray
+
+class StepSpec extends FlatSpec with Matchers {
+
+  "Step" should "be composable" in {
+    // 123 -> 123.0 -> "123.0_double" -> the characters of that string.
+    val expected = Success(Seq('1', '2', '3', '.', '0', '_', 'd', 'o', 'u', 'b', 'l', 'e'))
+    // Succeeds for positive integers only.
+    val step1 = Step[Int, Double]( (int: Int) => 
+      if(int > 0) Success(int.toDouble) else Failure(new Exception(s"$int smaller than 0!"))
+    )
+    // Succeeds only when the remainder modulo 10 is positive.
+    val step2 = Step[Double, String]( (double: Double) => 
+      if(double % 10.0 > 0) Success(double.toString + "_double") 
+      else Failure(new Exception(s"$double % 10 is 0!"))
+    )
+
+    val step3 = Step[String, Seq[Char]]( (str: String) => Success(str.toCharArray.toSeq))
+
+    val result = (step1 then step2 then step3) run 123
+    // Fail on a Failure instead of silently skipping assertions placed inside map.
+    result match {
+      case Success(chars) => assert(chars == expected.value)
+      case other => fail(s"Expected $expected but got $other")
+    }
+  }
+
+}
diff --git a/build.sbt b/build.sbt
new file mode 100644
index 0000000..83c3fc4
--- /dev/null
+++ b/build.sbt
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+lazy val common = Seq (
+  version := "0.0.1",
+  organization := "org.apache.hama",
+  scalaVersion := "2.12.6",
+  scalacOptions := Seq ("-deprecation", "-Ypartial-unification"), 
+  libraryDependencies ++= Seq (
+    "org.scalatest" %% "scalatest" % "3.0.5" % "test"
+  )
+)
+
+lazy val commons = (project in file("commons.v2")).settings (
+  common
+)
+
+lazy val bsp = (project in file("bsp")).settings (
+  common
+)
+
+lazy val membership = (project in file("membership")).settings (
+  common
+) .dependsOn(commons)
+
+lazy val core = (project in file(".")).aggregate(commons, bsp, membership)
diff --git a/commons.v2/build.sbt b/commons.v2/build.sbt
new file mode 100644
index 0000000..2d7fe2f
--- /dev/null
+++ b/commons.v2/build.sbt
@@ -0,0 +1,24 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+name := "commons.v2"
+
+libraryDependencies ++= Seq (
+  "com.typesafe" % "config" % "1.3.3",
+  "org.slf4j" % "slf4j-api" % "1.7.25",
+  "org.slf4j" % "slf4j-simple" % "1.7.25"
+)
diff --git a/commons.v2/src/main/scala/org/apache/hama/conf/Setting.scala b/commons.v2/src/main/scala/org/apache/hama/conf/Setting.scala
new file mode 100644
index 0000000..8f64b00
--- /dev/null
+++ b/commons.v2/src/main/scala/org/apache/hama/conf/Setting.scala
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.conf
+
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import com.typesafe.config.ConfigValue
+import com.typesafe.config.ConfigValueFactory
+import scala.util.Try
+
+trait Setting {
+  /** Binds `key` to `value`; DefaultSetting returns a new instance rather than mutating. */
+  def set[T](key: String, value: T): DefaultSetting 
+  /** Looks up `key`; DefaultSetting yields None on any lookup failure and casts unchecked. */
+  def get[T](key: String): Option[T] 
+
+}
+
+object Setting {
+  /** Default configuration resource name. */
+  val hama = "hama"
+  /** Creates a Setting backed by the Typesafe config loaded for `name`. */
+  def create(name: String = hama): Setting = {
+    val loaded = ConfigFactory.load(name)
+    DefaultSetting(loaded)
+  }
+  // TODO: support creating a Setting from a Hadoop configuration.
+  // def from(configuration: Configuration): Setting = ???
+
+}
+
+protected[conf] case class DefaultSetting(config: Config) extends Setting {
+  // withValue returns an updated Config, which is wrapped in a new DefaultSetting.
+  override def set[T](key: String, value: T): DefaultSetting = {
+    val boxed = ConfigValueFactory.fromAnyRef(value)
+    copy(config = config.withValue(key, boxed))
+  }
+  // Any lookup failure becomes None; the cast to T is erased, so it is unchecked.
+  override def get[T](key: String): Option[T] =
+    Try(config.getAnyRef(key)).toOption.map(_.asInstanceOf[T])
+}
+
diff --git a/commons.v2/src/main/scala/org/apache/hama/logging/Logging.scala b/commons.v2/src/main/scala/org/apache/hama/logging/Logging.scala
new file mode 100644
index 0000000..5b62a73
--- /dev/null
+++ b/commons.v2/src/main/scala/org/apache/hama/logging/Logging.scala
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.logging
+
+import org.slf4j.LoggerFactory
+
+trait Logging {
+  /** SLF4J logger obtained for the runtime class of the instance, created on first use. */
+  lazy val log = LoggerFactory.getLogger(getClass)
+
+}
diff --git a/commons.v2/src/main/scala/org/apache/hama/util/Utils.scala b/commons.v2/src/main/scala/org/apache/hama/util/Utils.scala
new file mode 100644
index 0000000..6ca05fd
--- /dev/null
+++ b/commons.v2/src/main/scala/org/apache/hama/util/Utils.scala
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.util
+
+object Utils {
+  implicit class StringExt(value: String) {
+    /** True for null, empty, or whitespace-only strings (whitespace as removed by trim). */
+    def isBlank: Boolean = null == value || value.trim.isEmpty
+  }
+
+  implicit class PortExt(value: Int) {
+    /** True when the value lies in the port range 1 to 65535 inclusive. */
+    def isValid: Boolean = 0 < value && 65536 > value
+  }
+}
diff --git a/commons.v2/src/test/resources/log4j.properties b/commons.v2/src/test/resources/log4j.properties
new file mode 100644
index 0000000..fb283b1
--- /dev/null
+++ b/commons.v2/src/test/resources/log4j.properties
@@ -0,0 +1,13 @@
+log4j.rootLogger=INFO, file, stdout
+
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=/tmp/hama-core/commons-test.log
+log4j.appender.file.MaxFileSize=10MB
+log4j.appender.file.MaxBackupIndex=10
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+ 
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
diff --git a/commons.v2/src/test/scala/org/apache/hama/conf/SettingSpec.scala b/commons.v2/src/test/scala/org/apache/hama/conf/SettingSpec.scala
new file mode 100644
index 0000000..1f622dd
--- /dev/null
+++ b/commons.v2/src/test/scala/org/apache/hama/conf/SettingSpec.scala
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.conf
+
+import org.scalatest._
+
+class SettingSpec extends FlatSpec with Matchers {
+  // Values written with set must be readable through get on the returned setting.
+  "Setting" should "get/ set value" in {
+    val updated = Setting.create()
+      .set[String]("db.host", "host1")
+      .set[Int]("db.port", 1234)
+      .set[Double]("cpu.load", 1.2d)
+    assert(updated.get[String]("db.host") == Some("host1"))
+    assert(updated.get[Int]("db.port") == Some(1234))
+    assert(updated.get[Double]("cpu.load") == Some(1.2d))
+  }
+
+}
diff --git a/membership/build.sbt b/membership/build.sbt
new file mode 100644
index 0000000..ef179da
--- /dev/null
+++ b/membership/build.sbt
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+name := "membership"
+
+libraryDependencies ++= Seq (
+  "com.google.guava" % "guava" % "27.0-jre",
+  "io.aeron" % "aeron-all" % "1.11.3",
+  "org.apache.curator" % "curator-framework" % "4.0.1",
+  "org.scalaz" %% "scalaz-zio" % "0.3.1",
+  "org.typelevel" %% "cats-core" % "1.4.0",
+  "org.apache.curator" % "curator-test" % "4.0.1" % Test
+)
diff --git a/membership/src/main/scala/org/apache/hama/membership/BSPMaster.scala b/membership/src/main/scala/org/apache/hama/membership/BSPMaster.scala
new file mode 100644
index 0000000..cde62d5
--- /dev/null
+++ b/membership/src/main/scala/org/apache/hama/membership/BSPMaster.scala
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.membership
+
+import cats.data.{ State => CatsState }
+import cats.syntax._
+import cats.implicits._
+import com.google.common.hash.Hashing
+import com.google.common.base.Charsets._
+import org.apache.hama.conf.Setting
+import org.apache.hama.util.Utils._
+
+object Address {
+  /** Validates host and port; invalid input yields an error instead of an Address. */
+  def create(host: String, port: Int): ValidatedResult[Address] =
+    if(!host.isBlank && port.isValid) Address(host, port).validNel
+    else new RuntimeException(s"Invalid Address($host, $port) value!").invalidNel
+
+}
+final case class Address(host: String, port: Int)
+
+object Identifier {
+  /** Builds an Identifier from the hash of a valid host and port, or an error. */
+  def create(host: String, port: Int): ValidatedResult[Identifier] = {
+    val usable = !host.isBlank && port.isValid
+    if(usable) Identifier(hash(host, port)).validNel
+    else new RuntimeException("Invalid Identifier hash values!").invalidNel
+  }
+  /** Hashes the UTF-8 host followed by the port with Guava's sipHash24. */
+  def hash(host: String, port: Int): String =
+    Hashing.sipHash24().newHasher().putString(host, UTF_8).putInt(port).hash.toString
+}
+final case class Identifier(value: String)
+
+object BSPMaster {
+
+  lazy val defaultSetting = Setting.create()
+
+  object Event {
+    // Ready notifications, one per named component.
+    case object CommunicatorReady extends Event
+    case object SchedulerReady extends Event
+    case object BarrierSynchronizationReady extends Event
+    case object MonitorReady extends Event
+    case object StorageReady extends Event
+    // Failure notifications, one per named component.
+    case object CommunicatorFailure extends Event
+    case object SchedulerFailure extends Event
+    case object BarrierSynchronizationFailure extends Event
+    case object MonitorFailure extends Event
+    case object StorageFailure extends Event
+    // Shutting-down notifications, one per named component.
+    case object CommunicatorShuttingDown extends Event
+    case object SchedulerShuttingDown extends Event
+    case object BarrierSynchronizationShuttingDown extends Event
+    case object MonitorShuttingDown extends Event
+    case object StorageShuttingDown extends Event
+    // Stopped notifications, one per named component.
+    case object CommunicatorStopped extends Event
+    case object SchedulerStopped extends Event
+    case object BarrierSynchronizationStopped extends Event
+    case object MonitorStopped extends Event
+    case object StorageStopped extends Event
+    // Compared for equality in State.starting2Running before moving to Running.
+    val ready = Set (
+      CommunicatorReady, SchedulerReady, BarrierSynchronizationReady, 
+      MonitorReady, StorageReady
+    )
+    // An overlap with this set can make a State transition return ServiceFailure.
+    val failures = Set (
+      CommunicatorFailure, SchedulerFailure, BarrierSynchronizationFailure, 
+      MonitorFailure, StorageFailure
+    )
+    // Any overlap with this set moves Running to ShuttingDown.
+    val shuttingDown = Set (
+      CommunicatorShuttingDown, SchedulerShuttingDown,
+      BarrierSynchronizationShuttingDown, MonitorShuttingDown, 
+      StorageShuttingDown
+    )
+    // Compared for equality in State.shuttingDown2Stopped before moving to Stopped.
+    val stopped = Set(
+      CommunicatorStopped, SchedulerStopped, BarrierSynchronizationStopped, 
+      MonitorStopped, StorageStopped
+    )
+  }
+  trait Event extends Product with Serializable
+
+  object State {
+    // State transition over Either[Failure, N]; a Left carries the failure reached so far.
+    type NextState[N] = CatsState[Either[Failure, N], Unit]
+
+    import Event._
+
+    case object Stopped extends State
+    case object Starting extends State
+    case object Running extends State
+    case object ShuttingDown extends State
+    trait Failure extends State
+    case class IllegalState(expected: State, actual: State) extends Failure
+    case object ServiceFailure extends Failure // TODO: change to case class with events
+    case class Faulty(prev: Failure) extends Failure
+    case object Recovering extends State
+
+    /**
+     * Entry state: transit from Stopped to Starting. Any other state yields
+     * IllegalState, and an earlier failure is wrapped in Faulty.
+     */
+    def stopped2Starting: NextState[State] = CatsState {
+      // `()` is the unit value; `Unit` would name the companion object instead.
+      case Right(Stopped) => (Right(Starting), ())
+      case Right(actual) => (Left(IllegalState(Stopped, actual)), ())
+      case Left(prev) => (Left(Faulty(prev)), ())
+    }
+
+    /**
+     * Transit from Starting to either Starting, Running, or Failure.
+     * @param events events checked against the ready and failures sets.
+     */
+    def starting2Running(events: Set[Event]): NextState[State] =
+      CatsState {
+        case Right(Starting) =>
+          // Running needs exactly the full ready set; a failure event is checked next.
+          if(ready == events) (Right(Running), ())
+          else if(failures.intersect(events).nonEmpty) (Left(ServiceFailure), ())
+          else (Right(Starting), ())
+        case Right(actual) => (Left(IllegalState(Starting, actual)), ())
+        case Left(prev) => (Left(Faulty(prev)), ())
+      }
+
+    /**
+     * Transit from Running to either Running, ShuttingDown, or Failure.
+     * @param events events checked against the shuttingDown and failures sets.
+     */
+    def running2ShuttingDown(events: Set[Event]): NextState[State] =
+      CatsState {
+        case Right(Running) =>
+          // A shutting-down event takes precedence over a failure event.
+          if(shuttingDown.intersect(events).nonEmpty) (Right(ShuttingDown), ())
+          else if(failures.intersect(events).nonEmpty) (Left(ServiceFailure), ())
+          else (Right(Running), ())
+        case Right(actual) => (Left(IllegalState(Running, actual)), ())
+        case Left(prev) => (Left(Faulty(prev)), ())
+      }
+
+    /** Transit from ShuttingDown to either ShuttingDown, Stopped, or Failure. */
+    def shuttingDown2Stopped(events: Set[Event]): NextState[State] =
+      CatsState {
+        case Right(ShuttingDown) =>
+          // Stopped needs exactly the full stopped set; a failure event is checked next.
+          if(stopped == events) (Right(Stopped), ())
+          else if(failures.intersect(events).nonEmpty) (Left(ServiceFailure), ())
+          else (Right(ShuttingDown), ())
+        case Right(actual) => (Left(IllegalState(ShuttingDown, actual)), ())
+        case Left(prev) => (Left(Faulty(prev)), ())
+      }
+
+  }
+  sealed trait State extends Product with Serializable
+
+  /**
+   * Builder to create BSPMaster.
+   */
+  case class Builder(host: String, port: Int) {
+    // Returns a copy of this builder carrying the given host and port.
+    def withAddress(host: String, port: Int) = 
+      this.copy(host = host, port = port)
+    // Combines the validated parts; presumably accumulates errors if ValidatedResult is a ValidatedNel - confirm.
+    def build(): ValidatedResult[BSPMaster] = (
+      Address.create(host, port), 
+      Identifier.create(host, port),
+      Master.validNel
+    ).mapN(BSPMaster.apply)
+
+  }
+
+}
+final case class BSPMaster (address: Address, id: Identifier, role: Role)
diff --git a/membership/src/main/scala/org/apache/hama/membership/Communicator.scala b/membership/src/main/scala/org/apache/hama/membership/Communicator.scala
new file mode 100644
index 0000000..95ea10f
--- /dev/null
+++ b/membership/src/main/scala/org/apache/hama/membership/Communicator.scala
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.membership
+
+import io.aeron.Aeron
+import io.aeron.FragmentAssembler
+import io.aeron.Publication
+import io.aeron.Publication._
+import io.aeron.Subscription
+import io.aeron.logbuffer.Header
+import java.util.concurrent.atomic.AtomicBoolean
+import org.agrona.BitUtil._
+import org.agrona.BufferUtil
+import org.agrona.DirectBuffer
+import org.agrona.concurrent.BusySpinIdleStrategy
+import org.agrona.concurrent.SigInt
+import org.agrona.concurrent.UnsafeBuffer
+import org.apache.hama.logging.Logging
+import org.apache.hama.util.Utils._
+import scala.annotation.tailrec
+import scala.concurrent.duration.Duration
+import scala.concurrent.duration.DurationInt
+import scala.util.Either
+
+object Communicator {
+
+  /**
+   * Aeron client context used by `create` when the caller supplies none;
+   * instantiated on first access.
+   */
+  lazy val defaultAeronContext: Aeron.Context = new Aeron.Context
+
+  /**
+   * Channel used by `withPublication` and `withSubscription` when the caller
+   * supplies none; built from Channel's default host and port.
+   */
+  lazy val defaultChannel: Channel = Channel()
+
+  // Failure values returned as `Left` by Publisher.send. They are singletons,
+  // so a captured stack trace would only describe where the object was first
+  // initialised; NoStackTrace suppresses that misleading trace.
+
+  /** The offer was rejected because of back pressure. */
+  final case object BackPressured extends RuntimeException (
+    "Failure due to back pressure!"
+  ) with scala.util.control.NoStackTrace
+
+  /** The publication is not connected to a subscriber. */
+  final case object NotConnected extends RuntimeException (
+    "Failure because publisher is not connected to subscriber!"
+  ) with scala.util.control.NoStackTrace
+
+  /** The offer failed because of an administration action. */
+  final case object AdminAction extends RuntimeException (
+    "Failure because of administration action!"
+  ) with scala.util.control.NoStackTrace
+
+  /** The publication is closed. */
+  final case object Closed extends RuntimeException (
+    "Failure because publication is closed!"
+  ) with scala.util.control.NoStackTrace
+
+  /** The publication reached its maximum position. */
+  final case object MaxPositionExceeded extends RuntimeException (
+    "Failure due to publication reaching max position!"
+  ) with scala.util.control.NoStackTrace
+
+  /** The offer returned a negative result that matches no known code. */
+  final case object UnknownReason extends RuntimeException (
+    "Failure due to unknown reason!"
+  ) with scala.util.control.NoStackTrace
+
+  /**
+   * UDP endpoint of an Aeron channel.
+   *
+   * @param host endpoint host; defaults to `localhost`
+   * @param port endpoint port; defaults to 12345
+   */
+  final case class Channel(host: String = localhost, port: Int = 12345) {
+
+    // Blank hosts and invalid ports are rejected at construction time.
+    // NOTE(review): `isValid` is not a standard Int method; presumably it
+    // comes from the imported Utils - confirm the accepted port range there.
+    require(!host.isBlank, "Channel's host value is not presented!")
+    require(port.isValid, s"Invalid Channel port value: $port!")
+
+    /** Renders a host/port pair as an Aeron UDP channel URI. */
+    protected[membership] val channel =
+      (h: String, p: Int) => s"aeron:udp?endpoint=$h:$p"
+
+    /** The Aeron channel URI built from this channel's own host and port. */
+    override def toString(): String = channel(host, port)
+  }
+
+  /**
+   * Thin wrapper around an Aeron `Publication` that reports send failures as
+   * `Left` values instead of raw result codes.
+   *
+   * @param publication the Aeron publication to send through; must not be null
+   */
+  final case class Publisher(publication: Publication) extends Logging {
+
+    require(null != publication, "Aeron Publication is not presented!")
+
+    /**
+     * Checks the publication until it is connected, the re-check budget is
+     * used up, or the deadline passes.
+     *
+     * NOTE(review): the whole parameter list is `implicit`, so an implicit
+     * `Long`, `Int` or `() => Unit` in the caller's scope replaces the
+     * corresponding default. The modifier is kept so that existing calls
+     * written without parentheses keep compiling.
+     *
+     * @param _deadline `System.nanoTime` value after which checking gives up
+     * @param countLimit maximum number of re-checks after the first check
+     * @param sleep pause executed between two checks
+     * @return the connection status seen by the last check, or false when the
+     *         deadline passed first
+     */
+    def isConnected (
+      implicit _deadline: Long = (System.nanoTime + 3.seconds.toNanos),
+      countLimit: Int = 3, sleep: () => Unit = { () => Thread.sleep(1) }
+    ): Boolean = {
+      @tailrec
+      def check(attempt: Int): Boolean = {
+        val connected = publication.isConnected
+        if (connected || attempt >= countLimit) connected
+        else if (System.nanoTime >= _deadline) false
+        else {
+          sleep()
+          check(attempt + 1)
+        }
+      }
+      check(0)
+    }
+
+    /**
+     * Waits for the publication to be connected, copies the message into an
+     * aligned direct buffer and offers it to the publication.
+     *
+     * @param messageBytes message payload
+     * @param bufferCapacity minimum capacity of the direct buffer; the buffer
+     *                       grows to the message length when that is larger
+     * @param boundaryAlighment alignment of the direct buffer
+     * @param deadline `System.nanoTime` value after which waiting for the
+     *                 connection gives up
+     * @param sleep pause executed between two connection checks
+     * @return `Right(this)` when the offer returned a non-negative result,
+     *         otherwise a `Left` describing why the message was not sent
+     */
+    def send (
+      messageBytes: Array[Byte],
+      bufferCapacity: Int = 512, 
+      boundaryAlighment: Int = CACHE_LINE_LENGTH,
+      deadline: Long = (System.nanoTime + 3.seconds.toNanos),
+      sleep: () => Unit = { () => Thread.sleep(1) }
+    ): Either[Throwable, Publisher] = {
+      // True once connected, false when the deadline passed first.
+      @tailrec
+      def awaitConnection(): Boolean =
+        if (publication.isConnected()) true
+        else if (System.nanoTime >= deadline) false
+        else {
+          sleep()
+          awaitConnection()
+        }
+
+      if (!awaitConnection()) {
+        Left(new RuntimeException (
+          s"Can't connect to Publication(${publication.channel}, ${publication.streamId})" 
+        ))
+      } else {
+        // A message longer than bufferCapacity used to overflow the buffer and
+        // throw; size the buffer so that the whole message always fits.
+        val capacity = math.max(bufferCapacity, messageBytes.length)
+        val buffer = new UnsafeBuffer(BufferUtil.allocateDirectAligned (
+          capacity, boundaryAlighment
+        ))
+        buffer.putBytes(0, messageBytes)
+        val result = publication.offer(buffer, 0, messageBytes.length)
+        log.debug(s"Publication.offer() returns result $result") 
+        result match {
+          case BACK_PRESSURED => Left(BackPressured)
+          case NOT_CONNECTED => Left(NotConnected)
+          case ADMIN_ACTION => Left(AdminAction)
+          case CLOSED => Left(Closed)
+          case MAX_POSITION_EXCEEDED => Left(MaxPositionExceeded)
+          case res if res < 0L => Left(UnknownReason)
+          case _ => Right(this)
+        }
+      }
+    } 
+
+    /** Applies `f` to the underlying publication and returns its result. */
+    def run[O](f: Publication => O): O = f(publication)
+
+    /** Closes the underlying publication. */
+    def close(): Unit = publication.close()
+
+  }
+
+  /**
+   * Thin wrapper around an Aeron `Subscription`.
+   *
+   * @param subscription the Aeron subscription to poll; must not be null
+   */
+  final case class Subscriber(subscription: Subscription) extends Logging {
+
+    require(null != subscription, "Aeron Subscription is not presented!")
+
+    /**
+     * Polls the subscription in a loop, passing what a `FragmentAssembler`
+     * delivers to `f`. The loop ends after a polling round in which the last
+     * invocation of `f` returned false.
+     *
+     * @param f callback receiving buffer, offset, length and header; its
+     *          result decides whether polling continues
+     */
+    def receive(f: (DirectBuffer, Int, Int, Header) => Boolean): Unit = {
+      var keepPolling = true
+      val assembler = new FragmentAssembler({
+        (buffer: DirectBuffer, offset: Int, length: Int, header: Header) => {
+          keepPolling = f(buffer, offset, length, header)
+        }
+      })
+      val idle = new BusySpinIdleStrategy()
+      // At most 10 fragments per round; the idle strategy is told how many
+      // fragments the round read.
+      while (keepPolling) {
+        idle.idle(subscription.poll(assembler, 10))
+      }
+    }
+
+    /** Applies `f` to the underlying subscription and returns its result. */
+    def run[O](f: Subscription => O): O = f(subscription)
+
+    /** Closes the underlying subscription. */
+    def close: Unit = subscription.close()
+
+  }
+
+  /**
+   * Connects an Aeron client with the given context and wraps it in a
+   * Communicator that has neither a publication nor a subscription yet.
+   *
+   * @param ctx Aeron client context; defaults to `defaultAeronContext`
+   */
+  def create(ctx: Aeron.Context = defaultAeronContext): Communicator = {
+    val client = Aeron.connect(ctx)
+    Communicator(aeron = client)
+  }
+
+}
+
+/**
+ * Bundles one Aeron client with an optional publisher and subscriber.
+ *
+ * Only one Aeron client should be created per VM, even when different
+ * channels are used. Otherwise the VM may crash with an error such as
+ * {{{
+ *   org.agrona.concurrent.status.UnsafeBufferPosition.getVolatile() SIGSEGV 
+ * }}}
+ *
+ * @param aeron the Aeron client; must not be null
+ * @param publication publisher created through `withPublication`, if any
+ * @param subscription subscriber created through `withSubscription`, if any
+ */
+final case class Communicator (
+  aeron: Aeron, 
+  publication: Option[Communicator.Publisher] = None, 
+  subscription: Option[Communicator.Subscriber] = None
+) extends Logging {
+
+  import Communicator._
+
+  require(null != aeron, "Aeron instance is not presented!")
+
+  /**
+   * Returns a copy of this communicator holding a new publication on the
+   * given channel and stream. A publisher already held by this instance is
+   * replaced in the copy; it is not closed here.
+   */
+  def withPublication (
+    channel: Channel = defaultChannel, streamId: Int = 10
+  ): Communicator = this.copy(publication = Option (
+    Publisher(aeron.addPublication(channel.toString, streamId))
+  ))
+
+  /**
+   * Returns a copy of this communicator holding a new subscription on the
+   * given channel and stream. A subscriber already held by this instance is
+   * replaced in the copy; it is not closed here.
+   */
+  def withSubscription (
+    channel: Channel = defaultChannel, streamId: Int = 10
+  ): Communicator = this.copy(subscription = Option (
+    Subscriber(aeron.addSubscription(channel.toString, streamId))
+  ))
+
+  /**
+   * Closes the subscriber, the publisher and finally the Aeron client.
+   *
+   * Each step runs even when an earlier one throws, so a failure while
+   * closing the subscriber or publisher cannot leave the client open.
+   */
+  def close(): Unit =
+    try subscription.foreach(_.close)
+    finally {
+      try publication.foreach(_.close())
+      finally {
+        if(!aeron.isClosed) aeron.close()
+      }
+    }
+}
diff --git a/membership/src/main/scala/org/apache/hama/membership/Driver.scala b/membership/src/main/scala/org/apache/hama/membership/Driver.scala
new file mode 100644
index 0000000..385065c
--- /dev/null
+++ b/membership/src/main/scala/org/apache/hama/membership/Driver.scala
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.membership
+
+import io.aeron.driver.MediaDriver
+import io.aeron.driver.ThreadingMode._
+import org.agrona.concurrent.BusySpinIdleStrategy
+import scala.util.Try
+
+object Driver {
+
+  /**
+   * Media driver context used by `create` when the caller supplies none:
+   * dedicated threading mode with a busy-spin idle strategy for the
+   * conductor, the receiver and the sender. Instantiated on first access.
+   */
+  lazy val defaultMediaDriverContext: MediaDriver.Context =
+    new MediaDriver.Context()
+      .threadingMode(DEDICATED)
+      .conductorIdleStrategy(new BusySpinIdleStrategy)
+      .receiverIdleStrategy(new BusySpinIdleStrategy)
+      .senderIdleStrategy(new BusySpinIdleStrategy)
+
+  /**
+   * Creates a Driver for the given media driver context.
+   *
+   * @param ctx media driver context; defaults to `defaultMediaDriverContext`
+   */
+  def create(ctx: MediaDriver.Context = defaultMediaDriverContext): Driver =
+    new Driver(ctx)
+
+}
+
+/**
+ * Launches an Aeron media driver from the given context.
+ *
+ * @param ctx context handed to `MediaDriver.launch`
+ */
+final class Driver(ctx: MediaDriver.Context) {
+
+  /**
+   * Launches the media driver.
+   *
+   * @return the launched driver, or the non-fatal exception raised while
+   *         launching it
+   */
+  def start(): Either[Throwable, MediaDriver] = {
+    val launched = Try(MediaDriver.launch(ctx))
+    launched.toEither
+  }
+
+}
+
diff --git a/membership/src/main/scala/org/apache/hama/membership/Job.scala b/membership/src/main/scala/org/apache/hama/membership/Job.scala
new file mode 100644
index 0000000..3dc09fb
--- /dev/null
+++ b/membership/src/main/scala/org/apache/hama/membership/Job.scala
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.membership
+
+object Job {
+
+  /**
+   * States a job can be in.
+   *
+   * Extends `Product with Serializable`, like `BSPMaster.State`, so that a
+   * collection of different states is inferred as a collection of `State`
+   * rather than of a compound type.
+   */
+  sealed trait State extends Product with Serializable
+  case object Submitted extends State
+  case object Runnable extends State
+  case object Starting extends State
+  case object Running extends State
+  case object Succeeded extends State
+  case object Failed extends State
+}
+
+/**
+ * A job together with its current state.
+ *
+ * @param state current state of the job
+ */
+final case class Job(state: Job.State)
diff --git a/membership/src/main/scala/org/apache/hama/membership/Role.scala b/membership/src/main/scala/org/apache/hama/membership/Role.scala
new file mode 100644
index 0000000..bcaea68
--- /dev/null
+++ b/membership/src/main/scala/org/apache/hama/membership/Role.scala
@@ -0,0 +1,22 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.membership
+
+/**
+ * Role a member plays.
+ *
+ * Extends `Product with Serializable`, like `BSPMaster.State`, so that a
+ * collection of different roles is inferred as a collection of `Role` rather
+ * than of a compound type.
+ */
+sealed trait Role extends Product with Serializable
+case object Master extends Role
+case object Groom extends Role
diff --git a/membership/src/main/scala/org/apache/hama/membership/Scheduler.scala b/membership/src/main/scala/org/apache/hama/membership/Scheduler.scala
new file mode 100644
index 0000000..934854c
--- /dev/null
+++ b/membership/src/main/scala/org/apache/hama/membership/Scheduler.scala
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.membership
+
+/** Marker trait for schedulers; it declares no members yet. */
+trait Scheduler
+
+object Scheduler {
+
+  /** Creates a new instance of the default scheduler. */
+  def default(): Scheduler = new DefaultScheduler()
+
+}
+
+/** Default Scheduler; it adds no behaviour of its own yet. */
+protected[membership] class DefaultScheduler extends Scheduler
diff --git a/membership/src/main/scala/org/apache/hama/membership/package.scala b/membership/src/main/scala/org/apache/hama/membership/package.scala
new file mode 100644
index 0000000..2c7c29f
--- /dev/null
+++ b/membership/src/main/scala/org/apache/hama/membership/package.scala
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama
+
+import cats.data.ValidatedNel
+
+package object membership {
+
+  /** Validation result whose failures are a non-empty list of `Throwable`. */
+  type ValidatedResult[A] = ValidatedNel[Throwable, A]
+
+  /** Host name shared across the package, e.g. as Channel's default host. */
+  val localhost: String = "localhost"
+
+}
diff --git a/membership/src/test/resources/log4j.properties b/membership/src/test/resources/log4j.properties
new file mode 100644
index 0000000..deb4190
--- /dev/null
+++ b/membership/src/test/resources/log4j.properties
@@ -0,0 +1,13 @@
+log4j.rootLogger=INFO, file, stdout
+
+log4j.appender.file=org.apache.log4j.RollingFileAppender
+log4j.appender.file.File=/tmp/hama-core/communication-test.log
+log4j.appender.file.MaxFileSize=10MB
+log4j.appender.file.MaxBackupIndex=10
+log4j.appender.file.layout=org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
+ 
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.Target=System.out
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L - %m%n
diff --git a/membership/src/test/scala/org/apache/hama/membership/BSPMasterSpec.scala b/membership/src/test/scala/org/apache/hama/membership/BSPMasterSpec.scala
new file mode 100644
index 0000000..8440dc5
--- /dev/null
+++ b/membership/src/test/scala/org/apache/hama/membership/BSPMasterSpec.scala
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.membership
+
+import org.scalatest._
+import org.apache.hama.logging.Logging
+
+/**
+ * Walks the BSPMaster state through Stopped, Starting, Running, ShuttingDown
+ * and back to Stopped, feeding the result of each transition into the next.
+ */
+class BSPMasterSpec extends FlatSpec with Matchers with Logging {
+
+  import BSPMaster._
+  import BSPMaster.State._
+  import BSPMaster.Event._
+
+  "BSPMaster State" should "transit sequentially." in {
+    // Stopped -> Starting
+    val startingState = stopped2Starting.run(Right(Stopped)).value._1
+    log.info(s"Actual BSPMaster state: $startingState")
+    assert(startingState == Right(Starting))
+
+    // Starting -> Running
+    val runningState = starting2Running(ready).run(startingState).value._1
+    log.info(s"Actual BSPMaster state: $runningState")
+    assert(runningState == Right(Running))
+
+    // Running -> ShuttingDown
+    val shutdownEvents = Set(CommunicatorShuttingDown)
+    val shuttingDownState =
+      running2ShuttingDown(shutdownEvents).run(runningState).value._1
+    log.info(s"Actual BSPMaster state: $shuttingDownState")
+    assert(shuttingDownState == Right(ShuttingDown))
+
+    // ShuttingDown -> Stopped
+    val stoppedState =
+      shuttingDown2Stopped(stopped).run(shuttingDownState).value._1
+    log.info(s"Actual BSPMaster state: $stoppedState")
+    assert(stoppedState == Right(Stopped))
+  }
+
+}
+
diff --git a/membership/src/test/scala/org/apache/hama/membership/CommunicatorSpec.scala b/membership/src/test/scala/org/apache/hama/membership/CommunicatorSpec.scala
new file mode 100644
index 0000000..5e4ac54
--- /dev/null
+++ b/membership/src/test/scala/org/apache/hama/membership/CommunicatorSpec.scala
@@ -0,0 +1,135 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hama.membership
+
+import io.aeron.driver.MediaDriver
+import org.agrona.concurrent.ShutdownSignalBarrier  
+import org.apache.hama.logging.Logging
+import org.scalatest._
+import scala.concurrent.ExecutionContext.Implicits.global
+import scala.concurrent.Future
+import scala.util.Failure
+import scala.util.Success
+import scala.util.Either
+import scalaz.zio._
+
+/**
+ * Starts a media driver, a subscriber and a publisher on forked fibers and
+ * checks that the subscriber receives the "hello" message that the publisher
+ * sends.
+ */
+class CommunicatorSpec extends FlatSpec with Matchers with Logging {
+
+  import Communicator._
+
+  "Communicator with zio.Fiber" should "send/ receive message." in {
+
+    // The driver effect blocks on this barrier until it is signalled below.
+    val barrier = new ShutdownSignalBarrier 
+    val driverIO = IO.sync{ 
+        val driver = Driver.create().start
+        barrier.await
+        driver
+    }
+
+    lazy val communicator = Communicator.create()
+
+    // NOTE(review): `withSubscription()` returns a copy, but this effect
+    // returns the original `comm`, whose `subscription` field is still None,
+    // so the later `close` cannot reach the subscription - confirm intended.
+    def subscriptionIO(comm: Communicator) = IO.sync { 
+      comm.withSubscription().subscription.map { sub => sub.receive { 
+        (buffer, offset, length, header) => 
+          val data = new Array[Byte](length)
+          buffer.getBytes(offset, data) 
+          val hello = new String(data)
+          log.info(s"Subscription handler receives message: $hello ")
+          assert("hello".equals(hello))
+          false // stop receive message while loop
+      }}
+      comm
+    }
+
+    // NOTE(review): same as above - the publication lives in the copy that
+    // `withPublication()` returns, not in the returned `comm`. The Either
+    // returned by `send` is discarded as well.
+    def publicationIO(comm: Communicator) = IO.sync { 
+      comm.withPublication().publication.map { pub => pub.send (
+        messageBytes = "hello".getBytes
+      )}
+      comm
+    }
+
+    // Fork driver, subscriber and publisher, signal the barrier so that the
+    // driver effect can finish, then join all fibers.
+    val handlers = for {
+      dio <- driverIO.fork
+      sio <- subscriptionIO(communicator).fork
+      pio <- publicationIO(communicator).fork
+      cio <- IO.sync { barrier.signal; IO.unit }.fork
+      _ <- cio.join
+      p <- pio.join
+      s <- sio.join
+      d <- dio.join
+    } yield (p, s, d)
+
+    // Run the program and close both communicators and the media driver.
+    new RTS {}.unsafeRun { 
+      handlers.run 
+    }.map { case (p, s, d) =>
+      p.close
+      s.close
+      d.map(_.close)
+    }
+    log.info("Done!")
+  }
+
+// Future-based variant of the same scenario; currently disabled.
+/*
+  "Communicator" should "send/ receive message." in {
+    val barrier = new ShutdownSignalBarrier
+    val f1 = Future {
+       val driver = Driver.create().start
+       barrier.await
+       driver
+    }
+    val communicator = Communicator.create()
+    val f2 = Future { 
+      communicator.withSubscription().subscription.map { sub => sub.receive { 
+        (buffer, offset, length, header) =>
+          val data = new Array[Byte](length)
+          buffer.getBytes(offset, data) 
+          val hello = new String(data)
+          log.info(s"Receive string $hello")
+          assert("hello".equals(hello))
+          false
+      }}
+      communicator
+    }
+    val f3 = Future {
+      communicator.withPublication().publication.map { pub => pub.send (
+        messageBytes = "hello".getBytes
+      )}
+      communicator
+    }
+    Thread.sleep(3 * 1000)  
+    barrier.signal  
+    f2.onComplete {
+      case Success(communicator) => communicator.close
+      case Failure(ex) => throw ex
+    }
+    f3.onComplete {
+      case Success(communicator) => communicator.close
+      case Failure(ex) => throw ex
+    }
+    f1.onComplete {
+      case Success(driver) => driver match {
+        case Left(ex) => throw ex
+        case Right(d) => d.close
+      }
+      case Failure(ex) => throw ex
+    }
+  }
+*/
+
+}
+
diff --git a/project/assembly.sbt b/project/assembly.sbt
new file mode 100644
index 0000000..d95475f
--- /dev/null
+++ b/project/assembly.sbt
@@ -0,0 +1 @@
+addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.7")
diff --git a/project/build.properties b/project/build.properties
new file mode 100644
index 0000000..d6e3507
--- /dev/null
+++ b/project/build.properties
@@ -0,0 +1 @@
+sbt.version=1.1.6


Mime
View raw message