Java Structured Concurrency and StructuredTaskScope

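Introduced as an incubating API in JEP 428 (Java 19) and as a preview API in JEP 453 (Java 21), structured concurrency aims to simplify concurrent Java programs by treating a group of related subtasks, each running in its own thread, as a single unit of work. These threads are forked from the same parent thread, which is what allows them to be managed as one unit. Because the API is still in preview, it must be enabled with --enable-preview, and its shape has continued to change in later releases; the examples in this article use the Java 21 version of StructuredTaskScope.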

The fundamental principle of structured concurrency is that when a task has to be solved concurrently, all the threads needed to solve it are started and rejoined within the same block of code. In other words, the lifetime of these threads is bound to the block's scope, so each concurrent code block has clear, explicit entry and exit points.

Managing all such child threads as a single unit makes cancellation and error handling more reliable, which eliminates common risks such as thread leaks and cancellation delays.

Note that, by default, structured concurrency runs each subtask in a virtual thread, the lightweight thread implementation that is also part of Project Loom.

1. Issues with Traditional/Unstructured Concurrency

1.1. Thread Leaks

In traditional multi-threaded programming (unstructured concurrency), when an application has to perform a complex task, it breaks the task into multiple smaller, independent subtasks. It then submits these subtasks to an ExecutorService (typically backed by a ThreadPoolExecutor), which runs them.

In this programming model, the subtasks run concurrently, so each can succeed or fail independently. The ExecutorService API has no notion of related subtasks, so it offers no way to cancel the others when one of them fails. The parent task typically blocks on the subtasks one at a time, so a failure may go unnoticed until the subtasks ahead of it have finished, while any subtasks still running keep consuming resources. This waiting wastes resources and hurts the application's performance.

In short, with unstructured concurrency the child threads succeed or fail independently, and there is no built-in way to immediately stop the other children when one of them fails.

For example, suppose a task has to fetch the details of an account, which requires data from multiple sources such as the account details, the linked accounts, and the user's demographic data. Concurrent request processing might then look like this:

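import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Shared thread pool; AccountDetails, LinkedAccounts, DemographicData and Response are the article's domain types
ExecutorService es = Executors.newFixedThreadPool(3);

Response fetch(Long id) throws ExecutionException, InterruptedException {

    Future<AccountDetails>  accountDetailsFuture = es.submit(() -> getAccountDetails(id));   //Sub-task 1
    Future<LinkedAccounts>  linkedAccountsFuture = es.submit(() -> fetchLinkedAccounts(id)); //Sub-task 2
    Future<DemographicData> userDetailsFuture    = es.submit(() -> fetchUserDetails(id));    //Sub-task 3

    AccountDetails accountDetails  = accountDetailsFuture.get();  //Blocks until sub-task 1 completes
    LinkedAccounts linkedAccounts  = linkedAccountsFuture.get();  //Throws ExecutionException if sub-task 2 failed
    DemographicData userDetails    = userDetailsFuture.get();     //Result of sub-task 3

    return new Response(accountDetails, linkedAccounts, userDetails);  //Combine results and return the response
}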

In the above example, all three threads execute independently.

  • If fetching the linked accounts fails, linkedAccountsFuture.get() throws an ExecutionException and fetch() returns with an error. However, any subtask that has not finished yet keeps running in the background. This is a thread leak.
  • Similarly, if the user cancels the request from the front end and the thread running fetch() is interrupted, all three subtasks keep running in the background.

Cancelling the subtasks is possible in code (for example, by calling Future.cancel(true) on each unfinished future), but there is no straightforward, built-in way to do it, and the manual approach is error-prone.
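For comparison, here is a minimal sketch of that manual approach with a plain ExecutorService: the parent keeps every Future and cancels the unfinished ones in a finally block. The class and method names (ManualCancellation, runAll, slowTaskWasInterrupted) and the sleeping task bodies are hypothetical stand-ins for the fetch methods above; the two latches exist only to make the demo deterministic.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ManualCancellation {

  // Waits for each task in submission order; on any failure, cancels whatever is still running
  static <T> List<T> runAll(ExecutorService es, List<Callable<T>> tasks)
      throws ExecutionException, InterruptedException {
    List<Future<T>> futures = new ArrayList<>();
    for (Callable<T> task : tasks) {
      futures.add(es.submit(task));
    }
    try {
      List<T> results = new ArrayList<>();
      for (Future<T> future : futures) {
        results.add(future.get());  // throws ExecutionException if that task failed
      }
      return results;
    } finally {
      // Easy to forget: without this loop, unfinished tasks keep running (a thread leak)
      for (Future<T> future : futures) {
        future.cancel(true);  // no effect on futures that have already completed
      }
    }
  }

  // Returns true if the slow task was interrupted after its sibling failed
  static boolean slowTaskWasInterrupted() throws InterruptedException {
    ExecutorService es = Executors.newFixedThreadPool(2);
    CountDownLatch slowStarted = new CountDownLatch(1);
    CountDownLatch slowInterrupted = new CountDownLatch(1);

    Callable<String> failing = () -> {
      slowStarted.await();  // fail only once the slow task is running
      throw new IllegalStateException("linked accounts unavailable");
    };
    Callable<String> slow = () -> {
      slowStarted.countDown();
      try {
        Thread.sleep(10_000);
        return "user details";
      } catch (InterruptedException e) {
        slowInterrupted.countDown();  // reached because of future.cancel(true)
        throw e;
      }
    };

    try {
      runAll(es, List.of(failing, slow));
      return false;  // not reached: the failing task always throws
    } catch (ExecutionException expected) {
      return slowInterrupted.await(5, TimeUnit.SECONDS);
    } finally {
      es.shutdownNow();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println("slow task cancelled: " + slowTaskWasInterrupted());  // slow task cancelled: true
  }
}
```

Every caller has to remember this try/finally bookkeeping, and forgetting it silently leaks threads; that is exactly the gap structured concurrency is meant to close.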

1.2. Unrelated Thread Dumps and Diagnosis

In the previous example, if there is an error in fetch(), the thread dumps are hard to analyze because the work is spread across three different threads. Correlating the information from these threads is difficult because nothing at the API level relates them to each other or to their parent.

In sequential code, the call stack itself defines the task-subtask hierarchy: each method call is a subtask of its caller, and errors propagate naturally from callee to caller along that parent-child relationship.

Ideally, the task relationship should be reflected at the API level, so we can control the child threads' execution and debug them when necessary. A child could then report its result or exception only to its parent, the unique task that owns all the subtasks, which could in turn implicitly cancel the remaining subtasks.

2. Introduction to Structured Concurrency

In structured multi-threaded code, if a task splits into concurrent subtasks, they all return to the same place, that is, the task's code block. This way, the lifetime of a concurrent subtask is confined to that syntactic block.

In this approach, subtasks work on behalf of a task that awaits their results and monitors them for failures. At run time, structured concurrency builds a tree-shaped hierarchy of tasks, with sibling subtasks being owned by the same parent task.

In structured concurrency, the child threads are, by default, virtual threads managed by the JVM, and StructuredTaskScope gives us a handle to control their life cycle.

This call tree can be viewed as the concurrent counterpart to the call stack of a single thread with multiple method calls.
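To make the tree concrete, here is a small sketch in which a subtask opens its own scope, so its forks become grandchildren of the outer task. It assumes the Java 21 preview API (compile and run with --enable-preview --release 21); the names outer, inner and leaf and their values are hypothetical.

```java
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;

public class NestedScopes {

  // Leaf subtask: pretend to fetch a number
  static int leaf(int value) throws InterruptedException {
    Thread.sleep(50);
    return value;
  }

  // Inner task: runs in a forked thread and owns its own scope, so its leaves are its children
  static int inner(int base) throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
      Subtask<Integer> a = scope.fork(() -> leaf(base));
      Subtask<Integer> b = scope.fork(() -> leaf(base + 1));
      scope.join().throwIfFailed();
      return a.get() + b.get();
    }
  }

  // Outer task: the root of the tree
  static int outer() throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
      Subtask<Integer> left = scope.fork(() -> inner(1));    // leaves 1 and 2
      Subtask<Integer> right = scope.fork(() -> inner(10));  // leaves 10 and 11
      scope.join().throwIfFailed();
      return left.get() + right.get();
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println(outer());  // 24
  }
}
```

Each inner scope is closed before its subtask returns, so no leaf can outlive its parent, and the outer scope cannot close before both inner subtasks have finished.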

3. Implementing Structured Concurrency with StructuredTaskScope

The StructuredTaskScope is the core API for structured concurrency. It splits a task into several concurrent subtasks, executes each in its own virtual thread, manages their life cycles, and ensures proper nesting of concurrent tasks.

The StructuredTaskScope requires the subtasks to complete before the main task continues, which confines the lifetime of a concurrent operation to a syntactic block.

The StructuredTaskScope provides two static nested classes, both of which extend the StructuredTaskScope class. Depending on our requirements, we instantiate one of them to start using structured concurrency:

  • ShutdownOnFailure captures the first exception thrown by a subtask and shuts down the task scope, interrupting the unfinished subtasks. It suits cases where we need the results of all subtasks: if any subtask fails, the results of the others are no longer needed.
  • ShutdownOnSuccess captures the first result and shuts down the task scope to interrupt unfinished threads and wake up the main thread (parent task). It helps in collecting the result of any one task, whichever completes first.

Let’s dig deeper into both policies.

4. ShutdownOnFailure: Wait for Completion of all Sub-tasks

Let us rewrite the previous example with the StructuredTaskScope API. In this example, we create the scope by instantiating StructuredTaskScope.ShutdownOnFailure. With this policy, if any subtask fails, the scope is shut down and the remaining subtasks are interrupted; and if the parent thread is interrupted while waiting, closing the scope cancels the subtasks as well.

  • We submit the subtasks with scope.fork() instead of an ExecutorService. In Java 21, fork() returns a StructuredTaskScope.Subtask (which extends Supplier) instead of a Future. Each fork() call starts a new virtual thread that executes the submitted subtask.
  • Next, scope.join() waits until all subtasks have completed, or until the scope is shut down because one of them failed.
  • Next, scope.throwIfFailed() throws an exception if any subtask failed; the overload used here maps the first failure to an exception of our choice.
  • Finally, scope.close() closes the task scope.

import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import jakarta.ws.rs.WebApplicationException; // JAX-RS; older stacks use javax.ws.rs

public Response getClientById(Long id) {

  System.out.println("Forking new threads...");

  try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

    // Each fork() starts the subtask in a new virtual thread
    Subtask<AccountDetails> accountDetailsTask = scope.fork(() -> getAccountDetails(id));
    Subtask<LinkedAccounts> linkedAccountsTask = scope.fork(() -> fetchLinkedAccounts(id));
    Subtask<DemographicData> userDetailsTask = scope.fork(() -> fetchUserDetails(id));

    System.out.println("Joining all threads...");
    scope.join();  // Wait for all subtasks, or until the first failure shuts the scope down
    scope.throwIfFailed(WebApplicationException::new);  // Wrap the first failure, if any

    System.out.println("Response is received from all workers...");
    // All subtasks succeeded, so get() returns each result immediately
    return new Response(accountDetailsTask.get(),
        linkedAccountsTask.get(),
        userDetailsTask.get());
  } catch (InterruptedException e) {
    Thread.currentThread().interrupt();  // Restore the interrupt status
    throw new RuntimeException(e);
  }
}

private DemographicData fetchUserDetails(Long id) throws InterruptedException {
  Thread.sleep(2000L);
  System.out.println("Retrieved DemographicData.");
  return new DemographicData();
}

private LinkedAccounts fetchLinkedAccounts(Long id) throws InterruptedException {
  Thread.sleep(3000L);
  System.out.println("Retrieved LinkedAccounts.");
  return new LinkedAccounts();
}

private AccountDetails getAccountDetails(Long id) throws InterruptedException {
  Thread.sleep(4000L);
  System.out.println("Retrieved AccountDetails.");
  return new AccountDetails();
}

The program output:

Forking new threads...
Joining all threads...

Retrieved DemographicData.
Retrieved LinkedAccounts.
Retrieved AccountDetails.

Response is received from all workers...

The StructuredTaskScope class implements the AutoCloseable interface, so when we use a try-with-resources block, close() is invoked automatically when the block exits. Closing the scope shuts it down if necessary and waits for all of its threads to finish.

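This solution resolves the problems of unstructured concurrency described in the first section: a failing subtask causes its unfinished siblings to be interrupted, interrupting the parent leads to the subtasks being cancelled when the scope closes, and no subtask outlives the scope. For diagnosis, the JSON thread dump produced by jcmd <pid> Thread.dump_to_file -format=json <file> groups the subtask threads under their scope.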
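The output above only covers the happy path. The following sketch exercises the failure path: one subtask throws after 100 ms, ShutdownOnFailure shuts the scope down and interrupts the slow sibling, and throwIfFailed() reports the failure as an ExecutionException. It assumes the Java 21 preview API with --enable-preview; slowTask and failingTask are hypothetical stand-ins for the fetch methods.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.atomic.AtomicBoolean;

public class ShutdownOnFailureDemo {

  // Set by the slow subtask if the scope shutdown interrupts it
  static final AtomicBoolean slowInterrupted = new AtomicBoolean(false);

  // Hypothetical stand-in for a slow source such as fetchUserDetails()
  static String slowTask() throws InterruptedException {
    try {
      Thread.sleep(10_000);
      return "user details";
    } catch (InterruptedException e) {
      slowInterrupted.set(true);
      throw e;
    }
  }

  // Hypothetical stand-in for a source that fails, such as fetchLinkedAccounts()
  static String failingTask() throws InterruptedException {
    Thread.sleep(100);
    throw new IllegalStateException("linked accounts unavailable");
  }

  // Returns the message of the failure reported by throwIfFailed()
  static String run() throws InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
      scope.fork(ShutdownOnFailureDemo::slowTask);
      scope.fork(ShutdownOnFailureDemo::failingTask);
      scope.join();           // returns shortly after failingTask throws, not after 10 seconds
      scope.throwIfFailed();  // throws ExecutionException wrapping the first failure
      return "no failure";
    } catch (ExecutionException e) {
      // close() has already run here, so the slow subtask was interrupted and has finished
      return e.getCause().getMessage();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    long start = System.nanoTime();
    String message = run();
    long elapsedMs = (System.nanoTime() - start) / 1_000_000;
    System.out.println(message + " after ~" + elapsedMs + " ms, slow subtask interrupted: "
        + slowInterrupted.get());
  }
}
```

The elapsed time printed should be close to the failing subtask's 100 ms rather than the slow subtask's 10 seconds, because the shutdown both wakes up join() and interrupts the sibling.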

5. ShutdownOnSuccess: Use Result of the First Completed Task

Similarly, the StructuredTaskScope.ShutdownOnSuccess policy captures the result of the first subtask to complete successfully and then shuts down the scope, cancelling all other subtasks.

It is useful when the work can be done in multiple ways and we only need the fastest result, cancelling the other attempts to avoid wasting resources.

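Here is a StructuredTaskScope with a shutdown-on-success policy that returns the result of the first successful subtask:

import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.StructuredTaskScope;

<T> T runSubTasks(List<Callable<T>> tasks) throws Exception {
  try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
    for (var task : tasks) {
      scope.fork(task);  // each task runs in its own virtual thread
    }
    scope.join();           // returns as soon as one task succeeds (or all have finished)
    return scope.result();  // first successful result; throws ExecutionException if none succeeded
  }
}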
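One detail worth knowing about this policy: if no subtask succeeds, result() throws an ExecutionException, and the result(Function) overload lets the caller choose the exception instead. A minimal sketch under the Java 21 preview API (--enable-preview); firstSuccess and the sample tasks are hypothetical, and IllegalStateException is just one possible choice.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.StructuredTaskScope;

public class FirstSuccess {

  // Returns the first successful result, or throws IllegalStateException if every task fails
  static <T> T firstSuccess(List<Callable<T>> tasks) throws InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnSuccess<T>()) {
      for (Callable<T> task : tasks) {
        scope.fork(task);
      }
      scope.join();
      // Map "no subtask succeeded" to an unchecked exception of our choice
      return scope.result(cause -> new IllegalStateException("all sources failed", cause));
    }
  }

  public static void main(String[] args) throws InterruptedException {
    Callable<String> broken = () -> { throw new RuntimeException("source down"); };
    Callable<String> slowButOk = () -> { Thread.sleep(200); return "from backup"; };

    System.out.println(firstSuccess(List.of(broken, slowButOk)));  // from backup

    try {
      firstSuccess(List.of(broken, broken));
    } catch (IllegalStateException e) {
      System.out.println(e.getMessage());  // all sources failed
    }
  }
}
```

Note that a failing subtask does not shut down a ShutdownOnSuccess scope; join() keeps waiting until some subtask succeeds or all of them have finished.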

Let us extend our previous example. Suppose we can get the user details from two possible sources: we use the result from whichever source responds first and move on, and the other subtask is cancelled.

The two methods fetchUserDetails and fetchUserDetailsNew represent the two sources. In our case, the second method always returns faster, so although the first method also starts in its own thread, that thread is interrupted as soon as the other subtask completes, and its result is never used.

public DemographicData getClientDetails(Long id) {

  System.out.println("Forking new threads...");

  try (var scope = new StructuredTaskScope.ShutdownOnSuccess<DemographicData>()) {

    scope.fork(() -> fetchUserDetails(id));     // slower source (2 seconds)
    scope.fork(() -> fetchUserDetailsNew(id));  // faster source (1 second)

    scope.join();  // returns once the first subtask succeeds

    System.out.println("Response is received from a worker...");
    return scope.result();  // typed result, no cast needed
  } catch (Exception e) {
    throw new RuntimeException(e);
  }
}

private DemographicData fetchUserDetails(Long id) throws InterruptedException {
  Thread.sleep(2000L);
  System.out.println("Retrieved DemographicData.");
  return new DemographicData();
}

private DemographicData fetchUserDetailsNew(Long id) throws InterruptedException {
  Thread.sleep(1000L);
  System.out.println("Retrieved DemographicData from fetchUserDetailsNew.");
  return new DemographicData();
}

The program output:

Forking new threads...
Retrieved DemographicData from fetchUserDetailsNew.
Response is received from a worker...

6. Executing Sub-Tasks within Specified Time

In structured concurrency, we can use the scope.joinUntil() method to specify a deadline by which all the subtasks should complete.

Given a deadline as an Instant, joinUntil() waits until one of the following happens:

  • All threads started in the task scope have finished execution
  • The scope.shutdown() method is invoked
  • The current thread is interrupted
  • The deadline is reached, in which case joinUntil() throws a TimeoutException

In the following example, both subtasks must be finished within 5 seconds.

try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {

    Supplier<String> subTask1 = scope.fork(() -> subTask(...));
    Supplier<String> subTask2 = scope.fork(() -> subTask(...));

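    scope.joinUntil(Instant.now().plusSeconds(5));  // throws TimeoutException if the deadline passes first
    scope.throwIfFailed();  // rethrow the first subtask failure, if any

    //process the results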
}
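When the deadline passes first, joinUntil() throws a TimeoutException, and closing the scope (here, at the end of the try-with-resources block) shuts it down and interrupts the subtasks that are still running. The sketch below fills in the example with a hypothetical subTask(label, millis) helper and assumes the Java 21 preview API (--enable-preview).

```java
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.TimeoutException;

public class DeadlineDemo {

  // Hypothetical subtask: returns its label after the given delay
  static String subTask(String label, long millis) throws InterruptedException {
    Thread.sleep(millis);
    return label;
  }

  // Returns the combined result, or "timed out" if the deadline passes first
  static String fetchWithin(long slowMillis, long deadlineMillis) throws InterruptedException {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
      Subtask<String> subTask1 = scope.fork(() -> subTask("A", 100));
      Subtask<String> subTask2 = scope.fork(() -> subTask("B", slowMillis));

      scope.joinUntil(Instant.now().plusMillis(deadlineMillis));
      scope.throwIfFailed();  // rethrows the first failure, if any

      return subTask1.get() + subTask2.get();
    } catch (TimeoutException e) {
      // The scope was closed on the way out, so the slow subtask has already been interrupted
      return "timed out";
    } catch (ExecutionException e) {
      return "failed: " + e.getCause();
    }
  }

  public static void main(String[] args) throws InterruptedException {
    System.out.println(fetchWithin(200, 5_000));   // AB
    System.out.println(fetchWithin(10_000, 500));  // timed out
  }
}
```

Handling TimeoutException in the same method that owns the scope keeps the timeout policy next to the code that forked the subtasks.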

7. Structured Concurrency and Virtual Threads

In structured concurrency, each call to scope.fork() starts a new thread to execute a subtask, which by default is a virtual thread.
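A quick way to check this is to ask a subtask's thread whether it is virtual. The sketch also passes a platform-thread factory through the ShutdownOnFailure(String, ThreadFactory) constructor to show that the default can be overridden. It assumes the Java 21 preview API (--enable-preview); the class and method names and the "platform-scope" name are hypothetical.

```java
import java.util.concurrent.StructuredTaskScope;
import java.util.concurrent.StructuredTaskScope.Subtask;
import java.util.concurrent.ThreadFactory;

public class ForkedThreadKind {

  // Forks one subtask that reports whether its own thread is virtual
  static boolean forkedThreadIsVirtual(StructuredTaskScope.ShutdownOnFailure scope) throws Exception {
    Subtask<Boolean> check = scope.fork(() -> Thread.currentThread().isVirtual());
    scope.join().throwIfFailed();
    return check.get();
  }

  // Default scope: subtasks run in virtual threads
  static boolean defaultScope() throws Exception {
    try (var scope = new StructuredTaskScope.ShutdownOnFailure()) {
      return forkedThreadIsVirtual(scope);
    }
  }

  // Scope built with a platform-thread factory instead
  static boolean platformScope() throws Exception {
    ThreadFactory platform = Thread.ofPlatform().factory();
    try (var scope = new StructuredTaskScope.ShutdownOnFailure("platform-scope", platform)) {
      return forkedThreadIsVirtual(scope);
    }
  }

  public static void main(String[] args) throws Exception {
    System.out.println("default scope, virtual subtask thread: " + defaultScope());     // true
    System.out.println("platform factory, virtual subtask thread: " + platformScope());  // false
  }
}
```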

Virtual threads are JVM-managed lightweight threads for writing high-throughput concurrent applications. Because virtual threads are inexpensive compared to traditional platform (OS) threads, structured concurrency uses them by default for every forked subtask.

In addition to being plentiful, virtual threads are cheap enough to represent any concurrent unit of behavior, even behavior that involves I/O. Behind the scenes, the task-subtask relationship is maintained by associating each virtual thread with its unique owner, so it knows its hierarchy, similar to how a frame in the call stack knows its unique caller.

Note that all of the subtasks’ threads are guaranteed to have terminated once the scope is closed, and no thread is left behind when the block exits.

DO NOT pool or reuse virtual threads. They are cheap enough to create a new one for every task.
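Outside a structured scope, Executors.newVirtualThreadPerTaskExecutor() (final since Java 21, no preview flag needed) follows the same advice: every submitted task runs in a new virtual thread. This hypothetical sketch counts the distinct thread IDs that ran n tasks to show that no thread is reused.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class VirtualThreadPerTask {

  // Runs n tasks and returns how many distinct threads executed them
  static int distinctThreads(int n) {
    Set<Long> threadIds = ConcurrentHashMap.newKeySet();
    try (ExecutorService es = Executors.newVirtualThreadPerTaskExecutor()) {
      for (int i = 0; i < n; i++) {
        // Each submitted task gets a brand-new virtual thread; nothing is pooled
        es.submit(() -> threadIds.add(Thread.currentThread().threadId()));
      }
    } // close() waits for all submitted tasks to finish
    return threadIds.size();
  }

  public static void main(String[] args) {
    System.out.println(distinctThreads(1_000));  // 1000
  }
}
```

If concurrency has to be limited (for example, to protect a downstream service), a semaphore around the guarded call is usually a better fit than a pool of virtual threads.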

8. Conclusion

When combined with virtual threads, structured concurrency brings to Java long-awaited and much-needed features that are already present in other programming languages (e.g., goroutines in Go and processes in Erlang). It will help in writing complex concurrent applications with better reliability and fewer thread leaks.

Such applications will be easier to debug and profile when errors occur.

Happy Learning !!

Source Code on Github

About Us

HowToDoInJava provides tutorials and how-to guides on Java and related technologies.

It also shares the best practices, algorithms & solutions and frequently asked interview questions.
