spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "萝卜丝炒饭" <1427357...@qq.com>
Subject Re: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext
Date Sat, 11 Mar 2017 08:10:05 GMT
i think the val you defined are only valid in the driver, you can  try boardcast variable.

---Original---
From: "lk_spark"<lk_spark@163.com>
Date: 2017/2/27 11:14:23
To: "user.spark"<user@spark.apache.org>;
Subject: java.io.NotSerializableException: org.apache.spark.streaming.StreamingContext


  hi,all:
        I want to extract some info from kafka  useing sparkstream,my code like :
        
     val keyword = ""
    val system =  "dmp"
    val datetime_idx = 0
    val  datetime_length = 23
    val logLevelBeginIdx =  datetime_length + 2 - 1
    val logLevelMaxLenght = 5
        
     val lines = messages.filter(record =>  record.value().matches("\\d{4}.*")).map(record
=>  {
      val assembly =  record.topic()
      val value =  record.value
      val datatime =  value.substring(datetime_idx, datetime_length -  1)
      val level =  value.substring(logLevelBeginIdx, logLevelBeginIdx + logLevelMaxLenght
-  1)
       (assembly,value,datatime,level)
    })
     
     I will get error :
     Caused by: java.io.NotSerializableException:  org.apache.spark.streaming.StreamingContext
Serialization stack:
 -  object not serializable (class: org.apache.spark.streaming.StreamingContext,  value: org.apache.spark.streaming.StreamingContext@5a457aa1)
 -  field (class: $iw, name: streamingContext, type: class  org.apache.spark.streaming.StreamingContext)
 - object (class $iw, $iw@38eb2140)
 - field (class: $iw, name:  $iw, type: class $iw)
 - object (class $iw, $iw@2a3ced3d)
 - field (class: $iw, name:  $iw, type: class $iw)
 - object (class $iw, $iw@7c5dbca5)
....
 ==================================================================================
   if I change the parameter to constant I will not got  error  :
   
   val lines = messages.filter(record =>  record.value().matches("\\d{4}.*")).map(record
=>  {
      val assembly =  record.topic()
      val value =  record.value
      val datatime = value.substring(0,  22)
      val level = value.substring(24,  27)
       (assembly,value,datatime,level)
       
    })
  
 how can I pass parameter to the map function.
  
  2017-02-27
 
 lk_spark
Mime
View raw message