mrunit-user mailing list archives

From Reinis <mru...@orbit-x.de>
Subject Avro serialization and mapReduceDriver
Date Wed, 19 Feb 2014 10:30:47 GMT
Hello,

with the (rather ugly) configuration below I have managed to make most of
the tests that use Avro serialization work, except the final test with
MapReduceDriver:

    @BeforeClass
    public static void setUp() throws IOException {
        TicketMasterDataMapper mapper = new TicketMasterDataMapper();
        AgentToTicketDataCombiner reducer = new AgentToTicketDataCombiner();

        Job jobInput = Job.getInstance();
        Job jobOutput = Job.getInstance();

        jobInput.setInputFormatClass(AvroKeyInputFormat.class);
        AvroJob.setInputKeySchema(jobInput, vTicketMasterData.SCHEMA$);
        AvroSerialization.addToConfiguration(jobInput.getConfiguration());
        AvroSerialization.setKeyWriterSchema(jobInput.getConfiguration(), vTicketMasterData.SCHEMA$);

        jobOutput.setOutputFormatClass(AvroKeyValueOutputFormat.class);
        AvroJob.setMapOutputKeySchema(jobOutput, Agent.SCHEMA$);
        AvroJob.setMapOutputValueSchema(jobOutput, TicketPreference.SCHEMA$);
        AvroJob.setOutputKeySchema(jobOutput, Agent.SCHEMA$);
        AvroJob.setOutputValueSchema(jobOutput, TicketPreference.SCHEMA$);
        AvroSerialization.addToConfiguration(jobOutput.getConfiguration());
        AvroSerialization.setKeyWriterSchema(jobOutput.getConfiguration(), Agent.SCHEMA$);
        AvroSerialization.setValueWriterSchema(jobOutput.getConfiguration(), TicketPreference.SCHEMA$);

        // works!
        mapDriver = MapDriver.newMapDriver(mapper)
                .withConfiguration(jobInput.getConfiguration());
        mapDriver.withOutputSerializationConfiguration(jobOutput.getConfiguration());

        // works!
        reduceDriver = ReduceDriver.newReduceDriver(reducer)
                .withConfiguration(jobOutput.getConfiguration());

        // fails!
        mapReduceDriver = MapReduceDriver.newMapReduceDriver(mapper, reducer)
                .withConfiguration(jobInput.getConfiguration());
        mapReduceDriver.withOutputSerializationConfiguration(jobOutput.getConfiguration());
    }

This test fails with an AvroRuntimeException:

    @Test
    public void testTicketMasterDataMapSameAgentMapReduce() throws IOException {
        vTicketMasterData ticketMasterData = new vTicketMasterData(1L, 21L, 21L, "", "", 0L);

        Agent agent = new Agent(21L);
        TicketPreference resultingTicketPreference = new TicketPreference(1L, 0.8d);

        mapReduceDriver.withInput(new AvroKey<>(ticketMasterData), NullWritable.get());

        // runTest cannot be used because there is no support for different
        // schemas for inputs/outputs in the copy method
        //mapReduceDriver.withOutput(new AvroKey<>(agent), new AvroValue<>(resultingTicketPreference));
        mapReduceDriver.run();
    }

and the stack trace:

org.apache.avro.AvroRuntimeException: Bad index
    at my.avro.Agent.get(Agent.java:33)
    at org.apache.avro.generic.GenericData.getField(GenericData.java:537)
    at org.apache.avro.reflect.ReflectData.getField(ReflectData.java:131)
    at org.apache.avro.generic.GenericData.getField(GenericData.java:552)
    at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:104)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:66)
    at org.apache.avro.reflect.ReflectDatumWriter.write(ReflectDatumWriter.java:104)
    at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:58)
    at org.apache.avro.hadoop.io.AvroSerializer.serialize(AvroSerializer.java:104)
    at org.apache.avro.hadoop.io.AvroSerializer.serialize(AvroSerializer.java:46)
    at org.apache.hadoop.mrunit.internal.io.Serialization.copy(Serialization.java:74)
    at org.apache.hadoop.mrunit.internal.io.Serialization.copy(Serialization.java:91)
    at org.apache.hadoop.mrunit.internal.output.MockOutputCollector.collect(MockOutputCollector.java:48)
    at org.apache.hadoop.mrunit.internal.mapreduce.AbstractMockContextWrapper$4.answer(AbstractMockContextWrapper.java:90)
    at org.mockito.internal.stubbing.StubbedInvocationMatcher.answer(StubbedInvocationMatcher.java:31)
    at org.mockito.internal.MockHandler.handle(MockHandler.java:97)
    at org.mockito.internal.creation.MethodInterceptorFilter.intercept(MethodInterceptorFilter.java:47)
    at org.apache.hadoop.mapreduce.Mapper$Context$$EnhancerByMockitoWithCGLIB$$2bad9530.write(<generated>)
    at my.TicketMasterDataMapper.map(TicketMasterDataMapper.java:42)
    at my.TicketMasterDataMapper.map(TicketMasterDataMapper.java:22)
    at org.apache.hadoop.mapreduce.Mapper.run(Mapper.java:145)
    at org.apache.hadoop.mrunit.mapreduce.MapDriver.run(MapDriver.java:137)
    at org.apache.hadoop.mrunit.mapreduce.MapReduceDriver.run(MapReduceDriver.java:253)
    at my.TicketMasterDataMRTest.testTicketMasterDataMapSameAgentMapReduce(TicketMasterDataMRTest.java:119)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:45)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:42)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:263)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:68)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:47)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:28)
    at org.junit.rules.ExternalResource$1.evaluate(ExternalResource.java:46)
    at org.junit.rules.RunRules.evaluate(RunRules.java:18)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
    at org.junit.runners.Suite.runChild(Suite.java:128)
    at org.junit.runners.Suite.runChild(Suite.java:24)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:231)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:60)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:229)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:50)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:222)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:300)
    at org.junit.runner.JUnitCore.run(JUnitCore.java:157)
    at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:74)
    at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:202)
    at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:65)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at com.intellij.rt.execution.application.AppMain.main(AppMain.java:120)

Yet this test, which uses the very same @BeforeClass setup, does NOT fail:

    @Test
    public void testTicketMasterDataMapLastAgentMap() throws IOException {
        vTicketMasterData ticketMasterData = new vTicketMasterData(1L, 321L, null, "", "", 0L);
        Agent lastAgent = new Agent(321L);
        TicketPreference ticketPreference = new TicketPreference(1L, 0.6d);

        mapDriver.withInput(new AvroKey<>(ticketMasterData), NullWritable.get());
        //mapDriver.withOutput(new AvroKey<>(lastAgent), new AvroValue<>(ticketPreference));
        mapDriver.run();
    }
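Reading the stack trace, the exception is raised while MRUnit copies the mapper's output (MockOutputCollector.collect → Serialization.copy → AvroSerializer.serialize), before the reducer runs. GenericDatumWriter.writeRecord walks the fields of the writer schema and asks the record for each value by position, and an Avro-generated get(int) throws "Bad index" for any position the class does not have. So the Agent key appears to be serialized with a writer schema that has more fields than Agent. In the MapDriver test, the output copy uses jobOutput (Agent schema) via withOutputSerializationConfiguration; in the MapReduceDriver case the vTicketMasterData schema set on jobInput would fit the symptom, but that is an inference, not something confirmed here. The plain-Java sketch below models only this mechanism: IndexedRecord, Agent and write are simplified, hypothetical stand-ins rather than Avro's real classes, and the placeholder field names f0 to f5 assume one field per vTicketMasterData constructor argument.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of the failing copy step; not Avro's or MRUnit's real code.
public class BadIndexDemo {

    // Stand-in for an Avro IndexedRecord: values are fetched by field position.
    public interface IndexedRecord {
        Object get(int pos);
    }

    // Mimics an Avro-generated class with a single field:
    // any position other than 0 is rejected with "Bad index".
    public static final class Agent implements IndexedRecord {
        private final Long id;

        public Agent(Long id) {
            this.id = id;
        }

        @Override
        public Object get(int pos) {
            switch (pos) {
                case 0:
                    return id;
                default:
                    throw new RuntimeException("Bad index");
            }
        }
    }

    // Like GenericDatumWriter.writeRecord: iterate over the WRITER schema's fields
    // (reduced here to an ordered list of names) and read each value by position.
    public static Map<String, Object> write(IndexedRecord record, List<String> writerSchemaFields) {
        Map<String, Object> out = new LinkedHashMap<>();
        for (int pos = 0; pos < writerSchemaFields.size(); pos++) {
            out.put(writerSchemaFields.get(pos), record.get(pos));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> agentSchema = Arrays.asList("id");
        // Placeholder names: the real vTicketMasterData field names are not shown in this message.
        List<String> ticketMasterDataSchema = Arrays.asList("f0", "f1", "f2", "f3", "f4", "f5");
        Agent agent = new Agent(21L);

        // Writer schema matches the record: the copy succeeds.
        System.out.println(write(agent, agentSchema)); // {id=21}

        // Writer schema has more fields than the record: get(1) throws.
        try {
            write(agent, ticketMasterDataSchema);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // Bad index
        }
    }
}
```

With the one-field schema the copy succeeds; with the six-field schema the loop reaches position 1 and fails in Agent.get, the same frame that tops the trace above.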

I'd appreciate any thoughts on how to make this final piece of the puzzle work.

kind regards
reinis

