spark-user mailing list archives

From "宋源栋" <yuandong.s...@greatopensource.com>
Subject Re: Spark is only using one worker machine when more are available
Date Thu, 12 Apr 2018 02:39:26 GMT
Hi,

1. Spark version: 2.3.0
2. JDK: Oracle JDK 1.8
3. OS version: CentOS 6.8
4. spark-env.sh: null
5. Spark session config:

            SparkSession.builder().appName("DBScale")
                .config("spark.sql.crossJoin.enabled", "true")
                .config("spark.sql.adaptive.enabled", "true")
                .config("spark.scheduler.mode", "FAIR")
                .config("spark.executor.memory", "1g")
                .config("spark.executor.cores", 1)
                .config("spark.driver.memory", "20")
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
                .config("spark.executor.extraJavaOptions",
                        "-XX:+UseG1GC -XX:+PrintFlagsFinal -XX:+PrintReferenceGC " +
                                "-verbose:gc -XX:+PrintGCDetails " +
                                "-XX:+PrintGCTimeStamps -XX:+PrintAdaptiveSizePolicy")
                .master(this.spark_master)
                .getOrCreate();

6. Core code:

        // This loop reads data from MySQL and registers each table as a temp view
        for (SparksqlTableInfo tableInfo : this.dbtable) {
            String dt = "(" + tableInfo.sql + ")" + tableInfo.tmp_table_name;
            String[] pred = new String[tableInfo.partition_num];
            if (tableInfo.partition_num > 0) {
                for (int j = 0; j < tableInfo.partition_num; j++) {
                    String str = "some where clause to split mysql table into many partitions";
                    pred[j] = str;
                }
                Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, pred, connProp);
//this.url is mysql-jdbc-url (mysql://XX.XX.XX.XX:XXXX)
                jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
            } else {
                logger.warn("[\033[32m" + "partition_num == 0" + "\033[0m]");
                Dataset<Row> jdbcDF = ss.read().jdbc(this.url, dt, connProp);
                jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);
            }
        }
        // Then run a query and write the result set to mysql

        Dataset<Row> result = ss.sql(this.sql);
        result.explain(true);
        connProp.put("rewriteBatchedStatements", "true");
        connProp.put("sessionVariables", "sql_log_bin=off");
        result.write().jdbc(this.dst_url, this.dst_table, connProp);
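
For reference, when the split column is numeric, DataFrameReader also offers an overload that generates the range predicates itself, instead of a hand-built predicate array. A minimal sketch, assuming a numeric split column named l_orderkey and illustrative bounds (neither is taken from the code above):

            // Partitioned read via the numeric-column overload: Spark derives
            // partition_num evenly spaced range predicates on the given column.
            Dataset<Row> jdbcDF = ss.read().jdbc(
                    this.url, dt,
                    "l_orderkey",              // assumed numeric split column
                    1L, 600000000L,            // assumed lower/upper bounds
                    tableInfo.partition_num,   // number of partitions to create
                    connProp);
            jdbcDF.createOrReplaceTempView(tableInfo.tmp_table_name);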

------------------------------------------------------------------
From: Jhon Anderson Cardenas Diaz <jhonderson2007@gmail.com>
Sent: Wednesday, April 11, 2018, 22:42
To: 宋源栋 <yuandong.song@greatopensource.com>
Cc: user <user@spark.apache.org>
Subject: Re: Spark is only using one worker machine when more are available
Hi, could you please share more details: the environment variable values you set when you run the jobs, the Spark version, etc.
Btw, you should take a look at SPARK_WORKER_INSTANCES and SPARK_WORKER_CORES if you are using Spark 2.0.0; a sketch of a possible spark-env.sh follows.
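
A minimal sketch of conf/spark-env.sh on each worker (the values are assumptions for a 40-core, 128G machine, not settings reported in this thread):

    # conf/spark-env.sh on every worker (standalone mode)
    SPARK_WORKER_CORES=40        # cores this worker offers to executors
    SPARK_WORKER_INSTANCES=1     # worker daemons to start per machine
    SPARK_WORKER_MEMORY=100g     # total memory this worker offers to executors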

Regards.

2018-04-11 4:10 GMT-05:00 宋源栋 <yuandong.song@greatopensource.com>:


Hi all,
I have a standalone-mode Spark cluster (without HDFS) of 10 machines, each with 40
CPU cores and 128G RAM.
My application is a Spark SQL application that reads data from the "tpch_100g" database in MySQL
and runs TPC-H queries. When loading tables from MySQL into Spark, I split the biggest table,
"lineitem", into 600 partitions.

When my application runs, there are only 40 executors (spark.executor.memory = 1g, spark.executor.cores
= 1) on the executor page of the Spark application web UI, and all of them are on the same machine.
It is too slow because all tasks run in parallel on only one machine.




