spark-user mailing list archives

From Ian Wilkinson <ia...@me.com>
Subject Re: DynamoDB input source
Date Mon, 21 Jul 2014 13:28:24 GMT
Hi,

I am invoking the spark-shell (Spark 1.0.0) with:

spark-shell --jars \
libs/aws-java-sdk-1.3.26.jar,\
libs/httpclient-4.1.1.jar,\
libs/httpcore-nio-4.1.jar,\
libs/gson-2.1.jar,\
libs/httpclient-cache-4.1.1.jar,\
libs/httpmime-4.1.1.jar,\
libs/hive-dynamodb-handler-0.11.0.jar,\
libs/httpcore-4.1.jar,\
libs/joda-time-2.1.jar

and entering the following in the shell:


import org.apache.hadoop.io.Text
import org.apache.hadoop.dynamodb.DynamoDBItemWritable
import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat
import org.apache.hadoop.mapred.JobConf

var jobConf = new JobConf(sc.hadoopConfiguration)
jobConf.set("dynamodb.servicename", "dynamodb")
jobConf.set("dynamodb.input.tableName", "<...>")
jobConf.set("dynamodb.endpoint", "dynamodb.eu-west-1.amazonaws.com")
jobConf.set("dynamodb.regionid", "eu-west-1")
jobConf.set("dynamodb.throughput.read", "1")
jobConf.set("dynamodb.throughput.read.percent", "1")

jobConf.set("dynamodb.awsAccessKeyId", "<...>")
jobConf.set("dynamodb.awsSecretAccessKey", "<...>")

jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat")

var users = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
users.count()


This raises an NPE in FileSplit (stack trace below). Any suggestions on
what I might pursue to correct this would be very welcome.

ian


14/07/20 23:56:03 INFO deprecation: session.id is deprecated. Instead, use dfs.metrics.session-id
14/07/20 23:56:03 INFO JvmMetrics: Initializing JVM Metrics with processName=JobTracker, sessionId=
14/07/20 23:56:03 INFO AbstractDynamoDBInputFormat: Throughput percent: 1.0
14/07/20 23:56:03 INFO EndpointProvider: Using endpoint for DynamoDB: dynamodb.eu-west-1.amazonaws.com
14/07/20 23:56:03 INFO DynamoDBClient: Describe Table Output: {Table: {TableName: <...>,
KeySchema: {HashKeyElement: {AttributeName: id, AttributeType: S, }, }, TableStatus: ACTIVE,
CreationDateTime: Wed May 07 14:38:30 BST 2014, ProvisionedThroughput: {ReadCapacityUnits:
4, WriteCapacityUnits: 4, }, TableSizeBytes: 2473, ItemCount: 14, }, }
14/07/20 23:56:03 INFO SparkContext: Starting job: count at <console>:21
14/07/20 23:56:03 INFO DAGScheduler: Got job 0 (count at <console>:21) with 1 output
partitions (allowLocal=false)
14/07/20 23:56:03 INFO DAGScheduler: Final stage: Stage 0(count at <console>:21)
14/07/20 23:56:03 INFO DAGScheduler: Parents of final stage: List()
14/07/20 23:56:03 INFO DAGScheduler: Missing parents: List()
14/07/20 23:56:03 INFO DAGScheduler: Submitting Stage 0 (HadoopRDD[0] at hadoopRDD at <console>:18),
which has no missing parents
14/07/20 23:56:03 ERROR DAGSchedulerActorSupervisor: eventProcesserActor failed due to the
error null; shutting down SparkContext
14/07/20 23:56:04 INFO SparkUI: Stopped Spark web UI at http://10.0.1.7:4040
14/07/20 23:56:04 INFO DAGScheduler: Stopping DAGScheduler
14/07/20 23:56:05 INFO MapOutputTrackerMasterActor: MapOutputTrackerActor stopped!
14/07/20 23:56:05 INFO ConnectionManager: Selector thread was interrupted!
14/07/20 23:56:05 INFO ConnectionManager: ConnectionManager stopped
14/07/20 23:56:05 INFO MemoryStore: MemoryStore cleared
14/07/20 23:56:05 INFO BlockManager: BlockManager stopped
14/07/20 23:56:05 INFO BlockManagerMasterActor: Stopping BlockManagerMaster
14/07/20 23:56:05 INFO BlockManagerMaster: BlockManagerMaster stopped
14/07/20 23:56:05 INFO SparkContext: Successfully stopped SparkContext
14/07/20 23:56:05 ERROR OneForOneStrategy:
java.lang.NullPointerException
	at org.apache.hadoop.mapreduce.lib.input.FileSplit.write(FileSplit.java:80)
	at org.apache.hadoop.mapred.FileSplit.write(FileSplit.java:85)
	at org.apache.hadoop.dynamodb.split.DynamoDBSplit.write(DynamoDBSplit.java:63)
	at org.apache.hadoop.io.ObjectWritable.writeObject(ObjectWritable.java:202)
	at org.apache.hadoop.io.ObjectWritable.writeObject(ObjectWritable.java:128)
	at org.apache.hadoop.io.ObjectWritable.write(ObjectWritable.java:82)
	at org.apache.spark.SerializableWritable.writeObject(SerializableWritable.scala:35)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:606)
	at java.io.ObjectStreamClass.invokeWriteObject(ObjectStreamClass.java:988)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1495)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1547)
	at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1508)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1431)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
	at org.apache.spark.scheduler.ResultTask.writeExternal(ResultTask.scala:132)
	at java.io.ObjectOutputStream.writeExternalData(ObjectOutputStream.java:1458)
	at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1429)
	at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1177)
	at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:347)
	at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
	at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitMissingTasks(DAGScheduler.scala:767)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$submitStage(DAGScheduler.scala:713)
	at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:697)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1176)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
14/07/20 23:56:05 INFO RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
14/07/20 23:56:05 INFO RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down;
proceeding with flushing remote transports.
14/07/20 23:56:05 INFO Remoting: Remoting shut down
14/07/20 23:56:05 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.


On 4 Jul 2014, at 18:04, Nick Pentreath <nick.pentreath@gmail.com> wrote:

> Interesting - I would have thought they would make that available publicly. Unfortunately,
> unless you can use Spark on EMR, I guess your options are to hack it by spinning up an EMR
> cluster and getting the JAR, or maybe fall back to using boto and rolling your own :(
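
For illustration, a rough and untested sketch of that boto fallback (using boto2's dynamodb2 API, current at the time of writing; the table name, region, and app name are placeholders):

# Naive "roll your own" approach: scan the table client-side with boto,
# then parallelize the items into an RDD. A plain Scan does none of the
# splitting or read-throughput throttling the InputFormat provides, so
# this only really suits small tables.
import boto.dynamodb2
from boto.dynamodb2.table import Table
from pyspark import SparkContext

sc = SparkContext(appName="dynamodb-boto-sketch")

conn = boto.dynamodb2.connect_to_region("eu-west-1")
table = Table("<...>", connection=conn)

# Each scan result is a boto Item; turn it into a plain dict before
# handing it to Spark.
items = [dict(item.items()) for item in table.scan()]
rdd = sc.parallelize(items)
print(rdd.count())

Distributing the scan itself across workers, rather than doing a single client-side pull, is the part the InputFormat would otherwise handle.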
> 
> 
> 
> On Fri, Jul 4, 2014 at 9:28 AM, Ian Wilkinson <ianw1@me.com> wrote:
> Trying to discover the source for DynamoDBInputFormat.
> Not appearing in:
> 
> - https://github.com/aws/aws-sdk-java
> - https://github.com/apache/hive
> 
> Then I came across http://stackoverflow.com/questions/17077774/jar-containing-org-apache-hadoop-hive-dynamodb.
> Unsure whether this represents the latest situation…
> 
> ian 
> 
> 
> On 4 Jul 2014, at 16:58, Nick Pentreath <nick.pentreath@gmail.com> wrote:
> 
>> I should qualify by saying there is boto support for DynamoDB - but not for the InputFormat.
>> You could roll your own Python-based connection, but this involves figuring out how to split
>> the data in Dynamo - the InputFormat takes care of this, so it should be the easier approach.
>> —
>> Sent from Mailbox
>> 
>> 
>> On Fri, Jul 4, 2014 at 8:51 AM, Ian Wilkinson <ianw1@me.com> wrote:
>> 
>> Excellent. Let me get browsing on this.
>> 
>> 
>> Huge thanks,
>> ian
>> 
>> 
>> On 4 Jul 2014, at 16:47, Nick Pentreath <nick.pentreath@gmail.com> wrote:
>> 
>>> No boto support for that. 
>>> 
>>> In master there is Python support for loading Hadoop InputFormats. Not sure if
>>> it will be in 1.0.1 or 1.1.
>>> 
>>> In the master docs under the programming guide there are instructions, and under the
>>> examples project there are pyspark examples of using Cassandra and HBase. These should
>>> hopefully give you enough to get started.
>>> 
>>> Depending on how easy it is to use the DynamoDB format, you may have to write
>>> a custom converter (see the mentioned examples for more details).
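
A sketch of what that might look like once the Python InputFormat support lands, assuming the sc.hadoopRDD API that was going into master at the time; the property names mirror the Scala snippet at the top of this thread, and the converter class is hypothetical (it would have to be written in Scala/Java, as in the Cassandra/HBase examples):

from pyspark import SparkContext

sc = SparkContext(appName="dynamodb-pyspark-sketch")

# Same connector properties as the Scala example earlier in the thread.
conf = {
    "dynamodb.servicename": "dynamodb",
    "dynamodb.input.tableName": "<...>",
    "dynamodb.endpoint": "dynamodb.eu-west-1.amazonaws.com",
    "dynamodb.regionid": "eu-west-1",
    "dynamodb.throughput.read.percent": "1",
}

# DynamoDBItemWritable is not a standard Writable, so a custom value
# converter (the ItemToDictConverter name here is a placeholder) would be
# needed to turn each item into a Python dict.
rows = sc.hadoopRDD(
    "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat",
    "org.apache.hadoop.io.Text",
    "org.apache.hadoop.dynamodb.DynamoDBItemWritable",
    valueConverter="org.example.ItemToDictConverter",
    conf=conf,
)
print(rows.count())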
>>> 
>>> Sent from my iPhone
>>> 
>>> On 4 Jul 2014, at 08:38, Ian Wilkinson <ianw1@me.com> wrote:
>>> 
>>>> Hi Nick,
>>>> 
>>>> I’m going to be working with python primarily. Are you aware of
>>>> comparable boto support?
>>>> 
>>>> ian
>>>> 
>>>> On 4 Jul 2014, at 16:32, Nick Pentreath <nick.pentreath@gmail.com> wrote:
>>>> 
>>>>> You should be able to use DynamoDBInputFormat (I think this should be
>>>>> part of AWS libraries for Java) and create a HadoopRDD from that.
>>>>> 
>>>>> 
>>>>> On Fri, Jul 4, 2014 at 8:28 AM, Ian Wilkinson <ianw1@me.com> wrote:
>>>>> Hi,
>>>>> 
>>>>> I noticed mention of DynamoDB as an input source in
>>>>> http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf.
>>>>> 
>>>>> Unfortunately, Google is not coming to my rescue on finding
>>>>> further mention of this support.
>>>>> 
>>>>> Any pointers would be well received.
>>>>> 
>>>>> Big thanks,
>>>>> ian
>>>>> 
>>>> 
>> 
>> 
> 
> 

