hbase-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Andrey S <oct...@gmail.com>
Subject Re: Unit Test: HBase Map/Reduce
Date Mon, 19 Apr 2010 18:39:19 GMT
2010/4/19 Renaud Delbru <renaud.delbru@deri.org>

> Hi,
>
> I am trying to create a unit test using the HBaseClusterTestCase and the
> RowCounter example.
> I am able to spin up a hbase table, load data inside, access the data
> (lookup and scan), but whenever I am trying to launch a map/reduce job
> (TableMapper), the map/reduce functions are never executed because the
> number of splits returned by the TableInputFormat is zero. After some
> debugging, I noticed that the line (in TableInputFormatBase)
> final byte [][] startKeys = table.getStartKeys();
> is returning an empty array.
>
> In fact, even if I am able to access table data using HTable#get, the
> HTable#getStartKeys is returning nothing. Any ideas on this issue? Also, does
> anyone have some advice/examples on how to write and run unit tests
> involving HBase?
>
> Thanks,
> Regards
> --
> Renaud Delbru
>

I use the following reworked versions of the standard test helper classes, and
they now work fine. It took a lot of fiddling with parameters :), but I am
finally able to use them with Pig (0.7.0) to run tests in MapReduce mode under
Maven.
The importTable() method can import tables produced by the Export class from
the HBase distribution.

Hope this helps.

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URISyntaxException;

/**
 * @author octo
 */
public abstract class BaseHadoopTestCase {

    private static final Log LOG =
LogFactory.getLog(BaseHadoopTestCase.class);

    protected static FileSystem localFs;
    protected static Configuration localConf;

    @BeforeClass
    public static void beforeClass() throws IOException, URISyntaxException
{
        //System.setProperty("java.io.tmpdir", new
File("target/tmp").getAbsolutePath());
        System.setProperty("hadoop.tmp.dir", new
File("target/hadoop-test").getAbsolutePath());
        System.setProperty("test.build.data", new
File("target/hadoop-test").getAbsolutePath());

        localConf = new Configuration();
        localConf.set("fs.default.name", "file://" + new
File("target/hadoop-test/dfs").getAbsolutePath());
        localFs = FileSystem.getLocal(localConf);
        LOG.info(String.format("Filesystem at %s",
localFs.getWorkingDirectory()));
        localConf.set("hadoop.log.dir", new
Path("target/hadoop-test/logs").makeQualified(localFs).toString());
        localConf.set("mapred.system.dir", new
Path("target/hadoop-test/mapred/sys").makeQualified(localFs).toString());
        localConf.set("mapred.local.dir", new
File("target/hadoop-test/mapred/local").getAbsolutePath());
        localConf.set("mapred.temp.dir", new
File("target/hadoop-test/mapred/tmp").getAbsolutePath());
        System.setProperty("hadoop.log.dir",
localConf.get("hadoop.log.dir"));
    }

    @AfterClass
    public static void afterClass() throws IOException {
        if (localFs != null)
            localFs.close();
    }

    protected Path localResourceToPath(String path, String target) throws
IOException {
        try {
            final InputStream resource =
this.getClass().getResourceAsStream(path);
            if (resource == null)
                throw new IllegalArgumentException(path + " not found");
            final Path targetPath =
                    new Path(localFs.getWorkingDirectory(),
"target/hadoop-test/imported/" + target)
                            .makeQualified(localFs);
            LOG.info(String.format("local resource %s->%s", path,
targetPath));
            localFs.delete(targetPath, true);
            OutputStream out = null;
            try {
                out = localFs.create(targetPath, true);
                IOUtils.copyBytes(resource, out, localConf, true);
            } catch (IOException e) {
                IOUtils.closeStream(out);
                IOUtils.closeStream(resource);
                throw e;
            }
            return targetPath;
        } catch (Exception e) {
            throw new IOException(e);
        }
    }


    protected Path localFileToPath(String path) {
        return localFs.makeQualified(new Path(path));
    }
}


