spark-dev mailing list archives

From Chan Chor Pang <chin...@indetail.co.jp>
Subject Re: REST api for monitoring Spark Streaming
Date Fri, 02 Dec 2016 02:19:02 GMT
hi everyone

I have done the coding and created the PR.
The implementation is straightforward and similar to the API in spark-core,
but we still need someone with a streaming background to verify the patch,
just to make sure everything is OK.

So, could anyone please help?
https://github.com/apache/spark/pull/16000
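For anyone who wants to poke at the endpoint while reviewing, here is a minimal sketch of how it could be queried. It assumes the driver UI is on localhost:4040 (the Spark default) and that the handler is mounted at context path "/streamingapi" with resource path "v1/streaminginfo", as in the diff quoted below; the object name StreamingInfoCheck is made up for illustration.

```scala
import scala.io.Source
import scala.util.Try

object StreamingInfoCheck {
  // Build the endpoint URL from the pieces visible in the diff:
  // context path "/streamingapi", @Path("/v1"), @Path("streaminginfo").
  def endpoint(host: String, port: Int): String =
    s"http://$host:$port/streamingapi/v1/streaminginfo"

  def main(args: Array[String]): Unit = {
    // The fetch succeeds only while a patched streaming application is running.
    Try(Source.fromURL(endpoint("localhost", 4040)).mkString).foreach(println)
  }
}
```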


On 11/8/16 1:46 PM, Chan Chor Pang wrote:
>
> Thank you
>
> this should take me at least a few days, and I will let you know as soon
> as the PR is ready.
>
>
> On 11/8/16 11:44 AM, Tathagata Das wrote:
>> This may be a good addition. I suggest you read our guidelines on 
>> contributing code to Spark.
>>
>> https://cwiki.apache.org/confluence/display/SPARK/Contributing+to+Spark#ContributingtoSpark-PreparingtoContributeCodeChanges
>>
>> It's a long document, but it should have everything you need to figure 
>> out how to contribute your changes. I hope to see your changes in a 
>> GitHub PR soon!
>>
>> TD
>>
>> On Mon, Nov 7, 2016 at 5:30 PM, Chan Chor Pang 
>> <chin-sh@indetail.co.jp <mailto:chin-sh@indetail.co.jp>> wrote:
>>
>>     hi everyone
>>
>>     It seems that not many people are interested in creating an API
>>     for Streaming.
>>     Nevertheless, I still really want the API for monitoring,
>>     so I tried to see if I can implement it on my own.
>>
>>     After some testing,
>>     I believe I can achieve the goal by:
>>     1. implementing a package (org.apache.spark.streaming.status.api.v1)
>>     that serves the same purpose as org.apache.spark.status.api.v1,
>>     2. registering the API path through StreamingTab,
>>     and 3. retrieving the streaming information through
>>     StreamingJobProgressListener.
>>
>>     My main concern now is whether my implementation will be able to
>>     be merged upstream.
>>
>>     I am new to open source projects, so could anyone please shed
>>     some light on how I should proceed to get my implementation
>>     merged upstream?
>>
>>
>>     here is my test code, based on v1.6.0
>>     ###################################
>>     diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala
>>     new file mode 100644
>>     index 0000000..690e2d8
>>     --- /dev/null
>>     +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/JacksonMessageWriter.scala
>>     @@ -0,0 +1,68 @@
>>     +package org.apache.spark.streaming.status.api.v1
>>     +
>>     +import java.io.OutputStream
>>     +import java.lang.annotation.Annotation
>>     +import java.lang.reflect.Type
>>     +import java.text.SimpleDateFormat
>>     +import java.util.{Calendar, SimpleTimeZone}
>>     +import javax.ws.rs.Produces
>>     +import javax.ws.rs.core.{MediaType, MultivaluedMap}
>>     +import javax.ws.rs.ext.{MessageBodyWriter, Provider}
>>     +
>>     +import com.fasterxml.jackson.annotation.JsonInclude
>>     +import com.fasterxml.jackson.databind.{ObjectMapper, SerializationFeature}
>>     +
>>     +@Provider
>>     +@Produces(Array(MediaType.APPLICATION_JSON))
>>     +private[v1] class JacksonMessageWriter extends MessageBodyWriter[Object]{
>>     +
>>     +  val mapper = new ObjectMapper() {
>>     +    override def writeValueAsString(t: Any): String = {
>>     +      super.writeValueAsString(t)
>>     +    }
>>     +  }
>>     +
>>     +  mapper.registerModule(com.fasterxml.jackson.module.scala.DefaultScalaModule)
>>     +  mapper.enable(SerializationFeature.INDENT_OUTPUT)
>>     +  mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
>>     +  mapper.setDateFormat(JacksonMessageWriter.makeISODateFormat)
>>     +
>>     +  override def isWriteable(
>>     +      aClass: Class[_],
>>     +      `type`: Type,
>>     +      annotations: Array[Annotation],
>>     +      mediaType: MediaType): Boolean = {
>>     +      true
>>     +  }
>>     +
>>     +  override def writeTo(
>>     +      t: Object,
>>     +      aClass: Class[_],
>>     +      `type`: Type,
>>     +      annotations: Array[Annotation],
>>     +      mediaType: MediaType,
>>     +      multivaluedMap: MultivaluedMap[String, AnyRef],
>>     +      outputStream: OutputStream): Unit = {
>>     +    t match {
>>     +      //case ErrorWrapper(err) => outputStream.write(err.getBytes("utf-8"))
>>     +      case _ => mapper.writeValue(outputStream, t)
>>     +    }
>>     +  }
>>     +
>>     +  override def getSize(
>>     +      t: Object,
>>     +      aClass: Class[_],
>>     +      `type`: Type,
>>     +      annotations: Array[Annotation],
>>     +      mediaType: MediaType): Long = {
>>     +    -1L
>>     +  }
>>     +}
>>     +
>>     +private[spark] object JacksonMessageWriter {
>>     +  def makeISODateFormat: SimpleDateFormat = {
>>     +    val iso8601 = new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSS'GMT'")
>>     +    val cal = Calendar.getInstance(new SimpleTimeZone(0, "GMT"))
>>     +    iso8601.setCalendar(cal)
>>     +    iso8601
>>     +  }
>>     +}
>>     diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala
>>     new file mode 100644
>>     index 0000000..f4e43dd
>>     --- /dev/null
>>     +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingApiRootResource.scala
>>     @@ -0,0 +1,74 @@
>>     +package org.apache.spark.streaming.status.api.v1
>>     +
>>     +import org.apache.spark.status.api.v1.UIRoot
>>     +import org.eclipse.jetty.server.handler.ContextHandler
>>     +import org.eclipse.jetty.servlet.ServletContextHandler
>>     +import org.eclipse.jetty.servlet.ServletHolder
>>     +
>>     +import com.sun.jersey.spi.container.servlet.ServletContainer
>>     +
>>     +import javax.servlet.ServletContext
>>     +import javax.ws.rs.Path
>>     +import javax.ws.rs.Produces
>>     +import javax.ws.rs.core.Context
>>     +import org.apache.spark.streaming.ui.StreamingJobProgressListener
>>     +
>>     +
>>     +@Path("/v1")
>>     +private[v1] class StreamingApiRootResource extends UIRootFromServletContext{
>>     +
>>     +  @Path("streaminginfo")
>>     +  def getStreamingInfo(): StreamingInfoResource = {
>>     +    new StreamingInfoResource(uiRoot,listener)
>>     +  }
>>     +
>>     +}
>>     +
>>     +private[spark] object StreamingApiRootResource {
>>     +
>>     +  def getServletHandler(uiRoot: UIRoot, listener: StreamingJobProgressListener): ServletContextHandler = {
>>     +
>>     +    val jerseyContext = new ServletContextHandler(ServletContextHandler.NO_SESSIONS)
>>     +    jerseyContext.setContextPath("/streamingapi")
>>     +    val holder: ServletHolder = new ServletHolder(classOf[ServletContainer])
>>     +    holder.setInitParameter("com.sun.jersey.config.property.resourceConfigClass",
>>     +      "com.sun.jersey.api.core.PackagesResourceConfig")
>>     +    holder.setInitParameter("com.sun.jersey.config.property.packages",
>>     +      "org.apache.spark.streaming.status.api.v1")
>>     +    //holder.setInitParameter(ResourceConfig.PROPERTY_CONTAINER_REQUEST_FILTERS,
>>     +    //  classOf[SecurityFilter].getCanonicalName)
>>     +    UIRootFromServletContext.setUiRoot(jerseyContext, uiRoot)
>>     +    UIRootFromServletContext.setListener(jerseyContext, listener)
>>     +    jerseyContext.addServlet(holder, "/*")
>>     +    jerseyContext
>>     +  }
>>     +}
>>     +
>>     +private[v1] object UIRootFromServletContext {
>>     +
>>     +  private val attribute = getClass.getCanonicalName
>>     +
>>     +  def setListener(contextHandler: ContextHandler, listener: StreamingJobProgressListener): Unit = {
>>     +    contextHandler.setAttribute(attribute + "_listener", listener)
>>     +  }
>>     +
>>     +  def getListener(context: ServletContext): StreamingJobProgressListener = {
>>     +    context.getAttribute(attribute + "_listener").asInstanceOf[StreamingJobProgressListener]
>>     +  }
>>     +
>>     +  def setUiRoot(contextHandler: ContextHandler, uiRoot: UIRoot): Unit = {
>>     +    contextHandler.setAttribute(attribute, uiRoot)
>>     +  }
>>     +
>>     +  def getUiRoot(context: ServletContext): UIRoot = {
>>     +    context.getAttribute(attribute).asInstanceOf[UIRoot]
>>     +  }
>>     +}
>>     +
>>     +private[v1] trait UIRootFromServletContext {
>>     +  @Context
>>     +  var servletContext: ServletContext = _
>>     +
>>     +  def uiRoot: UIRoot = UIRootFromServletContext.getUiRoot(servletContext)
>>     +  def listener: StreamingJobProgressListener = UIRootFromServletContext.getListener(servletContext)
>>     +}
>>     diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala
>>     new file mode 100644
>>     index 0000000..d5fc11b
>>     --- /dev/null
>>     +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/StreamingInfoResource.scala
>>     @@ -0,0 +1,22 @@
>>     +package org.apache.spark.streaming.status.api.v1
>>     +
>>     +import org.apache.spark.status.api.v1.SimpleDateParam
>>     +import org.apache.spark.status.api.v1.UIRoot
>>     +
>>     +import javax.ws.rs.GET
>>     +import javax.ws.rs.Produces
>>     +import javax.ws.rs.core.MediaType
>>     +import org.apache.spark.streaming.StreamingContext
>>     +import org.apache.spark.streaming.ui.StreamingJobProgressListener
>>     +
>>     +@Produces(Array(MediaType.APPLICATION_JSON))
>>     +private[v1] class StreamingInfoResource(uiRoot: UIRoot, listener: StreamingJobProgressListener){
>>     +
>>     +  @GET
>>     +  def streamingInfo(): Iterator[StreamingInfo] = {
>>     +    var v = listener.numTotalCompletedBatches
>>     +    Iterator(new StreamingInfo("testname",v))
>>     +
>>     +  }
>>     +}
>>     \ No newline at end of file
>>     diff --git a/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala
>>     new file mode 100644
>>     index 0000000..958dd41
>>     --- /dev/null
>>     +++ b/streaming/src/main/scala/org/apache/spark/streaming/status/api/v1/api.scala
>>     @@ -0,0 +1,6 @@
>>     +package org.apache.spark.streaming.status.api.v1
>>     +
>>     +class StreamingInfo private[streaming](
>>     +    val name:String,
>>     +    val completedBatchCount:Long)
>>     +
>>     \ No newline at end of file
>>     diff --git a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
>>     index bc53f2a..877abf4 100644
>>     ---
>>     a/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
>>     +++
>>     b/streaming/src/main/scala/org/apache/spark/streaming/ui/StreamingTab.scala
>>     @@ -22,6 +22,7 @@ import org.apache.spark.streaming.StreamingContext
>>      import org.apache.spark.ui.{SparkUI, SparkUITab}
>>
>>      import StreamingTab._
>>     +import org.apache.spark.streaming.status.api.v1.StreamingApiRootResource
>>
>>      /**
>>       * Spark Web UI tab that shows statistics of a streaming job.
>>     @@ -39,6 +40,9 @@ private[spark] class StreamingTab(val ssc: StreamingContext)
>>        ssc.sc.addSparkListener(listener)
>>        attachPage(new StreamingPage(this))
>>        attachPage(new BatchPage(this))
>>     +
>>     +  //register streaming api
>>     +  parent.attachHandler(StreamingApiRootResource.getServletHandler(parent, listener));
>>
>>        def attach() {
>>          getSparkUI(ssc).attachTab(this)
>>
>>
>>     On 9/14/16 10:13 AM, Chan Chor Pang wrote:
>>
>>         Hi everyone,
>>
>>         While trying to monitor our streaming application through the
>>         Spark REST interface, I found that there is no such interface
>>         for Streaming.
>>
>>         I wonder if anyone is already working on this, or whether I
>>         should just start implementing my own?
>>
>>
>>
>>
>>
>

-- 
---*------------------------------------------------*---*---*---*---
INDETAIL Inc.
Nearshore Integrated Services Division
Game Services Department
Chan Chor Pang
E-mail: chin-sh@indetail.co.jp
URL: http://www.indetail.co.jp

[Sapporo Head Office / LABO / LABO2]
Kitako Center Building, 3-33, Odori Nishi 9-chome,
Chuo-ku, Sapporo 060-0042
(Head Office / LABO2: 2F, LABO: 9F)
TEL: 011-206-9235 FAX: 011-206-9236

[Tokyo Branch]
Cross Office Mita, 5-29-20 Shiba, Minato-ku, Tokyo 108-0014
TEL: 03-6809-6502 FAX: 03-6809-6504

[Nagoya Satellite]
NAYUTA BLD, 3-17-24 Marunouchi, Naka-ku, Nagoya, Aichi 460-0002
TEL: 052-971-0086
