usergrid-commits mailing list archives

From mru...@apache.org
Subject [05/20] usergrid git commit: Make the shard consistency tests a little smarter. Update shard compaction to be a safer background process by updating the edge writes to be atomic, and the deletes delayed (ensures data will always be available for seeking and we filter any dupes)
Date Wed, 23 Mar 2016 17:34:33 GMT
Make the shard consistency tests a little smarter.  Update shard compaction to be a safer background
process by making the edge writes atomic and the deletes delayed (this ensures data will always be
available for seeking, and we filter any dupes).


Project: http://git-wip-us.apache.org/repos/asf/usergrid/repo
Commit: http://git-wip-us.apache.org/repos/asf/usergrid/commit/b112488d
Tree: http://git-wip-us.apache.org/repos/asf/usergrid/tree/b112488d
Diff: http://git-wip-us.apache.org/repos/asf/usergrid/diff/b112488d

Branch: refs/heads/release-2.1.1
Commit: b112488db1ef01ee8417e35e48e428c40d0aa206
Parents: 4bbebc5
Author: Michael Russo <mrusso@apigee.com>
Authored: Wed Mar 16 17:25:22 2016 -0700
Committer: Michael Russo <mrusso@apigee.com>
Committed: Wed Mar 16 17:25:22 2016 -0700

----------------------------------------------------------------------
 .../core/astyanax/MultiRowColumnIterator.java   |   8 +-
 .../shard/impl/ShardGroupCompactionImpl.java    |  79 +++++++++++---
 .../graph/GraphManagerShardConsistencyIT.java   | 108 ++++++++++++-------
 .../graph/src/test/resources/log4j.properties   |   9 +-
 4 files changed, 142 insertions(+), 62 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/usergrid/blob/b112488d/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
index 10786f7..6c91aca 100644
--- a/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
+++ b/stack/corepersistence/common/src/main/java/org/apache/usergrid/persistence/core/astyanax/MultiRowColumnIterator.java
@@ -169,9 +169,6 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
 
                 }
 
-                // reset the start column as we'll be seeking a new row, any duplicates will be filtered out
-                startColumn = null;
-
                 advance();
 
         }
@@ -312,8 +309,9 @@ public class MultiRowColumnIterator<R, C, T> implements Iterator<T> {
         }
 
 
-        // if a whole page is skipped, this is likely during a shard transition and we should assume there is more to read
-        if( skipSize == selectSize || skipSize == selectSize - 1){
+        // if a whole page is skipped OR the result size equals the difference of what's skipped,
+        // it is likely during a shard transition and we should assume there is more to read
+        if( skipSize == selectSize || skipSize == selectSize - 1 || size == selectSize - skipSize ){
             moreToReturn = true;
         }
 
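For readers skimming the diff: the new condition above is the heuristic MultiRowColumnIterator uses to
decide whether to fetch another page. A minimal standalone sketch of that check (the method name
shouldKeepReading and the extraction into a helper are hypothetical; the variable names match the diff):

    // selectSize: columns requested per page
    // skipSize:   columns dropped as duplicates of the previous page's start column
    // size:       columns actually kept from this page
    private boolean shouldKeepReading( final int selectSize, final int skipSize, final int size ) {
        // A page that is entirely (or all but one column) skipped, or whose kept size
        // exactly equals what survived the skip, usually means we are crossing a shard
        // boundary mid-iteration, so assume there is more to read.
        return skipSize == selectSize
            || skipSize == selectSize - 1
            || size == selectSize - skipSize;
    }

Returning true here errs on the side of one extra (possibly empty) page read rather than silently
truncating results during a shard transition.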

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b112488d/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
index 80b63ec..f644380 100644
--- a/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
+++ b/stack/corepersistence/graph/src/main/java/org/apache/usergrid/persistence/graph/serialization/impl/shard/impl/ShardGroupCompactionImpl.java
@@ -23,20 +23,17 @@ package org.apache.usergrid.persistence.graph.serialization.impl.shard.impl;
 
 
 import java.nio.charset.Charset;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Random;
-import java.util.Set;
-import java.util.UUID;
+import java.util.*;
 import java.util.concurrent.Callable;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
 
 import javax.annotation.Nullable;
 
 import com.google.common.base.Optional;
+import com.netflix.astyanax.connectionpool.OperationResult;
 import org.apache.usergrid.persistence.graph.Edge;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -72,6 +69,8 @@ import com.google.inject.Singleton;
 import com.netflix.astyanax.Keyspace;
 import com.netflix.astyanax.MutationBatch;
 import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;
+import rx.Observable;
+import rx.schedulers.Schedulers;
 
 
 /**
@@ -199,7 +198,6 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                     break;
                 }
 
-
                 newRowBatch.mergeShallow( edgeMeta
                         .writeEdge( shardedEdgeSerialization, edgeColumnFamilies, scope, targetShard, edge,
                             timestamp ) );
@@ -216,8 +214,35 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                 if ( edgeCount % maxWorkSize == 0 ) {
 
                     try {
-                        newRowBatch.execute();
-                        deleteRowBatch.execute();
+
+                        // write the edges into the new shard atomically so we know they all succeed
+                        newRowBatch.withAtomicBatch(true).execute();
+
+                        List<MutationBatch> deleteMutations = new ArrayList<MutationBatch>(1)
+                        {{
+                            add(deleteRowBatch);
+                        }};
+
+                        // fire the mutation in the background after 1 second delay
+                        if(logger.isTraceEnabled()){
+                            logger.trace("scheduling shard compaction delete");
+
+                        }
+
+                        // perform the deletes after some delay, but we need to block before marking this shard as 'compacted'
+                        Observable.from(deleteMutations)
+                            .delay(1000, TimeUnit.MILLISECONDS)
+                            .map(deleteRowBatchSingle -> {
+                                try {
+                                    return deleteRowBatchSingle.execute();
+                                } catch (ConnectionException e) {
+                                    logger.error("Unable to remove edges from old shards");
+                                    throw new RuntimeException("Unable to remove edges from old shards");
+                                }
+                            })
+                            .subscribeOn(Schedulers.io())
+                            .toBlocking().last();
+
                     }
                     catch ( Throwable t ) {
                         logger.error( "Unable to move edges from shard {} to shard {}", sourceShard,
targetShard );
@@ -236,9 +261,35 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
 
 
         try {
-            newRowBatch.execute();
-            deleteRowBatch.execute();
-            updateShardMetaBatch.execute();
+
+            // write the edges into the new shard atomically so we know they all succeed
+            newRowBatch.withAtomicBatch(true).execute();
+
+            List<MutationBatch> deleteMutations = new ArrayList<MutationBatch>(1)
+            {{
+                add(deleteRowBatch);
+            }};
+
+
+            if(logger.isTraceEnabled()) {
+                logger.trace("scheduling shard compaction delete");
+            }
+
+            // perform the deletes after some delay, but we need to block before marking this shard as 'compacted'
+            Observable.from(deleteMutations)
+                .delay(1000, TimeUnit.MILLISECONDS)
+                .map(deleteRowBatchSingle -> {
+                    try {
+                        return deleteRowBatchSingle.execute();
+                    } catch (ConnectionException e) {
+                        logger.error("Unable to remove edges from old shards");
+                        throw new RuntimeException("Unable to remove edges from old shards");
+                    }
+                })
+                .subscribeOn(Schedulers.io())
+                .toBlocking().last();
+
+            //updateShardMetaBatch.execute();
         }
         catch ( Throwable t ) {
             logger.error( "Unable to move edges to target shard {}", targetShard );
@@ -286,7 +337,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                 shardRemovalRollup.execute();
             }
             catch ( ConnectionException e ) {
-                throw new RuntimeException( "Unable to connect to casandra", e );
+                throw new RuntimeException( "Unable to connect to cassandra", e );
             }
 
 
@@ -300,7 +351,7 @@ public class ShardGroupCompactionImpl implements ShardGroupCompaction {
                 updateMark.execute();
             }
             catch ( ConnectionException e ) {
-                throw new RuntimeException( "Unable to connect to casandra", e );
+                throw new RuntimeException( "Unable to connect to cassandra", e );
             }
 
             resultBuilder.withCompactedShard( compactedShard );

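The change above is the heart of this commit: edges are written to the target shard atomically, and the
old rows are deleted only after a delay, with the caller blocking so the shard is not marked 'compacted'
prematurely. A minimal sketch of that ordering, using the Astyanax and RxJava 1.x types already imported
in the diff (the class and method names below are hypothetical):

    import java.util.concurrent.TimeUnit;

    import com.netflix.astyanax.MutationBatch;
    import com.netflix.astyanax.connectionpool.exceptions.ConnectionException;

    import rx.Observable;
    import rx.schedulers.Schedulers;

    public class CompactionOrderingSketch {

        void moveEdgesSafely( final MutationBatch newRowBatch, final MutationBatch deleteRowBatch )
            throws ConnectionException {

            // 1. Write every edge into the target shard in one atomic batch, so a reader
            //    never observes a partially populated shard.
            newRowBatch.withAtomicBatch( true ).execute();

            // 2. Delete the source rows only after a delay, but block until the delete
            //    completes before the shard can be marked 'compacted'.
            Observable.just( deleteRowBatch )
                .delay( 1000, TimeUnit.MILLISECONDS )
                .map( batch -> {
                    try {
                        return batch.execute();
                    }
                    catch ( ConnectionException e ) {
                        throw new RuntimeException( "Unable to remove edges from old shards", e );
                    }
                } )
                .subscribeOn( Schedulers.io() )
                .toBlocking().last();
        }
    }

The delay gives in-flight readers that have already seeked into the source shard time to finish; during
the overlap the same edge exists in both shards, which is safe because the iterator filters duplicates
on read.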
http://git-wip-us.apache.org/repos/asf/usergrid/blob/b112488d/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
index 82b0879..439553c 100644
--- a/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
+++ b/stack/corepersistence/graph/src/test/java/org/apache/usergrid/persistence/graph/GraphManagerShardConsistencyIT.java
@@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicLong;
 import javax.annotation.Nullable;
 
 import org.apache.usergrid.StressTest;
+import org.apache.usergrid.persistence.model.util.UUIDGenerator;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Test;
@@ -78,7 +79,6 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.fail;
 
-
 public class GraphManagerShardConsistencyIT {
     private static final Logger logger = LoggerFactory.getLogger( GraphManagerShardConsistencyIT.class );
 
@@ -98,7 +98,10 @@ public class GraphManagerShardConsistencyIT {
 
     protected Object originalShardDelta;
 
-    protected ListeningExecutorService executor;
+    protected ListeningExecutorService writeExecutor;
+
+    protected ListeningExecutorService deleteExecutor;
+
 
 
     @Before
@@ -112,7 +115,7 @@ public class GraphManagerShardConsistencyIT {
         originalShardDelta = ConfigurationManager.getConfigInstance().getProperty( GraphFig.SHARD_MIN_DELTA );
 
 
-        ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 10000 );
+        ConfigurationManager.getConfigInstance().setProperty( GraphFig.SHARD_SIZE, 5000 );
 
 
         final long cacheTimeout = 2000;
@@ -145,28 +148,39 @@ public class GraphManagerShardConsistencyIT {
         reporter.stop();
         reporter.report();
 
-        executor.shutdownNow();
+        if(writeExecutor != null){
+            writeExecutor.shutdownNow();
+
+        }
+        if(deleteExecutor != null){
+            deleteExecutor.shutdownNow();
+
+        }
+
     }
 
 
-    private void createExecutor( final int size ) {
-        executor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( size ) );
+    private void createWriteExecutor( final int size ) {
+        writeExecutor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( size ) );
     }
 
+    private void createDeleteExecutor( final int size ) {
+        deleteExecutor = MoreExecutors.listeningDecorator( Executors.newFixedThreadPool( size ) );
+    }
 
     @Test
     public void writeThousandsSingleSource()
         throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
 
-        final Id sourceId = IdGenerator.createId( "source" );
-        final String edgeType = "test";
+        final Id sourceId = IdGenerator.createId( "sourceWrite" );
+        final String edgeType = "testWrite_"+ UUIDGenerator.newTimeUUID().toString();
 
         final EdgeGenerator generator = new EdgeGenerator() {
 
 
             @Override
             public Edge newEdge() {
-                Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "target" ) );
+                Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "targetWrite" ) );
 
 
                 return edge;
@@ -182,12 +196,12 @@ public class GraphManagerShardConsistencyIT {
         };
 
 
-        //        final int numInjectors = 2;
+        //final int numInjectors = 2;
         final int numInjectors = 1;
 
         /**
-         * create 3 injectors.  This way all the caches are independent of one another.  This is the same as
-         * multiple nodes
+         * create injectors.  This way all the caches are independent of one another.  This is the same as
+         * multiple nodes if there are multiple injectors
          */
         final List<Injector> injectors = createInjectors( numInjectors );
 
@@ -214,7 +228,7 @@ public class GraphManagerShardConsistencyIT {
         final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
 
 
-        createExecutor( numWorkersPerInjector );
+        createWriteExecutor( numWorkersPerInjector );
 
         final AtomicLong writeCounter = new AtomicLong();
 
@@ -236,7 +250,7 @@ public class GraphManagerShardConsistencyIT {
 
             for ( int i = 0; i < numWorkersPerInjector; i++ ) {
                 Future<Boolean> future =
-                    executor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+                    writeExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
 
                 futures.add( future );
             }
@@ -260,20 +274,20 @@ public class GraphManagerShardConsistencyIT {
 
         final long writeCount = writeCounter.get();
         final long expectedShardCount = writeCount / shardSize;
-        final Meter readMeter = registry.meter( "readThroughput" );
+        final Meter readMeter = registry.meter( "readThroughput-writeTest" );
 
 
         final List<Throwable> failures = new ArrayList<>();
-        //Thread.sleep(5000);
+        Thread.sleep(3000); // let's make sure everything is written
 
-        for(int i = 0; i < 2; i ++) {
+        for(int i = 0; i < 1; i ++) {
 
 
             /**
              * Start reading continuously while we migrate data to ensure our view is always correct
              */
             final ListenableFuture<Long> future =
-                executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
+                writeExecutor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
 
 
             //add the future
@@ -282,7 +296,7 @@ public class GraphManagerShardConsistencyIT {
                 @Override
                 public void onSuccess( @Nullable final Long result ) {
                     logger.info( "Successfully ran the read, re-running" );
-                    executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
+                    writeExecutor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
                 }
 
 
@@ -360,7 +374,7 @@ public class GraphManagerShardConsistencyIT {
 
         Thread.sleep(30000);
 
-        executor.shutdownNow();
+        writeExecutor.shutdownNow();
     }
 
 
@@ -390,20 +404,20 @@ public class GraphManagerShardConsistencyIT {
     }
 
 
-    @Test(timeout=120000)
+    @Test(timeout=300000) // this test is SLOW as deletes are intensive and shard cleanup is async
     @Category(StressTest.class)
     public void writeThousandsDelete()
         throws InterruptedException, ExecutionException, MigrationException, UnsupportedEncodingException {
 
-        final Id sourceId = IdGenerator.createId( "source" );
-        final String edgeType = "test";
+        final Id sourceId = IdGenerator.createId( "sourceDelete" );
+        final String deleteEdgeType = "testDelete_"+ UUIDGenerator.newTimeUUID().toString();
 
         final EdgeGenerator generator = new EdgeGenerator() {
 
 
             @Override
             public Edge newEdge() {
-                Edge edge = createEdge( sourceId, edgeType, IdGenerator.createId( "target" ) );
+                Edge edge = createEdge( sourceId, deleteEdgeType, IdGenerator.createId( "targetDelete" ) );
 
 
                 return edge;
@@ -413,18 +427,17 @@ public class GraphManagerShardConsistencyIT {
             @Override
             public Observable<MarkedEdge> doSearch( final GraphManager manager ) {
                 return manager.loadEdgesFromSource(
-                    new SimpleSearchByEdgeType( sourceId, edgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
+                    new SimpleSearchByEdgeType( sourceId, deleteEdgeType, Long.MAX_VALUE, SearchByEdgeType.Order.DESCENDING,
                         Optional.<Edge>absent(), false ) );
             }
         };
 
 
-        //        final int numInjectors = 2;
         final int numInjectors = 1;
 
         /**
-         * create 3 injectors.  This way all the caches are independent of one another.  This is the same as
-         * multiple nodes
+         * create injectors.  This way all the caches are independent of one another.  This is the same as
+         * multiple nodes if there are multiple injectors
          */
         final List<Injector> injectors = createInjectors( numInjectors );
 
@@ -449,7 +462,7 @@ public class GraphManagerShardConsistencyIT {
 
         final long workerWriteLimit = numberOfEdges / numWorkersPerInjector / numInjectors;
 
-        createExecutor( numWorkersPerInjector );
+        createDeleteExecutor( numWorkersPerInjector );
 
 
         final AtomicLong writeCounter = new AtomicLong();
@@ -472,7 +485,7 @@ public class GraphManagerShardConsistencyIT {
 
             for ( int i = 0; i < numWorkersPerInjector; i++ ) {
                 Future<Boolean> future =
-                    executor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
+                    deleteExecutor.submit( new Worker( gmf, generator, workerWriteLimit, minExecutionTime, writeCounter ) );
 
                 futures.add( future );
             }
@@ -488,14 +501,14 @@ public class GraphManagerShardConsistencyIT {
         //now get all our shards
         final NodeShardCache cache = getInstance( injectors, NodeShardCache.class );
 
-        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, edgeType );
+        final DirectedEdgeMeta directedEdgeMeta = DirectedEdgeMeta.fromSourceNode( sourceId, deleteEdgeType );
 
         //now submit the readers.
         final GraphManagerFactory gmf = getInstance( injectors, GraphManagerFactory.class );
 
 
         final long writeCount = writeCounter.get();
-        final Meter readMeter = registry.meter( "readThroughput" );
+        final Meter readMeter = registry.meter( "readThroughput-deleteTest" );
 
 
         //check our shard state
@@ -526,11 +539,28 @@ public class GraphManagerShardConsistencyIT {
 
         long count = Long.MAX_VALUE;
 
+        Thread.sleep(3000); // let's make sure everything is written
+
+        long totalDeleted = 0;
+
         while(count != 0) {
-            //take 10000 then sleep
-            count = generator.doSearch( manager ).onBackpressureBlock().take( 1000 ).flatMap( edge -> manager.markEdge( edge ) )
+
+            logger.info("total deleted: {}", totalDeleted);
+            if(count != Long.MAX_VALUE) { // count starts with Long.MAX
+                logger.info("deleted {} entities, continuing until count is 0", count);
+            }
+            //take 1000 then sleep
+            count = generator.doSearch( manager ).take( 1000 )
+                .filter(markedEdge -> {
+
+                    // if it's already been marked let's filter, move on as async deleteEdge()
+                    logger.trace("edge already marked, may indicated a problem with gm.deleteEdge():
{}", markedEdge);
+                    return !markedEdge.isDeleted();
+                })
+                .flatMap( edge -> manager.markEdge( edge ))
                      .flatMap( edge -> manager.deleteEdge( edge ) ).countLong().toBlocking().last();
 
+            totalDeleted += count;
             Thread.sleep( 500 );
         }
 
@@ -541,7 +571,7 @@ public class GraphManagerShardConsistencyIT {
         /**
          * Start reading continuously while we migrate data to ensure our view is always correct
          */
-        final ListenableFuture<Long> future = executor.submit( new ReadWorker( gmf, generator, 0, readMeter ) );
+        final ListenableFuture<Long> future = deleteExecutor.submit( new ReadWorker( gmf, generator, 0, readMeter ) );
 
         final List<Throwable> failures = new ArrayList<>();
 
@@ -552,7 +582,7 @@ public class GraphManagerShardConsistencyIT {
             @Override
             public void onSuccess( @Nullable final Long result ) {
                 logger.info( "Successfully ran the read, re-running" );
-                executor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
+                deleteExecutor.submit( new ReadWorker( gmf, generator, writeCount, readMeter ) );
             }
 
 
@@ -606,8 +636,8 @@ public class GraphManagerShardConsistencyIT {
             }
 
 
-            //we're done, 1 shard remains, we have a group, and it's our default shard
-            if ( shardCount == 1 && group != null &&  group.getMinShard().getShardIndex() == Shard.MIN_SHARD.getShardIndex()  ) {
+            // we're done, 1 shard remains, we have a group, and it's our default shard
+            if ( shardCount == 1 && group.getMinShard().getShardIndex() == Shard.MIN_SHARD.getShardIndex()  ) {
                 logger.info( "All compactions complete," );
 
                 break;
@@ -619,7 +649,7 @@ public class GraphManagerShardConsistencyIT {
 
         //now that we have finished expanding s
 
-        executor.shutdownNow();
+        deleteExecutor.shutdownNow();
     }
 
 
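The delete test introduced above drains edges in bounded batches until the search returns nothing.
Unwrapped from the diff and read in isolation (generator and manager are the test's EdgeGenerator and
GraphManager), the loop looks roughly like this:

    long count = Long.MAX_VALUE;
    long totalDeleted = 0;

    while ( count != 0 ) {

        // Take a bounded batch so an arbitrarily large edge set is never buffered in
        // memory, skip edges a previous pass already marked, then mark and delete the rest.
        count = generator.doSearch( manager )
            .take( 1000 )
            .filter( markedEdge -> !markedEdge.isDeleted() )
            .flatMap( edge -> manager.markEdge( edge ) )
            .flatMap( edge -> manager.deleteEdge( edge ) )
            .countLong().toBlocking().last();

        totalDeleted += count;
        Thread.sleep( 500 ); // give the asynchronous shard cleanup room between passes
    }

Because countLong() emits exactly one value even for an empty stream, the loop terminates cleanly on
the first pass that deletes nothing.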

http://git-wip-us.apache.org/repos/asf/usergrid/blob/b112488d/stack/corepersistence/graph/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/stack/corepersistence/graph/src/test/resources/log4j.properties b/stack/corepersistence/graph/src/test/resources/log4j.properties
index 79401c3..5afc288 100644
--- a/stack/corepersistence/graph/src/test/resources/log4j.properties
+++ b/stack/corepersistence/graph/src/test/resources/log4j.properties
@@ -37,8 +37,9 @@ log4j.logger.cassandra.db=ERROR
 #log4j.logger.org.apache.usergrid.persistence.core.rx=TRACE
 #log4j.logger.org.apache.usergrid.persistence.core.astyanax=TRACE
 #log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.parse=TRACE
-#log4j.logger.org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator=INFO
-#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardsColumnIterator=INFO
-#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator=INFO
-#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=INFO
+#log4j.logger.org.apache.usergrid.persistence.core.astyanax.MultiRowColumnIterator=TRACE
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardsColumnIterator=TRACE
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.impl.ShardGroupColumnIterator=TRACE
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=TRACE
+#log4j.logger.org.apache.usergrid.persistence.graph.serialization.impl.shard.ShardEntryGroup=TRACE
 

