Friday, 30 December 2022

Hive: quick guide to Bucketing

Bucketing is a process of dividing a large data set into more manageable (small) datasets, so that query execution time is faster.

 

Let me explain it with an example.

 

Suppose you have orders data like below.

 

orderId

productId

discountCode

timestamp

countryCode

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

 

Suppose if your queries mostly run on the column productid like below.

SELECT * FROM order WHERE productId = 10;

Without bucketing, system need to scan entire dataset to get all the orders associated with the productid 10.

 

How bucketing helps us?

If you create a bucketing on the column productid with 4 buckets, Hive apply a hashing algorithm on the productid and insert the data into one of the bucket based on hash value of productid.

 

Just for simplicity, hash algorithm can be productid % noOfBuckets.

 

For

a.   productId 1 => 1 % 4 = 1 - This record will go and place in bucket 1

b.   productId 2 => 2 % 4 = 2 - This record will go and place in bucket 2

c.    productId 3 => 3 % 4 = 3 - This record will go and place in bucket

d. productId 4 => 4 % 4 = 0 - This record will go and place in





As you see above image, records with

a.   productIds 4, 8, 12, 16 …. are placed in bucket 0

b.   productIds 1, 5, 9, 13 …. are placed in bucket 1

c.    productIds 2, 6, 10, 14 …. are placed in bucket 2

d.   productIds 3, 7, 11, 15 …. are placed in bucket 3

 

 

For the query ‘SELECT * FROM order WHERE productId = 10;’ instead of scanning entire dataset,

a.   Hive find the hash value of productId 10, that is 10 % 4 = 2. Hive will scan the bucket 2 data, it improve the performance a lot.

 

Follow below step-by-step procedure to experiment with an example.

 

Set the property ‘hive.enforce.bucketing’ to true before proceeding.

 

SET hive.enforce.bucketing=true;

 

Step 1: Create a staging table ‘ordes_staging’.

 

CREATE TABLE orders_staging (
    orderId INT,
    productId INT,
    discountCode STRING,
    timestamp BIGINT,
    countryCode STRING
 )
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

hive> CREATE TABLE orders_staging (
    >     orderId INT,
    >     productId INT,
    >     discountCode STRING,
    >     timestamp BIGINT,
    >     countryCode STRING
    >  )
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY ','
    > STORED AS TEXTFILE;
OK
Time taken: 0.038 seconds
hive> ;
hive> ;
hive> DESC orders_staging;
OK
orderid             	int                 	                    
productid           	int                 	                    
discountcode        	string              	                    
timestamp           	bigint              	                    
countrycode         	string              	                    
Time taken: 0.036 seconds, Fetched: 5 row(s)

 

Step 2: Create data.csv file and load the content to orders_staging table.

 

data.csv 

1,1,d1,1650533538101,IN
2,2,d1,1650533538101,IN
3,2,d3,1650533538101,IN
4,4,d3,1650533552595,IN
5,1,d2,1650533552595,IN
11,5,d11,1650534429531,USA
12,6,d11,1650534429531,USA
13,7,d13,1650534437151,USA
14,8,d13,1650534437151,USA
15,1,d12,1650534437151,USA
21,9,d21,1650534429531,JP
22,4,d21,1650534429531,JP
23,5,d23,1650534429531,JP
24,6,d23,1650534437151,JP
25,7,d22,1650534437151,JP

Execute below command to load the data from data.csv file to orders_staging table.

LOAD DATA LOCAL INPATH '/home/cloudera/Desktop/examples/data.csv' 
INTO TABLE orders_staging

hive> LOAD DATA LOCAL INPATH '/home/cloudera/Desktop/examples/data.csv' 
    > INTO TABLE orders_staging;
