Spring Boot REST API for Long-Running Tasks

Learn to design and implement a Spring Boot REST API that accepts long-running tasks and publishes the execution progress to a Kafka topic.

A long-running task is an operation that requires a considerable amount of server resources and/or time. To avoid blocking the client, the task must be completed asynchronously, without a persistent connection between the client and the server. After submitting the task, the client polls a provided URL for the task-execution progress.

1. Goals

In this tutorial, we will create REST APIs with Spring Boot that do the following:

  • Accept the API request and return the 202 (Accepted) status code along with a URL, in the Location header, to poll for the task-execution status. The URL points to an API endpoint that polls an Apache Kafka topic.
  • Publish the task execution status to the Kafka topic.
  • Poll the Kafka topic to get the current status of the task.
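
Before bringing Kafka in, the submit-then-poll flow listed above can be sketched with plain JDK classes. This is only an illustration under stated assumptions: InMemoryTaskTracker, its methods, and the status strings are hypothetical names, and a ConcurrentHashMap stands in for the Kafka topic.

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical in-memory stand-in: a map plays the role of the Kafka topic.
class InMemoryTaskTracker {

  private final Map<String, String> latestStatus = new ConcurrentHashMap<>();

  // Daemon worker thread, so a forgotten shutdown does not keep the JVM alive.
  private final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
    Thread t = new Thread(r);
    t.setDaemon(true);
    return t;
  });

  // Accepts the task and returns immediately with the id the client polls with.
  String submit(Runnable work) {
    String taskId = UUID.randomUUID().toString();
    latestStatus.put(taskId, "SUBMITTED");
    executor.execute(() -> {
      latestStatus.put(taskId, "RUNNING");
      try {
        work.run();
        latestStatus.put(taskId, "FINISHED");
      } catch (RuntimeException e) {
        latestStatus.put(taskId, "TERMINATED");
      }
    });
    return taskId;
  }

  // Returns the most recent status, or null for an unknown task id.
  String poll(String taskId) {
    return latestStatus.get(taskId);
  }

  // Stops accepting tasks and waits for the queued ones; false on timeout or interrupt.
  boolean shutdownAndWait(long timeoutMillis) {
    executor.shutdown();
    try {
      return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      return false;
    }
  }
}
```

Here submit() returns as soon as the task is queued, which is the behavior the 202 (Accepted) response communicates to the client.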

2. Designing REST API for Long-Running Tasks

We will create a REST API that accepts a long-running task in the POST URL:

HTTP POST: http://localhost:9000/tasks

{
    "name": "test-task"
}

It returns an empty response with the Location header containing the Kafka topic URL. A new Kafka topic is created programmatically for each submitted task so its result can be polled without affecting the polling for results of other tasks.

In the following response sample, eaa98908-0115-4cd7-8884-0b2155f7811e is a dynamically generated identifier that is also the topic name. The client can either poll the Kafka topic itself or invoke the provided API URL, which fetches the task status.

HTTP 202
Location: http://localhost:9000/tasks/eaa98908-0115-4cd7-8884-0b2155f7811e/progress
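
Since the task id in the Location header doubles as the topic name, a client that wants to poll Kafka directly has to extract it from the URL. A minimal sketch of that step, assuming the /tasks/{id}/progress layout shown above and no servlet context path (ProgressLocation and its methods are hypothetical helpers, not part of the tutorial's code):

```java
import java.net.URI;

// Hypothetical helper for building and parsing the progress URL.
class ProgressLocation {

  // Builds the progress URL for a task, mirroring the /tasks/{id}/progress layout.
  static URI progressUri(String baseUrl, String taskId) {
    return URI.create(baseUrl + "/tasks/" + taskId + "/progress");
  }

  // Extracts the task id (which is also the topic name) from a progress URL.
  static String taskIdFrom(URI progressUri) {
    String path = progressUri.getPath();
    if (path == null) {
      throw new IllegalArgumentException("Not a progress URL: " + progressUri);
    }
    // "/tasks/<id>/progress" splits into ["", "tasks", "<id>", "progress"]
    String[] parts = path.split("/");
    if (parts.length != 4 || !parts[1].equals("tasks") || !parts[3].equals("progress")) {
      throw new IllegalArgumentException("Not a progress URL: " + progressUri);
    }
    return parts[2];
  }
}
```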

When the client invokes the API, it polls the Kafka topic and returns the current status of the long-running task. A sample response body can be:

HTTP GET: http://localhost:9000/tasks/eaa98908-0115-4cd7-8884-0b2155f7811e/progress

{
  "taskId": "eaa98908-0115-4cd7-8884-0b2155f7811e",
  "taskName": "test-task",
  "percentageComplete": 100.0,
  "status": "FINISHED"
}

Once the task is finished, the API can either include the task execution result in the same response or provide another URL that returns the result.

HTTP GET: http://localhost:9000/tasks/eaa98908-0115-4cd7-8884-0b2155f7811e/progress

{
  "taskId": "eaa98908-0115-4cd7-8884-0b2155f7811e",
  "taskName": "test-task",
  "percentageComplete": 100.0,
  "status": "FINISHED",
  "resultUrl": "http://localhost:9000/tasks/eaa98908-0115-4cd7-8884-0b2155f7811e/result"
}

HTTP GET: http://localhost:9000/tasks/eaa98908-0115-4cd7-8884-0b2155f7811e/result

{
  "taskId": "eaa98908-0115-4cd7-8884-0b2155f7811e",
  "taskName": "test-task",
  "status": "FINISHED",
  "sys-log-location": "/log/....",
  "err-log-location": "/log/...."
}
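
On the client side, the polling described in this section might look like the following minimal sketch. ProgressPoller and statusSource are hypothetical: statusSource stands in for an HTTP GET on the progress URL that returns the value of the status field, or null when no status is available yet. FINISHED and TERMINATED are treated as the states after which polling stops.

```java
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical client-side helper; the Supplier replaces the real HTTP call.
class ProgressPoller {

  private static final Set<String> TERMINAL = Set.of("FINISHED", "TERMINATED");

  // Polls until a terminal status is seen or maxAttempts is reached.
  // Returns the last non-null status observed (null if none was ever returned).
  static String pollUntilDone(Supplier<String> statusSource, int maxAttempts, long delayMillis) {
    String last = null;
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      String status = statusSource.get();
      if (status != null) {
        last = status;
        if (TERMINAL.contains(status)) {
          return last;
        }
      }
      try {
        Thread.sleep(delayMillis); // wait before asking again
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        return last;
      }
    }
    return last;
  }
}
```

Capping the number of attempts keeps a client from polling forever if the task never reaches a final state.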

3. Implementation of Long-running REST API with Spring Boot

Let us implement the above-discussed API design in a Spring Boot application.

3.1. Model

The models consist of three classes: TaskRequest, TaskResponse and TaskStatus. We can add/modify the fields according to application requirements.

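// Lombok annotations are assumed here: the later snippets rely on the
// getters and the all-arguments constructor that they generate.
@Data
@NoArgsConstructor
@AllArgsConstructor
public class TaskRequest {

  private String name;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TaskResponse implements Serializable {

  private String taskId;
  private String name;
}

@Data
@NoArgsConstructor
@AllArgsConstructor
public class TaskStatus implements Serializable {

  private String taskId;
  private String taskName;
  private float percentageComplete;
  private Status status;
  private String resultUrl;

  public enum Status {
    SUBMITTED, STARTED, RUNNING, FINISHED, TERMINATED
  }
}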

3.2. Accepting a New Task

We are writing a very basic REST API that accepts the task request and returns the progress URL in the Location header.

@Autowired
TaskService taskService;

@PostMapping
public ResponseEntity<TaskResponse> processAsync(@RequestBody TaskRequest taskRequest, UriComponentsBuilder b) {

  String taskId = UUID.randomUUID().toString();
  // path() appends to the builder in place, so build the progress URL on a copy
  // and hand the untouched builder to the service for the result URL
  UriComponents progressURL = b.cloneBuilder().path("/tasks/{id}/progress").buildAndExpand(taskId);
  taskService.process(taskId, taskRequest, b);
  return ResponseEntity.accepted().location(progressURL.toUri()).build();
}

The TaskService uses an @Async method to process the task asynchronously. It first creates a new Kafka topic so it can publish the progress. Then it processes the task and regularly posts the task progress status to the topic. We are using Thread.sleep() to simulate the task processing time.

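Also, notice that we are using the KafkaAdmin configuration to create new topics. After creating a new Kafka topic, we call KafkaConsumer.subscribe() to subscribe to it so the application can later poll the topic. Keep in mind that subscribe() replaces the consumer's previous subscription instead of adding to it, so this simple setup follows only the most recently submitted task.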

@Service
public class TaskService {

  private static final Logger LOGGER = LoggerFactory.getLogger(TaskService.class);

  @Autowired
  KafkaConsumer<String, TaskStatus> kafkaConsumer;

  @Autowired
  KafkaProducerService kafkaProducerService;

  @Autowired
  KafkaAdmin kafkaAdmin;

  @Async
  public void process(String taskId, TaskRequest taskRequest, UriComponentsBuilder b) {

    try {
      // Each task gets its own topic, named after the task id
      createNewTopic(taskId);

      updateTaskExecutionProgress(new TaskStatus(taskId, taskRequest.getName(), 0.0f, Status.SUBMITTED, null));

      Thread.sleep(2000L); // simulated work
      updateTaskExecutionProgress(new TaskStatus(taskId, taskRequest.getName(), 10.0f, Status.STARTED, null));

      Thread.sleep(5000L);
      updateTaskExecutionProgress(new TaskStatus(taskId, taskRequest.getName(), 50.0f, Status.RUNNING, null));

      UriComponents resultURL = b.path("/tasks/{id}/result").buildAndExpand(taskId);

      Thread.sleep(5000L);
      updateTaskExecutionProgress(new TaskStatus(taskId, taskRequest.getName(), 100.0f, Status.FINISHED, resultURL.toUriString()));

    } catch (InterruptedException | ExecutionException e) {
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt(); // restore the interrupt flag
      }
      updateTaskExecutionProgress(new TaskStatus(taskId, taskRequest.getName(), 100.0f, Status.TERMINATED, null));
      throw new RuntimeException(e);
    }
  }

  private void createNewTopic(String topicName) throws ExecutionException, InterruptedException {

    Map<String, String> topicConfig = new HashMap<>();
    topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(24 * 60 * 60 * 1000)); // 24 hours retention

    // One partition, replication factor 1
    NewTopic newTopic = new NewTopic(topicName, 1, (short) 1).configs(topicConfig);
    try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
      // Block until the broker confirms that the topic exists
      adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
    }
    kafkaConsumer.subscribe(Collections.singletonList(topicName));
  }

  void updateTaskExecutionProgress(TaskStatus taskStatus) {
    // The task id serves as both the topic name and the message key
    kafkaProducerService.send(taskStatus.getTaskId(), taskStatus.getTaskId(), taskStatus);
  }
}

3.3. Publishing the Task Execution Progress

Notice that we are using KafkaProducerService.send() to send the progress status to the topic. Internally, it uses the configured KafkaTemplate to publish new messages.

@Service
public class KafkaProducerService {

  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaProducerService.class);

  @Autowired
  KafkaTemplate<String, TaskStatus> kafkaTemplate;

  public void send(String topicName, String key, TaskStatus value) {

    var future = kafkaTemplate.send(topicName, key, value);

    // The callback runs once the send has either succeeded or failed
    future.whenComplete((sendResult, exception) -> {
      if (exception != null) {
        LOGGER.error("Failed to send task status to Kafka topic : " + value, exception);
      } else {
        LOGGER.info("Task status sent to Kafka topic : " + value);
      }
    });
  }
}
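
The callback attached with whenComplete() receives either a result or an exception, never both. The following plain-JDK sketch illustrates that contract, assuming a CompletableFuture<String> in place of the future returned by KafkaTemplate.send(); SendCallbackSketch, logWhenDone, and the message formats are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical illustration of handling an asynchronous send outcome.
class SendCallbackSketch {

  // Registers a callback that reports the outcome once the future completes.
  static void logWhenDone(CompletableFuture<String> sendFuture, Consumer<String> logger) {
    sendFuture.whenComplete((result, exception) -> {
      if (exception != null) {
        logger.accept("FAILED: " + exception.getMessage());
      } else {
        logger.accept("SENT: " + result);
      }
    });
  }
}
```

If the future is already complete when the callback is registered, the callback runs immediately on the calling thread; otherwise it runs when the future completes.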

3.4. Polling the Task Execution Status

To get the current status of the long-running task, we can invoke the API URL in the Location header. It uses the KafkaConsumerService.getLatestTaskStatus() method to get the latest status published to the topic.

@Autowired
KafkaConsumerService kafkaConsumerService;

@GetMapping("{taskId}/progress")
public ResponseEntity<?> getTaskProgress(@PathVariable String taskId) {

  TaskStatus taskStatus = kafkaConsumerService.getLatestTaskStatus(taskId);
  if (taskStatus == null) {
    // No status was read from the topic in this poll
    return ResponseEntity.noContent().build();
  }
  return ResponseEntity.ok().body(taskStatus);
}

The KafkaConsumerService uses the KafkaConsumer bean and polls the last message published to the topic.

@Service
public class KafkaConsumerService {

  private static final Logger LOGGER = LoggerFactory.getLogger(KafkaConsumerService.class);

  @Autowired
  KafkaConsumer<String, TaskStatus> kafkaConsumer;

  public TaskStatus getLatestTaskStatus(String taskId) {

    ConsumerRecord<String, TaskStatus> latestUpdate = null;
    ConsumerRecords<String, TaskStatus> consumerRecords = kafkaConsumer.poll(Duration.ofMillis(1000));

    // The topic is named after the task id; keep the last record of the batch
    for (ConsumerRecord<String, TaskStatus> record : consumerRecords.records(taskId)) {
      latestUpdate = record;
    }
    if (latestUpdate == null) {
      return null; // this poll returned no record for the topic
    }
    LOGGER.info("Latest updated status : " + latestUpdate.value());
    return latestUpdate.value();
  }
}
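
The selection logic, keeping the last record of a polled batch for one topic, can be tried out without a broker. In this minimal sketch, a list of (topic name, status) entries is assumed to stand in for the ConsumerRecords batch; LatestStatusSketch and latestFor are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for picking the latest status out of a polled batch.
class LatestStatusSketch {

  // Each entry is (topic name, status), in the order the records were received.
  static Optional<String> latestFor(String topic, List<Map.Entry<String, String>> batch) {
    String latest = null;
    for (Map.Entry<String, String> update : batch) {
      if (update.getKey().equals(topic)) {
        latest = update.getValue(); // later records overwrite earlier ones
      }
    }
    return Optional.ofNullable(latest);
  }
}
```

An empty Optional corresponds to the case where the controller answers with 204 (No Content).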

3.5. Configuration

We are using the TaskStatus class as the message body. Spring Boot's auto-configured Kafka beans use String serializers and deserializers by default. To support custom message types, we need to define beans of the type ProducerFactory and ConsumerFactory as follows. The configuration uses the JsonSerializer and JsonDeserializer for serializing and deserializing the TaskStatus instances.

@Configuration
public class KafkaConfig {

  @Autowired KafkaProperties kafkaProperties;

  @Bean
  public KafkaAdmin kafkaAdmin() {
    KafkaAdmin kafkaAdmin = new KafkaAdmin(kafkaProperties.buildAdminProperties());
    kafkaAdmin.setFatalIfBrokerNotAvailable(kafkaProperties.getAdmin().isFailFast());
    return kafkaAdmin;
  }

  @Bean
  public ConsumerFactory<String, TaskStatus> consumerFactory() {

    Map<String, Object> configProps = new HashMap<>();
    configProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    configProps.put(JsonDeserializer.TRUSTED_PACKAGES, "*");
    configProps.put(ConsumerConfig.GROUP_ID_CONFIG, "task-group");
    configProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    configProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    configProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    return new DefaultKafkaConsumerFactory<>(configProps);
  }

  @Bean
  public KafkaConsumer<String, TaskStatus> kafkaConsumer() {
    return (KafkaConsumer<String, TaskStatus>) consumerFactory().createConsumer();
  }

  @Bean
  public ProducerFactory<String, TaskStatus> producerFactory() {

    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
  }

  @Bean
  public KafkaTemplate<String, TaskStatus> kafkaTemplate() {
    var kafkaTemplate = new KafkaTemplate<>(producerFactory());
    kafkaTemplate.setConsumerFactory(consumerFactory());
    return kafkaTemplate;
  }
}

4. Demo

Let us submit a new task using the POST API http://localhost:9000/tasks.

[Figure: Submit a new long-running task]

We can call the progress URL from the Location header after some time to check the final status of the task.

5. Conclusion

In this Spring Boot tutorial, we learned to create a REST API that accepts long-running tasks and publishes the execution progress/status to a Kafka topic. We also learned to configure Kafka beans to support the publishing and polling of messages containing the task progress.

Happy Learning !!

Source Code on Github
