spark-user mailing list archives

From "Ganelin, Ilya" <>
Subject Re: ALS failure with size > Integer.MAX_VALUE
Date Sun, 30 Nov 2014 04:36:38 GMT
Hi Bharath – I’m unsure if this is your problem, but the MatrixFactorizationModel in MLlib,
which is the underlying component for ALS, expects your user/product fields to be integers.
Specifically, the input to ALS is an RDD[Rating], and Rating is an (Int, Int, Double). I am
wondering if perhaps one of your identifiers exceeds Integer.MAX_VALUE; could you write a
quick check for that?
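
As a concrete starting point, here is a minimal Scala sketch of such a check; the file path
and the tab-separated userId/itemId/rating layout are assumptions about your raw input:

    import org.apache.spark.SparkContext

    // Sketch: find the largest raw user/item identifiers before any cast to Int.
    // Assumes a tab-separated "userId<TAB>itemId<TAB>rating" text layout (hypothetical).
    def maxIds(sc: SparkContext, path: String): (Long, Long) =
      sc.textFile(path)
        .map { line =>
          val fields = line.split("\t")
          (fields(0).toLong, fields(1).toLong) // parse as Long, not Int
        }
        .reduce { case ((u1, i1), (u2, i2)) => (math.max(u1, u2), math.max(i1, i2)) }

    // Usage: any identifier above Int.MaxValue cannot be represented in a Rating.
    //   val (maxUser, maxItem) = maxIds(sc, "hdfs:///path/to/ratings")
    //   require(maxUser <= Int.MaxValue && maxItem <= Int.MaxValue)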

I have been running a very similar use case to yours (with more constrained hardware resources),
and while I haven’t seen this exact problem, I’m sure we’ve seen similar issues. Please
let me know if you have other questions.

From: Bharath Ravi Kumar <<>>
Date: Thursday, November 27, 2014 at 1:30 PM
To: "<>" <<>>
Subject: ALS failure with size > Integer.MAX_VALUE

We're training a recommender with ALS in MLlib 1.1 against a dataset of 150M users and 4.5K
items, with the total number of training records being 1.2 billion (~30 GB of data). The input
data is spread across 1200 partitions on HDFS. For the training, rank=10, and we've configured
the number of user data blocks to equal the number of item data blocks. The number of user/item
blocks was varied between 50 and 1200. Irrespective of the block count (e.g. at 1200 blocks each),
there are at least a couple of tasks that end up shuffle-reading > 9.7 GB each in the aggregate
stage (ALS.scala:337) and failing with the following exception:

java.lang.IllegalArgumentException: Size exceeds Integer.MAX_VALUE
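
For reference, a minimal sketch of a training call matching this description; the record
layout, iteration count, and lambda below are illustrative placeholders, not the settings
actually used:

    import org.apache.spark.SparkContext
    import org.apache.spark.mllib.recommendation.{ALS, Rating}

    // Sketch of the training setup described above. The tab-separated layout,
    // the iteration count (20), and lambda (0.01) are placeholders.
    def trainModel(sc: SparkContext, path: String, blocks: Int) = {
      val ratings = sc.textFile(path).map { line =>
        val Array(u, i, r) = line.split("\t")
        Rating(u.toInt, i.toInt, r.toDouble)
      }
      // rank = 10 as above; `blocks` sets the user and product block counts
      // to the same value, matching the configuration described.
      ALS.train(ratings, 10, 20, 0.01, blocks)
    }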

As for the data: on the user side, the degree of a node in the connectivity graph is relatively
small. On the item side, however, 3.8K of the 4.5K items are connected to 10^5 users each
on average, with 100 items being connected to nearly 10^8 users. The rest of the items
are connected to fewer than 10^5 users. With such a skew in the connectivity graph, I'm unsure
whether additional memory or varying the block counts would help (considering my limited
understanding of the implementation in MLlib). Any suggestions to address the problem?
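
For what it's worth, a quick sketch that quantifies this item-side skew, assuming ratings is
the RDD[Rating] fed to ALS:

    import org.apache.spark.SparkContext._ // pair-RDD operations (Spark 1.x)
    import org.apache.spark.rdd.RDD
    import org.apache.spark.mllib.recommendation.Rating

    // Sketch: per-item degree counts, to measure the skew described above.
    def itemDegrees(ratings: RDD[Rating]): RDD[(Int, Long)] =
      ratings.map(r => (r.product, 1L)).reduceByKey(_ + _)

    // e.g. the hundred most-connected items:
    //   itemDegrees(ratings).top(100)(Ordering.by(_._2)).foreach(println)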

The test is being run on a standalone cluster of 3 hosts, each with 100 GB RAM & 24 cores
dedicated to the application. The additional configs I set, specific to shuffle behavior and
task-failure reduction, are as follows:
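
For illustration, settings of this kind are supplied via SparkConf; the keys and values in
the sketch below are assumptions, not the actual configuration used in this test:

    import org.apache.spark.SparkConf

    // Illustration only: assumed shuffle/failure-related settings, not the
    // configuration actually used in this test.
    val conf = new SparkConf()
      .set("spark.task.maxFailures", "8")            // tolerate more task failures
      .set("spark.shuffle.consolidateFiles", "true") // fewer shuffle files (Spark 1.x)
      .set("spark.shuffle.memoryFraction", "0.4")    // more memory for shuffle aggregation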


The job execution summary is as follows:

Active Stages:

Stage Id 2, aggregate at ALS.scala:337, duration 55 min, tasks 1197/1200 (3 failed), shuffle read 141.6 GB

Completed Stages (5)

Stage Id  Description                                       Duration  Tasks: Succeeded/Total  Input    Shuffle Read  Shuffle Write
6         org.apache.spark.rdd.RDD.flatMap(RDD.scala:277)  12 min    1200/1200               29.9 GB  1668.4 MB     186.8 GB
5         mapPartitionsWithIndex at ALS.scala:250
3         map at ALS.scala:231
0         aggregate at ALS.scala:337
1         map at ALS.scala:228


