spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Gourav Sengupta <gourav.sengu...@gmail.com>
Subject Re: SPARK Issue in Standalone cluster
Date Wed, 02 Aug 2017 19:05:49 GMT
Hi Steve,

I have written a sincere note of apology to everyone in a separate email. I
sincerely request your kind forgiveness before hand if anything does sound
impolite in my emails, in advance.

Let me first start by thanking you.

I know it looks like I formed all my opinion based on that document, but
that is not the case at all. If you or anyone tries to execute the code
that I have given then they will see what I mean. Code speaks louder and
better than words for me.

So I am not saying you are wrong. I am asking verify and expecting someone
will be able to correct  a set of understanding that a moron like me has
gained after long hours of not having anything better to do.


SCENARIO: there are two files file1.csv and file2.csv stored in HDFS with
replication 2 and there is a HADOOP cluster of three nodes. All these nodes
have SPARK workers (executors) running in them.  Both are stored in the
following way:
-----------------------------------------------------
| SYSTEM 1 |  SYSTEM 2 | SYSTEM 3 |
| (worker1)   |  (worker2)    |  (worker3)   |
| (master)     |                     |                    |
-----------------------------------------------------
| file1.csv      |                     | file1.csv     |
-----------------------------------------------------
|                    |  file2.csv      | file2.csv     |
-----------------------------------------------------
| file3.csv      |  file3.csv      |                   |
-----------------------------------------------------

CONSIDERATION BASED ON WHICH ABOVE SCENARIO HAS BEEN DRAWN:
HDFS replication does not store the same file in all the nodes in the
cluster. So if I have three nodes and the replication is two then the same
file will be stored physically in two nodes in the cluster. Does that sound
right?

ASSUMPTION  (STEVE PLEASE CLARIFY THIS):
If SPARK is trying to process to the records then I am expecting that
WORKER2 should not be processing file1.csv, and similary WORKER 1 should
not be processing file2.csv and WORKER3 should not be processing file3.csv.
Because in case WORKER2 was trying to process file1.csv then it will
actually causing network transmission of the file unnecessarily.

ASSUMPTION BASED ON ABOVE ASSUMPTION (STEVE ONCE AGAIN, PLEASE CLARIFY
THIS):
if WORKER 2 is not processing file1.csv then how does it matter whether the
file is there or not at all in the system? Should not SPARK just ask the
workers to process the files which are avialable in the worker nodes? In
case both WORKER2 and WORKER3 fails and are not available then file2.csv
will not be processed at all.

ALSO I DID POST THE CODE AND I GENUINELY THINK THAT THE CODE SHOULD BE
EXECUTED (Its been pointed out that I am learning SPARK, and even I did not
take more than 13 mins to set up the cluster and run the code).

Once you execute the code then you will find that:
1.  if the path starts with file:/// while reading back then there is no
error reported, but the number of records reported back are only those
records in the worker which also has the server.
2. also you will notice that once you cache the file before writing the
partitions are ditributed nicely across the workers, and while writing
back, the dataframe partitions does write properly to the worker node in
the Master, but the workers in the other system have the files written in
_temporary folder which does not get copied back to the main folder.
Inspite of this the job is not reported as failed in SPARK.

Now in my own world, if I see, the following things are happening,
something is going wrong (with me):
1. SPARK transfers files from different systems to process, instead of
processing them locally (I do not have code to prove this, and therefore
its just an assumption)
2. SPARK cannot determine when the writes are failing in standalone
clusters workers and reports success (code is there for this)
3. SPARK reports back number of records in the worker running in the master
node when count() is given without reporting an error while using file:///
and reports an error when I mention the path without file:/// (for SPARK
2.1.x onwards, code is there for this)


I very sincerely hope with your genuine help the bar of language and social
skills will be lowered for me. And everyone will find a way to excuse me
and not qualify this email as a means to measure my extremely versatile and
amazingly vivid social skills. It will be a lot of help to just focus on
the facts related to machines, data, error and (the language that I somehow
understand better) code.


My sincere apologies once again, as I am 100% sure that I did not meet the
required social and language skills.

Thanks a ton once again for your kindness, patience and understanding.


Regards,
Gourav Sengupta



