Friday, 30 December 2022

Hive: How to use bucketing and partitioning together

In this post, I am going to walk through an example that uses both bucketing and partitioning.

 

Both partitioning and bucketing organize a large dataset into smaller pieces, so that queries can read less data and run faster.
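
To make the idea of "smaller pieces" concrete, here is a small Python sketch (not Hive code) that counts how many files a reader would have to open. It assumes the layout used later in this post: 3 countryCode partitions with 4 bucket files each, named the way they appear in the HDFS listing at the end. The function name files_to_scan is made up for illustration, and whether Hive really skips bucket files for a given query depends on the Hive version and execution engine.

```python
COUNTRIES = ["IN", "JP", "USA"]   # one folder per partition value
NUM_BUCKETS = 4                   # one file per bucket inside each folder


def files_to_scan(country=None, bucket=None):
    """List the files that must be read for optional equality filters.

    country: restricts the scan to a single partition folder.
    bucket:  restricts the scan to a single bucket file per folder
             (assumes the reader can tell which bucket it needs).
    """
    countries = [country] if country is not None else COUNTRIES
    buckets = [bucket] if bucket is not None else range(NUM_BUCKETS)
    return [f"countrycode={c}/{b:06d}_0" for c in countries for b in buckets]


print(len(files_to_scan()))                    # 12: full scan
print(len(files_to_scan(country="IN")))        # 4: only the IN folder
print(files_to_scan(country="IN", bucket=0))   # ['countrycode=IN/000000_0']
```

A filter on the partition column cuts the scan from 12 files to 4, and knowing the bucket cuts it to a single file.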

 

For example, suppose I have data with the following columns.

 

orderId | productId | discountCode | timestamp | countryCode

If I partition the dataset on the countryCode column (assume the country codes are IN, JP and USA) and bucket it into 4 buckets on the productId column, the entire dataset is organized like below.

orders/
    countrycode=IN/    000000_0  000001_0  000002_0  000003_0
    countrycode=JP/    000000_0  000001_0  000002_0  000003_0
    countrycode=USA/   000000_0  000001_0  000002_0  000003_0

As the layout above shows,

a.   Hive creates a folder for every distinct countryCode. Records that belong to a given countryCode are placed in that folder; for example, all IN records go to the ‘countrycode=IN’ folder.

b.   Since we are defining 4 buckets, 4 files are created in every folder. For example, all the ‘countrycode=IN’ records are further split across 4 files by applying a hash function to the productId value.
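
The hashing step can be sketched in Python. This is an assumption on my part rather than something taken from the Hive output: it mirrors how I understand older Hive versions treat an INT bucketing column (the hash of an INT is the value itself, made non-negative, then taken modulo the number of buckets). Newer Hive versions can use a different hash function, so treat this as an illustration only. The function name bucket_for is mine.

```python
INT_MAX = 0x7FFFFFFF  # same value as Java's Integer.MAX_VALUE


def bucket_for(product_id, num_buckets=4):
    """Return the bucket index for an INT value.

    Assumption: legacy Hive bucketing, where hash(INT) is the INT itself.
    The mask keeps the result non-negative for negative inputs.
    """
    return (product_id & INT_MAX) % num_buckets


# productId values that occur in the sample data used in this post
for product_id in [1, 2, 4, 5, 6, 7, 8, 9]:
    print(product_id, "->", bucket_for(product_id))
# 1 -> 1, 2 -> 2, 4 -> 0, 5 -> 1, 6 -> 2, 7 -> 3, 8 -> 0, 9 -> 1
```

Rows with the same productId always land in the same bucket, and therefore in the same file within a partition folder.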

 

 

Let’s try it out with the example below.

 

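Step 1: Set the following properties in the Hive interactive shell, then create the orders_staging table. (hive.enforce.bucketing matters on Hive 1.x only; from Hive 2.0 onwards bucketing is always enforced and the property no longer exists.)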

SET hive.enforce.bucketing=true;
SET hive.exec.dynamic.partition=true;
SET hive.exec.dynamic.partition.mode=nonstrict;

hive> CREATE TABLE orders_staging (
    >     orderId INT,
    >     productId INT,
    >     discountCode STRING,
    >     timestamp BIGINT,
    >     countryCode STRING
    >  )
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY ','
    > STORED AS TEXTFILE;
OK
Time taken: 0.049 seconds
hive> ;
hive> ;
hive> DESCRIBE orders_staging;
OK
orderid             	int                 	                    
productid           	int                 	                    
discountcode        	string              	                    
timestamp           	bigint              	                    
countrycode         	string              	                    
Time taken: 0.042 seconds, Fetched: 5 row(s)

Step 2: Create a data.csv file and load its content into the orders_staging table.

 

data.csv

1,1,d1,1650533538101,IN
2,2,d1,1650533538101,IN
3,2,d3,1650533538101,IN
4,4,d3,1650533552595,IN
5,1,d2,1650533552595,IN
11,5,d11,1650534429531,USA
12,6,d11,1650534429531,USA
13,7,d13,1650534437151,USA
14,8,d13,1650534437151,USA
15,1,d12,1650534437151,USA
21,9,d21,1650534429531,JP
22,4,d21,1650534429531,JP
23,5,d23,1650534429531,JP
24,6,d23,1650534437151,JP
25,7,d22,1650534437151,JP

Execute the command below to load the data from the data.csv file into the orders_staging table.

LOAD DATA LOCAL INPATH '/home/cloudera/Desktop/examples/data.csv' 
INTO TABLE orders_staging;

hive> LOAD DATA LOCAL INPATH '/home/cloudera/Desktop/examples/data.csv' 
    > INTO TABLE orders_staging;
Loading data to table default.orders_staging
Table default.orders_staging stats: [numFiles=1, totalSize=385]
OK
Time taken: 0.118 seconds
hive> ;
hive> ;
hive> SELECT * FROM orders_staging;
OK
1	1	d1	1650533538101	IN
2	2	d1	1650533538101	IN
3	2	d3	1650533538101	IN
4	4	d3	1650533552595	IN
5	1	d2	1650533552595	IN
11	5	d11	1650534429531	USA
12	6	d11	1650534429531	USA
13	7	d13	1650534437151	USA
14	8	d13	1650534437151	USA
15	1	d12	1650534437151	USA
21	9	d21	1650534429531	JP
22	4	d21	1650534429531	JP
23	5	d23	1650534429531	JP
24	6	d23	1650534437151	JP
25	7	d22	1650534437151	JP
Time taken: 0.029 seconds, Fetched: 15 row(s)

Step 3: Create the orders table, specifying the partition column and the number of buckets.

CREATE TABLE orders (
    orderId INT,
    productId INT,
    discountCode STRING,
    timestamp BIGINT
 )
PARTITIONED BY(countryCode STRING)
CLUSTERED BY(productId) INTO 4 BUCKETS
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

hive> CREATE TABLE orders (
    >     orderId INT,
    >     productId INT,
    >     discountCode STRING,
    >     timestamp BIGINT
    >  )
    > PARTITIONED BY(countryCode STRING)
    > CLUSTERED BY(productId) INTO 4 BUCKETS
    > ROW FORMAT DELIMITED
    > FIELDS TERMINATED BY ','
    > STORED AS TEXTFILE;
OK
Time taken: 0.067 seconds
hive> ;
hive> ;
hive> DESCRIBE orders;
OK
orderid             	int                 	                    
productid           	int                 	                    
discountcode        	string              	                    
timestamp           	bigint              	                    
countrycode         	string              	                    
	 	 
# Partition Information	 	 
# col_name            	data_type           	comment             
	 	 
countrycode         	string              	                    
Time taken: 0.037 seconds, Fetched: 10 row(s)

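Step 4: Load the data from the orders_staging table into the orders table. With dynamic partitioning, Hive takes the partition value from the last column of the SELECT, so SELECT * works here because countryCode is the last column of orders_staging.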

INSERT INTO TABLE orders
PARTITION(countryCode)
SELECT * FROM orders_staging;

hive> INSERT INTO TABLE orders
    > PARTITION(countryCode)
    > SELECT * FROM orders_staging;
Query ID = root_20220427223232_ccd2449a-c4e6-4f52-84db-85c66badf757
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 4
In order to change the average load for a reducer (in bytes):
  set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
  set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
  set mapreduce.job.reduces=<number>
Starting Job = job_1649172504056_0042, Tracking URL = http://quickstart.cloudera:8088/proxy/application_1649172504056_0042/
Kill Command = /usr/lib/hadoop/bin/hadoop job  -kill job_1649172504056_0042
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 4
2022-04-27 22:32:40,148 Stage-1 map = 0%,  reduce = 0%
2022-04-27 22:32:45,432 Stage-1 map = 100%,  reduce = 0%, Cumulative CPU 0.74 sec
2022-04-27 22:33:01,169 Stage-1 map = 100%,  reduce = 25%, Cumulative CPU 1.9 sec
2022-04-27 22:33:04,513 Stage-1 map = 100%,  reduce = 50%, Cumulative CPU 3.1 sec
2022-04-27 22:33:05,628 Stage-1 map = 100%,  reduce = 75%, Cumulative CPU 4.29 sec
2022-04-27 22:33:06,659 Stage-1 map = 100%,  reduce = 100%, Cumulative CPU 5.46 sec
MapReduce Total cumulative CPU time: 5 seconds 460 msec
Ended Job = job_1649172504056_0042
Loading data to table default.orders partition (countrycode=null)
     Time taken for load dynamic partitions : 170
    Loading partition {countrycode=IN}
    Loading partition {countrycode=JP}
    Loading partition {countrycode=USA}
     Time taken for adding to write entity : 0
Partition default.orders{countrycode=IN} stats: [numFiles=4, numRows=5, totalSize=105, rawDataSize=100]
Partition default.orders{countrycode=JP} stats: [numFiles=4, numRows=5, totalSize=115, rawDataSize=110]
Partition default.orders{countrycode=USA} stats: [numFiles=4, numRows=5, totalSize=115, rawDataSize=110]
MapReduce Jobs Launched: 
Stage-Stage-1: Map: 1  Reduce: 4   Cumulative CPU: 5.46 sec   HDFS Read: 20751 HDFS Write: 1015 SUCCESS
Total MapReduce CPU Time Spent: 5 seconds 460 msec
OK
Time taken: 34.919 seconds
hive>

Query the orders table.

hive> SELECT * FROM orders;
OK
4	4	d3	1650533552595	IN
5	1	d2	1650533552595	IN
1	1	d1	1650533538101	IN
2	2	d1	1650533538101	IN
3	2	d3	1650533538101	IN
22	4	d21	1650534429531	JP
21	9	d21	1650534429531	JP
23	5	d23	1650534429531	JP
24	6	d23	1650534437151	JP
25	7	d22	1650534437151	JP
14	8	d13	1650534437151	USA
11	5	d11	1650534429531	USA
15	1	d12	1650534437151	USA
12	6	d11	1650534429531	USA
13	7	d13	1650534437151	USA
Time taken: 0.049 seconds, Fetched: 15 row(s)

Let’s find the HDFS location of the orders table and check the folder hierarchy.

How to get the HDFS location of the orders table?

Execute the command ‘DESCRIBE FORMATTED orders’ from the Hive interactive shell.

hive> DESCRIBE FORMATTED orders;
OK
# col_name              data_type             comment             
     
orderid               int                                       
productid             int                                       
discountcode          string                                    
timestamp             bigint                                    
     
# Partition Information    
# col_name              data_type             comment             
     
countrycode           string                                    
     
# Detailed Table Information     
Database:             default                
Owner:                root                   
CreateTime:           Wed Apr 27 22:31:03 PDT 2022   
LastAccessTime:       UNKNOWN                
Protect Mode:         None                   
Retention:            0                      
Location:             hdfs://quickstart.cloudera:8020/user/hive/warehouse/orders   
Table Type:           MANAGED_TABLE          
Table Parameters:    
  numPartitions         3                   
  transient_lastDdlTime 1651123863          
     
# Storage Information    
SerDe Library:        org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe   
InputFormat:          org.apache.hadoop.mapred.TextInputFormat   
OutputFormat:         org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat   
Compressed:           No                     
Num Buckets:          4                      
Bucket Columns:       [productid]            
Sort Columns:         []                     
Storage Desc Params:     
  field.delim           ,                   
  serialization.format  ,                   
Time taken: 0.044 seconds, Fetched: 36 row(s)
hive>

From the Location field in the output, I can confirm that the content of the orders table is stored at ‘/user/hive/warehouse/orders’.

 

Let’s list the contents of the HDFS location ‘/user/hive/warehouse/orders’.

$hadoop fs -ls /user/hive/warehouse/orders
Found 3 items
drwxrwxrwx   - root supergroup          0 2022-04-27 22:33 /user/hive/warehouse/orders/countrycode=IN
drwxrwxrwx   - root supergroup          0 2022-04-27 22:33 /user/hive/warehouse/orders/countrycode=JP
drwxrwxrwx   - root supergroup          0 2022-04-27 22:33 /user/hive/warehouse/orders/countrycode=USA
$
$hadoop fs -ls /user/hive/warehouse/orders/countrycode=IN
Found 4 items
-rwxrwxrwx   1 root supergroup         21 2022-04-27 22:33 /user/hive/warehouse/orders/countrycode=IN/000000_0
-rwxrwxrwx   1 root supergroup         42 2022-04-27 22:33 /user/hive/warehouse/orders/countrycode=IN/000001_0
-rwxrwxrwx   1 root supergroup         42 2022-04-27 22:33 /user/hive/warehouse/orders/countrycode=IN/000002_0
-rwxrwxrwx   1 root supergroup          0 2022-04-27 22:33 /user/hive/warehouse/orders/countrycode=IN/000003_0
$
$hadoop fs -ls /user/hive/warehouse/orders/countrycode=JP
Found 4 items
-rwxrwxrwx   1 root supergroup         23 2022-04-27 22:33 /user/hive/warehouse/orders/countrycode=JP/000000_0
-rwxrwxrwx   1 root supergroup         46 2022-04-27 22:33 /user/hive/warehouse/orders/countrycode=JP/000001_0
-rwxrwxrwx   1 root supergroup         23 2022-04-27 22:33 /user/hive/warehouse/orders/countrycode=JP/000002_0
-rwxrwxrwx   1 root supergroup         23 2022-04-27 22:33 /user/hive/warehouse/orders/countrycode=JP/000003_0
$
$hadoop fs -ls /user/hive/warehouse/orders/countrycode=USA
Found 4 items
-rwxrwxrwx   1 root supergroup         23 2022-04-27 22:33 /user/hive/warehouse/orders/countrycode=USA/000000_0
-rwxrwxrwx   1 root supergroup         46 2022-04-27 22:33 /user/hive/warehouse/orders/countrycode=USA/000001_0
-rwxrwxrwx   1 root supergroup         23 2022-04-27 22:33 /user/hive/warehouse/orders/countrycode=USA/000002_0
-rwxrwxrwx   1 root supergroup         23 2022-04-27 22:33 /user/hive/warehouse/orders/countrycode=USA/000003_0
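
As a sanity check on the listing above, the following Python sketch (not Hive code) predicts the size of each bucket file from data.csv. It rests on two assumptions: that for an INT column the bucket is the productId value modulo the number of buckets (which I believe is how older Hive versions behave), and that each stored row is the four non-partition columns joined by ',' plus a newline. The helper name predicted_file_sizes is hypothetical.

```python
from collections import defaultdict

# Same rows as data.csv in Step 2
DATA_CSV = """\
1,1,d1,1650533538101,IN
2,2,d1,1650533538101,IN
3,2,d3,1650533538101,IN
4,4,d3,1650533552595,IN
5,1,d2,1650533552595,IN
11,5,d11,1650534429531,USA
12,6,d11,1650534429531,USA
13,7,d13,1650534437151,USA
14,8,d13,1650534437151,USA
15,1,d12,1650534437151,USA
21,9,d21,1650534429531,JP
22,4,d21,1650534429531,JP
23,5,d23,1650534429531,JP
24,6,d23,1650534437151,JP
25,7,d22,1650534437151,JP
"""


def predicted_file_sizes(csv_text, num_buckets=4):
    """Return {countryCode: [bytes in bucket 0, bucket 1, ...]}."""
    sizes = defaultdict(lambda: [0] * num_buckets)
    for line in csv_text.strip().splitlines():
        order_id, product_id, discount, ts, country = line.split(",")
        # Assumed legacy bucketing for INT: non-negative value modulo bucket count
        bucket = (int(product_id) & 0x7FFFFFFF) % num_buckets
        # The partition column lives in the folder name, not in the file
        stored_row = ",".join([order_id, product_id, discount, ts]) + "\n"
        sizes[country][bucket] += len(stored_row.encode("utf-8"))
    return dict(sizes)


sizes = predicted_file_sizes(DATA_CSV)
for country in sorted(sizes):
    for bucket, size in enumerate(sizes[country]):
        print(f"countrycode={country}/{bucket:06d}_0  {size} bytes")
```

Under these assumptions the predicted sizes agree with the listing: 21, 42, 42 and 0 bytes for countrycode=IN, and 23, 46, 23 and 23 bytes for both countrycode=JP and countrycode=USA. It also explains why 000003_0 is empty under countrycode=IN: no IN row has a productId that lands in bucket 3.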



