Saturday, 15 August 2015

Java8: streams: parallel processing

Streams api allows you to process elements in parallel. You can convert a collection into parallel stream by calling parallelStream method on this collection. Parallel stream splits collection data into chunks and process each chunk with separate thread. This parallel processing enables you to use multi processor architecture of your system effectively.

Can I convert parallel stream to sequential stream
Yes, you can convert parallel stream to sequential stream by calling sequential method on it.

Can I convert sequential stream to parallel stream
Yes, you can convert sequential stream to parallel stream by calling parallel method on it.

Can I combine parallel and sequential operations
Yes, to solve given problem, you can combine both parallel and sequential streams like below.

For example,
employees.parallelStream()
.filter((emp) -> emp.getSalary() > 50000)
.sequential()
.map(Employee::getFirstName)
.parallel()
.collect(Collectors.toList());

How many threads used by parallel stream?
As i said Parallel stream divides data into chunck, process each chunck of data with separate thread. But how many threads a parallel stream creates. Internally parallel stream uses ‘java.util.concurrent.ForkJoinPool’ to create threads, by default it has as many threads as processors in your system. If your system has 3 processors, by default parallel stream work with 3 threads.

Can I control number of threads used by parallel stream?
Yes, you can control number of threads used by parallel stream by setting System property ‘java.util.concurrent.ForkJoinPool.common.parallelism’.

Example
System.setProperty("java.util.concurrent.ForkJoinPool.common.parallelism", "9");

Note:
Above setting is global across your Java application. It is not limited to particular stream.


Lets write simple application to compute sum of numbers form 1 to 999999999 using iterative, sequential stream and parallel stream approach.
import java.util.stream.LongStream;

public class PerformanceCheck {

 public static long parallelSum(long n) {
  return LongStream.range(1, n).parallel().reduce(0l, Long::sum);
 }

 public static long sequentialSum(long n) {
  return LongStream.range(1, n).reduce(0l, Long::sum);
 }

 public static long iterationSum(long n) {
  long sum = 0;
  for (long i = 1; i < n; i++)
   sum += i;
  return sum;
 }

 public static void main(String args[]) {
  long upTo = 999999999l;

  long time1 = System.nanoTime();

  iterationSum(upTo);
  long time2 = System.nanoTime();

  sequentialSum(upTo);
  long time3 = System.nanoTime();

  parallelSum(upTo);
  long time4 = System.nanoTime();

  String temp = "Time taken by ";
  System.out.println(temp + "iteration approach is " + (time2 - time1)
    / Math.pow(10, 9) + " seconds");
  System.out.println(temp + "sequential approach is " + (time3 - time2)
    / Math.pow(10, 9) + " seconds");
  System.out.println(temp + "parallel approach is " + (time4 - time3)
    / Math.pow(10, 9) + " seconds");

 }
}


Output
Time taken by iteration approach is 0.300407731 seconds
Time taken by sequential approach is 0.537593567 seconds
Time taken by parallel approach is 0.14731205 seconds


Prevoius                                                 Next                                                 Home

No comments:

Post a Comment