CompletionService in Java Concurrency

A Future interface provides methods to check if the computation is complete, to wait for its completion, and to retrieve the result of the computation. The result is retrieved using method get when the computation has completed, and it blocks until it is ready. If a task completes  by  throwing  an  exception,  corresponding Future.get  rethrows  it  wrapped  in  an  ExecutionException;  and if  it  was  cancelled,  get throws CancellationException.  The submit method in the ExecutorService return a Future when you submit a Runnable or a Callable. With an ExecutorService, you have to keep track of all Futures returned by the submit method to call the get method on it.

A CompletionService decouples the production of new asynchronous tasks from the consumption of the results of completed tasks. You can submit Callable tasks to the CompletionService for execution and use the queue like methods take and poll to retrieve completed results as Futures, whenever they become available. ExecutorCompletionService is an implementation of CompletionService that delegate the computation to an Executor. When a task is submitted to a ExecutorCompletionService, it is wrapped with a QueueingFuture, a subclass of FutureTask.  QueueingFuture overrides the done method of FutureTask to place result on the BlockingQueue associated with the CompletionService.

Let us understand it better with an example. We will submit few Callable tasks to a CompletionService and then wait for each result (as Future) using completionService.take();. Whenever we get a Future, we will call Future.get and write the result to the console. You should be familiar with Executor, ExecutorService, Future and FutureTask before learning this example to understand it better.

 

Example – CompletionServiceDemo.java

import java.util.Date;

import java.util.concurrent.Callable;

import java.util.concurrent.CompletionService;

import java.util.concurrent.ExecutionException;

import java.util.concurrent.Executor;

import java.util.concurrent.ExecutorCompletionService;

import java.util.concurrent.Executors;

import java.util.concurrent.Future;

 

public class CompletionServiceDemo {

 

  public void compServDemo(final Executor exec) {

 

    CompletionService<String> completionService = new ExecutorCompletionService<>(

        exec);

 

    for (int i = 0; i < 4; i++) {

      // Callable task that will be submitted to

      // CompletionService.

      Callable<String> task = new Callable<String>() {

        @Override

        public String call() {

          try {

            System.out.println("Started " + Thread.currentThread() + " at "

                + new Date());

            Thread.sleep(5000);

            System.out.println("Exiting " + Thread.currentThread() + " at "

                + new Date());

          } catch (InterruptedException e) {

            e.printStackTrace();

          }

          return Thread.currentThread() + " success!!!";

        }

      };

      // Submitting the task to the CompletionService

      completionService.submit(task);

    }

 

    // Waiting for the results and printing them

    for (int i = 0; i < 4; i++) {

      Future<String> f;

      try {

        f = completionService.take();

        System.out.println("RESULT=" + f.get() + " at " + new Date());

      } catch (InterruptedException | ExecutionException e) {

        e.printStackTrace();

      }

    }

 

  }

 

  public static void main(String[] args) {

    CompletionServiceDemo cSD = new CompletionServiceDemo();

    cSD.compServDemo(Executors.newFixedThreadPool(2));

  }

}

 

OUTPUT

Started Thread[pool-1-thread-2,5,main] at Wed Jul 10 10:57:16 IST 2013

Started Thread[pool-1-thread-1,5,main] at Wed Jul 10 10:57:16 IST 2013

Exiting Thread[pool-1-thread-1,5,main] at Wed Jul 10 10:57:20 IST 2013

Exiting Thread[pool-1-thread-2,5,main] at Wed Jul 10 10:57:20 IST 2013

Started Thread[pool-1-thread-2,5,main] at Wed Jul 10 10:57:20 IST 2013

Started Thread[pool-1-thread-1,5,main] at Wed Jul 10 10:57:20 IST 2013

RESULT=Thread[pool-1-thread-1,5,main]success!!! at Wed Jul 10 10:57:20 IST 2013

RESULT=Thread[pool-1-thread-2,5,main]success!!! at Wed Jul 10 10:57:20 IST 2013

Exiting Thread[pool-1-thread-1,5,main] at Wed Jul 10 10:57:23 IST 2013

Exiting Thread[pool-1-thread-2,5,main] at Wed Jul 10 10:57:23 IST 2013

RESULT=Thread[pool-1-thread-1,5,main] success!!! at Wed Jul 10 10:57:23 IST 2013

RESULT=Thread[pool-1-thread-2,5,main] success!!! at Wed Jul 10 10:57:23 IST 2013

Note

You can also use ExecutorService instead of Executor in the above example if you need access to lifecycle methods:

public void compServDemo(final ExecutorService exec) { 

 

Search the Web

Custom Search

Searches whole web. Use the search in the right sidebar to search only within javajee.com!!!