Loading data to table default.orders_staging
Table default.orders_staging stats: [numFiles=1, totalSize=385]
OK
Time taken: 0.145 seconds
hive> ;
hive> ;
hive> SELECT * FROM orders_staging;
OK
1	1	d1	1650533538101	IN
2	2	d1	1650533538101	IN
3	2	d3	1650533538101	IN
4	4	d3	1650533552595	IN
5	1	d2	1650533552595	IN
11	5	d11	1650534429531	USA
12	6	d11	1650534429531	USA
13	7	d13	1650534437151	USA
14	8	d13	1650534437151	USA
15	1	d12	1650534437151	USA
21	9	d21	1650534429531	JP
22	4	d21	1650534429531	JP
23	5	d23	1650534429531	JP
24	6	d23	1650534437151	JP
25	7	d22	1650534437151	JP
Time taken: 0.037 seconds, Fetched: 15 row(s)

Step 3: Create orders table and specify 4 buckets at the time of creation.

CREATE TABLE orders (
    orderId INT,
    productId INT,
    discountCode STRING,
    timestamp BIGINT,
    countryCode STRING
 )
CLUSTERED BY (productId) INTO 4 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

hive> CREATE TABLE orders (
    >     orderId INT,
    >     productId INT,
    >     discountCode STRING,
    >     timestamp BIGINT,
    >     countryCode STRING
    >  )
    > CLUSTERED BY (productId) INTO 4 BUCKETS
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY ','
    > STORED AS TEXTFILE;
OK
Time taken: 0.066 seconds
hive> ;
hive> ;
hive> DESCRIBE orders;
OK
orderid             	int                 	                    
productid           	int                 	                    
discountcode        	string              	                    
timestamp           	bigint              	                    
countrycode         	string              	                    
Time taken: 0.035 seconds, Fetched: 5 row(s)

Step 4: Load data from orders_staging table to orders table.

INSERT INTO TABLE orders
SELECT * FROM orders_staging;

hive> INSERT INTO TABLE orders
    > SELECT * FROM orders_staging;
Query ID = cloudera_20220423231818_c5f58f75-e446-4c2f-ba4d-f477486dd07c
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 4
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1649172504056_0039, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1649172504056_0039/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1649172504056_0039
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 4
2022-04-23 23:18:42,025 Stage-1 map = 0%,  reduce = 0%
2022-04-23 23:18:49,499 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 1.03 sec
2022-04-23 23:19:13,194 Stage-1 map = 100%,  reduce = 25%, Cumulative CPU 2.62 sec
2022-04-23 23:19:18,139 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 7.42 sec
MapReduce Total cumulative CPU time: 7 seconds 420 msec
Ended Job = job_1649172504056_0039
Loading data to table default.orders
Table default.orders stats: [numFiles=4, numRows=15, totalSize=385, rawDataSize=370]
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 4   Cumulative CPU: 7.42 sec   HDFS Read: 20059 HDFS Write: 666 SUCCESS
Total MapReduce CPU Time Spent: 7 seconds 420 msec
OK
Time taken: 47.438 seconds

Let’s query the hdfs location of table orders and confirm the way, how Hive organizes the data.

 

How to find the hdfs location of a Hive table?

Execute the command ‘DESCRIBE FORMATTED orders’ to get the location of orders table.

hive> DESCRIBE FORMATTED orders;
OK
# col_name              data_type               comment             
         
orderid                 int                                         
productid               int                                         
discountcode            string                                      
timestamp               bigint                                      
countrycode             string                                      
         
# Detailed Table Information         
Database:               default                  
Owner:                  cloudera                 
CreateTime:             Sat Apr 23 23:16:49 PDT 2022     
LastAccessTime:         UNKNOWN                  
Protect Mode:           None                     
Retention:              0                        
Location:               hdfs://quickstart.cloudera:8020/user/hive/warehouse/orders   
Table Type:             MANAGED_TABLE            
Table Parameters:        
    COLUMN_STATS_ACCURATE   true                
    numFiles                4                   
    numRows                 15                  
    rawDataSize             370                 
    totalSize               385                 
    transient_lastDdlTime   1650781159          
         
