Hi,

I think the combination of MongoDB and Spark is a bit unfortunate.

Why don't you simply use MongoDB?

If you want to process a lot of data, you should use HDFS or Cassandra as storage. MongoDB is not suitable for heterogeneous processing of large-scale data.
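That said, the quoted aggregation below boils down to grouping by (host, isAPP, articleId), counting the documents per group, and keeping the first-seen value of the remaining fields, which in Spark maps naturally onto `map` + `reduceByKey` on an RDD (or `groupBy(...).agg(count(...), first(...))` on a DataFrame). Here is a minimal sketch of that logic in plain Python, with no Spark dependency, just to make the transformation concrete; the field names come from your sample document, but the helper and sample data are illustrative only:

```python
def group_and_count(docs):
    """Group docs by (host, isAPP, articleId): count per group,
    keep the first-seen values of the other fields (like Mongo's $first)."""
    groups = {}
    for doc in docs:
        key = (doc["host"], doc["isAPP"], doc["articleId"])
        if key not in groups:
            # First document seen for this key: keep its fields, start a counter.
            groups[key] = dict(doc, count=0)
        groups[key]["count"] += 1
    return list(groups.values())

# Illustrative sample data (not from a real feed).
docs = [
    {"host": "zzzzz.com", "isAPP": 0, "articleId": "a1", "url": "u1"},
    {"host": "zzzzz.com", "isAPP": 0, "articleId": "a1", "url": "u2"},
    {"host": "zzzzz.com", "isAPP": 1, "articleId": "a1", "url": "u3"},
]
result = group_and_count(docs)
# Two groups: (zzzzz.com, 0, a1) with count 2 and url "u1" kept,
# and (zzzzz.com, 1, a1) with count 1.
```

In a Spark Streaming job the same idea would be applied inside each micro-batch, e.g. keying each record by the triple and reducing with a sum before writing the results back to Mongo.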

Best regards,


On Tue, Aug 4, 2015 at 11:19 AM, Deepesh Maheshwari <deepesh.maheshwari17@gmail.com> wrote:
Hi,
I am new to Apache Spark and exploring Spark + Kafka integration to process data with Spark that I earlier processed with MongoDB aggregation.

I am not able to figure out how to handle my use case.

Mongo Document :
{
    "_id" : ObjectId("55bfb3285e90ecbfe37b25c3"),
    "url" : "http://www.zzzzz.com/new_car_search.php?bycity=Mumbai&sortfield=price&sortdirection=desc",
    "ip" : "27.5.107.65",
    "pgDownloadTime" : NumberLong(2526),
    "agentType" : "Mozilla/5.0 (Linux; Android 5.1.1; Nexus 5 Build/LMY48B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/43.0.2357.93 Mobile Safari/537.36",
    "referer" : "zzzzz.com",
    "domain" : "zzzzz.com",
    "channel" : "zzzzz",
    "host" : "zzzzz.com",
    "pgAccessTime" : NumberLong("1438626600021"),
    "pgAccessMin" : NumberLong(1438626600),
    "pgAccessHour" : NumberLong(1438626600),
    "p5Min" : NumberLong(1438626600),
    "contentType" : 1,
    "articleId" : "4b1ad5357a6987bbc611ff92dcf9cb50",
    "location" : 1,
    "action" : 1,
    "cat" : "Home",
    "subcat" : [
        ""
    ],
    "tags" : [
        ""
    ],
    "catIds" : [
        "Home"
    ],
    "catIdHash" : NumberLong("7115745069349994427"),
    "isIndia" : 1,
    "geoLocation" : "Mumbai",
    "publishTime" : NumberLong(0),
    "author" : "",
    "pagePosition" : "",
    "group" : 0,
    "ssoId" : null,
    "isAPP" : 0,
    "sessionId" : "17a95722-5a48-459f-afd8-78f7edb84897"
}

I am putting data into Kafka in the above JSON format.
Now, when reading it in Spark, I need to group the above documents using two keys, get the total count for each key, and put it into Mongo along with the rest of the document fields.

Mongo Aggregation Job :

{
    "$match": {
        "pgAccessMin": {
            "$gte": 1438679100,
            "$lt": 1438679400
        }
    }
},
{
    "$project": {
        "articleId": 1,
        "host": 1,
        "isAPP": 1,
        "cat": 1,
        "subcat": 1,
        "publishTime": 1,
        "channel": 1,
        "author": 1,
        "tags": 1,
        "url": 1,
        "catIds": 1,
        "catIdHash": 1,
        "count": 1,
        "contentType": 1,
        "_id": 0
    }
},
{
    "$group": {
        "_id": {
            "host": "$host",
            "isAPP": "$isAPP",
            "articleId": "$articleId"

        },
        "count": {
            "$sum": 1
        },
        "url": {
            "$first": "$url"
        },
        "subcat": {
            "$first": "$subcat"
        },
        "cat": {
            "$first": "$cat"
        },
        "publishTime": {
            "$first": "$publishTime"
        },
        "channel": {
            "$first": "$channel"
        },
        "author": {
            "$first": "$author"
        },
        "tags": {
            "$first": "$tags"
        },
        "catIdHash": {
            "$first": "$catIdHash"
        },
        "catIds": {
            "$first": "$catIds"
        },
        "contentType": {
            "$first": "$contentType"
        }
    }
}

Please suggest how to write an equivalent job in Spark, so that I can get the view count along with the other fields and save it in Mongo.

Regards,
Deepesh