nifi-users mailing list archives

From Mark Payne <marka...@hotmail.com>
Subject Re: Connecting Spark to Nifi 0.4.0
Date Mon, 22 Feb 2016 19:24:55 GMT
Kyle,

Another thought here that I don't see mentioned yet: how NiFi's site-to-site works. Because
the Spark Receiver does not know whether the NiFi you're talking to is a standalone instance
or a cluster, it will first ask for a list of all NiFi nodes. Assuming that you've got a
standalone instance running, it responds with whatever it believes the hostname is.

It's possible that what you're seeing here is that NiFi responds and says "here's the hostname
you can connect to," which is not localhost but rather whatever is configured in your OS.
You can explicitly override this and tell NiFi which hostname to use by setting the
"nifi.remote.input.socket.host" property in conf/nifi.properties.

So I would try setting that explicitly to "localhost" or to whatever hostname you want to
communicate over. If you change that property, though, you'll have to restart NiFi in order
for the change to take effect.
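With the port used later in this thread, the override would look something like the lines below in conf/nifi.properties (the hostname value is illustrative; use whichever name your Spark job actually connects to):

```properties
# conf/nifi.properties -- Site to Site
# Hostname that NiFi reports to site-to-site peers (illustrative value)
nifi.remote.input.socket.host=localhost
nifi.remote.input.socket.port=9099
nifi.remote.input.secure=false
```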

Thanks
-Mark


> On Feb 22, 2016, at 1:20 PM, Bryan Bende <bbende@gmail.com> wrote:
> 
> Hi Kyle,
> 
> It seems like the stack trace is suggesting that Spark is trying to download dependencies, judging from the frame that references Executor.updateDependencies:
> https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/executor/Executor.scala#L391
> 
> Any chance you are behind some kind of firewall preventing this?
> 
> I'm not that familiar with Spark streaming, but I also noticed in one of the tutorials that it did something like this:
> 
> spark.driver.extraClassPath /opt/spark-receiver/nifi-spark-receiver-0.4.1.jar:/opt/spark-receiver/nifi-site-to-site-client-0.4.1.jar:/opt/nifi-1.1.1.0-12/lib/nifi-api-1.1.1.0-12.jar:/opt/nifi-1.1.1.0-12/lib/bootstrap/nifi-utils-1.1.1.0-12.jar:/opt/nifi-1.1.1.0-12/work/nar/framework/nifi-framework-nar-1.1.1.0-12.nar-unpacked/META-INF/bundled-dependencies/nifi-client-dto-1.1.1.0-12.jar
> 
> Which I would think means it wouldn't have to go out and download the NiFi dependencies if they are being provided on the class path, but again not really sure.
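The same idea can be expressed in spark-defaults.conf; a sketch following the tutorial's layout above (the jar paths and versions are illustrative and must match your actual NiFi install):

```
# spark-defaults.conf -- put the NiFi site-to-site client jars on the classpath
# up front so the executors never need to fetch them over HTTP
spark.driver.extraClassPath   /opt/spark-receiver/nifi-spark-receiver-0.4.1.jar:/opt/spark-receiver/nifi-site-to-site-client-0.4.1.jar
spark.executor.extraClassPath /opt/spark-receiver/nifi-spark-receiver-0.4.1.jar:/opt/spark-receiver/nifi-site-to-site-client-0.4.1.jar
```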
> 
> -Bryan
> 
> 
> On Mon, Feb 22, 2016 at 1:09 PM, Kyle Burke <kyle.burke@ignitionone.com> wrote:
> Joe,
>    I’m not sure what to do with Bryan’s comment. The Spark code I’m running has
> no problem reading from a Kafka receiver. I only get the error when trying to read from a
> Nifi receiver. When I create a Nifi flow that reads from the same Kafka stream and sends the
> data to our output port, I get the issue.
> 
> Respectfully,
> 
> 
> Kyle Burke | Data Science Engineer
> IgnitionOne - Marketing Technology. Simplified.
> Office: 1545 Peachtree St NE, Suite 500 | Atlanta, GA | 30309
> Direct: 404.961.3918
> 
> On 2/22/16, 1:00 PM, "Joe Witt" <joe.witt@gmail.com> wrote:
> 
> >Kyle,
> >
> >Did you get a chance to look into what Bryan mentioned?  He made a
> >great point in that the stacktrace doesn't seem to have any
> >relationship to NiFi or NiFi's site-to-site code.
> >
> >Thanks
> >Joe
> >
> >On Mon, Feb 22, 2016 at 12:58 PM, Kyle Burke <kyle.burke@ignitionone.com> wrote:
> >> Telnet leads me to believe the port is open. (I upgraded to 0.5.0 today in
> >> hopes that it would help, but no luck.)
> >>
> >> From Telnet:
> >>
> >> 12:50:11 [~/Dev/nifi/nifi-0.5.0] $ telnet localhost 8080
> >>
> >> Trying ::1...
> >>
> >> Connected to localhost.
> >>
> >> Escape character is '^]'.
> >>
> >>
> >> Respectfully,
> >>
> >> Kyle Burke | Data Science Engineer
> >> IgnitionOne - Marketing Technology. Simplified.
> >> Office: 1545 Peachtree St NE, Suite 500 | Atlanta, GA | 30309
> >> Direct: 404.961.3918
> >>
> >>
> >> From: Joe Witt
> >> Reply-To: "users@nifi.apache.org"
> >> Date: Saturday, February 20, 2016 at 5:16 PM
> >> To: "users@nifi.apache.org"
> >> Subject: Re: Connecting Spark to Nifi 0.4.0
> >>
> >> Kyle
> >>
> >> Can you try connecting to that nifi port using telnet and see if you are
> >> able?
> >>
> >> Use the same host and port as you are in your spark job.
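A scripted equivalent of that telnet check, as a sketch using only the Python standard library (the `can_connect` helper is hypothetical; the host and port values are the ones from this thread):

```python
import socket

def can_connect(host, port, timeout=5.0):
    """Return True if a TCP connection to host:port succeeds within `timeout` seconds."""
    try:
        # create_connection resolves the host and attempts the TCP handshake
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # covers refused connections, timeouts, and unresolvable hosts
        return False

# Same host and port as in the Spark job / nifi.properties
print(can_connect("localhost", 9099))
```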
> >>
> >> Thanks
> >> Joe
> >>
> >> On Feb 20, 2016 4:55 PM, "Kyle Burke" <kyle.burke@ignitionone.com> wrote:
> >>>
> >>> All,
> >>>    I’m attempting to connect Spark to Nifi but I’m getting a “connect
> >>> timed out” error when Spark tries to pull records from the input port. I
> >>> don’t understand why I’m getting the issue, because Nifi and Spark are both
> >>> running on my local laptop. Any suggestions about how to get around the
> >>> issue?
> >>>
> >>> It appears that nifi is listening on the port because I see the following
> >>> when running the lsof command:
> >>>
> >>> java    31455 kyle.burke 1054u  IPv4 0x1024ddd67a640091      0t0  TCP
> >>> *:9099 (LISTEN)
> >>>
> >>>
> >>> I’ve been following the instructions given in these two articles:
> >>> https://blogs.apache.org/nifi/entry/stream_processing_nifi_and_spark
> >>> https://community.hortonworks.com/articles/12708/nifi-feeding-data-to-spark-streaming.html
> >>>
> >>> Here is how I have my nifi.properties settings:
> >>>
> >>> # Site to Site properties
> >>> nifi.remote.input.socket.host=
> >>> nifi.remote.input.socket.port=9099
> >>> nifi.remote.input.secure=false
> >>>
> >>>
> >>> Below is the full error stack:
> >>>
> >>> 16/02/20 16:34:45 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)
> >>> java.net.SocketTimeoutException: connect timed out
> >>> at java.net.PlainSocketImpl.socketConnect(Native Method)
> >>> at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:350)
> >>> at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:206)
> >>> at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:188)
> >>> at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392)
> >>> at java.net.Socket.connect(Socket.java:589)
> >>> at sun.net.NetworkClient.doConnect(NetworkClient.java:175)
> >>> at sun.net.www.http.HttpClient.openServer(HttpClient.java:432)
> >>> at sun.net.www.http.HttpClient.openServer(HttpClient.java:527)
> >>> at sun.net.www.http.HttpClient.<init>(HttpClient.java:211)
> >>> at sun.net.www.http.HttpClient.New(HttpClient.java:308)
> >>> at sun.net.www.http.HttpClient.New(HttpClient.java:326)
> >>> at sun.net.www.protocol.http.HttpURLConnection.getNewHttpClient(HttpURLConnection.java:1168)
> >>> at sun.net.www.protocol.http.HttpURLConnection.plainConnect0(HttpURLConnection.java:1104)
> >>> at sun.net.www.protocol.http.HttpURLConnection.plainConnect(HttpURLConnection.java:998)
> >>> at sun.net.www.protocol.http.HttpURLConnection.connect(HttpURLConnection.java:932)
> >>> at org.apache.spark.util.Utils$.doFetchFile(Utils.scala:555)
> >>> at org.apache.spark.util.Utils$.fetchFile(Utils.scala:369)
> >>> at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:405)
> >>> at org.apache.spark.executor.Executor$$anonfun$org$apache$spark$executor$Executor$$updateDependencies$5.apply(Executor.scala:397)
> >>> at scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:772)
> >>> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> >>> at scala.collection.mutable.HashMap$$anonfun$foreach$1.apply(HashMap.scala:98)
> >>> at scala.collection.mutable.HashTable$class.foreachEntry(HashTable.scala:226)
> >>> at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:39)
> >>> at scala.collection.mutable.HashMap.foreach(HashMap.scala:98)
> >>> at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:771)
> >>> at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$updateDependencies(Executor.scala:397)
> >>> at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
> >>> at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
> >>> at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
> >>> at java.lang.Thread.run(Thread.java:745)
> >>>
> >>>
> >>> Respectfully,
> >>>
> >>> Kyle Burke | Data Science Engineer
> >>> IgnitionOne - Marketing Technology. Simplified.
> 

