flink-user mailing list archives

From Maximilian Alber <alber.maximil...@gmail.com>
Subject Re: java.lang.IllegalStateException: This stub is not part of an iteration step function.
Date Wed, 07 Jan 2015 09:58:57 GMT
Thanks!

I worked around it with a pseudo join against the workset. But now I'm back
at the nested iteration issue. Is there any chance that this feature will
be available soon (within the next 2-3 weeks)?

Cheers,
Max

On Wed, Jan 7, 2015 at 10:11 AM, Aljoscha Krettek <aljoscha@apache.org>
wrote:

> Hi,
> the problem is that your operations do not depend on the
> iteration-step-dataset. Your code could be rewritten like this to make it
> more obvious:
>
>     val emptyDataSet = env.fromCollection[Vector](Seq())
>     // here we call the function
>     val center = calcCenter(env, X, residual, randoms, -1)
>
>     val centerX = (X subtV center) map {_ square}
>     val x = calcWidthHeight(env, centerX, residual, widthCandidates, center)
>     val width = x._1
>     val height = x._2
>
>     residual = residual - (getKernelVector(X, center, width) multV height)
>
>     val centerOut = center map {x => new Vector(0, x.values)}
>     val widthOut = width map {x => new Vector(1, x.values)}
>     val heightOut = height map {x => new Vector(2, x.values)}
>     val stepModel = centerOut union widthOut union heightOut
>
>     // here the loop begins
>     val model = emptyDataSet.iterate(config.iterations){
>       stepSet =>
>
>       stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
>         def map(x: Vector) = new Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, x.values)
>       }))
>     }
>
>     model map { _ toString } writeAsText config.outFile
> }
>
> This means the system treats these operations as being outside the loop,
> so they have no access to the IterationRuntimeContext.
>
> Regards,
> Aljoscha
>
> On Tue, Jan 6, 2015 at 4:13 PM, Maximilian Alber <
> alber.maximilian@gmail.com> wrote:
>
>> Hey Flinksters!
>>
>> I ran into this error:
>>
>> java.lang.IllegalStateException: This stub is not part of an iteration
>> step function.
>>
>> My code is below, with the relevant parts marked. Is it a problem that
>> the stub is in a function that is called from the iteration step function?
>>
>>
>> Code:
>>
>>     ......
>>
>>     val emptyDataSet = env.fromCollection[Vector](Seq())
>>     // here the loop begins
>>     val model = emptyDataSet.iterate(config.iterations){
>>       stepSet =>
>>       // here we call the function
>>       val center = calcCenter(env, X, residual, randoms, -1)
>>
>>       val centerX = (X subtV center) map {_ square}
>>       val x = calcWidthHeight(env, centerX, residual, widthCandidates, center)
>>       val width = x._1
>>       val height = x._2
>>
>>       residual = residual - (getKernelVector(X, center, width) multV height)
>>
>>       val centerOut = center map {x => new Vector(0, x.values)}
>>       val widthOut = width map {x => new Vector(1, x.values)}
>>       val heightOut = height map {x => new Vector(2, x.values)}
>>       val stepModel = centerOut union widthOut union heightOut
>>
>>       stepSet union (stepModel.map(new RichMapFunction[Vector, Vector]{
>>         def map(x: Vector) = new Vector((getIterationRuntimeContext.getSuperstepNumber-1)*3+x.id, x.values)
>>       }))
>>     }
>>
>>     model map { _ toString } writeAsText config.outFile
>> }
>>
>>
>> def calcCenter(env: ExecutionEnvironment, X: DataSet[Vector], residual: DataSet[Vector], randoms: DataSet[Vector], iteration: Int): DataSet[Vector] = {
>>     val residual_2 = residual * residual
>>     val randomValue = if(iteration >= 0)
>>         (randoms filter {_.id == iteration})
>>       else
>>         // and this filter function causes the error
>>         (randoms.filter(new RichFilterFunction[Vector]{
>>           def filter(x: Vector) = x.id == (getIterationRuntimeContext.getSuperstepNumber-1)
>>         }))
>>     val ys = ((residual_2 sumV() neutralize) * (randomValue neutralize))
>>
>>    .....
>>
>> The full error:
>>
>> Error: The program execution failed: java.lang.IllegalStateException: This stub is not part of an iteration step function.
>> at org.apache.flink.api.common.functions.AbstractRichFunction.getIterationRuntimeContext(AbstractRichFunction.java:59)
>> at bumpboost.BumpBoost$$anon$2.filter(BumpBoost.scala:119)
>> at bumpboost.BumpBoost$$anon$2.filter(BumpBoost.scala:118)
>> at org.apache.flink.api.java.operators.translation.PlanFilterOperator$FlatMapFilter.flatMap(PlanFilterOperator.java:47)
>> at org.apache.flink.runtime.operators.chaining.ChainedFlatMapDriver.collect(ChainedFlatMapDriver.java:79)
>> at org.apache.flink.runtime.operators.chaining.ChainedMapDriver.collect(ChainedMapDriver.java:78)
>> at org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:176)
>> at org.apache.flink.runtime.execution.RuntimeEnvironment.run(RuntimeEnvironment.java:257)
>> at java.lang.Thread.run(Thread.java:745)
>>
>>
>> I have attached my program together with the input files. To reproduce
>> the error, use the following command-line arguments. Please replace
>> in_file, random_file, and width_candidates_file with the attached files,
>> and set out_file to any path you like:
>>
>> flink run -v bump_boost-0.1.jar -c bumpboost.Job in_file=foobar
>> out_file=/tmp/tmphIHeEs random_file=/tmp/tmpeKQPZk dimensions=1 N=100
>> width_candidates_file=/tmp/tmpTJLKMm N_width_candidates=50 iterations=30
>> multi_bump_boost=0 gradient_descent_iterations=30 cache=False
>> start_width=1.0 min_width=-4 max_width=6 min_width_update=1e-08
>> max_width_update=10
>>
>> Thanks!
>> Cheers,
>> Max
>>
>
>
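
As a minimal sketch of Aljoscha's point (this is not code from the thread): a rich function can call getIterationRuntimeContext only when its operator consumes the step dataset handed to the step function. The object name SuperstepSketch, the run helper, and the toy Int data are made up for illustration, and collect() assumes a DataSet API version that provides it.

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.api.scala._

// Hypothetical example object; none of these names appear in the thread.
object SuperstepSketch {

  def run(): Seq[Int] = {
    val env = ExecutionEnvironment.getExecutionEnvironment
    val initial: DataSet[Int] = env.fromElements(0)

    val result = initial.iterate(3) { stepSet =>
      // The map is applied to stepSet itself, so the operator depends on the
      // iteration-step dataset and is treated as part of the loop.
      stepSet.map(new RichMapFunction[Int, Int] {
        def map(x: Int): Int =
          x + getIterationRuntimeContext.getSuperstepNumber
      })
    }

    // Assumption: collect() is available and triggers execution.
    result.collect()
  }

  def main(args: Array[String]): Unit =
    println(run())
}
```

Applying the same map to a dataset that never touches stepSet, as in the calcCenter call above, places the operator outside the loop again and should bring back the IllegalStateException.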
