flink-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Robert Metzger <rmetz...@apache.org>
Subject Re: java.lang.IllegalStateException: This stub is not part of an iteration step function.
Date Thu, 08 Jan 2015 15:16:07 GMT
Hi Max,

No. I think there is nobody in the Flink community who has plans to
implement nested iterations in the near future.

On Wed, Jan 7, 2015 at 10:58 AM, Maximilian Alber <
alber.maximilian@gmail.com> wrote:

> Thanks!
>
> I made a workaround using a pseudo join with the workset. But now I'm back
> to the nested iteration issue. Is there any chance that this feature will
> be available in the next time(2-3 weeks)?
>
> Cheers,
> Max
>
> On Wed, Jan 7, 2015 at 10:11 AM, Aljoscha Krettek <aljoscha@apache.org>
> wrote:
>
>> Hi,
>> the problem is that your operations do not depend on the
>> iteration-step-dataset. Your code could be rewritten like this to make it
>> more obvious:
>>
>>  val emptyDataSet = env.fromCollection[Vector](Seq())
>>  // here we call the function
>>      val center = calcCenter(env, X, residual, randoms, -1)
>>
>>      val centerX = (X subtV center) map {_ square}
>>      val x = calcWidthHeight(env, centerX, residual, widthCandidates,
>> center)
>>      val width = x._1
>>      val height = x._2
>>
>>      residual = residual - (getKernelVector(X, center, width) multV
>> height)
>>
>>      val centerOut = center map {x => new Vector(0, x.values)}
>>      val widthOut = width map {x => new Vector(1, x.values)}
>>      val heightOut = height map {x => new Vector(2, x.values)}
>>      val stepModel = centerOut union widthOut union height
>>
>>    // here the loop begins
>>    val model = emptyDataSet.iterate(config.iterations){
>>      stepSet =>
>>
>>      stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
>>      def map(x: Vector) = new
>> Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id,
>> x.values)
>>       }))
>>     }
>>
>>     model map { _ toString } writeAsText config.outFile
>> }
>>
>> This means that for the system these operations are considered to be
>> outside the loop, thus you don't have access to the IterationContext.
>>
>> Regards,
>> Aljoscha
>>
>> On Tue, Jan 6, 2015 at 4:13 PM, Maximilian Alber <
>> alber.maximilian@gmail.com> wrote:
>>
>>> Hey Flinksters!
>>>
>>> ran into this error
>>>
>>> java.lang.IllegalStateException: This stub is not part of an iteration
>>> step function.
>>>
>>> below is my code, the concerning parts are marked. Is it a problem, that
>>> the stub is in a function that is called from the iteration step function?
>>>
>>>
>>> Code:
>>>
>>>     ......
>>>
>>>    val emptyDataSet = env.fromCollection[Vector](Seq())
>>>    // here the loop begins
>>>    val model = emptyDataSet.iterate(config.iterations){
>>>      stepSet =>
>>>      // here we call the function
>>>      val center = calcCenter(env, X, residual, randoms, -1)
>>>
>>>      val centerX = (X subtV center) map {_ square}
>>>      val x = calcWidthHeight(env, centerX, residual, widthCandidates,
>>> center)
>>>      val width = x._1
>>>      val height = x._2
>>>
>>>      residual = residual - (getKernelVector(X, center, width) multV
>>> height)
>>>
>>>      val centerOut = center map {x => new Vector(0, x.values)}
>>>      val widthOut = width map {x => new Vector(1, x.values)}
>>>      val heightOut = height map {x => new Vector(2, x.values)}
>>>      val stepModel = centerOut union widthOut union height
>>>
>>>      stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
>>>      def map(x: Vector) = new
>>> Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id,
>>> x.values)
>>>       }))
>>>     }
>>>
>>>     model map { _ toString } writeAsText config.outFile
>>> }
>>>
>>>
>>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector], residual:
>>> DataSet[Vector], randoms: DataSet[Vector], iteration: Int): DataSet[Vector]
>>> = {
>>>     val residual_2 = residual * residual
>>>      val randomValue = if(iteration >= 0)
>>>         (randoms filter {_.id == iteration})
>>>      else
>>>        // and this filter function causes the error
>>>        (randoms.filter(new RichFilterFunction[Vector]{
>>>            def filter(x: Vector) = x.id ==
>>> (getIterationRuntimeContext.getSuperstepNumber-1)
>>>      }))
>>>     val ys = ((residual_2 sumV() neutralize) * (randomValue neutralize))
>>>
>>>    .....
>>>
>>> The full errror:
>>>
>>> Error: The program execution failed: java.lang.IllegalStateException:
>>> This stub is not part of an iteration step function.
>>> at
>>> org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>>> at bumpboost.BumpBoost$$anon$2.filter(BumpBoost.scala:119)
>>> at bumpboost.BumpBoost$$anon$2.filter(BumpBoost.scala:118)
>>> at
>>> org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>>> at
>>> org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>>> at
>>> org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>>> at
>>> org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>>> at
>>> org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>>> at java.lang.Thread.run(Thread.java:745)
>>>
>>>
>>> I append my program with the input files. To reproduce the error use
>>> following command line args, please replace in_file, random_file,
>>> width_candidates with the provided ones, and put for out_file the path you
>>> want to:
>>>
>>> flink run -v bump_boost-0.1.jar -c bumpboost.Job in_file=foobar
>>> out_file=/tmp/tmphIHeEs random_file=/tmp/tmpeKQPZk dimensions=1 N=100
>>> width_candidates_file=/tmp/tmpTJLKMm N_width_candidates=50 iterations=30
>>> multi_bump_boost=0 gradient_descent_iterations=30 cache=False
>>> start_width=1.0 min_width=-4 max_width=6 min_width_update=1e-08
>>> max_width_update=10
>>>
>>> Thanks!
>>> Cheers,
>>> Max
>>>
>>
>>
>

Mime
View raw message