writing to local files on a worker


writing to local files on a worker

lordjoe
I have a problem where a critical step needs to be performed by a third-party C++ application. I can send or install this program on the worker nodes, and I can construct a function holding all the data the program needs to process. The problem is that the program is designed to read and write from the local file system. I can call the program from Java, read its output as a local file, and then delete all temporary files, but I doubt it is possible to get the program to read from HDFS or any shared file system.
My question is: can a function running on a worker node create temporary files and pass their names to a local process, assuming everything is cleaned up after the call?

--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com

Reply | Threaded
Open this post in threaded view
|

Re: writing to local files on a worker

Joe-2
Hello,
You could try using the mapPartitions function if you can send partial data
to your C++ program:

mapPartitions(func):
Similar to map, but runs separately on each partition (block) of the
RDD, so func must be of type Iterator<T> => Iterator<U> when running
on an RDD of type T.

That way you can write the partition data to a temp file, call your C++ app,
then delete the temp file. Of course your data would be limited to the
rows in one partition.
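
Roughly something like this (an untested sketch: the binary path and argument
order are invented, and the program is assumed to be installed on every worker):

    // assumes java.io.*, java.nio.file.Files, java.util.List and the Spark Java API are imported
    JavaRDD<String> results = input.mapPartitions(rows -> {
        // write the whole partition to a local temp file
        File in = File.createTempFile("part-in-", ".txt");
        File out = File.createTempFile("part-out-", ".txt");
        try (PrintWriter w = new PrintWriter(new FileWriter(in))) {
            while (rows.hasNext()) {
                w.println(rows.next());
            }
        }
        // call the external program on the local files (hypothetical path and arguments)
        Process p = new ProcessBuilder("/opt/tools/mytool",
                in.getAbsolutePath(), out.getAbsolutePath()).start();
        if (p.waitFor() != 0) {
            throw new IOException("mytool failed on " + in.getName());
        }
        // read the result back, then clean up the temp files
        List<String> lines = Files.readAllLines(out.toPath());
        in.delete();
        out.delete();
        return lines.iterator();
    });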

Also the latest release of Spark (2.4.0) introduced barrier execution mode:
https://issues.apache.org/jira/browse/SPARK-24374

Maybe you could combine the two: using mapPartitions alone will give you
the data for a single partition only, and your app call will be repeated on
all nodes, not necessarily at the same time.

Spark's strong point is parallel execution, so what you're trying to do
kind of defeats that.
But if you do not need to combine all the data before calling your app,
then you could do it.
Or you could split your job into a Spark -> app -> Spark chain.
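
The chain version would look roughly like this (a sketch only; the staging
paths are made up and ctx is the JavaSparkContext):

    // step 1: a Spark job writes out whatever the external app needs
    input.saveAsTextFile("hdfs:///staging/app-in");
    // step 2 (outside Spark): pull the files to local disk, run the C++ app,
    //        and push its output back to hdfs:///staging/app-out
    // step 3: a second Spark job picks the results up again
    JavaRDD<String> results = ctx.textFile("hdfs:///staging/app-out");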
Good luck,

Joe







Re: writing to local files on a worker

Jörn Franke
Can you use JNI to call the C++ functionality directly from Java?
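
For the JNI route the Java side would be small, something like this (a sketch
only: the library and method names are invented, and the C++ code would still
have to be wrapped and compiled for every worker's platform):

    public class NativeTool {
        static {
            // expects libmytool_jni.so (or mytool_jni.dll) on java.library.path on every worker
            System.loadLibrary("mytool_jni");
        }
        // hypothetical native entry point: takes the input text, returns the result
        public static native String process(String input);
    }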

Or you could wrap this into an MR step outside Spark and use Hadoop Streaming (it allows you to use shell scripts as mapper and reducer).

You can also write temporary files for each partition and execute the software within a map step.

Generally you should not call external applications from Spark.




Re: writing to local files on a worker

lordjoe
I have been looking at Spark-Blast, which calls BLAST, a well-known C++ program, in parallel.
In my case I have tried to translate the C++ code to Java but am not getting the same results; it is convoluted.
I have code that will call the program and read its results. The only real issue is that the program wants local files;
their use is convoluted, with many seeks, so replacing them with streaming will not work.
As long as my Java code can write to a local file for the duration of one call, things can work.

I considered in-memory files, as long as they can be passed to another program; I am willing to have OS-specific code.
So my issue is that I need to write 3 files, run a program, and read one output file; then all the files can be deleted.
JNI calls will be hard: this is a program, not a library, and it is available on the worker nodes.
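
For the in-memory idea, one cheap option on Linux that still hands the external
program an ordinary path is to create the temp files on a tmpfs mount such as
/dev/shm. This is only a guess at the setup; the sketch below falls back to the
normal temp directory when no such mount exists:

    // sketch: create the temp file in a RAM-backed directory when one is available
    static File makeFastTempFile() throws IOException {
        File shm = new File("/dev/shm");                                // Linux tmpfs, if present
        File dir = (shm.isDirectory() && shm.canWrite()) ? shm : null;  // null => java.io.tmpdir
        File tmp = File.createTempFile("work-", ".dat", dir);
        tmp.deleteOnExit();
        return tmp;
    }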



--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com


Re: writing to local files on a worker

lordjoe
I looked at Java's mechanism for creating temporary local files. I believe they can be created, written to, and passed to other programs on the system.
I wrote a proof of concept that sends some strings out, uses the local program cat to concatenate them, and writes the result to a local file. Clearly there is a more complex program I want to target, but is there anything wrong with this approach?

==========================================
package com.lordjoe.comet;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import scala.Option;
import scala.Tuple2;

import java.io.*;
import java.util.*;

/**
 * com.lordjoe.comet.SparkCatTest
 * Tests using Java temp files inside a function running on the workers.
 */
public class SparkCatTest {

    public static final int NUMBER_REPEATS = 10; // makes NUMBER_REPEATS * NUMBER_REPEATS pairs

    public static List<String> buildItems(String text, int repeats) {
        List<String> ret = new ArrayList<>();
        for (int i = 0; i < repeats; i++) {
            ret.add(text + i);
        }
        return ret;
    }

    public static void main(String[] args) throws Exception {
        SparkConf sparkConf = new SparkConf().setAppName("CatWithFiles");

        Option<String> option = sparkConf.getOption("spark.master");
        if (!option.isDefined()) { // use local over nothing
            sparkConf.setMaster("local[*]");
        }

        JavaSparkContext ctx = new JavaSparkContext(sparkConf);
        List<String> start = buildItems("Start ", NUMBER_REPEATS); // make some data like "Start 9"
        List<String> end = buildItems("End ", NUMBER_REPEATS);     // make some data like "End 9"

        JavaRDD<String> startRdd = ctx.parallelize(start);
        JavaRDD<String> endRdd = ctx.parallelize(end);
        JavaPairRDD<String, String> cross = startRdd.cartesian(endRdd); // make all pairs

        // the dirty work is done here - temp files are used to perform cat
        JavaRDD<String> map = cross.map(new Function<Tuple2<String, String>, String>() {

            @Override
            public String call(Tuple2<String, String> x) throws Exception {
                // write each half of the pair to its own temp file
                File f1 = makeTempFile();
                writeFile(f1, x._1());
                File f2 = makeTempFile();
                writeFile(f2, x._2());
                File f3 = makeTempFile(); // output file
                boolean success;
                String ret = null;
                String f1path = f1.getAbsolutePath();
                String f2path = f2.getAbsolutePath();
                String f3Path = f3.getAbsolutePath();
                // let the shell handle the "> f3" redirection
                String command = "cat " + f1path + " " + f2path + " > " + f3Path;
                if (osIsWindows())
                    success = executeCommandLine("cmd", "/c", command); // needs a cat on the Windows PATH
                else
                    success = executeCommandLine("/bin/sh", "-c", command);
                if (success) {
                    ret = readFile(f3);
                }
                // clean up the temp files once the call is done
                f1.delete();
                f2.delete();
                f3.delete();
                return ret;
            }
        });

        // note the list returned by collect is immutable so we need a copy
        List<String> collect = new ArrayList<>(map.collect());
        Collections.sort(collect);
        for (String s : collect) {
            System.out.println(s);
        }
    }

    /**
     * true if running on Windows - otherwise Linux is assumed
     */
    public static synchronized boolean osIsWindows() {
        String osName = System.getProperty("os.name").toLowerCase();
        return osName.contains("windows");
    }

    /**
     * make a temporary file with a unique name, deleted on JVM exit
     *
     * @return non-null file
     */
    public static File makeTempFile() throws IOException {
        String prefix = UUID.randomUUID().toString(); // unique name
        String suffix = ".txt";
        File tempFile = File.createTempFile(prefix, suffix);
        tempFile.deleteOnExit(); // drop on shutdown
        return tempFile;
    }

    /**
     * run an external command and wait for it to finish
     *
     * @return true if the process exited with code 0
     */
    public static boolean executeCommandLine(String... args) throws IOException, InterruptedException {
        ProcessBuilder p = new ProcessBuilder(args);
        Process process = p.start();
        int returnVal = process.waitFor(); // waitFor returns the exit value
        return returnVal == 0;
    }

    /**
     * write the string data to the file f
     *
     * @param f    file to create
     * @param data data to write
     * @return true on success
     */
    public static boolean writeFile(File f, String data) throws IOException {
        try (PrintWriter out = new PrintWriter(new FileWriter(f))) {
            out.print(data);
            return true;
        }
    }

    /**
     * read the contents of a text file
     *
     * @param f file to read
     * @return contents of the file, one line per "\n"
     */
    public static String readFile(File f) throws IOException {
        StringBuilder sb = new StringBuilder();
        try (LineNumberReader rdr = new LineNumberReader(new FileReader(f))) {
            String line = rdr.readLine();
            while (line != null) {
                sb.append(line);
                sb.append("\n");
                line = rdr.readLine();
            }
        }
        return sb.toString();
    }
}
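
A possible simplification (untested): the shell is only needed above for the
"> f3" redirection, so ProcessBuilder could send stdout straight to the output
file instead, which would also drop the Windows/Linux branch as long as cat
(or the real program) is on the PATH. Inside call() that would look roughly like:

    ProcessBuilder pb = new ProcessBuilder("cat", f1path, f2path);
    pb.redirectOutput(f3);                                 // stdout goes directly to the output file
    pb.redirectError(ProcessBuilder.Redirect.INHERIT);     // let the tool's errors show up in the executor log
    boolean success = pb.start().waitFor() == 0;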





--
Steven M. Lewis PhD
4221 105th Ave NE
Kirkland, WA 98033
206-384-1340 (cell)
Skype lordjoe_com