Introduced as an incubation feature in JEP-428 (Java 19) and as a preview feature in JEP-453 (Java 21), structured concurrency aims to simplify concurrent Java programs by treating multiple threads/sub-tasks forked from the same parent thread as a single unit of work.
Managing all such child threads as a unit makes canceling and error handling more reliable, eliminating common risks such as thread leaks and cancellation delays.
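Note that structured concurrency runs its subtasks in lightweight virtual threads, which are also part of Project Loom. Because StructuredTaskScope is a preview API in Java 21, code that uses it must be compiled and run with preview features enabled (--enable-preview).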
1. Issues with Traditional Concurrency
1.1. Thread Leaks
In traditional multi-threaded programming (unstructured concurrency), if an application has to perform a complex task, it breaks the task into multiple smaller, independent subtasks. The application then submits these subtasks to an ExecutorService (often backed by a ThreadPoolExecutor), which runs them.
In such a programming model, all the child tasks run concurrently, so each can succeed or fail independently. The API offers no support for canceling the related subtasks if one of them fails. The parent task has no control over its subtasks and must wait for them to finish before returning its result. This waiting wastes resources and decreases the application's performance.
For example, suppose a task has to fetch the details of an account from multiple sources, such as the account details, the linked accounts, and the user's demographic data. The pseudocode for processing such a request concurrently looks like this:
// es is an ExecutorService shared by the application (e.g., a thread pool)
Response fetch(Long id) throws ExecutionException, InterruptedException {
Future<AccountDetails> accountDetailsFuture = es.submit(() -> getAccountDetails(id)); //Sub-task 1
Future<LinkedAccounts> linkedAccountsFuture = es.submit(() -> fetchLinkedAccounts(id)); //Sub-task 2
Future<DemographicData> userDetailsFuture = es.submit(() -> fetchUserDetails(id)); //Sub-task 3
AccountDetails accountDetails = accountDetailsFuture.get(); //Result of Sub-task 1
LinkedAccounts linkedAccounts = linkedAccountsFuture.get(); //Result of Sub-task 2
DemographicData userDetails = userDetailsFuture.get(); //Result of Sub-task 3
return new Response(accountDetails, linkedAccounts, userDetails); //Combine results and return the response
}
In the above example, all three threads execute independently.
- Suppose fetching the linked accounts fails. fetch() fails too, but only after accountDetailsFuture.get() has returned, because the futures are awaited in order. Meanwhile, the subtask fetching the user details keeps running in the background even though its result will never be used. This is a case of a thread leak.
- Similarly, if the user cancels the request from the front end and fetch() is interrupted, all three subtasks keep running in the background.
Though canceling the subtasks is programmatically possible, there is no straightforward way to do it, and the code is error-prone.
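To see how much plumbing "programmatically possible" involves, here is a minimal sketch that cancels the sibling subtasks by hand, using only java.util.concurrent (no structured concurrency). It assumes Java 21+ for Executors.newVirtualThreadPerTaskExecutor(); the class ManualCancellation, the runAll() helper, and the sleeping subTask() stand-in are hypothetical. An ExecutorCompletionService is used so that the first failure is noticed no matter which subtask finishes first, and the remaining futures are then canceled with interruption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class ManualCancellation {

    // Hypothetical subtask: sleeps for the given time, then returns its name or fails.
    static String subTask(String name, long millis, boolean fail) throws InterruptedException {
        Thread.sleep(millis);
        if (fail) {
            throw new IllegalStateException(name + " failed");
        }
        return name;
    }

    // Runs all tasks concurrently; on the first failure (or if the caller is
    // interrupted), cancels the remaining tasks and rethrows.
    static List<String> runAll(List<Callable<String>> tasks) throws Exception {
        try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {
            CompletionService<String> cs = new ExecutorCompletionService<>(es);
            List<Future<String>> futures = new ArrayList<>();
            for (Callable<String> task : tasks) {
                futures.add(cs.submit(task));
            }
            try {
                // take() hands back futures in completion order, so a fast failure
                // is seen even while an earlier-submitted task is still running.
                for (int i = 0; i < futures.size(); i++) {
                    cs.take().get();
                }
            } catch (ExecutionException | InterruptedException e) {
                futures.forEach(f -> f.cancel(true)); // interrupt the siblings still running
                throw e;
            }
            List<String> results = new ArrayList<>();
            for (Future<String> f : futures) {
                results.add(f.resultNow()); // every task has completed successfully here
            }
            return results;
        } // close() waits until every task, including canceled ones, has terminated
    }

    public static void main(String[] args) throws Exception {
        long start = System.nanoTime();
        try {
            runAll(List.of(
                    () -> subTask("accountDetails", 5_000, false),
                    () -> subTask("linkedAccounts", 100, true),
                    () -> subTask("userDetails", 5_000, false)));
        } catch (ExecutionException e) {
            System.out.println("Failed: " + e.getCause().getMessage());
        }
        long elapsedMillis = (System.nanoTime() - start) / 1_000_000;
        // Far less than 5,000 ms, because the two slow subtasks were canceled.
        System.out.println("Returned after " + elapsedMillis + " ms");
    }
}
```

Even this small helper has to manage a completion queue, a cancellation loop, and the executor's lifetime, which is the kind of bookkeeping that structured concurrency moves into the API.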
1.2. Unrelated Thread Dumps and Diagnosis
In the previous example, if fetch() fails, it is hard to analyze the thread dumps because the work runs in three different threads. Correlating the information in these threads is very difficult because nothing relates them to each other at the API level.
When the call stack defines the task-subtask hierarchy, as with sequential method calls, we get the parent-child relationship for free: an exception thrown by a callee propagates to its caller, and a stack trace shows who called whom.
Ideally, the task relationship should be reflected at the API level, so that we can control the execution of child threads and debug them when necessary. A child could then report a result or exception only to its parent (the unique task that owns all the subtasks), which could then implicitly cancel the remaining subtasks.
2. Introduction to Structured Concurrency
In structured multi-threaded code, if a task splits into concurrent subtasks, they all return to the same place, i.e., the task's code block. This way, the lifetime of a concurrent subtask is confined to that syntactic block.
In this approach, subtasks work on behalf of a task that awaits their results and monitors them for failures. At run time, structured concurrency builds a tree-shaped hierarchy of tasks, with sibling subtasks being owned by the same parent task.
This call tree can be viewed as the concurrent counterpart to the call stack of a single thread with multiple method calls.
3. Implementing Structured Concurrency with StructuredTaskScope
StructuredTaskScope is the core API for structured concurrency. It splits a task into several concurrent subtasks, each executed in its own virtual thread, manages their lifecycles, and ensures proper nesting of concurrent tasks.
StructuredTaskScope requires the subtasks to complete before the main task continues, so the lifetime of a concurrent operation is confined to a syntactic block.
StructuredTaskScope provides two static nested classes, both of which extend StructuredTaskScope. Based on our requirements, we instantiate one of these classes to start using structured concurrency:
- ShutdownOnFailure captures the first exception and shuts down the task scope. It is suited to collecting the results of all subtasks: if any subtask fails, the results of the other unfinished subtasks are no longer needed.
- ShutdownOnSuccess captures the first result and shuts down the task scope, interrupting the unfinished threads and waking up the main thread (the parent task). It is suited to collecting the result of any one subtask, whichever completes first.
4. ShutdownOnFailure – Wait for Completion of all Sub-tasks
Let us rewrite the previous example with the StructuredTaskScope API.
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Supplier<AccountDetails> accountDetailsFuture = scope.fork(() -> getAccountDetails(id));
Supplier<LinkedAccounts> linkedAccountsFuture = scope.fork(() -> fetchLinkedAccounts(id));
Supplier<DemographicData> userDetailsFuture = scope.fork(() -> fetchUserDetails(id));
scope.join(); // Join all subtasks
scope.throwIfFailed(e -> new WebApplicationException(e)); //Handle error when any subtask fails
//The subtasks have completed by now so process the result
return new Response(accountDetailsFuture.get(),
linkedAccountsFuture.get(),
userDetailsFuture.get());
}
- In the above example, we create the task scope by instantiating StructuredTaskScope.ShutdownOnFailure. With this policy, the scope shuts down as soon as any subtask fails, interrupting the threads of the unfinished subtasks. If the parent thread is interrupted while waiting in join(), the InterruptedException leaves the try block, and close() shuts down the scope as well.
- Also, notice that we submitted the subtasks with scope.fork() instead of an ExecutorService. In Java 21, fork() returns a StructuredTaskScope.Subtask, which is a Supplier, instead of a Future. Each fork() call starts a new virtual thread that executes the submitted subtask.
- Next, scope.join() waits until all subtasks in the scope have completed or the scope has been shut down.
- Next, scope.throwIfFailed() throws the exception produced by the given function (here, a WebApplicationException wrapping the cause) if any subtask failed.
- Finally, scope.close() closes the task scope.
The StructuredTaskScope class implements the AutoCloseable interface, so if we use a try-with-resources block, close() is invoked automatically when the block exits, whether normally or with an exception.
This solution resolves the problems of unstructured concurrency described in section 1.
5. ShutdownOnSuccess – Using Result of the First Completed Task
The similar StructuredTaskScope.ShutdownOnSuccess class captures the result of the first subtask to complete successfully, and then cancels all other subtasks.
It helps in situations where the work can be done in multiple ways and we need the fastest result, canceling the other attempts to avoid wasting resources.
Here is a StructuredTaskScope with a shutdown-on-success policy that returns the result of the first successful subtask:
<T> T runSubTasks(List<Callable<T>> tasks) throws Exception {
try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
for (var task : tasks) {
scope.fork(task);
}
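scope.join(); // Wait until a subtask succeeds or all of them finish; result() requires a prior join()
return scope.result(); // Throws ExecutionException if every subtask failed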
}
}
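For comparison, the long-standing ExecutorService.invokeAny() method offers a similar "first success wins" policy through a stable (non-preview) API: it returns the result of a task that completed successfully and cancels the tasks that have not completed, although it does not give the ownership guarantees of a task scope. The sketch below assumes Java 21+ (virtual threads and AutoCloseable executors); the FirstSuccess class, the sleeping query() stand-in, and failingQuery() are hypothetical.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class FirstSuccess {

    // Hypothetical stand-in for a remote call: waits, then answers with its name.
    static String query(String replica, long millis) throws InterruptedException {
        Thread.sleep(millis);
        return replica;
    }

    // Hypothetical stand-in for a call that fails quickly.
    static String failingQuery() {
        throw new IllegalStateException("replica unavailable");
    }

    // invokeAny() returns the result of a task that completed successfully,
    // ignoring failed tasks, and cancels the tasks that are still running.
    static String fastest(List<Callable<String>> tasks) throws Exception {
        try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {
            return es.invokeAny(tasks);
        } // close() waits until the canceled tasks have terminated
    }

    public static void main(String[] args) throws Exception {
        List<Callable<String>> tasks = List.of(
                () -> query("replica-A", 2_000),
                FirstSuccess::failingQuery,
                () -> query("replica-B", 100),
                () -> query("replica-C", 1_000));
        System.out.println(fastest(tasks)); // prints replica-B
    }
}
```

The failing task does not abort the call, because invokeAny() only throws if no task succeeds; this matches the shutdown-on-success idea of caring about the first good answer.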
6. Executing Sub-Tasks within Specified Time
In structured concurrency, we can specify a deadline within which all the tasks should be completed using the scope.joinUntil() method.
When given a deadline as an Instant, the joinUntil() method waits until:
- All threads started in the task scope have finished execution
- The scope.shutdown() method is invoked
- The current thread is interrupted
- Or, the deadline is reached
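In the following example, both subtasks must finish within 5 seconds. If the deadline is reached first, joinUntil() throws a TimeoutException; when the try block exits, close() ensures the scope is shut down, interrupting any subtask that is still running.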
try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
Supplier<String> subTask1 = scope.fork(() -> subTask(...));
Supplier<String> subTask2 = scope.fork(() -> subTask(...));
scope.joinUntil(Instant.now().plusSeconds(5));
//process the results
}
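Outside the preview API, a comparable deadline can be sketched with ExecutorService.invokeAll(tasks, timeout, unit), which returns when all tasks have completed or the timeout expires and cancels the tasks that have not completed. Unlike joinUntil(), it does not throw on timeout, so each Future is inspected instead. The sketch assumes Java 21+; DeadlineDemo, runWithDeadline(), and the sleeping subTask() stand-in are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class DeadlineDemo {

    // Hypothetical subtask: sleeps for the given time, then returns its name.
    static String subTask(String name, long millis) throws InterruptedException {
        Thread.sleep(millis);
        return name;
    }

    // Runs all tasks with a shared timeout and reports each outcome in submission order.
    static List<String> runWithDeadline(List<Callable<String>> tasks, long timeoutMillis)
            throws InterruptedException {
        try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {
            // Tasks that have not completed when the timeout expires are canceled.
            List<Future<String>> futures = es.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            List<String> outcomes = new ArrayList<>();
            for (Future<String> f : futures) {
                outcomes.add(switch (f.state()) {
                    case SUCCESS -> f.resultNow();
                    case FAILED -> "failed: " + f.exceptionNow();
                    case CANCELLED -> "cancelled";
                    case RUNNING -> "running"; // not expected after invokeAll() returns
                });
            }
            return outcomes;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        List<Callable<String>> tasks = List.of(
                () -> subTask("fast", 100),
                () -> subTask("slow", 5_000));
        System.out.println(runWithDeadline(tasks, 1_000)); // prints [fast, cancelled]
    }
}
```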
7. Structured Concurrency and Virtual Threads
In structured concurrency, each call to scope.fork() starts a new thread to execute a subtask, which by default is a virtual thread.
Virtual threads are lightweight threads managed by the JVM, designed for writing high-throughput concurrent applications. As virtual threads are inexpensive compared to traditional platform (OS) threads, structured concurrency uses them for all the threads it forks.
In addition to being plentiful, virtual threads are cheap enough to represent any concurrent unit of behavior, even behavior that involves I/O. Behind the scenes, the task-subtask relationship is maintained by associating each virtual thread with its unique owner, so it knows its hierarchy, similar to how a frame in the call stack knows its unique caller.
Note that all of the subtasks’ threads are guaranteed to have terminated once the scope is closed, and no thread is left behind when the block exits.
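A quick way to see how cheap virtual threads are, without the preview API, is to start thousands of them with Executors.newVirtualThreadPerTaskExecutor() (final in Java 21), which, like fork(), creates a new virtual thread per task. The names VirtualThreadsDemo and runOnVirtualThreads() are hypothetical; each task checks Thread.isVirtual() and then blocks briefly.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

public class VirtualThreadsDemo {

    // Starts `count` tasks, each on its own virtual thread, and counts how many
    // of them observed that they were running on a virtual thread.
    static int runOnVirtualThreads(int count) {
        AtomicInteger virtualCount = new AtomicInteger();
        try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < count; i++) {
                es.submit(() -> {
                    if (Thread.currentThread().isVirtual()) {
                        virtualCount.incrementAndGet();
                    }
                    Thread.sleep(100); // sleeping parks the virtual thread instead of holding an OS thread
                    return null;
                });
            }
        } // close() waits for all submitted tasks to finish
        return virtualCount.get();
    }

    public static void main(String[] args) {
        System.out.println(runOnVirtualThreads(10_000)); // prints 10000
    }
}
```

Because the executor is closed by the try-with-resources block, all tasks have finished by the time the count is read, which mirrors the guarantee that a closed scope leaves no thread behind.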
8. Conclusion
When combined with virtual threads, structured concurrency brings long-awaited and much-needed features to Java that are already present in other programming languages (e.g., goroutines in Go and processes in Erlang). It will help in writing complex concurrent applications with excellent reliability and fewer thread leaks.
Such applications will be easier to debug and profile when errors occur.
Happy Learning !!