Are you sharing this.rankTable between threads? HTable is not thread safe.
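If it is shared, one common way around it is to give every thread its own table instance instead of locking a single one. The sketch below only illustrates that pattern and makes several assumptions: FakeTable is a hypothetical stand-in for HTable (it is not HBase API), the class and method names are invented for the example, and it assumes Java 8 or later. In real code each thread would create its own HTable, or borrow one from the HTablePool class that ships with HBase clients of this era.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

public class PerThreadTableSketch {

    /** Hypothetical stand-in for a non-thread-safe client such as HTable. */
    static final class FakeTable {
        // Plain ArrayList: concurrent add() calls from several threads could corrupt it.
        private final List<String> writeBuffer = new ArrayList<>();

        void put(String rowKey) {
            writeBuffer.add(rowKey);
        }

        int bufferedRows() {
            return writeBuffer.size();
        }
    }

    // Every thread that calls TABLE.get() lazily receives its own FakeTable.
    private static final ThreadLocal<FakeTable> TABLE = ThreadLocal.withInitial(FakeTable::new);

    /**
     * Starts threadCount threads that each write putsPerThread rows through
     * their own table, then returns {distinct tables used, total rows buffered}.
     */
    static int[] runWriters(int threadCount, int putsPerThread) {
        Set<FakeTable> tablesUsed = ConcurrentHashMap.newKeySet();
        List<Thread> writers = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            final int id = t;
            Thread writer = new Thread(() -> {
                FakeTable table = TABLE.get(); // private to this thread, so no locking is needed
                for (int i = 0; i < putsPerThread; i++) {
                    table.put("row-" + id + "-" + i);
                }
                tablesUsed.add(table);
            });
            writers.add(writer);
            writer.start();
        }
        for (Thread writer : writers) {
            try {
                writer.join(); // join() makes each writer's updates visible to this thread
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for writers", e);
            }
        }
        int totalRows = 0;
        for (FakeTable table : tablesUsed) {
            totalRows += table.bufferedRows();
        }
        return new int[] {tablesUsed.size(), totalRows};
    }

    public static void main(String[] args) {
        int[] result = runWriters(4, 1000);
        System.out.println(result[0] + " tables, " + result[1] + " rows");
    }
}
```

Because no instance is ever touched by two threads, the write method no longer needs "synchronized", and readers and writers stop interfering with each other's calls. The cost is one table object per thread.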
-- Lars
________________________________
From: Bing Li <lblabs@gmail.com>
To: "hbase-user@hadoop.apache.org" <hbase-user@hadoop.apache.org>; user <user@hbase.apache.org>
Sent: Tuesday, February 5, 2013 8:54 AM
Subject: Re: Is "synchronized" required?
Dear all,
After removing "synchronized" from the write method, I get the
following exceptions when reading. Before the removal, there were no
such exceptions.
Could you help me solve this?
Thanks so much!
Best wishes,
Bing
[java] Feb 6, 2013 12:21:31 AM org.apache.hadoop.hbase.ipc.HBaseClient$Connection run
[java] WARNING: Unexpected exception receiving call responses
[java] java.lang.NullPointerException
[java]     at org.apache.hadoop.hbase.io.HbaseObjectWritable.readObject(HbaseObjectWritable.java:521)
[java]     at org.apache.hadoop.hbase.io.HbaseObjectWritable.readFields(HbaseObjectWritable.java:297)
[java]     at org.apache.hadoop.hbase.ipc.HBaseClient$Connection.receiveResponse(HBaseClient.java:593)
[java]     at org.apache.hadoop.hbase.ipc.HBaseClient$Connection.run(HBaseClient.java:505)
[java] Feb 6, 2013 12:21:31 AM org.apache.hadoop.hbase.client.ScannerCallable close
[java] WARNING: Ignore, probably already closed
[java] java.io.IOException: Call to greatfreeweb/127.0.1.1:60020 failed on local exception: java.io.IOException: Unexpected exception receiving call responses
[java]     at org.apache.hadoop.hbase.ipc.HBaseClient.wrapException(HBaseClient.java:934)
[java]     at org.apache.hadoop.hbase.ipc.HBaseClient.call(HBaseClient.java:903)
[java]     at org.apache.hadoop.hbase.ipc.WritableRpcEngine$Invoker.invoke(WritableRpcEngine.java:150)
[java]     at $Proxy6.close(Unknown Source)
[java]     at org.apache.hadoop.hbase.client.ScannerCallable.close(ScannerCallable.java:112)
[java]     at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:74)
[java]     at org.apache.hadoop.hbase.client.ScannerCallable.call(ScannerCallable.java:39)
[java]     at org.apache.hadoop.hbase.client.HConnectionManager$HConnectionImplementation.getRegionServerWithRetries(HConnectionManager.java:1325)
[java]     at org.apache.hadoop.hbase.client.HTable$ClientScanner.nextScanner(HTable.java:1167)
[java]     at org.apache.hadoop.hbase.client.HTable$ClientScanner.next(HTable.java:1296)
[java]     at org.apache.hadoop.hbase.client.HTable$ClientScanner$1.hasNext(HTable.java:1356)
[java]     at com.greatfree.hbase.rank.NodeRankRetriever.LoadNodeGroupNodeRankRowKeys(NodeRankRetriever.java:348)
[java]     at com.greatfree.ranking.PersistNodeGroupNodeRanksThread.run(PersistNodeGroupNodeRanksThread.java:29)
[java]     at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
[java]     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
[java]     at java.lang.Thread.run(Thread.java:662)
[java] Caused by: java.io.IOException: Unexpected exception receiving call responses
[java]     at org.apache.hadoop.hbase.ipc.HBaseClient$Connection.run(HBaseClient.java:509)
[java] Caused by: java.lang.NullPointerException
[java]     at org.apache.hadoop.hbase.io.HbaseObjectWritable.readObject(HbaseObjectWritable.java:521)
[java]     at org.apache.hadoop.hbase.io.HbaseObjectWritable.readFields(HbaseObjectWritable.java:297)
[java]     at org.apache.hadoop.hbase.ipc.HBaseClient$Connection.receiveResponse(HBaseClient.java:593)
[java]     at org.apache.hadoop.hbase.ipc.HBaseClient$Connection.run(HBaseClient.java:505)
The code that causes the exceptions is as follows.
public Set<String> LoadNodeGroupNodeRankRowKeys(String hostNodeKey, String groupKey, int timingScale)
{
    List<Filter> nodeGroupFilterList = new ArrayList<Filter>();

    SingleColumnValueFilter hostNodeKeyFilter = new SingleColumnValueFilter(
        RankStructure.NODE_GROUP_NODE_RANK_FAMILY,
        RankStructure.NODE_GROUP_NODE_RANK_HOST_NODE_KEY_COLUMN,
        CompareFilter.CompareOp.EQUAL, new SubstringComparator(hostNodeKey));
    hostNodeKeyFilter.setFilterIfMissing(true);
    nodeGroupFilterList.add(hostNodeKeyFilter);

    SingleColumnValueFilter groupKeyFilter = new SingleColumnValueFilter(
        RankStructure.NODE_GROUP_NODE_RANK_FAMILY,
        RankStructure.NODE_GROUP_NODE_RANK_GROUP_KEY_COLUMN,
        CompareFilter.CompareOp.EQUAL, new SubstringComparator(groupKey));
    groupKeyFilter.setFilterIfMissing(true);
    nodeGroupFilterList.add(groupKeyFilter);

    SingleColumnValueFilter timingScaleFilter = new SingleColumnValueFilter(
        RankStructure.NODE_GROUP_NODE_RANK_FAMILY,
        RankStructure.NODE_GROUP_NODE_RANK_TIMING_SCALE_COLUMN,
        CompareFilter.CompareOp.EQUAL, new BinaryComparator(Bytes.toBytes(timingScale)));
    timingScaleFilter.setFilterIfMissing(true);
    nodeGroupFilterList.add(timingScaleFilter);

    FilterList nodeGroupFilter = new FilterList(nodeGroupFilterList);

    Scan scan = new Scan();
    scan.setFilter(nodeGroupFilter);
    scan.setCaching(Parameters.CACHING_SIZE);
    scan.setBatch(Parameters.BATCHING_SIZE);

    Set<String> rowKeySet = Sets.newHashSet();
    try
    {
        ResultScanner scanner = this.rankTable.getScanner(scan);
        for (Result result : scanner) // <---- EXCEPTIONS are raised at this line.
        {
            for (KeyValue kv : result.raw())
            {
                rowKeySet.add(Bytes.toString(kv.getRow()));
                break;
            }
        }
        scanner.close();
    }
    catch (IOException e)
    {
        e.printStackTrace();
    }
    return rowKeySet;
}
On Tue, Feb 5, 2013 at 4:20 AM, Bing Li <lblabs@gmail.com> wrote:
> Dear all,
>
> When writing data into HBase, I sometimes get exceptions. I guess they
> might be caused by concurrent writes, but I am not sure.
>
> My question is whether it is necessary to put "synchronized" on
> the write methods. The following lines are the sample code.
>
> I think the synchronized keyword must lower write performance,
> and my system sometimes needs concurrent writes.
>
> Thanks so much!
>
> Best wishes,
> Bing
>
> public synchronized void AddDomainNodeRanks(String domainKey, int timingScale, Map<String, Double> nodeRankMap)
> {
>     List<Put> puts = new ArrayList<Put>();
>     Put domainKeyPut;
>     Put timingScalePut;
>     Put nodeKeyPut;
>     Put rankPut;
>
>     byte[] domainNodeRankRowKey;
>
>     for (Map.Entry<String, Double> nodeRankEntry : nodeRankMap.entrySet())
>     {
>         domainNodeRankRowKey = Bytes.toBytes(RankStructure.DOMAIN_NODE_RANK_ROW +
>             Tools.GetAHash(domainKey + timingScale + nodeRankEntry.getKey()));
>
>         domainKeyPut = new Put(domainNodeRankRowKey);
>         domainKeyPut.add(RankStructure.DOMAIN_NODE_RANK_FAMILY,
>             RankStructure.DOMAIN_NODE_RANK_DOMAIN_KEY_COLUMN,
>             Bytes.toBytes(domainKey));
>         puts.add(domainKeyPut);
>
>         timingScalePut = new Put(domainNodeRankRowKey);
>         timingScalePut.add(RankStructure.DOMAIN_NODE_RANK_FAMILY,
>             RankStructure.DOMAIN_NODE_RANK_TIMING_SCALE_COLUMN,
>             Bytes.toBytes(timingScale));
>         puts.add(timingScalePut);
>
>         nodeKeyPut = new Put(domainNodeRankRowKey);
>         nodeKeyPut.add(RankStructure.DOMAIN_NODE_RANK_FAMILY,
>             RankStructure.DOMAIN_NODE_RANK_NODE_KEY_COLUMN,
>             Bytes.toBytes(nodeRankEntry.getKey()));
>         puts.add(nodeKeyPut);
>
>         rankPut = new Put(domainNodeRankRowKey);
>         rankPut.add(RankStructure.DOMAIN_NODE_RANK_FAMILY,
>             RankStructure.DOMAIN_NODE_RANK_RANKS_COLUMN,
>             Bytes.toBytes(nodeRankEntry.getValue()));
>         puts.add(rankPut);
>     }
>
>     try
>     {
>         this.rankTable.put(puts);
>     }
>     catch (IOException e)
>     {
>         e.printStackTrace();
>     }
> }