From mapreduce-commits-return-7147-apmail-hadoop-mapreduce-commits-archive=hadoop.apache.org@hadoop.apache.org Thu Jul 17 17:46:02 2014 Return-Path: X-Original-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Delivered-To: apmail-hadoop-mapreduce-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id D376A117A8 for ; Thu, 17 Jul 2014 17:46:02 +0000 (UTC) Received: (qmail 44774 invoked by uid 500); 17 Jul 2014 17:46:02 -0000 Delivered-To: apmail-hadoop-mapreduce-commits-archive@hadoop.apache.org Received: (qmail 44523 invoked by uid 500); 17 Jul 2014 17:46:02 -0000 Mailing-List: contact mapreduce-commits-help@hadoop.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: mapreduce-dev@hadoop.apache.org Delivered-To: mailing list mapreduce-commits@hadoop.apache.org Received: (qmail 44503 invoked by uid 99); 17 Jul 2014 17:46:02 -0000 Received: from athena.apache.org (HELO athena.apache.org) (140.211.11.136) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Jul 2014 17:46:02 +0000 X-ASF-Spam-Status: No, hits=-2000.0 required=5.0 tests=ALL_TRUSTED X-Spam-Check-By: apache.org Received: from [140.211.11.4] (HELO eris.apache.org) (140.211.11.4) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 17 Jul 2014 17:45:56 +0000 Received: from eris.apache.org (localhost [127.0.0.1]) by eris.apache.org (Postfix) with ESMTP id E07102388C80; Thu, 17 Jul 2014 17:45:11 +0000 (UTC) Content-Type: text/plain; charset="utf-8" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit Subject: svn commit: r1611413 [11/18] - in /hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client-nativetask/ hadoop-mapreduce-client-nati... 
Date: Thu, 17 Jul 2014 17:45:01 -0000 To: mapreduce-commits@hadoop.apache.org From: todd@apache.org X-Mailer: svnmailer-1.0.9 Message-Id: <20140717174511.E07102388C80@eris.apache.org> X-Virus-Checked: Checked by ClamAV on apache.org Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.cc Thu Jul 17 17:44:55 2014 @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "commons.h" +#include "util/Timer.h" +#include "util/StringUtil.h" +#include "Merge.h" +#include "lib/FileSystem.h" + +namespace NativeTask { + +IFileMergeEntry * IFileMergeEntry::create(SingleSpillInfo * spill) { + InputStream * fileOut = FileSystem::getLocal().open(spill->path); + IFileReader * reader = new IFileReader(fileOut, spill, true); + return new IFileMergeEntry(reader); +} + +Merger::Merger(IFileWriter * writer, Config * config, ComparatorPtr comparator, + ICombineRunner * combineRunner) + : _writer(writer), _config(config), _combineRunner(combineRunner), _first(true), + _comparator(comparator) { + +} + +Merger::~Merger() { + _heap.clear(); + for (size_t i = 0; i < _entries.size(); i++) { + delete _entries[i]; + } + _entries.clear(); +} + +void Merger::addMergeEntry(MergeEntryPtr pme) { + _entries.push_back(pme); +} + +/** + * 0 if success, have next partition + * 1 if failed, no more + */ +bool Merger::startPartition() { + bool firstPartitionState = false; + for (size_t i = 0; i < _entries.size(); i++) { + bool partitionState = _entries[i]->nextPartition(); + if (i == 0) { + firstPartitionState = partitionState; + } + if (firstPartitionState != partitionState) { + THROW_EXCEPTION(IOException, "MergeEntry partition number not equal"); + } + } + if (firstPartitionState) { // do have new partition + _writer->startPartition(); + } + return firstPartitionState; +} + +/** + * finish one partition + */ +void Merger::endPartition() { + _writer->endPartition(); +} + +void Merger::initHeap() { + _heap.clear(); + for (size_t i = 0; i < _entries.size(); i++) { + MergeEntryPtr pme = _entries[i]; + if (pme->next()) { + _heap.push_back(pme); + } + } + makeHeap(&(_heap[0]), &(_heap[0]) + _heap.size(), _comparator); +} + +bool Merger::next() { + size_t cur_heap_size = _heap.size(); + if (cur_heap_size > 0) { + if (!_first) { + if (_heap[0]->next()) { // have more, adjust heap + if (cur_heap_size == 1) { + return true; + } else if (cur_heap_size == 2) { 
+ MergeEntryPtr * base = &(_heap[0]); + + if (_comparator(base[1], base[0])) { + std::swap(base[0], base[1]); + } + } else { + MergeEntryPtr * base = &(_heap[0]); + heapify(base, 1, cur_heap_size, _comparator); + } + } else { // no more, pop heap + MergeEntryPtr * base = &(_heap[0]); + popHeap(base, base + cur_heap_size, _comparator); + _heap.pop_back(); + } + } else { + _first = false; + } + return _heap.size() > 0; + } + return false; +} + +bool Merger::next(Buffer & key, Buffer & value) { + bool result = next(); + if (result) { + MergeEntryPtr * base = &(_heap[0]); + key.reset(base[0]->getKey(), base[0]->getKeyLength()); + value.reset(base[0]->getValue(), base[0]->getValueLength()); + return true; + } else { + return false; + } +} + +void Merger::merge() { + Timer timer; + uint64_t total_record = 0; + _heap.reserve(_entries.size()); + MergeEntryPtr * base = &(_heap[0]); + while (startPartition()) { + initHeap(); + if (_heap.size() == 0) { + endPartition(); + continue; + } + _first = true; + if (_combineRunner == NULL) { + while (next()) { + _writer->write(base[0]->getKey(), base[0]->getKeyLength(), base[0]->getValue(), + base[0]->getValueLength()); + total_record++; + } + } else { + _combineRunner->combine(CombineContext(UNKNOWN), this, _writer); + } + endPartition(); + } + + uint64_t interval = (timer.now() - timer.last()); + uint64_t M = 1000000; //1 million + + uint64_t output_size; + uint64_t real_output_size; + _writer->getStatistics(output_size, real_output_size); + + if (total_record != 0) { + LOG("[Merge] Merged segment#: %lu, record#: %llu, avg record size: %llu, uncompressed total bytes: %llu, compressed total bytes: %llu, time: %llu ms", + _entries.size(), + total_record, + output_size / (total_record), + output_size, + real_output_size, + interval / M); + } else { + LOG("[Merge] Merged segments#, %lu, uncompressed total bytes: %llu, compressed total bytes: %llu, time: %llu ms", + _entries.size(), + output_size, + real_output_size, + interval / M); + 
} +} + +} // namespace NativeTask Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.h URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.h?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.h (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Merge.h Thu Jul 17 17:44:55 2014 @@ -0,0 +1,254 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef MERGE_H_ +#define MERGE_H_ + +#include "NativeTask.h" +#include "Buffers.h" +#include "MapOutputCollector.h" +#include "IFile.h" +#include "MinHeap.h" + +namespace NativeTask { + +/** + * merger + */ +class MergeEntry { + +protected: + // these 3 fields should be filled after next() is called + const char * _key; + const char * _value; + uint32_t _keyLength; + uint32_t _valueLength; + +public: + MergeEntry() + : _key(NULL), _value(NULL), _keyLength(0), _valueLength(0) { + } + + const char * getKey() const { + return _key; + } + + const char * getValue() const { + return _value; + } + + uint32_t getKeyLength() const { + return _keyLength; + } + + uint32_t getValueLength() const { + return _valueLength; + } + + virtual ~MergeEntry() { + } + + /** + * move to next partition + * 0 on success + * 1 on no more + */ + virtual bool nextPartition() = 0; + + /** + * move to next key/value + * 0 on success + * 1 on no more + */ + virtual bool next() = 0; +}; + +/** + * Merger + */ +typedef MergeEntry * MergeEntryPtr; + +class MergeEntryComparator { +private: + ComparatorPtr _keyComparator; + +public: + MergeEntryComparator(ComparatorPtr comparator) + : _keyComparator(comparator) { + } + +public: + bool operator()(const MergeEntryPtr lhs, const MergeEntryPtr rhs) { + return (*_keyComparator)(lhs->getKey(), lhs->getKeyLength(), rhs->getKey(), rhs->getKeyLength()) + < 0; + } +}; + +/** + * Merge entry for in-memory partition bucket + */ +class MemoryMergeEntry : public MergeEntry { +protected: + + PartitionBucket ** _partitions; + uint32_t _number; + int64_t _index; + + KVIterator * _iterator; + Buffer keyBuffer; + Buffer valueBuffer; + +public: + MemoryMergeEntry(PartitionBucket ** partitions, uint32_t numberOfPartitions) + : _partitions(partitions), _number(numberOfPartitions), _index(-1), _iterator(NULL) { + } + + virtual ~MemoryMergeEntry() { + if (NULL != _iterator) { + delete _iterator; + _iterator = NULL; + } + } + + virtual bool nextPartition() { + 
++_index; + if (_index < _number) { + PartitionBucket * current = _partitions[_index]; + if (NULL != _iterator) { + delete _iterator; + _iterator = NULL; + } + if (NULL != current) { + _iterator = current->getIterator(); + } + return true; + } + return false; + } + + /** + * move to next key/value + * 0 on success + * 1 on no more + */ + virtual bool next() { + if (NULL == _iterator) { + return false; + } + bool hasNext = _iterator->next(keyBuffer, valueBuffer); + + if (hasNext) { + _keyLength = keyBuffer.length(); + _key = keyBuffer.data(); + _valueLength = valueBuffer.length(); + _value = valueBuffer.data(); + assert(_value != NULL); + return true; + } + // detect error early + _keyLength = 0xffffffff; + _valueLength = 0xffffffff; + _key = NULL; + _value = NULL; + return false; + } +}; + +/** + * Merge entry for intermediate file + */ +class IFileMergeEntry : public MergeEntry { +protected: + IFileReader * _reader; + bool new_partition; +public: + /** + * @param reader: managed by InterFileMergeEntry + */ + + static IFileMergeEntry * create(SingleSpillInfo * spill); + + IFileMergeEntry(IFileReader * reader) + : _reader(reader) { + new_partition = false; + } + + virtual ~IFileMergeEntry() { + delete _reader; + _reader = NULL; + } + + /** + * move to next partition + * 0 on success + * 1 on no more + */ + virtual bool nextPartition() { + return _reader->nextPartition(); + } + + /** + * move to next key/value + * 0 on success + * 1 on no more + */ + virtual bool next() { + _key = _reader->nextKey(_keyLength); + if (unlikely(NULL == _key)) { + // detect error early + _keyLength = 0xffffffffU; + _valueLength = 0xffffffffU; + return false; + } + _value = _reader->value(_valueLength); + return true; + } +}; + +class Merger : public KVIterator { + +private: + vector _entries; + vector _heap; + IFileWriter * _writer; + Config * _config; + ICombineRunner * _combineRunner; + bool _first; + MergeEntryComparator _comparator; + +public: + Merger(IFileWriter * writer, Config * 
config, ComparatorPtr comparator, + ICombineRunner * combineRunner = NULL); + + ~Merger(); + + void addMergeEntry(MergeEntryPtr pme); + + void merge(); + + virtual bool next(Buffer & key, Buffer & value); +protected: + bool startPartition(); + void endPartition(); + void initHeap(); + bool next(); +}; + +} // namespace NativeTask + +#endif /* MERGE_H_ */ Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MinHeap.h URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MinHeap.h?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MinHeap.h (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/MinHeap.h Thu Jul 17 17:44:55 2014 @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef MIN_HEAP_H_ +#define MIN_HEAP_H_ + +#include "NativeTask.h" +#include "Buffers.h" + +template +void heapify(T* first, int rt, int heap_len, Compare & Comp) { + while (rt * 2 <= heap_len) // not leaf + { + int left = (rt << 1); // left child + int right = (rt << 1) + 1; // right child + int smallest = rt; + if (Comp(*(first + left - 1), *(first + smallest - 1))) { + smallest = left; + } + if (right <= heap_len && Comp(*(first + right - 1), *(first + smallest - 1))) { + smallest = right; + } + if (smallest != rt) { + std::swap(*(first + smallest - 1), *(first + rt - 1)); + rt = smallest; + } else { + break; + } + } +} + +template +void makeHeap(T* begin, T* end, Compare & Comp) { + int heap_len = end - begin; + if (heap_len >= 0) { + for (uint32_t i = heap_len / 2; i >= 1; i--) { + heapify(begin, i, heap_len, Comp); + } + } +} + +template +void popHeap(T* begin, T* end, Compare & Comp) { + *begin = *(end - 1); + // adjust [begin, end - 1) to heap + heapify(begin, 1, end - begin - 1, Comp); +} + +#endif /* HEAP_H_ */ Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.cc URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.cc?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.cc (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.cc Thu Jul 17 17:44:55 2014 @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include "commons.h" +#include "NativeObjectFactory.h" +#include "NativeLibrary.h" + +namespace NativeTask { + +////////////////////////////////////////////////////////////////// +// NativeLibrary methods +////////////////////////////////////////////////////////////////// + +NativeLibrary::NativeLibrary(const string & path, const string & name) + : _path(path), _name(name), _getObjectCreatorFunc(NULL), _functionGetter(NULL) { + +} + +bool NativeLibrary::init() { + void *library = dlopen(_path.c_str(), RTLD_LAZY | RTLD_GLOBAL); + if (NULL == library) { + LOG("[NativeLibrary] Load object library %s failed.", _path.c_str()); + return false; + } + // clean error status + dlerror(); + + string create_object_func_name = _name + "GetObjectCreator"; + _getObjectCreatorFunc = (GetObjectCreatorFunc)dlsym(library, create_object_func_name.c_str()); + if (NULL == _getObjectCreatorFunc) { + LOG("[NativeLibrary] ObjectCreator function [%s] not found", create_object_func_name.c_str()); + } + + string functionGetter = _name + "GetFunctionGetter"; + _functionGetter = (FunctionGetter)dlsym(library, functionGetter.c_str()); + if (NULL == _functionGetter) { + LOG("[NativeLibrary] function getter [%s] not found", functionGetter.c_str()); + } + + string init_library_func_name = 
_name + "Init"; + InitLibraryFunc init_library_func = (InitLibraryFunc)dlsym(library, + init_library_func_name.c_str()); + if (NULL == init_library_func) { + LOG("[NativeLibrary] Library init function [%s] not found", init_library_func_name.c_str()); + } else { + init_library_func(); + } + return true; +} + +NativeObject * NativeLibrary::createObject(const string & clz) { + if (NULL == _getObjectCreatorFunc) { + return NULL; + } + return (NativeObject*)((_getObjectCreatorFunc(clz))()); +} + +void * NativeLibrary::getFunction(const string & functionName) { + if (NULL == _functionGetter) { + return NULL; + } + return (*_functionGetter)(functionName); +} + +ObjectCreatorFunc NativeLibrary::getObjectCreator(const string & clz) { + if (NULL == _getObjectCreatorFunc) { + return NULL; + } + return _getObjectCreatorFunc(clz); +} + +} // namespace NativeTask + Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.h URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.h?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.h (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeLibrary.h Thu Jul 17 17:44:55 2014 @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef NATIVELIBRARY_H_ +#define NATIVELIBRARY_H_ + +#include + +namespace NativeTask { + +using std::string; +class NativeObject; +class NativeObjectFactory; + +/** + * User level object library abstraction + */ +class NativeLibrary { + friend class NativeObjectFactory; +private: + string _path; + string _name; + GetObjectCreatorFunc _getObjectCreatorFunc; + FunctionGetter _functionGetter; +public: + NativeLibrary(const string & path, const string & name); + + bool init(); + + NativeObject * createObject(const string & clz); + + void * getFunction(const string & functionName); + + ObjectCreatorFunc getObjectCreator(const string & clz); + + ~NativeLibrary() { + } +}; + +} // namespace NativeTask + +#endif /* NATIVELIBRARY_H_ */ Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.cc URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.cc?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.cc (added) +++ 
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.cc Thu Jul 17 17:44:55 2014 @@ -0,0 +1,453 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include +#ifndef __CYGWIN__ +#include +#endif +#include "commons.h" +#include "NativeTask.h" +#include "NativeObjectFactory.h" +#include "NativeLibrary.h" +#include "BufferStream.h" +#include "util/StringUtil.h" +#include "util/SyncUtils.h" +#include "util/WritableUtils.h" +#include "handler/BatchHandler.h" +#include "handler/MCollectorOutputHandler.h" +#include "handler/CombineHandler.h" + +using namespace NativeTask; + +// TODO: just for debug, should be removed +extern "C" void handler(int sig) { + void *array[10]; + size_t size; + + // print out all the frames to stderr + fprintf(stderr, "Error: signal %d:\n", sig); + +#ifndef __CYGWIN__ + // get void*'s for all entries on the stack + size = backtrace(array, 10); + + backtrace_symbols_fd(array, size, 2); +#endif + + exit(1); +} + +DEFINE_NATIVE_LIBRARY(NativeTask) { + //signal(SIGSEGV, handler); + REGISTER_CLASS(BatchHandler, NativeTask); + REGISTER_CLASS(CombineHandler, NativeTask); + REGISTER_CLASS(MCollectorOutputHandler, NativeTask); + REGISTER_CLASS(Mapper, NativeTask); + REGISTER_CLASS(Reducer, NativeTask); + REGISTER_CLASS(Partitioner, NativeTask); + REGISTER_CLASS(Folder, NativeTask); + NativeObjectFactory::SetDefaultClass(BatchHandlerType, "NativeTask.BatchHandler"); + NativeObjectFactory::SetDefaultClass(MapperType, "NativeTask.Mapper"); + NativeObjectFactory::SetDefaultClass(ReducerType, "NativeTask.Reducer"); + NativeObjectFactory::SetDefaultClass(PartitionerType, "NativeTask.Partitioner"); + NativeObjectFactory::SetDefaultClass(FolderType, "NativeTask.Folder"); +} + +namespace NativeTask { + +static Config G_CONFIG; + +vector NativeObjectFactory::Libraries; +map NativeObjectFactory::DefaultClasses; +Config * NativeObjectFactory::GlobalConfig = &G_CONFIG; +float NativeObjectFactory::LastProgress = 0; +Progress * NativeObjectFactory::TaskProgress = NULL; +string NativeObjectFactory::LastStatus; +set NativeObjectFactory::CounterSet; +vector NativeObjectFactory::Counters; +vector 
NativeObjectFactory::CounterLastUpdateValues; +bool NativeObjectFactory::Inited = false; + +static Lock FactoryLock; + +bool NativeObjectFactory::Init() { + ScopeLock autolocak(FactoryLock); + if (Inited == false) { + // setup log device + string device = GetConfig().get(NATIVE_LOG_DEVICE, "stderr"); + if (device == "stdout") { + LOG_DEVICE = stdout; + } else if (device == "stderr") { + LOG_DEVICE = stderr; + } else { + LOG_DEVICE = fopen(device.c_str(), "w"); + } + NativeTaskInit(); + NativeLibrary * library = new NativeLibrary("libnativetask.so", "NativeTask"); + library->_getObjectCreatorFunc = NativeTaskGetObjectCreator; + Libraries.push_back(library); + Inited = true; + // load extra user provided libraries + string libraryConf = GetConfig().get(NATIVE_CLASS_LIBRARY_BUILDIN, ""); + if (libraryConf.length() > 0) { + vector libraries; + vector pair; + StringUtil::Split(libraryConf, ",", libraries, true); + for (size_t i = 0; i < libraries.size(); i++) { + pair.clear(); + StringUtil::Split(libraries[i], "=", pair, true); + if (pair.size() == 2) { + string & name = pair[0]; + string & path = pair[1]; + LOG("[NativeObjectLibrary] Try to load library [%s] with file [%s]", name.c_str(), + path.c_str()); + if (false == RegisterLibrary(path, name)) { + LOG("[NativeObjectLibrary] RegisterLibrary failed: name=%s path=%s", name.c_str(), + path.c_str()); + return false; + } else { + LOG("[NativeObjectLibrary] RegisterLibrary success: name=%s path=%s", name.c_str(), + path.c_str()); + } + } else { + LOG("[NativeObjectLibrary] Illegal native.class.libray: [%s] in [%s]", + libraries[i].c_str(), libraryConf.c_str()); + } + } + } + const char * version = GetConfig().get(NATIVE_HADOOP_VERSION); + LOG("[NativeObjectLibrary] NativeTask library initialized with hadoop %s", + version==NULL?"unkown":version); + } + return true; +} + +void NativeObjectFactory::Release() { + ScopeLock autolocak(FactoryLock); + for (ssize_t i = Libraries.size() - 1; i >= 0; i--) { + delete Libraries[i]; 
+    Libraries[i] = NULL;
+  }
+  Libraries.clear();
+  for (size_t i = 0; i < Counters.size(); i++) {
+    delete Counters[i];
+  }
+  Counters.clear();
+  if (LOG_DEVICE != stdout && LOG_DEVICE != stderr) {
+    fclose(LOG_DEVICE);
+    LOG_DEVICE = stderr;
+  }
+  Inited = false;
+}
+
+/**
+ * Runs Init() if the factory has not been initialized yet.
+ * @throws IOException (thrown by value) when Init() reports failure
+ */
+void NativeObjectFactory::CheckInit() {
+  if (!Inited && !Init()) {
+    // Throw by value. "throw new IOException(...)" throws a pointer, which a
+    // handler of the form catch (IOException &) never matches.
+    throw IOException("Init NativeTask library failed.");
+  }
+}
+
+/** Returns the global configuration by reference (GlobalConfig is dereferenced unchecked). */
+Config & NativeObjectFactory::GetConfig() {
+  return *GlobalConfig;
+}
+
+/** Returns the global configuration pointer exactly as stored. */
+Config * NativeObjectFactory::GetConfigPtr() {
+  return GlobalConfig;
+}
+
+/** Sets the object GetTaskProgress() polls; with NULL the last value is reused. */
+void NativeObjectFactory::SetTaskProgressSource(Progress * progress) {
+  TaskProgress = progress;
+}
+
+/** Returns the task progress, refreshing the cached value when a source is set. */
+float NativeObjectFactory::GetTaskProgress() {
+  if (TaskProgress != NULL) {
+    LastProgress = TaskProgress->getProgress();
+  }
+  return LastProgress;
+}
+
+/** Stores the status text that the next GetTaskStatusUpdate() will report. */
+void NativeObjectFactory::SetTaskStatus(const string & status) {
+  LastStatus = status;
+}
+
+static Lock CountersLock;
+
+/**
+ * Serializes progress, status text and counter increments into statusData.
+ * The pending status text is cleared and every counter's last-reported value
+ * is advanced, so each call reports only the change since the previous call.
+ */
+void NativeObjectFactory::GetTaskStatusUpdate(string & statusData) {
+  // Encoding:
+  // progress:float
+  // status:Text
+  // Counter number
+  // Counters[group:Text, name:Text, incrCount:Long]
+  OutputStringStream os(statusData);
+  float progress = GetTaskProgress();
+  WritableUtils::WriteFloat(&os, progress);
+  WritableUtils::WriteText(&os, LastStatus);
+  LastStatus.clear();
+  {
+    ScopeLock AutoLock(CountersLock);
+    uint32_t numCounter = (uint32_t)Counters.size();
+    WritableUtils::WriteInt(&os, numCounter);
+    for (size_t i = 0; i < numCounter; i++) {
+      Counter * counter = Counters[i];
+      uint64_t newCount = counter->get();
+      uint64_t incr = newCount - CounterLastUpdateValues[i];
+      CounterLastUpdateValues[i] = newCount;
+      WritableUtils::WriteText(&os, counter->group());
+      WritableUtils::WriteText(&os, counter->name());
+      WritableUtils::WriteLong(&os, incr);
+    }
+  }
+}
+
+/**
+ * Returns the counter registered for (group, name), creating and registering
+ * it on first request. The factory keeps ownership; Release() deletes it.
+ */
+Counter * NativeObjectFactory::GetCounter(const string & group, const string & name) {
+  ScopeLock AutoLock(CountersLock);
+  Counter tmpCounter(group, name);
+  // count() + find() avoids spelling the set's iterator type, whose template
+  // arguments are missing from this archived copy of the patch.
+  if (CounterSet.count(&tmpCounter) != 0) {
+    return *CounterSet.find(&tmpCounter);
+  }
+  Counter * ret = new Counter(group, name);
+  Counters.push_back(ret);
+  CounterLastUpdateValues.push_back(0);
+  CounterSet.insert(ret);
+  return ret;
+}
+
+/** Records func as the creator for class name clz in NativeTaskClassMap__. */
+void NativeObjectFactory::RegisterClass(const string & clz, ObjectCreatorFunc func) {
+  NativeTaskClassMap__[clz] = func;
+}
+
+/** Creates an object of class clz, or returns NULL when no creator is found. */
+NativeObject * NativeObjectFactory::CreateObject(const string & clz) {
+  ObjectCreatorFunc creator = GetObjectCreator(clz);
+  return creator ? creator() : NULL;
+}
+
+/**
+ * Looks funcName up in the registered libraries, most recently registered
+ * first, and returns the first non-NULL result (NULL when none has it).
+ */
+void * NativeObjectFactory::GetFunction(const string & funcName) {
+  CheckInit();
+  for (size_t i = Libraries.size(); i > 0; i--) {
+    void * ret = Libraries[i - 1]->getFunction(funcName);
+    if (NULL != ret) {
+      return ret;
+    }
+  }
+  return NULL;
+}
+
+/**
+ * Looks up the creator for class clz in the registered libraries, most
+ * recently registered first; returns NULL when none provides one.
+ */
+ObjectCreatorFunc NativeObjectFactory::GetObjectCreator(const string & clz) {
+  CheckInit();
+  for (size_t i = Libraries.size(); i > 0; i--) {
+    ObjectCreatorFunc ret = Libraries[i - 1]->getObjectCreator(clz);
+    if (NULL != ret) {
+      return ret;
+    }
+  }
+  return NULL;
+}
+
+/** Deletes an object previously handed out by the factory. */
+void NativeObjectFactory::ReleaseObject(NativeObject * obj) {
+  delete obj;
+}
+
+/**
+ * Creates a NativeLibrary for (path, name) and registers it if its init()
+ * succeeds.
+ * @return true when the library was registered, false when init() failed
+ */
+bool NativeObjectFactory::RegisterLibrary(const string & path, const string & name) {
+  CheckInit();
+  NativeLibrary * library = new NativeLibrary(path, name);
+  if (!library->init()) {
+    delete library;
+    return false;
+  }
+  Libraries.push_back(library);
+  return true;
+}
+
+static Lock DefaultClassesLock;
+
+/** Sets the class name used by CreateDefaultObject() for the given type. */
+void NativeObjectFactory::SetDefaultClass(NativeObjectType type, const string & clz) {
+  ScopeLock autoLock(DefaultClassesLock);
+  DefaultClasses[type] = clz;
+}
+
+/**
+ * Creates an object of the default class registered for type.
+ * @return the new object, or NULL (after logging) when no default is set
+ */
+NativeObject * NativeObjectFactory::CreateDefaultObject(NativeObjectType type) {
+  CheckInit();
+  string clz;
+  bool found = false;
+  {
+    // Read under the same lock SetDefaultClass() writes under; the object is
+    // created after the lock is released.
+    ScopeLock autoLock(DefaultClassesLock);
+    if (DefaultClasses.find(type) != DefaultClasses.end()) {
+      clz = DefaultClasses[type];
+      found = true;
+    }
+  }
+  if (found) {
+    return CreateObject(clz);
+  }
+  LOG("[NativeObjectLibrary] Default class for NativeObjectType %s not found",
+      NativeObjectTypeToString(type).c_str());
+  return NULL;
+}
+
+/**
+ * Compares two byte strings: common prefix first (via fmemcmp), then length.
+ * @return negative, zero or positive as src sorts before, equal to or after dest
+ */
+int NativeObjectFactory::BytesComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  uint32_t minlen = std::min(srcLength, destLength);
+  int64_t ret = fmemcmp(src, dest, minlen);
+  if (ret > 0) {
+    return 1;
+  } else if (ret < 0) {
+    return -1;
+  }
+  // Compare the lengths explicitly: "srcLength - destLength" is an unsigned
+  // subtraction whose result can change sign when converted to int.
+  if (srcLength == destLength) {
+    return 0;
+  }
+  return srcLength < destLength ? -1 : 1;
+}
+
+/** Compares the first byte of each key; the lengths are not consulted. */
+int NativeObjectFactory::ByteComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  return (*src) - (*dest);
+}
+
+/**
+ * Compares two 4-byte keys: leading bytes first (as char), then the
+ * byte-swapped 32-bit values as unsigned. The lengths are not consulted.
+ */
+int NativeObjectFactory::IntComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  int result = (*src) - (*dest);
+  if (result == 0) {
+    uint32_t from = bswap(*(uint32_t*)src);
+    uint32_t to = bswap(*(uint32_t*)dest);
+    if (from > to) {
+      return 1;
+    } else if (from == to) {
+      return 0;
+    } else {
+      return -1;
+    }
+  }
+  return result;
+}
+
+/**
+ * Compares two 8-byte keys: leading bytes first (as char), then the
+ * byte-swapped 64-bit values as unsigned. The lengths are not consulted.
+ */
+int NativeObjectFactory::LongComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  int result = (int)(*src) - (int)(*dest);
+  if (result == 0) {
+    uint64_t from = bswap64(*(uint64_t*)src);
+    uint64_t to = bswap64(*(uint64_t*)dest);
+    if (from > to) {
+      return 1;
+    } else if (from == to) {
+      return 0;
+    } else {
+      return -1;
+    }
+  }
+  return result;
+}
+
+/** Decodes both keys with WritableUtils::ReadVInt and compares the values. */
+int NativeObjectFactory::VIntComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  int32_t from = WritableUtils::ReadVInt(src, srcLength);
+  int32_t to = WritableUtils::ReadVInt(dest, destLength);
+  if (from > to) {
+    return 1;
+  } else if (from == to) {
+    return 0;
+  } else {
+    return -1;
+  }
+}
+
+/** Decodes both keys with WritableUtils::ReadVLong and compares the values. */
+int NativeObjectFactory::VLongComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  int64_t from = WritableUtils::ReadVLong(src, srcLength);
+  int64_t to = WritableUtils::ReadVLong(dest, destLength);
+  if (from > to) {
+    return 1;
+  } else if (from == to) {
+    return 0;
+  } else {
+    return -1;
+  }
+}
+
+/**
+ * Compares two 4-byte keys as byte-swapped floats.
+ * @throws IOException when either length is not 4
+ */
+int NativeObjectFactory::FloatComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  if (srcLength != 4 || destLength != 4) {
+    THROW_EXCEPTION_EX(IOException, "float comparator, while src/dest length is not 4");
+  }
+
+  uint32_t from = bswap(*(uint32_t*)src);
+  uint32_t to = bswap(*(uint32_t*)dest);
+
+  float * srcValue = (float *)(&from);
+  float * destValue = (float *)(&to);
+
+  if ((*srcValue) < (*destValue)) {
+    return -1;
+  } else if ((*srcValue) == (*destValue)) {
+    return 0;
+  } else {
+    return 1;
+  }
+}
+
+/**
+ * Compares two 8-byte keys as byte-swapped doubles.
+ * @throws IOException when either length is not 8
+ */
+int NativeObjectFactory::DoubleComparator(const char * src, uint32_t srcLength, const char * dest,
+    uint32_t destLength) {
+  if (srcLength != 8 || destLength != 8) {
+    // The message used to say 4, copied from the float comparator.
+    THROW_EXCEPTION_EX(IOException, "double comparator, while src/dest length is not 8");
+  }
+
+  uint64_t from = bswap64(*(uint64_t*)src);
+  uint64_t to = bswap64(*(uint64_t*)dest);
+
+  double * srcValue = (double *)(&from);
+  double * destValue = (double *)(&to);
+  if ((*srcValue) < (*destValue)) {
+    return -1;
+  } else if ((*srcValue) == (*destValue)) {
+    return 0;
+  } else {
+    return 1;
+  }
+}
+
+/**
+ * Selects a key comparator.
+ * With a comparatorName, the function of that name is looked up through
+ * NativeObjectFactory::GetFunction (NULL when not found). Otherwise the
+ * built-in comparator for keyType is returned, or NULL for an unhandled type.
+ */
+ComparatorPtr get_comparator(const KeyValueType keyType, const char * comparatorName) {
+  if (NULL != comparatorName) {
+    void * func = NativeObjectFactory::GetFunction(string(comparatorName));
+    return (ComparatorPtr)func;
+  }
+  if (keyType == BytesType || keyType == TextType) {
+    return &NativeObjectFactory::BytesComparator;
+  } else if (keyType == ByteType || keyType == BoolType) {
+    return &NativeObjectFactory::ByteComparator;
+  } else if (keyType == IntType) {
+    return &NativeObjectFactory::IntComparator;
+  } else if (keyType == LongType) {
+    return &NativeObjectFactory::LongComparator;
+  } else if (keyType == FloatType) {
+    return &NativeObjectFactory::FloatComparator;
+  } else if (keyType == DoubleType) {
+    return &NativeObjectFactory::DoubleComparator;
+  } else if (keyType == VIntType) {
+    return &NativeObjectFactory::VIntComparator;
+  } else if (keyType == VLongType) {
+    return &NativeObjectFactory::VLongComparator;
+  }
+  return NULL;
+}
+} // namespace NativeTask
+
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.h URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.h?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.h (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeObjectFactory.h Thu Jul 17 17:44:55 2014 @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#ifndef NATIVEOBJECTFACTORY_H_
+#define NATIVEOBJECTFACTORY_H_
+
+// NOTE(review): the four header names were stripped from this archived copy;
+// restored from the using-declarations below - confirm against the repository.
+#include <string>
+#include <vector>
+#include <map>
+#include <set>
+
+#include "NativeTask.h"
+
+namespace NativeTask {
+
+using std::string;
+using std::vector;
+using std::map;
+using std::set;
+using std::pair;
+
+class NativeLibrary;
+
+/**
+ * Native object factory: static registry of loaded native libraries, default
+ * classes per object type, global configuration, task progress/status and
+ * counters, plus the built-in key comparators.
+ *
+ * NOTE(review): container template arguments were stripped from this archived
+ * copy. They are restored below where the implementation pins them
+ * (Libraries holds NativeLibrary *, DefaultClasses maps NativeObjectType to a
+ * class name, Counters holds Counter *). Confirm against the repository.
+ */
+class NativeObjectFactory {
+private:
+  static vector<NativeLibrary *> Libraries;
+  static map<NativeObjectType, string> DefaultClasses;
+  static Config * GlobalConfig;
+  static float LastProgress;
+  static Progress * TaskProgress;
+  static string LastStatus;
+  // NOTE(review): template arguments lost and left as found. GetCounter()
+  // looks entries up through the address of a temporary Counter, so this is
+  // presumably a set of Counter * with a comparator on the pointed-to values.
+  static set CounterSet;
+  static vector<Counter *> Counters;
+  // presumably uint64_t, the type the increments are computed in - confirm
+  static vector<uint64_t> CounterLastUpdateValues;
+  static bool Inited;
+public:
+  // lifecycle
+  static bool Init();
+  static void Release();
+  static void CheckInit();
+
+  // configuration
+  static Config & GetConfig();
+  static Config * GetConfigPtr();
+
+  // progress, status and counters
+  static void SetTaskProgressSource(Progress * progress);
+  static float GetTaskProgress();
+  static void SetTaskStatus(const string & status);
+  static void GetTaskStatusUpdate(string & statusData);
+  static Counter * GetCounter(const string & group, const string & name);
+
+  // classes, libraries and objects
+  static void RegisterClass(const string & clz, ObjectCreatorFunc func);
+  static NativeObject * CreateObject(const string & clz);
+  static void * GetFunction(const string & funcName);
+  static ObjectCreatorFunc GetObjectCreator(const string & clz);
+  static void ReleaseObject(NativeObject * obj);
+  static bool RegisterLibrary(const string & path, const string & name);
+  static void SetDefaultClass(NativeObjectType type, const string & clz);
+  static NativeObject * CreateDefaultObject(NativeObjectType type);
+
+  // key comparators; all share the ComparatorPtr calling convention
+  static int BytesComparator(const char * src, uint32_t srcLength, const char * dest,
+      uint32_t destLength);
+  static int ByteComparator(const char * src, uint32_t srcLength, const char * dest,
+      uint32_t destLength);
+  static int IntComparator(const char * src, uint32_t srcLength, const char * dest,
+      uint32_t destLength);
+  static int LongComparator(const char * src, uint32_t srcLength, const char * dest,
+      uint32_t destLength);
+  static int VIntComparator(const char * src, uint32_t srcLength, const char * dest,
+      uint32_t destLength);
+  static int VLongComparator(const char * src, uint32_t srcLength, const char * dest,
+      uint32_t destLength);
+  static int FloatComparator(const char * src, uint32_t srcLength, const char * dest,
+      uint32_t destLength);
+  static int DoubleComparator(const char * src, uint32_t srcLength, const char * dest,
+      uint32_t destLength);
+};
+
+} // namespace NativeTask
+
+#endif /* NATIVEOBJECTFACTORY_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeRuntimeJniImpl.cc URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeRuntimeJniImpl.cc?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeRuntimeJniImpl.cc (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeRuntimeJniImpl.cc Thu Jul 17 17:44:55 2014 @@ -0,0 +1,226 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef QUICK_BUILD
+#include "org_apache_hadoop_mapred_nativetask_NativeRuntime.h"
+#endif
+#include "commons.h"
+#include "jniutils.h"
+#include "NativeObjectFactory.h"
+
+using namespace NativeTask;
+
+/**
+ * Converts the C++ exception currently being handled into a pending Java
+ * exception. Call it only from inside a catch block: it rethrows the active
+ * exception and maps it in the order every entry point below used to spell
+ * out by hand. A JavaException is only logged, because the Java side already
+ * has an exception to handle.
+ */
+static void ThrowCurrentExceptionToJava(JNIEnv * jenv) {
+  try {
+    throw;
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (NativeTask::JavaException & e) {
+    LOG("[NativeRuntimeJniImpl] JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "[NativeRuntimeJniImpl] Unknown exception");
+  }
+}
+
+///////////////////////////////////////////////////////////////
+// NativeRuntime JNI methods
+///////////////////////////////////////////////////////////////
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIRelease
+ * Signature: ()V
+ *
+ * Releases everything held by NativeObjectFactory.
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIRelease(
+    JNIEnv * jenv, jclass nativeRuntimeClass) {
+  try {
+    NativeTask::NativeObjectFactory::Release();
+  } catch (...) {
+    ThrowCurrentExceptionToJava(jenv);
+  }
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIConfigure
+ * Signature: ([[B)V
+ *
+ * configs is a flat array of alternating key and value byte arrays; a
+ * trailing key without a value is ignored.
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIConfigure(
+    JNIEnv * jenv, jclass nativeRuntimeClass, jobjectArray configs) {
+  try {
+    NativeTask::Config & config = NativeTask::NativeObjectFactory::GetConfig();
+    jsize len = jenv->GetArrayLength(configs);
+    for (jsize i = 0; i + 1 < len; i += 2) {
+      jbyteArray key_obj = (jbyteArray)jenv->GetObjectArrayElement(configs, i);
+      jbyteArray val_obj = (jbyteArray)jenv->GetObjectArrayElement(configs, i + 1);
+      std::string key = JNU_ByteArrayToString(jenv, key_obj);
+      std::string value = JNU_ByteArrayToString(jenv, val_obj);
+      // Each iteration creates two local references. Drop them here so a
+      // large configuration cannot exhaust the local reference table before
+      // this native method returns.
+      jenv->DeleteLocalRef(key_obj);
+      jenv->DeleteLocalRef(val_obj);
+      config.set(key, value);
+    }
+  } catch (...) {
+    ThrowCurrentExceptionToJava(jenv);
+  }
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNICreateNativeObject
+ * Signature: ([B)J
+ *
+ * Returns the address of the new object, or 0 when the class is unknown or
+ * an exception was raised.
+ */
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateNativeObject(
+    JNIEnv * jenv, jclass nativeRuntimeClass, jbyteArray clazz) {
+  try {
+    std::string typeString = JNU_ByteArrayToString(jenv, clazz);
+    return (jlong)(NativeTask::NativeObjectFactory::CreateObject(typeString));
+  } catch (...) {
+    ThrowCurrentExceptionToJava(jenv);
+  }
+  return 0;
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNICreateDefaultNativeObject
+ * Signature: ([B)J
+ *
+ * Returns the address of the new object, or 0 when no default class is set
+ * for the type or an exception was raised.
+ */
+JNIEXPORT jlong JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNICreateDefaultNativeObject(
+    JNIEnv * jenv, jclass nativeRuntimeClass, jbyteArray type) {
+  try {
+    std::string typeString = JNU_ByteArrayToString(jenv, type);
+    // Named objectType so it no longer shadows the jbyteArray parameter.
+    NativeTask::NativeObjectType objectType =
+        NativeTask::NativeObjectTypeFromString(typeString.c_str());
+    return (jlong)(NativeTask::NativeObjectFactory::CreateDefaultObject(objectType));
+  } catch (...) {
+    ThrowCurrentExceptionToJava(jenv);
+  }
+  return 0;
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIReleaseNativeObject
+ * Signature: (J)V
+ *
+ * A zero address raises IllegalArgumentException instead of being deleted.
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIReleaseNativeObject(
+    JNIEnv * jenv, jclass nativeRuntimeClass, jlong objectAddr) {
+  try {
+    NativeTask::NativeObject * nobj = ((NativeTask::NativeObject *)objectAddr);
+    if (NULL == nobj) {
+      JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
+          "Object addr not instance of NativeObject");
+      return;
+    }
+    NativeTask::NativeObjectFactory::ReleaseObject(nobj);
+  } catch (...) {
+    ThrowCurrentExceptionToJava(jenv);
+  }
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIRegisterModule
+ * Signature: ([B[B)I
+ *
+ * Returns 0 when the library was registered and 1 otherwise (registration
+ * failed or an exception was raised).
+ */
+JNIEXPORT jint JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIRegisterModule(
+    JNIEnv * jenv, jclass nativeRuntimeClass, jbyteArray modulePath, jbyteArray moduleName) {
+  try {
+    std::string pathString = JNU_ByteArrayToString(jenv, modulePath);
+    std::string nameString = JNU_ByteArrayToString(jenv, moduleName);
+    if (NativeTask::NativeObjectFactory::RegisterLibrary(pathString, nameString)) {
+      return 0;
+    }
+  } catch (...) {
+    ThrowCurrentExceptionToJava(jenv);
+  }
+  return 1;
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeRuntime
+ * Method:    JNIUpdateStatus
+ * Signature: ()[B
+ *
+ * Returns the serialized status update, or NULL when the result array could
+ * not be created or an exception was raised.
+ */
+JNIEXPORT jbyteArray JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeRuntime_JNIUpdateStatus(
+    JNIEnv * jenv, jclass nativeRuntimeClass) {
+  try {
+    std::string statusData;
+    NativeTask::NativeObjectFactory::GetTaskStatusUpdate(statusData);
+    jsize len = (jsize)statusData.length();
+    jbyteArray ret = jenv->NewByteArray(len);
+    if (NULL == ret) {
+      // NewByteArray returns NULL when the array cannot be constructed, and
+      // a NULL array must not be passed on to SetByteArrayRegion.
+      return NULL;
+    }
+    jenv->SetByteArrayRegion(ret, 0, len, (const jbyte *)statusData.data());
+    return ret;
+  } catch (...) {
+    ThrowCurrentExceptionToJava(jenv);
+  }
+  return NULL;
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeTask.cc URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeTask.cc?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeTask.cc (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/NativeTask.cc Thu Jul 17 17:44:55 2014 @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+#ifndef __CYGWIN__
+#include <execinfo.h>
+#endif
+#include <stdlib.h>
+#include "commons.h"
+#include "util/Hash.h"
+#include "util/StringUtil.h"
+#include "NativeTask.h"
+#include "NativeObjectFactory.h"
+
+// NOTE(review): this archived copy of the patch lost every "<...>" token. The
+// system header above is restored from the backtrace()/backtrace_symbols()
+// calls below, and container template arguments are restored from how the
+// values are used - confirm both against the repository.
+
+namespace NativeTask {
+
+//////////////////////////////////////////////////////////////////
+// NativeObjectType methods
+//////////////////////////////////////////////////////////////////
+
+/** Returns the name of type, or "UnknownObjectType" for an unlisted value. */
+const string NativeObjectTypeToString(NativeObjectType type) {
+  switch (type) {
+  case BatchHandlerType:
+    return string("BatchHandlerType");
+  case MapperType:
+    return string("MapperType");
+  case ReducerType:
+    return string("ReducerType");
+  case PartitionerType:
+    return string("PartitionerType");
+  case CombinerType:
+    return string("CombinerType");
+  case FolderType:
+    return string("FolderType");
+  case RecordReaderType:
+    return string("RecordReaderType");
+  case RecordWriterType:
+    return string("RecordWriterType");
+  default:
+    return string("UnknownObjectType");
+  }
+}
+
+/**
+ * Inverse of NativeObjectTypeToString(); returns UnknownObjectType for an
+ * unrecognized name.
+ */
+NativeObjectType NativeObjectTypeFromString(const string type) {
+  if (type == "BatchHandlerType") {
+    return BatchHandlerType;
+  } else if (type == "MapperType") {
+    return MapperType;
+  } else if (type == "ReducerType") {
+    return ReducerType;
+  } else if (type == "PartitionerType") {
+    return PartitionerType;
+  } else if (type == "CombinerType") {
+    return CombinerType;
+  } else if (type == "FolderType") {
+    // This branch used to return CombinerType, so the name did not round-trip.
+    return FolderType;
+  } else if (type == "RecordReaderType") {
+    return RecordReaderType;
+  } else if (type == "RecordWriterType") {
+    return RecordWriterType;
+  }
+  return UnknownObjectType;
+}
+
+/**
+ * Builds the exception text from what. When what starts with '/' and contains
+ * a ':', everything before the last '/' that precedes the first ':' is
+ * dropped. Except on Cygwin, a backtrace of up to 64 frames is appended.
+ */
+HadoopException::HadoopException(const string & what) {
+  // remove long path prefix
+  size_t n = 0;
+  if (what[0] == '/') {
+    size_t p = what.find(':');
+    if (p != what.npos) {
+      while (true) {
+        size_t np = what.find('/', n + 1);
+        if (np == what.npos || np >= p) {
+          break;
+        }
+        n = np;
+      }
+    }
+  }
+  _reason.append(what.c_str() + n, what.length() - n);
+
+#ifndef __CYGWIN__
+  void * array[64];
+  int size = backtrace(array, 64);
+  char ** traces = backtrace_symbols(array, size);
+  if (NULL != traces) {
+    for (int i = 0; i < size; i++) {
+      _reason.append("\n\t");
+      _reason.append(traces[i]);
+    }
+    // backtrace_symbols() returns a malloc()ed array that the caller must
+    // free; it was leaked on every exception before.
+    free(traces);
+  }
+#endif
+}
+
+///////////////////////////////////////////////////////////
+
+/**
+ * Loads "key=value" lines from the file at path. Lines starting with '#' and
+ * lines without '=' are ignored; values are trimmed.
+ * @throws IOException when the file cannot be opened for reading
+ */
+void Config::load(const string & path) {
+  FILE * fin = fopen(path.c_str(), "r");
+  if (NULL == fin) {
+    THROW_EXCEPTION(IOException, "file not found or can not open for read");
+  }
+  char buff[256];
+  while (fgets(buff, sizeof(buff), fin) != NULL) {
+    if (buff[0] == '#') {
+      continue;
+    }
+    std::string key = buff;
+    // A chunk without a trailing newline is either an over-long line (still
+    // skipped) or the last line of a file with no final newline, which used
+    // to be dropped silently and is now kept.
+    bool complete = key[key.length() - 1] == '\n' || feof(fin) != 0;
+    if (!complete) {
+      continue;
+    }
+    size_t br = key.find('=');
+    if (br != key.npos) {
+      set(key.substr(0, br), StringUtil::Trim(key.substr(br + 1)));
+    }
+  }
+  fclose(fin);
+}
+
+/** Sets key to value, replacing any existing entry. */
+void Config::set(const string & key, const string & value) {
+  _configs[key] = value;
+}
+
+/** Stores value under name as text (via StringUtil::ToString). */
+void Config::setInt(const string & name, int64_t value) {
+  _configs[name] = StringUtil::ToString(value);
+}
+
+/** Stores value under name as text (via StringUtil::ToString). */
+void Config::setBool(const string & name, bool value) {
+  _configs[name] = StringUtil::ToString(value);
+}
+
+/**
+ * Parses "key=value" arguments. Arguments without '=' or starting with '-'
+ * are logged and skipped. A key that already has a value gets the new value
+ * appended after a comma.
+ */
+void Config::parse(int32_t argc, const char ** argv) {
+  for (int32_t i = 0; i < argc; i++) {
+    const char * equ = strchr(argv[i], '=');
+    if (NULL == equ) {
+      LOG("[NativeTask] config argument not recognized: %s", argv[i]);
+      continue;
+    }
+    if (argv[i][0] == '-') {
+      LOG("[NativeTask] config argument with '-' prefix ignored: %s", argv[i]);
+      continue;
+    }
+    string key(argv[i], equ - argv[i]);
+    string value(equ + 1, strlen(equ + 1));
+    map<string, string>::iterator itr = _configs.find(key);
+    if (itr == _configs.end()) {
+      _configs[key] = value;
+    } else {
+      itr->second.append(",");
+      itr->second.append(value);
+    }
+  }
+}
+
+/**
+ * Returns the stored value for name, or NULL when it is not set. The pointer
+ * refers to the string held inside the configuration map.
+ */
+const char * Config::get(const string & name) {
+  map<string, string>::iterator itr = _configs.find(name);
+  if (itr == _configs.end()) {
+    return NULL;
+  } else {
+    return itr->second.c_str();
+  }
+}
+
+/** Returns a copy of the value for name, or defaultValue when it is not set. */
+string Config::get(const string & name, const string & defaultValue) {
+  map<string, string>::iterator itr = _configs.find(name);
+  if (itr == _configs.end()) {
+    return defaultValue;
+  } else {
+    return itr->second;
+  }
+}
+
+/** Returns the value for name converted by StringUtil::toInt, or defaultValue. */
+int64_t Config::getInt(const string & name, int64_t defaultValue) {
+  map<string, string>::iterator itr = _configs.find(name);
+  if (itr == _configs.end()) {
+    return defaultValue;
+  } else {
+    return StringUtil::toInt(itr->second);
+  }
+}
+
+/** Returns the value for name converted by StringUtil::toBool, or defaultValue. */
+bool Config::getBool(const string & name, bool defaultValue) {
+  map<string, string>::iterator itr = _configs.find(name);
+  if (itr == _configs.end()) {
+    return defaultValue;
+  } else {
+    return StringUtil::toBool(itr->second);
+  }
+}
+
+/** Returns the value for name converted by StringUtil::toFloat, or defaultValue. */
+float Config::getFloat(const string & name, float defaultValue) {
+  map<string, string>::iterator itr = _configs.find(name);
+  if (itr == _configs.end()) {
+    return defaultValue;
+  } else {
+    return StringUtil::toFloat(itr->second);
+  }
+}
+
+/** Splits the value for name on ',' into dest; dest is untouched when unset. */
+void Config::getStrings(const string & name, vector<string> & dest) {
+  map<string, string>::iterator itr = _configs.find(name);
+  if (itr != _configs.end()) {
+    StringUtil::Split(itr->second, ",", dest, true);
+  }
+}
+
+/**
+ * Appends every comma-separated item of the value for name to dest, converted
+ * by StringUtil::toInt.
+ * NOTE(review): element type restored as int64_t, the type getInt() returns -
+ * confirm against the declaration in NativeTask.h.
+ */
+void Config::getInts(const string & name, vector<int64_t> & dest) {
+  vector<string> sdest;
+  getStrings(name, sdest);
+  for (size_t i = 0; i < sdest.size(); i++) {
+    dest.push_back(StringUtil::toInt(sdest[i]));
+  }
+}
+
+/**
+ * Appends every comma-separated item of the value for name to dest, converted
+ * by StringUtil::toFloat.
+ * NOTE(review): element type restored as float, the type getFloat() returns -
+ * confirm against the declaration in NativeTask.h.
+ */
+void Config::getFloats(const string & name, vector<float> & dest) {
+  vector<string> sdest;
+  getStrings(name, sdest);
+  for (size_t i = 0; i < sdest.size(); i++) {
+    dest.push_back(StringUtil::toFloat(sdest[i]));
+  }
+}
+
+///////////////////////////////////////////////////////////
+
+/** Base implementation: provides no counters and always returns NULL. */
+Counter * ProcessorBase::getCounter(const string & group, const string & name) {
+  return NULL;
+}
+
+/**
+ * Default partitioner: hash of the key bytes, masked to a non-negative value,
+ * modulo numPartition. With exactly one partition the hash is skipped.
+ */
+uint32_t Partitioner::getPartition(const char * key, uint32_t & keyLen, uint32_t numPartition) {
+  if (numPartition == 1) {
+    return 0;
+  }
+  return (Hash::BytesHash(key, keyLen) & 0x7fffffff) % numPartition;
+}
+
+///////////////////////////////////////////////////////////
+
+}
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.cc URL: 
http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.cc?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.cc (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.cc Thu Jul 17 17:44:55 2014 @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include "commons.h"
+#include "util/Timer.h"
+#include "util/StringUtil.h"
+#include "NativeObjectFactory.h"
+#include "PartitionBucket.h"
+#include "Merge.h"
+#include "NativeTask.h"
+#include "WritableUtils.h"
+#include "util/DualPivotQuickSort.h"
+#include "Combiner.h"
+#include "TaskCounters.h"
+#include "MinHeap.h"
+#include "PartitionBucketIterator.h"
+
+namespace NativeTask {
+
+/**
+ * Creates an iterator over this bucket's records.
+ * @return a new iterator that the caller must delete, or NULL when the bucket
+ *         holds no memory blocks
+ */
+KVIterator * PartitionBucket::getIterator() {
+  if (_memBlocks.size() == 0) {
+    return NULL;
+  }
+  return new PartitionBucketIterator(this, _keyComparator);
+}
+
+/**
+ * Writes every record of this bucket to writer, through the combine runner
+ * when one is set and directly otherwise. Does nothing when writer is NULL or
+ * the bucket is empty.
+ */
+void PartitionBucket::spill(IFileWriter * writer) throw (IOException, UnsupportException) {
+  // Check the writer before creating the iterator: the old combined check
+  // returned with a freshly allocated iterator still live when only the
+  // writer was NULL.
+  if (NULL == writer) {
+    return;
+  }
+  KVIterator * iterator = getIterator();
+  if (NULL == iterator) {
+    return;
+  }
+
+  try {
+    if (_combineRunner == NULL) {
+      Buffer key;
+      Buffer value;
+
+      while (iterator->next(key, value)) {
+        writer->write(key.data(), key.length(), value.data(), value.length());
+      }
+    } else {
+      _combineRunner->combine(CombineContext(UNKNOWN), iterator, writer);
+    }
+  } catch (...) {
+    // Do not leak the iterator when writing or combining fails.
+    delete iterator;
+    throw;
+  }
+  delete iterator;
+}
+
+/**
+ * Sorts each memory block with the bucket's key comparator, unless the bucket
+ * is already marked sorted. An empty bucket is left untouched (and unmarked).
+ */
+void PartitionBucket::sort(SortAlgorithm type) {
+  if (_memBlocks.size() == 0) {
+    return;
+  }
+  if ((!_sorted)) {
+    for (uint32_t i = 0; i < _memBlocks.size(); i++) {
+      MemoryBlock * block = _memBlocks[i];
+      block->sort(type, _keyComparator);
+    }
+  }
+  _sorted = true;
+}
+
+}
+;
+// namespace NativeTask
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h (added) +++ 
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucket.h Thu Jul 17 17:44:55 2014 @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#ifndef PARTITION_BUCKET_H_
+#define PARTITION_BUCKET_H_
+
+#include "NativeTask.h"
+#include "MemoryPool.h"
+#include "MemoryBlock.h"
+#include "Timer.h"
+#include "Buffers.h"
+#include "MapOutputSpec.h"
+#include "IFile.h"
+#include "SpillInfo.h"
+#include "Combiner.h"
+
+namespace NativeTask {
+
+/**
+ * Buffer for a single partition.
+ *
+ * Records are kept in a list of MemoryBlock objects that the bucket creates
+ * on demand from the MemoryPool and deletes in reset() / the destructor.
+ */
+class PartitionBucket {
+  friend class PartitionBucketIterator;
+  friend class TestPartitionBucket;
+
+private:
+  // NOTE(review): the element type was stripped from this archived copy; it is
+  // restored from the "new MemoryBlock" / push_back pair in allocateKVBuffer().
+  std::vector<MemoryBlock *> _memBlocks;
+  MemoryPool * _pool;
+  uint32_t _partition;
+  uint32_t _blockSize;
+  ComparatorPtr _keyComparator;
+  ICombineRunner * _combineRunner;
+  bool _sorted;
+
+public:
+  /**
+   * @throws IOException when pool or comparator is NULL
+   */
+  PartitionBucket(MemoryPool * pool, uint32_t partition, ComparatorPtr comparator,
+      ICombineRunner * combineRunner, uint32_t blockSize)
+      // Listed in declaration order, which is the order members are
+      // initialized in regardless of how the list is written.
+      : _pool(pool), _partition(partition), _blockSize(blockSize),
+          _keyComparator(comparator), _combineRunner(combineRunner), _sorted(false) {
+    if (NULL == _pool || NULL == comparator) {
+      THROW_EXCEPTION_EX(IOException, "pool is NULL, or comparator is not set");
+    }
+
+    if (NULL != combineRunner) {
+      LOG("[PartitionBucket] combine runner has been set");
+    }
+  }
+
+  ~PartitionBucket() {
+    reset();
+  }
+
+  uint32_t getPartitionId() {
+    return _partition;
+  }
+
+  /** Deletes every memory block and empties the block list. */
+  void reset() {
+    for (uint32_t i = 0; i < _memBlocks.size(); i++) {
+      if (NULL != _memBlocks[i]) {
+        delete _memBlocks[i];
+        _memBlocks[i] = NULL;
+      }
+    }
+    _memBlocks.clear();
+  }
+
+  KVIterator * getIterator();
+
+  /** Returns the number of key/value pairs summed over all memory blocks. */
+  uint32_t getKVCount() const {
+    uint32_t size = 0;
+    for (uint32_t i = 0; i < _memBlocks.size(); i++) {
+      MemoryBlock * block = _memBlocks[i];
+      if (NULL != block) {
+        size += block->getKVCount();
+      }
+    }
+    return size;
+  }
+
+  /**
+   * Reserves kvLength bytes, in the newest memory block when it has room and
+   * otherwise in a new block requested from the pool. Marks the bucket as
+   * unsorted.
+   *
+   * @return the buffer, or NULL when kvLength is 0 or the pool returned no
+   *         memory
+   * @throws OutOfMemoryException if total_length > io.sort.mb
+   */
+  KVBuffer * allocateKVBuffer(uint32_t kvLength) {
+    if (kvLength == 0) {
+      LOG("KV Length is empty, no need to allocate buffer for it");
+      return NULL;
+    }
+    _sorted = false;
+    MemoryBlock * memBlock = NULL;
+    uint32_t memBlockCount = _memBlocks.size();
+    if (memBlockCount > 0) {
+      memBlock = _memBlocks[memBlockCount - 1];
+    }
+    // Test the block pointer itself; the old condition compared the block
+    // count against NULL.
+    if (NULL != memBlock && memBlock->remainSpace() >= kvLength) {
+      return memBlock->allocateKVBuffer(kvLength);
+    } else {
+      uint32_t min = kvLength;
+      uint32_t expect = std::max(_blockSize, min);
+      uint32_t allocated = 0;
+      char * buff = _pool->allocate(min, expect, allocated);
+      if (NULL != buff) {
+        memBlock = new MemoryBlock(buff, allocated);
+        _memBlocks.push_back(memBlock);
+        return memBlock->allocateKVBuffer(kvLength);
+      } else {
+        // Both values are uint32_t, so print them with %u rather than %d.
+        LOG("MemoryPool is full, fail to allocate new MemBlock, block size: %u, kv length: %u", expect, kvLength);
+      }
+    }
+    return NULL;
+  }
+
+  void sort(SortAlgorithm type);
+
+  void spill(IFileWriter * writer) throw (IOException, UnsupportException);
+
+  uint32_t getMemoryBlockCount() const {
+    return _memBlocks.size();
+  }
+
+  /** Returns the block at index; the index is not range-checked. */
+  MemoryBlock * getMemoryBlock(uint32_t index) const {
+    return _memBlocks[index];
+  }
+};
+
+}
+;
+//namespace NativeTask
+
+#endif /* PARTITION_BUCKET_H_ */
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.cc URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.cc?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.cc (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.cc Thu Jul 17 17:44:55 2014 @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license 
agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "commons.h" +#include "util/Timer.h" +#include "util/StringUtil.h" +#include "NativeObjectFactory.h" +#include "PartitionBucketIterator.h" +#include "Merge.h" +#include "NativeTask.h" +#include "WritableUtils.h" +#include "util/DualPivotQuickSort.h" +#include "Combiner.h" +#include "TaskCounters.h" +#include "MinHeap.h" + +namespace NativeTask { + +///////////////////////////////////////////////////////////////// +// PartitionBucket +///////////////////////////////////////////////////////////////// + +PartitionBucketIterator::PartitionBucketIterator(PartitionBucket * pb, ComparatorPtr comparator) + : _pb(pb), _comparator(comparator), _first(true) { + uint32_t blockCount = _pb->getMemoryBlockCount(); + for (uint32_t i = 0; i < blockCount; i++) { + MemoryBlock * block = _pb->getMemoryBlock(i); + MemBlockIteratorPtr blockIterator = new MemBlockIterator(block); + if (blockIterator->next()) { + _heap.push_back(blockIterator); + } + } + if (_heap.size() > 0) { + makeHeap(&(_heap[0]), &(_heap[0]) + _heap.size(), _comparator); + } +} + +PartitionBucketIterator::~PartitionBucketIterator() { + for (uint32_t i = 0; i < _heap.size(); i++) { + MemBlockIteratorPtr ptr = _heap[i]; + if (NULL != ptr) { + delete ptr; + _heap[i] = NULL; + } + } +} + +bool 
PartitionBucketIterator::next() { + size_t cur_heap_size = _heap.size(); + if (cur_heap_size > 0) { + if (!_first) { + if (_heap[0]->next()) { // have more, adjust heap + if (cur_heap_size == 1) { + return true; + } else if (cur_heap_size == 2) { + MemBlockIteratorPtr * base = &(_heap[0]); + + if (_comparator(base[1], base[0])) { + std::swap(base[0], base[1]); + } + } else { + MemBlockIteratorPtr * base = &(_heap[0]); + heapify(base, 1, cur_heap_size, _comparator); + } + } else { // no more, pop heap + MemBlockIteratorPtr * base = &(_heap[0]); + popHeap(base, base + cur_heap_size, _comparator); + _heap.pop_back(); + } + } else { + _first = false; + } + return _heap.size() > 0; + } + return false; +} + +bool PartitionBucketIterator::next(Buffer & key, Buffer & value) { + bool result = next(); + if (result) { + MemBlockIteratorPtr * base = &(_heap[0]); + KVBuffer * kvBuffer = base[0]->getKVBuffer(); + + key.reset(kvBuffer->getKey(), kvBuffer->keyLength); + value.reset(kvBuffer->getValue(), kvBuffer->valueLength); + + return true; + } + return false; +} +} +; +// namespace NativeTask + Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.h URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.h?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.h (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/PartitionBucketIterator.h Thu Jul 17 17:44:55 2014 @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software 
Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef PARTITION_BUCKET_ITERATOR_H_ +#define PARTITION_BUCKET_ITERATOR_H_ + +#include "NativeTask.h" +#include "MemoryPool.h" +#include "Timer.h" +#include "Buffers.h" +#include "MapOutputSpec.h" +#include "IFile.h" +#include "SpillInfo.h" +#include "Combiner.h" +#include "PartitionBucket.h" + +namespace NativeTask { + +class PartitionBucketIterator : public KVIterator { +protected: + PartitionBucket * _pb; + std::vector _heap; + MemBlockComparator _comparator; + bool _first; + +public: + PartitionBucketIterator(PartitionBucket * pb, ComparatorPtr comparator); + virtual ~PartitionBucketIterator(); + virtual bool next(Buffer & key, Buffer & value); + +private: + bool next(); +}; + +} +; +//namespace NativeTask + +#endif /* PARTITION_BUCKET_ITERATOR_H_ */ Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.cc URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.cc?rev=1611413&view=auto ============================================================================== --- 
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.cc (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.cc Thu Jul 17 17:44:55 2014 @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include "Path.h" + +namespace NativeTask { + +bool Path::IsAbsolute(const string & path) { + if (path.length() > 0 && path[0] == '/') { + return true; + } + return false; +} + +string Path::GetParent(const string & path) { + size_t lastSlash = path.rfind('/'); + if (lastSlash == path.npos) { + return "."; + } + if (lastSlash == 0 && path.length() == 1) { + return ""; + } + if (lastSlash == 0) { + return path; + } + return path.substr(0, lastSlash); +} + +string Path::GetName(const string & path) { + size_t lastSlash = path.rfind('/'); + if (lastSlash == path.npos) { + return path; + } + return path.substr(lastSlash + 1); +} + +} // namespace NativeTask + Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.h URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.h?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.h (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Path.h Thu Jul 17 17:44:55 2014 @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef PATH_H_ +#define PATH_H_ + +#include +#include + +namespace NativeTask { + +using std::string; + +class Path { +public: + static bool IsAbsolute(const string & path); + static string GetParent(const string & path); + static string GetName(const string & path); +}; + +} // namespace NativeTask + +#endif /* PATH_H_ */ Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.cc URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.cc?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.cc (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.cc Thu Jul 17 17:44:55 2014 @@ -0,0 +1,73 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "commons.h" +#include "Streams.h" +#include "FileSystem.h" +#include "Buffers.h" +#include "SpillInfo.h" + +namespace NativeTask { + +void SingleSpillInfo::deleteSpillFile() { + if (path.length() > 0) { + struct stat st; + if (0 == stat(path.c_str(), &st)) { + remove(path.c_str()); + } + } +} + +void SingleSpillInfo::writeSpillInfo(const std::string & filepath) { + OutputStream * fout = FileSystem::getLocal().create(filepath, true); + { + ChecksumOutputStream dest = ChecksumOutputStream(fout, CHECKSUM_CRC32); + AppendBuffer appendBuffer; + appendBuffer.init(32 * 1024, &dest, ""); + uint64_t base = 0; + + for (size_t j = 0; j < this->length; j++) { + IFileSegment * segment = &(this->segments[j]); + const bool firstSegment = (j == 0); + if (firstSegment) { + appendBuffer.write_uint64_be(base); + appendBuffer.write_uint64_be(segment->uncompressedEndOffset); + appendBuffer.write_uint64_be(segment->realEndOffset); + } else { + appendBuffer.write_uint64_be(base + this->segments[j - 1].realEndOffset); + appendBuffer.write_uint64_be( + segment->uncompressedEndOffset - this->segments[j - 1].uncompressedEndOffset); + appendBuffer.write_uint64_be(segment->realEndOffset - this->segments[j - 1].realEndOffset); + } + } + appendBuffer.flush(); + uint32_t chsum = dest.getChecksum(); +#ifdef SPILLRECORD_CHECKSUM_UINT + chsum = bswap(chsum); + fout->write(&chsum, sizeof(uint32_t)); +#else + uint64_t wtchsum = bswap64((uint64_t)chsum); + fout->write(&wtchsum, sizeof(uint64_t)); +#endif + } + fout->close(); + delete fout; +} + +} + Added: 
hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.h URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.h?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.h (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillInfo.h Thu Jul 17 17:44:55 2014 @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#ifndef PARTITIONINDEX_H_ +#define PARTITIONINDEX_H_ + +#include +#include + +namespace NativeTask { + +using std::string; + +/** + * Store spill file segment information + */ +struct IFileSegment { + // uncompressed stream end position + uint64_t uncompressedEndOffset; + // compressed stream end position + uint64_t realEndOffset; +}; + +class SingleSpillInfo { +public: + uint32_t length; + std::string path; + IFileSegment * segments; + ChecksumType checkSumType; + KeyValueType keyType; + KeyValueType valueType; + std::string codec; + + SingleSpillInfo(IFileSegment * segments, uint32_t len, const string & path, ChecksumType checksum, + KeyValueType ktype, KeyValueType vtype, const string & inputCodec) + : length(len), path(path), segments(segments), checkSumType(checksum), keyType(ktype), + valueType(vtype), codec(inputCodec) { + } + + ~SingleSpillInfo() { + delete[] segments; + } + + void deleteSpillFile(); + + uint64_t getEndPosition() { + return segments ? segments[length - 1].uncompressedEndOffset : 0; + } + + uint64_t getRealEndPosition() { + return segments ? 
segments[length - 1].realEndOffset : 0; + } + + void writeSpillInfo(const std::string & filepath); +}; + +class SpillInfos { +public: + std::vector spills; + SpillInfos() { + } + + ~SpillInfos() { + for (size_t i = 0; i < spills.size(); i++) { + delete spills[i]; + } + spills.clear(); + } + + void deleteAllSpillFiles() { + for (size_t i = 0; i < spills.size(); i++) { + spills[i]->deleteSpillFile(); + } + } + + void add(SingleSpillInfo * sri) { + spills.push_back(sri); + } + + uint32_t getSpillCount() const { + return spills.size(); + } + + SingleSpillInfo* getSingleSpillInfo(int index) { + return spills.at(index); + } +}; + +} // namespace NativeTask + +#endif /* PARTITIONINDEX_H_ */ Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillOutputService.h URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillOutputService.h?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillOutputService.h (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/SpillOutputService.h Thu Jul 17 17:44:55 2014 @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef SPILL_OUTPUT_SERVICE_H_ +#define SPILL_OUTPUT_SERVICE_H_ + +#include +#include + +namespace NativeTask { + +class CombineHandler; + +using std::string; + +class SpillOutputService { +public: + virtual ~SpillOutputService() {} + + virtual string * getSpillPath() = 0; + virtual string * getOutputPath() = 0; + virtual string * getOutputIndexPath() = 0; + + virtual CombineHandler * getJavaCombineHandler() = 0; +}; + +} // namespace NativeTask + +#endif /* SPILL_OUTPUT_SERVICE_H_ */