I've been trying to achieve the same objective and came up with approaches similar to your methods 1 and 2. Method 2 is the slowest for me because of the massive amount of data shuffled around at each matrix operation stage. Method 3 is new to me, so I can't comment much.

I ended up using an approach that is similar to your method 1, which gives reasonable performance in my use case.

#4 Normalizer then UDF (PySpark code)
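from pyspark.ml.feature import Normalizer
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType

# L2-normalise each vector so that a plain dot product gives the cosine similarity
normaliser = Normalizer(inputCol="vec", outputCol="norm_vec")
df_word_norm = normaliser.transform(df_word)

# norm_vec1 and norm_vec2 come from a Cartesian join. Steps to produce them are not shown for brevity.
dot_udf = F.udf(lambda x, y: float(x.dot(y)), DoubleType())
df_score = df_word_norm.withColumn("score", dot_udf(df_word_norm.norm_vec1, df_word_norm.norm_vec2))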
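
The reason a plain dot product is enough here is that once both vectors are scaled to unit length, their dot product is the cosine similarity. Below is a minimal pure-Python sketch of that equivalence, with no Spark involved. The helper names (`l2_normalise`, `dot`, `cosine`) and the sample vectors are made up for illustration, and it assumes the Normalizer is left at its default p=2 norm.

```python
import math

def l2_normalise(vec):
    """Scale vec to unit L2 length; a zero vector is returned unchanged."""
    norm = math.sqrt(sum(x * x for x in vec))
    return [x / norm for x in vec] if norm else list(vec)

def dot(a, b):
    """Plain dot product of two equal-length sequences."""
    return sum(x * y for x, y in zip(a, b))

def cosine(a, b):
    """Textbook cosine similarity, used here only for comparison."""
    return dot(a, b) / (math.sqrt(dot(a, a)) * math.sqrt(dot(b, b)))

# Hypothetical sample vectors
a = [1.0, 2.0, 3.0]
b = [4.0, 5.0, 6.0]

score_via_norm = dot(l2_normalise(a), l2_normalise(b))  # normalise first, then dot
score_direct = cosine(a, b)                             # divide by the norms afterwards
```

Both scores agree up to floating-point error, so the normalisation cost is paid once per vector rather than once per pair.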

Would be curious to learn how other people solve this problem.

Best wishes,
Chee Yee

On Tue, 24 Sep 2019 at 04:20, Stevens, Clay <Clay.Stevens@wolterskluwer.com> wrote:

There are several ways I can compute the cosine similarities between a Spark ML vector and each ML vector in a Spark DataFrame column, then sort for the highest results. However, I can't come up with a method that is faster than replacing the `/data/` in a Spark ML Word2Vec model, then using `.findSynonyms()`. The problem is that the Word2Vec model is held entirely in the driver, which can cause memory issues if the data set I want to compare against gets too big.

1. Is there a more efficient method than the ones I have shown below?
2. Could the data for the Word2Vec model be distributed across the cluster?
3. Could the `.findSynonyms()` [Scala code](https://github.com/apache/spark/blob/master/mllib/src/main/scala/org/apache/spark/mllib/feature/Word2Vec.scala#L571toL619) be modified to make a Spark SQL function that can operate efficiently over a whole Spark DataFrame?

Methods I have tried:

#1 rdd function:

    import numpy as np

    # vecIn = vector of same dimensions as 'vectors' column
    def cosSim(row, vecIn):
        # cosine similarity = a.b / (|a| * |b|); returned as a 1-tuple so .toDF() gets one column
        return (
            float(row.vectors.dot(vecIn) /
                  (np.sqrt(row.vectors.dot(row.vectors)) *
                   np.sqrt(vecIn.dot(vecIn)))),
        )
    df.rdd.map(lambda row: cosSim(row, vecIn)).toDF(['CosSim']).show(truncate=False)


#2  `.toIndexedRowMatrix().columnSimilarities()` then filter the results (not shown):

        IndexedRowMatrix(df.rdd.map(lambda row: (row.vectors.toArray())))


#3 replace Word2Vec model `/data/` with my own, then load 'revised' model and use `.findSynonyms()`:

    ## StructType(List(StructField(word,StringType,true),StructField(vector,ArrayType(FloatType,true),true)))
    df_words_vectors.write.parquet("exiting_Word2Vec_model/data/", mode='overwrite')
    new_Word2Vec_model = Word2VecModel.load("exiting_Word2Vec_model")
    ## vecIn = vector of same dimensions as 'vector' column in DataFrame saved over Word2Vec model /data/
    new_Word2Vec_model.findSynonyms(vecIn, 20).show()
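
Conceptually, the search I am after amounts to scoring every stored vector against `vecIn` and keeping the n best. Here is a rough pure-Python sketch of that idea only. It is not Spark's actual implementation, and `top_n_similar` and the toy vectors are hypothetical names invented for illustration.

```python
import heapq
import math

def cosine(a, b):
    """Cosine similarity between two equal-length sequences."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    return dot / (norm_a * norm_b)

def top_n_similar(query, word_vectors, n):
    """Return the n (word, score) pairs with the highest cosine similarity to query."""
    scored = ((word, cosine(query, vec)) for word, vec in word_vectors.items())
    # nlargest keeps only n candidates in memory instead of sorting everything
    return heapq.nlargest(n, scored, key=lambda pair: pair[1])

# Hypothetical toy data standing in for the word/vector table
toy_vectors = {
    "east": [1.0, 0.0],
    "north": [0.0, 1.0],
    "north_east": [1.0, 1.0],
}
result = top_n_similar([1.0, 0.1], toy_vectors, 2)
```

Keeping only the top n per pass, rather than fully sorting all scores, is the part that would also need to carry over to any distributed version.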




Clay Stevens