Aditya created SPARK-28336:
------------------------------
Summary: Job runs fine locally in the PyCharm IDE, but on EC2 a JSON-valued RDD converted to a DataFrame displays nothing when shown with show()
Key: SPARK-28336
URL: https://issues.apache.org/jira/browse/SPARK-28336
Project: Spark
Issue Type: Bug
Components: Deploy, DStreams, EC2, PySpark, Spark Submit
Affects Versions: 2.4.3
Environment: Using EC2 Ubuntu 18.04.2 LTS
Spark version : Spark 2.4.3 built for Hadoop 2.7.3
Kafka version : kafka_2.12-2.2.1
Reporter: Aditya
I am a beginner with PySpark and I am building a pilot project in Spark. I developed the project in the PyCharm IDE, where it runs fine. Let me explain the project: I produce JSON to a Kafka topic, consume that topic in Spark Streaming, and convert the RDD value (which is JSON) to a DataFrame with productInfo = sqlContext.read.json(rdd). This works perfectly on my local machine: after converting the RDD to a DataFrame I display it on the console with .show(), and it works fine.
The problem arises when I set all of this up (Kafka, Apache Spark) on EC2 (Ubuntu 18.04.2 LTS) and run the job with spark-submit. The console output stops when it reaches the show() call and displays nothing, then the next batch starts and again stops at show(). I cannot figure out what the error is because nothing is printed to the console, and I have checked that the data is actually arriving in the RDD.
{color:#FF0000}My Code:{color}
{code:python}
# -*- coding: utf-8 -*-
from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import Row, DataFrame, SQLContext
import pandas as pd


def getSqlContextInstance(sparkContext):
    # Lazily create a single shared SQLContext for this application
    if 'sqlContextSingletonInstance' not in globals():
        globals()['sqlContextSingletonInstance'] = SQLContext(sparkContext)
    return globals()['sqlContextSingletonInstance']


def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        # I also cross-checked that my data is present in the RDD by printing it:
        # results = rdd.collect()
        # for result in results:
        #     print(result)

        # Get the singleton instance of SQLContext
        sqlContext = getSqlContextInstance(rdd.context)
        productInfo = sqlContext.read.json(rdd)
        # The problem comes here when I try to show it
        productInfo.show()
    except:
        pass


if __name__ == '__main__':
    conf = SparkConf().set("spark.cassandra.connection.host", "127.0.0.1")
    sc = SparkContext(conf=conf)
    sc.setLogLevel("WARN")
    sqlContext = SQLContext(sc)
    ssc = StreamingContext(sc, 10)
    kafkaStream = KafkaUtils.createStream(ssc, 'localhost:2181', 'spark-streaming', {'new_topic': 1})
    lines = kafkaStream.map(lambda x: x[1])
    lines.foreachRDD(process)
    # lines.pprint()
    ssc.start()
    ssc.awaitTermination()
{code}
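Note: the bare except: pass in process() silently swallows any exception raised by read.json() or show(), which is consistent with no error ever appearing on the console. A minimal debugging variant of process() (a sketch only, printing the traceback instead of discarding it) would be:
{code:python}
import traceback

def process(time, rdd):
    print("========= %s =========" % str(time))
    try:
        sqlContext = getSqlContextInstance(rdd.context)
        productInfo = sqlContext.read.json(rdd)
        productInfo.show()
    except Exception:
        # Print the real error instead of hiding it behind a bare pass
        traceback.print_exc()
{code}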
{color:#FF0000}My console:{color}
./spark-submit ReadingJsonFromKafkaAndWritingToScylla_CSV_Example.py
19/07/10 11:13:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform...
using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/07/10 11:13:15 INFO SparkContext: Running Spark version 2.4.3
19/07/10 11:13:15 INFO SparkContext: Submitted application: ReadingJsonFromKafkaAndWritingToScylla_CSV_Example.py
19/07/10 11:13:15 INFO SecurityManager: Changing view acls to: kafka
19/07/10 11:13:15 INFO SecurityManager: Changing modify acls to: kafka
19/07/10 11:13:15 INFO SecurityManager: Changing view acls groups to:
19/07/10 11:13:15 INFO SecurityManager: Changing modify acls groups to:
19/07/10 11:13:15 INFO SecurityManager: SecurityManager: authentication disabled; ui acls
disabled; users with view permissions: Set(kafka); groups with view permissions: Set(); users
with modify permissions: Set(kafka); groups with modify permissions: Set()
19/07/10 11:13:16 INFO Utils: Successfully started service 'sparkDriver' on port 41655.
19/07/10 11:13:16 INFO SparkEnv: Registering MapOutputTracker
19/07/10 11:13:16 INFO SparkEnv: Registering BlockManagerMaster
19/07/10 11:13:16 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper
for getting topology information
19/07/10 11:13:16 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
19/07/10 11:13:16 INFO DiskBlockManager: Created local directory at /tmp/blockmgr-33f848fe-88d7-4c8f-8440-8384e094c59c
19/07/10 11:13:16 INFO MemoryStore: MemoryStore started with capacity 366.3 MB
19/07/10 11:13:16 INFO SparkEnv: Registering OutputCommitCoordinator
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port
4041.
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port
4042.
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port
4043.
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port
4044.
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4044. Attempting port
4045.
19/07/10 11:13:16 WARN Utils: Service 'SparkUI' could not bind on port 4045. Attempting port
4046.
19/07/10 11:13:16 INFO Utils: Successfully started service 'SparkUI' on port 4046.
19/07/10 11:13:16 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at http://ip-172-31-92-134.ec2.internal:4046
19/07/10 11:13:16 INFO Executor: Starting executor ID driver on host localhost
19/07/10 11:13:16 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService'
on port 34719.
19/07/10 11:13:16 INFO NettyBlockTransferService: Server created on ip-172-31-92-134.ec2.internal:34719
19/07/10 11:13:16 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy
for block replication policy
19/07/10 11:13:16 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver,
ip-172-31-92-134.ec2.internal, 34719, None)
19/07/10 11:13:16 INFO BlockManagerMasterEndpoint: Registering block manager ip-172-31-92-134.ec2.internal:34719
with 366.3 MB RAM, BlockManagerId(driver, ip-172-31-92-134.ec2.internal, 34719, None)
19/07/10 11:13:16 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver,
ip-172-31-92-134.ec2.internal, 34719, None)
19/07/10 11:13:16 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, ip-172-31-92-134.ec2.internal,
34719, None)
19/07/10 11:13:17 WARN AppInfo$: Can't read Kafka version from MANIFEST.MF. Possible cause:
java.lang.NullPointerException
19/07/10 11:13:18 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:13:18 WARN BlockManager: Block input-0-1562757198000 replicated to only 0 peer(s)
instead of 1 peers
{color:#FF0000}///////////////////This is when I am not producing any data to my Kafka topic//////////////////////{color}
========= 2019-07-10 11:13:20 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
========= 2019-07-10 11:13:30 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
++
||
++
++
------------------------after printing-----------------------
========= 2019-07-10 11:13:40 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
++
||
++
++
------------------------after printing-----------------------
========= 2019-07-10 11:15:40 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
++
||
++
++
------------------------after printing-----------------------
19/07/10 11:15:47 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:15:47 WARN BlockManager: Block input-0-1562757347200 replicated to only 0 peer(s)
instead of 1 peers
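(For reference: the ++ / || / ++ / ++ blocks above are just what DataFrame.show() prints for an empty DataFrame with no columns, which is expected while nothing is being produced to the topic. A possible guard, assuming it is acceptable to skip empty batches, would be:)
{code:python}
def process(time, rdd):
    print("========= %s =========" % str(time))
    if rdd.isEmpty():
        # Nothing arrived in this batch interval; skip the empty-DataFrame show()
        return
    sqlContext = getSqlContextInstance(rdd.context)
    productInfo = sqlContext.read.json(rdd)
    productInfo.show()
{code}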
{color:#FF0000}///////////////////This is when I start producing data to my Kafka topic//////////////////////{color}
========= 2019-07-10 11:15:50 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
19/07/10 11:15:52 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:15:52 WARN BlockManager: Block input-0-1562757352200 replicated to only 0 peer(s)
instead of 1 peers
19/07/10 11:15:57 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:15:57 WARN BlockManager: Block input-0-1562757357200 replicated to only 0 peer(s)
instead of 1 peers
========= 2019-07-10 11:16:00 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
19/07/10 11:16:02 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:16:02 WARN BlockManager: Block input-0-1562757362200 replicated to only 0 peer(s)
instead of 1 peers
19/07/10 11:16:07 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:16:07 WARN BlockManager: Block input-0-1562757367400 replicated to only 0 peer(s)
instead of 1 peers
========= 2019-07-10 11:16:10 =========
---------------------in function procces----------------------
-----------------------before printing----------------------
19/07/10 11:16:12 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:16:12 WARN BlockManager: Block input-0-1562757372400 replicated to only 0 peer(s)
instead of 1 peers
19/07/10 11:16:17 WARN RandomBlockReplicationPolicy: Expecting 1 replicas with only 0 peer/s.
19/07/10 11:16:17 WARN BlockManager: Block input-0-1562757377400 replicated to only 0 peer(s)
instead of 1 peers
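To narrow down whether the failure is in read.json()/show() itself or in the streaming setup, a standalone check on the EC2 box, independent of Kafka (a sketch only, with a hypothetical sample record), would be:
{code:python}
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext(appName="json-show-check")
sqlContext = SQLContext(sc)

# Hypothetical sample JSON string, standing in for one Kafka message value
rdd = sc.parallelize(['{"product": "test", "price": 10}'])
sqlContext.read.json(rdd).show()

sc.stop()
{code}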
I don't know how to figure this out; any help would be really appreciated.
Thank you