- The Concurrency API has the
ExecutorServiceclass which creates and manages threads for you. - You obtain an instance of this class, and send tasks to be processed.
- This class has built in polling and scheduling.
- We have an
Executorfactory class to get an instance of ExecutorService. - Here is an example of using
Executor.newSingleThreadExecutor:
import java.util.concurrent.*;
public class ZooInfo {
public static void main(String[] args) {
ExecutorService service = null;
try {
service = Executors.newSingleThreadExecutor();
System.out.println("begin");
service.execute(
() -> System.out.println("Printing zoo inventory"));
service.execute(() -> {
for(int i=0;i<3;i++)
System.out.println("Printing record: "+i);
}
);
service.execute(() -> System.out.println("Printing zoo inventory"));
System.out.println("end");
} finally {
if(service!=null) service.shutdown();
}
}
}- This example uses exactly ONE thread, and all the tasks are completely asynchrously. Here is an example output:
begin
Printing zoo inventory
end
Printing record: 0
Printing record: 1
Printing record: 2
Printing zoo inventory
- Notice how the end text is displayed but the executor service - this is because the
main()thread is INDEPENDENT of the thread from ExecutorService.
- You need to call the
shutdown()method when you are finished with a thread executor. If you do not the application will never terminate. - The
ExecutorServicehas the following shutdown process:
- Reject any new tasks while continuing with previously submitted tasks.
- If a new task is submitted to the thread executor, a
RejectedExecutionExceptionis thrown isShutdown() = trueisTerminatedFalse() = false
- If a new task is submitted to the thread executor, a
- When all active tasks are completed, the lifecycle is complete
isShutDown() = trueisTerminated() = true
- We can execute tasks to be done asynchrously using the
execute()methood which takes aRunnablelambda or implementation. The return type isvoid- so we do not know the result of the task - We can submit tasks to be done asynchrously and obtain a
Future<T>object usingsubmit() - We can send off a set of taks using
invokeAll()andinvokeAny()invokeAll()will synchrously return the results of all tasks as a Collection of Future objects.invokeAny()will synchrously return the result of any one of the finished tasks, cancelling any unfinished tasks.
execute()does not supportCallableexpressions.
-
The
invokeAny()andinvokeAll()methods are SYNCHRONOUS - they will wait for results to be available before returning control to the enclosing program. -
invokeAll()executes all tasks in the collection and returns aListof orderedFutureobjects. This method will wait indefinitely till all tasks are completed. -
invokeAny()executes a collection of tasks and returns the result of one of the tasks that successfully completed execution. This method will only wait till one task is complete. -
Here is an example of using
invokeAll():
class CallableClass implements Callable<String> {
private int i;
public CallableClass(int i) {
this.i = i;
}
public String call() throws Exception {
return ""+i;
}
}
// MAIN METHOD:
public static void main(String[] args) {
ExecutorService service = null;
try {
service = Executors.newFixedThreadPool(2);
List<Callable<String>> list =
List.of(new CallableClass(1),
new CallableClass(2),
new CallableClass(3),
new CallableClass(4)
);
List<Future<String>> futureList = service.invokeAll(list);
for (Future<String> future: futureList)
System.out.println(future.get());
} finally {
service.shutdown();
}
}- The above code prints:
1
2
3
4
- The
submit()method returns aFuture<V>object:
Future<?> future = service.submit(() -> System.out.println("Hello Zoo"));- This
futureobject has the following methods:
boolean isDone()- true if the task was complete, threw an exception or cancelledboolean isCancelled()- true if cancelled before completed normallyboolean cancel()- attempts to cancel execution of the taskV get()- obtains result, will wait endlessly if not availableV get(long timeout, TimeUnit unit)- obtains the result waiting the specified time. If unavailable thenTimeoutExceptionis thrown.
- We previously wrote a
CheckResultsclass using thread polling:
public class CheckResults {
private static int counter = 0;
public static void main(String[] args) throws InterruptedException {
new Thread(() -> {
for(int i=0;i<500;i++) CheckResults.counter++;
}).start();
while (CheckResults.counter<100) {
System.out.println("Not reached yet");
Thread.sleep(1000); // 1 SECOND
}
System.out.println("Reached");
}
}- Here is the same class which uses a Future object:
public class CheckResults {
private static int counter = 0;
public static void main(String[] args) throw InterruptedException, ExecutionException {
ExecutorService service = null;
try {
service = Executors.newSingleThreadExecutor();
Future<?> future = service.submit(() -> {
for(int i=0;i<500;i++) CheckResults.counter++;
});
future.get(10, TimeUnit.SECONDS);
System.out.println("Reached");
} catch (TimeoutException e) {
System.out.println("Not reached in time");
} finally {
if(service!=null) service.shutdown();
}
}
}- This implementation does not use the Thread class directly - which is the exact purpose of the Concurrency API.
- Java 5 introduced the
Callableinterface. It has acall()which returns a value:
@FunctionalInterface public interface Callable<V> {
V call() throws Exception;
}- In comparison, the Runnable interface has a
run()method which returns void and throws no checked exceptions. - The
ExecutorServicehas an overload forsubmit()which takes aCallableand returnsFuture<T> - Here is an example of using
Callable:
public class AddData {
public static void main(String[] args) throws InterruptedException,
ExecutionException {
ExecutorService service = null;
try {
service = Executors.newSingleThreadExecutor();
Future<Integer> result =
service.submit(() -> 30+11);
System.out.println(result.get());
} finally {
if(service!=null) service.shutdown();
}
}
}- The
Callableinterface allows you to throw a checked exception. Meaning if you are supplied a lambda to theExecutors.submit()and it DOES return something, you can write statements which have checked exceptions - Conversely, the
Runnableinterface does not allow for exceptions. Meaning if you use a lambda expression which does not return anything, you can not write statements which throw checked exceptions
service.submit(() -> {Thread.sleep(1000); return null;}); // compiles fine
service.submit(() -> {Thread.sleep(1000);}); // COMPILER ERROR- We can use the
Future.get()method to wait for the results to finish - If we do not ned the results of tasks and finished with our thread executor we can use the
awaitTermination(long timeout, TimeUnit unit)method which waits for a specified time for all tasks to finish. - E.g.:
ExecutorService service = null;
try {
service = Executors.newSingleThreadExecutor();
// tasks...
} finally {
if(service!=null) service.shutdown();
}
if (service!=null) {
service.awaitTermination(1, TimeUnit.MINUTES);
// check if all tasks are finished:
if(service.isTerminated())
System.out.println("All tasks finished")l
else
System.out.println("At least one task is still running");
}- The
ScheduledExecutorServiceis a subinterface ofExecutorServicewhich lets you schedule a task which needs to be done repeatedly for some fixed interval - We obtain an instance using the
Executorsfactory class:
ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();- We have the following methods for
ScheduledExecutorService:
schedule(Callable<V> callable, long delay, TimeUnit unit)- executes the callable after the given delay - returns aScheduledFuture<V>schedule(Runnable runnable, long delay, TimeUnit unit)- executes the runnable after the given delay - returns aScheduledFuture<V>scheduleAtFixedRate(Runnable runnable, long initialDelay, long period, TimeUnit unit)- executes the runnable after the initial delay, and creates and executes the runnable every period valuescheduleAtFixedDelay(Runnable runnable, long initialDelay, long delay, TimeUnit unit)- executes the runnable after the initial delay, and commences the next runnable after the termination + delay value
- A thread pool is a group of pre-instantiated threads which can be reused to perform some tasks
- We can create a thread pool using the following methods from Executors:
newCachedThreadPool()- Created a thrad pool which creates threads when needed but will also reuse old threads which are availablenewFixedThreadPool(n threads)- Creates a thread pool, and only reuses a fixed amount of threadsnewFixedScheduledThreadPool(n threads)- creates a thread pool whicvh can schedule commands to run after a given delay or period