beam-commits mailing list archives

From Ismaël Mejía (JIRA) <j...@apache.org>
Subject [jira] [Assigned] (BEAM-2787) IllegalArgumentException with MongoDbIO with empty PCollection
Date Fri, 08 Sep 2017 10:03:00 GMT

     [ https://issues.apache.org/jira/browse/BEAM-2787?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ismaël Mejía reassigned BEAM-2787:
----------------------------------

    Assignee: Ismaël Mejía

> IllegalArgumentException with MongoDbIO with empty PCollection
> --------------------------------------------------------------
>
>                 Key: BEAM-2787
>                 URL: https://issues.apache.org/jira/browse/BEAM-2787
>             Project: Beam
>          Issue Type: Bug
>          Components: sdk-java-extensions
>    Affects Versions: 2.0.0
>            Reporter: Siddharth Mittal
>            Assignee: Ismaël Mejía
>
> We read a file and created a PCollection of Strings where each record represents one
> line of the file. Thereafter we apply multiple PTransforms to validate the records. In
> the end the PCollection is filtered into two PCollections, one with all valid records and
> one with all invalid records. Both PCollections are then stored to their respective MongoDB
> collections. If either of these PCollections is empty, we face the exception trace below:
> 17/08/11 05:30:20 INFO SparkContext: Successfully stopped SparkContext
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.IllegalArgumentException: state should be: writes is not an empty list
>         at org.apache.beam.runners.spark.SparkPipelineResult.beamExceptionFrom(SparkPipelineResult.java:66)
>         at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:99)
>         at org.apache.beam.runners.spark.SparkPipelineResult.waitUntilFinish(SparkPipelineResult.java:87)
>         at com.hsbc.rsl.applications.DataTransformation.StoreRiskToL3(DataTransformation.java:106)
>         at com.hsbc.rsl.applications.DataTransformation.lambda$executeAndWaitUntilFinish$5(DataTransformation.java:55)
>         at com.hsbc.rsl.applications.DataTransformation$$Lambda$10/737852016.accept(Unknown Source)
>         at java.util.stream.ForEachOps$ForEachOp$OfRef.accept(ForEachOps.java:184)
>         at java.util.stream.IntPipeline$4$1.accept(IntPipeline.java:250)
>         at java.util.stream.Streams$RangeIntSpliterator.forEachRemaining(Streams.java:110)
>         at java.util.Spliterator$OfInt.forEachRemaining(Spliterator.java:693)
>         at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:512)
>         at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:502)
>         at java.util.stream.ForEachOps$ForEachOp.evaluateSequential(ForEachOps.java:151)
>         at java.util.stream.ForEachOps$ForEachOp$OfRef.evaluateSequential(ForEachOps.java:174)
>         at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
>         at java.util.stream.ReferencePipeline.forEach(ReferencePipeline.java:418)
>         at com.hsbc.rsl.applications.DataTransformation.executeAndWaitUntilFinish(DataTransformation.java:51)
>         at com.hsbc.rsl.applications.TransformationProcessor.main(TransformationProcessor.java:23)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:497)
>         at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
>         at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
>         at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
>         at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
>         at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
> Caused by: java.lang.IllegalArgumentException: state should be: writes is not an empty list
>         at com.mongodb.assertions.Assertions.isTrueArgument(Assertions.java:99)
>         at com.mongodb.operation.MixedBulkWriteOperation.<init>(MixedBulkWriteOperation.java:95)
>         at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:323)
>         at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:311)
>         at org.apache.beam.sdk.io.mongodb.MongoDbIO$Write$WriteFn.flush(MongoDbIO.java:513)
>         at org.apache.beam.sdk.io.mongodb.MongoDbIO$Write$WriteFn.finishBundle(MongoDbIO.java:506)
> 17/08/11 05:30:20 INFO RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
> There is no exception when there is at least one record in the PCollection.
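
For context, a minimal sketch that should reproduce the failure on Beam 2.0.0, assuming a reachable MongoDB instance; the URI, database, and collection names are illustrative, not from the report:

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.coders.SerializableCoder;
    import org.apache.beam.sdk.io.mongodb.MongoDbIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.values.PCollection;
    import org.bson.Document;

    public class EmptyMongoWriteRepro {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // An empty PCollection<Document>, as happens when every record is
        // filtered into the other branch (all valid or all invalid).
        PCollection<Document> empty =
            p.apply(Create.empty(SerializableCoder.of(Document.class)));

        empty.apply(
            MongoDbIO.write()
                .withUri("mongodb://localhost:27017") // illustrative URI
                .withDatabase("test")                 // illustrative database
                .withCollection("invalid"));          // illustrative collection

        // On 2.0.0 this fails with PipelineExecutionException caused by
        // IllegalArgumentException: state should be: writes is not an empty list.
        p.run().waitUntilFinish();
      }
    }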
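
Judging from the trace, MongoDbIO$Write$WriteFn.finishBundle() flushes its buffered batch unconditionally, while the MongoDB driver asserts that insertMany() never receives an empty list. A hedged sketch of the kind of guard that would avoid this; the names mirror the trace but paraphrase, rather than quote, the actual MongoDbIO source:

    // Inside MongoDbIO.Write.WriteFn (paraphrased, not the actual Beam source):
    private void flush() {
      // Guard: an empty bundle leaves the batch empty, and the MongoDB driver
      // rejects insertMany() on an empty list ("state should be: writes is not
      // an empty list"), so there is nothing to do.
      if (batch.isEmpty()) {
        return;
      }
      MongoDatabase mongoDatabase = client.getDatabase(spec.database());
      MongoCollection<Document> mongoCollection =
          mongoDatabase.getCollection(spec.collection());
      mongoCollection.insertMany(batch);
      batch.clear();
    }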



