Tuesday, 15 July 2025

Ingesting Data into an Apache Pinot Real-Time Table from Kafka

Apache Pinot is a high-performance distributed OLAP store designed for real-time analytics. When working with streaming data, we can ingest records into a real-time table, which continuously pulls data from sources like Apache Kafka.

 

In this guide, we will set up a real-time table in Apache Pinot that ingests log data from a Kafka topic running on our local computer. We will cover key concepts like schema definition, table configuration, and setting up Kafka as a data source before testing the ingestion and querying the data.

 

1. Setup Kafka

Step 1: Go to the following link and download Kafka.

https://kafka.apache.org/downloads

 

At the time of writing this post, 4.0.0 is the latest version.
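For example, assuming you downloaded the Scala 2.13 build of Kafka 4.0.0, you can extract the archive and switch into the Kafka directory as shown below (adjust the file name if you picked a different version). The remaining Kafka commands in this post are run from inside this directory.

tar -xzf kafka_2.13-4.0.0.tgz
cd kafka_2.13-4.0.0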

 

Step 2: Generate a cluster UUID by executing the following statement.

KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"

 

$ KAFKA_CLUSTER_ID="$(bin/kafka-storage.sh random-uuid)"
$ echo $KAFKA_CLUSTER_ID
ImA96y2CTJGN-HzF3GSFWg

Step 3: Format log directories.

bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties

$ bin/kafka-storage.sh format --standalone -t $KAFKA_CLUSTER_ID -c config/server.properties
Formatting dynamic metadata voter directory /tmp/kraft-combined-logs with metadata.version 4.0-IV3.

 

Step 4: Start the Kafka Server

bin/kafka-server-start.sh config/server.properties

Once the Kafka server has successfully launched, you will have a basic Kafka environment running and ready to use.
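As an optional sanity check, you can confirm the broker is reachable by listing topics; an empty result at this point simply means no topics have been created yet.

bin/kafka-topics.sh --list --bootstrap-server localhost:9092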

 

Step 5: Create a topic to store the application log events by executing the following command.

bin/kafka-topics.sh --create --topic app-log-events --bootstrap-server localhost:9092

$ bin/kafka-topics.sh --create --topic app-log-events --bootstrap-server localhost:9092
Created topic app-log-events.

 

Let’s see the topic details by executing the following command.

bin/kafka-topics.sh --describe --topic app-log-events  --bootstrap-server localhost:9092

$ bin/kafka-topics.sh --describe --topic app-log-events  --bootstrap-server localhost:9092
Topic: app-log-events TopicId: EURuBJt7S-iC5ebwaERc_A PartitionCount: 1 ReplicationFactor: 1  Configs: segment.bytes=1073741824
  Topic: app-log-events Partition: 0  Leader: 1 Replicas: 1 Isr: 1  Elr:  LastKnownElr:

Step 6: Write some application logs to the Kafka topic by executing the following statement.

 

echo '{"log_level":"INFO", "service_name":"AuthService", "host":"host1", "response_time":200, "log_timestamp":1714567890123}' | \
bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic app-log-events

$echo '{"log_level":"INFO", "service_name":"AuthService", "host":"host1", "response_time":200, "log_timestamp":1714567890123}' | \
> bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic app-log-events

Step 7: Read the events by executing the following command.

bin/kafka-console-consumer.sh --topic app-log-events --from-beginning --bootstrap-server localhost:9092

$ bin/kafka-console-consumer.sh --topic app-log-events --from-beginning --bootstrap-server localhost:9092
{"log_level":"INFO", "service_name":"AuthService", "host":"host1", "response_time":200, "log_timestamp":1714567890123}

2. Create a schema and table in Apache Pinot to process the events

Open the URL ‘http://localhost:9000’ in a browser.

 


Click on the ‘Swagger REST API’ link available in the left navigation bar; it opens the Pinot Swagger user interface.

 

2.1 Define Schema

Let’s define a schema file to handle application logs.

 

schema.json 

{
  "schemaName": "application_logs",
  "enableColumnBasedNullHandling": false,
  "dimensionFieldSpecs": [
    { "name": "log_level", "dataType": "STRING", "fieldType": "DIMENSION" },
    { "name": "service_name", "dataType": "STRING", "fieldType": "DIMENSION" },
    { "name": "host", "dataType": "STRING", "fieldType": "DIMENSION" }
  ],
  "metricFieldSpecs": [
    { "name": "response_time", "dataType": "DOUBLE", "fieldType": "METRIC" }
  ],
  "dateTimeFieldSpecs": [
    { "name": "log_timestamp", "dataType": "TIMESTAMP", "fieldType": "DATE_TIME", "format": "TIMESTAMP", "granularity": "1:MILLISECONDS" }
  ]
}

 

Let’s understand what is in the schema.

 

a. schemaName: "application_logs"

This names the schema "application_logs", meaning it defines the structure for application log data.

 

b. enableColumnBasedNullHandling: false

When "enableColumnBasedNullHandling" is set to false, Pinot does not explicitly track null values. Instead, it follows the default transformation behavior, where:

 

·      Numeric columns (e.g., INT, DOUBLE): Default to 0

·      String columns: Default to "null" (as a string, not an actual null)

·      Boolean columns: Default to false

·      Timestamp columns: Default to 0 (epoch time)

 

This behavior is necessary to ensure that Pinot can efficiently build inverted indexes, forward indexes, and dictionary encodings without needing extra null markers.

 

When enableColumnBasedNullHandling is set to true, Pinot allows columns to explicitly define their null handling and supports two approaches:

·      Default null replacement values (same as above but configurable per-column)

 

{
  "name": "total_amount",
  "dataType": "DOUBLE",
  "defaultNullValue": -1.0
}

 

This is useful when you want to distinguish between "real zeros" and "nulls" (e.g., 0.0 = actual zero, -1.0 = null).

 

·      Native null support (stores actual nulls): Pinot tracks null values explicitly instead of replacing them. Queries can use IS NULL and IS NOT NULL conditions to filter records. This requires additional storage and memory overhead to maintain null bitmaps.
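For example, on a table that has null handling enabled (the table we create below sets nullHandlingEnabled to true), a query sketch like the following could filter on missing values; treat it as illustrative, since the exact behavior depends on your null-handling configuration.

select * from application_logs where response_time IS NOT NULL limit 10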

 

c. Field Types

c.1. Dimension Fields (DIMENSION)

These are like the "categories" or "labels" in your data:

 

·      log_level: A string field (text) that might contain values like "INFO", "WARN", "ERROR"

·      service_name: A string field for the name of your service/application

·      host: A string field for the server name where the log came from

 

Think of dimensions as the ways you might want to group or filter your data (e.g., "show me all ERROR logs from service X").

 

c.2. Metric Field (METRIC)

This is a numerical value you might want to measure:

 

·      response_time: A double-precision number (can have decimals) that records how long something took

 

Metrics are the numbers you'll likely want to calculate averages, sums, or other statistics on.

 

c.3. DateTime Field (DATE_TIME)

This is for timestamp data:

 

·      log_timestamp: Records when the log entry was created with millisecond precision (very exact timing)

 

Onboard this Schema to Pinot

Go to Swagger and execute the ‘POST /schemas’ API with the above payload to onboard the "application_logs" schema.
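If you prefer the command line over the Swagger UI, the same schema can be onboarded with a curl call to the controller. This sketch assumes the controller is listening on localhost:9000 and that the above payload is saved locally as schema.json.

curl -X POST "http://localhost:9000/schemas" -H "Content-Type: application/json" -d @schema.json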

 


How to visualize the schema?

Navigate to Pinot Home page.

 


Click on the TABLES section.

 


You can see the application_logs schema available under the SCHEMAS section.

Click on the application_logs schema to see the schema JSON.


 

2.2 Create application_logs table

 

table.json

{
  "tableName": "application_logs",
  "tableType": "REALTIME",
  "segmentsConfig": {
    "timeColumnName": "log_timestamp",
    "timeType": "MILLISECONDS",
    "replication": "1",
    "schemaName": "application_logs",
    "segmentPushType": "APPEND"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "invertedIndexColumns": ["log_level", "service_name", "host"],
    "rangeIndexColumns": ["response_time"],
    "sortedColumn": ["log_timestamp"],
    "nullHandlingEnabled": true
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "ingestionConfig": {
    "streamIngestionConfig": {
      "streamConfigMaps": [
        {
          "streamType": "kafka",
          "stream.kafka.topic.name": "app-log-events",
          "stream.kafka.broker.list": "localhost:9092",
          "stream.kafka.consumer.type": "lowlevel",
          "stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
          "stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.json.JSONMessageDecoder",
          "stream.kafka.consumer.prop.auto.offset.reset": "smallest",
          "realtime.segment.flush.threshold.time": "600000",
          "realtime.segment.flush.threshold.rows": "50000"
        }
      ]
    }
  }
}

 

This JSON defines how Pinot should store and manage your application logs data.

 

a. Basic Table Information

·      tableName: "application_logs"

The name of your table (like a spreadsheet name)

·      tableType: "REALTIME"

This table will ingest and query live streaming data (as opposed to static batch data)

 

b. Segments Configuration (How Data is Stored)

·      timeColumnName: "log_timestamp"

Uses this field to organize data by time

·      timeType: "MILLISECONDS"

Timestamps are precise to milliseconds

·      replication: "1"

Data is stored in 1 copy (not redundant - okay for development)

·      schemaName: "application_logs"

Links to the schema we defined earlier

·      segmentPushType: "APPEND"

New segments keep getting appended to the table as data arrives (as opposed to REFRESH, where existing data is replaced)

 

c. Index Configuration (How Data is Optimized for Search)

·      loadMode: "MMAP"

Uses memory-mapped files for efficient access

 

·      invertedIndexColumns: ["log_level", "service_name", "host"]

Creates fast-search indexes for these fields (like a book index)

 

·      rangeIndexColumns: ["response_time"]

Optimizes for number range queries ("response_time > 100ms")

 

·      sortedColumn: ["log_timestamp"]

Stores data in time order for faster time-range queries

 

·      nullHandlingEnabled: true

Properly handles empty/missing values
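To see what these indexes buy you, here are a couple of illustrative queries you can try later in the Query Console: the first filter is served by the inverted indexes, the second by the range index on response_time. These are examples, not required steps.

select * from application_logs where log_level = 'ERROR' and service_name = 'PaymentService' limit 10

select * from application_logs where response_time > 100 limit 10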

 

d. Tenants (Resource Isolation)

Uses "DefaultTenant" for both broker (query handling) and server (data storage) - simplest setup

 

e. Ingestion Configuration (How Data Flows In)

This connects to a Kafka stream.

 

·      streamType: "kafka"

Data comes from Apache Kafka

·      topic.name: "app-log-events"

The Kafka channel to read from

·      broker.list: "localhost:9092"

Where Kafka is running

·      consumer.type: "lowlevel"

Pinot uses its partition-level (low-level) consumer, reading each Kafka partition directly

·      decoder.class: JSON decoder

Expects data in JSON format

·      auto.offset.reset: "smallest"

Starts reading from oldest available data

·      flush.threshold.time: 600000 ms (10 min)

Commit the in-progress (consuming) segment after at most this much time

·      flush.threshold.rows: 50000 records

Commit the in-progress segment once it has consumed this many rows

 


Onboard Pinot Table

Go to Swagger.

 

Pass the above payload to the “POST /tables” API.
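Alternatively, the table can be created from the command line with curl, again assuming the controller is on localhost:9000 and the above payload is saved locally as table.json.

curl -X POST "http://localhost:9000/tables" -H "Content-Type: application/json" -d @table.json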

 


Upon successful execution of the API, you can see the below response.

 

Table application_logs_REALTIME successfully added

 

3. Write some application logs to the topic

Execute the following statements to write the log messages to the Kafka topic ‘app-log-events’.

 

echo '{"log_level":"INFO", "service_name":"AuthService", "host":"host1", "response_time":200, "log_timestamp":1714567890123}' | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic app-log-events 

echo '{"log_level":"ERROR", "service_name":"PaymentService", "host":"host2", "response_time":500, "log_timestamp":1714567891123}' | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic app-log-events 

echo '{"log_level":"WARN", "service_name":"OrderService", "host":"host3", "response_time":350, "log_timestamp":1714567892123}' | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic app-log-events 

echo '{"log_level":"DEBUG", "service_name":"InventoryService", "host":"host1", "response_time":120, "log_timestamp":1714567893123}' | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic app-log-events 

echo '{"log_level":"INFO", "service_name":"ShippingService", "host":"host2", "response_time":280, "log_timestamp":1714567894123}' | bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic app-log-events

 

4. Query the table ‘application_logs’.

 

Navigate to the Query Console and execute the following statement.

select * from application_logs limit 10

 

You can see the records that we published to the Kafka topic.
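Beyond select *, you can also try a few illustrative aggregations on the same data, for example the average response time per service or a count of ERROR logs; the results will reflect whatever events you have published so far.

select service_name, avg(response_time) as avg_response_time from application_logs group by service_name

select count(*) from application_logs where log_level = 'ERROR'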

 


References

https://kafka.apache.org/quickstart

 

 

 

