flink-user mailing list archives

From iñaki williams <juanramall...@gmail.com>
Subject Re: Result comparison from 2 DataStream Sources
Date Sun, 12 Jun 2016 13:06:56 GMT
Hi Konstantin, and don't worry about the late response.

Your gist was perfect. A few days after my email I modified some things and
almost got it working. Also, as some other people recommended, I have added a
WindowedStream because I am not looking for a global maximum; the value could
increase or decrease.
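
To make the difference from a global maximum concrete, here is a minimal
sketch in plain Java (no Flink dependency). It keeps only the latest odds per
source for one match and takes the maximum over those, so the result can go
down again when a site lowers its price. The class BestOdds, the method update
and the source labels are hypothetical names for illustration. In a Flink job
this state would presumably live in keyed state or a window, not in a HashMap.
The numbers are the ones from the example quoted further down in this thread.

```java
import java.util.HashMap;
import java.util.Map;

class BestOdds {
    // Latest {local, away} odds per source for ONE match key.
    // Assumption: a HashMap stands in for Flink keyed state here.
    private final Map<String, double[]> latestBySource = new HashMap<>();

    // Store the newest odds from one source, then return the best current
    // {local, away} odds across all sources seen so far.
    double[] update(String source, double local, double away) {
        latestBySource.put(source, new double[] {local, away});
        double bestLocal = 0.0;
        double bestAway = 0.0;
        for (double[] odds : latestBySource.values()) {
            bestLocal = Math.max(bestLocal, odds[0]);
            bestAway = Math.max(bestAway, odds[1]);
        }
        return new double[] {bestLocal, bestAway};
    }

    public static void main(String[] args) {
        BestOdds match = new BestOdds();
        match.update("site1", 1.90, 2.10);
        double[] best = match.update("site2", 2.20, 1.80);
        System.out.println(best[0] + " / " + best[1]);

        // site2 lowers its local price: the best local odds fall back to site1.
        best = match.update("site2", 1.70, 1.80);
        System.out.println(best[0] + " / " + best[1]);
    }
}
```

Because an older value from a source is overwritten instead of being kept as a
running maximum, the output follows the prices both up and down.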

I am currently working on the KeySelector, but I don't think it is going to
work. I have this:

// anotherDatastream is the union of two DataStreams whose matchName values
// may be different or only similar. I want to compare them and get the key.

DataStream<Informacion> example = anotherDatastream.keyBy(
        new KeySelector<Informacion, String>() {
            @Override
            public String getKey(Informacion info) {
                // The algorithm that emits a score based on similarity
                SimilarityStrategy strategy3 = new DiceCoefficientStrategy();
                StringSimilarityService service =
                        new StringSimilarityServiceImpl(strategy3);
                // Problem: info2 is the second element I would need, but
                // getKey only receives one element, so it does not exist here.
                double score = service.score(info.matchName, info2.matchName); // Score is 0.90...
                if (score < 0.75) {
                    return info.nombrePartido;
                } else {
                    return null;
                }
            }
        });

As you can see, I need two strings to compare with each other, and I think it
is not possible to do that with the KeySelector.
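
One possible workaround, as a sketch only: since getKey receives a single
element, both sources could be reduced to a canonical key before keyBy,
instead of comparing two names inside the selector. The helper below is
hypothetical. It assumes that each match name has the form
"Player A - Player B" and that the last token of a player name is the surname,
which will not hold for every name.

```java
import java.util.Arrays;
import java.util.Locale;
import java.util.stream.Collectors;

class MatchKeys {
    // Hypothetical helper: map "Rafa Nadal - Roger Federer" and
    // "R. Nadal - R. Federer" to the same key by keeping only the
    // lower-cased surnames, sorted so that player order does not matter.
    static String canonicalKey(String matchName) {
        return Arrays.stream(matchName.split("\\s+-\\s+"))
                .map(MatchKeys::surname)
                .sorted()
                .collect(Collectors.joining("|"));
    }

    // Assumption: the last whitespace-separated token is the surname.
    static String surname(String player) {
        String[] tokens = player.trim().toLowerCase(Locale.ROOT).split("\\s+");
        return tokens[tokens.length - 1];
    }

    public static void main(String[] args) {
        System.out.println(canonicalKey("Rafa Nadal - Roger Federer"));
        System.out.println(canonicalKey("R. Nadal - R. Federer"));
    }
}
```

The result of canonicalKey(info.matchName) could then be returned from getKey,
because it depends on one element only. A pairwise similarity score such as
the Dice coefficient would still need a separate step that sees both names.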


Thanks for everything, you have helped me a lot.



On Friday, 10 June 2016, Konstantin Knauf <konstantin.knauf@tngtech.com> wrote:

> Hi again,
>
> and again sorry for the late response.
>
> Regarding your first question: You can use a Key Selector Function [1].
>
> Regarding your second question: If I understand your requirement
> correctly, this is already happening in my gist.
>
> By taking the union of both streams the local and away max are taken
> over both streams. The coFlatMap holds the currentAwayMax and
> currentAwayLocal and evaluates your condition if either of the maximum
> values changes.
>
> Does this help?
>
> Cheers,
>
> Konstantin
>
>
> [1] https://ci.apache.org/projects/flink/flink-docs-release-1.0/apis/common/index.html#specifying-keys
>
> On 02.06.2016 23:20, iñaki williams wrote:
> > Hi again! Thanks for your tips and gists, they are really helpful.
> > However, I probably didn't express my idea properly and it has been a
> > little misunderstood. I have been thinking about how to do this over the
> > past few days, and I will try to give a concrete example of what I want
> > and ask whether my approach is the correct one. I have made a new diagram
> > with "real" examples.
> >
> >
> > First question: as you can see, "Match name" is quite similar but not
> > always the same. The name from DataStream 1 could be "Rafa Nadal" while in
> > the 2nd DataStream the name of the match could be "R. Nadal". Is there any
> > way to rewrite the .keyBy() method so that it uses a library that compares
> > Strings and matches them by similarity rather than by exact name?
> >
> > Second question: assuming I can key those tennis matches, suppose the
> > CoFlatMap receives these two matches, for example:
> >
> > DataStream1 <"Rafa Nadal - Roger Federer", 1.90, 2.10>
> > DataStream2 <"Rafa Nadal - Roger Federer", 2.20, 1.80>
> >
> > I would like to take the largest value of each field, in this case 2.20
> > and 2.10, so the final result would be:
> > <"Rafa Nadal - Roger Federer", 2.20, 2.10>.
> >
> > I don't know if I am mistaken, but I think I could use the ValueState to
> > save those values and compare them?
> >
> > Thanks for your time! :)
> > Very Grateful.
> >
> >
> > 2016-05-29 17:32 GMT+02:00 Konstantin Knauf <konstantin.knauf@tngtech.com>:
> >
> >     Hi again,
> >
> >     from your diagram I have put together a gist, which I think does the
> >     job. I haven't had the time to test it though :(
> >
> >     https://gist.github.com/knaufk/d1312503b99ee51554a70c9a22abe7e5
> >
> >     If you have any questions, let me know. It sometimes just takes a while
> >     until I answer ;)
> >
> >     Cheers,
> >
> >     Konstantin
> >
> >     On 28.05.2016 13:49, iñaki williams wrote:
> >     > Hello again! :)
> >     >
> >     > I have been checking the solution that you proposed, but I don't
> >     > really get how the KeyValueState helps with it. Could you please
> >     > explain it a little more?
> >     >
> >     > I have drawn a diagram to make clear what I want. Note that the middle
> >     > table doesn't need to be a table; it is just what I want, and I don't
> >     > have enough knowledge of Flink to know how to do it.
> >     >
> >     >
> >     > Thanks for your time!
> >     >
> >     >
> >     >
> >     > 2016-05-26 20:33 GMT+02:00 Konstantin Knauf <konstantin.knauf@tngtech.com>:
> >     >
> >     >     Hi,
> >     >
> >     >     interesting use case, you are looking for sure bets, I guess ;)
> >     >
> >     >     Well, I think what you want to do then is probably to use
> >     >     ConnectedStreams, which you keyBy the "name" of both streams.
> >     >
> >     >     Then you can use CoFlatMap for comparison. You can use a
> >     >     KeyValueState to save prices. In each map you can then check if you
> >     >     have a price for this name already saved from the other stream and,
> >     >     if not, save the price. The challenge will be to clean up state.
> >     >
> >     >     Let me know, if this works out.
> >     >
> >     >     Cheers,
> >     >
> >     >     Konstantin
> >     >
> >     >     On 26.05.2016 20:01, iñaki williams wrote:
> >     >     > Hi!
> >     >     >
> >     >     > I will explain it in more detail:
> >     >     >
> >     >     > I am comparing real-time sports odds from two different betting
> >     >     > webpages.
> >     >     >
> >     >     > Assuming that I get just one Java object for each DataStream (in
> >     >     > reality I should get a List of in-play matches), and assuming that
> >     >     > the name is the same of course, what I want to do is compare both
> >     >     > price attributes in "real time". I am only interested in the
> >     >     > current price, not the previous one. Example:
> >     >     >
> >     >     > What is the price for Event 1 from website "X" RIGHT NOW?
> >     >     >
> >     >     > JavaObjectX.price
> >     >     >
> >     >     > What is the price for Event 1 from website "Y" RIGHT NOW?
> >     >     >
> >     >     > JavaObjectY.price
> >     >     >
> >     >     >
> >     >     > Compare both attributes
> >     >     > Get a result depending on that comparison
> >     >     >
> >     >     > My Java object doesn't have a timestamp, but I think I should
> >     >     > use one, right?
> >     >     >
> >     >     > Thanks!
> >     >     >
> >     >     >
> >     >     >
> >     >     >
> >     >     >
> >     >     >
> >     >     >
> >     >     > 2016-05-26 19:48 GMT+02:00 Konstantin Knauf <konstantin.knauf@tngtech.com>:
> >     >     >
> >     >     >     Hi,
> >     >     >
> >     >     >     let me first check if I understand your requirements correctly. I
> >     >     >     assume you want to compare the attribute price for objects with
> >     >     >     the same name only, right?
> >     >     >
> >     >     >     Further, I assume the objects are some kind of offer/bid with a
> >     >     >     timestamp?
> >     >     >
> >     >     >     I think the solution heavily depends on how the records that
> >     >     >     should be compared relate in time. So basically, if an object
> >     >     >     arrives from one source, which time window of objects from the
> >     >     >     other stream should be considered for comparison?
> >     >     >
> >     >     >     Cheers,
> >     >     >
> >     >     >     Konstantin
> >     >     >
> >     >     >     On 26.05.2016 18:55, iñaki williams wrote:
> >     >     >     > Hi!
> >     >     >     >
> >     >     >     > I am working on something quite similar to the stockPrice
> >     >     >     > example that is posted on the webpage
> >     >     >     > (https://flink.apache.org/news/2015/02/09/streaming-example.html)
> >     >     >     >
> >     >     >     > I am extracting some data from 2 different webpages and I
> >     >     >     > represent the result using a Java object. The diagram could be
> >     >     >     > something like this:
> >     >     >     >
> >     >     >     > DataStream1--------JavaObject(name, price) --\
> >     >     >     >                                               \
> >     >     >     >                                                [how to compare result?]
> >     >     >     >                                               /
> >     >     >     > DataStream2--------JavaObject(name, price) --/
> >     >     >     >
> >     >     >     >
> >     >     >     > What I want to do is to get the attribute price from both data
> >     >     >     > objects and compare them with each other / do some math
> >     >     >     > operations. For example, if the first JavaObject.price is bigger
> >     >     >     > than the second JavaObject.price, then show a message.
> >     >     >     >
> >     >     >     >
> >     >     >     > What is the (best) way of doing this? I am new to Flink and I am
> >     >     >     > quite lost :)
> >     >     >     >
> >     >     >     >
> >     >     >     > Thanks!
> >     >     >
> >     >     >     --
> >     >     >     Konstantin Knauf * konstantin.knauf@tngtech.com * +49-174-3413182
> >     >     >     TNG Technology Consulting GmbH, Betastr. 13a, 85774 Unterföhring
> >     >     >     Managing Directors: Henrik Klagges, Christoph Stock, Dr. Robert Dahlke
> >     >     >     Registered office: Unterföhring * Amtsgericht München * HRB 135082
> >     >     >
> >     >     >
> >     >
> >     >
> >     >
> >
> >
> >
>
>
