Sunday, 10 August 2025

Getting Started with Loading Data into Apache Druid Using a Spec File and a curl Request

Apache Druid is a high-performance, real-time analytics database designed for interactive querying of large datasets. Native batch ingestion using a specification file (also called an ingestion spec) provides greater control over how data is ingested, transformed, and indexed.

 

In this blog, I'll walk through the basics of loading data into Druid using a spec file. This is perfect for beginners who want a deeper understanding of Druid's ingestion pipeline and want to explore Druid beyond quick demos.

 

1.  Data file to ingest.

sales_data.csv  

timestamp,product,city,sales
2025-04-01T10:00:00Z,Laptop,Delhi,300
2025-04-01T10:00:00Z,Laptop,Delhi,200
2025-04-01T11:00:00Z,Tablet,Mumbai,150
2025-04-01T11:00:00Z,Tablet,Mumbai,50
2025-04-01T12:00:00Z,Mobile,Bengaluru,200
2025-04-01T13:00:00Z,Laptop,Hyderabad,250
2025-04-01T14:00:00Z,Tablet,Chennai,180
2025-04-01T15:00:00Z,Mobile,Pune,220
2025-04-01T15:00:00Z,Mobile,Pune,80

2. Define a specification file to ingest the sales data into Druid.

 

sales_data_ingestion_spec.json

{
  "type": "index",
  "spec": {
    "dataSchema": {
      "dataSource": "sales_data",
      "timestampSpec": {
        "column": "timestamp",
        "format": "iso"
      },
      "dimensionsSpec": {
        "dimensions": ["product", "city"]
      },
      "metricsSpec": [
        {
          "type": "doubleSum",
          "name": "total_sales",
          "fieldName": "sales"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "day",
        "queryGranularity": "none",
        "rollup": false
      }
    },
    "ioConfig": {
      "type": "index",
      "inputSource": {
        "type": "local",
        "baseDir": "/Users/Shared/druid_samples",
        "filter": "sales_data.csv"
      },
      "inputFormat": {
        "type": "csv",
        "findColumnsFromHeader": true
      }
    },
    "tuningConfig": {
      "type": "index"
    }
  }
}

2.1 What is this spec file?

A Druid ingestion spec file tells Druid how to read, parse, transform, and store data. It contains information like:

 

·      Where is your data?

·      Metadata about the data (Ex: columns like product, city, sales)

·      What do you want to do with them? (sum, group, roll up)

·      How should Druid store the final processed data?

 

Now let's go through the file section by section.

 

2.1.1 Full Spec Overview

{
  "type": "index",
  "spec": {
    // ...
  }
}

"type": "index"

This tells Druid you are doing a batch ingestion, which means you’re loading data from files (not real-time streams like Kafka).

 

2.1.2 dataSchema: Describes your data

This section defines:

 

·      What the data looks like

·      Which fields to use

·      How to store and query it

 

{
  "spec": {
    "dataSchema": {
      "dataSource": "sales_data",
      "timestampSpec": {
        "column": "timestamp",
        "format": "iso"
      },
      "dimensionsSpec": {
        "dimensions": [
          "product",
          "city"
        ]
      },
      "metricsSpec": [
        {
          "type": "doubleSum",
          "name": "total_sales",
          "fieldName": "sales"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "day",
        "queryGranularity": "none",
        "rollup": false
      }
    }
  }
}

 

"dataSource": "sales_data"

This is the name of your Druid table. After ingestion, your data will live under this name and you can query it like:

SELECT * FROM sales_data;

timestampSpec

"timestampSpec": {
  "column": "timestamp",
  "format": "iso"
}

 

Druid requires a time column to serve time-based queries efficiently. This tells Druid to use the column named "timestamp" as the primary time column and that its values are in ISO 8601 format, for example "2023-08-01T10:30:00Z". Internally, Druid stores this column as __time.

 

Without a proper timestamp, Druid won't know when each event occurred, which is crucial for time-series analysis.
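Because Druid stores this column internally as __time, you can filter on it directly in SQL. For example, a quick sketch that pulls only the rows between 10:00 and 12:00 from the sales_data datasource defined above:

SELECT __time, product, city, total_sales
FROM sales_data
WHERE __time >= TIMESTAMP '2025-04-01 10:00:00'
  AND __time < TIMESTAMP '2025-04-01 12:00:00'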

 

dimensionsSpec

"dimensionsSpec": {
  "dimensions": ["product", "city"]
}

 

Dimensions are descriptive fields; you group or filter data by them.

 

In this example:

·      "product" – like "Laptop", "Tablet"

·      "city" – like "Delhi", "Mumbai"

 

These are used for slicing/dicing your data:

SELECT SUM(total_sales) FROM sales_data WHERE city = 'Delhi';

 

metricsSpec

"metricsSpec": [
  {
    "type": "doubleSum",
    "name": "total_sales",
    "fieldName": "sales"
  }
]

 

"metricsSpec": This is an array that lists the metrics you want Druid to ingest and aggregate.

 

"type": "doubleSum"

This specifies the type of aggregation to perform on the data in the "fieldName". In this case it's "doubleSum", which means Druid will calculate and store the sum of the values found in the "sales" column as a 64-bit floating-point number (double). This aggregation is applied at ingestion time only when rollup is set to true. In our example, rollup is set to false, so no ingestion-time aggregation is applied and each raw value is stored as-is.

 

"name": "total_sales":

This defines the name that this aggregated metric will have in your Druid datasource after ingestion. When you query this datasource later, you will refer to the sum of the "sales" column using the name "total_sales".

 

"fieldName": "sales":

This specifies the name of the column in your input data source (e.g., a CSV file, Kafka topic, etc.) that contains the numeric values you want to sum. Druid will read the values from this column and apply the doubleSum aggregation.
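After ingestion you reference the metric by its name ("total_sales"), not by the original column ("sales"). For example, a small sketch that sums sales per product against the sales_data datasource defined above:

SELECT product, SUM(total_sales) AS product_sales
FROM sales_data
GROUP BY product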

 

granularitySpec

"granularitySpec": {
  "type": "uniform",
  "segmentGranularity": "day",
  "queryGranularity": "none",
  "rollup": false
}

·      "granularitySpec": This is the top-level object that contains the configuration for time-based partitioning and querying.

 

·      "type": "uniform": This specifies the type of granularitySpec. "uniform" indicates that Druid should create segments of a uniform time duration, as defined by segmentGranularity.

 

·      "segmentGranularity": "day": This parameter defines the time chunking for how Druid stores your data into segments. In this case, "day" means that Druid will aim to create new data segments that contain data for a single day. For example, all data ingested on April 18, 2025, will likely be stored in one or more segments covering that entire day. Segment granularity is crucial for efficient data storage and querying, as Druid prunes segments that are not relevant to a query.

 

·      "queryGranularity": "none": This parameter defines the lowest level of time precision that will be available for querying. Setting it to "none" means that Druid will retain the original timestamp precision of your data during ingestion and will not truncate or aggregate timestamps at a coarser level for querying. When you query this data, you will be able to group and filter down to the most granular timestamp available in your raw data.

 

·      "rollup": false: This parameter controls ingestion-time data rollup (pre-aggregation). When set to false, Druid will not automatically pre-aggregate data during the ingestion process based on the dimensions and the segmentGranularity. Each raw event will be stored as a separate record (within the bounds of the segmentGranularity). If rollup were set to true, Druid would attempt to aggregate rows with the same timestamp and dimension values using the aggregators in the "metricsSpec", potentially reducing the storage footprint and improving query performance for aggregated queries. With rollup: false, you retain the ability to perform more flexible aggregations at query time, as the sketch below shows.
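For instance, the pre-aggregation that ingestion-time rollup would have performed can be reproduced at query time. The hourly bucketing below is only an illustration (it is not part of the spec):

SELECT TIME_FLOOR(__time, 'PT1H') AS event_hour, product, city, SUM(total_sales) AS total_sales
FROM sales_data
GROUP BY 1, 2, 3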

 

2.1.3 ioConfig: Where the data comes from

This section tells Druid where and how to read your source file.

"ioConfig": {
  "type": "index",
  "inputSource": {
    "type": "local",
    "baseDir": "/Users/Shared/druid_samples",
    "filter": "sales_data.csv"
  },
  // ...
}

 

·      "ioConfig": This is the top-level object that configures the input and parsing aspects of the ingestion process.

 

·      "type": "index": The type here must match the task type; "index" is the standard type for native batch ingestion of data directly into Druid segments.

 

·      "inputSource": This nested object defines the source of your input data.

 

·      "type": "local": This indicates that the input data is located on the local filesystem of the Druid indexing task node.

 

·      "baseDir": "/Users/Shared/druid_samples": This specifies the base directory on the local filesystem where Druid should look for the input files.

 

·      "filter": "sales_data.csv": This is a filter that tells Druid which files within the baseDir to process. In this case, it will only process the file named sales_data.csv.

 

 

3. Submit the spec file to Druid using the following curl command.

curl -X POST http://localhost:8081/druid/indexer/v1/task \
     -H 'Content-Type: application/json' \
     -d @sales_data_ingestion_spec.json

Druid responds with the ID of the newly created ingestion task:

{"task":"index_sales_data_dgbdecpd_2025-04-18T08:02:05.636Z"}

 

Navigate to the Tasks tab in the Druid console; you can observe that the task completed successfully.
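Alternatively, you can check the same status from the Query tab with Druid SQL, using the sys.tasks system table (a quick sketch):

SELECT "task_id", "type", "datasource", "status"
FROM sys.tasks
WHERE "datasource" = 'sales_data'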

 


Navigate to the Query tab and execute the following query to display the records of the sales_data datasource.

SELECT * FROM "sales_data"
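Since segmentGranularity is "day" and every sample row falls on 2025-04-01, the ingested data should end up in one or more segments covering that single day. You can verify this with the sys.segments system table (again a sketch):

SELECT "segment_id", "start", "end", "num_rows"
FROM sys.segments
WHERE "datasource" = 'sales_data'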

 


Now let's create a second spec file that sets rollup to true and writes to a new datasource, sales_data_aggregated.

sales_data_aggregated_ingestion_spec.json 

{
  "type": "index",
  "spec": {
    "dataSchema": {
      "dataSource": "sales_data_aggregated",
      "timestampSpec": {
        "column": "timestamp",
        "format": "iso"
      },
      "dimensionsSpec": {
        "dimensions": ["product", "city"]
      },
      "metricsSpec": [
        {
          "type": "doubleSum",
          "name": "total_sales",
          "fieldName": "sales"
        }
      ],
      "granularitySpec": {
        "type": "uniform",
        "segmentGranularity": "day",
        "queryGranularity": "none",
        "rollup": true
      }
    },
    "ioConfig": {
      "type": "index",
      "inputSource": {
        "type": "local",
        "baseDir": "/Users/Shared/druid_samples",
        "filter": "sales_data.csv"
      },
      "inputFormat": {
        "type": "csv",
        "findColumnsFromHeader": true
      }
    },
    "tuningConfig": {
      "type": "index"
    }
  }
}

Submit the above spec file.

curl -X POST http://localhost:8081/druid/indexer/v1/task \
     -H 'Content-Type: application/json' \
     -d @sales_data_aggregated_ingestion_spec.json

 


 

In this example, since we set rollup to true, Druid groups rows that share the same dimension values (product, city) and the same timestamp (truncated according to queryGranularity, which is "none" here, so the original timestamps are kept), and applies the doubleSum aggregator to the sales column.
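Querying the new datasource shows the effect. Rows that share the same timestamp, product, and city (for example, the two Laptop/Delhi rows at 10:00 and the two Mobile/Pune rows at 15:00) should now appear as a single row whose total_sales is the sum of the original sales values:

SELECT __time, product, city, total_sales
FROM sales_data_aggregated
ORDER BY __time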

  