On Wed, Aug 2, 2017 at 4:59 PM, Steve Loughran <stevel@hortonworks.com>
wrote:

>
> On 2 Aug 2017, at 14:25, Gourav Sengupta <gourav.sengupta@gmail.com>
> wrote:
>
> Hi,
>
> I am definitely sure that at this point of time everyone who has kindly
> cared to respond to my query do need to go and check this link
> https://spark.apache.org/docs/2.2.0/spark-standalone.
> html#spark-standalone-mode.
>
>
> I see. Well, we shall have to edit that document to make clear something
> which had been omitted:
>
> *in order for multiple spark workers to process data, they must have a
> shared store for that data, one with read/write access for all workers.
> This is must be provided by a shared filesystem: HDFS, network-mounted NFS,
> Glusterfs, through an object store (S3, Azure WASB, ...), or through
> alternative datastores implementing the Hadoop Filesystem API (example:
> Apache Cassandra).*
>
> n your case, for a small cluster of 1-3 machines, especially if you are
> just learning to play with spark, I'd start with an NFS mounted disk
> accessible on the same path on all machines. If you aren't willing to set
> that up, stick to spark standalone on a single machine first. You don't
> need a shared cluster to use spark standalone.
>
> Personally, I'd recommend downloading apache zeppelin and running it
> locally as the simplest out-the-box experience.
>
>
> It does mention that SPARK standalone cluster can have multiple machines
> running as slaves.
>
>
> Clearly it omits the small detail about the requirement for a shared store.
>
> The general idea of writing to the user group is that people who know
> should answer, and not those who do not know.
>
>
> Agreed, but if the answer doesn't appear to be correct to you, do consider
> that there may be some detail that hasn't been mentioned, rather than
> immediately concluding that the person replying is wrong.
>
> -Steve
>
>
>
>
>
> Regards,
> Gourav Sengupta
>
> On Tue, Aug 1, 2017 at 4:50 AM, Mahesh Sawaiker <
> mahesh_sawaiker@persistent.com> wrote:
>
>> Gourav,
>>
>> Riccardo’s answer is spot on.
>>
>> What is happening is one node of spark is writing to its own directory
>> and telling a slave to read the data from there, when the slave goes to
>> read it, the part is not found.
>>
>>
>>
>> Check the folder Users/gouravsengupta/Developme
>> nt/spark/sparkdata/test1/part-00001-e79273b5-9b4e-4037-92f3-
>> 2e52f523dfdf-c000.snappy.parquet on the slave.
>>
>> The reason it ran on spark 1.5 may have been because the executor ran on
>> the driver itself. There is not much use to a set up where you don’t have
>> some kind of distributed file system, so I would encourage you to use hdfs,
>> or a mounted file system shared by all nodes.
>>
>>
>>
>> Regards,
>>
>> Mahesh
>>
>>
>>
>>
>>
>> *From:* Gourav Sengupta [mailto:gourav.sengupta@gmail.com]
>> *Sent:* Monday, July 31, 2017 9:54 PM
>> *To:* Riccardo Ferrari
>> *Cc:* user
>> *Subject:* Re: SPARK Issue in Standalone cluster
>>
>>
>>
>> Hi Riccardo,
>>
>>
>>
>> I am grateful for your kind response.
>>
>>
>>
>> Also I am sure that your answer is completely wrong and errorneous. SPARK
>> must be having a method so that different executors do not pick up the same
>> files to process. You also did not answer the question why was the
>> processing successful in SPARK 1.5 and not in SPARK 2.2.
>>
>>
>>
>> Also the exact same directory is is present across in both the nodes.
>>
>>
>>
>> I feel quite facinated when individuals respond before even understanding
>> the issue, or trying out the code.
>>
>>
>>
>> It will be of great help if someone could kindly read my email and help
>> me figure out the issue.
>>
>>
>>
>>
>>
>> Regards,
>>
>> Gourav Sengupta
>>
>>
>>
>>
>>
>>
>>
>> On Mon, Jul 31, 2017 at 9:27 AM, Riccardo Ferrari <ferrarir@gmail.com>
>> wrote:
>>
>> Hi Gourav,
>>
>>
>>
>> The issue here is the location where you're trying to write/read from :
>> /Users/gouravsengupta/Development/spark/sparkdata/test1/p...
>>
>> When dealing with clusters all the paths and resources should be
>> available to all executors (and driver), and that is reason why you
>> generally use HDFS, S3, NFS or any shared file system.
>>
>>
>>
>> Spark assumes your data is generally available to all nodes and does not
>> tries to pick up the data from a selected node, it rather tries to
>> write/read in parallel from the executor nodes. Also given its control
>> logic there is no way (read. you should not care) to know what executor is
>> doing what task.
>>
>>
>>
>> Hope it helps,
>>
>> Riccardo
>>
>>
>>
>> On Mon, Jul 31, 2017 at 2:14 AM, Gourav Sengupta <
>> gourav.sengupta@gmail.com> wrote:
>>
>> Hi,
>>
>>
>>
>> I am working by creating a native SPARK standalone cluster (
>> https://spark.apache.org/docs/2.2.0/spark-standalone.html)
>>
>>
>>
>> Therefore I  do not have a HDFS.
>>
>>
>>
>>
>>
>> EXERCISE:
>>
>> Its the most fundamental and simple exercise. Create a sample SPARK
>> dataframe and then write it to a location and then read it back.
>>
>>
>>
>> SETTINGS:
>>
>> So after I have installed SPARK in two physical systems with the same:
>>
>> 1. SPARK version,
>>
>> 2. JAVA version,
>>
>> 3. PYTHON_PATH
>>
>> 4. SPARK_HOME
>>
>> 5. PYSPARK_PYTHON
>>
>> the user in both the systems is the root user therefore there are no
>> permission issues anywhere.
>>
>>
>>
>> I am able to start:
>>
>> 1. ./spark-2.2.0-bin-hadoop2.7/sbin/start-master.sh
>>
>> 2. ./spark-2.2.0-bin-hadoop2.7/sbin/start-slave.sh (from two separate
>> computers)
>>
>>
>>
>> After that I can see in the spark UI (at port 8080) two workers.
>>
>>
>>
>>
>>
>> CODE:
>>
>> Then I run the following code:
>>
>>
>>
>> ======================================================
>>
>> import findspark
>>
>> import os
>>
>> os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Develop
>> ment/spark/spark/'
>>
>> findspark.init()
>>
>> import pyspark
>>
>> from pyspark.sql import SparkSession
>>
>> spark = (SparkSession.builder
>>
>>         .master("spark://mastersystem.local:7077")
>>
>>         .appName("gouravtest")
>>
>>         .enableHiveSupport()
>>
>>         .getOrCreate())
>>
>> import pandas, numpy
>>
>> testdf = spark.createDataFrame(pandas.DataFrame(numpy.random.randn(10000,
>> 4), columns=list('ABCD')))
>>
>> testdf.cache()
>>
>> testdf.count()
>>
>> testdf.write.save("/Users/gouravsengupta/Development/spark/
>> sparkdata/test2")
>>
>> spark.read.load("/Users/gouravsengupta/Development/spark/
>> sparkdata/test2").count()
>>
>> ======================================================
>>
>>
>>
>>
>>
>> ERROR I (in above code):
>>
>> ERROR in line: testdf.write.save("/Users/gour
>> avsengupta/Development/spark/sparkdata/test2")
>>
>> This line does not fail or report any error. But when I am looking at the
>> stage in spark Application UI the error reported for one of the slave node
>> which is not in the same system as the master node is mentioned below. The
>> writing on the slave node which is in the same physical system as the
>> Master happens correctly. (NOTE: slave node basically the worker and master
>> node the driver)
>>
>> ------------------------------------------------------------
>> ----------------------------------------------------------------------
>>
>> 0 (TID 41). 2060 bytes result sent to driver
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000006_0'
to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000006
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000006_0:
Committed
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 31.0 in stage 2.0 (TID 64). 2060 bytes
result sent to driver
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000028_0'
to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000028
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000028_0:
Committed
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000021_0'
to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000021
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000021_0:
Committed
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 12.0 in stage 2.0 (TID 45). 2103 bytes
result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 4.0 in stage 2.0 (TID 37). 2060 bytes
result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 6.0 in stage 2.0 (TID 39). 2060 bytes
result sent to driver
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000018_0'
to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000018
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000018_0:
Committed
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000029_0'
to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000029
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000029_0:
Committed
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000027_0'
to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000027
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000027_0:
Committed
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 21.0 in stage 2.0 (TID 54). 2060 bytes
result sent to driver
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000010_0'
to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000010
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 19.0 in stage 2.0 (TID 52). 2060 bytes
result sent to driver
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000010_0:
Committed
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000030_0'
to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000030
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 22.0 in stage 2.0 (TID 55). 2060 bytes
result sent to driver
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000030_0:
Committed
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 20.0 in stage 2.0 (TID 53). 2060 bytes
result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 28.0 in stage 2.0 (TID 61). 2060 bytes
result sent to driver
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000016_0'
to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000016
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000016_0:
Committed
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 26.0 in stage 2.0 (TID 59). 2060 bytes
result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 18.0 in stage 2.0 (TID 51). 2060 bytes
result sent to driver
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000024_0'
to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000024
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000024_0:
Committed
>>
>> 17/07/31 00:19:29 INFO FileOutputCommitter: Saved output of task 'attempt_20170731001928_0002_m_000023_0'
to file:/Users/gouravsengupta/Development/spark/sparkdata/test1/_temporary/0/task_20170731001928_0002_m_000023
>>
>> 17/07/31 00:19:29 INFO SparkHadoopMapRedUtil: attempt_20170731001928_0002_m_000023_0:
Committed
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 29.0 in stage 2.0 (TID 62). 2103 bytes
result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 10.0 in stage 2.0 (TID 43). 2060 bytes
result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 16.0 in stage 2.0 (TID 49). 2060 bytes
result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 27.0 in stage 2.0 (TID 60). 2060 bytes
result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 30.0 in stage 2.0 (TID 63). 2103 bytes
result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 23.0 in stage 2.0 (TID 56). 2060 bytes
result sent to driver
>>
>> 17/07/31 00:19:29 INFO Executor: Finished task 24.0 in stage 2.0 (TID 57). 2060 bytes
result sent to driver
>>
>> 17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 65
>>
>> 17/07/31 00:20:23 INFO Executor: Running task 0.0 in stage 3.0 (TID 65)
>>
>> 17/07/31 00:20:23 INFO TorrentBroadcast: Started reading broadcast variable 3
>>
>> 17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3_piece0 stored as bytes in memory
(estimated size 24.9 KB, free 365.9 MB)
>>
>> 17/07/31 00:20:23 INFO TorrentBroadcast: Reading broadcast variable 3 took 10 ms
>>
>> 17/07/31 00:20:23 INFO MemoryStore: Block broadcast_3 stored as values in memory
(estimated size 70.3 KB, free 365.9 MB)
>>
>> 17/07/31 00:20:23 ERROR Executor: Exception in task 0.0 in stage 3.0 (TID 65)
>>
>> java.io.FileNotFoundException: File file:/Users/gouravsengupta/Development/spark/sparkdata/test1/part-00001-e79273b5-9b4e-4037-92f3-2e52f523dfdf-c000.snappy.parquet
does not exist
>>
>>           at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileStatus(RawLocalFileSystem.java:611)
>>
>>           at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStatusInternal(RawLocalFileSystem.java:824)
>>
>>           at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:601)
>>
>>           at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:421)
>>
>>           at org.apache.hadoop.fs.ChecksumFileSystem$ChecksumFSInputChecker.<init>(ChecksumFileSystem.java:142)
>>
>>           at org.apache.hadoop.fs.ChecksumFileSystem.open(ChecksumFileSystem.java:346)
>>
>>           at org.apache.hadoop.fs.FileSystem.open(FileSystem.java:769)
>>
>>           at org.apache.parquet.hadoop.util.HadoopInputFile.newStream(HadoopInputFile.java:65)
>>
>>           at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:443)
>>
>>           at org.apache.parquet.hadoop.ParquetFileReader.readFooter(ParquetFileReader.java:421)
>>
>>           at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:491)
>>
>>           at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$$anonfun$readParquetFootersInParallel$1.apply(ParquetFileFormat.scala:485)
>>
>>           at scala.collection.parallel.AugmentedIterableIterator$class.flatmap2combiner(RemainsIterator.scala:132)
>>
>>           at scala.collection.parallel.immutable.ParVector$ParVectorIterator.flatmap2combiner(ParVector.scala:62)
>>
>>           at scala.collection.parallel.ParIterableLike$FlatMap.leaf(ParIterableLike.scala:1072)
>>
>>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>>
>>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>>
>>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>>
>>           at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>>
>>           at scala.collection.parallel.ParIterableLike$FlatMap.tryLeaf(ParIterableLike.scala:1068)
>>
>>           at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
>>
>>           at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>>
>>           at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>>
>>           at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>
>>           at scala.concurrent.forkjoin.ForkJoinTask.doJoin(ForkJoinTask.java:341)
>>
>>           at scala.concurrent.forkjoin.ForkJoinTask.join(ForkJoinTask.java:673)
>>
>>           at scala.collection.parallel.ForkJoinTasks$WrappedTask$class.sync(Tasks.scala:378)
>>
>>           at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.sync(Tasks.scala:443)
>>
>>           at scala.collection.parallel.ForkJoinTasks$class.executeAndWaitResult(Tasks.scala:426)
>>
>>           at scala.collection.parallel.ForkJoinTaskSupport.executeAndWaitResult(TaskSupport.scala:56)
>>
>>           at scala.collection.parallel.ParIterableLike$ResultMapping.leaf(ParIterableLike.scala:958)
>>
>>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
>>
>>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>>
>>           at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
>>
>>           at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
>>
>>           at scala.collection.parallel.ParIterableLike$ResultMapping.tryLeaf(ParIterableLike.scala:953)
>>
>>           at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:152)
>>
>>           at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
>>
>>           at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
>>
>>           at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>>
>>           at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>>
>>           at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>>
>>           at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>>
>> 17/07/31 00:20:23 INFO CoarseGrainedExecutorBackend: Got assigned task 66
>>
>>
>>
>> ------------------------------------------------------------
>> ----------------------------------------------------------------------
>>
>>
>>
>>
>>
>> ERROR II  (in above code):
>>
>> While trying to read the file there is now a distinct error thrown which
>> mentions the same saying that the files do not exist.
>>
>>
>>
>> Also why is SPARK trying to search for the same files in both the
>> systems? If the same path in two systems have different files should SPARK
>> not combine and work on them?
>>
>>
>>
>>
>>
>>
>>
>> NOW DEMONSTRATING THAT THIS IS AN ERROR IN SPARK 2.x
>>
>> I started spark using the same method but now using SPARK 1.5 and this
>> does not give any error:
>>
>> ======================================================
>>
>> import findspark
>>
>> import os
>>
>> os.environ["SPARK_HOME"] = '/Users/gouravsengupta/Develop
>> ment/spark/spark/'
>>
>> findspark.init()
>>
>> import pyspark
>>
>>
>>
>> sc = pyspark.SparkContext("spark://Gouravs-iMac.local:7077", "test")
>>
>> sqlContext = pyspark.SQLContext(sc)
>>
>> import pandas, numpy
>>
>> testdf = sqlContext createDataFrame(pandas.DataFrame(numpy.random.randn(10000,
>> 4), columns=list('ABCD')))
>>
>> testdf.cache()
>>
>> testdf.count()
>>
>> testdf.write.save("/Users/gouravsengupta/Development/spark/
>> sparkdata/test3")
>>
>> spark.read.load("/Users/gouravsengupta/Development/spark/
>> sparkdata/test3").count()
>>
>> ======================================================
>>
>>
>>
>> I will be sincerely obliged if someone could kindly help me out with this
>> issue and point out my mistakes/ assumptions.
>>
>>
>>
>>
>>
>>
>>
>>
>>
>> Regards,
>>
>> Gourav Sengupta
>>
>>
>>
>>
>> DISCLAIMER
>> ==========
>> This e-mail may contain privileged and confidential information which is
>> the property of Persistent Systems Ltd. It is intended only for the use of
>> the individual or entity to which it is addressed. If you are not the
>> intended recipient, you are not authorized to read, retain, copy, print,
>> distribute or use this message. If you have received this communication in
>> error, please notify the sender and delete all copies of this message.
>> Persistent Systems Ltd. does not accept any liability for virus infected
>> mails.
>>
>
>
>

Mime
View raw message