The KafkaAdmin class simplifies the administrative tasks for creating, deleting, and inspecting Kafka topics in a Spring application. KafkaAdmin is very useful in scenarios where topics need to be created dynamically based on certain conditions or events. For example, creating a new topic for posting the status of a long-running task asynchronously.
1. Maven
The KafkaAdmin is part of spring-kafka module so start with adding the latest version, if not included already.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>3.0.7</version> <!-- version is optional in a spring boot project-->
</dependency>
2. Initializing KafkaAdmin Bean
By default, Spring boot’s KafkaAutoConfiguration class creates a KafkaAdmin bean, provided the necessary dependency and configuration are already present.
spring.kafka.bootstrap-servers=localhost:9092
spring.kafka.consumer.group-id=default-task-Group
We can inject the KafkaAdmin bean into other components or use it to perform administrative tasks in the Kafka cluster.
If we want to customize the KafkaAdmin bean’s configuration, we can create a custom KafkaAdmin bean explicitly in the application’s configuration class. If KafkaAdmin bean is defined, the default KafkaAdmin bean will not be created, and the custom bean definition will take precedence.
@Configuration
public class KafkaConfig {
@Autowired KafkaProperties kafkaProperties;
@Bean
public KafkaAdmin kafkaAdmin() {
KafkaAdmin kafkaAdmin = new KafkaAdmin(kafkaProperties.buildAdminProperties());
kafkaAdmin.setFatalIfBrokerNotAvailable(kafkaProperties.getAdmin().isFailFast());
return kafkaAdmin;
}
}
3. Creating a New Topic using KafkaAdmin
Suppose there is an API to process long-running tasks. We want to provide the status of the task in a new Kafka topic created specific to the task, rather than publishing the status in a general topic.
In the following example, we are using kafkaAdmin to get the AdminClient instance and then calling adminClient.createTopics(NewTopic) to create the new topic. The topic name can be taskId so the client application can poll on that specific topic to retrieve the messages posted specifically for that task only.
@Service
public class LongRunningTaskService {
@Autowired
KafkaConsumer<String, TaskStatus> kafkaConsumer;
@Autowired
KafkaAdmin kafkaAdmin;
private void createNewTopic(String topicName) throws ExecutionException, InterruptedException {
Map<String, String> topicConfig = new HashMap<>();
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(24 * 60 * 60 * 1000)); // 24 hours retention
NewTopic newTopic = new NewTopic(topicName, 1, (short) 1).configs(topicConfig);
try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
//Blocking call to make sure topic is created
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
}
//We can use this consumer to manually retrive messages when requested by the client application
kafkaConsumer.subscribe(Collections.singletonList(topicName));
}
//...
}
4. Demo
In the following example, we created a REST API to accept the request for long-running tasks. Another API is created for fetching the latest status of the currently executing task using its task id. Note that the client can also subscribe to the topic and receive task status updates.
@RestController
@RequestMapping("/tasks")
public class TaskController {
@Autowired
TaskService taskService;
@Autowired
KafkaConsumerService kafkaConsumerService;
@PostMapping
public ResponseEntity<TaskResponse> processAsync(@RequestBody TaskRequest taskRequest, UriComponentsBuilder b) {
String taskId = UUID.randomUUID().toString();
UriComponents progressURL = b.path("/tasks/{id}/progress").buildAndExpand(taskId);
TaskResponse task = new TaskResponse(taskId, taskRequest.getName(), progressURL.toUriString());
taskService.process(taskId, taskRequest);
return ResponseEntity.accepted().body(task);
}
@GetMapping("{taskId}/progress")
public ResponseEntity<?> processAsync(@PathVariable String taskId) {
TaskStatus taskStatus = kafkaConsumerService.getLatestTaskStatus(taskId);
if(taskStatus == null) {
return ResponseEntity.noContent().build();
}
return ResponseEntity.ok().body(taskStatus);
}
}
The TaskService uses KafkaAdmin to create a new topic and post updates while processing the task.
@Log
@Service
public class TaskService {
final static Logger LOGGER = LoggerFactory.getLogger(TaskService.class);
@Autowired
KafkaConsumer<String, TaskStatus> kafkaConsumer;
@Autowired
KafkaProducerService kafkaProducerService;
@Autowired
KafkaAdmin kafkaAdmin;
@Async
public void process(String taskId, TaskRequest taskRequest) {
try {
createNewTopic(taskId);
updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.getName(), 0.0f, Status.SUBMITTED));
Thread.currentThread().sleep(2000l); //Simulating processing delay
updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.getName(), 10.0f, Status.STARTED));
Thread.currentThread().sleep(5000l); //Simulating processing delay
updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.getName(), 50.0f, Status.RUNNING));
Thread.currentThread().sleep(10000l); //Simulating processing delay
updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.getName(), 100.0f, Status.FINISHED));
} catch (InterruptedException | ExecutionException e) {
updateTaskExecutionProgess(new TaskStatus(taskId, taskRequest.getName(), 100.0f, Status.TERMINATED));
throw new RuntimeException(e);
}
}
private void createNewTopic(String topicName) throws ExecutionException, InterruptedException {
Map<String, String> topicConfig = new HashMap<>();
topicConfig.put(TopicConfig.RETENTION_MS_CONFIG, String.valueOf(24 * 60 * 60 * 1000)); // 24 hours retention
NewTopic newTopic = new NewTopic(topicName, 1, (short) 1).configs(topicConfig);
try (AdminClient adminClient = AdminClient.create(kafkaAdmin.getConfigurationProperties())) {
adminClient.createTopics(Collections.singletonList(newTopic)).all().get();
}
kafkaConsumer.subscribe(Collections.singletonList(topicName));
}
void updateTaskExecutionProgess(TaskStatus taskStatus) {
kafkaProducerService.send(taskStatus.getTaskId(), taskStatus.getTaskId(), taskStatus);
}
}
Let us test the code by submitting a new task.
Now get the status of the task after a delay.
5. Conclusion
In this Apache Kafka tutorial, we learned the basics of KafkaAdmin. We also learned how to configure KafkaAdmin with custom properties, and to create topics in the Kafka cluster. We also created a demo to create two APIs that can accept a long-running task request, and then share the status of tasks in the newly created Kafka topic.
Happy Learning !!
Comments