spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Ashish Dutt <ashish.du...@gmail.com>
Subject How to connect to remote HDFS programmatically to retrieve data, analyse it and then write the data back to HDFS?
Date Thu, 06 Aug 2015 01:04:37 GMT
*Use Case:* To automate the process of data extraction (HDFS), data
analysis (pySpark/sparkR) and saving the data back to HDFS
programmatically.

*Prospective solutions:*

1. Create a remote server connectivity program in an IDE like pyCharm or
RStudio and use it to retrieve the data from HDFS or else
2. Create the data retrieval code in python or R and then point the IDE to
the remote server using TCP.

*Problem:* How to achieve either of the prospective solution 1 or 2 defined
above? Do you have any better solution then these, if yes please share?

*What have I tried so far?*

The server and 3 namenodes already installed with pyspark and I have
checked pyspark works in standalone mode on all four servers. Pyspark works
in standalone mode on my laptop too.

I use the following code but I am not able to connect to the remote server.

    import os
    import sys
try:
    from pyspark import SparkContext
    from pyspark import SparkConf
    print ("Pyspark sucess")
except ImportError as e:
    print ("Error importing Spark Modules", e)

conf = SparkConf()
conf.setMaster("spark://10.210.250.400:7077")
conf.setAppName("First_Remote_Spark_Program")
sc = SparkContext(conf=conf)
print ("connection succeeded with Master",conf)
data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)
print(distData)

The stack trace of error is

        Pyspark sucess
15/08/01 14:08:24 WARN NativeCodeLoader: Unable to load native-hadoop
library for your platform... using builtin-java classes where
applicable
15/08/01 14:08:24 ERROR Shell: Failed to locate the winutils binary in
the hadoop binary path
java.io.IOException: Could not locate executable null\bin\winutils.exe
in the Hadoop binaries.
at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:318)
at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:333)
at org.apache.hadoop.util.Shell.<clinit>(Shell.java:326)
at org.apache.hadoop.util.StringUtils.<clinit>(StringUtils.java:76)
at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:93)
at org.apache.hadoop.security.Groups.<init>(Groups.java:77)
at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:240)
at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:255)
at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:232)
at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:718)
at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:703)
at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:605)
at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2162)
at scala.Option.getOrElse(Option.scala:120)
at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2162)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:301)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:214)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
15/08/01 14:08:25 WARN Utils: Service 'SparkUI' could not bind on port
4040. Attempting port 4041.
15/08/01 14:08:26 WARN AppClient$ClientActor: Could not connect to
akka.tcp://sparkMaster@10.210.250.400:7077:
akka.remote.InvalidAssociation: Invalid address:
akka.tcp://sparkMaster@10.210.250.400:7077
15/08/01 14:08:26 WARN Remoting: Tried to associate with unreachable
remote address [akka.tcp://sparkMaster@10.210.250.400:7077]. Address
is now gated for 5000 ms, all messages to this address will be
delivered to dead letters. Reason: Connection refused: no further
information: /10.210.250.400:7077
15/08/01 14:08:46 WARN AppClient$ClientActor: Could not connect to
akka.tcp://sparkMaster@10.210.250.400:7077:
akka.remote.InvalidAssociation: Invalid address:
akka.tcp://sparkMaster@10.210.250.400:7077
15/08/01 14:08:46 WARN Remoting: Tried to associate with unreachable
remote address [akka.tcp://sparkMaster@10.210.250.400:7077]. Address
is now gated for 5000 ms, all messages to this address will be
delivered to dead letters. Reason: Connection refused: no further
information: /10.210.250.400:7077
15/08/01 14:09:06 WARN AppClient$ClientActor: Could not connect to
akka.tcp://sparkMaster@10.210.250.400:7077:
akka.remote.InvalidAssociation: Invalid address:
akka.tcp://sparkMaster@10.210.250.400:7077
15/08/01 14:09:06 WARN Remoting: Tried to associate with unreachable
remote address [akka.tcp://sparkMaster@10.210.250.400:7077]. Address
is now gated for 5000 ms, all messages to this address will be
delivered to dead letters. Reason: Connection refused: no further
information: /10.210.250.400:7077
15/08/01 14:09:25 ERROR SparkDeploySchedulerBackend: Application has
been killed. Reason: All masters are unresponsive! Giving up.
15/08/01 14:09:25 WARN SparkDeploySchedulerBackend: Application ID is
not initialized yet.
15/08/01 14:09:25 ERROR OneForOneStrategy:
java.lang.NullPointerException
at org.apache.spark.deploy.client.AppClient$ClientActor$$anonfun$receiveWithLogging$1.applyOrElse(AppClient.scala:160)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33)
at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:59)
at org.apache.spark.util.ActorLogReceive$$anon$1.apply(ActorLogReceive.scala:42)
at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118)
at org.apache.spark.util.ActorLogReceive$$anon$1.applyOrElse(ActorLogReceive.scala:42)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at org.apache.spark.deploy.client.AppClient$ClientActor.aroundReceive(AppClient.scala:61)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:393)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
15/08/01 14:09:25 ERROR SparkContext: Error initializing SparkContext.
java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
at org.apache.spark.SparkContext.org
<http://org.apache.spark.sparkcontext.org/>$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
at org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
at org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:543)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:214)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)
Traceback (most recent call last):
File "C:/Users/ashish
dutt/PycharmProjects/KafkaToHDFS/local2Remote.py", line 26, in
<module>
sc = SparkContext(conf=conf)
File "C:\spark-1.4.0\python\pyspark\context.py", line 113, in __init__
conf, jsc, profiler_cls)
File "C:\spark-1.4.0\python\pyspark\context.py", line 165, in _do_init
self._jsc = jsc or self._initialize_context(self._conf._jconf)
File "C:\spark-1.4.0\python\pyspark\context.py", line 219, in
_initialize_context
return self._jvm.JavaSparkContext(jconf)
File "C:\spark-1.4.0\python\lib\py4j-0.8.2.1-src.zip\py4j\java_gateway.py",
line 701, in __call__
File "C:\spark-1.4.0\python\lib\py4j-0.8.2.1-src.zip\py4j\protocol.py",
line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling
None.org.apache.spark.api.java.JavaSparkContext.
: java.lang.IllegalStateException: Cannot call methods on a stopped SparkContext
at org.apache.spark.SparkContext.org
<http://org.apache.spark.sparkcontext.org/>$apache$spark$SparkContext$$assertNotStopped(SparkContext.scala:103)
at org.apache.spark.SparkContext.getSchedulingMode(SparkContext.scala:1501)
at org.apache.spark.SparkContext.postEnvironmentUpdate(SparkContext.scala:2005)
at org.apache.spark.SparkContext.<init>(SparkContext.scala:543)
at org.apache.spark.api.java.JavaSparkContext.<init>(JavaSparkContext.scala:61)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:57)
at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:526)
at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:234)
at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
at py4j.Gateway.invoke(Gateway.java:214)
at py4j.commands.ConstructorCommand.invokeConstructor(ConstructorCommand.java:79)
at py4j.commands.ConstructorCommand.execute(ConstructorCommand.java:68)
at py4j.GatewayConnection.run(GatewayConnection.java:207)
at java.lang.Thread.run(Thread.java:745)

