From mapreduce-commits-return-7154-apmail-hadoop-mapreduce-commits-archive=hadoop.apache.org@hadoop.apache.org Thu Jul 17 17:46:16 2014
Subject: svn commit: r1611413 [9/18] - in /hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client: ./ hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/ hadoop-mapreduce-client-nativetask/ hadoop-mapreduce-client-nativ...
Date: Thu, 17 Jul 2014 17:45:01 -0000
From: todd@apache.org
To: mapreduce-commits@hadoop.apache.org
Reply-To: mapreduce-dev@hadoop.apache.org
Message-Id: <20140717174511.A85632388C3E@eris.apache.org>

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/codec/snappy.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,163 @@
+// Copyright 2005 and onwards Google Inc.
+//
+// Redistribution and use in source and binary forms, with or without
+// modification, are permitted provided that the following conditions are
+// met:
+//
+//     * Redistributions of source code must retain the above copyright
+// notice, this list of conditions and the following disclaimer.
+//     * Redistributions in binary form must reproduce the above
+// copyright notice, this list of conditions and the following disclaimer
+// in the documentation and/or other materials provided with the
+// distribution.
+//     * Neither the name of Google Inc.
nor the names of its
+// contributors may be used to endorse or promote products derived from
+// this software without specific prior written permission.
+//
+// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+//
+// A light-weight compression algorithm. It is designed for speed of
+// compression and decompression, rather than for the utmost in space
+// savings.
+//
+// For getting better compression ratios when you are compressing data
+// with long repeated sequences or compressing data that is similar to
+// other data, while still compressing fast, you might look at first
+// using BMDiff and then compressing the output of BMDiff with
+// Snappy.
+
+#ifndef UTIL_SNAPPY_SNAPPY_H__
+#define UTIL_SNAPPY_SNAPPY_H__
+
+#include <stddef.h>
+#include <string>
+
+#include "snappy-stubs-public.h"
+
+namespace snappy {
+  class Source;
+  class Sink;
+
+  // ------------------------------------------------------------------------
+  // Generic compression/decompression routines.
+  // ------------------------------------------------------------------------
+
+  // Compress the bytes read from "*source" and append to "*sink". Return the
+  // number of bytes written.
+  size_t Compress(Source* source, Sink* sink);
+
+  // Find the uncompressed length of the given stream, as given by the header.
+  // Note that the true length could deviate from this; the stream could e.g.
+  // be truncated.
+  //
+  // Also note that this leaves "*source" in a state that is unsuitable for
+  // further operations, such as RawUncompress(). You will need to rewind
+  // or recreate the source yourself before attempting any further calls.
+  bool GetUncompressedLength(Source* source, uint32* result);
+
+  // ------------------------------------------------------------------------
+  // Higher-level string based routines (should be sufficient for most users)
+  // ------------------------------------------------------------------------
+
+  // Sets "*output" to the compressed version of "input[0,input_length-1]".
+  // Original contents of *output are lost.
+  //
+  // REQUIRES: "input[]" is not an alias of "*output".
+  size_t Compress(const char* input, size_t input_length, string* output);
+
+  // Decompresses "compressed[0,compressed_length-1]" to "*uncompressed".
+  // Original contents of "*uncompressed" are lost.
+  //
+  // REQUIRES: "compressed[]" is not an alias of "*uncompressed".
+  //
+  // returns false if the message is corrupted and could not be decompressed
+  bool Uncompress(const char* compressed, size_t compressed_length,
+                  string* uncompressed);
+
+
+  // ------------------------------------------------------------------------
+  // Lower-level character array based routines. May be useful for
+  // efficiency reasons in certain circumstances.
+  // ------------------------------------------------------------------------
+
+  // REQUIRES: "compressed" must point to an area of memory that is at
+  // least "MaxCompressedLength(input_length)" bytes in length.
+  //
+  // Takes the data stored in "input[0..input_length]" and stores
+  // it in the array pointed to by "compressed".
+  //
+  // "*compressed_length" is set to the length of the compressed output.
+  //
+  // Example:
+  //    char* output = new char[snappy::MaxCompressedLength(input_length)];
+  //    size_t output_length;
+  //    RawCompress(input, input_length, output, &output_length);
+  //    ... Process(output, output_length) ...
+  //    delete [] output;
+  void RawCompress(const char* input,
+                   size_t input_length,
+                   char* compressed,
+                   size_t* compressed_length);
+
+  // Given data in "compressed[0..compressed_length-1]" generated by
+  // calling the Snappy::Compress routine, this routine
+  // stores the uncompressed data to
+  //    uncompressed[0..GetUncompressedLength(compressed)-1]
+  // returns false if the message is corrupted and could not be decompressed
+  bool RawUncompress(const char* compressed, size_t compressed_length,
+                     char* uncompressed);
+
+  // Given data from the byte source 'compressed' generated by calling
+  // the Snappy::Compress routine, this routine stores the uncompressed
+  // data to
+  //    uncompressed[0..GetUncompressedLength(compressed,compressed_length)-1]
+  // returns false if the message is corrupted and could not be decompressed
+  bool RawUncompress(Source* compressed, char* uncompressed);
+
+  // Returns the maximal size of the compressed representation of
+  // input data that is "source_bytes" bytes in length.
+  size_t MaxCompressedLength(size_t source_bytes);
+
+  // REQUIRES: "compressed[]" was produced by RawCompress() or Compress()
+  // Returns true and stores the length of the uncompressed data in
+  // *result normally. Returns false on parsing error.
+  // This operation takes O(1) time.
+  bool GetUncompressedLength(const char* compressed, size_t compressed_length,
+                             size_t* result);
+
+  // Returns true iff the contents of "compressed[]" can be uncompressed
+  // successfully. Does not return the uncompressed data. Takes
+  // time proportional to compressed_length, but is usually at least
+  // a factor of four faster than actual decompression.
+  bool IsValidCompressedBuffer(const char* compressed,
+                               size_t compressed_length);
+
+  // The size of a compression block. Note that many parts of the compression
+  // code assumes that kBlockSize <= 65536; in particular, the hash table
+  // can only store 16-bit offsets, and EmitCopy() also assumes the offset
+  // is 65535 bytes or less. Note also that if you change this, it will
+  // affect the framing format (see framing_format.txt).
+  //
+  // Note that there might be older data around that is compressed with larger
+  // block sizes, so the decompression code should not rely on the
+  // non-existence of long backreferences.
+  static const int kBlockLog = 16;
+  static const size_t kBlockSize = 1 << kBlockLog;
+
+  static const int kMaxHashTableBits = 14;
+  static const size_t kMaxHashTableSize = 1 << kMaxHashTableBits;
+
+}  // end namespace snappy
+
+
+#endif  // UTIL_SNAPPY_SNAPPY_H__
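A round trip through the string-based API above serves as a quick orientation. The sketch below is illustrative only: it assumes this header is on the include path and a snappy library is linked, and uses Compress, MaxCompressedLength, and Uncompress exactly as declared.

    // Round-trip sketch for the string-based snappy API declared above.
    // Assumes libsnappy is linked; error handling is reduced to the
    // boolean result of Uncompress().
    #include <cassert>
    #include <string>
    #include "snappy.h"

    int main() {
      std::string input(10000, 'a');  // highly compressible sample input
      std::string compressed;
      std::string restored;
      snappy::Compress(input.data(), input.size(), &compressed);
      // Compressed output never exceeds MaxCompressedLength(source size).
      assert(compressed.size() <= snappy::MaxCompressedLength(input.size()));
      bool ok = snappy::Uncompress(compressed.data(), compressed.size(), &restored);
      assert(ok && restored == input);
      return 0;
    }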
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/StringUtil.h"
+#include "MCollectorOutputHandler.h"
+#include "NativeObjectFactory.h"
+#include "MapOutputCollector.h"
+#include "CombineHandler.h"
+
+using std::string;
+using std::vector;
+
+namespace NativeTask {
+
+const Command AbstractMapHandler::GET_OUTPUT_PATH(100, "GET_OUTPUT_PATH");
+const Command AbstractMapHandler::GET_OUTPUT_INDEX_PATH(101, "GET_OUTPUT_INDEX_PATH");
+const Command AbstractMapHandler::GET_SPILL_PATH(102, "GET_SPILL_PATH");
+const Command AbstractMapHandler::GET_COMBINE_HANDLER(103, "GET_COMBINE_HANDLER");
+} // namespace NativeTask

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/AbstractMapHandler.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ABSTRACT_MAP_HANDLER_H
+#define ABSTRACT_MAP_HANDLER_H
+
+#include "NativeTask.h"
+#include "BatchHandler.h"
+#include "lib/SpillOutputService.h"
+#include "lib/Combiner.h"
+#include "CombineHandler.h"
+
+namespace NativeTask {
+
+class AbstractMapHandler : public BatchHandler, public SpillOutputService {
+public:
+  static const Command GET_OUTPUT_PATH;
+  static const Command GET_OUTPUT_INDEX_PATH;
+  static const Command GET_SPILL_PATH;
+  static const Command GET_COMBINE_HANDLER;
+
+public:
+  AbstractMapHandler() {}
+
+  virtual ~AbstractMapHandler() {}
+
+  virtual void configure(Config * config) {
+    _config = config;
+  }
+
+  virtual string * getOutputPath() {
+    ResultBuffer * outputPathResult = call(GET_OUTPUT_PATH, NULL);
+    if (NULL == outputPathResult) {
+      return NULL;
+    }
+    string * outputPath = outputPathResult->readString();
+    delete outputPathResult;
+    return outputPath;
+  }
+
+  virtual string * getOutputIndexPath() {
+    ResultBuffer * outputIndexPath = call(GET_OUTPUT_INDEX_PATH, NULL);
+    if (NULL == outputIndexPath) {
+      return NULL;
+    }
+    string * indexpath = outputIndexPath->readString();
+    delete outputIndexPath;
+    return indexpath;
+  }
+
+  virtual string * getSpillPath() {
+    ResultBuffer * spillPathBuffer = call(GET_SPILL_PATH, NULL);
+    if (NULL == spillPathBuffer) {
+      return NULL;
+    }
+    string * spillpath = spillPathBuffer->readString();
+    delete spillPathBuffer;
+    return spillpath;
+  }
+
+  virtual CombineHandler * getJavaCombineHandler() {
+    LOG("[AbstractMapHandler::getJavaCombineHandler] checking whether a java combiner is configured");
+
+    ResultBuffer * getCombineHandlerResult = call(GET_COMBINE_HANDLER, NULL);
+    if (NULL != getCombineHandlerResult) {
+      getCombineHandlerResult->setReadPoint(0);
+      CombineHandler * javaCombiner = (CombineHandler *)((BatchHandler *)(getCombineHandlerResult->readPointer()));
+      delete getCombineHandlerResult;
+      return javaCombiner;
+    }
+
+    return NULL;
+  }
+};
+
+} // namespace NativeTask
+
+#endif /* ABSTRACT_MAP_HANDLER_H */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,354 @@
+/*
+ *
Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef QUICK_BUILD +#include "org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h" +#endif +#include "commons.h" +#include "jni_md.h" +#include "jniutils.h" +#include "BatchHandler.h" +#include "NativeObjectFactory.h" + +/////////////////////////////////////////////////////////////// +// NativeBatchProcessor jni util methods +/////////////////////////////////////////////////////////////// + +static jfieldID InputBufferFieldID = NULL; +static jfieldID OutputBufferFieldID = NULL; +static jmethodID FlushOutputMethodID = NULL; +static jmethodID FinishOutputMethodID = NULL; +static jmethodID SendCommandToJavaMethodID = NULL; + +/////////////////////////////////////////////////////////////// +// BatchHandler methods +/////////////////////////////////////////////////////////////// + +namespace NativeTask { + +ReadWriteBuffer * JNU_ByteArraytoReadWriteBuffer(JNIEnv * jenv, jbyteArray src) { + if (NULL == src) { + return NULL; + } + jsize len = jenv->GetArrayLength(src); + + ReadWriteBuffer * ret = new ReadWriteBuffer(len); + jenv->GetByteArrayRegion(src, 0, len, (jbyte*)ret->getBuff()); + ret->setWritePoint(len); + return ret; +} + +jbyteArray JNU_ReadWriteBufferToByteArray(JNIEnv * jenv, ReadWriteBuffer * result) { + if (NULL == result || result->getWritePoint() == 0) { + return NULL; + } + + jbyteArray ret = jenv->NewByteArray(result->getWritePoint()); + jenv->SetByteArrayRegion(ret, 0, result->getWritePoint(), (jbyte*)result->getBuff()); + return ret; +} + +BatchHandler::BatchHandler() + : _processor(NULL), _config(NULL) { +} + +BatchHandler::~BatchHandler() { + releaseProcessor(); + if (NULL != _config) { + delete _config; + _config = NULL; + } +} + +void BatchHandler::releaseProcessor() { + if (_processor != NULL) { + JNIEnv * env = JNU_GetJNIEnv(); + env->DeleteGlobalRef((jobject)_processor); + _processor = NULL; + } +} + +void BatchHandler::onInputData(uint32_t length) { + _in.rewind(0, length); + handleInput(_in); +} + +void BatchHandler::flushOutput() { + + if (NULL == _out.base()) { + return; + } + + uint32_t length = _out.position(); + _out.position(0); + + if (length == 0) { + return; + } + + JNIEnv * env = JNU_GetJNIEnv(); + env->CallVoidMethod((jobject)_processor, FlushOutputMethodID, (jint)length); + if (env->ExceptionCheck()) { + THROW_EXCEPTION(JavaException, "FlushOutput throw exception"); + } +} + +void BatchHandler::finishOutput() { + if (NULL == _out.base()) { + return; + } + JNIEnv * env = JNU_GetJNIEnv(); + env->CallVoidMethod((jobject)_processor, FinishOutputMethodID); + if (env->ExceptionCheck()) { + THROW_EXCEPTION(JavaException, "FinishOutput throw exception"); + } +} + +void BatchHandler::onSetup(Config * config, char * inputBuffer, uint32_t inputBufferCapacity, + 
char * outputBuffer, uint32_t outputBufferCapacity) { + this->_config = config; + _in.reset(inputBuffer, inputBufferCapacity); + if (NULL != outputBuffer) { + if (outputBufferCapacity <= 1024) { + THROW_EXCEPTION(IOException, "Output buffer size too small for BatchHandler"); + } + _out.reset(outputBuffer, outputBufferCapacity); + _out.rewind(0, outputBufferCapacity); + + LOG("[BatchHandler::onSetup] input Capacity %d, output capacity %d", inputBufferCapacity, _out.limit()); + } + configure(_config); +} + +ResultBuffer * BatchHandler::call(const Command& cmd, ParameterBuffer * param) { + JNIEnv * env = JNU_GetJNIEnv(); + jbyteArray jcmdData = JNU_ReadWriteBufferToByteArray(env, param); + jbyteArray ret = (jbyteArray)env->CallObjectMethod((jobject)_processor, SendCommandToJavaMethodID, + cmd.id(), jcmdData); + + + if (env->ExceptionCheck()) { + THROW_EXCEPTION(JavaException, "SendCommandToJava throw exception"); + } + return JNU_ByteArraytoReadWriteBuffer(env, ret); +} + +} // namespace NativeTask + +/////////////////////////////////////////////////////////////// +// NativeBatchProcessor jni methods +/////////////////////////////////////////////////////////////// +using namespace NativeTask; + +/* + * Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor + * Method: setupHandler + * Signature: (J)V + */ +void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_setupHandler( + JNIEnv * jenv, jobject processor, jlong handler, jobjectArray configs) { + try { + + NativeTask::Config * config = new NativeTask::Config(); + jsize len = jenv->GetArrayLength(configs); + for (jsize i = 0; i + 1 < len; i += 2) { + jbyteArray key_obj = (jbyteArray)jenv->GetObjectArrayElement(configs, i); + jbyteArray val_obj = (jbyteArray)jenv->GetObjectArrayElement(configs, i + 1); + config->set(JNU_ByteArrayToString(jenv, key_obj), JNU_ByteArrayToString(jenv, val_obj)); + } + + NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler); + if (NULL == batchHandler) { + JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException", "BatchHandler is null"); + return; + } + jobject jinputBuffer = jenv->GetObjectField(processor, InputBufferFieldID); + char * inputBufferAddr = NULL; + uint32_t inputBufferCapacity = 0; + if (NULL != jinputBuffer) { + inputBufferAddr = (char*)(jenv->GetDirectBufferAddress(jinputBuffer)); + inputBufferCapacity = jenv->GetDirectBufferCapacity(jinputBuffer); + } + jobject joutputBuffer = jenv->GetObjectField(processor, OutputBufferFieldID); + char * outputBufferAddr = NULL; + uint32_t outputBufferCapacity = 0; + if (NULL != joutputBuffer) { + outputBufferAddr = (char*)(jenv->GetDirectBufferAddress(joutputBuffer)); + outputBufferCapacity = jenv->GetDirectBufferCapacity(joutputBuffer); + } + batchHandler->setProcessor(jenv->NewGlobalRef(processor)); + batchHandler->onSetup(config, inputBufferAddr, inputBufferCapacity, outputBufferAddr, + outputBufferCapacity); + } catch (NativeTask::UnsupportException & e) { + JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what()); + } catch (NativeTask::OutOfMemoryException & e) { + JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what()); + } catch (NativeTask::IOException & e) { + JNU_ThrowByName(jenv, "java/io/IOException", e.what()); + } catch (NativeTask::JavaException & e) { + LOG("JavaException: %s", e.what()); + // Do nothing, let java side handle + } catch (std::exception & e) { + JNU_ThrowByName(jenv, "java/io/IOException", e.what()); + } catch (...) 
{ + JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception"); + } +} + +/* + * Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor + * Method: nativeProcessInput + * Signature: (JI)V + */ +void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeProcessInput( + JNIEnv * jenv, jobject processor, jlong handler, jint length) { + + try { + NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler); + if (NULL == batchHandler) { + JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException", + "handler not instance of BatchHandler"); + return; + } + batchHandler->onInputData(length); + } catch (NativeTask::UnsupportException & e) { + JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what()); + } catch (NativeTask::OutOfMemoryException & e) { + JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what()); + } catch (NativeTask::IOException & e) { + JNU_ThrowByName(jenv, "java/io/IOException", e.what()); + } catch (NativeTask::JavaException & e) { + LOG("JavaException: %s", e.what()); + // Do nothing, let java side handle + } catch (std::exception & e) { + JNU_ThrowByName(jenv, "java/io/IOException", e.what()); + } catch (...) { + JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception"); + } +} + +/* + * Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor + * Method: nativeFinish + * Signature: (J)V + */ +void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeFinish( + JNIEnv * jenv, jobject processor, jlong handler) { + try { + NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler); + if (NULL == batchHandler) { + JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException", + "handler not instance of BatchHandler"); + return; + } + batchHandler->onFinish(); + } catch (NativeTask::UnsupportException & e) { + JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what()); + } catch (NativeTask::OutOfMemoryException & e) { + JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what()); + } catch (NativeTask::IOException & e) { + JNU_ThrowByName(jenv, "java/io/IOException", e.what()); + } catch (NativeTask::JavaException & e) { + LOG("JavaException: %s", e.what()); + // Do nothing, let java side handle + } catch (std::exception & e) { + JNU_ThrowByName(jenv, "java/io/IOException", e.what()); + } catch (...) { + JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception"); + } +} + +void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeLoadData( + JNIEnv * jenv, jobject processor, jlong handler) { + try { + NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler); + if (NULL == batchHandler) { + JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException", + "handler not instance of BatchHandler"); + return; + } + batchHandler->onLoadData(); + } catch (NativeTask::UnsupportException & e) { + JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what()); + } catch (NativeTask::OutOfMemoryException & e) { + JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what()); + } catch (NativeTask::IOException & e) { + JNU_ThrowByName(jenv, "java/io/IOException", e.what()); + } catch (NativeTask::JavaException & e) { + LOG("JavaException: %s", e.what()); + // Do nothing, let java side handle + } catch (std::exception & e) { + JNU_ThrowByName(jenv, "java/io/IOException", e.what()); + } catch (...) 
{
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    nativeCommand
+ * Signature: (J[B)[B
+ */
+jbyteArray JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeCommand(
+    JNIEnv * jenv, jobject processor, jlong handler, jint command, jbyteArray cmdData) {
+  try {
+    NativeTask::BatchHandler * batchHandler = (NativeTask::BatchHandler *)((void*)handler);
+    if (NULL == batchHandler) {
+      JNU_ThrowByName(jenv, "java/lang/IllegalArgumentException",
+          "handler not instance of BatchHandler");
+      return NULL;
+    }
+    Command cmd(command);
+    ParameterBuffer * param = JNU_ByteArraytoReadWriteBuffer(jenv, cmdData);
+    ResultBuffer * result = batchHandler->onCall(cmd, param);
+    jbyteArray ret = JNU_ReadWriteBufferToByteArray(jenv, result);
+    delete result;
+    delete param;
+    return ret;
+  } catch (NativeTask::UnsupportException & e) {
+    JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
+  } catch (NativeTask::OutOfMemoryException & e) {
+    JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
+  } catch (NativeTask::IOException & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (const NativeTask::JavaException & e) {
+    LOG("JavaException: %s", e.what());
+    // Do nothing, let java side handle
+  } catch (std::exception & e) {
+    JNU_ThrowByName(jenv, "java/io/IOException", e.what());
+  } catch (...) {
+    JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
+  }
+  return NULL;
+}
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    InitIDs
+ * Signature: ()V
+ */
+void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_InitIDs(JNIEnv * jenv,
+    jclass processorClass) {
+  // Note the crossed naming: the native input buffer is the Java side's
+  // rawOutputBuffer (Java writes, native reads), and the native output
+  // buffer is the Java side's rawInputBuffer.
+  InputBufferFieldID = jenv->GetFieldID(processorClass, "rawOutputBuffer", "Ljava/nio/ByteBuffer;");
+  OutputBufferFieldID = jenv->GetFieldID(processorClass, "rawInputBuffer", "Ljava/nio/ByteBuffer;");
+  FlushOutputMethodID = jenv->GetMethodID(processorClass, "flushOutput", "(I)V");
+  FinishOutputMethodID = jenv->GetMethodID(processorClass, "finishOutput", "()V");
+  SendCommandToJavaMethodID = jenv->GetMethodID(processorClass, "sendCommandToJava", "(I[B)[B");
+}
+
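The six catch clauses above repeat verbatim in every JNI entry point in this file. A possible factoring, sketched here only to name the pattern, would be a hypothetical translateToJava() helper (not part of this patch; it assumes the same exception types, LOG macro, and JNU_ThrowByName already in scope from the includes above). Each void entry point would then pass its body as a functor, keeping the exception-to-Java mapping in one place; the value-returning entry points would need a variant that supplies a default return.

    // Sketch only: centralizing the repeated JNI exception translation.
    // "translateToJava" is a hypothetical name, not part of this patch.
    template <typename Fn>
    static void translateToJava(JNIEnv * jenv, Fn body) {
      try {
        body();
      } catch (NativeTask::UnsupportException & e) {
        JNU_ThrowByName(jenv, "java/lang/UnsupportedOperationException", e.what());
      } catch (NativeTask::OutOfMemoryException & e) {
        JNU_ThrowByName(jenv, "java/lang/OutOfMemoryError", e.what());
      } catch (NativeTask::IOException & e) {
        JNU_ThrowByName(jenv, "java/io/IOException", e.what());
      } catch (NativeTask::JavaException & e) {
        LOG("JavaException: %s", e.what());  // Java side already has a pending exception
      } catch (std::exception & e) {
        JNU_ThrowByName(jenv, "java/io/IOException", e.what());
      } catch (...) {
        JNU_ThrowByName(jenv, "java/io/IOException", "Unknown exception");
      }
    }

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/BatchHandler.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.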
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef BATCHHANDLER_H_ +#define BATCHHANDLER_H_ + +#include "NativeTask.h" +#include "lib/Buffers.h" + +namespace NativeTask { + +/** + * Native side counterpart of java side NativeBatchProcessor + */ +class BatchHandler : public Configurable { +protected: + ByteBuffer _in; + ByteBuffer _out; + void * _processor; + Config * _config; +public: + BatchHandler(); + virtual ~BatchHandler(); + + virtual NativeObjectType type() { + return BatchHandlerType; + } + + /** + * Called by native jni functions to set global jni reference + */ + void setProcessor(void * processor) { + _processor = processor; + } + + void releaseProcessor(); + + /** + * Called by java side to setup native side BatchHandler + * initialize buffers by default + */ + void onSetup(Config * config, char * inputBuffer, uint32_t inputBufferCapacity, + char * outputBuffer, uint32_t outputBufferCapacity); + + /** + * Called by java side to notice that input data available to handle + * @param length input buffer's available data length + */ + void onInputData(uint32_t length); + + virtual void onLoadData() { + } + + /** + * Called by java side to notice that input has finished + */ + void onFinish() { + finish(); + } + + /** + * Called by java side to send command to this handler + * BatchHandler ignore all command by default + * @param cmd command data + * @return command return value + */ + virtual ResultBuffer * onCall(const Command& command, ReadWriteBuffer * param) { + return NULL; + } + +protected: + virtual ResultBuffer * call(const Command& cmd, ParameterBuffer * param); + + /** + * Used by subclass, call java side flushOutput(int length) + * @param length output buffer's available data length + */ + virtual void flushOutput(); + + /** + * Used by subclass, call java side finishOutput() + */ + void finishOutput(); + + /** + * Write output buffer and use flushOutput manually, + * or use this helper method + */ + inline void output(const char * buff, uint32_t length) { + while (length > 0) { + if (length > _out.remain()) { + flushOutput(); + } + uint32_t remain = _out.remain(); + uint32_t cp = length < remain ? 
length : remain;
+      simple_memcpy(_out.current(), buff, cp);
+      buff += cp;
+      length -= cp;
+      _out.advance(cp);
+    }
+  }
+
+  inline void outputInt(uint32_t v) {
+    if (4 > _out.remain()) {
+      flushOutput();
+    }
+    *(uint32_t*)(_out.current()) = v;
+    _out.advance(4);
+  }
+
+  /////////////////////////////////////////////////////////////
+  // Subclasses should implement these if needed
+  /////////////////////////////////////////////////////////////
+
+  /**
+   * Called by onSetup(); does nothing by default.
+   * Subclasses should override this if needed.
+   */
+  virtual void configure(Config * config) {
+  }
+
+  /**
+   * Called by onFinish(); flushes & closes output by default.
+   * Subclasses should override this if needed.
+   */
+  virtual void finish() {
+    flushOutput();
+    finishOutput();
+  }
+
+  /**
+   * Called by onInputData(); the internal input data processor.
+   * Subclasses should override this if needed.
+   */
+  virtual void handleInput(ByteBuffer & byteBuffer) {
+  }
+};
+
+} // namespace NativeTask
+
+#endif /* BATCHHANDLER_H_ */
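To make the lifecycle above concrete, here is a sketch of a hypothetical minimal subclass. "EchoHandler" is illustrative only and not part of this patch: configure() receives the job Config from onSetup(), handleInput() consumes each batch the Java side pushes across JNI, and the inherited output() helper flushes to Java automatically as the output buffer fills.

    // Illustrative only: a hypothetical minimal BatchHandler subclass that
    // copies every input batch back to the Java side.
    #include "BatchHandler.h"

    namespace NativeTask {

    class EchoHandler : public BatchHandler {
    protected:
      // Receives the job configuration from onSetup().
      virtual void configure(Config * config) {
      }

      // Called via onInputData() for every batch pushed across JNI.
      virtual void handleInput(ByteBuffer & in) {
        output(in.current(), in.remain());
      }
    };

    } // namespace NativeTask

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.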
+ */ +#include "CombineHandler.h" + +namespace NativeTask { +const char * REFILL = "refill"; +const int LENGTH_OF_REFILL_STRING = 6; + +const Command CombineHandler::COMBINE(4, "Combine"); + +CombineHandler::CombineHandler() + : _combineContext(NULL), _kvIterator(NULL), _writer(NULL), _config(NULL), _kvCached(false), + _kType(UnknownType), _vType(UnknownType), _combineInputRecordCount(0), _combineInputBytes(0), + _combineOutputRecordCount(0),_combineOutputBytes(0){ +} + +CombineHandler::~CombineHandler() { +} + +void CombineHandler::configure(Config * config) { + + _config = config; + MapOutputSpec::getSpecFromConfig(_config, _mapOutputSpec); + _kType = _mapOutputSpec.keyType; + _vType = _mapOutputSpec.valueType; +} + +uint32_t CombineHandler::feedDataToJavaInWritableSerialization() { + + uint32_t written = 0; + bool firstKV = true; + _out.position(0); + + if (_kvCached) { + uint32_t kvLength = _key.outerLength + _value.outerLength + KVBuffer::headerLength(); + outputInt(bswap(_key.outerLength)); + outputInt(bswap(_value.outerLength)); + outputKeyOrValue(_key, _kType); + outputKeyOrValue(_value, _vType); + + written += kvLength; + _kvCached = false; + firstKV = false; + } + + uint32_t recordCount = 0; + while (nextKeyValue(_key, _value)) { + + //::sleep(5); + _kvCached = false; + recordCount++; + + uint32_t kvLength = _key.outerLength + _value.outerLength + KVBuffer::headerLength(); + + if (!firstKV && kvLength > _out.remain()) { + _kvCached = true; + break; + } else { + firstKV = false; + //write final key length and final value length + outputInt(bswap(_key.outerLength)); + outputInt(bswap(_value.outerLength)); + outputKeyOrValue(_key, _kType); + outputKeyOrValue(_value, _vType); + + written += kvLength; + } + } + + if (_out.position() > 0) { + flushOutput(); + } + + _combineInputRecordCount += recordCount; + _combineInputBytes += written; + return written; +} + +/** + * KV: key or value + */ +void CombineHandler::outputKeyOrValue(SerializeInfo & KV, KeyValueType type) { + uint32_t length = 0; + switch (type) { + case TextType: + output(KV.varBytes, KV.outerLength - KV.buffer.length()); + output(KV.buffer.data(), KV.buffer.length()); + break; + case BytesType: + outputInt(bswap(KV.buffer.length())); + output(KV.buffer.data(), KV.buffer.length()); + break; + default: + output(KV.buffer.data(), KV.buffer.length()); + break; + } +} + +bool CombineHandler::nextKeyValue(SerializeInfo & key, SerializeInfo & value) { + + if (!_kvIterator->next(key.buffer, value.buffer)) { + return false; + } + + uint32_t varLength = 0; + switch (_kType) { + case TextType: + WritableUtils::WriteVInt(key.buffer.length(), key.varBytes, varLength); + key.outerLength = key.buffer.length() + varLength; + break; + case BytesType: + key.outerLength = key.buffer.length() + 4; + break; + default: + key.outerLength = key.buffer.length(); + break; + } + + //prepare final value length + uint32_t varValueLength = 0; + switch (_vType) { + case TextType: + WritableUtils::WriteVInt(value.buffer.length(), value.varBytes, varValueLength); + value.outerLength = value.buffer.length() + varValueLength; + break; + case BytesType: + value.outerLength = value.buffer.length() + 4; + break; + default: + value.outerLength = value.buffer.length(); + break; + } + + return true; +} + +uint32_t CombineHandler::feedDataToJava(SerializationFramework serializationType) { + if (serializationType == WRITABLE_SERIALIZATION) { + return feedDataToJavaInWritableSerialization(); + } + THROW_EXCEPTION(IOException, "Native Serialization not supported"); 
+}
+
+void CombineHandler::handleInput(ByteBuffer & in) {
+  char * buff = in.current();
+  uint32_t length = in.remain();
+  uint32_t remain = length;
+  char * pos = buff;
+  if (_asideBuffer.remain() > 0) {
+    uint32_t filledLength = _asideBuffer.fill(pos, length);
+    pos += filledLength;
+    remain -= filledLength;
+  }
+
+  if (_asideBuffer.size() > 0 && _asideBuffer.remain() == 0) {
+    _asideBuffer.position(0);
+    write(_asideBuffer.current(), _asideBuffer.size());
+    _asideBuffer.wrap(NULL, 0);
+  }
+
+  if (remain == 0) {
+    return;
+  }
+  KVBuffer * kvBuffer = (KVBuffer *)pos;
+
+  if (unlikely(remain < kvBuffer->headerLength())) {
+    THROW_EXCEPTION(IOException, "k/v meta information incomplete");
+  }
+
+  int kvLength = kvBuffer->lengthConvertEndium();
+
+  if (kvLength > remain) {
+    // cache the partial record aside until the rest of it arrives
+    _asideBytes.resize(kvLength);
+    _asideBuffer.wrap(_asideBytes.buff(), _asideBytes.size());
+    _asideBuffer.fill(pos, remain);
+    pos += remain;
+    remain = 0;
+  } else {
+    write(pos, remain);
+  }
+}
+
+void CombineHandler::write(char * buf, uint32_t length) {
+  KVBuffer * kv = NULL;
+  char * pos = buf;
+  uint32_t remain = length;
+
+  uint32_t outputRecordCount = 0;
+  while (remain > 0) {
+    kv = (KVBuffer *)pos;
+    kv->keyLength = bswap(kv->keyLength);
+    kv->valueLength = bswap(kv->valueLength);
+    _writer->write(kv->getKey(), kv->keyLength, kv->getValue(), kv->valueLength);
+    outputRecordCount++;
+    remain -= kv->length();
+    pos += kv->length();
+  }
+
+  _combineOutputRecordCount += outputRecordCount;
+  _combineOutputBytes += length;
+}
+
+string toString(uint32_t length) {
+  string result;
+  result.reserve(4);
+  result.assign((char *)(&length), 4);
+  return result;
+}
+
+void CombineHandler::onLoadData() {
+  feedDataToJava(WRITABLE_SERIALIZATION);
+}
+
+ResultBuffer * CombineHandler::onCall(const Command& command, ParameterBuffer * param) {
+  THROW_EXCEPTION(UnsupportException, "Command not supported by CombineHandler");
+}
+
+void CombineHandler::combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer) {
+  _combineInputRecordCount = 0;
+  _combineOutputRecordCount = 0;
+  _combineInputBytes = 0;
+  _combineOutputBytes = 0;
+
+  this->_combineContext = &type;
+  this->_kvIterator = kvIterator;
+  this->_writer = writer;
+  call(COMBINE, NULL);
+
+  LOG("[CombineHandler] input Record Count: %d, input Bytes: %d, "
+      "output Record Count: %d, output Bytes: %d",
+      _combineInputRecordCount, _combineInputBytes,
+      _combineOutputRecordCount, _combineOutputBytes);
+}
+
+void CombineHandler::finish() {
+}
+
+} /* namespace NativeTask */
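The records crossing the JNI boundary above carry their key and value lengths in big-endian byte order (the code's LARGE_ENDIUM convention), which is why write() byte-swaps both header fields before touching the payload. The following self-contained sketch shows that header conversion in isolation, with bswap32() standing in for the project's bswap() macro; the struct is a simplified stand-in for the KVBuffer layout, not the project's actual definition.

    // Sketch only: converting a KV record header from wire (big-endian)
    // to host order, mirroring the bswap() calls in write() above.
    #include <stdint.h>

    static inline uint32_t bswap32(uint32_t v) {  // stand-in for bswap()
      return (v >> 24) | ((v >> 8) & 0xff00u) | ((v << 8) & 0xff0000u) | (v << 24);
    }

    struct KVRecord {
      uint32_t keyLength;    // big-endian on the wire
      uint32_t valueLength;  // big-endian on the wire
      // key bytes, then value bytes, follow immediately after the header
    };

    // Convert the header in place before reading the payload that follows it.
    static void toHostOrder(KVRecord * kv) {
      kv->keyLength = bswap32(kv->keyLength);
      kv->valueLength = bswap32(kv->valueLength);
    }

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/CombineHandler.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,90 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.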
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#ifndef _COMBINEHANDLER_H_ +#define _COMBINEHANDLER_H_ + +#include "Combiner.h" +#include "BatchHandler.h" + +namespace NativeTask { + +enum SerializationFramework { + WRITABLE_SERIALIZATION = 0, + NATIVE_SERIALIZATION = 1 +}; + +struct SerializeInfo { + Buffer buffer; + uint32_t outerLength; + char varBytes[8]; +}; + +class CombineHandler : public NativeTask::ICombineRunner, public NativeTask::BatchHandler { +public: + static const Command COMBINE; + +private: + + CombineContext * _combineContext; + KVIterator * _kvIterator; + IFileWriter * _writer; + SerializeInfo _key; + SerializeInfo _value; + + KeyValueType _kType; + KeyValueType _vType; + MapOutputSpec _mapOutputSpec; + Config * _config; + bool _kvCached; + + uint32_t _combineInputRecordCount; + uint32_t _combineInputBytes; + + uint32_t _combineOutputRecordCount; + uint32_t _combineOutputBytes; + + FixSizeContainer _asideBuffer; + ByteArray _asideBytes; + +public: + CombineHandler(); + virtual ~CombineHandler(); + + virtual void handleInput(ByteBuffer & byteBuffer); + void finish(); + + ResultBuffer * onCall(const Command& command, ParameterBuffer * param); + + void configure(Config * config); + + void combine(CombineContext type, KVIterator * kvIterator, IFileWriter * writer); + + virtual void onLoadData(); + +private: + void flushDataToWriter(); + void outputKeyOrValue(SerializeInfo & info, KeyValueType type); + bool nextKeyValue(SerializeInfo & key, SerializeInfo & value); + uint32_t feedDataToJava(SerializationFramework serializationType); + uint32_t feedDataToJavaInWritableSerialization(); + void write(char * buf, uint32_t length); + +}; + +} /* namespace NativeTask */ +#endif /* _JAVACOMBINEHANDLER_H_ */ Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.cc Thu Jul 17 17:44:55 2014 @@ -0,0 +1,97 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "commons.h" +#include "util/StringUtil.h" +#include "MCollectorOutputHandler.h" +#include "NativeObjectFactory.h" +#include "MapOutputCollector.h" +#include "CombineHandler.h" + +using std::string; +using std::vector; + +namespace NativeTask { + +MCollectorOutputHandler::MCollectorOutputHandler() + : _collector(NULL), _dest(NULL), _endium(LARGE_ENDIUM) { +} + +MCollectorOutputHandler::~MCollectorOutputHandler() { + _dest = NULL; + delete _collector; + _collector = NULL; +} + +void MCollectorOutputHandler::configure(Config * config) { + if (NULL == config) { + return; + } + + uint32_t partition = config->getInt(MAPRED_NUM_REDUCES, 1); + + _collector = new MapOutputCollector(partition, this); + _collector->configure(config); +} + +void MCollectorOutputHandler::finish() { + _collector->close(); + BatchHandler::finish(); +} + +void MCollectorOutputHandler::handleInput(ByteBuffer & in) { + char * buff = in.current(); + uint32_t length = in.remain(); + + const char * end = buff + length; + char * pos = buff; + if (_kvContainer.remain() > 0) { + uint32_t filledLength = _kvContainer.fill(pos, length); + pos += filledLength; + } + + while (end - pos > 0) { + KVBufferWithParititionId * kvBuffer = (KVBufferWithParititionId *)pos; + + if (unlikely(end - pos < KVBuffer::headerLength())) { + THROW_EXCEPTION(IOException, "k/v meta information incomplete"); + } + + if (_endium == LARGE_ENDIUM) { + kvBuffer->partitionId = bswap(kvBuffer->partitionId); + kvBuffer->buffer.keyLength = bswap(kvBuffer->buffer.keyLength); + kvBuffer->buffer.valueLength = bswap(kvBuffer->buffer.valueLength); + } + + uint32_t kvLength = kvBuffer->buffer.length(); + + KVBuffer * dest = allocateKVBuffer(kvBuffer->partitionId, kvLength); + _kvContainer.wrap((char *)dest, kvLength); + + pos += 4; //skip the partition length + uint32_t filledLength = _kvContainer.fill(pos, end - pos); + pos += filledLength; + } +} + +KVBuffer * MCollectorOutputHandler::allocateKVBuffer(uint32_t partitionId, uint32_t kvlength) { + KVBuffer * dest = _collector->allocateKVBuffer(partitionId, kvlength); + return dest; +} + +} //namespace Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.h URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.h?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.h (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/MCollectorOutputHandler.h Thu Jul 
17 17:44:55 2014
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef MCOLLECTOROUTPUTHANDLER_H_
+#define MCOLLECTOROUTPUTHANDLER_H_
+
+#include "BatchHandler.h"
+#include "lib/SpillOutputService.h"
+#include "AbstractMapHandler.h"
+
+namespace NativeTask {
+class MapOutputCollector;
+
+class MCollectorOutputHandler : public AbstractMapHandler {
+private:
+  FixSizeContainer _kvContainer;
+
+  MapOutputCollector * _collector;
+
+  // state info for large KV pairs
+  char * _dest;
+
+  Endium _endium;
+
+public:
+  MCollectorOutputHandler();
+  virtual ~MCollectorOutputHandler();
+
+  virtual void configure(Config * config);
+  virtual void finish();
+  virtual void handleInput(ByteBuffer & byteBuffer);
+
+private:
+  KVBuffer * allocateKVBuffer(uint32_t partition, uint32_t kvlength);
+};
+
+} // namespace NativeTask
+
+#endif /* MCOLLECTOROUTPUTHANDLER_H_ */

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/handler/org_apache_hadoop_mapred_nativetask_NativeBatchProcessor.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,54 @@
+/* DO NOT EDIT THIS FILE - it is machine generated */
+#include <jni.h>
+/* Header for class org_apache_hadoop_mapred_nativetask_NativeBatchProcessor */
+
+#ifndef _Included_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+#define _Included_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+#ifdef __cplusplus
+extern "C" {
+#endif
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    setupHandler
+ * Signature: (J)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_setupHandler(
+    JNIEnv *, jobject, jlong, jobjectArray configs);
+
+/*
+ * Class:     org_apache_hadoop_mapred_nativetask_NativeBatchProcessor
+ * Method:    nativeProcessInput
+ * Signature: (JI)V
+ */
+JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeProcessInput(
+    JNIEnv *, jobject, jlong, jint);
+
+/*
+ * Class: 
org_apache_hadoop_mapred_nativetask_NativeBatchProcessor + * Method: nativeFinish + * Signature: (J)V + */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeFinish( + JNIEnv *, jobject, jlong); +/* + * Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor + * Method: nativeCommand + * Signature: (J[B)[B + */JNIEXPORT jbyteArray JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeCommand( + JNIEnv *, jobject, jlong, jint, jbyteArray); + +/* + * Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor + * Method: nativeLoadData + * Signature: (J)V + */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_nativeLoadData( + JNIEnv *, jobject, jlong); + +/* + * Class: org_apache_hadoop_mapred_nativetask_NativeBatchProcessor + * Method: InitIDs + * Signature: ()V + */JNIEXPORT void JNICALL Java_org_apache_hadoop_mapred_nativetask_NativeBatchProcessor_InitIDs( + JNIEnv *, jclass); + +#ifdef __cplusplus +} +#endif +#endif Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.cc URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.cc?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.cc (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.cc Thu Jul 17 17:44:55 2014 @@ -0,0 +1,155 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+#include "commons.h"
+#include "BufferStream.h"
+
+namespace NativeTask {
+
+BufferedInputStream::BufferedInputStream(InputStream * stream, uint32_t bufferSize)
+    : FilterInputStream(stream), _buff(NULL), _position(0), _limit(0), _capacity(0) {
+  _buff = (char*)malloc(bufferSize);
+  if (NULL == _buff) {
+    LOG("[BufferStream] malloc failed when creating BufferedInputStream with buffersize %u",
+        bufferSize);
+  } else {
+    _capacity = bufferSize;
+  }
+}
+
+BufferedInputStream::~BufferedInputStream() {
+  if (NULL != _buff) {
+    free(_buff);
+    _buff = NULL;
+    _position = 0;
+    _limit = 0;
+    _capacity = 0;
+  }
+}
+
+void BufferedInputStream::seek(uint64_t position) {
+  if (_limit - _position > 0) {
+    THROW_EXCEPTION(IOException, "temporary buffered data exists when fseek()");
+  }
+  _stream->seek(position);
+}
+
+uint64_t BufferedInputStream::tell() {
+  return _stream->tell() - (_limit - _position);
+}
+
+int32_t BufferedInputStream::read(void * buff, uint32_t length) {
+  uint32_t rest = _limit - _position;
+  if (rest > 0) {
+    // have some data in buffer, read from buffer
+    uint32_t cp = rest < length ? rest : length;
+    memcpy(buff, _buff + _position, cp);
+    _position += cp;
+    return cp;
+  } else if (length >= _capacity / 2) {
+    // dest buffer big enough, read to dest buffer directly
+    return _stream->read(buff, length);
+  } else {
+    // read to buffer first, then copy part of it to dest
+    _limit = 0;
+    do {
+      int32_t rd = _stream->read(_buff + _limit, _capacity - _limit);
+      if (rd <= 0) {
+        break;
+      }
+      _limit += rd;  // advance the fill position
+    } while (_limit < _capacity / 2);
+    if (_limit == 0) {
+      return -1;
+    }
+    uint32_t cp = _limit < length ? _limit : length;
+    memcpy(buff, _buff, cp);
+    _position = cp;
+    return cp;
+  }
+}
+
+/////////////////////////////////////////////////////////////////
+
+BufferedOutputStream::BufferedOutputStream(OutputStream * stream, uint32_t bufferSize)
+    : FilterOutputStream(stream), _buff(NULL), _position(0), _capacity(0) {
+  _buff = (char*)malloc(bufferSize + sizeof(uint64_t));
+  if (NULL == _buff) {
+    LOG("[BufferStream] malloc failed when creating BufferedOutputStream with buffersize %u",
+        bufferSize);
+  } else {
+    _capacity = bufferSize;
+  }
+}
+
+BufferedOutputStream::~BufferedOutputStream() {
+  if (NULL != _buff) {
+    free(_buff);
+    _buff = NULL;
+    _position = 0;
+    _capacity = 0;
+  }
+}
+
+uint64_t BufferedOutputStream::tell() {
+  return _stream->tell() + _position;
+}
+
+void BufferedOutputStream::write(const void * buff, uint32_t length) {
+  if (length < _capacity / 2) {
+    uint32_t rest = _capacity - _position;
+    if (length < rest) {
+      simple_memcpy(_buff + _position, buff, length);
+      _position += length;
+    } else {
+      flush();
+      simple_memcpy(_buff, buff, length);
+      _position = length;
+    }
+  } else {
+    flush();
+    _stream->write(buff, length);
+  }
+}
+
+void BufferedOutputStream::flush() {
+  if (_position > 0) {
+    _stream->write(_buff, _position);
+    _position = 0;
+  }
+}
+
+///////////////////////////////////////////////////////////
+
+int32_t InputBuffer::read(void * buff, uint32_t length) {
+  uint32_t rd = _capacity - _position < length ? _capacity - _position : length;
+  if (rd > 0) {
+    memcpy(buff, _buff + _position, rd);
+    _position += rd;
+    return rd;
+  }
+  return length == 0 ? 0 : -1;
+}
+
+void OutputBuffer::write(const void * buff, uint32_t length) {
+  if (_position + length <= _capacity) {
+    memcpy(_buff + _position, buff, length);
+    _position += length;
+  } else {
+    THROW_EXCEPTION(IOException, "OutputBuffer too small to write");
+  }
+}
+
+} // namespace NativeTask
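A small usage sketch of the in-memory buffer classes defined above (illustrative only; both classes wrap caller-owned storage and never allocate, which is the contract their read/write methods rely on):

    // Round trip through InputBuffer/OutputBuffer as declared in
    // BufferStream.h. OutputBuffer::write() throws IOException once the
    // backing storage is full; InputBuffer::read() returns -1 at EOF.
    #include <cassert>
    #include <cstring>
    #include "BufferStream.h"

    using namespace NativeTask;

    void roundTrip() {
      char storage[64];
      OutputBuffer out(storage, sizeof(storage));
      out.write("hello", 5);

      InputBuffer in(storage, out.tell());  // read back what was written
      char back[8];
      int32_t rd = in.read(back, 5);
      assert(rd == 5 && memcmp(back, "hello", 5) == 0);
    }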
+
+void OutputBuffer::write(const void * buff, uint32_t length) {
+  if (_position + length <= _capacity) {
+    memcpy(_buff + _position, buff, length);
+    _position += length;
+  } else {
+    THROW_EXCEPTION(IOException, "OutputBuffer too small to write");
+  }
+}
+
+} // namespace NativeTask

Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.h
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.h?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.h (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/BufferStream.h Thu Jul 17 17:44:55 2014
@@ -0,0 +1,193 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
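// [Editorial note - not part of r1611413] This header declares two
// decorators over the abstract streams from Streams.h (BufferedInputStream,
// BufferedOutputStream) and three in-memory adapters (InputBuffer,
// OutputBuffer, OutputStringStream). FilterInputStream/FilterOutputStream
// are assumed, from their use in BufferStream.cc above, to keep the wrapped
// stream in a protected _stream member.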
+
+#ifndef BUFFERSTREAM_H_
+#define BUFFERSTREAM_H_
+
+#include <string>
+#include "Streams.h"
+
+namespace NativeTask {
+
+using std::string;
+
+class BufferedInputStream : public FilterInputStream {
+protected:
+  char * _buff;
+  uint32_t _position;
+  uint32_t _limit;
+  uint32_t _capacity;
+public:
+  BufferedInputStream(InputStream * stream, uint32_t bufferSize = 64 * 1024);
+
+  virtual ~BufferedInputStream();
+
+  virtual void seek(uint64_t position);
+
+  virtual uint64_t tell();
+
+  virtual int32_t read(void * buff, uint32_t length);
+};
+
+class BufferedOutputStream : public FilterOutputStream {
+protected:
+  char * _buff;
+  uint32_t _position;
+  uint32_t _capacity;
+
+public:
+  BufferedOutputStream(OutputStream * stream, uint32_t bufferSize = 64 * 1024);
+
+  virtual ~BufferedOutputStream();
+
+  virtual uint64_t tell();
+
+  virtual void write(const void * buff, uint32_t length);
+
+  virtual void flush();
+
+};
+
+class InputBuffer : public InputStream {
+protected:
+  const char * _buff;
+  uint32_t _position;
+  uint32_t _capacity;
+public:
+  InputBuffer()
+      : _buff(NULL), _position(0), _capacity(0) {
+  }
+
+  InputBuffer(const char * buff, uint32_t capacity)
+      : _buff(buff), _position(0), _capacity(capacity) {
+  }
+
+  InputBuffer(const string & src)
+      : _buff(src.data()), _position(0), _capacity(src.length()) {
+  }
+
+  virtual ~InputBuffer() {
+  }
+
+  virtual void seek(uint64_t position) {
+    if (position <= _capacity) {
+      _position = position;
+    } else {
+      _position = _capacity;
+    }
+  }
+
+  virtual uint64_t tell() {
+    return _position;
+  }
+
+  virtual int32_t read(void * buff, uint32_t length);
+
+  void reset(const char * buff, uint32_t capacity) {
+    _buff = buff;
+    _position = 0;
+    _capacity = capacity;
+  }
+
+  void reset(const string & src) {
+    _buff = src.data();
+    _position = 0;
+    _capacity = src.length();
+  }
+
+  void rewind() {
+    _position = 0;
+  }
+};
+
+class OutputBuffer : public OutputStream {
+protected:
+  char * _buff;
+  uint32_t _position;
+  uint32_t _capacity;
+public:
+  OutputBuffer()
+      : _buff(NULL), _position(0), _capacity(0) {
+  }
+
+  OutputBuffer(char * buff, uint32_t capacity)
+      : _buff(buff), _position(0), _capacity(capacity) {
+  }
+
+  virtual ~OutputBuffer() {
+  }
+
+  virtual uint64_t tell() {
+    return _position;
+  }
+
+  virtual void write(const void * buff, uint32_t length);
+
+  void clear() {
+    _position = 0;
+  }
+
+  void reset(char * buff, uint32_t capacity) {
+    _buff = buff;
+    _position = 0;
+    _capacity = capacity;
+  }
+
+  string getString() {
+    return string(_buff, _position);
+  }
+};
+
+class OutputStringStream : public OutputStream {
+protected:
+  string * _dest;
+public:
+  OutputStringStream()
+      : _dest(NULL) {
+  }
+
+  OutputStringStream(string & dest)
+      : _dest(&dest) {
+  }
+
+  virtual ~OutputStringStream() {
+  }
+
+  virtual uint64_t tell() {
+    return _dest->length();
+  }
+
+  virtual void write(const void * buff, uint32_t length) {
+    _dest->append((const char *)buff, length);
+  }
+
+  void reset(string * dest) {
+    _dest = dest;
+  }
+
+  void clear() {
+    _dest->clear();
+  }
+
+  string getString() {
+    return *_dest;
+  }
+};
+
+} // namespace NativeTask
+
+#endif /* BUFFERSTREAM_H_ */
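[Editorial note - not part of r1611413] OutputStringStream appends straight
into a caller-owned std::string, which makes it convenient for tests. A
minimal sketch, assuming only the declarations above:

    std::string dest;
    NativeTask::OutputStringStream oss(dest);
    oss.write("key\tvalue", 9);
    // dest == "key\tvalue", oss.tell() == 9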
Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.cc
URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.cc?rev=1611413&view=auto
==============================================================================
--- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.cc (added)
+++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.cc Thu Jul 17 17:44:55 2014
@@ -0,0 +1,304 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "commons.h"
+#include "util/StringUtil.h"
+#include "util/WritableUtils.h"
+#include "Buffers.h"
+
+namespace NativeTask {
+
+DynamicBuffer::DynamicBuffer()
+    : _data(NULL), _capacity(0), _size(0), _used(0) {
+}
+
+DynamicBuffer::DynamicBuffer(uint32_t capacity)
+    : _data(NULL), _capacity(0), _size(0), _used(0) {
+  reserve(capacity);
+}
+
+DynamicBuffer::~DynamicBuffer() {
+  release();
+}
+
+void DynamicBuffer::release() {
+  if (_data != NULL) {
+    free(_data);
+    _data = NULL;
+    _capacity = 0;
+    _size = 0;
+    _used = 0;
+  }
+}
+
+void DynamicBuffer::reserve(uint32_t capacity) {
+  if (_data != NULL) {
+    if (capacity > _capacity) {
+      char * newdata = (char*)realloc(_data, capacity);
+      if (newdata == NULL) {
+        THROW_EXCEPTION_EX(OutOfMemoryException, "DynamicBuffer reserve realloc %u failed",
+            capacity);
+      }
+      _data = newdata;
+      _capacity = capacity;
+    }
+    return;
+  }
+  release();
+  char * newdata = (char*)malloc(capacity);
+  if (newdata == NULL) {
+    THROW_EXCEPTION_EX(OutOfMemoryException, "DynamicBuffer reserve new %u failed", capacity);
+  }
+  _data = newdata;
+  _capacity = capacity;
+  _size = 0;
+  _used = 0;
+}
+
+int32_t DynamicBuffer::refill(InputStream * stream) {
+  if (_data == NULL || freeSpace() == 0) {
+    THROW_EXCEPTION(IOException, "refill DynamicBuffer failed, no space left");
+  }
+  int32_t rd = stream->read(_data + _size, freeSpace());
+  if (rd > 0) {
+    _size += rd;
+  }
+  return rd;
+}
+
+void DynamicBuffer::cleanUsed() {
+  if (_used > 0) {
+    uint32_t needToMove = _size - _used;
+    if (needToMove > 0) {
+      memmove(_data, _data + _used, needToMove);
+      _size = needToMove;
+    } else {
+      _size = 0;
+    }
+    _used = 0;
+  }
+}
+
+///////////////////////////////////////////////////////////
+
+ReadBuffer::ReadBuffer()
+    : _buff(NULL), _remain(0), _size(0), _capacity(0), _stream(NULL), _source(NULL) {
+}
+
+void ReadBuffer::init(uint32_t size, InputStream * stream, const string & codec) {
+  if (size < 1024) {
+    THROW_EXCEPTION_EX(UnsupportException, "ReadBuffer size %u not support.", size);
+  }
+  _buff = (char *)malloc(size);
+  if (NULL == _buff) {
+    THROW_EXCEPTION(OutOfMemoryException, "create read buffer");
+  }
+  _capacity = size;
+  _remain = 0;
+  _size = 0;
+  _stream = stream;
+  _source = _stream;
+  if (codec.length() > 0) {
+    if
(!Compressions::support(codec)) { + THROW_EXCEPTION(UnsupportException, "compression codec not support"); + } + _source = Compressions::getDecompressionStream(codec, _stream, size); + } +} + +ReadBuffer::~ReadBuffer() { + if (_source != _stream) { + delete _source; + _source = NULL; + } + if (NULL != _buff) { + free(_buff); + _buff = NULL; + _capacity = 0; + _remain = 0; + _size = 0; + } +} + +char * ReadBuffer::fillGet(uint32_t count) { + + if (unlikely(count > _capacity)) { + uint32_t newcap = _capacity * 2 > count ? _capacity * 2 : count; + char * newbuff = (char*)malloc(newcap); + + + if (newbuff == NULL) { + THROW_EXCEPTION(OutOfMemoryException, + StringUtil::Format("buff realloc failed, size=%u", newcap)); + } + + if (_remain > 0) { + memcpy(newbuff, current(), _remain); + } + if (NULL != _buff) { + free(_buff); + } + + _buff = newbuff; + _capacity = newcap; + } else { + if (_remain > 0) { + memcpy(_buff, current(), _remain); + } + } + _size = _remain; + while (_remain < count) { + int32_t rd = _source->read(_buff + _size, _capacity - _size); + if (rd <= 0) { + THROW_EXCEPTION(IOException, "read reach EOF"); + } + _remain += rd; + _size += rd; + } + char * ret = current(); + _remain -= count; + return ret; +} + +int32_t ReadBuffer::fillRead(char * buff, uint32_t len) { + uint32_t cp = _remain; + if (cp > 0) { + memcpy(buff, current(), cp); + _remain = 0; + } + // TODO: read to buffer first + int32_t ret = _source->readFully(buff + cp, len - cp); + if (ret < 0 && cp == 0) { + return ret; + } else { + return ret < 0 ? cp : ret + cp; + } +} + +int64_t ReadBuffer::fillReadVLong() { + if (_remain == 0) { + int32_t rd = _source->read(_buff, _capacity); + if (rd <= 0) { + THROW_EXCEPTION(IOException, "fillReadVLong reach EOF"); + } + _remain = rd; + _size = rd; + } + int8_t * pos = (int8_t*)current(); + if (*pos >= -112) { + _remain--; + return (int64_t)*pos; + } + bool neg = *pos < -120; + uint32_t len = neg ? (-119 - *pos) : (-111 - *pos); + pos = (int8_t*)get(len); + const int8_t * end = pos + len; + uint64_t value = 0; + while (++pos < end) { + value = (value << 8) | *(uint8_t*)pos; + } + return neg ? 
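// [Editorial note - not part of r1611413] Hadoop's variable-length integer
// encoding stores the bitwise complement of negative values, so the
// `value ^ -1LL` below (i.e. ~value) restores the original sign; this
// mirrors WritableUtils.readVLong() on the Java side.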
(value ^ -1LL) : value; +} + +/////////////////////////////////////////////////////////// + +AppendBuffer::AppendBuffer() + : _buff(NULL), _remain(0), _capacity(0), _counter(0), _stream(NULL), _dest(NULL), + _compression(false) { +} + +void AppendBuffer::init(uint32_t size, OutputStream * stream, const string & codec) { + if (size < 1024) { + THROW_EXCEPTION_EX(UnsupportException, "AppendBuffer size %u not support.", size); + } + _buff = (char *)malloc(size + 8); + if (NULL == _buff) { + THROW_EXCEPTION(OutOfMemoryException, "create append buffer"); + } + _capacity = size; + _remain = _capacity; + _stream = stream; + _dest = _stream; + if (codec.length() > 0) { + if (!Compressions::support(codec)) { + THROW_EXCEPTION(UnsupportException, "compression codec not support"); + } + _dest = Compressions::getCompressionStream(codec, _stream, size); + _compression = true; + } +} + +CompressStream * AppendBuffer::getCompressionStream() { + if (_compression) { + return (CompressStream *)_dest; + } else { + return NULL; + } +} + +AppendBuffer::~AppendBuffer() { + if (_dest != _stream) { + delete _dest; + _dest = NULL; + } + if (NULL != _buff) { + free(_buff); + _buff = NULL; + _remain = 0; + _capacity = 0; + } +} + +void AppendBuffer::flushd() { + _dest->write(_buff, _capacity - _remain); + _counter += _capacity - _remain; + _remain = _capacity; +} + +void AppendBuffer::write_inner(const void * data, uint32_t len) { + flushd(); + if (len >= _capacity / 2) { + _dest->write(data, len); + _counter += len; + } else { + simple_memcpy(_buff, data, len); + _remain -= len; + } +} + +void AppendBuffer::write_vlong_inner(int64_t v) { + if (_remain < 9) { + flushd(); + } + uint32_t len; + WritableUtils::WriteVLong(v, current(), len); + _remain -= len; +} + +void AppendBuffer::write_vuint2_inner(uint32_t v1, uint32_t v2) { + if (_remain < 10) { + flushd(); + } + uint32_t len; + WritableUtils::WriteVLong(v1, current(), len); + _remain -= len; + WritableUtils::WriteVLong(v2, current(), len); + _remain -= len; +} + +} // namespace NativeTask + Added: hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.h URL: http://svn.apache.org/viewvc/hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.h?rev=1611413&view=auto ============================================================================== --- hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.h (added) +++ hadoop/common/branches/MR-2841/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-nativetask/src/main/native/src/lib/Buffers.h Thu Jul 17 17:44:55 2014 @@ -0,0 +1,694 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef BUFFERS_H_
+#define BUFFERS_H_
+
+#include "Streams.h"
+#include "Compressions.h"
+#include "Constants.h"
+
+namespace NativeTask {
+
+class DynamicBuffer {
+protected:
+  char * _data;
+  uint32_t _capacity;
+  uint32_t _size;
+  uint32_t _used;
+public:
+  DynamicBuffer();
+
+  DynamicBuffer(uint32_t capacity);
+
+  ~DynamicBuffer();
+
+  void reserve(uint32_t capacity);
+
+  void release();
+
+  uint32_t capacity() {
+    return _capacity;
+  }
+
+  char * data() {
+    return _data;
+  }
+
+  uint32_t size() {
+    return _size;
+  }
+
+  uint32_t used() {
+    return _used;
+  }
+
+  char * current() {
+    return _data + _used;
+  }
+
+  char * end() {
+    return _data + _size;
+  }
+
+  uint32_t remain() {
+    return _size - _used;
+  }
+
+  uint32_t freeSpace() {
+    return _capacity - _size;
+  }
+
+  void use(uint32_t count) {
+    _used += count;
+  }
+
+  void cleanUsed();
+
+  int32_t refill(InputStream * stream);
+};
+
+/**
+ * A lightweight read buffer that acts as a buffered input stream
+ */
+class ReadBuffer {
+protected:
+  char * _buff;
+  uint32_t _remain;
+  uint32_t _size;
+  uint32_t _capacity;
+
+  InputStream * _stream;
+  InputStream * _source;
+
+protected:
+  inline char * current() {
+    return _buff + _size - _remain;
+  }
+
+  char * fillGet(uint32_t count);
+  int32_t fillRead(char * buff, uint32_t len);
+  int64_t fillReadVLong();
+public:
+  ReadBuffer();
+
+  void init(uint32_t size, InputStream * stream, const string & codec);
+
+  ~ReadBuffer();
+
+  /**
+   * use get() to obtain a contiguous in-place memory region for a small object
+   */
+  inline char * get(uint32_t count) {
+    if (likely(count <= _remain)) {
+      char * ret = current();
+      _remain -= count;
+      return ret;
+    }
+    return fillGet(count);
+  }
+
+  /**
+   * read into an external buffer
+   */
+  inline int32_t read(char * buff, uint32_t len) {
+    if (likely(len <= _remain)) {
+      memcpy(buff, current(), len);
+      _remain -= len;
+      return len;
+    }
+    return fillRead(buff, len);
+  }
+
+  /**
+   * read into an external buffer, using simple_memcpy
+   */
+  inline void readUnsafe(char * buff, uint32_t len) {
+    if (likely(len <= _remain)) {
+      simple_memcpy(buff, current(), len);
+      _remain -= len;
+      return;
+    }
+    fillRead(buff, len);
+  }
+
+  /**
+   * read a VLong
+   */
+  inline int64_t readVLong() {
+    if (likely(_remain > 0)) {
+      char * mark = current();
+      if (*(int8_t*)mark >= (int8_t)-112) {
+        _remain--;
+        return (int64_t)*mark;
+      }
+    }
+    return fillReadVLong();
+  }
+
+  /**
+   * read uint32_t little endian
+   */
+  inline uint32_t read_uint32_le() {
+    return *(uint32_t*)get(4);
+  }
+
+  /**
+   * read uint32_t big endian
+   */
+  inline uint32_t read_uint32_be() {
+    return bswap(read_uint32_le());
+  }
+};
+
+/**
+ * A lightweight append buffer, used as a buffered output stream
+ */
+class AppendBuffer {
+protected:
+  char * _buff;
+  uint32_t _remain;
+  uint32_t _capacity;
+  uint64_t _counter;
+
+  OutputStream * _stream;
+  OutputStream * _dest;
+  bool _compression;
+
+protected:
+  void flushd();
+
+  inline char * current() {
+    return _buff + _capacity - _remain;
+  }
+
+  void write_inner(const void * data, uint32_t len);
+  void write_vlong_inner(int64_t v);
+  void write_vuint2_inner(uint32_t v1, uint32_t v2);
+public:
+  AppendBuffer();
+
+  ~AppendBuffer();
+
+  void init(uint32_t size, OutputStream * stream, const string & codec);
+
+  CompressStream * getCompressionStream();
+
+  uint64_t getCounter() {
+    return _counter;
+  }
+
+  inline char * borrowUnsafe(uint32_t len) {
+    if (likely(_remain >= len)) {
+      return
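      // [Editorial note - not part of r1611413] borrowUnsafe() hands the
      // caller a raw window of at least `len` writable bytes (flushing first
      // if needed); the caller must commit what it actually wrote with
      // useUnsafe(count), and receives NULL when len exceeds the buffer
      // capacity.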
current(); + } + if (likely(_capacity >= len)) { + flushd(); + return _buff; + } + return NULL; + } + + inline void useUnsafe(uint32_t count) { + _remain -= count; + } + + inline void write(char c) { + if (unlikely(_remain == 0)) { + flushd(); + } + *current() = c; + _remain--; + } + + inline void write(const void * data, uint32_t len) { + if (likely(len <= _remain)) { // append directly + simple_memcpy(current(), data, len); + _remain -= len; + return; + } + write_inner(data, len); + } + + inline void write_uint32_le(uint32_t v) { + if (unlikely(4 > _remain)) { + flushd(); + } + *(uint32_t*)current() = v; + _remain -= 4; + return; + } + + inline void write_uint32_be(uint32_t v) { + write_uint32_le(bswap(v)); + } + + inline void write_uint64_le(uint64_t v) { + if (unlikely(8 > _remain)) { + flushd(); + } + *(uint64_t*)current() = v; + _remain -= 8; + return; + } + + inline void write_uint64_be(uint64_t v) { + write_uint64_le(bswap64(v)); + } + + inline void write_vlong(int64_t v) { + if (likely(_remain > 0 && v <= 127 && v >= -112)) { + *(char*)current() = (char)v; + _remain--; + return; + } + write_vlong_inner(v); + } + + inline void write_vuint(uint32_t v) { + if (likely(_remain > 0 && v <= 127)) { + *(char*)current() = (char)v; + _remain--; + return; + } + write_vlong_inner(v); + } + + inline void write_vuint2(uint32_t v1, uint32_t v2) { + if (likely(_remain >= 2 && v1 <= 127 && v2 <= 127)) { + *(char*)current() = (char)v1; + *(char*)(current() + 1) = (char)v2; + _remain -= 2; + return; + } + write_vuint2_inner(v1, v2); + } + + /** + * flush current buffer, clear content + */ + inline void flush() { + if (_remain < _capacity) { + flushd(); + } + } +}; + +/** + * Memory Key-Value buffer pair with direct address content, so can be + * easily copied or dumped to file + */ +struct KVBuffer { + uint32_t keyLength; + uint32_t valueLength; + char content[1]; + + char * getKey() { + return content; + } + + char * getValue() { + return content + keyLength; + } + + KVBuffer * next() { + return ((KVBuffer*)(content + keyLength + valueLength)); + } + + std::string str() { + return std::string(content, keyLength) + "\t" + std::string(getValue(), valueLength); + } + + uint32_t length() { + return keyLength + valueLength + SIZE_OF_KEY_LENGTH + SIZE_OF_VALUE_LENGTH; + } + + uint32_t lengthConvertEndium() { + long value = bswap64(*((long *)this)); + return (value >> 32) + value + SIZE_OF_KEY_LENGTH + SIZE_OF_VALUE_LENGTH; + } + + void fill(const void * key, uint32_t keylen, const void * value, uint32_t vallen) { + keyLength = keylen; + valueLength = vallen; + + if (keylen > 0) { + simple_memcpy(getKey(), key, keylen); + } + if (vallen > 0) { + simple_memcpy(getValue(), value, vallen); + } + } + + static uint32_t headerLength() { + return SIZE_OF_KEY_LENGTH + SIZE_OF_VALUE_LENGTH; + } +}; + +struct KVBufferWithParititionId { + uint32_t partitionId; + KVBuffer buffer; + + inline static uint32_t minLength() { + return SIZE_OF_PARTITION_LENGTH + SIZE_OF_KV_LENGTH; + } + + int length() { + return 4 + buffer.length(); + } + + int lengthConvertEndium() { + return 4 + buffer.lengthConvertEndium(); + } +}; + +/** + * Native side abstraction of java ByteBuffer + */ +class ByteBuffer { +private: + char * _buff; + uint32_t _limit; + uint32_t _position; + uint32_t _capacity; + +public: + ByteBuffer() + : _buff(NULL), _limit(0), _position(0), _capacity(0) { + } + + ~ByteBuffer() { + } + + void reset(char * buff, uint32_t inputCapacity) { + this->_buff = buff; + this->_capacity = inputCapacity; + this->_position = 0; 
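    // [Editorial note - not part of r1611413] reset() re-points this
    // ByteBuffer at a new backing array, mirroring java.nio.ByteBuffer
    // semantics: position restarts at 0 and limit stays empty until
    // rewind(newPos, newLimit) publishes readable data.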
+ this->_limit = 0; + } + + int capacity() { + return this->_capacity; + } + + int remain() { + return _limit - _position; + } + + int limit() { + return _limit; + } + + int advance(int positionOffset) { + _position += positionOffset; + return _position; + } + + int position() { + return this->_position; + } + + void position(int newPos) { + this->_position = newPos; + } + + void rewind(int newPos, int newLimit) { + this->_position = newPos; + if (newLimit > this->_capacity) { + THROW_EXCEPTION(IOException, "length larger than input buffer capacity"); + } + this->_limit = newLimit; + } + + char * current() { + return _buff + _position; + } + + char * base() { + return _buff; + } +}; + +class ByteArray { +private: + char * _buff; + uint32_t _length; + uint32_t _capacity; + +public: + ByteArray() + : _buff(NULL), _length(0), _capacity(0) { + } + + ~ByteArray() { + if (NULL != _buff) { + delete[] _buff; + _buff = NULL; + } + _length = 0; + _capacity = 0; + } + + void resize(uint32_t newSize) { + if (newSize <= _capacity) { + _length = newSize; + } else { + if (NULL != _buff) { + delete[] _buff; + _buff = NULL; + } + _capacity = 2 * newSize; + _buff = new char[_capacity]; + _length = newSize; + } + } + + char * buff() { + return _buff; + } + + uint32_t size() { + return _length; + } +}; + +class FixSizeContainer { +private: + char * _buff; + uint32_t _pos; + uint32_t _size; + +public: + FixSizeContainer() + : _buff(NULL), _pos(0), _size(0) { + } + + ~FixSizeContainer() { + } + + void wrap(char * buff, uint32_t size) { + _size = size; + _buff = buff; + _pos = 0; + } + + void rewind() { + _pos = 0; + } + + uint32_t remain() { + return _size - _pos; + } + + char * current() { + return _buff + _pos; + } + + char * base() { + return _buff; + } + + uint32_t size() { + return _size; + } + + /** + * return the length of actually filled data. + */ + uint32_t fill(const char * source, uint32_t maxSize) { + int remain = _size - _pos; + if (remain <= 0) { + return 0; + } + + uint32_t length = (maxSize < remain) ? 
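    // [Editorial note - not part of r1611413] fill() copies
    // min(maxSize, remaining space) bytes, so a short return value tells the
    // caller how much of `source` was actually consumed.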
maxSize : remain; + simple_memcpy(_buff + _pos, source, length); + _pos += length; + return length; + } + + uint32_t position() { + return _pos; + } + + void position(int pos) { + _pos = pos; + } +}; + +class ReadWriteBuffer { +private: + + static const uint32_t INITIAL_LENGTH = 16; + + uint32_t _readPoint; + uint32_t _writePoint; + char * _buff; + uint32_t _buffLength; + bool _newCreatedBuff; + +public: + + ReadWriteBuffer(uint32_t length) + : _readPoint(0), _writePoint(0), _buff(NULL), _buffLength(0), _newCreatedBuff(false) { + _buffLength = length; + if (_buffLength > 0) { + _buff = new char[_buffLength]; + _newCreatedBuff = true; + } + } + + ReadWriteBuffer() + : _readPoint(0), _writePoint(0), _buff(NULL), _buffLength(0), _newCreatedBuff(false) { + } + + ~ReadWriteBuffer() { + if (_newCreatedBuff) { + delete[] _buff; + _buff = NULL; + } + } + + void setReadPoint(uint32_t pos) { + _readPoint = pos; + } + + void setWritePoint(uint32_t pos) { + _writePoint = pos; + } + + char * getBuff() { + return _buff; + } + + uint32_t getWritePoint() { + return _writePoint; + } + + uint32_t getReadPoint() { + return _readPoint; + } + + void writeInt(uint32_t param) { + uint32_t written = param; + + checkWriteSpaceAndResizeIfNecessary(4); + *((uint32_t *)(_buff + _writePoint)) = written; + _writePoint += 4; + } + + void writeLong(uint64_t param) { + uint64_t written = param; + + checkWriteSpaceAndResizeIfNecessary(8); + *((uint64_t *)(_buff + _writePoint)) = written; + _writePoint += 8; + } + + void writeString(const char * param, uint32_t length) { + writeInt(length); + checkWriteSpaceAndResizeIfNecessary(length); + + memcpy(_buff + _writePoint, param, length); + _writePoint += length; + } + + void writeString(std::string * param) { + const char * str = param->c_str(); + int length = param->size(); + writeString(str, length); + } + + void writePointer(void * param) { + uint64_t written = (uint64_t)(param); + writeLong(written); + } + + uint32_t readInt() { + char * readPos = _buff + _readPoint; + uint32_t result = *((uint32_t *)(readPos)); + _readPoint += 4; + return result; + } + + uint64_t readLong() { + char * readPos = _buff + _readPoint; + uint64_t result = *((uint64_t *)(readPos)); + _readPoint += 8; + return result; + } + + std::string * readString() { + uint32_t len = readInt(); + char * strBegin = _buff + _readPoint; + _readPoint += len; + return new std::string(strBegin, len); + } + + void * readPointer() { + uint64_t result = readLong(); + return (void *)(result); + } + +private: + void checkWriteSpaceAndResizeIfNecessary(uint32_t toBeWritten) { + if (_buffLength == 0) { + _newCreatedBuff = true; + _buffLength = INITIAL_LENGTH > toBeWritten ? INITIAL_LENGTH : toBeWritten; + _buff = new char[_buffLength]; + } + + if (_buffLength - _writePoint >= toBeWritten) { + return; + } + + _buffLength = _buffLength + toBeWritten; + _newCreatedBuff = true; + char * newBuff = new char[_buffLength]; + memcpy(newBuff, _buff, _writePoint); + delete[] _buff; + _buff = newBuff; + } +}; + +typedef ReadWriteBuffer ParameterBuffer; +typedef ReadWriteBuffer ResultBuffer; + +} // namespace NativeTask + +#endif /* BUFFERS_H_ */
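[Editorial note - not part of r1611413] ReadWriteBuffer, aliased above as
ParameterBuffer and ResultBuffer, is a native-byte-order scratch area suited
to marshalling parameters between Java and native code (note the
writePointer/readPointer pair). A self-contained round-trip sketch using
only the methods declared above:

    NativeTask::ReadWriteBuffer rwb;       // grows lazily from 16 bytes
    rwb.writeInt(42);
    std::string name("snappy");
    rwb.writeString(&name);
    rwb.setReadPoint(0);
    uint32_t n = rwb.readInt();            // 42
    std::string * s = rwb.readString();    // "snappy"; caller owns and must delete it
    delete s;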