beam-commits mailing list archives

From "Rick Lin (JIRA)" <j...@apache.org>
Subject [jira] [Created] (BEAM-3210) The problem about the use of waitUntilFinish() in DirectRunner
Date Fri, 17 Nov 2017 05:47:01 GMT
Rick Lin created BEAM-3210:
------------------------------

             Summary: The problem about the use of waitUntilFinish() in DirectRunner
                 Key: BEAM-3210
                 URL: https://issues.apache.org/jira/browse/BEAM-3210
             Project: Beam
          Issue Type: Bug
          Components: runner-direct
    Affects Versions: 2.1.0
         Environment: Ubuntu 14.04.3 LTS
JDK 1.8
Beam 2.1.0
Maven 3.5.0
            Reporter: Rick Lin
            Assignee: Thomas Groh
             Fix For: 2.1.0


Dear sir,

The Javadoc of waitUntilFinish() says that it "waits until the pipeline finishes and returns
the final status."

In my project, a static variable of list type is used to record the contents of a
PCollection.

For this reason, I call "p.run().waitUntilFinish()" so that the program waits until the
pipeline finishes before reading the list, which should avoid losing any records.

Unfortunately, the list {color:#d04437}*sometimes*{color} contains a "null" value instead of
the actual value.

To illustrate the problem, here is my Java code:
{code:java}
import java.io.IOException;
import java.util.ArrayList;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class BeamTestStatic {
  // Shared, unsynchronized list that every DoFn invocation appends to.
  public static ArrayList<Double> myList = new ArrayList<Double>();

  public static class StaticTest extends DoFn<Double, Void> {
    @ProcessElement
    public void test(ProcessContext c) {
      // Record each element of the input PCollection in the static list.
      myList.add(c.element());
    }
  }

  public static void main(String[] args) throws IOException {
    StaticTest testa = new StaticTest();
    PipelineOptions options = PipelineOptionsFactory.create();
    Pipeline p = Pipeline.create(options);
    PCollection<Double> data = p.apply("Rawdata",
        Create.of(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0));
    PCollection<Void> listtest = data.apply(ParDo.of(testa));

    // Block until the pipeline has finished before reading the list.
    p.run().waitUntilFinish();

    System.out.println("mylist_size_a=" + myList.size());
    for (int i = 0; i < myList.size(); i++) {
      System.out.println("mylist_data=" + myList.get(i));
    }
  }
}
{code}
The output of this code is:
{noformat}
mylist_size_a=10
mylist_data=null
mylist_data=4.0
mylist_data=5.0
mylist_data=9.0
mylist_data=6.0
mylist_data=1.0
mylist_data=7.0
mylist_data=8.0
mylist_data=10.0
mylist_data=3.0
{noformat}
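This output is consistent with a data race on the list rather than with waitUntilFinish() returning early: the size is 10, yet 2.0 is missing and one slot holds null. One plausible explanation, which is an assumption and not confirmed in this report, is that the DirectRunner may invoke the DoFn from several worker threads at the same time, while java.util.ArrayList is not thread-safe, so two concurrent add() calls can write to the same slot and leave another slot null. The plain-Java sketch below has no Beam dependency; the class SharedListSketch and the method collectConcurrently are hypothetical, a thread pool stands in for the runner's workers, and awaitTermination() stands in for waitUntilFinish(). It shows the usual remedy of wrapping the shared list with Collections.synchronizedList.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class SharedListSketch {
  // Hypothetical replacement for BeamTestStatic.myList: the synchronized wrapper
  // makes each add() atomic, so concurrent writers cannot clobber a slot.
  public static final List<Double> myList =
      Collections.synchronizedList(new ArrayList<Double>());

  // Hypothetical stand-in for the runner's worker threads: each task adds one
  // element to the sink, as the DoFn's @ProcessElement method would.
  public static void collectConcurrently(int threads, List<Double> sink) {
    ExecutorService pool = Executors.newFixedThreadPool(threads);
    for (int i = 1; i <= 10; i++) {
      final double value = i;
      pool.submit(() -> {
        sink.add(value);
      });
    }
    // Analogous to waitUntilFinish(): block until every submitted task is done.
    pool.shutdown();
    try {
      pool.awaitTermination(1, TimeUnit.MINUTES);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }

  public static void main(String[] args) {
    collectConcurrently(4, myList);
    System.out.println("mylist_size_a=" + myList.size());
    // Iterating a synchronizedList must be done while holding its lock.
    synchronized (myList) {
      for (Double d : myList) {
        System.out.println("mylist_data=" + d);
      }
    }
  }
}
```

The element order may still vary between runs, since the tasks finish in any order. Also note that a static field only sees the elements when the DoFn runs in the same JVM as main(), as with the DirectRunner; on a distributed runner the workers run in separate processes.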

If you have any further information, I would be glad to hear it.

Thanks

Rick




--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
