From: Ayoub Benali <benali.ayoub.info@gmail.com>
Date: Mon, 1 Aug 2016 11:01:32 +0200
Subject: Re: spark 2.0 readStream from a REST API
To: Michael Armbrust <michael@databricks.com>
Cc: Jacek Laskowski <jacek@japila.pl>, user@spark.apache.org

Hello,

Using the full class name worked, thanks.

The problem now is that when I consume the DataFrame, for example with count, I get the stack trace below. I followed the implementation of TextSocketSourceProvider to implement my data source, and the text socket source is the one used in the official documentation.

Why does count work in the documentation example? Is there some other trait that needs to be implemented?

Thanks,
Ayoub.

    org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:173)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:33)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:31)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:126)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
        at org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$foreachUp$1.apply(TreeNode.scala:125)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:125)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:31)
        at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:59)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:70)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:68)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:76)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:83)
        at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:83)
        at org.apache.spark.sql.Dataset.withCallback(Dataset.scala:2541)
        at org.apache.spark.sql.Dataset.count(Dataset.scala:2216)
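For reference: the count that "works" in the documentation example appears to be the groupBy(...).count() streaming aggregation, which stays lazy until a streaming sink is started, whereas Dataset.count() is a batch action that the UnsupportedOperationChecker above rejects. A minimal sketch of the documented pattern, following the programming guide's socket example (names such as wordCounts are from that guide, not from this thread):

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder
      .appName("StructuredNetworkWordCount")
      .getOrCreate()
    import spark.implicits._

    val lines = spark.readStream
      .format("socket")            // the text socket source mentioned above
      .option("host", "localhost")
      .option("port", 9999)
      .load()

    // A lazy streaming aggregation: nothing is executed at this point.
    val wordCounts = lines.as[String]
      .flatMap(_.split(" "))
      .groupBy("value")
      .count()

    // A streaming Dataset can only be materialized through a streaming
    // sink, which is exactly what the AnalysisException asks for.
    val query = wordCounts.writeStream
      .outputMode("complete")
      .format("console")
      .start()

    query.awaitTermination()

So no additional trait is needed for count to work; the query has to be driven by writeStream.start() rather than by a batch action.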
2016-07-31 21:56 GMT+02:00 Michael Armbrust <michael@databricks.com>:

> You have to add a file in resources too (example). Either that, or give a full class name.
>
> On Sun, Jul 31, 2016 at 9:45 AM, Ayoub Benali <benali.ayoub.info@gmail.com> wrote:
>
>> It looks like the way to go in Spark 2.0 is to implement StreamSourceProvider with DataSourceRegister. But now Spark fails to load the class when doing:
>>
>>     spark.readStream.format("mysource").load()
>>
>> I get:
>>
>>     java.lang.ClassNotFoundException: Failed to find data source: mysource. Please find packages at http://spark-packages.org
>>
>> Is there something I need to do in order to "load" the stream source provider?
>>
>> Thanks,
>> Ayoub
>>
>> 2016-07-31 17:19 GMT+02:00 Jacek Laskowski <jacek@japila.pl>:
>>
>>> On Sun, Jul 31, 2016 at 12:53 PM, Ayoub Benali <benali.ayoub.info@gmail.com> wrote:
>>>
>>> > I started playing with the Structured Streaming API in Spark 2.0 and I am
>>> > looking for a way to create a streaming Dataset/DataFrame from a REST HTTP
>>> > endpoint, but I am a bit stuck.
>>>
>>> What a great idea! Why did I myself not think about this?!?!
>>>
>>> > What would be the easiest way to hack around it? Do I need to implement the
>>> > DataSource API?
>>>
>>> Yes, and perhaps the Hadoop API too, but I'm not sure which one exactly since I
>>> haven't even thought about it (not even once).
>>>
>>> > Are there examples on how to create a DataSource from a REST endpoint?
>>>
>>> Never heard of one.
>>>
>>> I'm hosting a Spark/Scala meetup this week, so I'll definitely propose
>>> it as a topic. Thanks a lot!
>>>
>>> Pozdrawiam,
>>> Jacek Laskowski
>>> ----
>>> https://medium.com/@jaceklaskowski/
>>> Mastering Apache Spark 2.0 http://bit.ly/mastering-apache-spark
>>> Follow me at https://twitter.com/jaceklaskowski
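Putting the thread together: a minimal skeleton of the StreamSourceProvider-with-DataSourceRegister approach Ayoub describes, against the Spark 2.0 interfaces and modeled loosely on TextSocketSourceProvider. This is a sketch only; the names (com.example.rest, RestSourceProvider, RestSource, the "url" option, the "rest-source" short name) are made up for illustration, and the actual HTTP polling is elided:

    package com.example.rest

    import scala.collection.mutable.ArrayBuffer

    import org.apache.spark.sql.{DataFrame, SQLContext}
    import org.apache.spark.sql.execution.streaming.{LongOffset, Offset, Source}
    import org.apache.spark.sql.sources.{DataSourceRegister, StreamSourceProvider}
    import org.apache.spark.sql.types.{StringType, StructField, StructType}

    class RestSourceProvider extends StreamSourceProvider with DataSourceRegister {

      private val defaultSchema = StructType(StructField("value", StringType) :: Nil)

      // Short name for spark.readStream.format("rest-source"); it is only
      // discovered if the class is listed in META-INF/services (see below).
      override def shortName(): String = "rest-source"

      override def sourceSchema(
          sqlContext: SQLContext,
          schema: Option[StructType],
          providerName: String,
          parameters: Map[String, String]): (String, StructType) =
        (shortName(), schema.getOrElse(defaultSchema))

      override def createSource(
          sqlContext: SQLContext,
          metadataPath: String,
          schema: Option[StructType],
          providerName: String,
          parameters: Map[String, String]): Source =
        new RestSource(sqlContext, parameters("url"), schema.getOrElse(defaultSchema))
    }

    class RestSource(sqlContext: SQLContext, url: String, override val schema: StructType)
        extends Source {

      // A background thread polling `url` would append responses here and
      // advance the offset; that plumbing is elided from the sketch.
      private val buffer = ArrayBuffer.empty[String]
      @volatile private var currentOffset = LongOffset(-1L)

      override def getOffset: Option[Offset] =
        if (currentOffset.offset < 0) None else Some(currentOffset)

      // Return the rows received between the two offsets as a DataFrame.
      override def getBatch(start: Option[Offset], end: Offset): DataFrame = {
        val from = start.collect { case LongOffset(o) => o.toInt + 1 }.getOrElse(0)
        val to = end.asInstanceOf[LongOffset].offset.toInt + 1
        val slice = synchronized { buffer.slice(from, to) }
        import sqlContext.implicits._
        slice.toDF("value")
      }

      override def stop(): Unit = () // elided: shut down the polling thread
    }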
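Michael's "file in resources" refers to the Java ServiceLoader registration that DataSourceRegister relies on. Assuming the hypothetical provider sketched above, the service file would live at src/main/resources/META-INF/services/org.apache.spark.sql.sources.DataSourceRegister and contain the fully qualified class name:

    com.example.rest.RestSourceProvider

With that file on the classpath the short name resolves; without it only the full class name works, which matches what Ayoub observed. A usage sketch (the endpoint URL is hypothetical):

    // Short name, resolved through the META-INF/services entry.
    val stream = spark.readStream
      .format("rest-source")
      .option("url", "http://localhost:8080/events")
      .load()

    // Fallback that works without the service file: the full class name.
    spark.readStream.format("com.example.rest.RestSourceProvider").load()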