Monday, 21 July 2025

How to Ingest Batch Data into an Apache Pinot Offline Table Using a Job?

Apache Pinot is a real-time distributed OLAP datastore designed for low-latency analytics. When working with batch data, we ingest it into a Pinot offline table. This guide walks you through creating a schema, a table definition, a sample data file, and a job specification file, and then shows how to submit the ingestion job using pinot-admin.sh.

 

Step 1: Onboard Schema

Create a JSON file named batch_schema.json that defines the schema for our table.

 

batch_schema.json

{
  "schemaName": "customer_purchases",
  "dimensionFieldSpecs": [
    {"name": "customerId", "dataType": "STRING"},
    {"name": "customerName", "dataType": "STRING"},
    {"name": "purchaseAmount", "dataType": "DOUBLE"}
  ],
  "metricFieldSpecs": [],
  "dateTimeFieldSpecs": [
    {
      "name": "purchaseDate",
      "dataType": "LONG",
      "format": "1:MILLISECONDS:EPOCH",
      "granularity": "1:MILLISECONDS"
    }
  ]
}

Navigate to the Pinot Swagger UI and execute the “POST /schemas” API with the above payload.

Upon successful execution of the API, you will receive the following response.

{
  "unrecognizedProperties": {},
  "status": "customer_purchases successfully added"
}
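If you prefer the command line to the Swagger UI, the same schema can also be registered with curl against the controller (this sketch assumes the controller is running on the default port 9000):

curl -X POST -H "Content-Type: application/json" \
  -d @batch_schema.json "http://localhost:9000/schemas"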

Step 2: Onboard Table Definition

Create a JSON file named table.json that defines the offline table.

 

table.json

 

{
  "tableName": "customer_purchases",
  "schemaName": "customer_purchases",
  "tableType": "OFFLINE",
  "segmentsConfig": {
    "timeColumnName": "purchaseDate",
    "timeType": "MILLISECONDS",
    "replication": "1"
  },
  "tableIndexConfig": {
    "loadMode": "MMAP",
    "invertedIndexColumns": ["customerId", "purchaseAmount"],
    "rangeIndexColumns": ["purchaseAmount"], 
    "noDictionaryColumns": [],
    "starTreeIndexConfigs": [],
    "enableDefaultStarTree": false
  },
  "tenants": {
    "broker": "DefaultTenant",
    "server": "DefaultTenant"
  },
  "metadata": {
    "customConfigs": {}
  },
  "ingestionConfig": {
    "batchIngestionConfig": {
      "segmentIngestionType": "APPEND",
      "segmentIngestionFrequency": "DAILY"
    }
  }
}

 

Execute the “POST /tables” API to onboard the customer_purchases table definition.
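The same can be done from the command line (again assuming the controller at http://localhost:9000):

curl -X POST -H "Content-Type: application/json" \
  -d @table.json "http://localhost:9000/tables"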


Step 3: Create Sample Data

 

Create a CSV file named customerPurchases.csv with sample data.

 

customerPurchases.csv

customerId,customerName,purchaseAmount,purchaseDate
C001,John Doe,100.5,1711929600000
C002,Jane Smith,250.0,1712016000000
C003,Robert Brown,175.75,1712102400000
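The purchaseDate values are epoch milliseconds; for example, 1711929600000 corresponds to 2024-04-01 00:00:00 UTC. If you need to generate such values for your own data, GNU date can do the conversion (a small sketch assuming GNU coreutils; the flags differ on macOS/BSD):

date -u -d "2024-04-01 00:00:00" +%s%3N   # prints 1711929600000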

Step 4: Create a Job Specification File

 

Create a YAML file named batch_ingestion_job.yml that defines the batch ingestion job.

 

batch_ingestion_job.yml

executionFrameworkSpec:
  name: "standalone"
  segmentGenerationJobRunnerClassName: "org.apache.pinot.plugin.ingestion.batch.standalone.SegmentGenerationJobRunner"
  segmentTarPushJobRunnerClassName: "org.apache.pinot.plugin.ingestion.batch.standalone.SegmentTarPushJobRunner"
  segmentUriPushJobRunnerClassName: "org.apache.pinot.plugin.ingestion.batch.standalone.SegmentUriPushJobRunner"

pinotFSSpecs:
  - scheme: "file"
    className: "org.apache.pinot.spi.filesystem.LocalPinotFS"

jobType: "SegmentCreationAndTarPush"

inputDirURI: "file:///{INPUT_DIRECTORY_PATH}"
includeFileNamePattern: "glob:**/*.csv"
outputDirURI: "file:///{OUTPUT_DIRECTORY_PATH}"
overwriteOutput: true

tableSpec:
  tableName: "customer_purchases"

pinotClusterSpecs:
  - controllerURI: "http://localhost:9000"

recordReaderSpec:
  dataFormat: "csv"
  className: "org.apache.pinot.plugin.inputformat.csv.CSVRecordReader"
  configClassName: "org.apache.pinot.plugin.inputformat.csv.CSVRecordReaderConfig"
  configs:
    header: "customerId,customerName,purchaseAmount,purchaseDate"
    delimiter: ","
    skipHeader: "true"

pushJobSpec:
  pushAttempts: 3
  pushRetryIntervalMillis: 1000
  pushParallelism: 1

Replace {INPUT_DIRECTORY_PATH} and {OUTPUT_DIRECTORY_PATH} with actual folder paths (for example, file:///tmp/pinot/input and file:///tmp/pinot/output).
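As a quick local setup matching the example paths above (hypothetical locations, adjust to your environment), create the folders and copy the CSV into the input directory:

mkdir -p /tmp/pinot/input /tmp/pinot/output
cp customerPurchases.csv /tmp/pinot/input/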

 

Step 5: Submit the Job

Run the following command to submit the batch ingestion job:

pinot-admin.sh LaunchDataIngestionJob -jobSpecFile /path/to/batch_ingestion_job.yml

Upon successful execution of the above command, the job output confirms that the segment has been generated and pushed to the controller.

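To verify the ingestion, query the table from the Pinot query console, or hit the broker’s SQL endpoint with curl (assuming the broker listens on the default port 8099):

curl -X POST -H "Content-Type: application/json" \
  -d '{"sql": "SELECT * FROM customer_purchases LIMIT 10"}' \
  "http://localhost:8099/query/sql"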
You might wonder: why can’t I just use the Pinot REST API to push my data directly, and why do I need to create a job at all?

 

Using the REST API is like bringing items one by one instead of delivering them in bulk.

 

What Happens When You Use a REST API?

·      You send raw data to Pinot using an API request.

·      Pinot stores raw records in a "consuming" segment (in-memory buffer).

·      When the segment reaches a threshold (time/size), Pinot converts it into an immutable segment.

·      Small segments may pile up, leading to "segment fragmentation."

 

What Happens When You Use a Batch Job?

·      The job reads bulk data (CSV/JSON/Parquet) from a file system (S3, HDFS, local).

·      It pre-processes the data into optimized segments (sorted, compressed, indexed).

·      It uploads the segments directly to Pinot (bypassing real-time ingestion overhead).

 

  
