spark-user mailing list archives

From Mekal Zheng <mekal.zh...@gmail.com>
Subject Re: scala.MatchError on stand-alone cluster mode
Date Mon, 18 Jul 2016 02:34:44 GMT
Hi, Rishabh Bhardwaj, Saisai Shao,

Thanks for your help. I have found that the root cause is that I forgot to
upload the jar package to all of the nodes in the cluster, so after the master
distributed the job and selected one node as the driver, the driver could
not find the jar package and threw an exception.
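For others who land on this thread with the same stack trace: a `scala.MatchError` of class `[Ljava.lang.String;` is what a `val Array(...)` extractor throws when the number of elements does not match the pattern, which is exactly the shape of line 29 in the job code below. A minimal, hypothetical sketch (names and argument values invented, not the original job):

```scala
// Sketch of the MatchError failure mode of a `val Array(...)` extractor,
// and a safer explicit match with a fallback case.
object MatchErrorDemo {
  def parse(args: Array[String]): String = args match {
    // An explicit match with a wildcard turns the crash into a usage message.
    case Array(zkQuorum, group, topics) => s"ok: $zkQuorum $group $topics"
    case _                              => "wrong number of arguments"
  }

  def main(argv: Array[String]): Unit = {
    val bad = Array("zk:2181", "grp", "topicA", "extra") // 4 args, pattern wants 3
    try {
      // Throws scala.MatchError: [Ljava.lang.String;@... (of class [Ljava.lang.String;)
      val Array(zkQuorum, group, topics) = bad
      println(zkQuorum)
    } catch {
      case e: MatchError => println("caught " + e.getClass.getName)
    }
    println(parse(bad))
  }
}
```

Note that a guard like `if (args.length < 9)` only rejects too few arguments; too many still fail the extractor, which the explicit match above handles.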

-- 
Mekal Zheng
Sent with Airmail

From: Rishabh Bhardwaj <rbnext29@gmail.com>
Reply-To: Rishabh Bhardwaj <rbnext29@gmail.com>
Date: July 15, 2016 at 17:28:43
To: Saisai Shao <sai.sai.shao@gmail.com>
Cc: Mekal Zheng <mekal.zheng@gmail.com>, spark users <user@spark.apache.org>
Subject: Re: scala.MatchError on stand-alone cluster mode

Hi Mekal,
It may be a Scala version mismatch error; kindly check whether you are
running both your streaming app and the Spark cluster on Scala 2.10 or 2.11.

Thanks,
Rishabh.
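(One quick way to verify this, assuming you can run a snippet on the same JVM as the driver, is to print the runtime Scala version and compare it with the version your Spark build targets. A hypothetical snippet, not part of the original thread:)

```scala
// Prints the Scala library version the application is actually running on,
// useful when diagnosing a 2.10 vs 2.11 mismatch against the Spark build.
object ScalaVersionCheck {
  def main(args: Array[String]): Unit = {
    println(scala.util.Properties.versionNumberString) // e.g. "2.11.8"
  }
}
```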

On Fri, Jul 15, 2016 at 1:38 PM, Saisai Shao <sai.sai.shao@gmail.com> wrote:

> The error stack trace is thrown from your code:
>
> Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class
> [Ljava.lang.String;)
>         at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
>         at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
>
> I think you should debug the code yourself; it may not be a problem with
> Spark.
>
> On Fri, Jul 15, 2016 at 3:17 PM, Mekal Zheng <mekal.zheng@gmail.com>
> wrote:
>
>> Hi,
>>
>> I have a Spark Streaming job written in Scala that runs well in local
>> and client mode, but when I submit it in cluster mode, the driver
>> reports the error shown below.
>> Does anyone know what is wrong here?
>> Please help me!
>>
>> The job code follows the log.
>>
>> 16/07/14 17:28:21 DEBUG ByteBufUtil:
>> -Dio.netty.threadLocalDirectBufferSize: 65536
>> 16/07/14 17:28:21 DEBUG NetUtil: Loopback interface: lo (lo,
>> 0:0:0:0:0:0:0:1%lo)
>> 16/07/14 17:28:21 DEBUG NetUtil: /proc/sys/net/core/somaxconn: 32768
>> 16/07/14 17:28:21 DEBUG TransportServer: Shuffle server started on port
>> :43492
>> 16/07/14 17:28:21 INFO Utils: Successfully started service 'Driver' on
>> port 43492.
>> 16/07/14 17:28:21 INFO WorkerWatcher: Connecting to worker spark://
>> Worker@172.20.130.98:23933
>> 16/07/14 17:28:21 DEBUG TransportClientFactory: Creating new connection
>> to /172.20.130.98:23933
>> Exception in thread "main" java.lang.reflect.InvocationTargetException
>>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>>         at
>> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>>         at
>> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>>         at java.lang.reflect.Method.invoke(Method.java:497)
>>         at
>> org.apache.spark.deploy.worker.DriverWrapper$.main(DriverWrapper.scala:58)
>>         at
>> org.apache.spark.deploy.worker.DriverWrapper.main(DriverWrapper.scala)
>> Caused by: scala.MatchError: [Ljava.lang.String;@68d279ec (of class
>> [Ljava.lang.String;)
>>         at com.jd.deeplog.LogAggregator$.main(LogAggregator.scala:29)
>>         at com.jd.deeplog.LogAggregator.main(LogAggregator.scala)
>>         ... 6 more
>>
>> ==================
>> Job CODE:
>>
>> object LogAggregator {
>>
>>   val batchDuration = Seconds(5)
>>
>>   def main(args:Array[String]) {
>>
>>     val usage =
>>       """Usage: LogAggregator <zkQuorum> <group> <topics> <numThreads> <logFormat> <logSeparator> <batchDuration> <destType> <destPath>
>>         |  logFormat: fieldName:fieldRole[,fieldName:fieldRole] each field must have both name and role
>>         |  logFormat.role: can be key|avg|enum|sum|ignore
>>       """.stripMargin
>>
>>     if (args.length < 9) {
>>       System.err.println(usage)
>>       System.exit(1)
>>     }
>>
>>     val Array(zkQuorum, group, topics, numThreads, logFormat, logSeparator, batchDuration, destType, destPath) = args
>>
>>     println("Start streaming calculation...")
>>
>>     val conf = new SparkConf().setAppName("LBHaproxy-LogAggregator")
>>     val ssc = new StreamingContext(conf, Seconds(batchDuration.toInt))
>>
>>     val topicMap = topics.split(",").map((_, numThreads.toInt)).toMap
>>
>>     val lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(_._2)
>>
>>     val logFields = logFormat.split(",").map(field => {
>>       val fld = field.split(":")
>>       if (fld.size != 2) {
>>         System.err.println("Wrong parameters for logFormat!\n")
>>         System.err.println(usage)
>>         System.exit(1)
>>       }
>>       // TODO: ensure the field has both 'name' and 'role'
>>       new LogField(fld(0), fld(1))
>>     })
>>
>>     val keyFields = logFields.filter(logFieldName => {
>>       logFieldName.role == "key"
>>     })
>>     val keys = keyFields.map(key => {
>>       key.name
>>     })
>>
>>     val logsByKey = lines.map(line => {
>>       val log = new Log(logFields, line, logSeparator)
>>       log.toMap
>>     }).filter(log => log.nonEmpty).map(log => {
>>       val keys = keyFields.map(logField => {
>>         log(logField.name).value
>>       })
>>
>>       val key = keys.reduce((key1, key2) => {
>>         key1.asInstanceOf[String] + key2.asInstanceOf[String]
>>       })
>>
>>       val fullLog = log + ("count" -> new LogSegment("sum", 1))
>>
>>       (key, fullLog)
>>     })
>>
>>
>>     val aggResults = logsByKey.reduceByKey((log_a, log_b) => {
>>
>>       log_a.map(logField => {
>>         val logFieldName = logField._1
>>         val logSegment_a = logField._2
>>         val logSegment_b = log_b(logFieldName)
>>
>>         val segValue = logSegment_a.role match {
>>           case "avg" => {
>>             logSegment_a.value.toString.toInt + logSegment_b.value.toString.toInt
>>           }
>>           case "sum" => {
>>             logSegment_a.value.toString.toInt + logSegment_b.value.toString.toInt
>>           }
>>           case "enum" => {
>>             val list_a = logSegment_a.value.asInstanceOf[List[(String, Int)]]
>>             val list_b = logSegment_b.value.asInstanceOf[List[(String, Int)]]
>>             list_a ++ list_b
>>           }
>>           case _ => logSegment_a.value
>>         }
>>         (logFieldName, new LogSegment(logSegment_a.role, segValue))
>>       })
>>     }).map(logRecord => {
>>       val log = logRecord._2
>>       val count = log("count").value.toString.toInt
>>
>>
>>       val logContent = log.map(logField => {
>>         val logFieldName = logField._1
>>         val logSegment = logField._2
>>         val fieldValue = logSegment.role match {
>>           case "avg" => {
>>             logSegment.value.toString.toInt / count
>>           }
>>           case "enum" => {
>>             val enumList = logSegment.value.asInstanceOf[List[(String, Int)]]
>>             val enumJson = enumList.groupBy(_._1).map(el => el._2.reduce((e1, e2) => (e1._1, e1._2.toString.toInt + e2._2.toString.toInt)))
>>             JSONObject(enumJson)
>>           }
>>           case _ => logSegment.value
>>         }
>>         (logFieldName, fieldValue)
>>       })
>>
>>       logContent + ("count" -> count)
>>     })
>>
>>     if (destType == "hbase") {
>>
>>       val hbaseQuorum = "localhost"
>>       val hbaseClientPort = "2181"
>>       val hbaseStore = new HBaseStore(hbaseQuorum, hbaseClientPort, keys.toList, "tb_", true)
>>
>>       val jobConf = hbaseStore.jobConf()
>>
>>       aggResults.foreachRDD((rdd, time) => {
>>         rdd.map(record => {
>>           val logPut = hbaseStore.convert(record, time)
>>           (new ImmutableBytesWritable, logPut)
>>         }).saveAsHadoopDataset(jobConf)
>>       })
>>     } else if (destType == "file") {
>>       aggResults.foreachRDD((rdd, time) => {
>>         rdd.foreach(record => {
>>           val res = record + ("timestamp" -> time.milliseconds)
>>           io.File(destPath).appendAll(res.toString() + "\n")
>>         })
>>       })
>>     }
>>
>>     ssc.start()
>>     ssc.awaitTermination()
>>   }
>> }
>>
>>
>> --
>> Mekal Zheng
>> Sent with Airmail
>>
>
>
