DIGIT CORE
Search…
Kafka Producer & Consumer

Producer

Producer classes help in pushing data from the application to kafka topics. For this, we have a custom implementation of KafkaTemplate class in our tracer library called CustomKafkaTemplate. This implementation of producer class does not change across services of DIGIT. Producer implementation can be viewed here - Producer Implementation
Now, for adding producer support in this guide, the following steps need to be followed -
i) Update tracer version in pom.xml to 2.0.0-SNAPSHOT
Under the kafka folder, open the Producer.java class and paste the following code:
package digit.kafka;
import lombok.extern.slf4j.Slf4j;
import org.egov.tracer.kafka.CustomKafkaTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
@Service
@Slf4j
public class Producer {
@Autowired
private CustomKafkaTemplate<String, Object> kafkaTemplate;
public void push(String topic, Object value) {
kafkaTemplate.send(topic, value);
}
}

Consumer

Sending SMS notifications to the customer:
Once an application is created/updated the data is pushed on Kafka topic. We trigger notification by consuming data from this topic. Whenever any message is consumed the service will call the localisation service to fetch the SMS template. It will then replace the placeholders in the SMS template with the values in the message it consumed(For example: It will replace the {NAME} placeholder with owner name from the data consumed). Once the SMS text is ready, the service will push this data on notification topic. SMS service consumes data from notification topic and triggers SMS.
For our guide, we will be implementing a notification consumer in the following section.
Once an application is created/requested or progresses further in the workflow, notifications can be triggered as each of these events are pushed onto kafka topics which can be listened on and a sms/email/in-app notification can be sent to the concerned user(s).
For our guide, we will be implementing a notification consumer which will listen onto the topic on which voter registration applications are created, create a customised message and send it to the notification service(sms/email) to be sent to the concerned users. Open kafka/NotificationConsumer.java and paste the following code:
package digit.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import digit.service.NotificationService;
import digit.web.models.BirthRegistrationRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.util.HashMap;
@Component
@Slf4j
public class NotificationConsumer {
@Autowired
private ObjectMapper mapper;
@Autowired
private NotificationService notificationService;
@KafkaListener(topics = {"${egov.bt.registration.create.topic}"})
public void listen(final HashMap<String, Object> record, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
try {
BirthRegistrationRequest request = mapper.convertValue(record, BirthRegistrationRequest.class);
//log.info(request.toString());
notificationService.prepareEventAndSend(request);
} catch (final Exception e) {
log.error("Error while listening to value: " + record + " on topic: " + topic + ": ", e);
}
}
}
Create a POJO by the name of SMSRequest in the web.models package and add the following content into it:
package digit.web.models;
import lombok.*;
@Getter
@AllArgsConstructor
@NoArgsConstructor
@Builder
@ToString
public class SMSRequest {
private String mobileNumber;
private String message;
}
Next, to handle preparation of customised message and pushing the notification we will create a class by the name of NotificationService under service folder. Add the following content to it -
package digit.service;
import digit.config.BTRConfiguration;
import digit.kafka.Producer;
import digit.web.models.BirthRegistrationApplication;
import digit.web.models.BirthRegistrationRequest;
import digit.web.models.SMSRequest;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;
import java.util.ArrayList;
import java.util.List;
@Slf4j
@Service
public class NotificationService {
@Autowired
private Producer producer;
@Autowired
private BTRConfiguration config;
@Autowired
private RestTemplate restTemplate;
private static final String smsTemplate = "Dear {FATHER_NAME} and {MOTHER_NAME} your birth registration application has been successfully created on the system with application number - {APPNUMBER}.";
public void prepareEventAndSend(BirthRegistrationRequest request){
List<SMSRequest> smsRequestList = new ArrayList<>();
request.getBirthRegistrationApplications().forEach(application -> {
SMSRequest smsRequestForFather = SMSRequest.builder().mobileNumber(application.getFatherMobileNumber()).message(getCustomMessage(smsTemplate, application)).build();
SMSRequest smsRequestForMother = SMSRequest.builder().mobileNumber(application.getMotherMobileNumber()).message(getCustomMessage(smsTemplate, application)).build();
smsRequestList.add(smsRequestForFather);
smsRequestList.add(smsRequestForMother);
});
for (SMSRequest smsRequest : smsRequestList) {
producer.push(config.getSmsNotificationTopic(), smsRequest);
log.info("Messages: " + smsRequest.getMessage());
}
}
private String getCustomMessage(String template, BirthRegistrationApplication application) {
template = template.replace("{APPNUMBER}", application.getApplicationNumber());
template = template.replace("{FATHER_NAME}", application.getFather().getName());
template = template.replace("{MOTHER_NAME}", application.getMother().getName());
return template;
}
}
All content on this page by eGov Foundation is licensed under a Creative Commons Attribution 4.0 International License.
Copy link
On this page
Producer
Consumer