spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ted Yu <yuzhih...@gmail.com>
Subject Re: How to connect to remote HDFS programmatically to retrieve data, analyse it and then write the data back to HDFS?
Date Thu, 06 Aug 2015 02:12:59 GMT
Please see the comments at the tail of SPARK-2356

Cheers

On Wed, Aug 5, 2015 at 6:04 PM, Ashish Dutt <ashish.dutt8@gmail.com> wrote:

> *Use Case:* To automate the process of data extraction (HDFS), data
> analysis (pySpark/sparkR) and saving the data back to HDFS
> programmatically.
>
> *Prospective solutions:*
>
> 1. Create a remote server connectivity program in an IDE like pyCharm or
> RStudio and use it to retrieve the data from HDFS or else
> 2. Create the data retrieval code in python or R and then point the IDE to
> the remote server using TCP.
>
> *Problem:* How to achieve either of the prospective solution 1 or 2
> defined above? Do you have any better solution then these, if yes please
> share?
>
> *What have I tried so far?*
>
> The server and 3 namenodes already installed with pyspark and I have
> checked pyspark works in standalone mode on all four servers. Pyspark works
> in standalone mode on my laptop too.
>
> I use the following code but I am not able to connect to the remote server.
>
>     import os
>     import sys
> try:
>     from pyspark import SparkContext
>     from pyspark import SparkConf
>     print ("Pyspark sucess")
> except ImportError as e:
>     print ("Error importing Spark Modules", e)
>
> conf = SparkConf()
> conf.setMaster("spark://10.210.250.400:7077")
> conf.setAppName("First_Remote_Spark_Program")
> sc = SparkContext(conf=conf)
> print ("connection succeeded with Master",conf)
> data = [1, 2, 3, 4, 5]
> distData = sc.parallelize(data)
> print(distData)
>
> The stack trace of error is
>
>         Pyspark sucess
> 15/08/01 14:08:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your
platform... using builtin-java classes where applicable
> 15/08/01 14:08:24 ERROR Shell: Failed to locate the winutils binary in the hadoop binary
path
> java.io.IOException: Could not locate executable null\bin\winutils.exe in the Hadoop
binaries.
> at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
> at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
> at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
> at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
> at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
> at org.apache.hadoop.security.Groups.<init>(Groups.java:77)
> at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
> at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
> at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
> at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
> at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
> at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
> at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
> at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
> at scala.Option.getOrElse(Option.scala:120)
> at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2162)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:301)
> at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:214)
> at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
> at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
> 15/08/01 14:08:25 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting
port 4041.
> 15/08/01 14:08:26 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.210.250.400:7077:
akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@10.210.250.400:7077
> 15/08/01 14:08:26 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@10.210.250.400:7077].
Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters.
Reason: Connection refused: no further information: /10.210.250.400:7077
> 15/08/01 14:08:46 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.210.250.400:7077:
akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@10.210.250.400:7077
> 15/08/01 14:08:46 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@10.210.250.400:7077].
Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters.
Reason: Connection refused: no further information: /10.210.250.400:7077
> 15/08/01 14:09:06 WARN AppClient$ClientActor: Could not connect to akka.tcp://sparkMaster@10.210.250.400:7077:
akka.remote.InvalidAssociation: Invalid address: akka.tcp://sparkMaster@10.210.250.400:7077
> 15/08/01 14:09:06 WARN Remoting: Tried to associate with unreachable remote address [akka.tcp://sparkMaster@10.210.250.400:7077].
Address is now gated for 5000 ms, all messages to this address will be delivered to dead letters.
Reason: Connection refused: no further information: /10.210.250.400:7077
> 15/08/01 14:09:25 ERROR SparkDeploySchedulerBackend: Application has been killed. Reason:
All masters are unresponsive! Giving up.
> 15/08/01 14:09:25 WARN SparkDeploySchedulerBackend: Application ID is not initialized
yet.
> 15/08/01 14:09:25 ERROR OneForOneStrategy:
> java.lang.NullPointerException
> at org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
> at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
> at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
> at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
> at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
> at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
> at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
> at org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61)
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
> at akka.actor.ActorCell.invoke(ActorCell.scala:487)
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
> at akka.dispatch.Mailbox.run(Mailbox.scala:220)
> at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
> at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
> at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
> at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
> 15/08/01 14:09:25 ERROR SparkContext: Error initializing SparkContext.
> java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
> at org.apache.spark.SparkContext.org <http://org.apache.spark.sparkcontext.org/>$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
> at org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
> at org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:543)
> at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:214)
> at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
> at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
> Traceback (most recent call last):
> File "C:/Users/ashish dutt/PycharmProjects/KafkaToHDFS/local2Remote.py", line 26, in
<module>
> sc = SparkContext(conf=conf)
> File "C:\spark-1.4.0\python\pyspark\context.py", line 113, in __init__
> conf, jsc, profiler_cls)
> File "C:\spark-1.4.0\python\pyspark\context.py", line 165, in _do_init
> self._jsc = jsc or self._initialize_context(self._conf._jconf)
> File "C:\spark-1.4.0\python\pyspark\context.py", line 219, in _initialize_context
> return self._jvm.JavaSparkContext(jconf)
> File "C:\spark-1.4.0\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py", line 701,
in __call__
> File "C:\spark-1.4.0\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py", line 300, in
get_return_value
> py4j.protocol.Py4JJavaError: An error occurred while calling None.org.apache.spark.api.java.JavaSparkContext.
> : java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
> at org.apache.spark.SparkContext.org <http://org.apache.spark.sparkcontext.org/>$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
> at org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
> at org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
> at org.apache.spark.SparkContext.<init>(SparkContext.scala:543)
> at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
> at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
> at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
> at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
> at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
> at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
> at py4j.Gateway.invoke(Gateway.java:214)
> at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
> at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
> at py4j.GatewayConnection.run(GatewayConnection.java:207)
> at java.lang.Thread.run(Thread.java:745)
>
> Process finished with exit code 1
>
> The spark-defaults.conf file is configured as follows
>
>     #spark.eventLog.dir=hdfs://ABCD01:8020/user/spark/applicationHistory
> spark.eventLog.dir      hdfs://10.210.250.400:8020/user/spark/eventlog
> spark.eventLog.enabled      true
> spark.serializer        org.apache.spark.serializer.KryoSerializer
> spark.shuffle.service.enabled   true
> spark.shuffle.service.port  7337
> spark.yarn.historyServer.address    http://ABCD04:18088 <http://abcd04:18088/>
> spark.master        spark://10.210.250.400:7077
> spark.yarn.jar      local:/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/spark/assembly/lib/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar
> spark.driver.extraLibraryPath   /opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hadoop/lib/native
> spark.executor.extraLibraryPath /opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hadoop/lib/native
> spark.yarn.am.extraLibraryPath  /opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hadoop/lib/native
> spark.logConf   true
>
> The spark-env.sh file is configured as follows
>
> #!/usr/bin/env bash
> ##
> # Generated by Cloudera Manager and should not be modified directly
> ##
>
> SELF="$(cd $(dirname $BASH_SOURCE) && pwd)"
> if [ -z "$SPARK_CONF_DIR" ]; then
> export SPARK_CONF_DIR="$SELF"
> fi
>
> export SPARK_HOME=/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/spark
> export DEFAULT_HADOOP_HOME=/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hadoop
> #export STANDALONE_SPARK_MASTER_HOST=`ABCD01`
> export SPARK_MASTER_IP=spark://10.210.250.400
> export SPARK_MASTER_PORT=7077
> export SPARK_WEBUI_PORT=18080
>
>
> ### Path of Spark assembly jar in HDFS
> export SPARK_JAR_HDFS_PATH=${SPARK_JAR_HDFS_PATH:-''}
>
> export HADOOP_HOME=${HADOOP_HOME:-$DEFAULT_HADOOP_HOME}
>
> if [ -n "$HADOOP_HOME" ]; then
> LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${HADOOP_HOME}/lib/native
> fi
>
> SPARK_EXTRA_LIB_PATH=""
> if [ -n "$SPARK_EXTRA_LIB_PATH" ]; then
> LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$SPARK_EXTRA_LIB_PATH
> fi
>
> export LD_LIBRARY_PATH
> export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-$SPARK_CONF_DIR/yarn-conf}
>
> # This is needed to support old CDH versions that use a forked version
> # of compute-classpath.sh.
> export SCALA_LIBRARY_PATH=${SPARK_HOME}/lib
>
> # Set distribution classpath. This is only used in CDH 5.3 and later.
> export SPARK_DIST_CLASSPATH=$(paste -sd: "$SELF/classpath.txt")
>
> And the slaves.sh file is configured as
>
> 10.210.250.401
> 10.210.250.402
> 10.210.250.403
>
> Please tell me how can I connect to the remote server using pycharm or any
> other IDE?
>
> Thank you,
>
> Ashish
>
>

Mime
View raw message