Oh huh, that is a little surprising. I'm pretty sure we had this working with 0.11, and our code for running the job looked exactly the same as what you shared.

Anyway, glad you have it working now!

On Fri, Jan 8, 2016 at 4:05 PM, Yan Yang <yan@wealthfront.com> wrote:
Turns out upgrading Crunch from 0.11.0 to 0.13.0 solved the problem.

On Mon, Jan 4, 2016 at 5:40 PM, Yan Yang <yan@wealthfront.com> wrote:
Hi Jeff

I think the blank configuration may be the issue. Our ExecutorClass implements Tool, and we use

ToolRunner.run(new Configuration(), new ExecutorClass(), args)

to run the Crunch job, which has always worked fine with MRPipeline. What is the correct way of inheriting the configuration here?
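For example, would something along these lines be the right approach? This is only a rough sketch of what I have in mind, assuming SparkPipeline has a constructor variant that accepts a Configuration, and using our ExecutorClass name and a "yarn-cluster" master string as placeholders:

import org.apache.crunch.impl.spark.SparkPipeline;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class ExecutorClass extends Configured implements Tool {
  @Override
  public int run(String[] args) throws Exception {
    // getConf() is the Configuration that ToolRunner set on this Tool (including
    // any -D overrides), so the cluster's site settings would be carried into the pipeline.
    Configuration conf = getConf();
    SparkPipeline pipeline = new SparkPipeline("yarn-cluster", "spark-app", ExecutorClass.class, conf);
    // ... build the PCollections and writes here ...
    return pipeline.done().succeeded() ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    System.exit(ToolRunner.run(new Configuration(), new ExecutorClass(), args));
  }
}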

Thanks
Yan

On Mon, Jan 4, 2016 at 2:27 PM, Jeff Quinn <jeff@nuna.com> wrote:
Interesting, how are you submitting your job? Are you using spark-submit with the "yarn-master" spark master? Is your main class extending CrunchTool? My thinking is that somehow the default configurations are not being inherited, and maybe you are working with a totally blank Configuration object.
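One quick way to check would be to dump a couple of keys from the Configuration the pipeline ends up with, something like this fragment (using your existing pipeline object; which keys are most telling will depend on your setup):

// Rough diagnostic: on a properly configured EMR cluster these should reflect the
// cluster's site settings; a blank Configuration would only show the stock Hadoop defaults.
Configuration conf = pipeline.getConfiguration();
System.out.println("fs.defaultFS = " + conf.get("fs.defaultFS"));
System.out.println("fs.s3n.impl  = " + conf.get("fs.s3n.impl"));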

On Mon, Jan 4, 2016 at 2:19 PM, Yan Yang <yan@wealthfront.com> wrote:
Jeff,

Thanks for the suggestion. After switching the URL to s3:// an almost identical exception is encountered:
java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3 URL, or by setting the fs.s3.awsAccessKeyId or fs.s3.awsSecretAccessKey properties (respectively).


On Mon, Jan 4, 2016 at 12:46 PM, Jeff Quinn <jeff@nuna.com> wrote:
Ah ok, I would try it with "s3://", and I think it should work as expected, assuming the machine role you are using for EMR has the proper permissions for writing to the bucket.

You should not need to set fs.s3n.awsSecretAccessKey/fs.s3n.awsAccessKeyId or any other properties; the EMR service should be taking care of that for you.
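If you ever do need to set them outside of EMR, my understanding is that they belong on the Hadoop Configuration the pipeline uses rather than on SparkConf; roughly like this (placeholder values, not real keys):

// Non-EMR environments only: put the s3n credentials on the Hadoop Configuration
// and hand that Configuration to the pipeline when you construct it.
Configuration conf = new Configuration();
conf.set("fs.s3n.awsAccessKeyId", "<ACCESS_KEY_ID>");
conf.set("fs.s3n.awsSecretAccessKey", "<SECRET_ACCESS_KEY>");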

On Mon, Jan 4, 2016 at 12:22 PM, Yan Yang <yan@wealthfront.com> wrote:
Hi Jeff,

We are using s3n://bucket/path

Thanks
Yan

On Mon, Jan 4, 2016 at 12:19 PM, Jeff Quinn <jeff@nuna.com> wrote:
Hey Yan,

Just a hunch, but from that stack trace it looks like you might be using the outdated S3 Hadoop filesystem. Is the URL you are trying to write to of the form s3://bucket/path or s3n://bucket/path?

Thanks!

Jeff

On Mon, Jan 4, 2016 at 12:15 PM, Yan Yang <yan@wealthfront.com> wrote:
Hi

I have tried to set up a SparkPipeline to run within AWS EMR.

The code is as below:

// Build the Spark context and wrap it in a Crunch SparkPipeline
SparkConf sparkConf = new SparkConf().setAppName("JavaSparkPi");
JavaSparkContext jsc = new JavaSparkContext(sparkConf);
SparkPipeline pipeline = new SparkPipeline(jsc, "spark-app");

// Read Avro input, apply the transformation, and write Avro output
PCollection<Input> input = pipeline.read(From.avroFile(inputPaths, Input.class));
PCollection<Output> output = process(input);
pipeline.write(output, To.avroFile(outputPath));

The read works, and a simple Spark write such as calling saveAsTextFile() on an RDD also works.

However, a write using pipeline.write() hits the exception below. I have tried setting fs.s3n.awsAccessKeyId and fs.s3n.awsSecretAccessKey in sparkConf, with the same result:

java.lang.IllegalArgumentException: AWS Access Key ID and Secret Access Key must be specified as the username or password (respectively) of a s3n URL, or by setting the fs.s3n.awsAccessKeyId or fs.s3n.awsSecretAccessKey properties (respectively).
	at org.apache.hadoop.fs.s3.S3Credentials.initialize(S3Credentials.java:70)
	at org.apache.hadoop.fs.s3native.Jets3tNativeFileSystemStore.initialize(Jets3tNativeFileSystemStore.java:80)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:187)
	at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
	at org.apache.hadoop.fs.s3native.$Proxy9.initialize(Unknown Source)
	at org.apache.hadoop.fs.s3native.NativeS3FileSystem.initialize(NativeS3FileSystem.java:326)
	at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2644)
	at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:90)
	at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2678)
	at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2660)
	at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:374)
	at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
	at org.apache.avro.mapred.FsInput.<init>(FsInput.java:37)
	at org.apache.crunch.types.avro.AvroRecordReader.initialize(AvroRecordReader.java:54)
	at org.apache.crunch.impl.mr.run.CrunchRecordReader.initialize(CrunchRecordReader.java:150)
	at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:153)
	at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:124)
	at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:65)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:300)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:264)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:73)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
	at org.apache.spark.scheduler.Task.run(Task.scala:88)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:214)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
	at java.lang.Thread.run(Thread.java:745)
Thanks
Yan