# Storage Information        
SerDe Library:          org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe   
InputFormat:            org.apache.hadoop.mapred.TextInputFormat     
OutputFormat:           org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat   
Compressed:             No                       
Num Buckets:            4                        
Bucket Columns:         [productid]              
Sort Columns:           []                       
Storage Desc Params:         
    field.delim             ,                   
    serialization.format    ,                   
Time taken: 0.046 seconds, Fetched: 36 row(s)
hive>

From the above output, you can confirm that hive stores the data in location ‘/user/hive/warehouse/orders’

Let’s query the hdfs location.

$hadoop fs -ls /user/hive/warehouse/orders
Found 4 items
-rwxrwxrwx   1 cloudera supergroup         77 2022-04-23 23:19 /user/hive/warehouse/orders/000000_0
-rwxrwxrwx   1 cloudera supergroup        154 2022-04-23 23:19 /user/hive/warehouse/orders/000001_0
-rwxrwxrwx   1 cloudera supergroup        101 2022-04-23 23:19 /user/hive/warehouse/orders/000002_0
-rwxrwxrwx   1 cloudera supergroup         53 2022-04-23 23:19 /user/hive/warehouse/orders/000003_0

As you see above snippet, Hive divide entire dataseto into 4 files, where each file represent a bucket data.

 

Let’s print the content of files.

$hadoop fs -cat /user/hive/warehouse/orders/000000_0
4,4,d3,1650533552595,IN
22,4,d21,1650534429531,JP
14,8,d13,1650534437151,USA
$
$hadoop fs -cat /user/hive/warehouse/orders/000001_0
11,5,d11,1650534429531,USA
5,1,d2,1650533552595,IN
21,9,d21,1650534429531,JP
15,1,d12,1650534437151,USA
1,1,d1,1650533538101,IN
23,5,d23,1650534429531,JP
$
$hadoop fs -cat /user/hive/warehouse/orders/000002_0
2,2,d1,1650533538101,IN
24,6,d23,1650534437151,JP
12,6,d11,1650534429531,USA
3,2,d3,1650533538101,IN
$
$hadoop fs -cat /user/hive/warehouse/orders/000003_0
25,7,d22,1650534437151,JP
13,7,d13,1650534437151,USA

From the above output, it is clear that

a.   productIds 4, 8 are stored in bucket 0

b.   productIds 1, 5, 9 are stored in bucket 1

c.    productIds 2, 6 are stored in bucket 2

d.   productIds 7 are stored in bucket 3

 

 

Can I run a query on particular bucket?

Yes, you can.

 

Syntax

SELECT * FROM {table_name} TABLESAMPLE(bucket {bucket_number} out of {total_buckets});

hive> SELECT * FROM orders TABLESAMPLE(bucket 2 out of 4);
OK
11  5   d11 1650534429531   USA
5   1   d2  1650533552595   IN
21  9   d21 1650534429531   JP
15  1   d12 1650534437151   USA
1   1   d1  1650533538101   IN
23  5   d23 1650534429531   JP
Time taken: 0.098 seconds, Fetched: 6 row(s)
hive> ;
hive> ;
hive> SELECT * FROM orders TABLESAMPLE(bucket 2 out of 4) WHERE productId=1;
OK
5   1   d2  1650533552595   IN
15  1   d12 1650534437151   USA
1   1   d1  1650533538101   IN
Time taken: 0.039 seconds, Fetched: 3 row(s)

Can I combine both partitioning and bucketing?

Yes, you can. I will explain an example in later posts.

 

When can I use partitioning and bucketing?

When a column has more distinct values, then go for bucketing, else go for partitioning. For example,

a.   we can apply partition on countryCode column and

b.   bucketing on productId column

 

Previous                                                    Next                                                    Home

No comments:

Post a Comment