Process finished with exit code 1

The spark-defaults.conf file is configured as follows

    #spark.eventLog.dir=hdfs://ABCD01:8020/user/spark/applicationHistory
spark.eventLog.dir      hdfs://10.210.250.400:8020/user/spark/eventlog
spark.eventLog.enabled      true
spark.serializer        org.apache.spark.serializer.KryoSerializer
spark.shuffle.service.enabled   true
spark.shuffle.service.port  7337
spark.yarn.historyServer.address    http://ABCD04:18088 <http://abcd04:18088/>
spark.master        spark://10.210.250.400:7077
spark.yarn.jar
local:/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/spark/assembly/lib/spark-assembly-1.3.0-cdh5.4.2-hadoop2.6.0-cdh5.4.2.jar
spark.driver.extraLibraryPath
/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hadoop/lib/native
spark.executor.extraLibraryPath
/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hadoop/lib/native
spark.yarn.am.extraLibraryPath
/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hadoop/lib/native
spark.logConf   true

The spark-env.sh file is configured as follows

#!/usr/bin/env bash
##
# Generated by Cloudera Manager and should not be modified directly
##

SELF="$(cd $(dirname $BASH_SOURCE) && pwd)"
if [ -z "$SPARK_CONF_DIR" ]; then
export SPARK_CONF_DIR="$SELF"
fi

export SPARK_HOME=/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/spark
export DEFAULT_HADOOP_HOME=/opt/cloudera/parcels/CDH-5.4.2-1.cdh5.4.2.p0.2/lib/hadoop
#export STANDALONE_SPARK_MASTER_HOST=`ABCD01`
export SPARK_MASTER_IP=spark://10.210.250.400
export SPARK_MASTER_PORT=7077
export SPARK_WEBUI_PORT=18080


### Path of Spark assembly jar in HDFS
export SPARK_JAR_HDFS_PATH=${SPARK_JAR_HDFS_PATH:-''}

export HADOOP_HOME=${HADOOP_HOME:-$DEFAULT_HADOOP_HOME}

if [ -n "$HADOOP_HOME" ]; then
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:${HADOOP_HOME}/lib/native
fi

SPARK_EXTRA_LIB_PATH=""
if [ -n "$SPARK_EXTRA_LIB_PATH" ]; then
LD_LIBRARY_PATH=$LD_LIBRARY_PATH:$SPARK_EXTRA_LIB_PATH
fi

export LD_LIBRARY_PATH
export HADOOP_CONF_DIR=${HADOOP_CONF_DIR:-$SPARK_CONF_DIR/yarn-conf}

# This is needed to support old CDH versions that use a forked version
# of compute-classpath.sh.
export SCALA_LIBRARY_PATH=${SPARK_HOME}/lib

# Set distribution classpath. This is only used in CDH 5.3 and later.
export SPARK_DIST_CLASSPATH=$(paste -sd: "$SELF/classpath.txt")

And the slaves.sh file is configured as

10.210.250.401
10.210.250.402
10.210.250.403

Please tell me how can I connect to the remote server using pycharm or any
other IDE?

Thank you,

Ashish

Mime
View raw message