Now I have a better version, but the problem is that saveAsTextFile never finishes the job: the HDFS output directory only contains a partial temporary file. Can someone tell me what is wrong?

Thanks!!

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.geoscript.feature._
import org.geoscript.geometry._
import org.geoscript.layer.Shapefile
import org.geotools.feature.FeatureCollection
import java.text.SimpleDateFormat

object SimpleApp {

        def main(args: Array[String]) {
                val conf = new SparkConf().setAppName("Csv Clipper")
                val sc = new SparkContext(conf)
                val csvPath = "hdfs://m01/user/acoronado/mov/movilidad_64mb.csv"
                val csv = sc.textFile(csvPath)
                csv.cache()
                val clipPoints = csv.map { line: String =>
                        val Array(usuario, lat, lon, date) = line.split(",").map(_.trim)
                        val punto = Point(lon.toDouble, lat.toDouble)
                        // First clip against the coarse index shapefile, then against the
                        // municipality polygons selected through the spatial index.
                        val internal = geoDataExternal.get.find(f => f.geometry intersects punto)
                        val (cve_est, cve_mun) = internal match {
                                case Some(f: org.geoscript.feature.Feature) =>
                                        val index = f.getAttribute(1).toString
                                        val existe = geoDataMun.get(index).find(m => m.geometry intersects punto)
                                        existe match {
                                                case Some(m) => (m.getAttribute(1).toString, m.getAttribute(2).toString)
                                                case None    => ("0", "0")
                                        }
                                case None => ("0", "0")
                        }
                        val time =
                                try {
                                        new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
                                                .parse(date.replaceAll("Z$", "+0000")).getTime.toString
                                } catch { case e: Exception => "0" }
                        line + "," + time + "," + cve_est + "," + cve_mun
                }
                clipPoints.saveAsTextFile("hdfs://m01/user/acoronado/mov/resultados_movilidad_60.csv")
                println("Spark Clip Exito!!!")
        }

        object geoDataMun {
                private val shp = Shapefile("/geoData/MunicipiosLatLon.shp") // exists on every node
                val features = shp.getFeatures.toIterable
                // Spatial index: each CSV row maps a cell index to a (state, municipality) code pair.
                val result = scala.io.Source.fromFile("/geoData/indice_espacial.csv")
                        .getLines()
                        .toList map { line: String =>
                                val campos = line.split(",").map(_.trim)
                                val cve_edo = campos(0)
                                val cve_mun = campos(1)
                                val index = campos(2)
                                (index.toInt, (cve_edo, cve_mun))
                        }
                val mapaIx = result.groupBy(_._1).mapValues(cves => cves.map(_._2))
                def get(index: String) =
                        features.filter(f => mapaIx(index.toInt).contains((f.getAttribute(1).toString, f.getAttribute(2).toString)))
        }

        object geoDataExternal {
                private val shp = Shapefile("/geoData/IndiceRecortado.shp")
                val features = shp.getFeatures
                def get: FeatureCollection[org.geoscript.feature.Schema, org.geoscript.feature.Feature] = features
        }

}
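The timestamp handling inside the map closure can be exercised on its own, outside Spark. A minimal self-contained sketch (the `TimeParse` object name is mine, not from the code above): the trailing "Z" is rewritten to "+0000" so that SimpleDateFormat's RFC 822 `Z` pattern can parse it, and malformed rows fall back to "0".

```scala
import java.text.SimpleDateFormat

object TimeParse {
  // Mirrors the parsing in the map closure: ISO-8601 with a literal "Z"
  // suffix is normalized to a numeric UTC offset before parsing.
  def toEpochMillis(date: String): String =
    try {
      new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")
        .parse(date.replaceAll("Z$", "+0000"))
        .getTime
        .toString
    } catch { case e: Exception => "0" } // unparsable input falls back to "0"
}
```

Note that a genuinely unparsable date and the epoch instant both produce "0", so downstream code cannot distinguish the two; that matches the original behavior.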


The driver log shows:

14/09/18 19:27:55 ERROR EndpointWriter: AssociationError [akka.tcp://sparkWorker@axaxaxa-cloudera-s05.xxxnetworks.com:44895] -> [akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]: Error [Association failed with [akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]] [

akka.remote.EndpointAssociationException: Association failed with [akka.tcp://sparkExecutor@axaxaxa-cloudera-s05.xxxnetworks.com:43942]

Caused by: akka.remote.transport.netty.NettyTransport$$anonfun$associate$1$$anon$2: Connection refused: axaxaxa-cloudera-s05.xxxnetworks.com/10.5.96.42:43942

]

(the same AssociationError repeats two more times)



On Mon, Sep 15, 2014 at 1:30 PM, Abel Coronado Iruegas <acoronadoiruegas@gmail.com> wrote:
Here is an example of working code that takes a CSV of lat/lon points and intersects them with polygons of the municipalities of Mexico, generating a new version of the file with added attributes.

Do you think that could be improved?

Thanks.

The Code:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.SparkConf
import org.geoscript.feature._
import org.geoscript.geometry._
import org.geoscript.geometry.builder._
import com.vividsolutions.jts._
import org.geoscript.layer.Shapefile
import org.geotools.feature.FeatureCollection
import java.text._
import java.util._

object SimpleApp {
        def main(args: Array[String]){
                val conf = new SparkConf().setAppName("Csv Clipper")
                val sc = new SparkContext(conf)
                val csvPath = "hdfs://x01/user/acoronado/mov/movilidad.csv" // 70 million rows
                val csv = sc.textFile(csvPath)
                val clipPoints = csv.map({line: String =>
                                               val Array(usuario, lat, lon, date) = line.split(",").map(_.trim)
                                               val punto = Point(lon.toDouble,lat.toDouble)
                                               val existe = geoData.get.filter(f => f.geometry intersects punto) // Geospatial operation
                                               var cve_est = "0"
                                               var cve_mun = "0"
                                               var time = "0"
                                               if(!existe.isEmpty){
                                                  val f = existe.take(1)
                                                  val ff = f.toList(0)
                                                  cve_est = ff.getAttribute(1).toString //State Code
                                                  cve_mun = ff.getAttribute(2).toString  // Municipality Code
                                                  time = (new SimpleDateFormat("yyyy-MM-dd'T'HH:mm:ss.SSSZ")).parse(date.replaceAll("Z$", "+0000")).getTime().toString()
                                               }
                                               line+","+time+","+cve_est+","+cve_mun
                                           })
                clipPoints.coalesce(1,true).saveAsTextFile("hdfs://m01/user/acoronado/mov/mov_all.csv")
                println("Spark Clip Exito!!!")
        }
        object geoData {
            private val estatal = Shapefile("/geoData/MunicipiosLatLon.shp") //This directory exist in all the nodes.
            private val estatalColl = estatal.getFeatures
            def get:FeatureCollection[org.geoscript.feature.Schema,org.geoscript.feature.Feature] = estatalColl
        }
}
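One likely bottleneck in this version is coalesce(1, true): it shuffles all 70 million rows into a single task on a single executor just to get one output file, which concentrates memory pressure on that one JVM. A hedged alternative sketch (this is a modification of the line above, not standalone code): keep the RDD's natural partitioning and merge the part files after the job.

```scala
// Write one part-NNNNN file per partition instead of forcing a single
// partition with coalesce(1, true); merge afterwards outside Spark, e.g.
//   hdfs dfs -getmerge /user/acoronado/mov/mov_all mov_all.csv
clipPoints.saveAsTextFile("hdfs://m01/user/acoronado/mov/mov_all")
```

This keeps the write parallel across executors; the single-file requirement is satisfied by the post-job merge on the edge node.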