storm-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From kabh...@apache.org
Subject [1/3] storm git commit: Added parseInputStream() method
Date Thu, 16 Jul 2015 22:04:30 GMT
Repository: storm
Updated Branches:
  refs/heads/master e2157678f -> 9c3dfc986


Added parseInputStream() method

Included reusable parseInputStream for use by parseFile and parseResource. Also allows a path
for programmatically creating topologies without the need to write to / read from disk.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/11823a72
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/11823a72
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/11823a72

Branch: refs/heads/master
Commit: 11823a722749ba77c4b207ad5d369df69fd2551b
Parents: e215767
Author: Brendan W. Lyon <blyon@datainterfuse.com>
Authored: Thu Jul 16 13:48:00 2015 -0400
Committer: Jungtaek Lim <kabhwan@gmail.com>
Committed: Fri Jul 17 06:57:28 2015 +0900

----------------------------------------------------------------------
 .../apache/storm/flux/parser/FluxParser.java    | 57 +++++++++++---------
 1 file changed, 32 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/11823a72/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
----------------------------------------------------------------------
diff --git a/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
b/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
index 72f8a8e..27ff677 100644
--- a/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
+++ b/external/flux/flux-core/src/main/java/org/apache/storm/flux/parser/FluxParser.java
@@ -44,40 +44,47 @@ public class FluxParser {
     // TODO refactor input stream processing (see parseResource() method).
     public static TopologyDef parseFile(String inputFile, boolean dumpYaml, boolean processIncludes,
                                         String propertiesFile, boolean envSub) throws IOException
{
-        Yaml yaml = yaml();
+   
         FileInputStream in = new FileInputStream(inputFile);
-        // TODO process properties, etc.
-        TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub);
+        TopologyDef topology = parseInputStream(in, dumpYaml, processIncludes, propertiesFile,
envSub);
         in.close();
-        if(dumpYaml){
-            dumpYaml(topology, yaml);
-        }
-        if(processIncludes) {
-            return processIncludes(yaml, topology, propertiesFile, envSub);
-        } else {
-            return topology;
-        }
+        
+        return topology;
     }
 
     public static TopologyDef parseResource(String resource, boolean dumpYaml, boolean processIncludes,
                                             String propertiesFile, boolean envSub) throws
IOException {
-        Yaml yaml = yaml();
+        
         InputStream in = FluxParser.class.getResourceAsStream(resource);
-        if(in == null){
-            LOG.error("Unable to load classpath resource: " + resource);
-            System.exit(1);
-        }
-        TopologyDef topology = loadYaml(yaml, in, propertiesFile, envSub);
+        TopologyDef topology = parseInputStream(in, dumpYaml, processIncludes, propertiesFile,
envSub);
         in.close();
-        if(dumpYaml){
-            dumpYaml(topology, yaml);
-        }
-        if(processIncludes) {
-            return processIncludes(yaml, topology, propertiesFile, envSub);
-        } else {
-            return topology;
-        }
+        
+        return topology;
     }
+    
+    public static TopologyDef parseInputStream(InputStream inputStream, boolean dumpYaml,
boolean processIncludes,
+            String propertiesFile, boolean envSub) throws IOException {
+		
+    	Yaml yaml = yaml();
+    	
+		if (inputStream == null) {
+			LOG.error("Unable to load input stream");
+			System.exit(1);
+		}
+		
+		TopologyDef topology = loadYaml(yaml, inputStream, propertiesFile, envSub);
+		inputStream.close();
+		
+		if (dumpYaml) {
+			dumpYaml(topology, yaml);
+		}
+		
+		if (processIncludes) {
+			return processIncludes(yaml, topology, propertiesFile, envSub);
+		} else {
+			return topology;
+		}
+	}
 
     private static TopologyDef loadYaml(Yaml yaml, InputStream in, String propsFile, boolean
envSubstitution) throws IOException {
         ByteArrayOutputStream bos = new ByteArrayOutputStream();


Mime
View raw message