Saturday, July 16, 2022

Java Streams: Read files in Parallel, Sequential and Per Chunk


Parallel processing splits a problem across multiple cores, with each part running in a separate thread. However, consider the overhead: in the examples below it is faster to collect the file path names in parallel than sequentially, but reading the content of each file is as fast or faster sequentially than in parallel. So parallel streams pay off only in specific cases, for example when calling a function or stored procedure in a database at the same time with different parameters.
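As a rough sketch of that last case, a parallel stream can fan independent calls out across threads. Here doLookup and its simulated latency are hypothetical stand-ins for a database call:

import java.util.List;

public class ParallelCallsSketch {

    // Hypothetical stand-in for a database function/stored procedure call.
    static String doLookup(int param) {
        try {
            Thread.sleep(100); // simulate I/O latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "result for " + param;
    }

    public static void main(String[] args) {
        // Each call is independent, so the waiting overlaps across threads.
        List.of(1, 2, 3, 4)
                .parallelStream()
                .map(ParallelCallsSketch::doLookup)
                .forEach(System.out::println);
    }
}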

Read files in Parallel.

Parallel computing involves dividing a problem into subproblems, solving those subproblems simultaneously (in parallel, with each subproblem running in a separate thread), and then combining the results of the solutions to the subproblems.
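A minimal sketch of that divide-and-combine shape, using a parallel sum over an arbitrary range:

import java.util.stream.LongStream;

public class ParallelSumSketch {
    public static void main(String[] args) {
        // The range is split into subranges, each summed on its own thread,
        // and the partial sums are combined into the final result.
        long sum = LongStream.rangeClosed(1, 1_000_000)
                .parallel()
                .reduce(0L, Long::sum);
        System.out.println(sum); // 500000500000
    }
}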

normalize(): returns the current path with redundant name elements (such as "." and "..") removed.
isRegularFile(): returns true for a regular file, and false otherwise, for example for a directory.
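
A quick sketch of both methods (the paths are made up for illustration):

import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

public class PathMethodsSketch {
    public static void main(String[] args) {
        // ".." backs out of "demo", so the redundant elements collapse.
        Path p = Paths.get("C:\\demo\\..\\demo\\lines1.txt");
        System.out.println(p.normalize()); // C:\demo\lines1.txt

        // true for an existing regular file, false for a directory.
        System.out.println(Files.isRegularFile(Paths.get("C:\\demo")));
    }
}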

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Collectors;

public class Demo2 {

    public static void main(String[] args) {

        long startTime = System.currentTimeMillis();

        try {
            String directory = "C:\\demo";
            // Collect the paths of all .txt files under the directory in parallel.
            var files = Files.walk(Paths.get(directory))
                    .parallel()
                    .map(Path::normalize)
                    .filter(Files::isRegularFile)
                    .filter(path -> path.getFileName().toString().endsWith(".txt"))
                    .collect(Collectors.toList());

            // Read the content of each file, again in parallel.
            files.stream().parallel().forEach(item -> {
                System.out.println(item + " " + Thread.currentThread().getName());
                try {
                    Files.lines(item).parallel().forEach(val -> {
                        System.out.println("value -> " + val);
                    });
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });

        } catch (IOException e) {
            throw new RuntimeException(e);
        }

        long stopTime = System.currentTimeMillis();
        long elapsedTime = stopTime - startTime;
        System.out.println("millisecond " + elapsedTime);
    }
}

Result:

C:\demo\lines1.txt ForkJoinPool.commonPool-worker-1
C:\demo\lines3.txt ForkJoinPool.commonPool-worker-2
C:\demo\lines2.txt main
value -> third file2
value -> first file1
value -> second file2
value -> third file1
value -> first file2
value -> second file1
millisecond 35
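
One caveat worth noting: Files.walk and Files.lines return streams backed by open file handles, and the demos here never close them. A minimal sketch of the read step with try-with-resources, using the same hypothetical C:\demo files:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

public class Demo2Closing {
    public static void main(String[] args) throws IOException {
        // try-with-resources closes the underlying file handle when done.
        try (Stream<String> lines = Files.lines(Paths.get("C:\\demo\\lines1.txt"))) {
            lines.forEach(val -> System.out.println("value -> " + val));
        }
    }
}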



Read files Sequentially.

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Collectors;

public class Demo3 {

    public static void main(String[] args) {

        long startTime = System.currentTimeMillis();

        try {
            String directory = "C:\\demo";
            var files = Files.walk(Paths.get(directory))
                    .map(Path::normalize)
                    .filter(Files::isRegularFile)
                    .filter(path -> path.getFileName().toString().endsWith(".txt"))
                    .collect(Collectors.toList());

            files.stream().forEach(item -> {
                System.out.println(item + " " + Thread.currentThread().getName());
                try {
                    Files.lines(item).forEach(val -> {
                        System.out.println("value -> " + val);
                    });
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });

        } catch (Exception e) {
            throw new RuntimeException(e);
        }

        long stopTime = System.currentTimeMillis();
        long elapsedTime = stopTime - startTime;
        System.out.println("millisecond " + elapsedTime);
    }
}

Result:

C:\demo\lines1.txt main
value -> first file1
value -> first file2
C:\demo\lines2.txt main
value -> second file1
value -> second file2
C:\demo\lines3.txt main
value -> third file1
value -> third file2
millisecond 38



Read files Per Chunk.

This example uses an AtomicInteger to count the lines in the file: a local int cannot be modified inside the lambda, and AtomicInteger also stays correct if the stream is used by threads concurrently.

The file contains 10 lines, and the example reads it in chunks of 3 lines.


import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class Demo1 {

    public static void main(String[] args) {

        try {
            final AtomicInteger c = new AtomicInteger();
            final AtomicInteger count = new AtomicInteger(1);

            String fileName = "C:\\demo\\lines1.txt";
            Files.lines(Paths.get(fileName))
                    // Group consecutive lines into chunks of 3:
                    // lines 0,1,2 -> key 0; lines 3,4,5 -> key 1; and so on.
                    .collect(Collectors.groupingBy(e -> c.getAndIncrement() / 3))
                    .forEach((fi, item) -> {
                        System.out.println("Chunk " + fi + " List " + item);
                        for (String line : item) {
                            System.out.println(" Number line -> " + count.getAndIncrement() + " -> " + line);
                        }
                    });

        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

Result:

Chunk 0 List [first file1, first file2, first file3]
 Number line -> 1 -> first file1
 Number line -> 2 -> first file2
 Number line -> 3 -> first file3
Chunk 1 List [first file4, first file5, first file6]
 Number line -> 4 -> first file4
 Number line -> 5 -> first file5
 Number line -> 6 -> first file6
Chunk 2 List [first file7, first file8, first file9]
 Number line -> 7 -> first file7
 Number line -> 8 -> first file8
 Number line -> 9 -> first file9
Chunk 3 List [first file10]
 Number line -> 10 -> first file10
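
If the chunking needs to be reusable, a common alternative (a sketch along the lines of the Stack Overflow thread in the references; the chunks helper is hypothetical) is to partition the collected list by index:

import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

public class ChunksSketch {

    // Hypothetical helper: split a list into consecutive sublists of the given size.
    static <T> List<List<T>> chunks(List<T> list, int size) {
        return IntStream.range(0, (list.size() + size - 1) / size)
                .mapToObj(i -> list.subList(i * size, Math.min((i + 1) * size, list.size())))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> lines = List.of("a", "b", "c", "d", "e", "f", "g");
        chunks(lines, 3).forEach(chunk -> System.out.println("Chunk " + chunk));
        // Chunk [a, b, c]
        // Chunk [d, e, f]
        // Chunk [g]
    }
}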

References:
https://docs.oracle.com/javase/tutorial/collections/streams/parallelism.html
https://www.baeldung.com/java-when-to-use-parallel-stream
https://mkyong.com/java8/java-8-parallel-streams-examples/
https://stackoverflow.com/questions/27583623/is-there-an-elegant-way-to-process-a-stream-in-chunks
https://howtodoinjava.com/java/multi-threading/atomicinteger-example/
https://www.geeksforgeeks.org/path-normalize-method-in-java-with-examples/
https://www.logicbig.com/how-to/code-snippets/jcode-java-io-files-isregularfile.html