spark-user mailing list archives

From Robin East <robin.e...@xense.co.uk>
Subject Re: Performance issue with Spark's foreachPartition method
Date Wed, 22 Jul 2015 19:20:26 GMT
The first question I would ask is: have you determined whether you have a performance
issue writing to Oracle? In particular, how many commits are you making? If you are
issuing a lot of commits, that would be a performance problem.
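
For illustration, a write loop that commits once per full batch, and once more at the
end for the final partial batch, rather than after every row, might look like the sketch
below. The class and method names and the two-column setString calls are my own
placeholders, and the MERGE statement stands in for the real one, as in the code quoted
below:

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Iterator;

import scala.Tuple2;

public class BatchWriter {
	// Writes one partition's rows; batchsize is assumed to be > 0.
	public static void writePartition(Iterator<Tuple2<String, String>> rows,
			Connection connect, int batchsize) throws SQLException {
		PreparedStatement pstmt = connect.prepareStatement(
				"MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT"); // placeholder
		int count = 0;
		while (rows.hasNext()) {
			Tuple2<String, String> kv = rows.next();
			pstmt.setString(1, kv._1);
			pstmt.setString(2, kv._2);
			pstmt.addBatch();
			if (++count % batchsize == 0) {
				pstmt.executeBatch();
				connect.commit(); // one commit per full batch, not per row
			}
		}
		pstmt.executeBatch(); // flush the final partial batch
		connect.commit();     // single final commit
		pstmt.close();
	}
}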

Robin

> On 22 Jul 2015, at 19:11, diplomatic Guru <diplomaticguru@gmail.com> wrote:
> 
> Hello all,
> 
> We are having a major performance issue with Spark, which is holding us back from going live.
> 
> We have a job that carries out computations on log files and writes the results into an Oracle DB.
> 
> The reducer 'reduceByKey' has been set to a parallelism of 4, as we don't want to establish
> too many DB connections.
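> 
> In essence the reduce step looks like this (a simplified sketch; 'logPairs' and the
> merge function are placeholders, not our real logic):
> 
> JavaPairRDD<String, String> reduceResultsRDD =
> 		logPairs.reduceByKey((a, b) -> a + "," + b, 4); // 4 partitions, so at most 4 DB connections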
> 
> We then call foreachPartition on the RDD pairs that were reduced by key. Within this
> foreachPartition method we establish a DB connection, iterate over the results, prepare
> the Oracle statement for batch insertion, then commit the batch and close the connection.
> All of this works fine.
> 
> However, when we execute the job to process 12GB of data, it takes forever to complete,
> especially at the foreachPartition stage.
> 
> We submitted the job with 6 executors, 2 cores, and 6GB of memory, of which a fraction
> of 0.3 is assigned to spark.storage.memoryFraction.
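> 
> The submit command is along these lines (a sketch assuming YARN; the application jar,
> main class and other arguments are omitted):
> 
> spark-submit --num-executors 6 --executor-cores 2 --executor-memory 6G \
>     --conf spark.storage.memoryFraction=0.3 ...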
> 
> The job takes about 50 minutes to complete, which is not ideal. I'm not sure how we
> could improve the performance. I've provided the main body of the code below; please
> take a look and advise:
> 
> From Driver:
> 
> reduceResultsRDD.foreachPartition(new DB.InsertFunction(dbuser, dbpass, batchsize));
> 
> 
> DB class:
> 
> import java.sql.Connection;
> import java.sql.PreparedStatement;
> import java.sql.SQLException;
> import java.util.Iterator;
> 
> import org.apache.spark.api.java.function.VoidFunction;
> import org.slf4j.Logger;
> import org.slf4j.LoggerFactory;
> 
> import scala.Tuple2;
> 
> public class DB {
> 	private static final Logger logger = LoggerFactory
> 			.getLogger(DB.class);
> 	
> 	public static class InsertFunction implements
> 			VoidFunction<Iterator<Tuple2<String, String>>> {
> 
> 		private static final long serialVersionUID = 999955766876878L;
> 		private String dbuser = "";
> 		private String dbpass = "";
> 		private int batchsize;
> 
> 		public InsertFunction(String dbuser, String dbpass, int batchsize) {
> 			super();
> 			this.dbuser = dbuser;
> 			this.dbpass = dbpass;
> 			this.batchsize = batchsize;
> 		}
> 
> 		@Override
> 		public void call(Iterator<Tuple2<String, String>> results) {
> 			Connection connect = null;
> 			PreparedStatement pstmt = null;
> 			try {
> 				connect = getDBConnection(dbuser,
> 						dbpass);
> 
> 				int count = 0;
> 
> 				if (batchsize <= 0) {
> 					batchsize = 10000; // default batch size if none was supplied
> 				}
> 
> 				pstmt = connect
> 						.prepareStatement("MERGE INTO SOME TABLE IF RECORD FOUND, IF NOT INSERT");
> 
> 				while (results.hasNext()) {
> 
> 					Tuple2<String, String> kv = results.next();
> 
> 					String[] data = kv._1.concat("," + kv._2).split(",");
> 
> 					pstmt.setString(1, data[0]);
> 					pstmt.setString(2, data[1]);
> 		 .....
> 
> 					pstmt.addBatch();
> 
> 					count++;
> 
> 					if (count == batchsize) {
> 						logger.info("BulkCount : " + count);
> 						pstmt.executeBatch();
> 						connect.commit();
> 						count = 0;
> 					}
> 
> 					// note: this executes the batch and commits on every iteration
> 					pstmt.executeBatch();
> 					connect.commit();
> 
> 				}
> 
> 				pstmt.executeBatch();
> 				connect.commit();
> 
> 			} catch (Exception e) {
> 				logger.error("InsertFunction error: " + e.getMessage());
> 			} finally {
> 				try {
> 					if (pstmt != null) {
> 						pstmt.close();
> 					}
> 					if (connect != null) {
> 						connect.close();
> 					}
> 				} catch (SQLException e) {
> 					logger.error("InsertFunction Connection Close error: "
> 							+ e.getMessage());
> 				}
> 			}
> 		}
> 
> 	}
> }

