carbondata-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From qiang...@apache.org
Subject carbondata git commit: [CARBONDATA-2212] Event fired while updating the status during streaming handoff
Date Thu, 01 Mar 2018 09:03:46 GMT
Repository: carbondata
Updated Branches:
  refs/heads/master 0c8024907 -> e078ef4d9


[CARBONDATA-2212] Event fired while updating the status during streaming handoff

This closes #2009


Project: http://git-wip-us.apache.org/repos/asf/carbondata/repo
Commit: http://git-wip-us.apache.org/repos/asf/carbondata/commit/e078ef4d
Tree: http://git-wip-us.apache.org/repos/asf/carbondata/tree/e078ef4d
Diff: http://git-wip-us.apache.org/repos/asf/carbondata/diff/e078ef4d

Branch: refs/heads/master
Commit: e078ef4d91833318534fc19680ba811aac5d4248
Parents: 0c80249
Author: rahulforallp <rahul.kumar@knoldus.in>
Authored: Tue Feb 27 22:00:40 2018 +0530
Committer: QiangCai <qiangcai@qq.com>
Committed: Thu Mar 1 17:00:30 2018 +0800

----------------------------------------------------------------------
 .../apache/carbondata/streaming/StreamHandoffRDD.scala | 13 +++++++++++++
 1 file changed, 13 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/carbondata/blob/e078ef4d/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
----------------------------------------------------------------------
diff --git a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
index 35a3513..b03ee1e 100644
--- a/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
+++ b/streaming/src/main/scala/org/apache/carbondata/streaming/StreamHandoffRDD.scala
@@ -39,9 +39,11 @@ import org.apache.carbondata.core.scan.result.iterator.RawResultIterator
 import org.apache.carbondata.core.statusmanager.{LoadMetadataDetails, SegmentStatus, SegmentStatusManager}
 import org.apache.carbondata.core.util.CarbonUtil
 import org.apache.carbondata.core.util.path.CarbonStorePath
+import org.apache.carbondata.events.{OperationContext, OperationListenerBus}
 import org.apache.carbondata.hadoop.{CarbonInputSplit, CarbonProjection}
 import org.apache.carbondata.hadoop.api.CarbonTableInputFormat
 import org.apache.carbondata.hadoop.streaming.{CarbonStreamInputFormat, CarbonStreamRecordReader}
+import org.apache.carbondata.processing.loading.events.LoadEvents.{LoadTablePostStatusUpdateEvent, LoadTablePreStatusUpdateEvent}
 import org.apache.carbondata.processing.loading.model.CarbonLoadModel
 import org.apache.carbondata.processing.merger.{CompactionResultSortProcessor, CompactionType}
 import org.apache.carbondata.processing.util.CarbonLoaderUtil
@@ -307,7 +309,18 @@ object StreamHandoffRDD {
         SegmentStatus.INSERT_IN_PROGRESS,
         carbonLoadModel.getFactTimeStamp,
         false)
+      val operationContext = new OperationContext()
+      val loadTablePreStatusUpdateEvent: LoadTablePreStatusUpdateEvent =
+        new LoadTablePreStatusUpdateEvent(
+          carbonLoadModel.getCarbonDataLoadSchema.getCarbonTable.getCarbonTableIdentifier,
+          carbonLoadModel)
+      OperationListenerBus.getInstance().fireEvent(loadTablePreStatusUpdateEvent, operationContext)
+
       CarbonLoaderUtil.recordNewLoadMetadata(newMetaEntry, carbonLoadModel, true, false)
+      val loadTablePostStatusUpdateEvent: LoadTablePostStatusUpdateEvent =
+        new LoadTablePostStatusUpdateEvent(carbonLoadModel)
+      OperationListenerBus.getInstance()
+        .fireEvent(loadTablePostStatusUpdateEvent, operationContext)
       // convert a streaming segment to columnar segment
       val status = new StreamHandoffRDD(
         sparkSession.sparkContext,


Mime
View raw message