spark-user mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From Steve Lewis <lordjoe2...@gmail.com>
Subject Re: writing to local files on a worker
Date Thu, 15 Nov 2018 23:34:46 GMT
I looked at Java's mechanism for creating temporary local files. I believe
they can be created, written to, and passed to other programs on the system.
I wrote a proof of concept to send some Strings out and use the local
program cat to concatenate them, writing the result to a local file.
Clearly there is a more complex program I want to target, but is there
anything wrong with this approach?

==========================================

package com.lordjoe.comet;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Option;
import scala.Tuple2;

import java.io.*;
import java.util.*;

/**
 * com.lordjoe.comet.SparkCatTest
 * Tests using Java temp files in a function call in
 */
public class SparkCatTest {

    public static final int NUMBER_REPEATS = 10; // make
NUMBER_REPEATS * NUMBER_REPEATS paris

    public static List<String> buildItems(String text, int repeats) {
        List<String>ret = new ArrayList<>() ;
        for (int i = 0; i < repeats; i++) {
             ret.add(text + i);

        }
        return ret;
    }


    public static void main(String[] args) throws Exception {
         SparkConf sparkConf = new SparkConf().setAppName("CatWithFiles");

        Option<String> option = sparkConf.getOption("spark.master");
        if (!option.isDefined()) {   // use local over nothing
            sparkConf.setMaster("local[*]");
        }

        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        List<String> start = buildItems("Start ",NUMBER_REPEATS ) ; //
make some data like   Start 9
        List<String> end = buildItems("End ",NUMBER_REPEATS ) ;    //
make some data like   End 9

        JavaRDD<String> startRdd = ctx.parallelize(start);
        JavaRDD<String> endRdd = ctx.parallelize(end);
        JavaPairRDD<String, String> cross =
startRdd.cartesian(endRdd); // make all pairs
         /**
         * dirty work is done here and used files to perform cat
         */
        JavaRDD<String> map = cross.map(new Function<Tuple2<String,
String>, String>() {

            @Override
            public String call(Tuple2<String, String> x) throws Exception {
                  File f1 = makeTempFile( );
                writeFile(f1, x._1);
                File f2 = makeTempFile( ) ;
                writeFile(f2, x._2);
                File f3 = makeTempFile( );
                boolean success = false;
                String ret = null;
                String f1path = f1.getAbsolutePath();
                String f2path = f2.getAbsolutePath();
                String f3Path = f3.getAbsolutePath();
                String command = "cat " + f1path + " "  + f2path + " >
"  + f3Path;
                if(osIsWindows())
                    success =  executeCommandLine("cmd","/c",command);
                else
                    success =  executeCommandLine("/bin/sh","-c",command);
                if(success) {
                    ret = readFile(f3);
                }

                f1.delete();
                f2.delete();
                f3.delete();
                return ret;
            }
        });

        // note the list returned by collect is immutable so we need a copy
        List<String> collect = new ArrayList(map.collect());
        Collections.sort(collect);
        for (String s : collect) {
            System.out.println(s);
        }
     }


        /**
         * true if running on Windows - otherwise Linux assumed
         * @return
         */
        public static synchronized boolean osIsWindows()
        {
            String osName = System.getProperty("os.name").toLowerCase();
            return  (osName.indexOf("windows") != -1);
        }

        /**
         * make a temporary file wiht a unique name and delete on exit
         * @return   non-null file
         */
        public static File makeTempFile( ) throws IOException {
            String prefix = UUID.randomUUID().toString();  // unique name
            String suffix = ".txt";
            File tempFile2 = File.createTempFile(prefix, suffix);
            tempFile2.deleteOnExit();  // drop on shutdown
            return tempFile2;
        }



    public static boolean executeCommandLine(String... args) throws
IOException, InterruptedException {
        ProcessBuilder p = new ProcessBuilder(args);
         Process process = p.start();
        int result = process.waitFor();
        int returnVal = process.exitValue();
        return returnVal == 0;
    }


    /**
       * @name writeFile
     * @param FileName name of file to create
     * @param data     date to write
     * @function write the string data to the file Filename
     */
    public static boolean writeFile(File f, String data)  throws IOException {
        PrintWriter out = new PrintWriter(new FileWriter(f));
        if (out != null) {
            out.print(data);
            out.close();
            return (true);
        }
        return (false);
        // failure
    }
    /**
       * @name readFile
     * @function write the string data to the file Filename
     * @param FileName name of file to read
     * @return contents of a text file
     */
    public static String readFile(File f ) throws IOException{
        LineNumberReader rdr = new LineNumberReader(new FileReader(f));
        StringBuilder sb = new StringBuilder();
        String line = rdr.readLine();
        while(line != null) {
            sb.append(line);
            sb.append("\n");
            line = rdr.readLine();
        }
        rdr.close();
        return sb.toString();
        // failure
    }

}


On Mon, Nov 12, 2018 at 9:20 AM Steve Lewis <lordjoe2000@gmail.com> wrote:

> I have been looking at Spark-Blast which calls Blast - a well known C++
> program in parallel -
> In my case I have tried to translate the C++ code to Java but am not
> getting the same results - it is convoluted -
> I have code that will call the program and read its results - the only
> real issue is the program wants local files -
> their use is convoluted with many seeks so replacement with streaming will
> not work -
> as long as my Java code can write to a local file for the duration of one
> call things can work -
>
> I considered in memory files as long as they can be passed to another
> program - I am willing to have OS specific code
> So my issue is I need to write 3 files - run a program and read one output
> file - then all files can be deleted -
> JNI calls will be hard - this is a program, not a library, and it is
> available for worker nodes
>
> On Sun, Nov 11, 2018 at 10:52 PM Jörn Franke <jornfranke@gmail.com> wrote:
>
>> Can you use JNI to call the c++ functionality directly from Java?
>>
>> Or you wrap this into a MR step outside Spark and use Hadoop Streaming
>> (it allows you to use shell scripts as mapper and reducer)?
>>
>> You can also write temporary files for each partition and execute the
>> software within a map step.
>>
>> Generally you should not call external applications from Spark.
>>
>> > Am 11.11.2018 um 23:13 schrieb Steve Lewis <lordjoe2000@gmail.com>:
>> >
>> > I have a problem where a critical step needs to be performed by  a
>> third party c++ application. I can send or install this program on the
>> worker nodes. I can construct  a function holding all the data this program
>> needs to process. The problem is that the program is designed to read and
>> write from the local file system. I can call the program from Java and read
>> its output as  a  local file - then deleting all temporary files but I
>> doubt that it is possible to get the program to read from hdfs or any
>> shared file system.
>> > My question is can a function running on a worker node create temporary
>> files and pass the names of these to a local process assuming everything is
>> cleaned up after the call?
>> >
>> > --
>> > Steven M. Lewis PhD
>> > 4221 105th Ave NE
>> > Kirkland, WA 98033
>> > 206-384-1340 (cell)
>> > Skype lordjoe_com
>> >
>>
>
>
> --
> Steven M. Lewis PhD
> 4221 105th Ave NE
> Kirkland, WA 98033
> 206-384-1340 (cell)
> Skype lordjoe_com
>
>

-- 
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com

Mime
View raw message