import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.MiniHBaseCluster;
import org.apache.hadoop.hbase.MiniZooKeeperCluster;
import org.apache.hadoop.hbase.client.HConnectionManager;
import org.apache.hadoop.hbase.client.HTable;
import org.apache.hadoop.hbase.mapreduce.Import;
import org.apache.hadoop.hbase.util.FSUtils;
import org.apache.hadoop.mapreduce.Job;
import org.junit.AfterClass;
import org.junit.BeforeClass;

import java.io.File;
import java.io.IOException;

/**
 * @author astepachev
 */
public abstract class BaseHBaseTestCase extends BaseHadoopTestCase {

    private static final Log LOG =
LogFactory.getLog(BaseHBaseTestCase.class);
    public static MiniHBaseCluster cluster;
    protected static MiniZooKeeperCluster zooKeeperCluster;
    protected static int regionServers = 1;
    protected static volatile HBaseConfiguration hconf;
    private static boolean openMetaTable = true;

    protected BaseHBaseTestCase() {
    }

    @BeforeClass
    public static void startUp() throws Exception {
        hconf = new HBaseConfiguration(localConf);
        hconf.set("hbase.master.info.port", "-1");
        hconf.set("hbase.regionserver.info.port", "-1");
        hconf.set("hbase.regionserver.info.port.auto", "true");
        hconf.set("hbase.regionserver.safemode", "false");

        Path parentdir = getHBaseRootDir(localFs);
        LOG.info(String.format("Initialising HBase at %s",
parentdir.toUri().toString()));
        localFs.delete(parentdir, true);
        hconf.set(HConstants.HBASE_DIR, parentdir.toString());
        localFs.mkdirs(parentdir);
        FSUtils.setVersion(localFs, parentdir);

        try {
            zooKeeperCluster = new MiniZooKeeperCluster();
            int clientPort = zooKeeperCluster.startup(new File(new
Path("target/hadoop-test/", "zookeepr").makeQualified(localFs).toUri()));
            hconf.set("hbase.zookeeper.property.clientPort",
Integer.toString(clientPort));

            // start the mini cluster
            cluster = new MiniHBaseCluster(hconf, regionServers);

            if (openMetaTable) {
                // opening the META table ensures that cluster is running
                new HTable(hconf, HConstants.META_TABLE_NAME);
            }
        } catch (Exception e) {
            LOG.error("Exception in setup!", e);
            if (cluster != null) {
                cluster.shutdown();
            }
            if (zooKeeperCluster != null) {
                zooKeeperCluster.shutdown();
            }
            throw e;
        }
    }

    private static Path getHBaseRootDir(FileSystem filesystem) {
        return new
Path("target/hadoop-test/hbase-test").makeQualified(filesystem);
    }

    @AfterClass
    public static void tearDown() throws IOException {
        if (!openMetaTable) {
            // open the META table now to ensure cluster is running
beforeClass shutdown.
            new HTable(hconf, HConstants.META_TABLE_NAME);
        }
        try {
            HConnectionManager.deleteConnectionInfo(hconf, true);
            if (cluster != null) {
                try {
                    cluster.shutdown();
                } catch (Exception e) {
                    LOG.warn("Closing mini dfs", e);
                }
                try {
                    zooKeeperCluster.shutdown();
                } catch (IOException e) {
                    LOG.warn("Shutting down ZooKeeper cluster", e);
                }
            }
        } catch (Exception e) {
            LOG.error(e);
        }
    }

    protected void importTable(String name, String path) throws IOException
{

        final Path imported = localResourceToPath(path, "import/" + path);
        final Job job1 = Import.createSubmittableJob(hconf, new
String[]{name, imported.toString()});
        try {
            job1.submit();
            job1.waitForCompletion(true);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        } catch (ClassNotFoundException e) {
            throw new RuntimeException(e);
        }

    }


}

Mime
  • Unnamed multipart/alternative (inline, None, 0 bytes)
View raw message