ARTEMIS-1840 Refactor XML Data Serialiser
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/812776fc
Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/812776fc
Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/812776fc
Branch: refs/heads/master
Commit: 812776fca7ab7e029bdc5b46da87325d597fd723
Parents: 13fac86
Author: Martyn Taylor <mtaylor@redhat.com>
Authored: Wed May 2 11:32:39 2018 +0100
Committer: Clebert Suconic <clebertsuconic@apache.org>
Committed: Wed May 2 12:09:50 2018 -0400
----------------------------------------------------------------------
.../commands/tools/xml/XMLMessageExporter.java | 148 +++++++++
.../commands/tools/xml/XMLMessageImporter.java | 331 +++++++++++++++++++
.../commands/tools/xml/XmlDataConstants.java | 4 +-
.../cli/commands/tools/xml/XmlDataExporter.java | 117 +------
.../commands/tools/xml/XmlDataExporterUtil.java | 2 +-
.../cli/commands/tools/xml/XmlDataImporter.java | 288 +---------------
6 files changed, 507 insertions(+), 383 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/812776fc/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java
new file mode 100644
index 0000000..a2fbadd
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageExporter.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.tools.xml;
+
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamWriter;
+import java.util.List;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
+import org.apache.activemq.artemis.api.core.ActiveMQException;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
+import org.apache.activemq.artemis.core.server.LargeServerMessage;
+import org.apache.activemq.artemis.reader.TextMessageUtil;
+
+
+/**
+ * Utility class that exports messages in XML format through an {@link XMLStreamWriter}.
+ */
+public class XMLMessageExporter {
+
+   /** Maximum number of large-message body bytes encoded into a single CDATA section. */
+   private static final int LARGE_MESSAGE_CHUNK_SIZE = 1000;
+
+   private final XMLStreamWriter xmlWriter;
+
+   /**
+    * @param xmlWriter the writer every element, attribute and CDATA section is written to
+    */
+   public XMLMessageExporter(XMLStreamWriter xmlWriter) {
+      this.xmlWriter = xmlWriter;
+   }
+
+   /**
+    * @return the writer that was passed to the constructor
+    */
+   public XMLStreamWriter getRawXMLWriter() {
+      return xmlWriter;
+   }
+
+   /**
+    * Writes one complete message element: attributes, properties, queues and body, in that order.
+    *
+    * @param message        the message to write
+    * @param queues         names of the queues to list for the message; the queues element is omitted when {@code null}
+    * @param encodeTextUTF8 if {@code true}, the body of a non-large text message is written as plain text instead of Base64
+    */
+   public void printSingleMessageAsXML(ICoreMessage message, List<String> queues, boolean encodeTextUTF8) throws Exception {
+      xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
+      printMessageAttributes(message);
+      printMessageProperties(message);
+      printMessageQueues(queues);
+      printMessageBody(message.toCore(), encodeTextUTF8);
+      xmlWriter.writeEndElement(); // end MESSAGES_CHILD
+   }
+
+   /**
+    * Writes the body element. Large messages are written in chunks, text messages are written as plain text when
+    * requested, and every other body is written as a single Base64 encoded CDATA section.
+    *
+    * @param message               the message whose body is written
+    * @param encodeTextMessageUTF8 if {@code true}, the body of a non-large text message is written as plain text
+    */
+   public void printMessageBody(Message message, boolean encodeTextMessageUTF8) throws Exception {
+      xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
+
+      if (message.isLargeMessage()) {
+         printLargeMessageBody((LargeServerMessage) message);
+      } else if (encodeTextMessageUTF8 && message.toCore().getType() == Message.TEXT_TYPE) {
+         xmlWriter.writeCData(TextMessageUtil.readBodyText(message.toCore().getReadOnlyBodyBuffer()).toString());
+      } else {
+         xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBodyBase64(message));
+      }
+      xmlWriter.writeEndElement(); // end MESSAGE_BODY
+   }
+
+   /**
+    * Marks the current body element as large and writes the body as a sequence of CDATA sections, each holding at
+    * most {@link #LARGE_MESSAGE_CHUNK_SIZE} encoded bytes.
+    * <p>
+    * An {@link ActiveMQException} raised while reading the body is printed and not rethrown, so the body written
+    * so far is left in place.
+    *
+    * @param message the large message whose body is written
+    * @throws XMLStreamException if the underlying writer fails
+    */
+   public void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException {
+      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
+      LargeBodyEncoder encoder = null;
+
+      try {
+         encoder = message.toCore().getBodyEncoder();
+         encoder.open();
+         final long bodySize = encoder.getLargeBodySize();
+         long totalBytesWritten = 0;
+         while (totalBytesWritten < bodySize) {
+            // the last chunk only holds whatever is left of the body
+            int chunkSize = (int) Math.min(LARGE_MESSAGE_CHUNK_SIZE, bodySize - totalBytesWritten);
+            ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(chunkSize);
+            encoder.encode(buffer, chunkSize);
+            xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array()));
+            totalBytesWritten += chunkSize;
+         }
+      } catch (ActiveMQException e) {
+         // best effort: report the failure and keep exporting
+         e.printStackTrace();
+      } finally {
+         // single close point for both the success and the failure path
+         if (encoder != null) {
+            try {
+               encoder.close();
+            } catch (ActiveMQException e) {
+               e.printStackTrace();
+            }
+         }
+      }
+   }
+
+   /**
+    * Writes the queues element with one empty child element per queue name.
+    *
+    * @param queues the queue names to write; nothing is written when {@code null}
+    */
+   public void printMessageQueues(List<String> queues) throws XMLStreamException {
+      if (queues != null) {
+         xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT);
+         for (String queueName : queues) {
+            xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD);
+            xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, queueName);
+         }
+         xmlWriter.writeEndElement(); // end QUEUES_PARENT
+      }
+   }
+
+   /**
+    * Writes the properties element with one empty child element per message property, carrying the property
+    * name, its converted value and, when known, its type.
+    *
+    * @param message the message whose properties are written
+    */
+   public void printMessageProperties(Message message) throws XMLStreamException {
+      xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT);
+      for (SimpleString key : message.getPropertyNames()) {
+         Object value = message.getObjectProperty(key);
+         xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD);
+         xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString());
+         xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, XmlDataExporterUtil.convertProperty(value));
+
+         // Write the property type as an attribute
+         String propertyType = XmlDataExporterUtil.getPropertyType(value);
+         if (propertyType != null) {
+            xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, propertyType);
+         }
+      }
+      xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
+   }
+
+   /**
+    * Writes id, priority, expiration, timestamp and type as attributes of the current element, plus the user id
+    * when the message has one.
+    *
+    * @param message the message whose attributes are written
+    */
+   public void printMessageAttributes(ICoreMessage message) throws XMLStreamException {
+      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
+      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
+      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));
+      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp()));
+      String prettyType = XmlDataExporterUtil.getMessagePrettyType(message.getType());
+      xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType);
+      if (message.getUserID() != null) {
+         xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString());
+      }
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/812776fc/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageImporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageImporter.java
new file mode 100644
index 0000000..09e78d5
--- /dev/null
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XMLMessageImporter.java
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.cli.commands.tools.xml;
+
+import javax.xml.stream.XMLStreamConstants;
+import javax.xml.stream.XMLStreamException;
+import javax.xml.stream.XMLStreamReader;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.ByteBufAllocator;
+import org.apache.activemq.artemis.api.core.ICoreMessage;
+import org.apache.activemq.artemis.api.core.Message;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientMessage;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.utils.Base64;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
+import org.jboss.logging.Logger;
+
+/**
+ * Utility class that imports messages from their XML representation using an {@link XMLStreamReader}.
+ */
+public class XMLMessageImporter {
+
+   private static final Logger logger = Logger.getLogger(XMLMessageImporter.class);
+
+   private final XMLStreamReader reader;
+
+   private final ClientSession session;
+
+   Map<String, String> oldPrefixTranslation = new HashMap<>();
+
+   /**
+    * @param xmlStreamReader the reader messages are parsed from
+    * @param session         the session used to create the imported messages
+    */
+   public XMLMessageImporter(XMLStreamReader xmlStreamReader, ClientSession session) {
+      this.reader = xmlStreamReader;
+      this.session = session;
+   }
+
+   /**
+    * @param oldPrefixTranslation maps queue names found in the XML to the names reported by this importer
+    */
+   public void setOldPrefixTranslation(Map<String, String> oldPrefixTranslation) {
+      this.oldPrefixTranslation = oldPrefixTranslation;
+   }
+
+   /**
+    * @return the reader that was passed to the constructor
+    */
+   public XMLStreamReader getRawXMLReader() {
+      return reader;
+   }
+
+   /**
+    * Reads one message from the reader, which is expected to be positioned on the message's start element.
+    * Attributes are read first, then the stream is consumed until the message's end element is reached.
+    *
+    * @param decodeUTF8 if {@code true}, the body of a text message is read as plain text instead of Base64
+    * @return the parsed message with its id, queues and optional large-message temp file, or {@code null} when
+    *         the reader has no more events
+    */
+   public MessageInfo readMessage(boolean decodeUTF8) throws Exception {
+      if (!reader.hasNext()) {
+         return null;
+      }
+
+      byte type = 0;
+      byte priority = 0;
+      long expiration = 0L;
+      long timestamp = 0L;
+      long id = 0L;
+      org.apache.activemq.artemis.utils.UUID userId = null;
+      List<String> queues = new ArrayList<>();
+
+      // get message's attributes
+      for (int i = 0; i < reader.getAttributeCount(); i++) {
+         String attributeName = reader.getAttributeLocalName(i);
+         switch (attributeName) {
+            case XmlDataConstants.MESSAGE_TYPE:
+               type = getMessageType(reader.getAttributeValue(i));
+               break;
+            case XmlDataConstants.MESSAGE_PRIORITY:
+               priority = Byte.parseByte(reader.getAttributeValue(i));
+               break;
+            case XmlDataConstants.MESSAGE_EXPIRATION:
+               expiration = Long.parseLong(reader.getAttributeValue(i));
+               break;
+            case XmlDataConstants.MESSAGE_TIMESTAMP:
+               timestamp = Long.parseLong(reader.getAttributeValue(i));
+               break;
+            case XmlDataConstants.MESSAGE_USER_ID:
+               // NOTE(review): the attribute value is not read; a fresh UUID is generated whenever the
+               // attribute is present - confirm this is intended
+               userId = UUIDGenerator.getInstance().generateUUID();
+               break;
+            case XmlDataConstants.MESSAGE_ID:
+               id = Long.parseLong(reader.getAttributeValue(i));
+               break;
+            default:
+               // unknown attributes are ignored
+               break;
+         }
+      }
+
+      Message message = session.createMessage(type, true, expiration, timestamp, priority);
+
+      message.setUserID(userId);
+
+      File largeMessageTemporaryFile = null;
+      // loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.)
+      while (reader.hasNext()) {
+         int eventType = reader.getEventType();
+         if (eventType == XMLStreamConstants.START_ELEMENT) {
+            String elementName = reader.getLocalName();
+            if (XmlDataConstants.MESSAGE_BODY.equals(elementName)) {
+               largeMessageTemporaryFile = processMessageBody(message.toCore(), decodeUTF8);
+            } else if (XmlDataConstants.PROPERTIES_CHILD.equals(elementName)) {
+               processMessageProperties(message);
+            } else if (XmlDataConstants.QUEUES_CHILD.equals(elementName)) {
+               processMessageQueues(queues);
+            }
+         } else if (eventType == XMLStreamConstants.END_ELEMENT && XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) {
+            // the message's end element terminates the loop without consuming further events
+            break;
+         }
+         reader.next();
+      }
+      return new MessageInfo(id, queues, message, largeMessageTemporaryFile);
+   }
+
+   /**
+    * Maps the pretty type name found in the XML to the message type constant. Unknown names map to
+    * {@link Message#DEFAULT_TYPE}.
+    */
+   private byte getMessageType(String value) {
+      switch (value) {
+         case XmlDataConstants.BYTES_TYPE_PRETTY:
+            return Message.BYTES_TYPE;
+         case XmlDataConstants.MAP_TYPE_PRETTY:
+            return Message.MAP_TYPE;
+         case XmlDataConstants.OBJECT_TYPE_PRETTY:
+            return Message.OBJECT_TYPE;
+         case XmlDataConstants.STREAM_TYPE_PRETTY:
+            return Message.STREAM_TYPE;
+         case XmlDataConstants.TEXT_TYPE_PRETTY:
+            return Message.TEXT_TYPE;
+         case XmlDataConstants.DEFAULT_TYPE_PRETTY:
+         default:
+            return Message.DEFAULT_TYPE;
+      }
+   }
+
+   /**
+    * Adds the (possibly translated) queue name of the current queue element to {@code queues}.
+    */
+   private void processMessageQueues(List<String> queues) {
+      for (int i = 0; i < reader.getAttributeCount(); i++) {
+         if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i))) {
+            queues.add(checkPrefix(reader.getAttributeValue(i)));
+         }
+      }
+   }
+
+   /**
+    * Returns the translation registered for {@code queueName}, or {@code queueName} itself when there is none
+    * (including when the translation map has been set to {@code null}).
+    */
+   private String checkPrefix(String queueName) {
+      String newQueueName = oldPrefixTranslation == null ? null : oldPrefixTranslation.get(queueName);
+      return newQueueName == null ? queueName : newQueueName;
+   }
+
+   /**
+    * Reads name, value and type from the current property element and stores the property on the message
+    * using the setter matching the type. A property whose type is not recognised is not stored.
+    */
+   private void processMessageProperties(Message message) {
+      String key = "";
+      String value = "";
+      String propertyType = "";
+
+      for (int i = 0; i < reader.getAttributeCount(); i++) {
+         String attributeName = reader.getAttributeLocalName(i);
+         switch (attributeName) {
+            case XmlDataConstants.PROPERTY_NAME:
+               key = reader.getAttributeValue(i);
+               break;
+            case XmlDataConstants.PROPERTY_VALUE:
+               value = reader.getAttributeValue(i);
+               break;
+            case XmlDataConstants.PROPERTY_TYPE:
+               propertyType = reader.getAttributeValue(i);
+               break;
+            default:
+               break;
+         }
+      }
+
+      if (value.equals(XmlDataConstants.NULL)) {
+         value = null;
+      }
+
+      switch (propertyType) {
+         case XmlDataConstants.PROPERTY_TYPE_SHORT:
+            message.putShortProperty(key, Short.parseShort(value));
+            break;
+         case XmlDataConstants.PROPERTY_TYPE_BOOLEAN:
+            message.putBooleanProperty(key, Boolean.parseBoolean(value));
+            break;
+         case XmlDataConstants.PROPERTY_TYPE_BYTE:
+            message.putByteProperty(key, Byte.parseByte(value));
+            break;
+         case XmlDataConstants.PROPERTY_TYPE_BYTES:
+            message.putBytesProperty(key, value == null ? null : decode(value));
+            break;
+         case XmlDataConstants.PROPERTY_TYPE_DOUBLE:
+            message.putDoubleProperty(key, Double.parseDouble(value));
+            break;
+         case XmlDataConstants.PROPERTY_TYPE_FLOAT:
+            message.putFloatProperty(key, Float.parseFloat(value));
+            break;
+         case XmlDataConstants.PROPERTY_TYPE_INTEGER:
+            message.putIntProperty(key, Integer.parseInt(value));
+            break;
+         case XmlDataConstants.PROPERTY_TYPE_LONG:
+            message.putLongProperty(key, Long.parseLong(value));
+            break;
+         case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING:
+            message.putStringProperty(new SimpleString(key), value == null ? null : SimpleString.toSimpleString(value));
+            break;
+         case XmlDataConstants.PROPERTY_TYPE_STRING:
+            message.putStringProperty(key, value);
+            break;
+         default:
+            break;
+      }
+   }
+
+   /**
+    * Reads the current body element into the message. A large body is first spooled to a temporary file which
+    * is then attached to the message as its body input stream; any other body is written to the message's body
+    * buffer.
+    *
+    * @param message           the message receiving the body
+    * @param decodeTextMessage if {@code true} and the message is a text message, the body is read as plain text
+    * @return the temporary file backing a large body, or {@code null} for a regular body
+    */
+   private File processMessageBody(final ICoreMessage message, boolean decodeTextMessage) throws XMLStreamException, IOException {
+      File tempFileName = null;
+      boolean isLarge = false;
+
+      for (int i = 0; i < reader.getAttributeCount(); i++) {
+         String attributeName = reader.getAttributeLocalName(i);
+         if (XmlDataConstants.MESSAGE_IS_LARGE.equals(attributeName)) {
+            isLarge = Boolean.parseBoolean(reader.getAttributeValue(i));
+         }
+      }
+      reader.next();
+      if (logger.isDebugEnabled()) {
+         logger.debug("XMLStreamReader impl: " + reader);
+      }
+
+      final boolean decodeAsText = (message.toCore().getType() == Message.TEXT_TYPE) && decodeTextMessage;
+      if (isLarge) {
+         tempFileName = File.createTempFile("largeMessage", ".tmp");
+         if (logger.isDebugEnabled()) {
+            logger.debug("Creating temp file " + tempFileName + " for large message.");
+         }
+         try (OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFileName))) {
+            getMessageBodyBytes(bytes -> out.write(bytes), decodeAsText);
+         }
+         // the stream is handed over to the message and is therefore not closed here
+         FileInputStream fileInputStream = new FileInputStream(tempFileName);
+         BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
+         ((ClientMessage) message).setBodyInputStream(bufferedInput);
+      } else {
+         getMessageBodyBytes(bytes -> message.getBodyBuffer().writeBytes(bytes), decodeAsText);
+      }
+
+      return tempFileName;
+   }
+
+   /**
+    * Message bodies are written to XML as one or more Base64 encoded CDATA elements. Some parser implementations won't
+    * read an entire CDATA element at once (e.g. Woodstox) so it's possible that multiple CDATA/CHARACTERS events need
+    * to be combined to reconstruct the Base64 encoded string. You can't decode bits and pieces of each CDATA. Each
+    * CDATA has to be decoded in its entirety.
+    *
+    * @param processor         used to deal with the decoded CDATA elements
+    * @param decodeTextMessage if {@code true}, the CDATA content is not Base64 decoded but encoded as a nullable
+    *                          {@link SimpleString}
+    */
+   private void getMessageBodyBytes(MessageBodyBytesProcessor processor, boolean decodeTextMessage) throws IOException, XMLStreamException {
+      int currentEventType;
+      StringBuilder cdata = new StringBuilder();
+      while (reader.hasNext()) {
+         currentEventType = reader.getEventType();
+         if (currentEventType == XMLStreamConstants.END_ELEMENT) {
+            break;
+         } else if (currentEventType == XMLStreamConstants.CHARACTERS && reader.isWhiteSpace() && cdata.length() > 0) {
+            /* when we hit a whitespace CHARACTERS event we know that the entire CDATA is complete so decode, pass back to
+             * the processor, and reset the cdata for the next event(s)
+             */
+            if (decodeTextMessage) {
+               SimpleString text = new SimpleString(cdata.toString());
+               int encodedSize = SimpleString.sizeofNullableString(text);
+               ByteBuf byteBuf = ByteBufAllocator.DEFAULT.buffer(encodedSize);
+               try {
+                  SimpleString.writeNullableSimpleString(byteBuf, text);
+                  byte[] bytes = new byte[encodedSize];
+                  byteBuf.readBytes(bytes);
+                  processor.processBodyBytes(bytes);
+               } finally {
+                  // the buffer comes from the default allocator and must be handed back
+                  byteBuf.release();
+               }
+            } else {
+               processor.processBodyBytes(decode(cdata.toString()));
+            }
+            // reset in both branches so a later whitespace event cannot emit the same content again
+            cdata.setLength(0);
+         } else {
+            cdata.append(new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()).trim());
+         }
+         reader.next();
+      }
+   }
+
+   /** Base64 decodes {@code data} with the same options for bodies and bytes properties. */
+   private static byte[] decode(String data) {
+      return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
+   }
+
+   /** Callback receiving each decoded piece of a message body. */
+   @FunctionalInterface
+   private interface MessageBodyBytesProcessor {
+      void processBodyBytes(byte[] bytes) throws IOException;
+   }
+
+   /**
+    * Result of {@link #readMessage(boolean)}: the message together with the id and queues found in the XML and
+    * the temporary file holding a large body, if any.
+    */
+   public class MessageInfo {
+      public long id;
+      public List<String> queues;
+      public Message message;
+      public File tempFile;
+
+      MessageInfo(long id, List<String> queues, Message message, File tempFile) {
+         this.message = message;
+         this.queues = queues;
+         this.id = id;
+         this.tempFile = tempFile;
+      }
+   }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/812776fc/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java
index 24e56b2..a5a7c97 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataConstants.java
@@ -26,7 +26,7 @@ public final class XmlDataConstants {
// Utility
}
- static final String XML_VERSION = "1.0";
+ public static final String XML_VERSION = "1.0";
static final String DOCUMENT_PARENT = "activemq-journal";
static final String BINDINGS_PARENT = "bindings";
@@ -50,7 +50,7 @@ public final class XmlDataConstants {
static final String ADDRESS_BINDING_ID = "id";
static final String ADDRESS_BINDING_ROUTING_TYPE = "routing-types";
- static final String MESSAGES_PARENT = "messages";
+ public static final String MESSAGES_PARENT = "messages";
static final String MESSAGES_CHILD = "message";
static final String MESSAGE_ID = "id";
static final String MESSAGE_PRIORITY = "priority";
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/812776fc/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
index 85eac4a..0125cb7 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporter.java
@@ -36,7 +36,6 @@ import java.util.TreeMap;
import io.airlift.airline.Command;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQBuffers;
-import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
@@ -48,7 +47,6 @@ import org.apache.activemq.artemis.core.journal.Journal;
import org.apache.activemq.artemis.core.journal.PreparedTransactionInfo;
import org.apache.activemq.artemis.core.journal.RecordInfo;
import org.apache.activemq.artemis.core.journal.TransactionFailureCallback;
-import org.apache.activemq.artemis.core.message.LargeBodyEncoder;
import org.apache.activemq.artemis.core.paging.PagedMessage;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
@@ -66,12 +64,10 @@ import org.apache.activemq.artemis.core.persistence.impl.journal.codec.Persisten
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.PersistentQueueBindingEncoding;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.artemis.core.server.JournalType;
-import org.apache.activemq.artemis.core.server.LargeServerMessage;
@Command(name = "exp", description = "Export all message-data using an XML that could be interpreted by any system.")
public final class XmlDataExporter extends DBOption {
- private static final Long LARGE_MESSAGE_CHUNK_SIZE = 1000L;
private XMLStreamWriter xmlWriter;
// an inner map of message refs hashed by the queue ID to which they belong and then hashed by their record ID
@@ -92,6 +88,8 @@ public final class XmlDataExporter extends DBOption {
long bindingsPrinted = 0L;
+ XMLMessageExporter exporter;
+
@Override
public Object execute(ActionContext context) throws Exception {
super.execute(context);
@@ -141,7 +139,7 @@ public final class XmlDataExporter extends DBOption {
XMLStreamWriter rawXmlWriter = factory.createXMLStreamWriter(out, "UTF-8");
PrettyPrintHandler handler = new PrettyPrintHandler(rawXmlWriter);
xmlWriter = (XMLStreamWriter) Proxy.newProxyInstance(XMLStreamWriter.class.getClassLoader(), new Class[]{XMLStreamWriter.class}, handler);
-
+ exporter = new XMLMessageExporter(xmlWriter);
writeXMLData();
}
@@ -317,7 +315,7 @@ public final class XmlDataExporter extends DBOption {
private void printDataAsXML() {
try {
- xmlWriter.writeStartDocument(XmlDataConstants.XML_VERSION);
+
xmlWriter.writeStartElement(XmlDataConstants.DOCUMENT_PARENT);
printBindingsAsXML();
printAllMessagesAsXML();
@@ -375,6 +373,10 @@ public final class XmlDataExporter extends DBOption {
xmlWriter.writeEndElement(); // end "messages"
}
+ private void printSingleMessageAsXML(ICoreMessage message, List<String> queues) throws Exception {
+ exporter.printSingleMessageAsXML(message, queues, false);
+ messagesPrinted++;
+ }
/**
* Reads from the page files and prints messages as it finds them (making sure to check acks and transactions
* from the journal).
@@ -444,104 +446,9 @@ public final class XmlDataExporter extends DBOption {
}
}
- private void printSingleMessageAsXML(ICoreMessage message, List<String> queues) throws Exception {
- xmlWriter.writeStartElement(XmlDataConstants.MESSAGES_CHILD);
- printMessageAttributes(message);
- printMessageProperties(message);
- printMessageQueues(queues);
- printMessageBody(message.toCore());
- xmlWriter.writeEndElement(); // end MESSAGES_CHILD
- messagesPrinted++;
- }
-
- private void printMessageBody(Message message) throws Exception {
- xmlWriter.writeStartElement(XmlDataConstants.MESSAGE_BODY);
-
- if (message.toCore().isLargeMessage()) {
- printLargeMessageBody((LargeServerMessage) message);
- } else {
- xmlWriter.writeCData(XmlDataExporterUtil.encodeMessageBody(message));
- }
- xmlWriter.writeEndElement(); // end MESSAGE_BODY
- }
-
- private void printLargeMessageBody(LargeServerMessage message) throws XMLStreamException {
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_IS_LARGE, Boolean.TRUE.toString());
- LargeBodyEncoder encoder = null;
-
- try {
- encoder = message.toCore().getBodyEncoder();
- encoder.open();
- long totalBytesWritten = 0;
- Long bufferSize;
- long bodySize = encoder.getLargeBodySize();
- for (long i = 0; i < bodySize; i += LARGE_MESSAGE_CHUNK_SIZE) {
- Long remainder = bodySize - totalBytesWritten;
- if (remainder >= LARGE_MESSAGE_CHUNK_SIZE) {
- bufferSize = LARGE_MESSAGE_CHUNK_SIZE;
- } else {
- bufferSize = remainder;
- }
- ActiveMQBuffer buffer = ActiveMQBuffers.fixedBuffer(bufferSize.intValue());
- encoder.encode(buffer, bufferSize.intValue());
- xmlWriter.writeCData(XmlDataExporterUtil.encode(buffer.toByteBuffer().array()));
- totalBytesWritten += bufferSize;
- }
- encoder.close();
- } catch (ActiveMQException e) {
- e.printStackTrace();
- } finally {
- if (encoder != null) {
- try {
- encoder.close();
- } catch (ActiveMQException e) {
- e.printStackTrace();
- }
- }
- }
- }
-
- private void printMessageQueues(List<String> queues) throws XMLStreamException {
- xmlWriter.writeStartElement(XmlDataConstants.QUEUES_PARENT);
- for (String queueName : queues) {
- xmlWriter.writeEmptyElement(XmlDataConstants.QUEUES_CHILD);
- xmlWriter.writeAttribute(XmlDataConstants.QUEUE_NAME, queueName);
- }
- xmlWriter.writeEndElement(); // end QUEUES_PARENT
- }
-
- private void printMessageProperties(Message message) throws XMLStreamException {
- xmlWriter.writeStartElement(XmlDataConstants.PROPERTIES_PARENT);
- for (SimpleString key : message.getPropertyNames()) {
- Object value = message.getObjectProperty(key);
- xmlWriter.writeEmptyElement(XmlDataConstants.PROPERTIES_CHILD);
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_NAME, key.toString());
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_VALUE, XmlDataExporterUtil.convertProperty(value));
-
- // Write the property type as an attribute
- String propertyType = XmlDataExporterUtil.getPropertyType(value);
- if (propertyType != null) {
- xmlWriter.writeAttribute(XmlDataConstants.PROPERTY_TYPE, propertyType);
- }
- }
- xmlWriter.writeEndElement(); // end PROPERTIES_PARENT
- }
-
- private void printMessageAttributes(ICoreMessage message) throws XMLStreamException {
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_ID, Long.toString(message.getMessageID()));
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_PRIORITY, Byte.toString(message.getPriority()));
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_EXPIRATION, Long.toString(message.getExpiration()));
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TIMESTAMP, Long.toString(message.getTimestamp()));
- String prettyType = XmlDataExporterUtil.getMessagePrettyType(message.getType());
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_TYPE, prettyType);
- if (message.getUserID() != null) {
- xmlWriter.writeAttribute(XmlDataConstants.MESSAGE_USER_ID, message.getUserID().toString());
- }
- }
-
- private List<String> extractQueueNames(HashMap<Long, ReferenceDescribe> refMap) {
+ private List<String> extractQueueNames(HashMap<Long, DescribeJournal.ReferenceDescribe> refMap) {
List<String> queues = new ArrayList<>();
- for (ReferenceDescribe ref : refMap.values()) {
+ for (DescribeJournal.ReferenceDescribe ref : refMap.values()) {
queues.add(queueBindings.get(ref.refEncoding.queueID).getQueueName().toString());
}
return queues;
@@ -552,7 +459,7 @@ public final class XmlDataExporter extends DBOption {
/**
* Proxy to handle indenting the XML since <code>javax.xml.stream.XMLStreamWriter</code> doesn't support that.
*/
- static class PrettyPrintHandler implements InvocationHandler {
+ public static class PrettyPrintHandler implements InvocationHandler {
private final XMLStreamWriter target;
@@ -564,7 +471,7 @@ public final class XmlDataExporter extends DBOption {
boolean wrap = true;
- PrettyPrintHandler(XMLStreamWriter target) {
+ public PrettyPrintHandler(XMLStreamWriter target) {
this.target = target;
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/812776fc/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java
index df48dcf..7e6545c 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataExporterUtil.java
@@ -88,7 +88,7 @@ public class XmlDataExporterUtil {
/**
* Base64 encode a ServerMessage body into the proper XML format
*/
- static String encodeMessageBody(final Message message) throws Exception {
+ static String encodeMessageBodyBase64(final Message message) throws Exception {
Preconditions.checkNotNull(message, "ServerMessage can not be null");
ActiveMQBuffer byteBuffer = message.toCore().getReadOnlyBodyBuffer();
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/812776fc/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java
----------------------------------------------------------------------
diff --git a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java
index 595ed55..bec2fbd 100644
--- a/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java
+++ b/artemis-cli/src/main/java/org/apache/activemq/artemis/cli/commands/tools/xml/XmlDataImporter.java
@@ -19,25 +19,18 @@ package org.apache.activemq.artemis.cli.commands.tools.xml;
import javax.xml.XMLConstants;
import javax.xml.stream.XMLInputFactory;
import javax.xml.stream.XMLStreamConstants;
-import javax.xml.stream.XMLStreamException;
import javax.xml.stream.XMLStreamReader;
import javax.xml.transform.stax.StAXSource;
import javax.xml.validation.Schema;
import javax.xml.validation.SchemaFactory;
import javax.xml.validation.Validator;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
import java.io.File;
import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
import java.net.URL;
import java.nio.ByteBuffer;
import java.security.AccessController;
import java.security.PrivilegedAction;
-import java.util.ArrayList;
import java.util.Comparator;
import java.util.EnumSet;
import java.util.HashMap;
@@ -47,7 +40,6 @@ import java.util.TreeSet;
import io.airlift.airline.Command;
import io.airlift.airline.Option;
-import org.apache.activemq.artemis.api.core.ICoreMessage;
import org.apache.activemq.artemis.api.core.Message;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
@@ -68,10 +60,8 @@ import org.apache.activemq.artemis.core.protocol.core.impl.PacketImpl;
import org.apache.activemq.artemis.core.remoting.impl.netty.NettyConnectorFactory;
import org.apache.activemq.artemis.core.remoting.impl.netty.TransportConstants;
import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.utils.Base64;
import org.apache.activemq.artemis.utils.ClassloadingUtil;
import org.apache.activemq.artemis.utils.ListUtil;
-import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.jboss.logging.Logger;
/**
@@ -86,6 +76,8 @@ public final class XmlDataImporter extends ActionAbstract {
private XMLStreamReader reader;
+ private XMLMessageImporter messageReader;
+
// this session is really only needed if the "session" variable does not auto-commit sends
ClientSession managementSession;
@@ -123,7 +115,7 @@ public final class XmlDataImporter extends ActionAbstract {
@Option(name = "--legacy-prefixes", description = "Do not remove prefixes from legacy imports")
public boolean legacyPrefixes = false;
- TreeSet<MessageTemp> messages;
+ TreeSet<XMLMessageImporter.MessageInfo> messages;
public String getPassword() {
return password;
@@ -179,6 +171,9 @@ public final class XmlDataImporter extends ActionAbstract {
ClientSession session,
ClientSession managementSession) throws Exception {
reader = XMLInputFactory.newInstance().createXMLStreamReader(inputStream);
+ messageReader = new XMLMessageImporter(reader, session);
+ messageReader.setOldPrefixTranslation(oldPrefixTranslation);
+
this.session = session;
if (managementSession != null) {
this.managementSession = managementSession;
@@ -237,9 +232,9 @@ public final class XmlDataImporter extends ActionAbstract {
private void processXml() throws Exception {
if (sort) {
- messages = new TreeSet<MessageTemp>(new Comparator<MessageTemp>() {
+ messages = new TreeSet<XMLMessageImporter.MessageInfo>(new Comparator<XMLMessageImporter.MessageInfo>() {
@Override
- public int compare(MessageTemp o1, MessageTemp o2) {
+ public int compare(XMLMessageImporter.MessageInfo o1, XMLMessageImporter.MessageInfo o2) {
if (o1.id == o2.id) {
return 0;
} else if (o1.id > o2.id) {
@@ -270,8 +265,8 @@ public final class XmlDataImporter extends ActionAbstract {
}
if (sort) {
- for (MessageTemp msgtmp : messages) {
- sendMessage(msgtmp.queues, msgtmp.message, msgtmp.tempFileName);
+ for (XMLMessageImporter.MessageInfo msgtmp : messages) {
+ sendMessage(msgtmp.queues, msgtmp.message, msgtmp.tempFile);
}
}
@@ -288,118 +283,14 @@ public final class XmlDataImporter extends ActionAbstract {
}
private void processMessage() throws Exception {
- Byte type = 0;
- Byte priority = 0;
- Long expiration = 0L;
- Long timestamp = 0L;
- Long id = 0L;
- org.apache.activemq.artemis.utils.UUID userId = null;
- ArrayList<String> queues = new ArrayList<>();
-
- // get message's attributes
- for (int i = 0; i < reader.getAttributeCount(); i++) {
- String attributeName = reader.getAttributeLocalName(i);
- switch (attributeName) {
- case XmlDataConstants.MESSAGE_TYPE:
- type = getMessageType(reader.getAttributeValue(i));
- break;
- case XmlDataConstants.MESSAGE_PRIORITY:
- priority = Byte.parseByte(reader.getAttributeValue(i));
- break;
- case XmlDataConstants.MESSAGE_EXPIRATION:
- expiration = Long.parseLong(reader.getAttributeValue(i));
- break;
- case XmlDataConstants.MESSAGE_TIMESTAMP:
- timestamp = Long.parseLong(reader.getAttributeValue(i));
- break;
- case XmlDataConstants.MESSAGE_USER_ID:
- userId = UUIDGenerator.getInstance().generateUUID();
- break;
- case XmlDataConstants.MESSAGE_ID:
- id = Long.parseLong(reader.getAttributeValue(i));
- break;
- }
- }
-
- Message message = session.createMessage(type, true, expiration, timestamp, priority);
- message.setUserID(userId);
-
- boolean endLoop = false;
-
- File largeMessageTemporaryFile = null;
- // loop through the XML and gather up all the message's data (i.e. body, properties, queues, etc.)
- while (reader.hasNext()) {
- int eventType = reader.getEventType();
- switch (eventType) {
- case XMLStreamConstants.START_ELEMENT:
- if (XmlDataConstants.MESSAGE_BODY.equals(reader.getLocalName())) {
- largeMessageTemporaryFile = processMessageBody(message.toCore());
- } else if (XmlDataConstants.PROPERTIES_CHILD.equals(reader.getLocalName())) {
- processMessageProperties(message);
- } else if (XmlDataConstants.QUEUES_CHILD.equals(reader.getLocalName())) {
- processMessageQueues(queues);
- }
- break;
- case XMLStreamConstants.END_ELEMENT:
- if (XmlDataConstants.MESSAGES_CHILD.equals(reader.getLocalName())) {
- endLoop = true;
- }
- break;
- }
- if (endLoop) {
- break;
- }
- reader.next();
- }
-
+ XMLMessageImporter.MessageInfo info = messageReader.readMessage(false);
if (sort) {
- messages.add(new MessageTemp(id, queues, message, largeMessageTemporaryFile));
+ messages.add(info);
} else {
- sendMessage(queues, message, largeMessageTemporaryFile);
- }
- }
-
-
- class MessageTemp {
- long id;
- List<String> queues;
- Message message;
- File tempFileName;
-
- MessageTemp(long id, List<String> queues, Message message, File tempFileName) {
- this.message = message;
- this.queues = queues;
- this.message = message;
- this.id = id;
- this.tempFileName = tempFileName;
+ sendMessage(info.queues, info.message, info.tempFile);
}
}
- private Byte getMessageType(String value) {
- Byte type = Message.DEFAULT_TYPE;
- switch (value) {
- case XmlDataConstants.DEFAULT_TYPE_PRETTY:
- type = Message.DEFAULT_TYPE;
- break;
- case XmlDataConstants.BYTES_TYPE_PRETTY:
- type = Message.BYTES_TYPE;
- break;
- case XmlDataConstants.MAP_TYPE_PRETTY:
- type = Message.MAP_TYPE;
- break;
- case XmlDataConstants.OBJECT_TYPE_PRETTY:
- type = Message.OBJECT_TYPE;
- break;
- case XmlDataConstants.STREAM_TYPE_PRETTY:
- type = Message.STREAM_TYPE;
- break;
- case XmlDataConstants.TEXT_TYPE_PRETTY:
- type = Message.TEXT_TYPE;
- break;
- }
- return type;
- }
-
private void sendMessage(List<String> queues, Message message, File tempFileName) throws Exception {
StringBuilder logMessage = new StringBuilder();
String destination = addressMap.get(queues.get(0));
@@ -460,153 +351,6 @@ public final class XmlDataImporter extends ActionAbstract {
}
}
- private void processMessageQueues(ArrayList<String> queues) {
- for (int i = 0; i < reader.getAttributeCount(); i++) {
- if (XmlDataConstants.QUEUE_NAME.equals(reader.getAttributeLocalName(i))) {
- String queueName = reader.getAttributeValue(i);
- String translation = checkPrefix(queueName);
- queues.add(translation);
- }
- }
- }
-
- private String checkPrefix(String queueName) {
- String newQueueName = oldPrefixTranslation.get(queueName);
- if (newQueueName == null) {
- newQueueName = queueName;
- }
- return newQueueName;
- }
-
- private void processMessageProperties(Message message) {
- String key = "";
- String value = "";
- String propertyType = "";
-
- for (int i = 0; i < reader.getAttributeCount(); i++) {
- String attributeName = reader.getAttributeLocalName(i);
- switch (attributeName) {
- case XmlDataConstants.PROPERTY_NAME:
- key = reader.getAttributeValue(i);
- break;
- case XmlDataConstants.PROPERTY_VALUE:
- value = reader.getAttributeValue(i);
- break;
- case XmlDataConstants.PROPERTY_TYPE:
- propertyType = reader.getAttributeValue(i);
- break;
- }
- }
-
- if (value.equals(XmlDataConstants.NULL)) {
- value = null;
- }
-
- switch (propertyType) {
- case XmlDataConstants.PROPERTY_TYPE_SHORT:
- message.putShortProperty(key, Short.parseShort(value));
- break;
- case XmlDataConstants.PROPERTY_TYPE_BOOLEAN:
- message.putBooleanProperty(key, Boolean.parseBoolean(value));
- break;
- case XmlDataConstants.PROPERTY_TYPE_BYTE:
- message.putByteProperty(key, Byte.parseByte(value));
- break;
- case XmlDataConstants.PROPERTY_TYPE_BYTES:
- message.putBytesProperty(key, value == null ? null : decode(value));
- break;
- case XmlDataConstants.PROPERTY_TYPE_DOUBLE:
- message.putDoubleProperty(key, Double.parseDouble(value));
- break;
- case XmlDataConstants.PROPERTY_TYPE_FLOAT:
- message.putFloatProperty(key, Float.parseFloat(value));
- break;
- case XmlDataConstants.PROPERTY_TYPE_INTEGER:
- message.putIntProperty(key, Integer.parseInt(value));
- break;
- case XmlDataConstants.PROPERTY_TYPE_LONG:
- message.putLongProperty(key, Long.parseLong(value));
- break;
- case XmlDataConstants.PROPERTY_TYPE_SIMPLE_STRING:
- message.putStringProperty(new SimpleString(key), value == null ? null : SimpleString.toSimpleString(value));
- break;
- case XmlDataConstants.PROPERTY_TYPE_STRING:
- message.putStringProperty(key, value);
- break;
- }
- }
-
- private File processMessageBody(final ICoreMessage message) throws XMLStreamException, IOException {
- File tempFileName = null;
- boolean isLarge = false;
-
- for (int i = 0; i < reader.getAttributeCount(); i++) {
- String attributeName = reader.getAttributeLocalName(i);
- if (XmlDataConstants.MESSAGE_IS_LARGE.equals(attributeName)) {
- isLarge = Boolean.parseBoolean(reader.getAttributeValue(i));
- }
- }
- reader.next();
- if (logger.isDebugEnabled()) {
- logger.debug("XMLStreamReader impl: " + reader);
- }
- if (isLarge) {
- tempFileName = File.createTempFile("largeMessage", ".tmp");
- if (logger.isDebugEnabled()) {
- logger.debug("Creating temp file " + tempFileName + " for large message.");
- }
- try (OutputStream out = new BufferedOutputStream(new FileOutputStream(tempFileName))) {
- getMessageBodyBytes(new MessageBodyBytesProcessor() {
- @Override
- public void processBodyBytes(byte[] bytes) throws IOException {
- out.write(bytes);
- }
- });
- }
- FileInputStream fileInputStream = new FileInputStream(tempFileName);
- BufferedInputStream bufferedInput = new BufferedInputStream(fileInputStream);
- ((ClientMessage) message).setBodyInputStream(bufferedInput);
- } else {
- getMessageBodyBytes(new MessageBodyBytesProcessor() {
- @Override
- public void processBodyBytes(byte[] bytes) throws IOException {
- message.getBodyBuffer().writeBytes(bytes);
- }
- });
- }
-
- return tempFileName;
- }
-
- /**
- * Message bodies are written to XML as one or more Base64 encoded CDATA elements. Some parser implementations won't
- * read an entire CDATA element at once (e.g. Woodstox) so it's possible that multiple CDATA/CHARACTERS events need
- * to be combined to reconstruct the Base64 encoded string. You can't decode bits and pieces of each CDATA. Each
- * CDATA has to be decoded in its entirety.
- *
- * @param processor used to deal with the decoded CDATA elements
- */
- private void getMessageBodyBytes(MessageBodyBytesProcessor processor) throws IOException, XMLStreamException {
- int currentEventType;
- StringBuilder cdata = new StringBuilder();
- while (reader.hasNext()) {
- currentEventType = reader.getEventType();
- if (currentEventType == XMLStreamConstants.END_ELEMENT) {
- break;
- } else if (currentEventType == XMLStreamConstants.CHARACTERS && reader.isWhiteSpace() && cdata.length() > 0) {
- /* when we hit a whitespace CHARACTERS event we know that the entire CDATA is complete so decode, pass back to
- * the processor, and reset the cdata for the next event(s)
- */
- processor.processBodyBytes(decode(cdata.toString()));
- cdata.setLength(0);
- } else {
- cdata.append(new String(reader.getTextCharacters(), reader.getTextStart(), reader.getTextLength()).trim());
- }
- reader.next();
- }
- }
-
-
private void oldBinding() throws Exception {
String queueName = "";
String address = "";
@@ -762,11 +506,5 @@ public final class XmlDataImporter extends ActionAbstract {
}
}
- private static byte[] decode(String data) {
- return Base64.decode(data, Base64.DONT_BREAK_LINES | Base64.URL_SAFE);
- }
- private interface MessageBodyBytesProcessor {
- void processBodyBytes(byte[] bytes) throws IOException;
- }
}
|