Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestIFile.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestIFile.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestIFile.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestIFile.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <algorithm>
+#include "commons.h"
+#include "BufferStream.h"
+#include "FileSystem.h"
+#include "IFile.h"
+#include "test_commons.h"
+
+SingleSpillInfo * writeIFile(int partition, vector<pair<string, string> > & kvs,
+ const string & path, KeyValueType type, const string & codec) {
+ FileOutputStream * fout = (FileOutputStream*)FileSystem::getLocal().create(path);
+ IFileWriter * iw = new IFileWriter(fout, CHECKSUM_CRC32, type, type, codec, NULL);
+ for (int i = 0; i < partition; i++) {
+ iw->startPartition();
+ for (size_t i = 0; i < kvs.size(); i++) {
+ pair<string, string> & p = kvs[i];
+ iw->write(p.first.c_str(), p.first.length(), p.second.c_str(), p.second.length());
+ }
+ iw->endPartition();
+ }
+ SingleSpillInfo * info = iw->getSpillInfo();
+ delete iw;
+ delete fout;
+ return info;
+}
+
+void readIFile(vector<pair<string, string> > & kvs, const string & path, KeyValueType type,
+ SingleSpillInfo * info, const string & codec) {
+ FileInputStream * fin = (FileInputStream*)FileSystem::getLocal().open(path);
+ IFileReader * ir = new IFileReader(fin, info);
+ while (ir->nextPartition()) {
+ const char * key, *value;
+ uint32_t keyLen, valueLen;
+ while (NULL != (key = ir->nextKey(keyLen))) {
+ value = ir->value(valueLen);
+ string keyS(key, keyLen);
+ string valueS(value, valueLen);
+ kvs.push_back(std::make_pair(keyS, valueS));
+ }
+ }
+ delete ir;
+ delete fin;
+}
+
+void TestIFileReadWrite(KeyValueType kvtype, int partition, int size,
+ vector<pair<string, string> > & kvs, const string & codec = "") {
+ string outputpath = "ifilewriter";
+ SingleSpillInfo * info = writeIFile(partition, kvs, outputpath, kvtype, codec);
+ LOG("write finished");
+ vector<pair<string, string> > readkvs;
+ readIFile(readkvs, outputpath, kvtype, info, codec);
+ LOG("read finished");
+ ASSERT_EQ(kvs.size() * partition, readkvs.size());
+ for (int i = 0; i < partition; i++) {
+ vector<pair<string, string> > cur_part(readkvs.begin() + i * kvs.size(),
+ readkvs.begin() + (i + 1) * kvs.size());
+ ASSERT_EQ(kvs.size(), cur_part.size());
+// for (size_t j=0;j<kvs.size();j++) {
+// SCOPED_TRACE(j);
+// ASSERT_EQ(kvs[j], cur_part[j]);
+// }
+ ASSERT_EQ(kvs, cur_part);
+ }
+ FileSystem::getLocal().remove(outputpath);
+}
+
+TEST(IFile, WriteRead) {
+ int partition = TestConfig.getInt("ifile.partition", 7);
+ int size = TestConfig.getInt("partition.size", 20000);
+ vector<pair<string, string> > kvs;
+ Generate(kvs, size, "bytes");
+ TestIFileReadWrite(TextType, partition, size, kvs);
+ TestIFileReadWrite(BytesType, partition, size, kvs);
+ TestIFileReadWrite(UnknownType, partition, size, kvs);
+ TestIFileReadWrite(TextType, partition, size, kvs, "org.apache.hadoop.io.compress.SnappyCodec");
+}
+
+void TestIFileWriteRead2(vector<pair<string, string> > & kvs, char * buff, size_t buffsize,
+ const string & codec, ChecksumType checksumType, KeyValueType type) {
+ int partition = TestConfig.getInt("ifile.partition", 50);
+ Timer timer;
+ OutputBuffer outputBuffer = OutputBuffer(buff, buffsize);
+ IFileWriter * iw = new IFileWriter(&outputBuffer, checksumType, type, type, codec, NULL);
+ timer.reset();
+ for (int i = 0; i < partition; i++) {
+ iw->startPartition();
+ for (size_t j = 0; j < kvs.size(); j++) {
+ iw->write(kvs[j].first.c_str(), kvs[j].first.length(), kvs[j].second.c_str(),
+ kvs[j].second.length());
+ }
+ iw->endPartition();
+ }
+ SingleSpillInfo * info = iw->getSpillInfo();
+ LOG("%s",
+ timer.getSpeedM2("Write data", info->getEndPosition(), info->getRealEndPosition()).c_str());
+ delete iw;
+
+ InputBuffer inputBuffer = InputBuffer(buff, outputBuffer.tell());
+ IFileReader * ir = new IFileReader(&inputBuffer, info);
+ timer.reset();
+ while (ir->nextPartition()) {
+ const char * key, *value;
+ uint32_t keyLen, valueLen;
+ while (NULL != (key = ir->nextKey(keyLen))) {
+ value = ir->value(valueLen);
+ }
+ }
+ LOG("%s",
+ timer.getSpeedM2(" Read data", info->getEndPosition(), info->getRealEndPosition()).c_str());
+ delete ir;
+ delete info;
+}
+
+
+
+TEST(Perf, IFile) {
+ int size = TestConfig.getInt("partition.size", 20000);
+ string codec = TestConfig.get("ifile.codec", "");
+ string type = TestConfig.get("ifile.type", "bytes");
+
+ vector<pair<string, string> > kvs;
+ Generate(kvs, size, type);
+ std::sort(kvs.begin(), kvs.end());
+
+ size_t buffsize = 200 * 1024 * 1024;
+ char * buff = new char[buffsize];
+ memset(buff, 0, buffsize);
+
+ LOG("Test TextType CRC32");
+ TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, TextType);
+ LOG("Test BytesType CRC32");
+ TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, BytesType);
+ LOG("Test UnknownType CRC32");
+ TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32, UnknownType);
+ LOG("Test TextType CRC32C");
+ TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, TextType);
+ LOG("Test BytesType CRC32C");
+ TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, BytesType);
+ LOG("Test UnknownType CRC32C");
+ TestIFileWriteRead2(kvs, buff, buffsize, codec, CHECKSUM_CRC32C, UnknownType);
+ delete[] buff;
+}
+
+// The Glibc has a bug in the file tell api, it will overwrite the file data
+// unexpected.
+// Please check https://rhn.redhat.com/errata/RHBA-2013-0279.html
+// This case is to check wether the bug exists.
+// If it exists, it means you need to upgrade the glibc.
+TEST(IFile, TestGlibCBug) {
+ std::string path("./testData/testGlibCBugSpill.out");
+
+ uint32_t expect[5] = {-1538241715, -1288088794, -192294464, 563552421, 1661521654};
+
+ LOG("TestGlibCBug %s", path.c_str());
+ IFileSegment * segments = new IFileSegment [1];
+ segments[0].realEndOffset = 10000000;
+ SingleSpillInfo * info = new SingleSpillInfo(segments, 1, path, CHECKSUM_NONE,
+ IntType, TextType, "");
+
+ InputStream * fileOut = FileSystem::getLocal().open(path);
+ IFileReader * reader = new IFileReader(fileOut, info, true);
+
+ const char * key = NULL;
+ uint32_t length = 0;
+ reader->nextPartition();
+ uint32_t index = 0;
+ while(NULL != (key = reader->nextKey(length))) {
+ int realKey = bswap(*(uint32_t *)(key));
+ ASSERT_EQ(expect[index], realKey);
+ index++;
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestMain.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestMain.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestMain.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestMain.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <signal.h>
+
+#ifndef __CYGWIN__
+#include <execinfo.h>
+#endif
+
+#include <stdexcept>
+#include "commons.h"
+#include "Buffers.h"
+#include "FileSystem.h"
+#include "test_commons.h"
+
+extern "C" {
+
+static void handler(int sig);
+
+// TODO: just for debug, should be removed
+void handler(int sig) {
+ void *array[10];
+ size_t size;
+
+ // print out all the frames to stderr
+ fprintf(stderr, "Error: signal %d:\n", sig);
+
+#ifndef __CYGWIN__
+ // get void*'s for all entries on the stack
+ size = backtrace(array, 10);
+
+ backtrace_symbols_fd(array, size, 2);
+#endif
+
+ exit(1);
+}
+}
+
+using namespace NativeTask;
+
+typedef char * CString;
+
+int main(int argc, char ** argv) {
+ signal(SIGSEGV, handler);
+ CString * newArgv = new CString[argc + 1];
+ memcpy(newArgv, argv, argc * sizeof(CString));
+
+ bool gen = false;
+ if (argc > 1) {
+ if (string("perf") == newArgv[1]) {
+ newArgv[1] = (char *)"--gtest_filter=Perf.*";
+ } else if (string("noperf") == newArgv[1]) {
+ newArgv[1] = (char *)"--gtest_filter=-Perf.*";
+ } else if (string("gen") == newArgv[1]) {
+ gen = true;
+ }
+ }
+ testing::InitGoogleTest(&argc, newArgv);
+ if (argc > 0) {
+ int skip = gen ? 2 : 1;
+ TestConfig.parse(argc - skip, (const char **)(newArgv + skip));
+ }
+ try {
+ if (gen == true) {
+ string type = TestConfig.get("generate.type", "word");
+ string codec = TestConfig.get("generate.codec", "");
+ int64_t len = TestConfig.getInt("generate.length", 1024);
+ string temp;
+ GenerateKVTextLength(temp, len, type);
+ if (codec.length() == 0) {
+ fprintf(stdout, "%s", temp.c_str());
+ } else {
+ OutputStream * fout = FileSystem::getLocal().create("/dev/stdout");
+ AppendBuffer app = AppendBuffer();
+ app.init(128 * 1024, fout, codec);
+ app.write(temp.data(), temp.length());
+ fout->close();
+ delete fout;
+ }
+ return 0;
+ } else {
+ return RUN_ALL_TESTS();
+ }
+ } catch (std::exception & e) {
+ fprintf(stderr, "Exception: %s", e.what());
+ return 1;
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestPrimitives.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestPrimitives.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestPrimitives.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestPrimitives.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "test_commons.h"
+
+TEST(Primitives, fmemcmp) {
+ std::vector<std::string> vs;
+ char buff[14];
+ vs.push_back("");
+ for (uint32_t i = 0; i < 5000; i += 7) {
+ snprintf(buff, 14, "%d", i * 31);
+ vs.push_back(buff);
+ snprintf(buff, 10, "%010d", i);
+ vs.push_back(buff);
+ }
+ for (size_t i = 0; i < vs.size(); i++) {
+ for (size_t j = 0; j < vs.size(); j++) {
+ std::string & ls = vs[i];
+ std::string & rs = vs[j];
+ size_t m = std::min(ls.length(), rs.length());
+ int c = memcmp(ls.c_str(), rs.c_str(), m);
+ int t = fmemcmp(ls.c_str(), rs.c_str(), m);
+ if (!((c == 0 && t == 0) || (c > 0 && t > 0) || (c < 0 && t < 0))) {
+ ASSERT_TRUE(false);
+ }
+ }
+ }
+}
+
+static int test_memcmp() {
+ uint8_t buff[2048];
+ for (uint32_t i = 0; i < 2048; i++) {
+ buff[i] = i & 0xff;
+ }
+ std::random_shuffle(buff, buff + 2048);
+ int r = 0;
+ for (uint32_t i = 0; i < 100000000; i++) {
+ int offset = i % 1000;
+ r += memcmp(buff, buff + 1024, 5);
+ r += memcmp(buff + offset, buff + 1124, 9);
+ r += memcmp(buff + offset, buff + 1224, 10);
+ r += memcmp(buff + offset, buff + 1324, 15);
+ r += memcmp(buff + offset, buff + 1424, 16);
+ r += memcmp(buff + offset, buff + 1524, 17);
+ r += memcmp(buff + offset, buff + 1624, 18);
+ r += memcmp(buff + offset, buff + 1724, 19);
+ }
+ return r;
+}
+
+static int test_fmemcmp() {
+ char buff[2048];
+ for (uint32_t i = 0; i < 2048; i++) {
+ buff[i] = i & 0xff;
+ }
+ std::random_shuffle(buff, buff + 2048);
+ int r = 0;
+ for (uint32_t i = 0; i < 100000000; i++) {
+ int offset = i % 1000;
+ r += fmemcmp(buff, buff + 1024, 5);
+ r += fmemcmp(buff + offset, buff + 1124, 9);
+ r += fmemcmp(buff + offset, buff + 1224, 10);
+ r += fmemcmp(buff + offset, buff + 1324, 15);
+ r += fmemcmp(buff + offset, buff + 1424, 16);
+ r += fmemcmp(buff + offset, buff + 1524, 17);
+ r += fmemcmp(buff + offset, buff + 1624, 18);
+ r += fmemcmp(buff + offset, buff + 1724, 19);
+ }
+ return r;
+}
+
+TEST(Perf, fmemcmp) {
+ Timer t;
+ int a = test_memcmp();
+ LOG("%s", t.getInterval(" memcmp ").c_str());
+ t.reset();
+ int b = test_fmemcmp();
+ LOG("%s", t.getInterval(" fmemcmp ").c_str());
+ // prevent compiler optimization
+ TestConfig.setInt("tempvalue", a + b);
+}
+
+static void test_memcpy_perf_len(char * src, char * dest, size_t len, size_t time) {
+ for (size_t i = 0; i < time; i++) {
+ memcpy(src, dest, len);
+ memcpy(dest, src, len);
+ }
+}
+
+static void test_simple_memcpy_perf_len(char * src, char * dest, size_t len, size_t time) {
+ for (size_t i = 0; i < time; i++) {
+ simple_memcpy(src, dest, len);
+ simple_memcpy(dest, src, len);
+ }
+}
+
+TEST(Perf, simple_memcpy_small) {
+ char * src = new char[10240];
+ char * dest = new char[10240];
+ char buff[32];
+ for (size_t len = 1; len < 256; len = len + 2) {
+ LOG("------------------------------");
+ snprintf(buff, 32, " memcpy %luB\t", len);
+ Timer t;
+ test_memcpy_perf_len(src, dest, len, 1000000);
+ LOG("%s", t.getInterval(buff).c_str());
+ snprintf(buff, 32, "simple_memcpy %luB\t", len);
+ t.reset();
+ test_simple_memcpy_perf_len(src, dest, len, 1000000);
+ LOG("%s", t.getInterval(buff).c_str());
+ }
+ delete[] src;
+ delete[] dest;
+}
+
+inline char * memchrbrf4(char * p, char ch, size_t len) {
+ ssize_t i = 0;
+ for (; i < ((ssize_t)len) - 3; i += 3) {
+ if (p[i] == ch) {
+ return p + i;
+ }
+ if (p[i + 1] == ch) {
+ return p + i + 1;
+ }
+ if (p[i + 2] == ch) {
+ return p + i + 2;
+ }
+ }
+ for (; i < len; i++) {
+ if (p[i] == ch) {
+ return p + i;
+ }
+ }
+ return NULL;
+}
+
+inline char * memchrbrf2(char * p, char ch, size_t len) {
+ for (size_t i = 0; i < len / 2; i += 2) {
+ if (p[i] == ch) {
+ return p + i;
+ }
+ if (p[i + 1] == ch) {
+ return p + i + 1;
+ }
+ }
+ if (len % 2 && p[len - 1] == ch) {
+ return p + len - 1;
+ }
+ return NULL;
+}
+
+// not safe in MACOSX, segment fault, should be safe on Linux with out mmap
+inline int memchr_sse(const char *s, int c, int len) {
+ //len : edx; c: esi; s:rdi
+ int index = 0;
+
+#ifdef __X64
+
+ __asm__ __volatile__(
+ //"and $0xff, %%esi;" //clear upper bytes
+ "movd %%esi, %%xmm1;"
+
+ "mov $1, %%eax;"
+ "add $16, %%edx;"
+ "mov %%rdi ,%%r8;"
+
+ "1:"
+ "movdqu (%%rdi), %%xmm2;"
+ "sub $16, %%edx;"
+ "addq $16, %%rdi;"
+ //"pcmpestri $0x0, %%xmm2,%%xmm1;"
+ ".byte 0x66 ,0x0f ,0x3a ,0x61 ,0xca ,0x00;"
+ //"lea 16(%%rdi), %%rdi;"
+ "ja 1b;"//Res2==0:no match and zflag==0: s is not end
+ "jc 3f;"//Res2==1: match and s is not end
+
+ "mov $0xffffffff, %%eax;"//no match
+ "jmp 0f;"
+
+ "3:"
+ "sub %%r8, %%rdi;"
+ "lea -16(%%edi,%%ecx),%%eax;"
+
+ "0:"
+ // "mov %%eax, %0;"
+ :"=a"(index),"=D"(s),"=S"(c),"=d"(len)
+ :"D"(s),"S"(c),"d"(len)
+ :"rcx","r8","memory"
+ );
+
+#endif
+
+ return index;
+}
+
+TEST(Perf, memchr) {
+ Random r;
+ int32_t size = 100 * 1024 * 1024;
+ int32_t lineLength = TestConfig.getInt("memchr.line.length", 100);
+ char * buff = new char[size + 16];
+ memset(buff, 'a', size);
+ for (int i = 0; i < size / lineLength; i++) {
+ buff[r.next_int32(size)] = '\n';
+ }
+ Timer timer;
+ char * pos = buff;
+ int count = 0;
+ while (true) {
+ if (pos == buff + size) {
+ break;
+ }
+ pos = (char*)memchr(pos, '\n', buff + size - pos);
+ if (pos == NULL) {
+ break;
+ }
+ pos++;
+ count++;
+ }
+ LOG("%s", timer.getSpeedM2("memchr bytes/lines", size, count).c_str());
+ timer.reset();
+ pos = buff;
+ count = 0;
+ while (true) {
+ if (pos == buff + size) {
+ break;
+ }
+ pos = (char*)memchrbrf2(pos, '\n', buff + size - pos);
+ if (pos == NULL) {
+ break;
+ }
+ pos++;
+ count++;
+ }
+ LOG("%s", timer.getSpeedM2("memchrbrf2 bytes/lines", size, count).c_str());
+ timer.reset();
+ pos = buff;
+ count = 0;
+ while (true) {
+ if (pos == buff + size) {
+ break;
+ }
+ pos = (char*)memchrbrf4(pos, '\n', buff + size - pos);
+ if (pos == NULL) {
+ break;
+ }
+ pos++;
+ count++;
+ }
+ LOG("%s", timer.getSpeedM2("memchrbrf4 bytes/lines", size, count).c_str());
+ timer.reset();
+ pos = buff;
+ count = 0;
+ while (true) {
+ if (pos == buff + size) {
+ break;
+ }
+ int ret = memchr_sse(pos, '\n', buff + size - pos);
+ if (ret == -1) {
+ break;
+ }
+ pos = pos + ret;
+ pos++;
+ count++;
+ }
+ LOG("%s", timer.getSpeedM2("memchr_sse bytes/lines", size, count).c_str());
+ delete[] buff;
+}
+
+TEST(Perf, memcpy_batch) {
+ int32_t size = TestConfig.getInt("input.size", 64 * 1024);
+ size_t mb = TestConfig.getInt("input.mb", 320) * 1024 * 1024UL;
+ char * src = new char[size];
+ char * dest = new char[size];
+ memset(src, 0, size);
+ memset(dest, 0, size);
+ Timer t;
+ for (size_t i = 0; i < mb; i += size) {
+ memcpy(dest, src, size);
+ }
+ LOG("%s", t.getSpeedM("memcpy", mb).c_str());
+ t.reset();
+ for (size_t i = 0; i < mb; i += size) {
+ simple_memcpy(dest, src, size);
+ }
+ LOG("%s", t.getSpeedM("simple_memcpy", mb).c_str());
+ delete[] src;
+ delete[] dest;
+}
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestSort.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestSort.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestSort.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/TestSort.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,323 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "Streams.h"
+#include "Buffers.h"
+#include "DualPivotQuickSort.h"
+#include "test_commons.h"
+
+string gBuffer;
+
+inline const char * get_position(uint32_t offset) {
+ return gBuffer.data() + offset;
+}
+
+/**
+ * fast memcmp
+ */
+inline int fmemcmporig(const char * src, const char * dest, uint32_t len) {
+ const uint64_t * src8 = (const uint64_t*)src;
+ const uint64_t * dest8 = (const uint64_t*)dest;
+ while (len >= 8) {
+ uint64_t l = *src8;
+ uint64_t r = *dest8;
+ if (l != r) {
+ l = bswap64(l);
+ r = bswap64(r);
+ return l > r ? 1 : -1;
+ }
+ ++src8;
+ ++dest8;
+ len -= 8;
+ }
+ if (len == 0)
+ return 0;
+ if (len == 1) {
+ int l = (int)(*(uint8_t*)src8);
+ int r = (int)(*(uint8_t*)dest8);
+ return l - r;
+ }
+ uint64_t mask = (1ULL << (len * 8)) - 1;
+ uint64_t l = (*src8) & mask;
+ uint64_t r = (*dest8) & mask;
+ if (l == r) {
+ return 0;
+ }
+ l = bswap64(l);
+ r = bswap64(r);
+ return l > r ? 1 : -1;
+}
+
+/**
+ * c qsort compare function
+ */
+static int compare_offset(const void * plh, const void * prh) {
+ KVBuffer * lhb = (KVBuffer*)get_position(*(uint32_t*)plh);
+ KVBuffer * rhb = (KVBuffer*)get_position(*(uint32_t*)prh);
+ uint32_t minlen = std::min(lhb->keyLength, rhb->keyLength);
+ int ret = memcmp(lhb->getKey(), rhb->getKey(), minlen);
+ if (ret) {
+ return ret;
+ }
+ return lhb->keyLength - rhb->keyLength;
+}
+
+/**
+ * dualpivot sort compare function
+ */
+class CompareOffset {
+public:
+ int64_t operator()(uint32_t lhs, uint32_t rhs) {
+
+ KVBuffer * lhb = (KVBuffer*)get_position(lhs);
+ KVBuffer * rhb = (KVBuffer*)get_position(rhs);
+
+ uint32_t minlen = std::min(lhb->keyLength, rhb->keyLength);
+ int64_t ret = memcmp(lhb->getKey(), rhb->getKey(), minlen);
+ if (ret) {
+ return ret;
+ }
+ return lhb->keyLength - rhb->keyLength;
+ }
+};
+
+/**
+ * quicksort compare function
+ */
+class OffsetLessThan {
+public:
+ bool operator()(uint32_t lhs, uint32_t rhs) {
+ KVBuffer * lhb = (KVBuffer*)get_position(lhs);
+ KVBuffer * rhb = (KVBuffer*)get_position(rhs);
+
+ uint32_t minlen = std::min(lhb->keyLength, rhb->keyLength);
+ int64_t ret = memcmp(lhb->content, rhb->content, minlen);
+ return ret < 0 || (ret == 0 && (lhb->keyLength < rhb->keyLength));
+ }
+};
+
+/**
+ * c qsort compare function
+ */
+static int compare_offset2(const void * plh, const void * prh) {
+
+ KVBuffer * lhb = (KVBuffer*)get_position(*(uint32_t*)plh);
+ KVBuffer * rhb = (KVBuffer*)get_position(*(uint32_t*)prh);
+
+ uint32_t minlen = std::min(lhb->keyLength, rhb->keyLength);
+ int64_t ret = fmemcmp(lhb->content, rhb->content, minlen);
+ if (ret) {
+ return ret;
+ }
+ return lhb->keyLength - rhb->keyLength;
+}
+
+/**
+ * dualpivot sort compare function
+ */
+class CompareOffset2 {
+public:
+ int64_t operator()(uint32_t lhs, uint32_t rhs) {
+
+ KVBuffer * lhb = (KVBuffer*)get_position(lhs);
+ KVBuffer * rhb = (KVBuffer*)get_position(rhs);
+
+ uint32_t minlen = std::min(lhb->keyLength, rhb->keyLength);
+ int64_t ret = fmemcmp(lhb->content, rhb->content, minlen);
+ if (ret) {
+ return ret;
+ }
+ return lhb->keyLength - rhb->keyLength;
+ }
+};
+
+/**
+ * quicksort compare function
+ */
+class OffsetLessThan2 {
+public:
+ bool operator()(uint32_t lhs, uint32_t rhs) {
+
+ KVBuffer * lhb = (KVBuffer*)get_position(lhs);
+ KVBuffer * rhb = (KVBuffer*)get_position(rhs);
+
+ uint32_t minlen = std::min(lhb->keyLength, rhb->keyLength);
+ int64_t ret = fmemcmp(lhb->content, rhb->content, minlen);
+ return ret < 0 || (ret == 0 && (lhb->keyLength < rhb->keyLength));
+ }
+};
+
+/*
+void makeInput(string & dest, vector<uint32_t> & offsets, uint64_t length) {
+ TeraGen tera = TeraGen(length / 100, 1, 0);
+ dest.reserve(length + 1024);
+ string k, v;
+ while (tera.next(k, v)) {
+ offsets.push_back(dest.length());
+ uint32_t tempLen = k.length();
+ dest.append((const char *)&tempLen, 4);
+ dest.append(k.data(), k.length());
+ tempLen = v.length();
+ dest.append((const char *)&tempLen, 4);
+ dest.append(v.data(), v.length());
+ }
+}
+*/
+
+void makeInputWord(string & dest, vector<uint32_t> & offsets, uint64_t length) {
+ Random r;
+ dest.reserve(length + 1024);
+ string k, v;
+ while (true) {
+ k = r.nextWord();
+ v = r.nextWord();
+ offsets.push_back(dest.length());
+ uint32_t tempLen = k.length();
+ dest.append((const char *)&tempLen, 4);
+ dest.append(k.data(), k.length());
+ tempLen = v.length();
+ dest.append((const char *)&tempLen, 4);
+ dest.append(v.data(), v.length());
+ if (dest.length() > length) {
+ return;
+ }
+ }
+}
+
+TEST(Perf, sort) {
+ vector<uint32_t> offsets;
+ makeInputWord(gBuffer, offsets, 80000000);
+ Timer timer;
+ vector<uint32_t> offsetstemp1_0 = offsets;
+ vector<uint32_t> offsetstemp1_1 = offsets;
+ vector<uint32_t> offsetstemp1_2 = offsets;
+ vector<uint32_t> offsetstemp1_3 = offsets;
+ timer.reset();
+ qsort(&offsetstemp1_0[0], offsetstemp1_0.size(), sizeof(uint32_t), compare_offset);
+ qsort(&offsetstemp1_1[0], offsetstemp1_1.size(), sizeof(uint32_t), compare_offset);
+ qsort(&offsetstemp1_2[0], offsetstemp1_2.size(), sizeof(uint32_t), compare_offset);
+ qsort(&offsetstemp1_3[0], offsetstemp1_3.size(), sizeof(uint32_t), compare_offset);
+ LOG("%s", timer.getInterval("qsort").c_str());
+ offsetstemp1_0 = offsets;
+ offsetstemp1_1 = offsets;
+ offsetstemp1_2 = offsets;
+ offsetstemp1_3 = offsets;
+ timer.reset();
+ qsort(&offsetstemp1_0[0], offsetstemp1_0.size(), sizeof(uint32_t), compare_offset2);
+ qsort(&offsetstemp1_1[0], offsetstemp1_1.size(), sizeof(uint32_t), compare_offset2);
+ qsort(&offsetstemp1_2[0], offsetstemp1_2.size(), sizeof(uint32_t), compare_offset2);
+ qsort(&offsetstemp1_3[0], offsetstemp1_3.size(), sizeof(uint32_t), compare_offset2);
+ LOG("%s", timer.getInterval("qsort 2").c_str());
+ offsetstemp1_0 = offsets;
+ offsetstemp1_1 = offsets;
+ offsetstemp1_2 = offsets;
+ offsetstemp1_3 = offsets;
+ timer.reset();
+ std::sort(offsetstemp1_0.begin(), offsetstemp1_0.end(), OffsetLessThan());
+ std::sort(offsetstemp1_1.begin(), offsetstemp1_1.end(), OffsetLessThan());
+ std::sort(offsetstemp1_2.begin(), offsetstemp1_2.end(), OffsetLessThan());
+ std::sort(offsetstemp1_3.begin(), offsetstemp1_3.end(), OffsetLessThan());
+ LOG("%s", timer.getInterval("std::sort").c_str());
+ offsetstemp1_0 = offsets;
+ offsetstemp1_1 = offsets;
+ offsetstemp1_2 = offsets;
+ offsetstemp1_3 = offsets;
+ timer.reset();
+ std::sort(offsetstemp1_0.begin(), offsetstemp1_0.end(), OffsetLessThan2());
+ std::sort(offsetstemp1_1.begin(), offsetstemp1_1.end(), OffsetLessThan2());
+ std::sort(offsetstemp1_2.begin(), offsetstemp1_2.end(), OffsetLessThan2());
+ std::sort(offsetstemp1_3.begin(), offsetstemp1_3.end(), OffsetLessThan2());
+ LOG("%s", timer.getInterval("std::sort 2").c_str());
+ offsetstemp1_0 = offsets;
+ offsetstemp1_1 = offsets;
+ offsetstemp1_2 = offsets;
+ offsetstemp1_3 = offsets;
+ timer.reset();
+ DualPivotQuicksort(offsetstemp1_0, CompareOffset());
+ DualPivotQuicksort(offsetstemp1_1, CompareOffset());
+ DualPivotQuicksort(offsetstemp1_2, CompareOffset());
+ DualPivotQuicksort(offsetstemp1_3, CompareOffset());
+ LOG("%s", timer.getInterval("DualPivotQuicksort").c_str());
+ offsetstemp1_0 = offsets;
+ offsetstemp1_1 = offsets;
+ offsetstemp1_2 = offsets;
+ offsetstemp1_3 = offsets;
+ timer.reset();
+ DualPivotQuicksort(offsetstemp1_0, CompareOffset2());
+ DualPivotQuicksort(offsetstemp1_1, CompareOffset2());
+ DualPivotQuicksort(offsetstemp1_2, CompareOffset2());
+ DualPivotQuicksort(offsetstemp1_3, CompareOffset2());
+ LOG("%s", timer.getInterval("DualPivotQuicksort 2").c_str());
+}
+
+TEST(Perf, sortCacheMiss) {
+
+ LOG("Testing partition based sort, sort 4MB every time");
+
+ vector<uint32_t> offsets;
+ makeInputWord(gBuffer, offsets, 80000000);
+ Timer timer;
+ vector<uint32_t> offsetstemp1_0 = offsets;
+ vector<uint32_t> offsetstemp1_1 = offsets;
+ vector<uint32_t> offsetstemp1_2 = offsets;
+ vector<uint32_t> offsetstemp1_3 = offsets;
+
+ timer.reset();
+ DualPivotQuicksort(offsetstemp1_0, CompareOffset2());
+ DualPivotQuicksort(offsetstemp1_1, CompareOffset2());
+ DualPivotQuicksort(offsetstemp1_2, CompareOffset2());
+ DualPivotQuicksort(offsetstemp1_3, CompareOffset2());
+ LOG("%s", timer.getInterval("DualPivotQuicksort 2 full sort").c_str());
+
+ uint32_t MOD = 128000;
+ uint32_t END = offsets.size();
+
+ for (MOD = 1024; MOD < END; MOD <<= 1) {
+ offsetstemp1_0 = offsets;
+ offsetstemp1_1 = offsets;
+ offsetstemp1_2 = offsets;
+ offsetstemp1_3 = offsets;
+ timer.reset();
+
+ for (uint32_t i = 0; i <= END / MOD; i++) {
+ int base = i * MOD;
+ int max = (base + MOD) > END ? END : (base + MOD);
+ DualPivotQuicksort(offsetstemp1_0, base, max - 1, 3, CompareOffset2());
+ }
+
+ for (uint32_t i = 0; i <= END / MOD; i++) {
+ int base = i * MOD;
+ int max = (base + MOD) > END ? END : (base + MOD);
+ DualPivotQuicksort(offsetstemp1_1, base, max - 1, 3, CompareOffset2());
+ }
+
+ for (uint32_t i = 0; i <= END / MOD; i++) {
+ int base = i * MOD;
+ int max = (base + MOD) > END ? END : (base + MOD);
+ DualPivotQuicksort(offsetstemp1_2, base, max - 1, 3, CompareOffset2());
+ }
+
+ for (uint32_t i = 0; i <= END / MOD; i++) {
+ int base = i * MOD;
+ int max = (base + MOD) > END ? END : (base + MOD);
+ DualPivotQuicksort(offsetstemp1_3, base, max - 1, 3, CompareOffset2());
+ }
+ LOG("%s, MOD: %d", timer.getInterval("DualPivotQuicksort 2 partition sort").c_str(), MOD);
+ }
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestByteArray.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestByteArray.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestByteArray.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestByteArray.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+TEST(ByteArray, read) {
+ ByteArray * buffer = new ByteArray();
+ buffer->resize(10);
+ ASSERT_EQ(10, buffer->size());
+ char * buff1 = buffer->buff();
+
+ buffer->resize(15);
+ ASSERT_EQ(15, buffer->size());
+ ASSERT_EQ(buffer->buff(), buff1);
+
+ buffer->resize(30);
+ ASSERT_EQ(30, buffer->size());
+ ASSERT_NE(buffer->buff(), buff1);
+
+ delete buffer;
+}
+
+} /* namespace NativeTask */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestByteBuffer.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestByteBuffer.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestByteBuffer.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestByteBuffer.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+TEST(ByteBuffer, read) {
+ char * buff = new char[100];
+ ByteBuffer byteBuffer;
+ byteBuffer.reset(buff, 100);
+
+ ASSERT_EQ(0, byteBuffer.position());
+ ASSERT_EQ(100, byteBuffer.capacity());
+ ASSERT_EQ(0, byteBuffer.limit());
+
+ ASSERT_EQ(buff, byteBuffer.current());
+ ASSERT_EQ(0, byteBuffer.remain());
+
+ int newPos = byteBuffer.advance(3);
+ ASSERT_EQ(3, byteBuffer.current() - byteBuffer.base());
+
+ byteBuffer.rewind(10, 20);
+ ASSERT_EQ(20, byteBuffer.limit());
+
+ ASSERT_EQ(10, byteBuffer.position());
+}
+} /* namespace NativeTask */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestComparatorForDualPivotQuickSort.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestComparatorForDualPivotQuickSort.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestComparatorForDualPivotQuickSort.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestComparatorForDualPivotQuickSort.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "lib/MemoryBlock.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+static const char * expectedSrc = NULL;
+static int expectedSrcLength = 0;
+
+static const char * expectedDest = NULL;
+static int expectedDestLength = 0;
+
+static int compareResult = 0;
+
+void checkInputArguments(const char * src, uint32_t srcLength, const char * dest,
+ uint32_t destLength) {
+ ASSERT_EQ(expectedSrc, src);
+ ASSERT_EQ(expectedSrcLength, srcLength);
+
+ ASSERT_EQ(expectedDest, dest);
+ ASSERT_EQ(expectedDestLength, destLength);
+}
+
+int MockComparatorForDualPivot(const char * src, uint32_t srcLength, const char * dest,
+ uint32_t destLength) {
+ checkInputArguments(src, srcLength, dest, destLength);
+ return compareResult;
+}
+
+TEST(ComparatorForDualPivotQuickSort, compare) {
+ char * buff = new char[100];
+ KVBuffer * kv1 = (KVBuffer *)buff;
+
+ const char * KEY = "KEY";
+ const char * VALUE = "VALUE";
+
+ kv1->keyLength = strlen(KEY);
+ char * key = kv1->getKey();
+ ::memcpy(key, KEY, strlen(KEY));
+ kv1->valueLength = strlen(VALUE);
+ char * value = kv1->getValue();
+ ::memcpy(value, VALUE, strlen(VALUE));
+
+ const char * KEY2 = "KEY2";
+ const char * VALUE2 = "VALUE2";
+
+ KVBuffer * kv2 = kv1->next();
+ kv2->keyLength = strlen(KEY2);
+ char * key2 = kv2->getKey();
+ ::memcpy(key2, KEY2, strlen(KEY2));
+ kv2->valueLength = strlen(VALUE2);
+ char * value2 = kv2->getValue();
+ ::memcpy(value2, VALUE2, strlen(VALUE2));
+
+ ComparatorForDualPivotSort comparator(buff, &MockComparatorForDualPivot);
+
+ expectedSrc = kv1->getKey();
+ expectedSrcLength = strlen(KEY);
+
+ expectedDest = kv2->getKey();
+ expectedDestLength = strlen(KEY2);
+
+ compareResult = -1;
+
+ ASSERT_EQ(-1, comparator((char * )kv1 - buff, (char * )kv2 - buff));
+}
+
+} /* namespace NativeTask */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestComparatorForStdSort.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestComparatorForStdSort.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestComparatorForStdSort.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestComparatorForStdSort.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "lib/MemoryBlock.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+static const char * expectedSrc = NULL;
+static int expectedSrcLength = 0;
+
+static const char * expectedDest = NULL;
+static int expectedDestLength = 0;
+
+static int compareResult = 0;
+
+void checkInputArgumentsForStdOut(const char * src, uint32_t srcLength, const char * dest,
+ uint32_t destLength) {
+ ASSERT_EQ(expectedSrc, src);
+ ASSERT_EQ(expectedSrcLength, srcLength);
+
+ ASSERT_EQ(expectedDest, dest);
+ ASSERT_EQ(expectedDestLength, destLength);
+}
+
+int MockComparatorForStdOut(const char * src, uint32_t srcLength, const char * dest,
+ uint32_t destLength) {
+ checkInputArgumentsForStdOut(src, srcLength, dest, destLength);
+ return compareResult;
+}
+
+TEST(ComparatorForStdSort, compare) {
+ char * buff = new char[100];
+ KVBuffer * kv1 = (KVBuffer *)buff;
+
+ const char * KEY = "KEY";
+ const char * VALUE = "VALUE";
+
+ kv1->keyLength = strlen(KEY);
+ char * key = kv1->getKey();
+ ::memcpy(key, KEY, strlen(KEY));
+ kv1->valueLength = strlen(VALUE);
+ char * value = kv1->getValue();
+ ::memcpy(value, VALUE, strlen(VALUE));
+
+ const char * KEY2 = "KEY2";
+ const char * VALUE2 = "VALUE2";
+
+ KVBuffer * kv2 = kv1->next();
+ kv2->keyLength = strlen(KEY2);
+ char * key2 = kv2->getKey();
+ ::memcpy(key2, KEY2, strlen(KEY2));
+ kv2->valueLength = strlen(VALUE2);
+ char * value2 = kv2->getValue();
+ ::memcpy(value2, VALUE2, strlen(VALUE2));
+
+ ComparatorForStdSort comparator(buff, &MockComparatorForStdOut);
+
+ expectedSrc = kv1->content;
+ expectedSrcLength = strlen(KEY);
+
+ expectedDest = kv2->content;
+ expectedDestLength = strlen(KEY2);
+
+ compareResult = -1;
+
+ ASSERT_EQ(true, comparator((char * )kv1 - buff, (char * )kv2 - buff));
+}
+
+} /* namespace NativeTask */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestFixSizeContainer.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestFixSizeContainer.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestFixSizeContainer.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestFixSizeContainer.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+TEST(FixSizeContainer, test) {
+ uint32_t length = 100;
+ FixSizeContainer * container = new FixSizeContainer();
+ char * bytes = new char[length];
+ container->wrap(bytes, length);
+
+ ASSERT_EQ(0, container->position());
+ int pos1 = 3;
+ container->position(pos1);
+ ASSERT_EQ(pos1, container->position());
+ ASSERT_EQ(length - pos1, container->remain());
+
+ container->rewind();
+ ASSERT_EQ(0, container->position());
+ ASSERT_EQ(length, container->size());
+
+ std::string toBeFilled = "Hello, FixContainer";
+
+ container->fill(toBeFilled.c_str(), toBeFilled.length());
+
+ for (int i = 0; i < container->position(); i++) {
+ char * c = container->base() + i;
+ ASSERT_EQ(toBeFilled[i], *c);
+ }
+}
+
+} /* namespace NativeTask */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestIterator.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestIterator.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestIterator.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestIterator.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+class MockIterator : public KVIterator {
+ std::vector<std::pair<int, int> > * kvs;
+ uint32_t index;
+ uint32_t expectedKeyGroupNum;
+ std::map<int, int> expectkeyCountMap;
+ char * buffer;
+
+public:
+ MockIterator()
+ : index(0), buffer(NULL) {
+ buffer = new char[8];
+ kvs = new std::vector<std::pair<int, int> >();
+ kvs->push_back(std::pair<int, int>(10, 100));
+
+ kvs->push_back(std::pair<int, int>(10, 100));
+ kvs->push_back(std::pair<int, int>(10, 101));
+ kvs->push_back(std::pair<int, int>(10, 102));
+
+ kvs->push_back(std::pair<int, int>(20, 200));
+ kvs->push_back(std::pair<int, int>(20, 201));
+ kvs->push_back(std::pair<int, int>(20, 202));
+ kvs->push_back(std::pair<int, int>(30, 302));
+ kvs->push_back(std::pair<int, int>(40, 302));
+ this->expectedKeyGroupNum = 4;
+
+ expectkeyCountMap[10] = 4;
+ expectkeyCountMap[20] = 3;
+ expectkeyCountMap[30] = 1;
+ expectkeyCountMap[40] = 1;
+ }
+
+ bool next(Buffer & key, Buffer & outValue) {
+ if (index < kvs->size()) {
+ std::pair<int, int> value = kvs->at(index);
+ *((int *)buffer) = value.first;
+ *(((int *)buffer) + 1) = value.second;
+ key.reset(buffer, 4);
+ outValue.reset(buffer + 4, 4);
+ index++;
+ return true;
+ }
+ return false;
+ }
+
+ uint32_t getExpectedKeyGroupCount() {
+ return expectedKeyGroupNum;
+ }
+
+ std::map<int, int>& getExpectedKeyCountMap() {
+ return expectkeyCountMap;
+ }
+};
+
+void TestKeyGroupIterator() {
+ MockIterator * iter = new MockIterator();
+ KeyGroupIteratorImpl * groupIterator = new KeyGroupIteratorImpl(iter);
+ const char * key = NULL;
+
+ uint32_t keyGroupCount = 0;
+ std::map<int, int> actualKeyCount;
+ while (groupIterator->nextKey()) {
+ keyGroupCount++;
+ uint32_t length = 0;
+ key = groupIterator->getKey(length);
+ int * keyPtr = (int *)key;
+ std::cout << "new key group(key group hold kvs of same key): " << *keyPtr << std::endl;
+ const char * value = NULL;
+ while (NULL != (value = groupIterator->nextValue(length))) {
+ int * valuePtr = (int *)value;
+ std::cout << "==== key: " << *keyPtr << "value: " << *valuePtr << std::endl;
+
+ if (actualKeyCount.find(*keyPtr) == actualKeyCount.end()) {
+ actualKeyCount[*keyPtr] = 0;
+ }
+ actualKeyCount[*keyPtr]++;
+ }
+ }
+ ASSERT_EQ(iter->getExpectedKeyGroupCount(), keyGroupCount);
+ std::map<int, int> & expectedKeyCountMap = iter->getExpectedKeyCountMap();
+ for (std::map<int, int>::iterator keyCountIter = actualKeyCount.begin();
+ keyCountIter != actualKeyCount.end(); ++keyCountIter) {
+ uint32_t key = keyCountIter->first;
+ uint32_t expectedCount = expectedKeyCountMap[key];
+ ASSERT_EQ(expectedCount, keyCountIter->second);
+ }
+
+ std::cout << "Done!!!!!!! " << std::endl;
+}
+
+TEST(Iterator, keyGroupIterator) {
+ TestKeyGroupIterator();
+}
+
+} /* namespace NativeTask */
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestKVBuffer.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestKVBuffer.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestKVBuffer.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestKVBuffer.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+TEST(KVBuffer, test) {
+
+ char * buff = new char[100];
+ KVBuffer * kv1 = (KVBuffer *)buff;
+
+ const char * KEY = "KEY";
+ const char * VALUE = "VALUE";
+
+ kv1->keyLength = strlen(KEY);
+ char * key = kv1->getKey();
+ ::memcpy(key, KEY, strlen(KEY));
+ kv1->valueLength = strlen(VALUE);
+ char * value = kv1->getValue();
+ ::memcpy(value, VALUE, strlen(VALUE));
+
+ ASSERT_EQ(strlen(KEY) + strlen(VALUE) + 8, kv1->length());
+
+ ASSERT_EQ(8, kv1->getKey() - buff);
+ ASSERT_EQ(strlen(KEY) + 8, kv1->getValue() - buff);
+
+ kv1->keyLength = bswap(kv1->keyLength);
+ kv1->valueLength = bswap(kv1->valueLength);
+
+ ASSERT_EQ(8, kv1->headerLength());
+ ASSERT_EQ(strlen(KEY) + strlen(VALUE) + 8, kv1->lengthConvertEndium());
+}
+
+} /* namespace NativeTask */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemBlockIterator.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemBlockIterator.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemBlockIterator.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemBlockIterator.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "test_commons.h"
+#include "MapOutputSpec.h"
+#include "lib/MemoryBlock.h"
+
+using namespace NativeTask;
+
+namespace NativeTask {
+
+TEST(MemoryBlockIterator, test) {
+ const uint32_t BUFFER_LENGTH = 100;
+ const uint32_t BLOCK_ID = 3;
+ char * bytes = new char[BUFFER_LENGTH];
+ MemoryBlock block(bytes, BUFFER_LENGTH);
+
+ const uint32_t KV_SIZE = 60;
+ KVBuffer * kv1 = block.allocateKVBuffer(KV_SIZE);
+ KVBuffer * kv2 = block.allocateKVBuffer(KV_SIZE);
+
+ MemBlockIterator iter(&block);
+
+ uint32_t keyCount = 0;
+ while (iter.next()) {
+ KVBuffer * kv = iter.getKVBuffer();
+ ASSERT_EQ(block.getKVBuffer(keyCount), kv);
+ keyCount++;
+ }
+}
+
+class MemoryBlockFactory {
+public:
+ static MemoryBlock * create(std::vector<int> & keys) {
+ const uint32_t BUFFER_LENGTH = 1000;
+ const uint32_t BLOCK_ID = 3;
+ char * bytes = new char[BUFFER_LENGTH];
+ MemoryBlock * block1 = new MemoryBlock(bytes, BUFFER_LENGTH);
+
+ const uint32_t KV_SIZE = 16;
+
+ for (uint32_t i = 0; i < keys.size(); i++) {
+ uint32_t index = keys[i];
+ KVBuffer * kv = block1->allocateKVBuffer(KV_SIZE);
+
+ kv->keyLength = 4;
+ kv->valueLength = 4;
+ uint32_t * key = (uint32_t *)kv->getKey();
+ *key = bswap(index);
+ }
+ return block1;
+ }
+
+};
+
+TEST(MemoryBlockIterator, compare) {
+ std::vector<int> vector1;
+
+ vector1.push_back(2);
+ vector1.push_back(4);
+ vector1.push_back(6);
+
+ std::vector<int> vector2;
+
+ vector2.push_back(1);
+ vector2.push_back(3);
+ vector2.push_back(5);
+
+ ComparatorPtr bytesComparator = NativeTask::get_comparator(BytesType, NULL);
+
+ MemoryBlock * block1 = MemoryBlockFactory::create(vector1);
+ MemoryBlock * block2 = MemoryBlockFactory::create(vector2);
+
+ block1->sort(CPPSORT, bytesComparator);
+ block2->sort(CPPSORT, bytesComparator);
+
+ MemBlockIterator * iter1 = new MemBlockIterator(block1);
+ MemBlockIterator * iter2 = new MemBlockIterator(block2);
+
+ MemBlockComparator comparator(bytesComparator);
+
+ ASSERT_EQ(false, comparator(iter1, iter2));
+
+ iter1->next();
+ ASSERT_EQ(true, comparator(iter1, iter2));
+
+ iter2->next();
+ ASSERT_EQ(false, comparator(iter1, iter2));
+}
+}
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemoryBlock.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemoryBlock.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemoryBlock.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemoryBlock.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,107 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "test_commons.h"
+#include "MapOutputSpec.h"
+#include "lib/MemoryBlock.h"
+
+using namespace NativeTask;
+namespace NativeTaskTest {
+
+TEST(MemoryBlock, test) {
+ const uint32_t BUFFER_LENGTH = 1000;
+ char * bytes = new char[BUFFER_LENGTH];
+ MemoryBlock block(bytes, BUFFER_LENGTH);
+
+ uint32_t NON_EXIST = 3;
+ ASSERT_EQ(NULL, block.getKVBuffer(NON_EXIST));
+ ASSERT_EQ(0, block.getKVCount());
+ ASSERT_EQ(BUFFER_LENGTH, block.remainSpace());
+
+ ComparatorPtr bytesComparator = NativeTask::get_comparator(BytesType, NULL);
+ block.sort(CPPSORT, bytesComparator);
+ ASSERT_EQ(true, block.sorted());
+
+ const uint32_t KV_SIZE = 16;
+ KVBuffer * kv1 = block.allocateKVBuffer(KV_SIZE);
+ KVBuffer * kv2 = block.allocateKVBuffer(KV_SIZE);
+
+ ASSERT_EQ(2, block.getKVCount());
+ ASSERT_EQ(kv1, block.getKVBuffer(0));
+
+ ASSERT_EQ(BUFFER_LENGTH - 2 * KV_SIZE, block.remainSpace());
+ ASSERT_EQ(false, block.sorted());
+}
+
+TEST(MemoryBlock, overflow) {
+ const uint32_t BUFFER_LENGTH = 100;
+ char * bytes = new char[BUFFER_LENGTH];
+ MemoryBlock block(bytes, BUFFER_LENGTH);
+
+ const uint32_t KV_SIZE = 60;
+ KVBuffer * kv1 = block.allocateKVBuffer(KV_SIZE);
+ KVBuffer * kv2 = block.allocateKVBuffer(KV_SIZE);
+
+ ASSERT_EQ(kv1, block.getKVBuffer(0));
+ ASSERT_EQ(kv2, block.getKVBuffer(1));
+
+ ASSERT_EQ(1, block.getKVCount());
+
+ ASSERT_EQ(BUFFER_LENGTH - KV_SIZE, block.remainSpace());
+}
+
+TEST(MemoryBlock, sort) {
+ const uint32_t BUFFER_LENGTH = 1000;
+ char * bytes = new char[BUFFER_LENGTH];
+ MemoryBlock block(bytes, BUFFER_LENGTH);
+
+ const uint32_t KV_SIZE = 16;
+ KVBuffer * big = block.allocateKVBuffer(KV_SIZE);
+ KVBuffer * small = block.allocateKVBuffer(KV_SIZE);
+ KVBuffer * medium = block.allocateKVBuffer(KV_SIZE);
+
+ const uint32_t SMALL = 100;
+ const uint32_t MEDIUM = 1000;
+ const uint32_t BIG = 10000;
+
+ medium->keyLength = 4;
+ medium->valueLength = 4;
+ uint32_t * mediumKey = (uint32_t *)medium->getKey();
+ *mediumKey = bswap(MEDIUM);
+
+ small->keyLength = 4;
+ small->valueLength = 4;
+ uint32_t * smallKey = (uint32_t *)small->getKey();
+ *smallKey = bswap(SMALL);
+
+ big->keyLength = 4;
+ big->valueLength = 4;
+ uint32_t * bigKey = (uint32_t *)big->getKey();
+ *bigKey = bswap(BIG);
+
+ ComparatorPtr bytesComparator = NativeTask::get_comparator(BytesType, NULL);
+ block.sort(CPPSORT, bytesComparator);
+
+ ASSERT_EQ(small, block.getKVBuffer(0));
+ ASSERT_EQ(medium, block.getKVBuffer(1));
+ ASSERT_EQ(big, block.getKVBuffer(2));
+}
+
+}
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemoryPool.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemoryPool.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemoryPool.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestMemoryPool.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "test_commons.h"
+#include "PartitionBucket.h"
+#include "PartitionBucketIterator.h"
+#include "MemoryBlock.h"
+#include "IFile.h"
+
+using namespace NativeTask;
+
+namespace NativeTask {
+
+TEST(MemoryPool, general) {
+ MemoryPool * pool = new MemoryPool();
+ const uint32_t POOL_SIZE = 1024;
+
+ pool->init(POOL_SIZE);
+
+ uint32_t min = 1024;
+ uint32_t expect = 2048;
+ uint32_t allocated = 0;
+ char * buff = pool->allocate(min, expect, allocated);
+ ASSERT_NE((void *)NULL, buff);
+ buff = pool->allocate(min, expect, allocated);
+ ASSERT_EQ(NULL, buff);
+
+ pool->reset();
+ buff = pool->allocate(min, expect, allocated);
+ ASSERT_NE((void *)NULL, buff);
+
+ delete pool;
+}
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestPartitionBucket.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestPartitionBucket.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestPartitionBucket.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestPartitionBucket.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,212 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "test_commons.h"
+#include "PartitionBucket.h"
+#include "PartitionBucketIterator.h"
+#include "MemoryBlock.h"
+#include "IFile.h"
+
+using namespace NativeTask;
+
+namespace NativeTask {
+
+class MockIFileWriter : public IFileWriter {
+private:
+ char * _buff;
+ uint32_t _position;
+ uint32_t _capacity;
+public:
+
+ MockIFileWriter(char * buff, uint32_t capacity)
+ : IFileWriter(NULL, CHECKSUM_NONE, TextType, TextType, "", NULL), _buff(buff), _position(0),
+ _capacity(capacity) {
+ }
+
+ virtual void write(const char * key, uint32_t keyLen, const char * value, uint32_t valueLen) {
+ KVBuffer * kv = (KVBuffer *)(_buff + _position);
+ kv->keyLength = keyLen;
+ kv->valueLength = valueLen;
+ *((uint32_t *)kv->getKey()) = *((uint32_t *)key);
+ *((uint32_t *)kv->getValue()) = *((uint32_t *)value);
+ _position += kv->length();
+ }
+
+ char * buff() {
+ return _buff;
+ }
+};
+
+TEST(PartitionBucket, general) {
+ MemoryPool * pool = new MemoryPool();
+ const uint32_t POOL_SIZE = 1024 * 1024; //1MB
+ const uint32_t BLOCK_SIZE = 1024; //1KB
+ const uint32_t PARTITION_ID = 3;
+ pool->init(POOL_SIZE);
+ ComparatorPtr comparator = NativeTask::get_comparator(BytesType, NULL);
+ PartitionBucket * bucket = new PartitionBucket(pool, PARTITION_ID, comparator, NULL, BLOCK_SIZE);
+ ASSERT_EQ(0, bucket->getKVCount());
+ KVIterator * NULLPOINTER = 0;
+ ASSERT_EQ(NULLPOINTER, bucket->getIterator());
+ ASSERT_EQ(PARTITION_ID, bucket->getPartitionId());
+ bucket->sort(DUALPIVOTSORT);
+ bucket->spill(NULL);
+
+ delete bucket;
+ delete pool;
+}
+
+TEST(PartitionBucket, multipleMemoryBlock) {
+ MemoryPool * pool = new MemoryPool();
+ const uint32_t POOL_SIZE = 1024 * 1024; //1MB
+ const uint32_t BLOCK_SIZE = 1024; //1KB
+ const uint32_t PARTITION_ID = 3;
+ pool->init(POOL_SIZE);
+ ComparatorPtr comparator = NativeTask::get_comparator(BytesType, NULL);
+ PartitionBucket * bucket = new PartitionBucket(pool, PARTITION_ID, comparator, NULL, BLOCK_SIZE);
+
+ const uint32_t KV_SIZE = 700;
+ const uint32_t SMALL_KV_SIZE = 100;
+ KVBuffer * kv1 = bucket->allocateKVBuffer(KV_SIZE);
+ KVBuffer * kv2 = bucket->allocateKVBuffer(SMALL_KV_SIZE);
+ KVBuffer * kv3 = bucket->allocateKVBuffer(KV_SIZE);
+
+ ASSERT_EQ(3, bucket->getKVCount());
+ KVIterator * NULLPOINTER = 0;
+ ASSERT_NE(NULLPOINTER, bucket->getIterator());
+ ASSERT_EQ(2, bucket->getMemoryBlockCount());
+
+ bucket->reset();
+ ASSERT_EQ(NULLPOINTER, bucket->getIterator());
+ ASSERT_EQ(0, bucket->getMemoryBlockCount());
+
+ delete bucket;
+ delete pool;
+}
+
+TEST(PartitionBucket, sort) {
+ MemoryPool * pool = new MemoryPool();
+ const uint32_t POOL_SIZE = 1024 * 1024; //1MB
+ const uint32_t BLOCK_SIZE = 1024; //1KB
+ const uint32_t PARTITION_ID = 3;
+ pool->init(POOL_SIZE);
+ ComparatorPtr comparator = NativeTask::get_comparator(BytesType, NULL);
+ PartitionBucket * bucket = new PartitionBucket(pool, PARTITION_ID, comparator, NULL, BLOCK_SIZE);
+
+ const uint32_t KV_SIZE = 700;
+ const uint32_t SMALL_KV_SIZE = 100;
+ KVBuffer * kv1 = bucket->allocateKVBuffer(KV_SIZE);
+ KVBuffer * kv2 = bucket->allocateKVBuffer(SMALL_KV_SIZE);
+ KVBuffer * kv3 = bucket->allocateKVBuffer(KV_SIZE);
+
+ const uint32_t SMALL = 10;
+ const uint32_t MEDIUM = 100;
+ const uint32_t BIG = 1000;
+
+ kv1->keyLength = 4;
+ *((uint32_t *)kv1->getKey()) = bswap(BIG);
+ kv1->valueLength = KV_SIZE - kv1->headerLength() - kv1->keyLength;
+
+ kv2->keyLength = 4;
+ *((uint32_t *)kv2->getKey()) = bswap(SMALL);
+ kv2->valueLength = KV_SIZE - kv2->headerLength() - kv2->keyLength;
+
+ kv3->keyLength = 4;
+ *((uint32_t *)kv3->getKey()) = bswap(MEDIUM);
+ kv3->valueLength = KV_SIZE - kv3->headerLength() - kv3->keyLength;
+
+ bucket->sort(DUALPIVOTSORT);
+
+ KVIterator * iter = bucket->getIterator();
+
+ Buffer key;
+ Buffer value;
+ iter->next(key, value);
+
+ ASSERT_EQ(SMALL, bswap(*(uint32_t * )key.data()));
+
+ iter->next(key, value);
+ ASSERT_EQ(MEDIUM, bswap(*(uint32_t * )key.data()));
+
+ iter->next(key, value);
+ ASSERT_EQ(BIG, bswap(*(uint32_t * )key.data()));
+
+ delete iter;
+ delete bucket;
+ delete pool;
+}
+
+TEST(PartitionBucket, spill) {
+ MemoryPool * pool = new MemoryPool();
+ const uint32_t POOL_SIZE = 1024 * 1024; //1MB
+ const uint32_t BLOCK_SIZE = 1024; //1KB
+ const uint32_t PARTITION_ID = 3;
+ pool->init(POOL_SIZE);
+ ComparatorPtr comparator = NativeTask::get_comparator(BytesType, NULL);
+ PartitionBucket * bucket = new PartitionBucket(pool, PARTITION_ID, comparator, NULL, BLOCK_SIZE);
+
+ const uint32_t KV_SIZE = 700;
+ const uint32_t SMALL_KV_SIZE = 100;
+ KVBuffer * kv1 = bucket->allocateKVBuffer(KV_SIZE);
+ KVBuffer * kv2 = bucket->allocateKVBuffer(SMALL_KV_SIZE);
+ KVBuffer * kv3 = bucket->allocateKVBuffer(KV_SIZE);
+
+ const uint32_t SMALL = 10;
+ const uint32_t MEDIUM = 100;
+ const uint32_t BIG = 1000;
+
+ kv1->keyLength = 4;
+ *((uint32_t *)kv1->getKey()) = bswap(BIG);
+ kv1->valueLength = KV_SIZE - KVBuffer::headerLength() - kv1->keyLength;
+
+ kv2->keyLength = 4;
+ *((uint32_t *)kv2->getKey()) = bswap(SMALL);
+ kv2->valueLength = KV_SIZE - KVBuffer::headerLength() - kv2->keyLength;
+
+ kv3->keyLength = 4;
+ *((uint32_t *)kv3->getKey()) = bswap(MEDIUM);
+ kv3->valueLength = KV_SIZE - KVBuffer::headerLength() - kv3->keyLength;
+
+ bucket->sort(DUALPIVOTSORT);
+
+ uint32_t BUFF_SIZE = 1024 * 1024;
+ char * buff = new char[BUFF_SIZE];
+ MockIFileWriter writer(buff, BUFF_SIZE);
+ bucket->spill(&writer);
+
+ //check the result
+ KVBuffer * first = (KVBuffer *)writer.buff();
+ ASSERT_EQ(4, first->keyLength);
+ ASSERT_EQ(KV_SIZE - KVBuffer::headerLength() - 4, first->valueLength);
+ ASSERT_EQ(bswap(SMALL), (*(uint32_t * )(first->getKey())));
+
+ KVBuffer * second = first->next();
+ ASSERT_EQ(4, second->keyLength);
+ ASSERT_EQ(KV_SIZE - KVBuffer::headerLength() - 4, second->valueLength);
+ ASSERT_EQ(bswap(MEDIUM), (*(uint32_t * )(second->getKey())));
+
+ KVBuffer * third = second->next();
+ ASSERT_EQ(4, third->keyLength);
+ ASSERT_EQ(KV_SIZE - KVBuffer::headerLength() - 4, third->valueLength);
+ ASSERT_EQ(bswap(BIG), (*(uint32_t * )(third->getKey())));
+
+ delete bucket;
+ delete pool;
+}
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadBuffer.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadBuffer.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadBuffer.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadBuffer.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "BufferStream.h"
+#include "Buffers.h"
+#include "test_commons.h"
+
+TEST(Buffers, AppendRead) {
+ string codec = "";
+ vector<string> data;
+ Generate(data, 1000000, "word");
+ string dest;
+ dest.reserve(64 * 1024 * 1024);
+ OutputStringStream outputStream = OutputStringStream(dest);
+ AppendBuffer appendBuffer;
+ appendBuffer.init(64 * 1024, &outputStream, codec);
+ for (size_t i = 0; i < data.size(); i++) {
+ appendBuffer.write(data[i].c_str(), data[i].length());
+ }
+ appendBuffer.flush();
+ InputBuffer inputBuffer = InputBuffer(dest.c_str(), dest.length());
+ ReadBuffer readBuffer = ReadBuffer();
+ readBuffer.init(64 * 1024, &inputBuffer, codec);
+ for (size_t i = 0; i < data.size(); i++) {
+ const char * rd = readBuffer.get(data[i].length());
+ ASSERT_EQ(data[i], string(rd, data[i].length()));
+ }
+}
+
+TEST(Buffers, AppendReadSnappy) {
+ string codec = "org.apache.hadoop.io.compress.SnappyCodec";
+ vector<string> data;
+ Generate(data, 1000000, "word");
+ string dest;
+ dest.reserve(64 * 1024 * 1024);
+ OutputStringStream outputStream = OutputStringStream(dest);
+ AppendBuffer appendBuffer;
+ appendBuffer.init(64 * 1024, &outputStream, codec);
+ for (size_t i = 0; i < data.size(); i++) {
+ appendBuffer.write(data[i].c_str(), data[i].length());
+ }
+ appendBuffer.flush();
+ InputBuffer inputBuffer = InputBuffer(dest.c_str(), dest.length());
+ ReadBuffer readBuffer = ReadBuffer();
+ readBuffer.init(64 * 1024, &inputBuffer, codec);
+ for (size_t i = 0; i < data.size(); i++) {
+ const char * rd = readBuffer.get(data[i].length());
+ ASSERT_EQ(data[i], string(rd, data[i].length()));
+ }
+}
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadWriteBuffer.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadWriteBuffer.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadWriteBuffer.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestReadWriteBuffer.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+TEST(ReadWriteBuffer, readAndWrite) {
+ ReadWriteBuffer buff(16);
+
+ int INT = 100;
+ int LONG = 200;
+ std::string STR = "hello, readWriteBuffer";
+ void * POINTER = this;
+
+ int REPEAT = 10;
+
+ for (int i = 0; i < REPEAT; i++) {
+ buff.writeInt(INT);
+ buff.writeLong(LONG);
+ buff.writeString(&STR);
+ buff.writePointer(POINTER);
+ buff.writeString(STR.c_str(), STR.length());
+ }
+
+ uint32_t writePoint = buff.getWritePoint();
+ LOG("Current Write Point: %d", writePoint);
+
+ for (int i = 0; i < REPEAT; i++) {
+ ASSERT_EQ(INT, buff.readInt());
+ ASSERT_EQ(LONG, buff.readLong());
+ string * read = buff.readString();
+ LOG("READ STRING: %s", read->c_str());
+ ASSERT_EQ(0, STR.compare(read->c_str()));
+ delete read;
+
+ ASSERT_EQ(POINTER, buff.readPointer());
+
+ read = buff.readString();
+ LOG("READ STRING: %s", read->c_str());
+ ASSERT_EQ(0, STR.compare(read->c_str()));
+ }
+
+ uint32_t readPoint = buff.getReadPoint();
+ ASSERT_EQ(writePoint, readPoint);
+
+ buff.setWritePoint(0);
+ buff.setReadPoint(0);
+
+ ASSERT_EQ(0, buff.getReadPoint());
+ ASSERT_EQ(0, buff.getWritePoint());
+}
+
+} /* namespace NativeTask */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestTrackingCollector.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestTrackingCollector.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestTrackingCollector.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/test/lib/TestTrackingCollector.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "lib/Combiner.h"
+#include "test_commons.h"
+#include <iostream>
+
+namespace NativeTask {
+
+TEST(TrackingCollector, read) {
+ const std::string GROUP("group");
+ const std::string KEY("key");
+ Counter * counter = new Counter(GROUP, KEY);
+ Collector * collector = new Collector();
+ TrackingCollector tracking(collector, counter);
+ tracking.collect(NULL, 0, NULL, 0);
+ ASSERT_EQ(1, counter->get());
+ delete counter;
+ delete collector;
+}
+} /* namespace NativeTask */
|