Parallel processing splits a problem across multiple cores, with each part running in a separate thread. However, consider the overhead: in the examples below it is faster to collect the file path names in parallel than sequentially, but reading the content of each file is as fast or faster sequentially than in parallel. So a parallel stream pays off only in specific cases, for example when calling the same function or stored procedure in a database at the same time with different parameters.
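As a rough sketch of that last case, the following example runs several independent calls over a parallel stream. The callProcedure method and the parameter list are hypothetical stand-ins, not part of the demos below:
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
public class ParallelCallsSketch {
    // Hypothetical stand-in for a slow remote call, e.g. a stored procedure.
    static String callProcedure(int parameter) {
        try {
            Thread.sleep(200); // simulate I/O latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "result for " + parameter;
    }
    public static void main(String[] args) {
        List<Integer> parameters = List.of(1, 2, 3, 4, 5);
        // Each call is independent, so they can run concurrently on the common ForkJoinPool.
        Map<Integer, String> results = parameters.parallelStream()
                .collect(Collectors.toMap(Function.identity(), ParallelCallsSketch::callProcedure));
        results.forEach((p, r) -> System.out.println(p + " -> " + r));
    }
}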
Read files in Parallel.
Parallel computing involves dividing a problem into subproblems, solving those problems simultaneously (in parallel, with each subproblem running in a separate thread), and then combining the results of the solutions to the subproblems.
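A minimal sketch of this divide-and-combine idea, using nothing beyond the standard LongStream API: the framework splits the range into subranges (the subproblems), sums each subrange on a separate thread, and combines the partial sums into one result.
import java.util.stream.LongStream;
public class ParallelSumSketch {
    public static void main(String[] args) {
        // The stream framework splits the range, sums each part on a separate thread,
        // and combines the partial sums.
        long sum = LongStream.rangeClosed(1, 1_000_000).parallel().sum();
        System.out.println("sum = " + sum);
    }
}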
normalize(): Returns the current path with redundant name elements removed.
isRegularFile(): Returns true if the path is a regular file, and false if it is not, for example a directory.
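A small standalone sketch of those two methods before they appear in the stream pipeline below (the redundant "." element in the path is added here only for illustration):
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
public class PathMethodsSketch {
    public static void main(String[] args) {
        // "C:\demo\.\lines1.txt" contains a redundant "." element; normalize() removes it.
        Path path = Paths.get("C:\\demo\\.\\lines1.txt").normalize();
        System.out.println(path); // C:\demo\lines1.txt
        System.out.println(Files.isRegularFile(path)); // true if it is an ordinary file
        System.out.println(Files.isRegularFile(Paths.get("C:\\demo"))); // false: a directory
    }
}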
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Collectors;
public class Demo2 {
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        try {
            String directory = "C:\\demo";
            // Collect all .txt file paths under the directory using a parallel stream.
            var files = Files.walk(Paths.get(directory))
                    .parallel().map(Path::normalize)
                    .filter(Files::isRegularFile)
                    .filter(path -> path.getFileName().toString().endsWith(".txt"))
                    .collect(Collectors.toList());
            // Read each file's lines in parallel; the output order is not guaranteed.
            files.stream().parallel().forEach(item -> {
                System.out.println(item + " " + Thread.currentThread().getName());
                try {
                    Files.lines(Paths.get(item.toString())).parallel().forEach(val -> {
                        System.out.println("value -> " + val);
                    });
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        long stopTime = System.currentTimeMillis();
        long elapsedTime = stopTime - startTime;
        System.out.println("millisecond " + elapsedTime);
    }
}
The result:
C:\demo\lines1.txt ForkJoinPool.commonPool-worker-1
C:\demo\lines3.txt ForkJoinPool.commonPool-worker-2
C:\demo\lines2.txt main
value -> third file2
value -> first file1
value -> second file2
value -> third file1
value -> first file2
value -> second file1
millisecond 35
Read files Sequentially.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Collectors;
public class Demo3 {
    public static void main(String[] args) {
        long startTime = System.currentTimeMillis();
        try {
            String directory = "C:\\demo";
            // Collect all .txt file paths under the directory with a sequential stream.
            var files = Files.walk(Paths.get(directory))
                    .map(Path::normalize)
                    .filter(Files::isRegularFile)
                    .filter(path -> path.getFileName().toString().endsWith(".txt"))
                    .collect(Collectors.toList());
            // Read each file's lines one file at a time, all on the main thread.
            files.stream().forEach(item -> {
                System.out.println(item + " " + Thread.currentThread().getName());
                try {
                    Files.lines(Paths.get(item.toString())).forEach(val -> {
                        System.out.println("value -> " + val);
                    });
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        long stopTime = System.currentTimeMillis();
        long elapsedTime = stopTime - startTime;
        System.out.println("millisecond " + elapsedTime);
    }
}
The result:
C:\demo\lines1.txt main
value -> first file1
value -> first file2
C:\demo\lines2.txt main
value -> second file1
value -> second file2
C:\demo\lines3.txt main
value -> third file1
value -> third file2
millisecond 38
Read a file Per Chunk.
This example uses an AtomicInteger to count the lines in the file; its increment is atomic, so the counter stays correct even if the stream returned by Files.lines is processed by several threads concurrently.
The file contains 10 rows, and in this example it is read in chunks of 3 rows.
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
public class Demo1 {
    public static void main(String[] args) {
        try {
            final AtomicInteger c = new AtomicInteger();
            final AtomicInteger count = new AtomicInteger(1);
            String fileName = "C:\\demo\\lines1.txt";
            // Group the lines into chunks of 3 by dividing a running line index by 3.
            Files.lines(Paths.get(fileName))
                    .collect(Collectors.groupingBy(e -> c.getAndIncrement() / 3)) // per 3
                    .forEach((fi, item) -> {
                        System.out.println("Chunk " + fi + " List " + item);
                        for (String line : item) {
                            System.out.println(" Number line -> " + count.getAndIncrement() + " -> " + line);
                        }
                    });
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
The result:
Chunk 0 List [first file1, first file2, first file3]
Number line -> 1 -> first file1
Number line -> 2 -> first file2
Number line -> 3 -> first file3
Chunk 1 List [first file4, first file5, first file6]
Number line -> 4 -> first file4
Number line -> 5 -> first file5
Number line -> 6 -> first file6
Chunk 2 List [first file7, first file8, first file9]
Number line -> 7 -> first file7
Number line -> 8 -> first file8
Number line -> 9 -> first file9
Chunk 3 List [first file10]
Number line -> 10 -> first file10
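If the chunks themselves should also be processed in parallel, one option is to build the chunks sequentially first and then stream over them with a parallel stream. This is only a sketch, assuming the same file path and chunk size as the example above:
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
public class Demo1Parallel {
    public static void main(String[] args) {
        try {
            final AtomicInteger c = new AtomicInteger();
            String fileName = "C:\\demo\\lines1.txt";
            // Build the chunks sequentially (the grouping relies on encounter order),
            // then hand each chunk to the common ForkJoinPool.
            List<List<String>> chunks = Files.lines(Paths.get(fileName))
                    .collect(Collectors.groupingBy(e -> c.getAndIncrement() / 3))
                    .values().stream().collect(Collectors.toList());
            chunks.parallelStream().forEach(chunk ->
                    System.out.println(Thread.currentThread().getName() + " -> " + chunk));
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}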
References:
https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html
https://www.baeldung.com/java-when-to-use-parallel-stream
https://mkyong.com/java8/java-8-parallel-streams-examples/
https://stackoverflow.com/questions/27583623/is-there-an-elegant-way-to-process-a-stream-in-chunks
https://howtodoinjava.com/java/multi-threading/atomicinteger-example/
https://www.geeksforgeeks.org/path-normalize-method-in-java-with-examples/
https://www.logicbig.com/how-to/code-snippets/jcode-java-io-files-isregularfile.html