hbase-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ch huang <justlo...@gmail.com>
Subject problem in testing coprocessor endpoint
Date Fri, 12 Jul 2013 03:26:35 GMT
i am testing coprocessor endpoint function, here is my testing process ,and
error i get ,hope any expert on coprocessor can help me out


# vi ColumnAggregationProtocol.java

import java.io.IOException;
import org.apache.hadoop.hbase.ipc.CoprocessorProtocol;
// A sample protocol for performing aggregation at regions.
public interface ColumnAggregationProtocol
extends CoprocessorProtocol {
// Perform aggregation for a given column at the region. The aggregation
// will include all the rows inside the region. It can be extended to
// allow passing start and end rows for a fine-grained aggregation.
   public long sum(byte[] family, byte[] qualifier) throws IOException;
}


# vi ColumnAggregationEndpoint.java


import java.io.FileWriter;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.CoprocessorEnvironment;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.coprocessor.BaseEndpointCoprocessor;
import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
import org.apache.hadoop.hbase.ipc.ProtocolSignature;
import org.apache.hadoop.hbase.regionserver.HRegion;
import org.apache.hadoop.hbase.regionserver.InternalScanner;
import org.apache.hadoop.hbase.util.Bytes;

//Aggregation implementation at a region.

public class ColumnAggregationEndpoint extends BaseEndpointCoprocessor
  implements ColumnAggregationProtocol {
     @Override
     public long sum(byte[] family, byte[] qualifier)
     throws IOException {
       // aggregate at each region
         Scan scan = new Scan();
         scan.addColumn(family, qualifier);
         long sumResult = 0;

         CoprocessorEnvironment ce = getEnvironment();
         HRegion hr = ((RegionCoprocessorEnvironment)ce).getRegion();
         InternalScanner scanner = hr.getScanner(scan);

         try {
           List<KeyValue> curVals = new ArrayList<KeyValue>();
           boolean hasMore = false;
           do {
         curVals.clear();
         hasMore = scanner.next(curVals);
         KeyValue kv = curVals.get(0);
         sumResult += Long.parseLong(Bytes.toString(kv.getValue()));

           } while (hasMore);
         } finally {
             scanner.close();
         }
         return sumResult;
      }

      @Override
      public long getProtocolVersion(String protocol, long clientVersion)
             throws IOException {
         // TODO Auto-generated method stub
         return 0;
      }

      @Override

      public ProtocolSignature getProtocolSignature(String protocol,
             long clientVersion, int clientMethodsHash) throws IOException
{
          // TODO Auto-generated method stub
          return null;
      }
}

i compile and pack the two into test.jar,and put it into my HDFS filesystem

and load it into my test table

hbase(main):006:0> alter 'mytest', METHOD =>
'table_att','coprocessor'=>'hdfs:///
192.168.10.22:9000/alex/test.jar|ColumnAggregationEndpoint|1001'

here is my testing java code

package com.testme.demo;
import java.io.IOException;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.client.*;
import org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol;
import org.apache.hadoop.hbase.util.*;
import org.apache.hadoop.hbase.client.coprocessor.Batch.Call;;

public class TestCop {
   private static Configuration conf =null;
   private static String TEST_TABLE = "mytest";
   private static String TEST_FAMILY = "myfl";
   private static String TEST_QUALIFIER = "myqf";
 /**
  * @param args
  */
   static {
          conf = HBaseConfiguration.create();
          conf.addResource( "hbase-site.xml");
   }

 public static void main(String[] args) throws IOException,Throwable{
  // TODO Auto-generated method stub
  conf = HBaseConfiguration.create();

   HTable table = new HTable(conf,TEST_TABLE);
 //  HTableDescriptor htd = table.getTableDescriptor();

   Scan scan = new Scan();
   Map<byte[], Long> results;

   results = table.coprocessorExec(ColumnAggregationProtocol.class,
"1".getBytes(),"5".getBytes(), new Call<ColumnAggregationProtocol,Long>(){
     public Long call(ColumnAggregationProtocol instance)throws IOException{
       return (Long) instance.sum(TEST_FAMILY.getBytes(),
TEST_QUALIFIER.getBytes());
    }});

   long sumResult = 0;
   long expectedResult = 0;
   for (Map.Entry<byte[], Long> e:results.entrySet()){
    sumResult += e.getValue();
   }
   System.out.println(sumResult);
 }
}
when i run it i get error
Exception in thread "main"
org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException:
org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No matching
handler for protocol
org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol in region
mytest,,1373597714844.e11ad2263faf89b5865ae98f524e3fb9.
 at org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:5463)
 at
org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HRegionServer.java:3720)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:320)
 at
org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1426)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
 at sun.reflect.NativeConstructorAccessorImpl.newInstance(Unknown Source)
 at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(Unknown
Source)
 at java.lang.reflect.Constructor.newInstance(Unknown Source)
 at
org.apache.hadoop.ipc.RemoteException.instantiateException(RemoteException.java:90)
 at
org.apache.hadoop.ipc.RemoteException.unwrapRemoteException(RemoteException.java:79)
 at
org.apache.hadoop.hbase.client.ServerCallable.translateException(ServerCallable.java:228)
 at
org.apache.hadoop.hbase.client.ServerCallable.withRetries(ServerCallable.java:166)
 at
org.apache.hadoop.hbase.ipc.ExecRPCInvoker.invoke(ExecRPCInvoker.java:79)
 at com.sun.proxy.$Proxy8.sum(Unknown Source)
 at com.testme.demo.TestCop$1.call(TestCop.java:41)
 at com.testme.demo.TestCop$1.call(TestCop.java:1)
 at
org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation$4.call(HConnectionManager.java:1466)
 at java.util.concurrent.FutureTask$Sync.innerRun(Unknown Source)
 at java.util.concurrent.FutureTask.run(Unknown Source)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
 at java.lang.Thread.run(Unknown Source)
Caused by:
org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException):
org.apache.hadoop.hbase.ipc.HBaseRPC$UnknownProtocolException: No matching
handler for protocol
org.apache.hadoop.hbase.coprocessor.ColumnAggregationProtocol in region
mytest,,1373597714844.e11ad2263faf89b5865ae98f524e3fb9.
 at org.apache.hadoop.hbase.regionserver.HRegion.exec(HRegion.java:5463)
 at
org.apache.hadoop.hbase.regionserver.HRegionServer.execCoprocessor(HRegionServer.java:3720)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
 at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
 at java.lang.reflect.Method.invoke(Method.java:597)
 at
org.apache.hadoop.hbase.ipc.WritableRpcEngine$Server.call(WritableRpcEngine.java:320)
 at
org.apache.hadoop.hbase.ipc.HBaseServer$Handler.run(HBaseServer.java:1426)
 at org.apache.hadoop.hbase.ipc.HBaseClient.call(HBaseClient.java:995)
 at
org.apache.hadoop.hbase.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:86)
 at com.sun.proxy.$Proxy7.execCoprocessor(Unknown Source)
 at
org.apache.hadoop.hbase.ipc.ExecRPCInvoker$1.call(ExecRPCInvoker.java:75)
 at
org.apache.hadoop.hbase.ipc.ExecRPCInvoker$1.call(ExecRPCInvoker.java:73)
 at
org.apache.hadoop.hbase.client.ServerCallable.withRetries(ServerCallable.java:163)
 ... 10 more

hbase(main):020:0> describe
'mytest'
DESCRIPTION                                          ENABLED
 {NAME => 'mytest', coprocessor$1 => 'hdfs:///192.16 true
 8.10.22:9000/alex/test.jar|ColumnAggregationEndpoin
 t|1001', FAMILIES => [{NAME => 'myfl'
 , DATA_BLOCK_ENCODING => 'NONE', BLOOMFILTER => 'NO
 NE', REPLICATION_SCOPE => '0', VERSIONS => '3', COM
 PRESSION => 'NONE', MIN_VERSIONS => '0', TTL => '21
 47483647', KEEP_DELETED_CELLS => 'false', BLOCKSIZE
  => '65536', IN_MEMORY => 'false', ENCODE_ON_DISK =
 > 'true', BLOCKCACHE => 'true'}]}
1 row(s) in 0.0920 seconds

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message