rocketmq-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From yu...@apache.org
Subject [25/43] incubator-rocketmq git commit: Finish code dump. Reviewed by: @yukon @vongosling @stevenschew @vintagewang @lollipop @zander
Date Mon, 19 Dec 2016 09:40:42 GMT
http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfigSingleton.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfigSingleton.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfigSingleton.java
new file mode 100644
index 0000000..f1bc453
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/BrokerConfigSingleton.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Static holder for a single {@link BrokerConfig} that can be installed exactly once
+ * and read from anywhere afterwards.
+ */
+public class BrokerConfigSingleton {
+    /** Flips to {@code true} on the first successful {@link #setBrokerConfig(BrokerConfig)} call. */
+    private static final AtomicBoolean isInit = new AtomicBoolean();
+    /** Volatile so a config installed by one thread is visible to readers on other threads. */
+    private static volatile BrokerConfig brokerConfig;
+
+    /**
+     * Returns the installed broker config.
+     *
+     * @return the config previously passed to {@link #setBrokerConfig(BrokerConfig)}
+     * @throws IllegalArgumentException if no config has been installed yet
+     */
+    public static BrokerConfig getBrokerConfig() {
+        // Read the volatile field once so the null check and the return see the same value.
+        final BrokerConfig current = brokerConfig;
+        if (current == null) {
+            throw new IllegalArgumentException("brokerConfig Cannot be null !");
+        }
+        return current;
+    }
+
+    /**
+     * Installs the broker config. Only the first call succeeds.
+     *
+     * @param brokerConfig the config to install; must not be {@code null}
+     * @throws IllegalArgumentException if {@code brokerConfig} is {@code null}, or if a
+     *         config has already been installed
+     */
+    public static void setBrokerConfig(BrokerConfig brokerConfig) {
+        if (brokerConfig == null) {
+            // Reject before consuming the one-shot flag; otherwise the holder would be
+            // marked initialized while the getter keeps throwing forever.
+            throw new IllegalArgumentException("brokerConfig Cannot be null !");
+        }
+        if (!isInit.compareAndSet(false, true)) {
+            throw new IllegalArgumentException("broker config have inited !");
+        }
+        BrokerConfigSingleton.brokerConfig = brokerConfig;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ConfigManager.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ConfigManager.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ConfigManager.java
new file mode 100644
index 0000000..3191509
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ConfigManager.java
@@ -0,0 +1,89 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+
+/**
+ * @author shijia.wxr
+ */
+public abstract class ConfigManager {
+    private static final Logger PLOG = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+
+
+    public abstract String encode();
+
+    public boolean load() {
+        String fileName = null;
+        try {
+            fileName = this.configFilePath();
+            String jsonString = MixAll.file2String(fileName);
+
+            if (null == jsonString || jsonString.length() == 0) {
+                return this.loadBak();
+            } else {
+                this.decode(jsonString);
+                PLOG.info("load {} OK", fileName);
+                return true;
+            }
+        } catch (Exception e) {
+            PLOG.error("load " + fileName + " Failed, and try to load backup file", e);
+            return this.loadBak();
+        }
+    }
+
+    public abstract String configFilePath();
+
+    private boolean loadBak() {
+        String fileName = null;
+        try {
+            fileName = this.configFilePath();
+            String jsonString = MixAll.file2String(fileName + ".bak");
+            if (jsonString != null && jsonString.length() > 0) {
+                this.decode(jsonString);
+                PLOG.info("load " + fileName + " OK");
+                return true;
+            }
+        } catch (Exception e) {
+            PLOG.error("load " + fileName + " Failed", e);
+            return false;
+        }
+
+        return true;
+    }
+
+    public abstract void decode(final String jsonString);
+
+    public synchronized void persist() {
+        String jsonString = this.encode(true);
+        if (jsonString != null) {
+            String fileName = this.configFilePath();
+            try {
+                MixAll.string2File(jsonString, fileName);
+            } catch (IOException e) {
+                PLOG.error("persist file Exception, " + fileName, e);
+            }
+        }
+    }
+
+    public abstract String encode(final boolean prettyFormat);
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/Configuration.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/Configuration.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/Configuration.java
new file mode 100644
index 0000000..8b69c1f
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/Configuration.java
@@ -0,0 +1,310 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common;
+
+import org.slf4j.Logger;
+
+import java.io.IOException;
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * @author xigu.lx
+ */
+public class Configuration {
+
+    private final Logger log;
+
+    private List<Object> configObjectList = new ArrayList<Object>(4);
+    private String storePath;
+    private boolean storePathFromConfig = false;
+    private Object storePathObject;
+    private Field storePathField;
+    private DataVersion dataVersion = new DataVersion();
+    private ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
+    /**
+     * All properties include configs in object and extend properties.
+     */
+    private Properties allConfigs = new Properties();
+
+    public Configuration(Logger log) {
+        this.log = log;
+    }
+
+    public Configuration(Logger log, Object... configObjects) {
+        this.log = log;
+        if (configObjects == null || configObjects.length == 0) {
+            return;
+        }
+        for (Object configObject : configObjects) {
+            registerConfig(configObject);
+        }
+    }
+
+    public Configuration(Logger log, String storePath, Object... configObjects) {
+        this(log, configObjects);
+        this.storePath = storePath;
+    }
+
+    /**
+     * register config object
+     *
+     * @param configObject
+     * @return the current Configuration object
+     */
+    public Configuration registerConfig(Object configObject) {
+        try {
+            readWriteLock.writeLock().lockInterruptibly();
+
+            try {
+
+                Properties registerProps = MixAll.object2Properties(configObject);
+
+                merge(registerProps, this.allConfigs);
+
+                configObjectList.add(configObject);
+            } finally {
+                readWriteLock.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("registerConfig lock error");
+        }
+        return this;
+    }
+
+    /**
+     * register config properties
+     *
+     * @param extProperties
+     * @return the current Configuration object
+     */
+    public Configuration registerConfig(Properties extProperties) {
+        if (extProperties == null) {
+            return this;
+        }
+
+        try {
+            readWriteLock.writeLock().lockInterruptibly();
+
+            try {
+                merge(extProperties, this.allConfigs);
+            } finally {
+                readWriteLock.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("register lock error. {}" + extProperties);
+        }
+
+        return this;
+    }
+
+    /**
+     * The store path will be gotten from the field of object.
+     *
+     * @param object
+     * @param fieldName
+     *
+     * @throws java.lang.RuntimeException if the field of object is not exist.
+     */
+    public void setStorePathFromConfig(Object object, String fieldName) {
+        assert object != null;
+
+        try {
+            readWriteLock.writeLock().lockInterruptibly();
+
+            try {
+                this.storePathFromConfig = true;
+                this.storePathObject = object;
+                // check
+                this.storePathField = object.getClass().getDeclaredField(fieldName);
+                assert this.storePathField != null
+                        && !Modifier.isStatic(this.storePathField.getModifiers());
+                this.storePathField.setAccessible(true);
+            } catch (NoSuchFieldException e) {
+                throw new RuntimeException(e);
+            } finally {
+                readWriteLock.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("setStorePathFromConfig lock error");
+        }
+    }
+
+    private String getStorePath() {
+        String realStorePath = null;
+        try {
+            readWriteLock.readLock().lockInterruptibly();
+
+            try {
+                realStorePath = this.storePath;
+
+                if (this.storePathFromConfig) {
+                    try {
+                        realStorePath = (String) storePathField.get(this.storePathObject);
+                    } catch (IllegalAccessException e) {
+                        log.error("getStorePath error, ", e);
+                    }
+                }
+            } finally {
+                readWriteLock.readLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("getStorePath lock error");
+        }
+
+        return realStorePath;
+    }
+
+    public void update(Properties properties) {
+        try {
+            readWriteLock.writeLock().lockInterruptibly();
+
+            try {
+                // the property must be exist when update
+                mergeIfExist(properties, this.allConfigs);
+
+                for (Object configObject : configObjectList) {
+                    // not allConfigs to update...
+                    MixAll.properties2Object(properties, configObject);
+                }
+
+                this.dataVersion.nextVersion();
+
+            } finally {
+                readWriteLock.writeLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("update lock error, {}", properties);
+            return;
+        }
+
+        persist();
+    }
+
+    public void persist() {
+        try {
+            readWriteLock.readLock().lockInterruptibly();
+
+            try {
+                String allConfigs = getAllConfigsInternal();
+
+                MixAll.string2File(allConfigs, getStorePath());
+            } catch (IOException e) {
+                log.error("persist string2File error, ", e);
+            } finally {
+                readWriteLock.readLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("persist lock error");
+        }
+    }
+
+    public String getAllConfigsFormatString() {
+        try {
+            readWriteLock.readLock().lockInterruptibly();
+
+            try {
+
+                return getAllConfigsInternal();
+
+            } finally {
+                readWriteLock.readLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("getAllConfigsFormatString lock error");
+        }
+
+        return null;
+    }
+
+    public String getDataVersionJson() {
+        return this.dataVersion.toJson();
+    }
+
+    public Properties getAllConfigs() {
+        try {
+            readWriteLock.readLock().lockInterruptibly();
+
+            try {
+
+                return this.allConfigs;
+
+            } finally {
+                readWriteLock.readLock().unlock();
+            }
+        } catch (InterruptedException e) {
+            log.error("getAllConfigs lock error");
+        }
+
+        return null;
+    }
+
+    private String getAllConfigsInternal() {
+        StringBuilder stringBuilder = new StringBuilder();
+
+        // reload from config object ?
+        for (Object configObject : this.configObjectList) {
+            Properties properties = MixAll.object2Properties(configObject);
+            if (properties != null) {
+                merge(properties, this.allConfigs);
+            } else {
+                log.warn("getAllConfigsInternal object2Properties is null, {}", configObject.getClass());
+            }
+        }
+
+        {
+            stringBuilder.append(MixAll.properties2String(this.allConfigs));
+        }
+
+        return stringBuilder.toString();
+    }
+
+    public void setStorePath(final String storePath) {
+        this.storePath = storePath;
+    }
+
+    private void merge(Properties from, Properties to) {
+        for (Object key : from.keySet()) {
+            Object fromObj = from.get(key), toObj = to.get(key);
+            if (toObj != null && !toObj.equals(fromObj)) {
+                log.info("Replace, key: {}, value: {} -> {}", key, toObj, fromObj);
+            }
+            to.put(key, fromObj);
+        }
+    }
+
+    private void mergeIfExist(Properties from, Properties to) {
+        for (Object key : from.keySet()) {
+            if (!to.containsKey(key)) {
+                continue;
+            }
+
+            Object fromObj = from.get(key), toObj = to.get(key);
+            if (toObj != null && !toObj.equals(fromObj)) {
+                log.info("Replace, key: {}, value: {} -> {}", key, toObj, fromObj);
+            }
+            to.put(key, fromObj);
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/CountDownLatch.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/CountDownLatch.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/CountDownLatch.java
new file mode 100644
index 0000000..a5cc9a1
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/CountDownLatch.java
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.AbstractQueuedSynchronizer;
+
+/**
+ * A count-down latch modelled on {@link java.util.concurrent.CountDownLatch} that can
+ * additionally be {@linkplain #reset() reset} to its initial count.
+ *
+ * @author xinyuzhou.zxy
+ */
+public class CountDownLatch {
+    /**
+     * Synchronizer behind the latch. The AQS state holds the remaining count.
+     */
+    private static final class Sync extends AbstractQueuedSynchronizer {
+        private static final long serialVersionUID = 4982264981922014374L;
+
+        /** Count supplied at construction; {@link #reset()} restores it. */
+        private final int startCount;
+
+        Sync(int count) {
+            this.startCount = count;
+            setState(count);
+        }
+
+        int getCount() {
+            return getState();
+        }
+
+        /** Succeeds (positive result) only once the count is zero. */
+        @Override
+        protected int tryAcquireShared(int acquires) {
+            if (getState() == 0) {
+                return 1;
+            }
+            return -1;
+        }
+
+        /**
+         * Decrements the count by one, retrying on CAS contention.
+         *
+         * @return {@code true} only for the call that moves the count to zero;
+         *         {@code false} if the count already was zero
+         */
+        @Override
+        protected boolean tryReleaseShared(int releases) {
+            while (true) {
+                final int current = getState();
+                if (current == 0) {
+                    return false;
+                }
+                final int next = current - 1;
+                if (compareAndSetState(current, next)) {
+                    return next == 0;
+                }
+            }
+        }
+
+        /** Puts the count back to the value given at construction. */
+        protected void reset() {
+            setState(startCount);
+        }
+    }
+
+    private final Sync sync;
+
+    /**
+     * Creates a latch with the given initial count.
+     *
+     * @param count
+     *         how many times {@link #countDown} has to be called before
+     *         {@link #await} lets threads through
+     *
+     * @throws IllegalArgumentException
+     *         if {@code count} is negative
+     */
+    public CountDownLatch(int count) {
+        if (count < 0) {
+            throw new IllegalArgumentException("count < 0");
+        }
+        this.sync = new Sync(count);
+    }
+
+    /**
+     * Blocks the calling thread until the count is zero.
+     *
+     * <p>Returns immediately when the count already is zero. Otherwise the thread
+     * waits until either {@link #countDown} calls bring the count to zero or the
+     * thread is {@linkplain Thread#interrupt interrupted}.
+     *
+     * <p>If the thread has its interrupted status set on entry, or is interrupted
+     * while waiting, {@link InterruptedException} is thrown and the interrupted
+     * status is cleared.
+     *
+     * @throws InterruptedException
+     *         if the current thread is interrupted while waiting
+     */
+    public void await() throws InterruptedException {
+        sync.acquireSharedInterruptibly(1);
+    }
+
+    /**
+     * Blocks the calling thread until the count is zero, the thread is
+     * {@linkplain Thread#interrupt interrupted}, or the given time has elapsed,
+     * whichever happens first.
+     *
+     * <p>Returns {@code true} immediately when the count already is zero, and
+     * {@code true} as soon as {@link #countDown} calls bring it to zero. Returns
+     * {@code false} when the waiting time elapses first; a time less than or equal
+     * to zero means no waiting at all.
+     *
+     * <p>If the thread has its interrupted status set on entry, or is interrupted
+     * while waiting, {@link InterruptedException} is thrown and the interrupted
+     * status is cleared.
+     *
+     * @param timeout
+     *         the maximum time to wait
+     * @param unit
+     *         the time unit of the {@code timeout} argument
+     *
+     * @return {@code true} if the count reached zero, {@code false} if the waiting
+     * time elapsed first
+     *
+     * @throws InterruptedException
+     *         if the current thread is interrupted while waiting
+     */
+    public boolean await(long timeout, TimeUnit unit)
+            throws InterruptedException {
+        final long nanos = unit.toNanos(timeout);
+        return sync.tryAcquireSharedNanos(1, nanos);
+    }
+
+    /**
+     * Decrements the count when it is greater than zero and releases all waiting
+     * threads when the new count is zero. Does nothing when the count already is
+     * zero.
+     */
+    public void countDown() {
+        sync.releaseShared(1);
+    }
+
+    /**
+     * Returns the current count. Typically used for debugging and testing.
+     *
+     * @return the current count
+     */
+    public long getCount() {
+        return sync.getCount();
+    }
+
+    /**
+     * Sets the count back to the value this latch was constructed with.
+     */
+    public void reset() {
+        sync.reset();
+    }
+
+    /**
+     * Returns a string identifying this latch together with its state: the text
+     * {@code "Count ="} followed by the current count, in brackets.
+     *
+     * @return a string identifying this latch, as well as its state
+     */
+    @Override
+    public String toString() {
+        final int remaining = sync.getCount();
+        return super.toString() + "[Count = " + remaining + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/DataVersion.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/DataVersion.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/DataVersion.java
new file mode 100644
index 0000000..eb78ba1
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/DataVersion.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common;
+
+import com.alibaba.rocketmq.remoting.protocol.RemotingSerializable;
+
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * A version stamp made of a wall-clock timestamp and a monotonically incremented
+ * counter. Two stamps are equal when both the timestamp and the counter value match.
+ *
+ * <p>The misspelled {@code timestatmp} name is kept as is: it is part of the public
+ * accessor names. NOTE(review): it presumably also appears in the serialized form
+ * produced via {@link RemotingSerializable} -- confirm before renaming.
+ *
+ * @author shijia.wxr
+ */
+public class DataVersion extends RemotingSerializable {
+    private long timestatmp = System.currentTimeMillis();
+    private AtomicLong counter = new AtomicLong(0);
+
+
+    /**
+     * Copies the timestamp and the counter value of another version into this one.
+     *
+     * @param dataVersion the version to copy from
+     */
+    public void assignNewOne(final DataVersion dataVersion) {
+        this.timestatmp = dataVersion.timestatmp;
+        this.counter.set(dataVersion.counter.get());
+    }
+
+
+    /** Advances to a new version: refreshes the timestamp and increments the counter. */
+    public void nextVersion() {
+        this.timestatmp = System.currentTimeMillis();
+        this.counter.incrementAndGet();
+    }
+
+
+    public long getTimestatmp() {
+        return timestatmp;
+    }
+
+
+    public void setTimestatmp(long timestatmp) {
+        this.timestatmp = timestatmp;
+    }
+
+
+    public AtomicLong getCounter() {
+        return counter;
+    }
+
+
+    public void setCounter(AtomicLong counter) {
+        this.counter = counter;
+    }
+
+
+    /**
+     * Compares by timestamp and by counter <em>value</em>.
+     *
+     * <p>{@link AtomicLong} does not override {@code equals}, so comparing the counter
+     * objects themselves would only ever match the very same instance.
+     */
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        final DataVersion that = (DataVersion) o;
+
+        if (timestatmp != that.timestatmp) {
+            return false;
+        }
+        if (counter == null || that.counter == null) {
+            // Equal only when both counters are absent.
+            return counter == that.counter;
+        }
+        return counter.get() == that.counter.get();
+    }
+
+    /**
+     * Hashes the timestamp and the counter value, consistently with
+     * {@link #equals(Object)}. The result changes whenever the version changes.
+     */
+    @Override
+    public int hashCode() {
+        int result = (int) (timestatmp ^ (timestatmp >>> 32));
+        int counterHash = 0;
+        if (counter != null) {
+            final long value = counter.get();
+            counterHash = (int) (value ^ (value >>> 32));
+        }
+        result = 31 * result + counterHash;
+        return result;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MQVersion.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MQVersion.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MQVersion.java
new file mode 100644
index 0000000..19afb09
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MQVersion.java
@@ -0,0 +1,362 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common;
+
+/**
+ * Catalogue of known versions and their numeric encoding.
+ *
+ * <p>A version's number is the {@linkplain Enum#ordinal() ordinal} of its
+ * {@link Version} constant, so reordering, inserting or removing constants changes
+ * the number of every constant that follows.
+ *
+ * @author shijia.wxr
+ */
+public class MQVersion {
+
+    /** Numeric code of the version this code base identifies itself as. */
+    public static final int CURRENT_VERSION = Version.V4_0_0_SNAPSHOT.ordinal();
+
+    /** Cached once, because {@code Version.values()} clones its array on every call. */
+    private static final Version[] VERSIONS = Version.values();
+
+
+    /**
+     * Returns the name of the version with the given numeric code.
+     *
+     * @param value the numeric version code
+     * @return the constant's name, or {@code "HigherVersion"} when the code is outside
+     *         the range of known versions
+     */
+    public static String getVersionDesc(int value) {
+        // Explicit range check instead of catching the out-of-bounds exception.
+        if (value < 0 || value >= VERSIONS.length) {
+            return "HigherVersion";
+        }
+
+        return VERSIONS[value].name();
+    }
+
+
+    /**
+     * Returns the version constant with the given numeric code.
+     *
+     * @param value the numeric version code
+     * @return the matching constant
+     * @throws ArrayIndexOutOfBoundsException if the code is outside the range of known
+     *         versions
+     */
+    public static Version value2Version(int value) {
+        return VERSIONS[value];
+    }
+
+    /**
+     * Known versions in release order. The position of a constant is its numeric code,
+     * so new versions belong at the end of the list.
+     */
+    public enum Version {
+        V3_0_0_SNAPSHOT,
+        V3_0_0_ALPHA1,
+        V3_0_0_BETA1,
+        V3_0_0_BETA2,
+        V3_0_0_BETA3,
+        V3_0_0_BETA4,
+        V3_0_0_BETA5,
+        V3_0_0_BETA6_SNAPSHOT,
+        V3_0_0_BETA6,
+        V3_0_0_BETA7_SNAPSHOT,
+        V3_0_0_BETA7,
+        V3_0_0_BETA8_SNAPSHOT,
+        V3_0_0_BETA8,
+        V3_0_0_BETA9_SNAPSHOT,
+        V3_0_0_BETA9,
+        V3_0_0_FINAL,
+        V3_0_1_SNAPSHOT,
+        V3_0_1,
+        V3_0_2_SNAPSHOT,
+        V3_0_2,
+        V3_0_3_SNAPSHOT,
+        V3_0_3,
+        V3_0_4_SNAPSHOT,
+        V3_0_4,
+        V3_0_5_SNAPSHOT,
+        V3_0_5,
+        V3_0_6_SNAPSHOT,
+        V3_0_6,
+        V3_0_7_SNAPSHOT,
+        V3_0_7,
+        V3_0_8_SNAPSHOT,
+        V3_0_8,
+        V3_0_9_SNAPSHOT,
+        V3_0_9,
+
+        V3_0_10_SNAPSHOT,
+        V3_0_10,
+
+        V3_0_11_SNAPSHOT,
+        V3_0_11,
+
+        V3_0_12_SNAPSHOT,
+        V3_0_12,
+
+        V3_0_13_SNAPSHOT,
+        V3_0_13,
+
+        V3_0_14_SNAPSHOT,
+        V3_0_14,
+
+        V3_0_15_SNAPSHOT,
+        V3_0_15,
+
+        V3_1_0_SNAPSHOT,
+        V3_1_0,
+
+        V3_1_1_SNAPSHOT,
+        V3_1_1,
+
+        V3_1_2_SNAPSHOT,
+        V3_1_2,
+
+        V3_1_3_SNAPSHOT,
+        V3_1_3,
+
+        V3_1_4_SNAPSHOT,
+        V3_1_4,
+
+        V3_1_5_SNAPSHOT,
+        V3_1_5,
+
+        V3_1_6_SNAPSHOT,
+        V3_1_6,
+
+        V3_1_7_SNAPSHOT,
+        V3_1_7,
+
+        V3_1_8_SNAPSHOT,
+        V3_1_8,
+
+        V3_1_9_SNAPSHOT,
+        V3_1_9,
+
+        V3_2_0_SNAPSHOT,
+        V3_2_0,
+
+        V3_2_1_SNAPSHOT,
+        V3_2_1,
+
+        V3_2_2_SNAPSHOT,
+        V3_2_2,
+
+        V3_2_3_SNAPSHOT,
+        V3_2_3,
+
+        V3_2_4_SNAPSHOT,
+        V3_2_4,
+
+        V3_2_5_SNAPSHOT,
+        V3_2_5,
+
+        V3_2_6_SNAPSHOT,
+        V3_2_6,
+
+        V3_2_7_SNAPSHOT,
+        V3_2_7,
+
+        V3_2_8_SNAPSHOT,
+        V3_2_8,
+
+        V3_2_9_SNAPSHOT,
+        V3_2_9,
+
+        V3_3_1_SNAPSHOT,
+        V3_3_1,
+
+        V3_3_2_SNAPSHOT,
+        V3_3_2,
+
+        V3_3_3_SNAPSHOT,
+        V3_3_3,
+
+        V3_3_4_SNAPSHOT,
+        V3_3_4,
+
+        V3_3_5_SNAPSHOT,
+        V3_3_5,
+
+        V3_3_6_SNAPSHOT,
+        V3_3_6,
+
+        V3_3_7_SNAPSHOT,
+        V3_3_7,
+
+        V3_3_8_SNAPSHOT,
+        V3_3_8,
+
+        V3_3_9_SNAPSHOT,
+        V3_3_9,
+
+        V3_4_1_SNAPSHOT,
+        V3_4_1,
+
+        V3_4_2_SNAPSHOT,
+        V3_4_2,
+
+        V3_4_3_SNAPSHOT,
+        V3_4_3,
+
+        V3_4_4_SNAPSHOT,
+        V3_4_4,
+
+        V3_4_5_SNAPSHOT,
+        V3_4_5,
+
+        V3_4_6_SNAPSHOT,
+        V3_4_6,
+
+        V3_4_7_SNAPSHOT,
+        V3_4_7,
+
+        V3_4_8_SNAPSHOT,
+        V3_4_8,
+
+        V3_4_9_SNAPSHOT,
+        V3_4_9,
+
+        V3_5_1_SNAPSHOT,
+        V3_5_1,
+
+        V3_5_2_SNAPSHOT,
+        V3_5_2,
+
+        V3_5_3_SNAPSHOT,
+        V3_5_3,
+
+        V3_5_4_SNAPSHOT,
+        V3_5_4,
+
+        V3_5_5_SNAPSHOT,
+        V3_5_5,
+
+        V3_5_6_SNAPSHOT,
+        V3_5_6,
+
+        V3_5_7_SNAPSHOT,
+        V3_5_7,
+
+        V3_5_8_SNAPSHOT,
+        V3_5_8,
+
+        V3_5_9_SNAPSHOT,
+        V3_5_9,
+
+        V3_6_1_SNAPSHOT,
+        V3_6_1,
+
+        V3_6_2_SNAPSHOT,
+        V3_6_2,
+
+        V3_6_3_SNAPSHOT,
+        V3_6_3,
+
+        V3_6_4_SNAPSHOT,
+        V3_6_4,
+
+        V3_6_5_SNAPSHOT,
+        V3_6_5,
+
+        V3_6_6_SNAPSHOT,
+        V3_6_6,
+
+        V3_6_7_SNAPSHOT,
+        V3_6_7,
+
+        V3_6_8_SNAPSHOT,
+        V3_6_8,
+
+        V3_6_9_SNAPSHOT,
+        V3_6_9,
+
+        V3_7_1_SNAPSHOT,
+        V3_7_1,
+
+        V3_7_2_SNAPSHOT,
+        V3_7_2,
+
+        V3_7_3_SNAPSHOT,
+        V3_7_3,
+
+        V3_7_4_SNAPSHOT,
+        V3_7_4,
+
+        V3_7_5_SNAPSHOT,
+        V3_7_5,
+
+        V3_7_6_SNAPSHOT,
+        V3_7_6,
+
+        V3_7_7_SNAPSHOT,
+        V3_7_7,
+
+        V3_7_8_SNAPSHOT,
+        V3_7_8,
+
+        V3_7_9_SNAPSHOT,
+        V3_7_9,
+
+        V3_8_1_SNAPSHOT,
+        V3_8_1,
+
+        V3_8_2_SNAPSHOT,
+        V3_8_2,
+
+        V3_8_3_SNAPSHOT,
+        V3_8_3,
+
+        V3_8_4_SNAPSHOT,
+        V3_8_4,
+
+        V3_8_5_SNAPSHOT,
+        V3_8_5,
+
+        V3_8_6_SNAPSHOT,
+        V3_8_6,
+
+        V3_8_7_SNAPSHOT,
+        V3_8_7,
+
+        V3_8_8_SNAPSHOT,
+        V3_8_8,
+
+        V3_8_9_SNAPSHOT,
+        V3_8_9,
+
+        V3_9_1_SNAPSHOT,
+        V3_9_1,
+
+        V3_9_2_SNAPSHOT,
+        V3_9_2,
+
+        V3_9_3_SNAPSHOT,
+        V3_9_3,
+
+        V3_9_4_SNAPSHOT,
+        V3_9_4,
+
+        V3_9_5_SNAPSHOT,
+        V3_9_5,
+
+        V3_9_6_SNAPSHOT,
+        V3_9_6,
+
+        V3_9_7_SNAPSHOT,
+        V3_9_7,
+
+        V3_9_8_SNAPSHOT,
+        V3_9_8,
+
+        V3_9_9_SNAPSHOT,
+        V3_9_9,
+
+        V4_0_0_SNAPSHOT,
+        V4_0_0,
+
+        V4_1_0_SNAPSHOT,
+        V4_1_0,
+
+        V4_2_0_SNAPSHOT,
+        V4_2_0,
+
+        V4_3_0_SNAPSHOT,
+        V4_3_0,
+
+        V4_4_0_SNAPSHOT,
+        V4_4_0,
+
+        V4_5_0_SNAPSHOT,
+        V4_5_0,
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MixAll.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MixAll.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MixAll.java
new file mode 100644
index 0000000..508111c
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/MixAll.java
@@ -0,0 +1,486 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.alibaba.rocketmq.common;
+
+import com.alibaba.rocketmq.common.annotation.ImportantField;
+import com.alibaba.rocketmq.common.help.FAQUrl;
+import org.slf4j.Logger;
+
+import java.io.ByteArrayInputStream;
+import java.io.File;
+import java.io.FileReader;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UnsupportedEncodingException;
+import java.lang.annotation.Annotation;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.lang.reflect.Modifier;
+import java.net.InetAddress;
+import java.net.InetSocketAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Enumeration;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * @author shijia.wxr
+ */
+public class MixAll {
+    public static final String ROCKETMQ_HOME_ENV = "ROCKETMQ_HOME";
+    public static final String ROCKETMQ_HOME_PROPERTY = "rocketmq.home.dir";
+    public static final String NAMESRV_ADDR_ENV = "NAMESRV_ADDR";
+    public static final String NAMESRV_ADDR_PROPERTY = "rocketmq.namesrv.addr";
+    public static final String MESSAGE_COMPRESS_LEVEL = "rocketmq.message.compressLevel";
+    public static final String WS_DOMAIN_NAME = System.getProperty("rocketmq.namesrv.domain", "jmenv.tbsite.net");
+    public static final String WS_DOMAIN_SUBGROUP = System.getProperty("rocketmq.namesrv.domain.subgroup", "nsaddr");
+    // http://jmenv.tbsite.net:8080/rocketmq/nsaddr
+    public static final String WS_ADDR = "http://" + WS_DOMAIN_NAME + ":8080/rocketmq/" + WS_DOMAIN_SUBGROUP;
+    public static final String DEFAULT_TOPIC = "TBW102";
+    public static final String BENCHMARK_TOPIC = "BenchmarkTest";
+    public static final String DEFAULT_PRODUCER_GROUP = "DEFAULT_PRODUCER";
+    public static final String DEFAULT_CONSUMER_GROUP = "DEFAULT_CONSUMER";
+    public static final String TOOLS_CONSUMER_GROUP = "TOOLS_CONSUMER";
+    public static final String FILTERSRV_CONSUMER_GROUP = "FILTERSRV_CONSUMER";
+    public static final String MONITOR_CONSUMER_GROUP = "__MONITOR_CONSUMER";
+    public static final String CLIENT_INNER_PRODUCER_GROUP = "CLIENT_INNER_PRODUCER";
+    public static final String SELF_TEST_PRODUCER_GROUP = "SELF_TEST_P_GROUP";
+    public static final String SELF_TEST_CONSUMER_GROUP = "SELF_TEST_C_GROUP";
+    public static final String SELF_TEST_TOPIC = "SELF_TEST_TOPIC";
+    public static final String OFFSET_MOVED_EVENT = "OFFSET_MOVED_EVENT";
+    public static final String ONS_HTTP_PROXY_GROUP = "CID_ONS-HTTP-PROXY";
+    public static final String CID_ONSAPI_PERMISSION_GROUP = "CID_ONSAPI_PERMISSION";
+    public static final String CID_ONSAPI_OWNER_GROUP = "CID_ONSAPI_OWNER";
+    public static final String CID_ONSAPI_PULL_GROUP = "CID_ONSAPI_PULL";
+    public static final String CID_RMQ_SYS_PREFIX = "CID_RMQ_SYS_";
+
+    public static final List<String> LOCAL_INET_ADDRESS = getLocalInetAddress();
+    public static final String LOCALHOST = localhost();
+    public static final String DEFAULT_CHARSET = "UTF-8";
+    public static final long MASTER_ID = 0L;
+    public static final long CURRENT_JVM_PID = getPID();
+
+    public static final String RETRY_GROUP_TOPIC_PREFIX = "%RETRY%";
+
+    public static final String DLQ_GROUP_TOPIC_PREFIX = "%DLQ%";
+    public static final String SYSTEM_TOPIC_PREFIX = "rmq_sys_";
+    public static final String UNIQUE_MSG_QUERY_FLAG = "_UNIQUE_KEY_QUERY";
+    public static final String DEFAULT_TRACE_REGION_ID = "DefaultRegion";
+    public static final String CONSUME_CONTEXT_TYPE = "ConsumeContextType";
+
+    /**
+     * Builds the retry topic name for a consumer group.
+     *
+     * @param consumerGroup consumer group name
+     * @return {@code RETRY_GROUP_TOPIC_PREFIX} followed by {@code consumerGroup}
+     */
+    public static String getRetryTopic(final String consumerGroup) {
+        final StringBuilder topic = new StringBuilder(RETRY_GROUP_TOPIC_PREFIX);
+        topic.append(consumerGroup);
+        return topic.toString();
+    }
+
+
+    /**
+     * Tells whether a consumer group name carries the system prefix.
+     *
+     * @param consumerGroup consumer group name; must not be {@code null}
+     * @return {@code true} when the name starts with {@code CID_RMQ_SYS_PREFIX}
+     */
+    public static boolean isSysConsumerGroup(final String consumerGroup) {
+        final boolean hasSystemPrefix = consumerGroup.startsWith(CID_RMQ_SYS_PREFIX);
+        return hasSystemPrefix;
+    }
+
+    /**
+     * Tells whether a topic name carries the system-topic prefix.
+     *
+     * @param topic topic name; must not be {@code null}
+     * @return {@code true} when the name starts with {@code SYSTEM_TOPIC_PREFIX}
+     */
+    public static boolean isSystemTopic(final String topic) {
+        final boolean hasSystemPrefix = topic.startsWith(SYSTEM_TOPIC_PREFIX);
+        return hasSystemPrefix;
+    }
+
+    /**
+     * Builds the DLQ topic name for a consumer group.
+     *
+     * @param consumerGroup consumer group name
+     * @return {@code DLQ_GROUP_TOPIC_PREFIX} followed by {@code consumerGroup}
+     */
+    public static String getDLQTopic(final String consumerGroup) {
+        final StringBuilder topic = new StringBuilder(DLQ_GROUP_TOPIC_PREFIX);
+        topic.append(consumerGroup);
+        return topic.toString();
+    }
+
+
+    /**
+     * Derives the VIP-channel address from a broker address.
+     *
+     * @param isChange whether the address should be rewritten at all
+     * @param brokerAddr broker address in {@code host:port} form
+     * @return {@code brokerAddr} untouched when {@code isChange} is {@code false}; otherwise the same
+     *         host with the port number lowered by 2
+     */
+    public static String brokerVIPChannel(final boolean isChange, final String brokerAddr) {
+        if (!isChange) {
+            return brokerAddr;
+        }
+
+        final String[] hostAndPort = brokerAddr.split(":");
+        final int vipPort = Integer.parseInt(hostAndPort[1]) - 2;
+        return hostAndPort[0] + ":" + vipPort;
+    }
+
+
+    /**
+     * Extracts a numeric process id from the runtime MXBean name.
+     *
+     * @return the number found before the first {@code '@'} of the runtime name, or {@code 0} when
+     *         the name is missing, empty or does not start with a parsable number
+     */
+    public static long getPID() {
+        final String runtimeName = java.lang.management.ManagementFactory.getRuntimeMXBean().getName();
+        if (runtimeName == null || runtimeName.length() == 0) {
+            return 0;
+        }
+
+        try {
+            final String pidPart = runtimeName.split("@")[0];
+            return Long.parseLong(pidPart);
+        } catch (Exception e) {
+            return 0;
+        }
+    }
+
+
+    /**
+     * Packs an IP address and a port into a single non-negative-looking long.
+     * <p>
+     * The raw address bytes are written first, followed by the 4-byte port, into an 8-byte buffer;
+     * the result is the absolute value of those 8 bytes read as a long.
+     * NOTE(review): the buffer only has room for a 4-byte address; a longer raw address
+     * would not fit.
+     *
+     * @param ip host name or literal IP address
+     * @param port port number
+     * @return {@code Math.abs} of the packed value
+     */
+    public static long createBrokerId(final String ip, final int port) {
+        final byte[] rawAddress = new InetSocketAddress(ip, port).getAddress().getAddress();
+
+        final ByteBuffer packed = ByteBuffer.allocate(8);
+        packed.put(rawAddress);
+        packed.putInt(port);
+
+        return Math.abs(packed.getLong(0));
+    }
+
+    /**
+     * Replaces the content of a file, keeping a backup of the previous content.
+     * <p>
+     * Steps: write {@code str} to {@code fileName + ".tmp"}; if the current file can be read,
+     * copy its content to {@code fileName + ".bak"}; delete the current file; rename the
+     * {@code .tmp} file to {@code fileName}. The results of the delete and rename calls are not checked.
+     *
+     * @param str new content
+     * @param fileName path of the file to replace
+     * @throws IOException if writing the {@code .tmp} or {@code .bak} file fails
+     */
+    public static final void string2File(final String str, final String fileName) throws IOException {
+        // Stage the new content next to the target.
+        final String stagingName = fileName + ".tmp";
+        string2FileNotSafe(str, stagingName);
+
+        // Preserve whatever could be read from the current file.
+        final String previousContent = file2String(fileName);
+        if (previousContent != null) {
+            string2FileNotSafe(previousContent, fileName + ".bak");
+        }
+
+        // Swap the staged file into place.
+        final File target = new File(fileName);
+        target.delete();
+        new File(stagingName).renameTo(target);
+    }
+
+
+    /**
+     * Writes a string straight to a file, creating missing parent directories first.
+     * The file is written with {@link FileWriter}, i.e. in the platform default charset.
+     *
+     * @param str content to write
+     * @param fileName path of the file to (over)write
+     * @throws IOException if the file cannot be opened, written or closed
+     */
+    public static final void string2FileNotSafe(final String str, final String fileName) throws IOException {
+        final File target = new File(fileName);
+        final File parentDir = target.getParentFile();
+        if (parentDir != null) {
+            parentDir.mkdirs();
+        }
+
+        final FileWriter writer = new FileWriter(target);
+        try {
+            writer.write(str);
+        } finally {
+            writer.close();
+        }
+    }
+
+
+    /**
+     * Reads a file, given by path, into a string.
+     *
+     * @param fileName path of the file to read
+     * @return whatever {@code file2String(File)} returns for that path
+     */
+    public static final String file2String(final String fileName) {
+        return file2String(new File(fileName));
+    }
+
+    /**
+     * Reads a whole file into a string using the platform default charset.
+     * <p>
+     * The content is read in a loop until end of stream, so files containing multi-byte
+     * characters (where the character count is smaller than the byte length) and short reads
+     * are handled correctly.
+     *
+     * @param file file to read
+     * @return the file content; {@code null} when the file does not exist, is empty, or cannot
+     *         be read
+     */
+    public static final String file2String(final File file) {
+        if (!file.exists()) {
+            return null;
+        }
+
+        FileReader fileReader = null;
+        try {
+            fileReader = new FileReader(file);
+            final StringBuilder content = new StringBuilder();
+            final char[] buffer = new char[4096];
+            int len;
+            // A single read() is not guaranteed to deliver everything; drain until EOF.
+            while ((len = fileReader.read(buffer)) != -1) {
+                content.append(buffer, 0, len);
+            }
+            // An empty file has always been reported as null; keep that contract for callers.
+            return content.length() > 0 ? content.toString() : null;
+        } catch (IOException e) {
+            // Unreadable file is reported the same way as a missing one.
+            return null;
+        } finally {
+            if (fileReader != null) {
+                try {
+                    fileReader.close();
+                } catch (IOException ignored) {
+                    // Nothing useful can be done here; the read outcome has already been decided.
+                }
+            }
+        }
+    }
+
+    /**
+     * Fetches the content behind a URL and decodes it as UTF-8.
+     * <p>
+     * The stream is drained until end of stream; {@code InputStream.available()} is only an
+     * estimate of what can be read without blocking and must not be used as the content length.
+     *
+     * @param url location to read, with connection caching disabled
+     * @return the decoded content, or {@code null} if anything goes wrong while connecting or reading
+     */
+    public static final String file2String(final URL url) {
+        InputStream in = null;
+        try {
+            URLConnection urlConnection = url.openConnection();
+            urlConnection.setUseCaches(false);
+            in = urlConnection.getInputStream();
+
+            final java.io.ByteArrayOutputStream content = new java.io.ByteArrayOutputStream();
+            final byte[] buffer = new byte[4096];
+            int len;
+            while ((len = in.read(buffer)) != -1) {
+                content.write(buffer, 0, len);
+            }
+            return content.toString("UTF-8");
+        } catch (Exception ignored) {
+            // Best effort by contract: every failure is reported to the caller as null.
+        } finally {
+            if (null != in) {
+                try {
+                    in.close();
+                } catch (IOException ignored) {
+                    // The result has already been determined; a failed close changes nothing.
+                }
+            }
+        }
+
+        return null;
+    }
+
+    /**
+     * Returns the path component of the location a class was loaded from.
+     *
+     * @param c class to inspect; its protection domain must have a code source
+     * @return path of the code source location URL
+     */
+    public static String findClassPath(Class<?> c) {
+        return c.getProtectionDomain().getCodeSource().getLocation().getPath();
+    }
+
+
+    /**
+     * Logs every non-static declared field of an object, without restricting the output to
+     * fields marked as important.
+     *
+     * @param log logger to write to
+     * @param object object whose fields are logged
+     */
+    public static void printObjectProperties(final Logger log, final Object object) {
+        final boolean onlyImportantField = false;
+        printObjectProperties(log, object, onlyImportantField);
+    }
+
+
+    /**
+     * Logs {@code name=value} at info level for the declared fields of an object.
+     * <p>
+     * Static fields and fields whose name starts with {@code "this"} are skipped. A {@code null}
+     * value is logged as an empty string. Nothing is logged when {@code log} is {@code null}.
+     *
+     * @param log logger to write to; may be {@code null}
+     * @param object object whose declared fields are inspected
+     * @param onlyImportantField when {@code true}, only fields annotated with
+     *        {@link ImportantField} are logged
+     */
+    public static void printObjectProperties(final Logger log, final Object object, final boolean onlyImportantField) {
+        for (Field field : object.getClass().getDeclaredFields()) {
+            if (Modifier.isStatic(field.getModifiers())) {
+                continue;
+            }
+
+            final String name = field.getName();
+            if (name.startsWith("this")) {
+                continue;
+            }
+
+            Object value = null;
+            try {
+                field.setAccessible(true);
+                value = field.get(object);
+                if (null == value) {
+                    value = "";
+                }
+            } catch (IllegalArgumentException e) {
+                e.printStackTrace();
+            } catch (IllegalAccessException e) {
+                e.printStackTrace();
+            }
+
+            if (onlyImportantField && null == field.getAnnotation(ImportantField.class)) {
+                continue;
+            }
+
+            if (log != null) {
+                log.info(name + "=" + value);
+            }
+        }
+    }
+
+
+    /**
+     * Renders properties as text, one {@code key=value} pair per line.
+     *
+     * @param properties properties to render
+     * @return the concatenation of {@code key + "=" + value + "\n"} for every entry with a
+     *         non-null value, in the iteration order of {@code properties.entrySet()}
+     */
+    public static String properties2String(final Properties properties) {
+        final StringBuilder text = new StringBuilder();
+        for (Map.Entry<Object, Object> entry : properties.entrySet()) {
+            if (entry.getValue() == null) {
+                continue;
+            }
+            text.append(entry.getKey().toString())
+                    .append("=")
+                    .append(entry.getValue().toString())
+                    .append("\n");
+        }
+        return text.toString();
+    }
+
+    /**
+     * Parses text in {@link Properties#load(InputStream)} format.
+     * The text is first encoded to bytes with {@code DEFAULT_CHARSET}.
+     *
+     * @param str text to parse; must not be {@code null}
+     * @return the loaded properties, or {@code null} if encoding or loading fails (the stack
+     *         trace is printed in that case)
+     */
+    public static Properties string2Properties(final String str) {
+        final Properties loaded = new Properties();
+        try {
+            final InputStream source = new ByteArrayInputStream(str.getBytes(DEFAULT_CHARSET));
+            loaded.load(source);
+            return loaded;
+        } catch (IOException e) {
+            // UnsupportedEncodingException is an IOException, so both failure kinds land here.
+            e.printStackTrace();
+            return null;
+        }
+    }
+
+    /**
+     * Copies the declared fields of an object into a {@link Properties} instance.
+     * <p>
+     * Static fields, fields whose name starts with {@code "this"} and fields whose value is
+     * {@code null} (or cannot be read) are left out. Values are stored via {@code toString()}.
+     *
+     * @param object object whose declared fields are read
+     * @return properties keyed by field name
+     */
+    public static Properties object2Properties(final Object object) {
+        final Properties result = new Properties();
+
+        for (Field field : object.getClass().getDeclaredFields()) {
+            if (Modifier.isStatic(field.getModifiers())) {
+                continue;
+            }
+
+            final String name = field.getName();
+            if (name.startsWith("this")) {
+                continue;
+            }
+
+            Object value = null;
+            try {
+                field.setAccessible(true);
+                value = field.get(object);
+            } catch (IllegalArgumentException e) {
+                e.printStackTrace();
+            } catch (IllegalAccessException e) {
+                e.printStackTrace();
+            }
+
+            if (value != null) {
+                result.setProperty(name, value.toString());
+            }
+        }
+
+        return result;
+    }
+
+    /**
+     * Applies properties to an object through its public setters.
+     * <p>
+     * For every public method named {@code setXyz} taking exactly one parameter, the property
+     * {@code xyz} (first letter lower-cased independently of the default locale) is looked up.
+     * If present, it is converted to the parameter type and the setter is invoked. Supported
+     * parameter types are {@code int}, {@code long}, {@code double}, {@code boolean},
+     * {@code float}, their wrapper classes and {@code String}; setters of any other type are skipped.
+     * <p>
+     * This is a best-effort operation: a value that cannot be parsed or a setter that throws is
+     * silently skipped and the remaining setters are still processed.
+     *
+     * @param p source properties
+     * @param object target object
+     */
+    public static void properties2Object(final Properties p, final Object object) {
+        Method[] methods = object.getClass().getMethods();
+        for (Method method : methods) {
+            String mn = method.getName();
+            // A bare "set" has no property name to derive a key from.
+            if (!mn.startsWith("set") || mn.length() <= 3) {
+                continue;
+            }
+
+            try {
+                // Locale-independent lower-casing: with a Turkish default locale "I" would
+                // otherwise become a dotless i and the key would never match.
+                String key = mn.substring(3, 4).toLowerCase(java.util.Locale.ENGLISH) + mn.substring(4);
+                String property = p.getProperty(key);
+                if (property == null) {
+                    continue;
+                }
+
+                Class<?>[] pt = method.getParameterTypes();
+                // Only single-argument setters can be invoked with one converted value.
+                if (pt.length != 1) {
+                    continue;
+                }
+
+                String cn = pt[0].getSimpleName();
+                Object arg;
+                if (cn.equals("int") || cn.equals("Integer")) {
+                    arg = Integer.parseInt(property);
+                } else if (cn.equals("long") || cn.equals("Long")) {
+                    arg = Long.parseLong(property);
+                } else if (cn.equals("double") || cn.equals("Double")) {
+                    arg = Double.parseDouble(property);
+                } else if (cn.equals("boolean") || cn.equals("Boolean")) {
+                    arg = Boolean.parseBoolean(property);
+                } else if (cn.equals("float") || cn.equals("Float")) {
+                    arg = Float.parseFloat(property);
+                } else if (cn.equals("String")) {
+                    arg = property;
+                } else {
+                    continue;
+                }
+                method.invoke(object, new Object[]{arg});
+            } catch (Throwable ignored) {
+                // Deliberately best effort: one bad value or failing setter must not prevent
+                // the remaining properties from being applied.
+            }
+        }
+    }
+
+
+    /**
+     * Compares two property sets with {@code equals}.
+     *
+     * @param p1 first property set; must not be {@code null}
+     * @param p2 second property set
+     * @return result of {@code p1.equals(p2)}
+     */
+    public static boolean isPropertiesEqual(final Properties p1, final Properties p2) {
+        final boolean same = p1.equals(p2);
+        return same;
+    }
+
+
+    /**
+     * Collects the textual addresses bound to every network interface of this machine.
+     *
+     * @return host-address strings of all addresses of all interfaces, in enumeration order
+     * @throws RuntimeException wrapping the {@link SocketException} if the interfaces cannot be listed
+     */
+    public static List<String> getLocalInetAddress() {
+        final List<String> addresses = new ArrayList<String>();
+        try {
+            final Enumeration<NetworkInterface> interfaces = NetworkInterface.getNetworkInterfaces();
+            while (interfaces.hasMoreElements()) {
+                final Enumeration<InetAddress> bound = interfaces.nextElement().getInetAddresses();
+                while (bound.hasMoreElements()) {
+                    final InetAddress address = bound.nextElement();
+                    addresses.add(address.getHostAddress());
+                }
+            }
+        } catch (SocketException e) {
+            throw new RuntimeException("get local inet address fail", e);
+        }
+
+        return addresses;
+    }
+
+
+    /**
+     * Tells whether an address string mentions one of this machine's addresses.
+     *
+     * @param address text to inspect
+     * @return {@code true} if {@code address} contains any entry of {@code LOCAL_INET_ADDRESS}
+     *         as a substring
+     */
+    public static boolean isLocalAddr(String address) {
+        for (String localAddress : LOCAL_INET_ADDRESS) {
+            final boolean mentioned = address.contains(localAddress);
+            if (mentioned) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+
+    private static String localhost() {
+        try {
+            InetAddress addr = InetAddress.getLocalHost();
+            return addr.getHostAddress();
+        } catch (Throwable e) {
+            throw new RuntimeException("InetAddress java.net.InetAddress.getLocalHost() throws UnknownHostException"
+                    + FAQUrl.suggestTodo(FAQUrl.UNKNOWN_HOST_EXCEPTION),
+                    e);
+        }
+    }
+
+
+    /**
+     * Raises an atomic counter to {@code value}, but never lowers it.
+     * <p>
+     * Retries the compare-and-set for as long as the observed value is still smaller than
+     * {@code value}.
+     *
+     * @param target counter to update
+     * @param value candidate new value
+     * @return {@code true} if this call stored {@code value}; {@code false} if the counter was
+     *         already greater than or equal to it
+     */
+    public static boolean compareAndIncreaseOnly(final AtomicLong target, final long value) {
+        for (long observed = target.get(); value > observed; observed = target.get()) {
+            if (target.compareAndSet(observed, value)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
+    /**
+     * Resolves the host name of the local host.
+     *
+     * @return host name reported by {@link InetAddress#getLocalHost()}
+     * @throws RuntimeException wrapping any failure, with a FAQ hint appended to the message
+     */
+    public static String localhostName() {
+        try {
+            final InetAddress local = InetAddress.getLocalHost();
+            return local.getHostName();
+        } catch (Throwable e) {
+            final String message = "InetAddress java.net.InetAddress.getLocalHost() throws UnknownHostException"
+                    + FAQUrl.suggestTodo(FAQUrl.UNKNOWN_HOST_EXCEPTION);
+            throw new RuntimeException(message, e);
+        }
+    }
+
+    /**
+     * Copies the elements of a list into a new {@link HashSet}.
+     *
+     * @param values elements to copy; must not be {@code null}
+     * @return a new set holding the distinct elements of {@code values}
+     */
+    public Set<String> list2Set(List<String> values) {
+        final Set<String> distinct = new HashSet<String>();
+        distinct.addAll(values);
+        return distinct;
+    }
+
+    /**
+     * Copies the elements of a set into a new {@link ArrayList}.
+     *
+     * @param values elements to copy; must not be {@code null}
+     * @return a new list holding the elements in the set's iteration order
+     */
+    public List<String> set2List(Set<String> values) {
+        final List<String> copy = new ArrayList<String>();
+        copy.addAll(values);
+        return copy;
+    }
+
+    /**
+     * Formats a byte count with a unit prefix and one decimal place.
+     *
+     * @param bytes number of bytes
+     * @param si {@code true} for powers of 1000 with prefixes {@code k, M, G, ...};
+     *        {@code false} for powers of 1024 with prefixes {@code Ki, Mi, Gi, ...}
+     * @return {@code "<bytes> B"} when {@code bytes} is below one unit, otherwise the scaled
+     *         value followed by the prefixed unit, e.g. {@code "1.5 KiB"}
+     */
+    public static String humanReadableByteCount(long bytes, boolean si) {
+        final int unit = si ? 1000 : 1024;
+        if (bytes < unit) {
+            return bytes + " B";
+        }
+
+        // Index of the largest power of the unit not exceeding the value.
+        final int exp = (int) (Math.log(bytes) / Math.log(unit));
+        final String prefixLetters = si ? "kMGTPE" : "KMGTPE";
+        final String pre = prefixLetters.charAt(exp - 1) + (si ? "" : "i");
+        final double scaled = bytes / Math.pow(unit, exp);
+        return String.format("%.1f %sB", scaled, pre);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/Pair.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/Pair.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/Pair.java
new file mode 100644
index 0000000..ada6144
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/Pair.java
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common;
+
+/**
+ * Mutable holder for two values of independent types.
+ *
+ * @param <T1> type of the first value
+ * @param <T2> type of the second value
+ * @author shijia.wxr
+ */
+public class Pair<T1, T2> {
+    private T1 object1;
+    private T2 object2;
+
+
+    /**
+     * Creates a pair holding the two given values.
+     *
+     * @param object1 first value
+     * @param object2 second value
+     */
+    public Pair(final T1 object1, final T2 object2) {
+        this.object1 = object1;
+        this.object2 = object2;
+    }
+
+
+    /** @return the first value */
+    public T1 getObject1() {
+        return this.object1;
+    }
+
+
+    /** @return the second value */
+    public T2 getObject2() {
+        return this.object2;
+    }
+
+
+    /** @param object1 new first value */
+    public void setObject1(final T1 object1) {
+        this.object1 = object1;
+    }
+
+
+    /** @param object2 new second value */
+    public void setObject2(final T2 object2) {
+        this.object2 = object2;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ServiceState.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ServiceState.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ServiceState.java
new file mode 100644
index 0000000..a580cf4
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ServiceState.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common;
+
+/**
+ * @author shijia.wxr
+ */
+public enum ServiceState {
+    /**
+     * Service just created,not start
+     */
+    CREATE_JUST,
+    /**
+     * Service Running
+     */
+    RUNNING,
+    /**
+     * Service shutdown
+     */
+    SHUTDOWN_ALREADY,
+    /**
+     * Service Start failure
+     */
+    START_FAILED;
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ServiceThread.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ServiceThread.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ServiceThread.java
new file mode 100644
index 0000000..d6da0e3
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ServiceThread.java
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common;
+
+import com.alibaba.rocketmq.common.constant.LoggerName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author shijia.wxr
+ * @author xinyuzhou.zxy
+ */
+public abstract class ServiceThread implements Runnable {
+    private static final Logger STLOG = LoggerFactory.getLogger(LoggerName.COMMON_LOGGER_NAME);
+    private static final long JOIN_TIME = 90 * 1000;
+
+    protected final Thread thread;
+
+    protected volatile AtomicBoolean hasNotified = new AtomicBoolean(false);
+
+    protected volatile boolean stopped = false;
+
+    protected final CountDownLatch waitPoint = new CountDownLatch(1);
+
+
+    public ServiceThread() {
+        this.thread = new Thread(this, this.getServiceName());
+    }
+
+
+    public abstract String getServiceName();
+
+
+    public void start() {
+        this.thread.start();
+    }
+
+
+    public void shutdown() {
+        this.shutdown(false);
+    }
+
+    public void shutdown(final boolean interrupt) {
+        this.stopped = true;
+        STLOG.info("shutdown thread " + this.getServiceName() + " interrupt " + interrupt);
+
+        if (hasNotified.compareAndSet(false, true)) {
+            waitPoint.countDown(); // notify
+        }
+
+        try {
+            if (interrupt) {
+                this.thread.interrupt();
+            }
+
+            long beginTime = System.currentTimeMillis();
+            if (!this.thread.isDaemon()) {
+                this.thread.join(this.getJointime());
+            }
+            long eclipseTime = System.currentTimeMillis() - beginTime;
+            STLOG.info("join thread " + this.getServiceName() + " eclipse time(ms) " + eclipseTime + " "
+                    + this.getJointime());
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        }
+    }
+
+    public long getJointime() {
+        return JOIN_TIME;
+    }
+
+    public void stop() {
+        this.stop(false);
+    }
+
+    public void stop(final boolean interrupt) {
+        this.stopped = true;
+        STLOG.info("stop thread " + this.getServiceName() + " interrupt " + interrupt);
+
+        if (hasNotified.compareAndSet(false, true)) {
+            waitPoint.countDown(); // notify
+        }
+
+        if (interrupt) {
+            this.thread.interrupt();
+        }
+    }
+
+    public void makeStop() {
+        this.stopped = true;
+        STLOG.info("makestop thread " + this.getServiceName());
+    }
+
+    public void wakeup() {
+        if (hasNotified.compareAndSet(false, true)) {
+            waitPoint.countDown(); // notify
+        }
+    }
+
+    protected void waitForRunning(long interval) {
+        if (hasNotified.compareAndSet(true, false)) {
+            this.onWaitEnd();
+            return;
+        }
+
+        //entry to wait
+        waitPoint.reset();
+
+        try {
+            waitPoint.await(interval, TimeUnit.MILLISECONDS);
+        } catch (InterruptedException e) {
+            e.printStackTrace();
+        } finally {
+            hasNotified.set(false);
+            this.onWaitEnd();
+        }
+    }
+
+    protected void onWaitEnd() {
+    }
+
+    public boolean isStopped() {
+        return stopped;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/SystemClock.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/SystemClock.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/SystemClock.java
new file mode 100644
index 0000000..36c0448
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/SystemClock.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common;
+
+/**
+ * Clock backed by the system wall-clock time.
+ *
+ * @author vintage.wang
+ */
+public class SystemClock {
+    /**
+     * @return the current time in milliseconds, as reported by {@link System#currentTimeMillis()}
+     */
+    public long now() {
+        final long currentMillis = System.currentTimeMillis();
+        return currentMillis;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ThreadFactoryImpl.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ThreadFactoryImpl.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ThreadFactoryImpl.java
new file mode 100644
index 0000000..b4d85cd
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/ThreadFactoryImpl.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+
+package com.alibaba.rocketmq.common;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicLong;
+
+
+/**
+ * Thread factory that names its threads with a fixed prefix followed by a running number
+ * starting at 1.
+ */
+public class ThreadFactoryImpl implements ThreadFactory {
+    private final AtomicLong threadIndex = new AtomicLong(0);
+    private final String threadNamePrefix;
+
+
+    /**
+     * @param threadNamePrefix text placed in front of the running number in every thread name
+     */
+    public ThreadFactoryImpl(final String threadNamePrefix) {
+        this.threadNamePrefix = threadNamePrefix;
+    }
+
+
+    /**
+     * Creates a new, not yet started thread for the given task.
+     *
+     * @param r task the thread will run
+     * @return thread named {@code threadNamePrefix + n}, where {@code n} is the incremented counter
+     */
+    @Override
+    public Thread newThread(Runnable r) {
+        final long index = this.threadIndex.incrementAndGet();
+        final String threadName = threadNamePrefix + index;
+        return new Thread(r, threadName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/TopicConfig.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/TopicConfig.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/TopicConfig.java
new file mode 100644
index 0000000..16019df
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/TopicConfig.java
@@ -0,0 +1,206 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common;
+
+import com.alibaba.rocketmq.common.constant.PermName;
+
+
+/**
+ * Configuration of a single topic: name, read/write queue counts, permission bits,
+ * filter type, system flag and ordering flag.
+ *
+ * @author shijia.wxr
+ */
+public class TopicConfig {
+    /** Field delimiter shared by {@link #encode()} and {@link #decode(String)}. */
+    private static final String SEPARATOR = " ";
+    /** Initial {@code readQueueNums} of newly created instances. */
+    public static int defaultReadQueueNums = 16;
+    /** Initial {@code writeQueueNums} of newly created instances. */
+    public static int defaultWriteQueueNums = 16;
+    private String topicName;
+    private int readQueueNums = defaultReadQueueNums;
+    private int writeQueueNums = defaultWriteQueueNums;
+    private int perm = PermName.PERM_READ | PermName.PERM_WRITE;
+    private TopicFilterType topicFilterType = TopicFilterType.SINGLE_TAG;
+    private int topicSysFlag = 0;
+    private boolean order = false;
+
+    /** Creates a config without a topic name; every other field keeps its default. */
+    public TopicConfig() {
+    }
+
+    /** Creates a config for {@code topicName}; every other field keeps its default. */
+    public TopicConfig(String topicName) {
+        this.topicName = topicName;
+    }
+
+    /** Creates a config with explicit queue counts and permission bits. */
+    public TopicConfig(String topicName, int readQueueNums, int writeQueueNums, int perm) {
+        this.topicName = topicName;
+        this.readQueueNums = readQueueNums;
+        this.writeQueueNums = writeQueueNums;
+        this.perm = perm;
+    }
+
+    /**
+     * Serializes topic name, read queue count, write queue count, permission and
+     * filter type, in that order, into one space-separated string.
+     * {@code topicSysFlag} and {@code order} are not part of the encoded form.
+     *
+     * @return the encoded string; if the topic name contains a space the result has
+     *         more than five fields and {@link #decode(String)} returns {@code false} for it
+     */
+    public String encode() {
+        StringBuilder sb = new StringBuilder();
+        sb.append(this.topicName).append(SEPARATOR);
+        sb.append(this.readQueueNums).append(SEPARATOR);
+        sb.append(this.writeQueueNums).append(SEPARATOR);
+        sb.append(this.perm).append(SEPARATOR);
+        sb.append(this.topicFilterType);
+        return sb.toString();
+    }
+
+    /**
+     * Populates this object from a string in the format written by {@link #encode()}.
+     * Every field is parsed before any is assigned, so malformed input never leaves
+     * this object partially updated.
+     *
+     * @param in the encoded string; may be {@code null}
+     * @return {@code true} if {@code in} split into exactly five fields and was applied;
+     *         {@code false} if it was {@code null} or had a different field count
+     * @throws NumberFormatException if a numeric field is not a valid integer
+     * @throws IllegalArgumentException if the filter type names no enum constant
+     */
+    public boolean decode(final String in) {
+        if (in == null) {
+            return false;
+        }
+        final String[] fields = in.split(SEPARATOR);
+        if (fields.length != 5) {
+            return false;
+        }
+        // Parse first, assign afterwards: a parse failure must not clobber current state.
+        final int readNums = Integer.parseInt(fields[1]);
+        final int writeNums = Integer.parseInt(fields[2]);
+        final int permBits = Integer.parseInt(fields[3]);
+        final TopicFilterType filterType = TopicFilterType.valueOf(fields[4]);
+
+        this.topicName = fields[0];
+        this.readQueueNums = readNums;
+        this.writeQueueNums = writeNums;
+        this.perm = permBits;
+        this.topicFilterType = filterType;
+        return true;
+    }
+
+    public String getTopicName() {
+        return topicName;
+    }
+
+    public void setTopicName(String topicName) {
+        this.topicName = topicName;
+    }
+
+    public int getReadQueueNums() {
+        return readQueueNums;
+    }
+
+    public void setReadQueueNums(int readQueueNums) {
+        this.readQueueNums = readQueueNums;
+    }
+
+    public int getWriteQueueNums() {
+        return writeQueueNums;
+    }
+
+    public void setWriteQueueNums(int writeQueueNums) {
+        this.writeQueueNums = writeQueueNums;
+    }
+
+    public int getPerm() {
+        return perm;
+    }
+
+    public void setPerm(int perm) {
+        this.perm = perm;
+    }
+
+    public TopicFilterType getTopicFilterType() {
+        return topicFilterType;
+    }
+
+    public void setTopicFilterType(TopicFilterType topicFilterType) {
+        this.topicFilterType = topicFilterType;
+    }
+
+    public int getTopicSysFlag() {
+        return topicSysFlag;
+    }
+
+    public void setTopicSysFlag(int topicSysFlag) {
+        this.topicSysFlag = topicSysFlag;
+    }
+
+    public boolean isOrder() {
+        return order;
+    }
+
+    public void setOrder(boolean isOrder) {
+        this.order = isOrder;
+    }
+
+    /** Two configs are equal when they have the same class and all seven fields match. */
+    @Override
+    public boolean equals(final Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        final TopicConfig that = (TopicConfig) o;
+        if (topicName != null ? !topicName.equals(that.topicName) : that.topicName != null) {
+            return false;
+        }
+        return readQueueNums == that.readQueueNums && writeQueueNums == that.writeQueueNums
+                && perm == that.perm && topicSysFlag == that.topicSysFlag
+                && order == that.order && topicFilterType == that.topicFilterType;
+    }
+
+    /** Hashes the same seven fields that {@link #equals(Object)} compares. */
+    @Override
+    public int hashCode() {
+        int result = topicName != null ? topicName.hashCode() : 0;
+        result = 31 * result + readQueueNums;
+        result = 31 * result + writeQueueNums;
+        result = 31 * result + perm;
+        result = 31 * result + (topicFilterType != null ? topicFilterType.hashCode() : 0);
+        result = 31 * result + topicSysFlag;
+        result = 31 * result + (order ? 1 : 0);
+        return result;
+    }
+
+    /** Renders every field; {@code perm} is shown through {@code PermName.perm2String}. */
+    @Override
+    public String toString() {
+        return "TopicConfig [topicName=" + topicName + ", readQueueNums=" + readQueueNums
+                + ", writeQueueNums=" + writeQueueNums + ", perm=" + PermName.perm2String(perm)
+                + ", topicFilterType=" + topicFilterType + ", topicSysFlag=" + topicSysFlag + ", order="
+                + order + "]";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-rocketmq/blob/057d0e9b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/TopicFilterType.java
----------------------------------------------------------------------
diff --git a/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/TopicFilterType.java b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/TopicFilterType.java
new file mode 100644
index 0000000..7a20dc9
--- /dev/null
+++ b/rocketmq-common/src/main/java/com/alibaba/rocketmq/common/TopicFilterType.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ */
+package com.alibaba.rocketmq.common;
+
+/**
+ * Filter types a topic can be configured with.
+ * @author shijia.wxr
+ */
+public enum TopicFilterType {
+    SINGLE_TAG, MULTI_TAG
+}


Mime
View raw message