Using Spring KafkaAdmin to Create New Topics Programmatically

The KafkaAdmin class simplifies administrative tasks such as creating, deleting, and inspecting Kafka topics in a Spring application. It is especially useful when topics must be created dynamically in response to certain conditions or events, for example, creating a new topic to publish the status of a long-running task asynchronously.

1. Maven

KafkaAdmin is part of the spring-kafka module, so start by adding its latest version if it is not already included.

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>3.0.7</version> <!-- version is optional in a spring boot project-->
</dependency>

2. Initializing KafkaAdmin Bean

By default, Spring Boot’s KafkaAutoConfiguration class creates a KafkaAdmin bean, provided the spring-kafka dependency and the necessary configuration are present.

spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=default-task-Group

We can inject the KafkaAdmin bean into other components or use it to perform administrative tasks in the Kafka cluster.

To customize the KafkaAdmin configuration, we can define a KafkaAdmin bean explicitly in the application’s configuration class. When a custom KafkaAdmin bean is defined, the default bean is not created and the custom definition takes precedence.

@Configuration
public class KafkaConfig {

	@Autowired KafkaProperties kafkaProperties;

	@Bean
	public KafkaAdmin kafkaAdmin() {

	  KafkaAdmin kafkaAdmin = new KafkaAdmin(kafkaProperties.buildAdminProperties());
	  kafkaAdmin.setFatalIfBrokerNotAvailable(kafkaProperties.getAdmin().isFailFast());
	  return kafkaAdmin;
	}
}
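
The isFailFast() value read in the bean above maps to Spring Boot's spring.kafka.admin.fail-fast property, which defaults to false. As a small illustration (the values shown are examples only), it can be set next to the other Kafka properties:

```properties
spring.kafka.bootstrap-servers=localhost:9092
# Fail application startup when the broker is not reachable (default: false)
spring.kafka.admin.fail-fast=true
```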

3. Creating a New Topic using KafkaAdmin

Suppose there is an API to process long-running tasks. We want to provide the status of the task in a new Kafka topic created specific to the task, rather than publishing the status in a general topic.

In the following example, we use the KafkaAdmin configuration to build an AdminClient instance and then call adminClient.createTopics(), passing a collection of NewTopic objects, to create the new topic. The topic name can be the task ID, so the client application can poll that specific topic to retrieve only the messages posted for that task.

@Service
public class LongRunningTaskService {

  @Autowired
  KafkaConsumer<String, TaskStatus> kafkaConsumer;

  @Autowired
  KafkaAdmin kafkaAdmin;

  private void createNewTopic(String topicName) throws ExecutionException, InterruptedException {

    Map<String, String> topicConfig = new HashMap<>();
    topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(24 * 60 * 60 * 1000)); // 24 hours retention

    NewTopic newTopic = new NewTopic(topicName, 1, (short) 1).configs(topicConfig);

    try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
      // Blocking call to make sure the topic is created before we continue
      adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
    }

    // We can use this consumer to manually retrieve messages when requested by the client application.
    // Note: subscribe() replaces any previous subscription rather than adding to it.
    kafkaConsumer.subscribe(Collections.singletonList(topicName));
  }

  //...
}
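
The retention value above is computed with int arithmetic, which is fine for 24 hours but would overflow for periods longer than roughly 24 days. A minimal sketch of an alternative using java.time.Duration follows. The class and method names (TopicRetention, retentionConfig) are hypothetical, and the literal "retention.ms" is the value of TopicConfig.RETENTION_MS_CONFIG written out so that the sketch compiles without Kafka on the classpath.

```java
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

public class TopicRetention {

  // Same key as TopicConfig.RETENTION_MS_CONFIG, spelled out for this sketch
  static final String RETENTION_MS = "retention.ms";

  // Builds a topic config map; Duration.toMillis() returns a long, so long periods do not overflow
  static Map<String, String> retentionConfig(Duration retention) {
    Map<String, String> config = new HashMap<>();
    config.put(RETENTION_MS, String.valueOf(retention.toMillis()));
    return config;
  }

  public static void main(String[] args) {
    System.out.println(retentionConfig(Duration.ofHours(24))); // {retention.ms=86400000}
    System.out.println(retentionConfig(Duration.ofDays(30)));  // {retention.ms=2592000000}
  }
}
```

The returned map could be passed to NewTopic.configs(...) in place of the hand-built topicConfig map.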

4. Demo

In the following example, we create a REST API that accepts requests for long-running tasks. A second API fetches the latest status of a running task by its task ID. Note that the client can also subscribe to the topic directly and receive task status updates.

@RestController
@RequestMapping("/tasks")
public class TaskController {

	@Autowired
	TaskService taskService;

	@Autowired
	KafkaConsumerService kafkaConsumerService;

	@PostMapping
	public ResponseEntity<TaskResponse> processAsync(@RequestBody TaskRequest taskRequest, UriComponentsBuilder b) {

		String taskId = UUID.randomUUID().toString();
		UriComponents progressURL = b.path("/tasks/{id}/progress").buildAndExpand(taskId);
		TaskResponse task = new TaskResponse(taskId, taskRequest.getName(), progressURL.toUriString());
		taskService.process(taskId, taskRequest);
		return ResponseEntity.accepted().body(task);
	}

	@GetMapping("/{taskId}/progress")
	public ResponseEntity<?> getProgress(@PathVariable String taskId) {

		TaskStatus taskStatus = kafkaConsumerService.getLatestTaskStatus(taskId);
		if(taskStatus == null) {
			return ResponseEntity.noContent().build();
		}
		return ResponseEntity.ok().body(taskStatus);
	}
}
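
The controller uses a random UUID as the task ID, and that ID later becomes the topic name. UUID strings are always legal topic names, but an ID taken from user input might not be. The following is a minimal sketch of a pre-check that mirrors Kafka's naming rules (at most 249 characters drawn from letters, digits, '.', '_' and '-', and not "." or ".."). The TopicNames class and its isValid method are hypothetical helpers, not part of Spring or Kafka.

```java
import java.util.UUID;
import java.util.regex.Pattern;

public class TopicNames {

  private static final int MAX_LENGTH = 249;
  private static final Pattern LEGAL = Pattern.compile("[a-zA-Z0-9._-]+");

  // Returns true when the name satisfies the topic naming rules described above
  static boolean isValid(String name) {
    if (name == null || name.isEmpty() || name.length() > MAX_LENGTH) {
      return false;
    }
    if (name.equals(".") || name.equals("..")) {
      return false;
    }
    return LEGAL.matcher(name).matches();
  }

  public static void main(String[] args) {
    System.out.println(isValid(UUID.randomUUID().toString())); // true
    System.out.println(isValid("task 42/status"));             // false
  }
}
```

Running such a check before calling createTopics() lets the API reject a bad ID early instead of surfacing a broker-side error.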

The TaskService uses KafkaAdmin to create a new topic and post updates while processing the task.

@Log
@Service
public class TaskService {

  final static Logger LOGGER = LoggerFactory.getLogger(TaskService.class);

  @Autowired
  KafkaConsumer<String, TaskStatus> kafkaConsumer;

  @Autowired
  KafkaProducerService kafkaProducerService;

  @Autowired
  KafkaAdmin kafkaAdmin;

  @Async
  public void process(String taskId, TaskRequest taskRequest) {

    try {
      createNewTopic(taskId);
      updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.getName(), 0.0f, Status.SUBMITTED));

      Thread.sleep(2000L); // Simulating processing delay
      updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.getName(), 10.0f, Status.STARTED));

      Thread.sleep(5000L); // Simulating processing delay
      updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.getName(), 50.0f, Status.RUNNING));

      Thread.sleep(10000L); // Simulating processing delay
      updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.getName(), 100.0f, Status.FINISHED));

    } catch (InterruptedException | ExecutionException e) {
      if (e instanceof InterruptedException) {
        Thread.currentThread().interrupt(); // Restore the interrupt flag for the calling executor
      }
      updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.getName(), 100.0f, Status.TERMINATED));
      throw new RuntimeException(e);
    }
  }

  private void createNewTopic(String topicName) throws ExecutionException, InterruptedException {

    Map<String, String> topicConfig = new HashMap<>();
    topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(24 * 60 * 60 * 1000)); // 24 hours retention
    NewTopic newTopic = new NewTopic(topicName, 1, (short) 1).configs(topicConfig);
    try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
      adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
    }
    kafkaConsumer.subscribe(Collections.singletonList(topicName));
  }

  void updateTaskExecutionProgess(TaskStatus taskStatus) {
    kafkaProducerService.send(taskStatus.getTaskId(), taskStatus.getTaskId(), taskStatus);
  }
}
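
The KafkaConsumerService used by the controller is not shown above. One way it might answer getLatestTaskStatus(taskId) is to keep only the most recent status per task in memory, updated as records arrive. The sketch below illustrates that idea in plain Java. LatestStatusStore, its record method, and the simplified TaskStatus class are hypothetical stand-ins rather than the actual implementation, and the Kafka polling or listener code that would call record(...) is omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class LatestStatusStore {

  // Simplified, hypothetical stand-in for the TaskStatus type used in the demo
  static final class TaskStatus {
    final String taskId;
    final float percentComplete;
    final String status;

    TaskStatus(String taskId, float percentComplete, String status) {
      this.taskId = taskId;
      this.percentComplete = percentComplete;
      this.status = status;
    }
  }

  // Thread-safe map: written by the consuming thread, read by request threads
  private final Map<String, TaskStatus> latest = new ConcurrentHashMap<>();

  // Would be called for every status record read from the task's topic
  void record(TaskStatus status) {
    latest.put(status.taskId, status);
  }

  // Returns null when nothing has been received yet; the controller maps that to 204 No Content
  TaskStatus getLatestTaskStatus(String taskId) {
    return latest.get(taskId);
  }

  public static void main(String[] args) {
    LatestStatusStore store = new LatestStatusStore();
    store.record(new TaskStatus("task-1", 10.0f, "STARTED"));
    store.record(new TaskStatus("task-1", 50.0f, "RUNNING"));
    System.out.println(store.getLatestTaskStatus("task-1").status); // RUNNING
    System.out.println(store.getLatestTaskStatus("task-2"));        // null
  }
}
```

Keeping only the latest entry per task keeps memory bounded by the number of tasks rather than the number of status messages.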

Let us test the code by submitting a new task.

Now get the status of the task after a delay.

5. Conclusion

In this Apache Kafka tutorial, we learned the basics of KafkaAdmin: how to configure it with custom properties and how to use it to create topics in the Kafka cluster. We also built a demo with two APIs, one that accepts a long-running task request and one that reports the task's status from the newly created Kafka topic.

Happy Learning !!

Source Code on Github
