Thursday, 17 July 2025

Transforming Data at Ingestion Time in Apache Pinot

When ingesting data into Apache Pinot, you often need to clean, reshape, or extract specific fields before storing them in Pinot tables. Pinot provides ingestion transformations, which let you apply these changes at the ingestion stage rather than at query time.

This tutorial will guide you through transforming JSON data during ingestion into an offline table. We will use the jsonPathString() function to extract fields from nested JSON objects and store them as flat columns.

 

To demonstrate this, I am going to read the following JSON document:

{
  "eventId": "E123",
  "user": {
    "id": "U001",
    "name": "John Doe"
  },
  "details": {
    "category": "sports",
    "score": 98
  },
  "timestamp": 1711790400000
}

The goal is to flatten it into the following shape before it is stored in Pinot:

{
  "eventId": "E123",
  "userId": "U001",
  "userName": "John Doe",
  "category": "sports",
  "score": 98,
  "timestamp": 1711790400000
}
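To make the target shape concrete, here is a minimal Python sketch of the same flattening done outside Pinot. It is only an emulation for illustration: the helper names json_path_get and flatten_event and the FIELD_PATHS mapping are my own and are not part of Pinot.

```python
import json

def json_path_get(obj, dotted_path):
    """Walk a dotted path such as 'user.id' through nested dicts; None if absent."""
    current = obj
    for key in dotted_path.split("."):
        if not isinstance(current, dict) or key not in current:
            return None
        current = current[key]
    return current

# Target column -> location in the source document (taken from the two samples above).
FIELD_PATHS = {
    "eventId": "eventId",
    "userId": "user.id",
    "userName": "user.name",
    "category": "details.category",
    "score": "details.score",
    "timestamp": "timestamp",
}

def flatten_event(event):
    """Build the flat record by looking up each target column in the nested event."""
    return {column: json_path_get(event, path) for column, path in FIELD_PATHS.items()}

source = {
    "eventId": "E123",
    "user": {"id": "U001", "name": "John Doe"},
    "details": {"category": "sports", "score": 98},
    "timestamp": 1711790400000,
}

flat = flatten_event(source)
print(json.dumps(flat, indent=2))
```

Running it prints the flattened document shown above. Inside Pinot, this lookup is expressed as transform functions in the table config rather than as application code.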

 

1. Define Schema for events

This time, let’s use the Pinot user interface to onboard the EventTable schema.

 

Open the Pinot user interface by navigating to the URL ‘http://localhost:9000/’.

 


Click on the TABLES card.

 

You will be taken to the Tables screen.

 


Click on the ‘Add Schema’ button under the OPERATIONS section.

 


Toggle the ‘Add Schema’ button from SIMPLE to JSON.

 

Add the following schema.

{
  "schemaName": "EventTable",
  "dimensionFieldSpecs": [
    {"name": "eventId", "dataType": "STRING"},
    {"name": "userId", "dataType": "STRING"},
    {"name": "userName", "dataType": "STRING"},
    {"name": "category", "dataType": "STRING"}
  ],
  "metricFieldSpecs": [
    {"name": "score", "dataType": "INT"}
  ],
  "dateTimeFieldSpecs": [
    {
      "name": "timestamp",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

 


 

Click on the SAVE button.

 

Upon successful onboarding, you can see that the schema EventTable is available under the SCHEMAS section.
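If you would rather script this step than click through the UI, the schema can also be posted to the controller over REST. The sketch below only builds the request. I am assuming the controller runs at http://localhost:9000 (the same address as the UI) and accepts the schema at its /schemas endpoint; the function name build_add_schema_request is my own.

```python
import json
import urllib.request

CONTROLLER_URL = "http://localhost:9000"  # assumption: same host and port as the UI

def build_add_schema_request(schema, controller_url=CONTROLLER_URL):
    """Prepare (but do not send) a POST of the schema JSON to the controller."""
    body = json.dumps(schema).encode("utf-8")
    return urllib.request.Request(
        url=f"{controller_url}/schemas",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

# Same schema as the one pasted into the UI above.
schema = {
    "schemaName": "EventTable",
    "dimensionFieldSpecs": [
        {"name": "eventId", "dataType": "STRING"},
        {"name": "userId", "dataType": "STRING"},
        {"name": "userName", "dataType": "STRING"},
        {"name": "category", "dataType": "STRING"},
    ],
    "metricFieldSpecs": [{"name": "score", "dataType": "INT"}],
    "dateTimeFieldSpecs": [
        {
            "name": "timestamp",
            "dataType": "LONG",
            "format": "1:MILLISECONDS:EPOCH",
            "granularity": "1:MILLISECONDS",
        }
    ],
}

request = build_add_schema_request(schema)
print(request.get_method(), request.full_url)

# To actually send it against a running controller:
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```

The send is left commented out so the snippet runs without a Pinot cluster. Uncomment it once the controller is up.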

 


2. Onboard Table Definition

Click on the ‘Add Offline Table’ button under the OPERATIONS section.

 


It opens the following form.

 


ADD TABLE section

Set the table name to EventTable and keep the rest as it is.

 

TENANTS section

Keep the defaults as they are.

 

Ingestions section

Under transform functions, click on ADD NEW FIELD button.

 

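·      Select the column name as userId and set the transformation function to jsonPathString(user, '$.id')

·      Select the column name as userName and set the transformation function to jsonPathString(user, '$.name')

·      Select the column name as category and set the transformation function to jsonPathString(details, '$.category')

Note that score is also nested under details in the source document. These three transforms do not extract it, so to populate the score column you would add one more transform, for example jsonPathLong(details, '$.score').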

 


Save this form.

 

Upon successful onboarding of the table, you can see that EventTable_OFFLINE is available under the TABLES section.

 


The final generated table config looks like this:

{
  "OFFLINE": {
    "tableName": "EventTable_OFFLINE",
    "tableType": "OFFLINE",
    "segmentsConfig": {
      "segmentPushFrequency": "HOURLY",
      "replicasPerPartition": "1",
      "minimizeDataMovement": false,
      "schemaName": "EventTable",
      "replication": "1",
      "timeColumnName": "timestamp",
      "segmentPushType": "APPEND",
      "retentionTimeValue": "0"
    },
    "tenants": {
      "broker": "DefaultTenant",
      "server": "DefaultTenant"
    },
    "tableIndexConfig": {
      "createInvertedIndexDuringSegmentGeneration": false,
      "onHeapDictionaryColumns": [],
      "varLengthDictionaryColumns": [],
      "invertedIndexColumns": [],
      "enableDefaultStarTree": false,
      "optimizeDictionaryForMetrics": false,
      "autoGeneratedInvertedIndex": false,
      "optimizeDictionaryType": false,
      "enableDynamicStarTreeCreation": false,
      "columnMajorSegmentBuilderEnabled": true,
      "noDictionarySizeRatioThreshold": 0.85,
      "loadMode": "MMAP",
      "noDictionaryColumns": [],
      "aggregateMetrics": false,
      "nullHandlingEnabled": false,
      "bloomFilterColumns": [],
      "rangeIndexColumns": [],
      "sortedColumn": [],
      "rangeIndexVersion": 2,
      "optimizeDictionary": false
    },
    "metadata": {},
    "quota": {},
    "routing": {},
    "query": {},
    "fieldConfigList": [],
    "ingestionConfig": {
      "segmentTimeValueCheck": true,
      "transformConfigs": [
        {
          "columnName": "userId",
          "transformFunction": "jsonPathString(user, '$.id')"
        },
        {
          "columnName": "userName",
          "transformFunction": "jsonPathString(user, '$.name')"
        },
        {
          "columnName": "category",
          "transformFunction": "jsonPathString(details, '$.category')"
        }
      ],
      "continueOnError": false,
      "rowTimeValueCheck": false
    },
    "isDimTable": false
  }
}
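A quick sanity check for a config like this is to confirm that every schema column is either a top-level field of the input document or the target of a transform. The sketch below is plain Python, not a Pinot API: the function name unmapped_columns is my own, and the schema and config are trimmed to just the parts the check needs.

```python
def unmapped_columns(schema, table_config, sample_record):
    """Schema columns that are neither top-level input fields nor transform targets."""
    columns = [
        spec["name"]
        for key in ("dimensionFieldSpecs", "metricFieldSpecs", "dateTimeFieldSpecs")
        for spec in schema.get(key, [])
    ]
    transforms = table_config.get("ingestionConfig", {}).get("transformConfigs", [])
    transformed = {t["columnName"] for t in transforms}
    return [c for c in columns if c not in sample_record and c not in transformed]

# Column names from the EventTable schema (data types omitted for brevity).
schema = {
    "dimensionFieldSpecs": [
        {"name": "eventId"}, {"name": "userId"},
        {"name": "userName"}, {"name": "category"},
    ],
    "metricFieldSpecs": [{"name": "score"}],
    "dateTimeFieldSpecs": [{"name": "timestamp"}],
}

# Only the ingestionConfig part of the generated table config above.
table_config = {
    "ingestionConfig": {
        "transformConfigs": [
            {"columnName": "userId", "transformFunction": "jsonPathString(user, '$.id')"},
            {"columnName": "userName", "transformFunction": "jsonPathString(user, '$.name')"},
            {"columnName": "category", "transformFunction": "jsonPathString(details, '$.category')"},
        ]
    }
}

sample_record = {
    "eventId": "E123",
    "user": {"id": "U001", "name": "John Doe"},
    "details": {"category": "sports", "score": 98},
    "timestamp": 1711790400000,
}

missing = unmapped_columns(schema, table_config, sample_record)
print(missing)  # prints ['score']
```

For the sample document the check reports score: it sits under details and has no transform in the config above, so it will not be filled from the nested value unless a transform for it is added.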

 

3. Onboarding data

data.json

 

[
  {
    "eventId": "E123",
    "user": {
      "id": "U001",
      "name": "John Doe"
    },
    "details": {
      "category": "sports",
      "score": 98
    },
    "timestamp": 1711790400000
  },
  {
    "eventId": "E124",
    "user": {
      "id": "U002",
      "name": "Jane Smith"
    },
    "details": {
      "category": "music",
      "score": 85
    },
    "timestamp": 1711790700000
  }
]

To upload this JSON file into Pinot, we use the ingestFromFile API.

curl -X POST -F file=@data.json -H "Content-Type: multipart/form-data" "http://localhost:9000/ingestFromFile?tableNameWithType=EventTable_OFFLINE&batchConfigMapStr={"inputFormat":"json"}"

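The JSON value of batchConfigMapStr contains braces, quotes, and a colon, which must be URL-encoded (the inner double quotes would otherwise end the shell string early), so the final command becomes: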

curl -X POST -F file=@data.json \
  -H "Content-Type: multipart/form-data" \
"http://localhost:9000/ingestFromFile?tableNameWithType=EventTable_OFFLINE&batchConfigMapStr=%7B%22inputFormat%22%3A%22json%22%7D"

$ curl -X POST -F file=@data.json \
>   -H "Content-Type: multipart/form-data" \
> "http://localhost:9000/ingestFromFile?tableNameWithType=EventTable_OFFLINE&batchConfigMapStr=%7B%22inputFormat%22%3A%22json%22%7D"
{"status":"Successfully ingested file into table: EventTable_OFFLINE as segment: EventTable_1711790400000_1711790700000_1743389533630"}
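If you do not want to encode the batchConfigMapStr value by hand, the standard library can do it. This is a small sketch using urllib.parse.quote. The function name build_ingest_url is my own, and the controller address is the one used in the curl command above.

```python
import json
from urllib.parse import quote

def build_ingest_url(table_name_with_type, batch_config,
                     controller_url="http://localhost:9000"):
    """Build the ingestFromFile URL with a percent-encoded batchConfigMapStr."""
    # Compact separators avoid a space after the colon, which would become %20.
    config_json = json.dumps(batch_config, separators=(",", ":"))
    return (
        f"{controller_url}/ingestFromFile"
        f"?tableNameWithType={quote(table_name_with_type, safe='')}"
        f"&batchConfigMapStr={quote(config_json, safe='')}"
    )

url = build_ingest_url("EventTable_OFFLINE", {"inputFormat": "json"})
print(url)
```

The printed URL matches the encoded one in the curl command, so it can be pasted straight into it.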

 

How to Query Onboarded Data?

Navigate to the Query Console.

 

Click on the EventTable link to query the records of this table.
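The same query can also be sent from code. The sketch below only prepares the request and shows how to turn a response into rows. Some assumptions here: I am assuming a broker listening on http://localhost:8099 with the /query/sql endpoint, which may differ in your setup, and I quote "timestamp" in the SQL because it can clash with a reserved keyword. The function names are my own.

```python
import json
import urllib.request

BROKER_URL = "http://localhost:8099"  # assumption: default broker port, adjust as needed

QUERY = (
    'SELECT eventId, userId, userName, category, score, "timestamp" '
    "FROM EventTable LIMIT 10"
)

def build_query_request(sql, broker_url=BROKER_URL):
    """Prepare (but do not send) a SQL query request for the broker."""
    body = json.dumps({"sql": sql}).encode("utf-8")
    return urllib.request.Request(
        url=f"{broker_url}/query/sql",
        data=body,
        headers={"Content-Type": "application/json"},
        method="POST",
    )

def rows_as_dicts(response):
    """Pair each row in resultTable with the column names from its dataSchema."""
    result = response["resultTable"]
    names = result["dataSchema"]["columnNames"]
    return [dict(zip(names, row)) for row in result["rows"]]

request = build_query_request(QUERY)
print(request.get_method(), request.full_url)

# To run it against a live cluster:
# with urllib.request.urlopen(request) as raw:
#     for row in rows_as_dicts(json.load(raw)):
#         print(row)
```

If the transforms worked, each returned row should carry userId, userName, and category as flat columns instead of the nested user and details objects.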

 

 

Apache Pinot provides powerful data transformation capabilities at the time of ingestion. This feature allows users to preprocess data before storing it in Pinot tables, reducing query-time computation and improving performance.

  

 
