spark-user mailing list archives

From Davies Liu <dav...@databricks.com>
Subject Re: trouble with jsonRDD and jsonFile in pyspark
Date Wed, 06 Aug 2014 06:45:21 GMT
There is a PR to fix this: https://github.com/apache/spark/pull/1802
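
Until a build with that fix is available, one possible workaround is to rewrite the input so the nested key is always a JSON object. This is only a sketch: it is not part of the PR and has not been tested against Spark. It relies solely on the report quoted below, where '{"key0": {}}' collects fine while a missing or null key0 fails. The helper name fill_missing_struct and the sample lines are hypothetical.

```python
import json


def fill_missing_struct(json_lines, key):
    """Return JSON strings in which `key` is always present and never null.

    Records where `key` is absent or null get an empty object instead,
    mirroring the '{"key0": {}}' case that is reported to work.
    """
    fixed = []
    for line in json_lines:
        record = json.loads(line)
        if record.get(key) is None:  # covers both "missing" and "null"
            record[key] = {}
        fixed.append(json.dumps(record))
    return fixed


lines = ['{}', '{"key0": null}', '{"key0": {"key1": "value1"}}']
patched = fill_missing_struct(lines, "key0")
```

The patched strings could then be passed on as in the quoted examples, e.g. sqlCtx.jsonRDD(sc.parallelize(patched)). Whether this avoids the NullPointerException on a given build is an assumption to verify.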

On Tue, Aug 5, 2014 at 10:11 PM, Brad Miller <bmiller1@eecs.berkeley.edu> wrote:
> I concur that printSchema works; it just seems to be operations that use the
> data where trouble happens.
>
> Thanks for posting the bug.
>
> -Brad
>
>
> On Tue, Aug 5, 2014 at 10:05 PM, Yin Huai <yhuai@databricks.com> wrote:
>>
>> I tried jsonRDD(...).printSchema() and it worked. It seems the problem occurs
>> when we take the data back to the Python side: SchemaRDD#javaToPython fails
>> on your cases. I have created
>> https://issues.apache.org/jira/browse/SPARK-2875 to track it.
>>
>> Thanks,
>>
>> Yin
>>
>>
>> On Tue, Aug 5, 2014 at 9:20 PM, Brad Miller <bmiller1@eecs.berkeley.edu>
>> wrote:
>>>
>>> Hi All,
>>>
>>> I checked out and built master.  Note that Maven had a problem building
>>> Kafka (in my case, at least); I was unable to fix this easily so I moved on
>>> since it seemed unlikely to have any influence on the problem at hand.
>>>
>>> Master improves functionality (including the example Nicholas just
>>> demonstrated) but unfortunately there still seems to be a bug related to
>>> using dictionaries as values.  I've put some code below to illustrate the
>>> bug.
>>>
>>> # dictionary as value works fine
>>> > print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value"}}'])).collect()
>>> [Row(key0=Row(key1=u'value'))]
>>>
>>> # dictionary as value works fine, even when inner keys are varied
>>> > print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {"key1": "value1"}}', '{"key0": {"key2": "value2"}}'])).collect()
>>> [Row(key0=Row(key1=u'value1', key2=None)), Row(key0=Row(key1=None, key2=u'value2'))]
>>>
>>> # dictionary as value works fine when inner keys are missing and outer key is present
>>> > print sqlCtx.jsonRDD(sc.parallelize(['{"key0": {}}', '{"key0": {"key1": "value1"}}'])).collect()
>>> [Row(key0=Row(key1=None)), Row(key0=Row(key1=u'value1'))]
>>>
>>> # dictionary as value FAILS when outer key is missing
>>> > print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": {"key1": "value1"}}'])).collect()
>>> Py4JJavaError: An error occurred while calling o84.collect.
>>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 14 in stage 7.0 failed 4 times, most recent failure: Lost task 14.3 in stage
>>> 7.0 (TID 242, engelland.research.intel-research.net):
>>> java.lang.NullPointerException...
>>>
>>> # dictionary as value FAILS when outer key is present with null value
>>> > print sqlCtx.jsonRDD(sc.parallelize(['{"key0": null}', '{"key0": {"key1": "value1"}}'])).collect()
>>> Py4JJavaError: An error occurred while calling o98.collect.
>>> : org.apache.spark.SparkException: Job aborted due to stage failure: Task
>>> 14 in stage 9.0 failed 4 times, most recent failure: Lost task 14.3 in stage
>>> 9.0 (TID 305, kunitz.research.intel-research.net):
>>> java.lang.NullPointerException...
>>>
>>> # nested lists work even when outer key is missing
>>> > print sqlCtx.jsonRDD(sc.parallelize(['{}', '{"key0": [["item0", "item1"], ["item2", "item3"]]}'])).collect()
>>> [Row(key0=None), Row(key0=[[u'item0', u'item1'], [u'item2', u'item3']])]
>>>
>>> Is anyone able to replicate this behavior?
>>>
>>> -Brad
>>>
>>>
>>>
>>>
>>> On Tue, Aug 5, 2014 at 6:11 PM, Michael Armbrust <michael@databricks.com>
>>> wrote:
>>>>
>>>> We try to keep master very stable, but this is where active development
>>>> happens. YMMV, but a lot of people do run very close to master without
>>>> incident (myself included).
>>>>
>>>> branch-1.0 has been cut for a while and we only merge bug fixes into it
>>>> (this is stricter for non-alpha components like Spark core).  For Spark
>>>> SQL, this branch is pretty far behind as the project is very young and we
>>>> are fixing bugs / adding features very rapidly compared with Spark core.
>>>>
>>>> branch-1.1 was just cut and is being QAed for a release. At this point
>>>> it's likely the same as master, but that will change as features start
>>>> getting added to master in the coming weeks.
>>>>
>>>>
>>>>
>>>> On Tue, Aug 5, 2014 at 5:38 PM, Nicholas Chammas
>>>> <nicholas.chammas@gmail.com> wrote:
>>>>>
>>>>> collect() works, too.
>>>>>
>>>>> >>> sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}'])).collect()
>>>>> [Row(foo=[[1, 2, 3], [4, 5, 6]]), Row(foo=[[1, 2, 3], [4, 5, 6]])]
>>>>>
>>>>> Can’t answer your question about branch stability, though. Spark is a
>>>>> very active project, so stuff is happening all the time.
>>>>>
>>>>> Nick
>>>>>
>>>>>
>>>>>
>>>>> On Tue, Aug 5, 2014 at 7:20 PM, Brad Miller
>>>>> <bmiller1@eecs.berkeley.edu> wrote:
>>>>>>
>>>>>> Hi Nick,
>>>>>>
>>>>>> Can you check that the call to "collect()" works as well as
>>>>>> "printSchema()"?  I actually experience that "printSchema()" works fine, but
>>>>>> then it crashes on "collect()".
>>>>>>
>>>>>> In general, should I expect the master (which seems to be on
>>>>>> branch-1.1) to be any more/less stable than branch-1.0?  While it would be
>>>>>> great to have this fixed, it would be good to know if I should expect lots
>>>>>> of other instability.
>>>>>>
>>>>>> best,
>>>>>> -Brad
>>>>>>
>>>>>>
>>>>>> On Tue, Aug 5, 2014 at 4:15 PM, Nicholas Chammas
>>>>>> <nicholas.chammas@gmail.com> wrote:
>>>>>>>
>>>>>>> This looks to be fixed in master:
>>>>>>>
>>>>>>> >>> from pyspark.sql import SQLContext
>>>>>>> >>> sqlContext = SQLContext(sc)
>>>>>>> >>> sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}'])
>>>>>>> ParallelCollectionRDD[5] at parallelize at PythonRDD.scala:315
>>>>>>> >>> sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}']))
>>>>>>> MapPartitionsRDD[14] at mapPartitions at SchemaRDD.scala:408
>>>>>>> >>> sqlContext.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}'])).printSchema()
>>>>>>> root
>>>>>>>  |-- foo: array (nullable = true)
>>>>>>>  |    |-- element: array (containsNull = false)
>>>>>>>  |    |    |-- element: integer (containsNull = false)
>>>>>>>
>>>>>>> >>>
>>>>>>>
>>>>>>> Nick
>>>>>>>
>>>>>>>
>>>>>>>
>>>>>>> On Tue, Aug 5, 2014 at 7:12 PM, Brad Miller
>>>>>>> <bmiller1@eecs.berkeley.edu> wrote:
>>>>>>>>
>>>>>>>> Hi All,
>>>>>>>>
>>>>>>>> I've built and deployed the current head of branch-1.0, but it seems
>>>>>>>> to have only partly fixed the bug.
>>>>>>>>
>>>>>>>> This code now runs as expected with the indicated output:
>>>>>>>> > srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":[1,2,3]}',
>>>>>>>> > '{"foo":[4,5,6]}']))
>>>>>>>> > srdd.printSchema()
>>>>>>>> root
>>>>>>>>  |-- foo: ArrayType[IntegerType]
>>>>>>>> > srdd.collect()
>>>>>>>> [{u'foo': [1, 2, 3]}, {u'foo': [4, 5, 6]}]
>>>>>>>>
>>>>>>>> This code still crashes:
>>>>>>>> > srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":[[1,2,3], [4,5,6]]}', '{"foo":[[1,2,3], [4,5,6]]}']))
>>>>>>>> > srdd.printSchema()
>>>>>>>> root
>>>>>>>>  |-- foo: ArrayType[ArrayType(IntegerType)]
>>>>>>>> > srdd.collect()
>>>>>>>> Py4JJavaError: An error occurred while calling o63.collect.
>>>>>>>> : org.apache.spark.SparkException: Job aborted due to stage failure:
>>>>>>>> Task 3.0:29 failed 4 times, most recent failure: Exception failure in TID 67
>>>>>>>> on host kunitz.research.intel-research.net:
>>>>>>>> net.razorvine.pickle.PickleException: couldn't introspect javabean:
>>>>>>>> java.lang.IllegalArgumentException: wrong number of arguments
>>>>>>>>
>>>>>>>> I may be able to see if this is fixed in master, but since it's not
>>>>>>>> fixed in 1.0.3 it seems unlikely to be fixed in master either. I previously
>>>>>>>> tried master as well, but ran into a build problem that did not occur with
>>>>>>>> the 1.0 branch.
>>>>>>>>
>>>>>>>> Can anybody else verify that the second example still crashes (and
>>>>>>>> is meant to work)? If so, would it be best to modify SPARK-2376 or start a
>>>>>>>> new bug?
>>>>>>>> https://issues.apache.org/jira/browse/SPARK-2376
>>>>>>>>
>>>>>>>> best,
>>>>>>>> -Brad
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>>
>>>>>>>> On Tue, Aug 5, 2014 at 12:10 PM, Brad Miller
>>>>>>>> <bmiller1@eecs.berkeley.edu> wrote:
>>>>>>>>>
>>>>>>>>> Nick: Thanks for both the original JIRA bug report and the link.
>>>>>>>>>
>>>>>>>>> Michael: This is on the 1.0.1 release.  I'll update to master and
>>>>>>>>> follow-up if I have any problems.
>>>>>>>>>
>>>>>>>>> best,
>>>>>>>>> -Brad
>>>>>>>>>
>>>>>>>>>
>>>>>>>>> On Tue, Aug 5, 2014 at 12:04 PM, Michael Armbrust
>>>>>>>>> <michael@databricks.com> wrote:
>>>>>>>>>>
>>>>>>>>>> Is this on 1.0.1?  I'd suggest running this on master or the
>>>>>>>>>> 1.1-RC which should be coming out this week.  Pyspark did not have good
>>>>>>>>>> support for nested data previously.  If you still encounter issues using a
>>>>>>>>>> more recent version, please file a JIRA.  Thanks!
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>> On Tue, Aug 5, 2014 at 11:55 AM, Brad Miller
>>>>>>>>>> <bmiller1@eecs.berkeley.edu> wrote:
>>>>>>>>>>>
>>>>>>>>>>> Hi All,
>>>>>>>>>>>
>>>>>>>>>>> I am interested in using jsonRDD and jsonFile to create a SchemaRDD
>>>>>>>>>>> out of some JSON data I have, but I've run into some instability involving
>>>>>>>>>>> the following Java exception:
>>>>>>>>>>>
>>>>>>>>>>> An error occurred while calling o1326.collect.
>>>>>>>>>>> : org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>>> failure: Task 181.0:29 failed 4 times, most recent failure: Exception
>>>>>>>>>>> failure in TID 1664 on host neal.research.intel-research.net:
>>>>>>>>>>> net.razorvine.pickle.PickleException: couldn't introspect javabean:
>>>>>>>>>>> java.lang.IllegalArgumentException: wrong number of arguments
>>>>>>>>>>>
>>>>>>>>>>> I've pasted code which produces the error as well as the full
>>>>>>>>>>> traceback below.  Note that I don't have any problem when I parse the JSON
>>>>>>>>>>> myself and use inferSchema.
>>>>>>>>>>>
>>>>>>>>>>> Is anybody able to reproduce this bug?
>>>>>>>>>>>
>>>>>>>>>>> -Brad
>>>>>>>>>>>
>>>>>>>>>>> > srdd = sqlCtx.jsonRDD(sc.parallelize(['{"foo":"bar", "baz":[1,2,3]}', '{"foo":"boom", "baz":[1,2,3]}']))
>>>>>>>>>>> > srdd.printSchema()
>>>>>>>>>>>
>>>>>>>>>>> root
>>>>>>>>>>>  |-- baz: ArrayType[IntegerType]
>>>>>>>>>>>  |-- foo: StringType
>>>>>>>>>>>
>>>>>>>>>>> > srdd.collect()
>>>>>>>>>>>
>>>>>>>>>>>
>>>>>>>>>>> ---------------------------------------------------------------------------
>>>>>>>>>>> Py4JJavaError                             Traceback (most recent call last)
>>>>>>>>>>> <ipython-input-89-ec7e8e8c68c4> in <module>()
>>>>>>>>>>> ----> 1 srdd.collect()
>>>>>>>>>>>
>>>>>>>>>>> /home/spark/spark-1.0.1-bin-hadoop1/python/pyspark/rdd.py in collect(self)
>>>>>>>>>>>     581         """
>>>>>>>>>>>     582         with _JavaStackTrace(self.context) as st:
>>>>>>>>>>> --> 583           bytesInJava = self._jrdd.collect().iterator()
>>>>>>>>>>>     584         return list(self._collect_iterator_through_file(bytesInJava))
>>>>>>>>>>>     585
>>>>>>>>>>>
>>>>>>>>>>> /usr/local/lib/python2.7/dist-packages/py4j/java_gateway.pyc in __call__(self, *args)
>>>>>>>>>>>     535         answer = self.gateway_client.send_command(command)
>>>>>>>>>>>     536         return_value = get_return_value(answer, self.gateway_client,
>>>>>>>>>>> --> 537                 self.target_id, self.name)
>>>>>>>>>>>     538
>>>>>>>>>>>     539         for temp_arg in temp_args:
>>>>>>>>>>>
>>>>>>>>>>> /usr/local/lib/python2.7/dist-packages/py4j/protocol.pyc in get_return_value(answer, gateway_client, target_id, name)
>>>>>>>>>>>     298                 raise Py4JJavaError(
>>>>>>>>>>>     299                     'An error occurred while calling {0}{1}{2}.\n'.
>>>>>>>>>>> --> 300                     format(target_id, '.', name), value)
>>>>>>>>>>>     301             else:
>>>>>>>>>>>     302                 raise Py4JError(
>>>>>>>>>>>
>>>>>>>>>>> Py4JJavaError: An error occurred while calling o1326.collect.
>>>>>>>>>>> : org.apache.spark.SparkException: Job aborted due to stage
>>>>>>>>>>> failure: Task 181.0:29 failed 4 times, most recent failure: Exception
>>>>>>>>>>> failure in TID 1664 on host neal.research.intel-research.net:
>>>>>>>>>>> net.razorvine.pickle.PickleException: couldn't introspect javabean:
>>>>>>>>>>> java.lang.IllegalArgumentException: wrong number of arguments
>>>>>>>>>>>
>>>>>>>>>>>         net.razorvine.pickle.Pickler.put_javabean(Pickler.java:603)
>>>>>>>>>>>         net.razorvine.pickle.Pickler.dispatch(Pickler.java:299)
>>>>>>>>>>>         net.razorvine.pickle.Pickler.save(Pickler.java:125)
>>>>>>>>>>>         net.razorvine.pickle.Pickler.put_map(Pickler.java:322)
>>>>>>>>>>>         net.razorvine.pickle.Pickler.dispatch(Pickler.java:286)
>>>>>>>>>>>         net.razorvine.pickle.Pickler.save(Pickler.java:125)
>>>>>>>>>>>         net.razorvine.pickle.Pickler.put_arrayOfObjects(Pickler.java:392)
>>>>>>>>>>>         net.razorvine.pickle.Pickler.dispatch(Pickler.java:195)
>>>>>>>>>>>         net.razorvine.pickle.Pickler.save(Pickler.java:125)
>>>>>>>>>>>         net.razorvine.pickle.Pickler.dump(Pickler.java:95)
>>>>>>>>>>>         net.razorvine.pickle.Pickler.dumps(Pickler.java:80)
>>>>>>>>>>>         org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)
>>>>>>>>>>>         org.apache.spark.sql.SchemaRDD$anonfun$javaToPython$1$anonfun$apply$3.apply(SchemaRDD.scala:385)
>>>>>>>>>>>         scala.collection.Iterator$anon$11.next(Iterator.scala:328)
>>>>>>>>>>>         org.apache.spark.api.python.PythonRDD$.writeIteratorToStream(PythonRDD.scala:294)
>>>>>>>>>>>         org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply$mcV$sp(PythonRDD.scala:200)
>>>>>>>>>>>         org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)
>>>>>>>>>>>         org.apache.spark.api.python.PythonRDD$WriterThread$anonfun$run$1.apply(PythonRDD.scala:175)
>>>>>>>>>>>         org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1160)
>>>>>>>>>>>         org.apache.spark.api.python.PythonRDD$WriterThread.run(PythonRDD.scala:174)
>>>>>>>>>>> Driver stacktrace:
>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$failJobAndIndependentStages(DAGScheduler.scala:1044)
>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1028)
>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler$anonfun$abortStage$1.apply(DAGScheduler.scala:1026)
>>>>>>>>>>> at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>>>>>>>>>>> at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1026)
>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:634)
>>>>>>>>>>> at scala.Option.foreach(Option.scala:236)
>>>>>>>>>>> at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:634)
>>>>>>>>>>> at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1229)
>>>>>>>>>>> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>>>>>>>>>>> at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>>>>>>>>>>> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>>>>>>>>>>> at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>>>>>>>>>>> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>>>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>>>>>>>>>> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>>>>>>>>>
>>>>>>>>>>
>>>>>>>>>
>>>>>>>>
>>>>>>>
>>>>>>
>>>>>
>>>>
>>>
>>
>
