spark-user mailing list archives

From Bin Wang <wbi...@gmail.com>
Subject Re: How to close connection in mapPartitions?
Date Fri, 23 Oct 2015 06:17:09 GMT
BTW, "lines" is a DStream.

On Fri, Oct 23, 2015 at 2:16 PM, Bin Wang <wbin00@gmail.com> wrote:

> I use mapPartitions to open connections to Redis. I wrote it like this:
>
>     val seqs = lines.mapPartitions { lines =>
>       val cache = new RedisCache(redisUrl, redisPort)
>       val result = lines.map(line => Parser.parseBody(line, cache))
>       cache.redisPool.close
>       result
>     }
>
> But it seems the pool is closed before I use it. Am I doing anything
> wrong? Here is the error:
>
> java.lang.IllegalStateException: Pool not open
> 	at org.apache.commons.pool.BaseObjectPool.assertOpen(BaseObjectPool.java:140)
> 	at org.apache.commons.pool.impl.StackObjectPool.borrowObject(StackObjectPool.java:166)
> 	at com.redis.RedisClientPool.withClient(Pool.scala:34)
> 	at com.appadhoc.data.cache.RedisCache.getExpId(RedisCache.scala:17)
> 	at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:29)
> 	at com.appadhoc.data.parser.Parser$$anonfun$parseBody$1.apply(Parser.scala:26)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
> 	at scala.collection.immutable.List.foreach(List.scala:318)
> 	at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
> 	at scala.collection.AbstractTraversable.map(Traversable.scala:105)
> 	at com.appadhoc.data.parser.Parser$.parseBody(Parser.scala:26)
> 	at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
> 	at com.appadhoc.data.main.StatCounter$$anonfun$2$$anonfun$3.apply(StatCounter.scala:33)
> 	at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
> 	at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:371)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:327)
> 	at org.apache.spark.util.collection.ExternalSorter.insertAll(ExternalSorter.scala:209)
> 	at org.apache.spark.shuffle.sort.SortShuffleWriter.write(SortShuffleWriter.scala:73)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
> 	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
> 	at org.apache.spark.scheduler.Task.run(Task.scala:88)
> 	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:745)
>
>
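
A likely explanation, consistent with the Iterator frames in the trace above, is that the iterator handed to mapPartitions is lazy: map only wraps it, so the close call runs before any element has been parsed. The sketch below reproduces that ordering outside Spark. FakePool and LazyCloseDemo are hypothetical stand-ins invented for illustration; they are not the RedisCache or RedisClientPool API.

```scala
object LazyCloseDemo {
  // Hypothetical stand-in for a connection pool (not the real Redis client API).
  final class FakePool {
    private var open = true
    def lookup(key: String): String = {
      if (!open) throw new IllegalStateException("Pool not open")
      key.toUpperCase
    }
    def close(): Unit = { open = false }
  }

  // Same shape as the quoted code: map is lazy, so close() runs
  // before the caller pulls the first element from the result.
  def closeTooEarly(lines: Iterator[String]): Iterator[String] = {
    val pool = new FakePool
    val result = lines.map(pool.lookup)
    pool.close()
    result
  }

  // One possible fix: force the whole partition while the pool is
  // still open, then close it and return an iterator over the results.
  def closeAfterUse(lines: Iterator[String]): Iterator[String] = {
    val pool = new FakePool
    try lines.map(pool.lookup).toList.iterator
    finally pool.close()
  }
}
```

Calling LazyCloseDemo.closeTooEarly(Iterator("a")).toList throws the same "Pool not open" exception, while closeAfterUse returns the mapped values. The trade-off is that toList buffers the whole partition in memory, so for large partitions it may be better to close the pool only once the iterator is exhausted or the task has finished.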
