My DSS dashboard is not reflecting accurate numbers. What can I do?

How to identify indexing problems

Indexing issues can be identified by tallying the data in the postgres database against the data in ES. If the counts do not match, there might be an issue in indexing. To debug indexing issues, the indexer service logs should be checked. The first step is to check whether the record is getting consumed by the indexer service; if it is not, the topic name configured in the indexer service should be checked. If the record is getting consumed, the logs should be checked for errors. Errors might occur due to a mismatch between the data type of the value in the record and the type of the field defined in the index mapping. Another source of errors is the calls the indexer service makes to other microservices such as location, MDMS and HRMS for enriching the data; an error thrown by any of these microservices may result in the data not getting indexed.
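For example, a quick way to tally the two stores is to compare record counts. The following is a rough sketch; the postgres connection details, table name and index name are placeholders to be replaced with the actual values for the service being debugged, and the Elasticsearch URL is the one used in the kafka connector configuration later in this document -

# Count rows in the service's postgres table
psql -h {{postgres-host}} -U {{user}} -d {{database}} -c "SELECT count(*) FROM {{service-table}};"

# Count documents in the corresponding Elasticsearch index
curl -s 'http://elasticsearch-data-v1.es-cluster:9200/{{elastic-search-index}}/_count?pretty'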

When to do reindexing

Reindexing is mostly done in two scenarios. The first is when the data in the RDBMS and in ES do not match. In this case the data is reindexed into a new index and the old index is dropped; using an alias, the new index is then pointed to the old index name. The second scenario is when the index structure needs to be changed. In this case the whole data needs to be reindexed using the new indexer configuration. Once the reindexing is successful, the old index can be dropped and the new index can be pointed to the old index name using an alias.

Payment Reindexing (Legacy Index):

Payment data is generated by the collection service and stored in the PostgreSQL database. To reindex this data from the postgres database, the legacy index API should be called. Once this API is called, the indexer service will call the _plainsearch API of the collection service in a loop until it has fetched all the records. The indexer service will transform and enrich each record and push it to a kafka topic: dss-collection-update (configurable in application.properties). From this kafka topic, dss-ingest consumes the record and enriches it further. Once dss-ingest has enriched the record, it will push the record either to a kafka topic or directly to ES, based on a flag called es.push.direct.

If this flag is set to true, dss-ingest will push directly to ES; otherwise it will push the data to a kafka topic called egov-dss-ingest-enriched. To move data from this topic into ES, a kafka connector should be created. The steps to create a kafka connector are mentioned in the following section and the exact cURL can be found in the reference documents.
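As an illustration, the flag is set in the dss-ingest configuration (the property name is taken from above; the exact file and default value depend on the deployment) -

# dss-ingest application.properties (illustrative)
# push enriched records directly to Elasticsearch
es.push.direct=true
# or push them to the kafka topic egov-dss-ingest-enriched instead
# es.push.direct=false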

Aliasing:

Suppose you had an index for property records by the name property-services. Upon triggering re-indexing, a new index was created by the name property-services-enriched. You want to drop the original index and have all queries made to the property-services index internally refer to the newly created index. This is where the concept of aliasing comes into play. For creating an alias, the following request needs to be executed against Elasticsearch -

POST /_aliases 
{
  "actions": [
    {
      "add": {
        "index": "property-services-enriched",
        "alias": "property-services"
      }
    }
  ]
}
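Note that an alias cannot have the same name as an existing index, so the original property-services index has to be deleted before the alias is created. As curl commands, this looks roughly as follows (a sketch, assuming the Elasticsearch endpoint used in the kafka connector configuration below) -

# Drop the original index so that its name can be reused as an alias
curl -X DELETE 'http://elasticsearch-data-v1.es-cluster:9200/property-services'

# Point the alias property-services to the new index
curl -X POST 'http://elasticsearch-data-v1.es-cluster:9200/_aliases' \
  -H 'Content-Type: application/json' \
  -d '{
    "actions": [
      { "add": { "index": "property-services-enriched", "alias": "property-services" } }
    ]
  }'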

For live indexing of data, a configuration file should be created and added to the configuration repo on GitHub. The path of the file should then be added in the environment yaml file, under the variable egov-indexer-yaml-repo-path. Once the configuration is added and the path is set in the environment yaml, the indexer service should be restarted (redeployed) with the config flag checked, which restarts the indexer service with the new configuration. Once the indexer is up and running, whenever a new event is generated by the service, it will be consumed by the indexer service, which will transform and enrich the record based on the defined configuration and then insert the data into ES.
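A hypothetical illustration of the environment yaml entry, assuming the usual comma-separated list of config file paths (the file names are placeholders) -

# environment yaml (illustrative): append the new file to the existing list of indexer configs
egov-indexer-yaml-repo-path: "file:///work-dir/configs/egov-indexer/existing-indexer.yml,file:///work-dir/configs/egov-indexer/{{new-service}}-indexer.yml"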

Legacy Indexing:

Legacy indexing is the process of recreating the ES index from the postgres database. The indexer service does this by fetching all the records from the particular service using a _plainsearch API. (The API URL is part of the request, but we generally expose an API called _plainsearch which is used only for reindexing.) The request body is as follows:

{
   "RequestInfo": {
       "apiId": "string",
       "ver": "string",
       "ts": null,
       "action": "string",
       "did": "string",
       "key": "string",
       "msgId": "string",
       "authToken": "ca3256e3-5318-47b1-8a68-ffcf2228fe35",
       "correlationId": "e721639b-c095-40b3-86e2-acecb2cb6efb",
       "userInfo": {
           "id": 23299,
           "uuid": "e721639b-c095-40b3-86e2-acecb2cb6efb",
           "userName": "xyz",
           "name": " Test user",
           "type": "EMPLOYEE",
           "mobileNumber": "9999999999",
           "emailId": "abc.xyz@gmail.com",
           "roles": [
               {
                   "id": 281,
                   "name": "SUPERUSER"
               }
           ]
       }
   },
   "apiDetails": {
       "uri": "http://fsm.egov:8080/fsm/v1/_plainsearch",
       "tenantIdForOpenSearch": "pb",
       "paginationDetails": {
           "offsetKey": "offset",
           "sizeKey": "limit",
           "maxPageSize": 200
       },
       "responseJsonPath": "$.fsm"
   },
   "legacyIndexTopic": "fsm-application-legacyindex",
   "tenantId": "pb.amritsar"
}

The RequestInfo object is common to all requests. The apiDetails object contains the details of the API which the indexer service will call to fetch the records: uri is the _plainsearch endpoint of the source service, paginationDetails gives the names of the offset and limit parameters along with the maximum page size per call, and responseJsonPath is the JSON path at which the records appear in the search response. In the outer request, legacyIndexTopic is the kafka topic to which the transformed records are pushed and tenantId is the tenant whose data is being reindexed.
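To trigger the legacy index job, this request body is POSTed to the legacy index API of the indexer service. The host and path below are assumptions based on a standard egov-indexer deployment and should be adjusted to your environment -

# Save the request body shown above to a file, e.g. legacy-index-request.json, and post it
curl -X POST 'http://egov-indexer.egov:8080/egov-indexer/index-operations/_legacyindex' \
  -H 'Content-Type: application/json' \
  -d @legacy-index-request.json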

After fetching the records in batches, the indexer service will transform and enrich each batch and push it to the topic given against the key legacyIndexTopic. To insert the data from this kafka topic into ES, a kafka connector has to be created.

Kafka Connector:

Kafka connector makes it easy to stream data from numerous sources into Kafka and from Kafka into various sinks. Across DIGIT, we use kafka connectors mainly for pushing data into the Elasticsearch sink.

To improve the performance of reindexing jobs in the indexer service, kafka-connect is used for the part that pushes records from the kafka topic to Elasticsearch. Reindexing jobs are still created through the indexer service as before, but the portion where data is pushed to Elasticsearch is handled by kafka-connect instead of the indexer. So for reindexing, a kafka connector should be created after initiating the reindexing job through the indexer service.

Following is the cURL for creating a kafka connector with Elasticsearch as its sink -

curl -X POST \
  http://kafka-connect.kafka-cluster:8083/connectors/ \
  -H 'Content-Type: application/json' \
  -d '{
  "name": "{{connector-uniquename}}",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "connection.url": "http://elasticsearch-data-v1.es-cluster:9200",
    "type.name": "general",
    "topics": "{{kafka-topic}}",
    "key.ignore": "false",
    "schema.ignore": true,
    "value.converter.schemas.enable": false,
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "transforms": "TopicNameRouter",
    "transforms.TopicNameRouter.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.TopicNameRouter.regex": ".*",
    "transforms.TopicNameRouter.replacement": "{{elastic-search-index}}",
    "batch.size": 10,
    "max.buffered.records": 500,
    "flush.timeout.ms": 600000,
    "retry.backoff.ms": 5000,
    "read.timout.ms": 10000,
    "linger.ms": 100,
    "max.in.flight.requests": 2,
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "{{kafka-topic}}-es-failed",
    "tasks.max": 1
  }
}'
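Once the reindexing job has finished pushing data, the same Kafka Connect REST API can be used to monitor and clean up the connector (standard Kafka Connect endpoints) -

# Check the status of the connector and its tasks
curl http://kafka-connect.kafka-cluster:8083/connectors/{{connector-uniquename}}/status

# Delete the connector once the data has been indexed into Elasticsearch
curl -X DELETE http://kafka-connect.kafka-cluster:8083/connectors/{{connector-uniquename}}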

Reference Docs:
