spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qiaou <qiaou8...@gmail.com>
Subject 回复: How did the RDD.union work
Date Wed, 12 Nov 2014 06:53:32 GMT
ok here is the code

/**
 * Returns a function that, given an area prefix, builds an RDD of HBase `Result`s
 * read from the "asrLogFeedBack" table.
 *
 * The scan covers the row-key range derived from `area`, `startId` and `endId`.
 * Rows are additionally filtered on `clientInfo:optKey` / `clientInfo:optImei`
 * when `appKey` / `imei` are set (non-null and not the "_" placeholder).
 *
 * Every invocation of the returned function configures its own copy of the HBase
 * configuration, so several RDDs built from it can be combined (e.g. with `union`)
 * without one scan definition overwriting another.
 *
 * @return a function from area prefix to the RDD of matching rows
 */
def hbaseQuery: (String) => RDD[Result] = {
  // Equality filter on a single qualifier of the "clientInfo" column family.
  def clientInfoEquals(qualifier: String, value: Array[Byte]): SingleColumnValueFilter =
    new SingleColumnValueFilter(
      Bytes.toBytes("clientInfo"), Bytes.toBytes(qualifier), CompareOp.EQUAL, value)

  (area: String) => {
    val startRowKey = s"$area${RowKeyUtils.convertToHex(startId, 10)}AAAAAAAAAAAAAAAA"
    val stopRowKey = s"$area${RowKeyUtils.convertToHex(endId, 10)}GGGGGGGGGGGGGGGG"
    println(s"startRowKey:${startRowKey}")
    println(s"stopRowKey :${stopRowKey}")

    val scan = new Scan()
    scan.setStartRow(Bytes.toBytes(startRowKey))
    scan.setStopRow(Bytes.toBytes(stopRowKey))

    // Collect the optional column filters; "_" is treated as "not specified".
    val filterList: FilterList = new FilterList()
    if (appKey != null && !appKey.equals("_")) {
      filterList.addFilter(clientInfoEquals("optKey", Bytes.toBytes(appKey)))
    }
    if (imei != null && !imei.equals("_")) {
      filterList.addFilter(clientInfoEquals("optImei", Bytes.toBytes(imei)))
    }
    // Only attach the filter list when at least one filter was added.
    if (Option(filterList.getFilters).exists(!_.isEmpty)) {
      scan.setFilter(filterList)
    }
    scan.setCaching(10000)

    // Work on a private copy of the configuration. NOTE(review): if
    // HBaseConfigUtil.getHBaseConfiguration hands back a shared instance (not
    // visible here - confirm), setting INPUT_TABLE/SCAN on it directly would
    // change the scan of every RDD created earlier from the same object, because
    // RDDs are evaluated lazily.
    val hbaseConf = new org.apache.hadoop.conf.Configuration(HBaseConfigUtil.getHBaseConfiguration)
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "asrLogFeedBack")
    hbaseConf.set(TableInputFormat.SCAN, Base64.encodeBytes(ProtobufUtil.toScan(scan).toByteArray))

    // Keep only the row content; the row-key writable is dropped.
    SparkUtil.getSingleSparkContext()
      .newAPIHadoopRDD(hbaseConf, classOf[TableInputFormat],
        classOf[ImmutableBytesWritable], classOf[Result])
      .map { case (_, result) => result }
  }
}


--  
qiaou
已使用 Sparrow (http://www.sparrowmailapp.com/?sig)


在 2014年11月12日 星期三,下午2:50,Shixiong Zhu 写道:

> Could you provide the code of hbaseQuery? Maybe it doesn't support being executed in parallel.
>  
> Best Regards,
> Shixiong Zhu
>  
>  
>  
>  
> 2014-11-12 14:32 GMT+08:00 qiaou <qiaou8727@gmail.com (mailto:qiaou8727@gmail.com)>:
> > Hi:  
> >     I got a problem with using the union method of RDD
> >     things like this
> > I get a function like
> > def hbaseQuery(area: String): RDD[Result] = ???
> >         when I use hbaseQuery("aa").union(hbaseQuery("bb")).count() it returns 0
> > however, when I use it like this: sc.parallelize(hbaseQuery("aa").collect.toList ::: hbaseQuery("bb").collect.toList).count() it returns the right value
> > obviously I have an action after my transformation, but why did it not work?
> > fyi
> >  
> > --  
> > qiaou
> > 已使用 Sparrow (http://www.sparrowmailapp.com/?sig)
> >  
>  


Mime
View raw message