In this post, I will explain the scenario where you need to customize the boundary query Sqoop uses to retrieve the minimum and maximum values of the primary key.
Let me explain with the customers table of the retail_db database in the Cloudera VM.
mysql> desc customers;
+-------------------+--------------+------+-----+---------+----------------+
| Field             | Type         | Null | Key | Default | Extra          |
+-------------------+--------------+------+-----+---------+----------------+
| customer_id       | int(11)      | NO   | PRI | NULL    | auto_increment |
| customer_fname    | varchar(45)  | NO   |     | NULL    |                |
| customer_lname    | varchar(45)  | NO   |     | NULL    |                |
| customer_email    | varchar(45)  | NO   |     | NULL    |                |
| customer_password | varchar(45)  | NO   |     | NULL    |                |
| customer_street   | varchar(255) | NO   |     | NULL    |                |
| customer_city     | varchar(45)  | NO   |     | NULL    |                |
| customer_state    | varchar(45)  | NO   |     | NULL    |                |
| customer_zipcode  | varchar(45)  | NO   |     | NULL    |                |
+-------------------+--------------+------+-----+---------+----------------+
9 rows in set (0.00 sec)

mysql> select count(*) from customers;
+----------+
| count(*) |
+----------+
|    12435 |
+----------+
1 row in set (0.00 sec)

mysql> select min(customer_id), max(customer_id) from customers;
+------------------+------------------+
| min(customer_id) | max(customer_id) |
+------------------+------------------+
|                1 |            12435 |
+------------------+------------------+
1 row in set (0.00 sec)
When you import the customers data using the 'sqoop import' command, Sqoop starts 4 mappers by default, and the table is divided among the mappers by split size:
split_size = (max_primary_key - min_primary_key) / no_of_mappers
           = (12435 - 1) / 4
           = 12434 / 4 = 3108
As per the above formula, each mapper gets a range of about 3108 consecutive primary-key values to process.
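The arithmetic above can be sketched in shell (integer division, which is what Sqoop's IntegerSplitter uses; the min/max values are the ones returned by the boundary query):

```shell
# Sketch of the split-size arithmetic Sqoop's IntegerSplitter performs
# (min=1 and max=12435 come from the boundary query; 4 mappers is the default)
MIN=1
MAX=12435
MAPPERS=4
SPLIT_SIZE=$(( (MAX - MIN) / MAPPERS ))   # integer division, fractions dropped
echo "split size: $SPLIT_SIZE"            # prints: split size: 3108
```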
Let us run the import command and confirm same.
sqoop import \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username "root" \
--password "cloudera" \
--table "customers" \
--target-dir /customers_import_demo_1
[cloudera@quickstart Desktop]$ sqoop import \
> --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
> --username "root" \
> --password "cloudera" \
> --table "customers" \
> --target-dir /customers_import_demo_1
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
22/04/01 09:10:41 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
22/04/01 09:10:41 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
22/04/01 09:10:42 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
22/04/01 09:10:42 INFO tool.CodeGenTool: Beginning code generation
22/04/01 09:10:42 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
22/04/01 09:10:42 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
22/04/01 09:10:42 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/1784cad50b2c560ace68fc1121e02931/customers.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
22/04/01 09:10:45 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/1784cad50b2c560ace68fc1121e02931/customers.jar
22/04/01 09:10:45 WARN manager.MySQLManager: It looks like you are importing from mysql.
22/04/01 09:10:45 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
22/04/01 09:10:45 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
22/04/01 09:10:45 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
22/04/01 09:10:45 INFO mapreduce.ImportJobBase: Beginning import of customers
22/04/01 09:10:45 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
22/04/01 09:10:45 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
22/04/01 09:10:46 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
22/04/01 09:10:47 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/04/01 09:10:49 INFO db.DBInputFormat: Using read commited transaction isolation
22/04/01 09:10:49 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`customer_id`), MAX(`customer_id`) FROM `customers`
22/04/01 09:10:49 INFO db.IntegerSplitter: Split size: 3108; Num splits: 4 from: 1 to: 12435
22/04/01 09:10:49 INFO mapreduce.JobSubmitter: number of splits:4
22/04/01 09:10:49 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1647946797614_0011
22/04/01 09:10:49 INFO impl.YarnClientImpl: Submitted application application_1647946797614_0011
22/04/01 09:10:49 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1647946797614_0011/
22/04/01 09:10:49 INFO mapreduce.Job: Running job: job_1647946797614_0011
22/04/01 09:10:57 INFO mapreduce.Job: Job job_1647946797614_0011 running in uber mode : false
22/04/01 09:10:57 INFO mapreduce.Job: map 0% reduce 0%
22/04/01 09:11:13 INFO mapreduce.Job: map 25% reduce 0%
22/04/01 09:11:16 INFO mapreduce.Job: map 50% reduce 0%
22/04/01 09:11:18 INFO mapreduce.Job: map 100% reduce 0%
22/04/01 09:11:19 INFO mapreduce.Job: Job job_1647946797614_0011 completed successfully
22/04/01 09:11:19 INFO mapreduce.Job: Counters: 31
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=685912
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=487
HDFS: Number of bytes written=953525
HDFS: Number of read operations=16
HDFS: Number of large read operations=0
HDFS: Number of write operations=8
Job Counters
Killed map tasks=1
Launched map tasks=4
Other local map tasks=4
Total time spent by all maps in occupied slots (ms)=64384
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=64384
Total vcore-milliseconds taken by all map tasks=64384
Total megabyte-milliseconds taken by all map tasks=65929216
Map-Reduce Framework
Map input records=12435
Map output records=12435
Input split bytes=487
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=656
CPU time spent (ms)=4680
Physical memory (bytes) snapshot=555941888
Virtual memory (bytes) snapshot=6044942336
Total committed heap usage (bytes)=243007488
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=953525
22/04/01 09:11:19 INFO mapreduce.ImportJobBase: Transferred 931.1768 KB in 32.9476 seconds (28.2623 KB/sec)
22/04/01 09:11:19 INFO mapreduce.ImportJobBase: Retrieved 12435 records.
[cloudera@quickstart Desktop]$
Let’s query the folder /customers_import_demo_1.
[cloudera@quickstart Desktop]$ hadoop fs -ls -h /customers_import_demo_1
Found 5 items
-rw-r--r-- 1 cloudera supergroup 0 2022-04-01 09:11 /customers_import_demo_1/_SUCCESS
-rw-r--r-- 1 cloudera supergroup 231.6 K 2022-04-01 09:11 /customers_import_demo_1/part-m-00000
-rw-r--r-- 1 cloudera supergroup 232.4 K 2022-04-01 09:11 /customers_import_demo_1/part-m-00001
-rw-r--r-- 1 cloudera supergroup 232.5 K 2022-04-01 09:11 /customers_import_demo_1/part-m-00002
-rw-r--r-- 1 cloudera supergroup 234.7 K 2022-04-01 09:11 /customers_import_demo_1/part-m-00003
From the above output, you can confirm that each mapper wrote about 232 KB of data, i.e., the load is evenly distributed.
Let us add the below record to the customers table.
INSERT INTO customers (customer_id, customer_fname, customer_lname, customer_city) VALUES (100000, 'sailu', 'ptr', 'Hyerabad')
mysql> INSERT INTO customers (customer_id, customer_fname, customer_lname, customer_city) VALUES (100000, 'sailu', 'ptr', 'Hyerabad');
Query OK, 1 row affected, 5 warnings (0.01 sec)

mysql> SELECT COUNT(*) FROM customers;
+----------+
| COUNT(*) |
+----------+
|    12436 |
+----------+
1 row in set (0.00 sec)

mysql> SELECT min(customer_id), max(customer_id) from customers;
+------------------+------------------+
| min(customer_id) | max(customer_id) |
+------------------+------------------+
|                1 |           100000 |
+------------------+------------------+
1 row in set (0.00 sec)
Now,
split_size = (max_primary_key - min_primary_key) / no_of_mappers
           = (100000 - 1) / 4
           = 99999 / 4 = 24999
As per the above formula, each mapper will cover a range of 24999 primary-key values. But do we really have 100000 records in the customers table? No, we have only 12436 records. With this split logic, the first mapper (whose range covers all the original ids) gets almost all the load, the middle mappers get zero records to process, and the last mapper gets only the newly inserted record.
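A rough sketch of the four split ranges under these boundary values (this mirrors the IntegerSplitter arithmetic; Sqoop's exact handling of the range edges may differ slightly):

```shell
# Approximate split ranges when min=1, max=100000, 4 mappers
MIN=1; MAX=100000; MAPPERS=4
SPLIT_SIZE=$(( (MAX - MIN) / MAPPERS ))   # 24999
LO=$MIN
for i in 1 2 3 4; do
  HI=$(( LO + SPLIT_SIZE ))
  echo "mapper $i: customer_id between $LO and $HI"
  LO=$(( HI + 1 ))
done
# mapper 1: customer_id between 1 and 25000      <- all 12435 real ids land here
# mapper 2: customer_id between 25001 and 50000  <- empty
# mapper 3: customer_id between 50001 and 75000  <- empty
# mapper 4: customer_id between 75001 and 100000 <- only the new record (100000)
```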
Let's confirm the same by executing the below command.
sqoop import \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username "root" \
--password "cloudera" \
--table "customers" \
--target-dir /customers_import_demo_2
[cloudera@quickstart Desktop]$ sqoop import \
> --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
> --username "root" \
> --password "cloudera" \
> --table "customers" \
> --target-dir /customers_import_demo_2
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
22/04/02 08:22:42 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
22/04/02 08:22:42 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
22/04/02 08:22:42 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
22/04/02 08:22:42 INFO tool.CodeGenTool: Beginning code generation
22/04/02 08:22:43 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
22/04/02 08:22:43 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
22/04/02 08:22:43 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/90f9ab21736c8137ca09cfd57e22fe3e/customers.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
22/04/02 08:22:45 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/90f9ab21736c8137ca09cfd57e22fe3e/customers.jar
22/04/02 08:22:45 WARN manager.MySQLManager: It looks like you are importing from mysql.
22/04/02 08:22:45 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
22/04/02 08:22:45 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
22/04/02 08:22:45 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
22/04/02 08:22:45 INFO mapreduce.ImportJobBase: Beginning import of customers
22/04/02 08:22:45 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
22/04/02 08:22:45 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
22/04/02 08:22:46 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
22/04/02 08:22:46 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/04/02 08:22:48 INFO db.DBInputFormat: Using read commited transaction isolation
22/04/02 08:22:48 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`customer_id`), MAX(`customer_id`) FROM `customers`
22/04/02 08:22:48 INFO db.IntegerSplitter: Split size: 24999; Num splits: 4 from: 1 to: 100000
22/04/02 08:22:48 INFO mapreduce.JobSubmitter: number of splits:4
22/04/02 08:22:48 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1647946797614_0012
22/04/02 08:22:49 INFO impl.YarnClientImpl: Submitted application application_1647946797614_0012
22/04/02 08:22:49 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1647946797614_0012/
22/04/02 08:22:49 INFO mapreduce.Job: Running job: job_1647946797614_0012
22/04/02 08:22:56 INFO mapreduce.Job: Job job_1647946797614_0012 running in uber mode : false
22/04/02 08:22:56 INFO mapreduce.Job: map 0% reduce 0%
22/04/02 08:23:13 INFO mapreduce.Job: map 25% reduce 0%
22/04/02 08:23:14 INFO mapreduce.Job: map 50% reduce 0%
22/04/02 08:23:15 INFO mapreduce.Job: map 100% reduce 0%
22/04/02 08:23:16 INFO mapreduce.Job: Job job_1647946797614_0012 completed successfully
22/04/02 08:23:16 INFO mapreduce.Job: Counters: 31
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=685912
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=494
HDFS: Number of bytes written=953556
HDFS: Number of read operations=16
HDFS: Number of large read operations=0
HDFS: Number of write operations=8
Job Counters
Killed map tasks=1
Launched map tasks=4
Other local map tasks=4
Total time spent by all maps in occupied slots (ms)=60769
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=60769
Total vcore-milliseconds taken by all map tasks=60769
Total megabyte-milliseconds taken by all map tasks=62227456
Map-Reduce Framework
Map input records=12436
Map output records=12436
Input split bytes=494
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=626
CPU time spent (ms)=3560
Physical memory (bytes) snapshot=531841024
Virtual memory (bytes) snapshot=6041796608
Total committed heap usage (bytes)=243007488
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=953556
22/04/02 08:23:16 INFO mapreduce.ImportJobBase: Transferred 931.207 KB in 30.2692 seconds (30.7641 KB/sec)
22/04/02 08:23:16 INFO mapreduce.ImportJobBase: Retrieved 12436 records.
[cloudera@quickstart Desktop]$
Let’s query the folder /customers_import_demo_2.
[cloudera@quickstart Desktop]$ hadoop fs -ls -h /customers_import_demo_2
Found 5 items
-rw-r--r-- 1 cloudera supergroup 0 2022-04-02 08:23 /customers_import_demo_2/_SUCCESS
-rw-r--r-- 1 cloudera supergroup 931.2 K 2022-04-02 08:23 /customers_import_demo_2/part-m-00000
-rw-r--r-- 1 cloudera supergroup 0 2022-04-02 08:23 /customers_import_demo_2/part-m-00001
-rw-r--r-- 1 cloudera supergroup 0 2022-04-02 08:23 /customers_import_demo_2/part-m-00002
-rw-r--r-- 1 cloudera supergroup 31 2022-04-02 08:23 /customers_import_demo_2/part-m-00003
[cloudera@quickstart Desktop]$
[cloudera@quickstart Desktop]$ hadoop fs -cat /customers_import_demo_2/part-m-00003
100000,sailu,ptr,,,,Hyerabad,,
From the above output, you can confirm that only the first mapper took all the load; the 2nd and 3rd mappers did not get any records to process, and the 4th mapper got only the last record.
How can we supply our own min and max values for the split_size calculation?
Using the --boundary-query option, we can pass a query that returns the min and max values to be used in the split_size calculation.
Example
sqoop import \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username "root" \
--password "cloudera" \
--table "customers" \
--target-dir /customers_import_demo_3 \
--boundary-query "SELECT 1, 12436"
[cloudera@quickstart Desktop]$ sqoop import \
> --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
> --username "root" \
> --password "cloudera" \
> --table "customers" \
> --target-dir /customers_import_demo_3 \
> --boundary-query "SELECT 1, 12436"
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
22/04/02 08:28:48 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
22/04/02 08:28:48 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
22/04/02 08:28:48 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
22/04/02 08:28:48 INFO tool.CodeGenTool: Beginning code generation
22/04/02 08:28:49 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
22/04/02 08:28:49 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
22/04/02 08:28:49 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/edfd655684e4b99e5ab11ef70c4bacf9/customers.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
22/04/02 08:28:51 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/edfd655684e4b99e5ab11ef70c4bacf9/customers.jar
22/04/02 08:28:51 WARN manager.MySQLManager: It looks like you are importing from mysql.
22/04/02 08:28:51 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
22/04/02 08:28:51 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
22/04/02 08:28:51 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
22/04/02 08:28:51 INFO mapreduce.ImportJobBase: Beginning import of customers
22/04/02 08:28:51 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
22/04/02 08:28:51 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
22/04/02 08:28:51 WARN db.DataDrivenDBInputFormat: Could not find $CONDITIONS token in query: SELECT 1, 12436; splits may not partition data.
22/04/02 08:28:52 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
22/04/02 08:28:52 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/04/02 08:28:54 INFO db.DBInputFormat: Using read commited transaction isolation
22/04/02 08:28:54 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT 1, 12436
22/04/02 08:28:54 INFO db.IntegerSplitter: Split size: 3108; Num splits: 4 from: 1 to: 12436
22/04/02 08:28:54 INFO mapreduce.JobSubmitter: number of splits:4
22/04/02 08:28:54 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1647946797614_0013
22/04/02 08:28:55 INFO impl.YarnClientImpl: Submitted application application_1647946797614_0013
22/04/02 08:28:55 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1647946797614_0013/
22/04/02 08:28:55 INFO mapreduce.Job: Running job: job_1647946797614_0013
22/04/02 08:29:02 INFO mapreduce.Job: Job job_1647946797614_0013 running in uber mode : false
22/04/02 08:29:02 INFO mapreduce.Job: map 0% reduce 0%
22/04/02 08:29:19 INFO mapreduce.Job: map 25% reduce 0%
22/04/02 08:29:22 INFO mapreduce.Job: map 50% reduce 0%
22/04/02 08:29:24 INFO mapreduce.Job: map 100% reduce 0%
22/04/02 08:29:25 INFO mapreduce.Job: Job job_1647946797614_0013 completed successfully
22/04/02 08:29:25 INFO mapreduce.Job: Counters: 31
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=687276
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=487
HDFS: Number of bytes written=953525
HDFS: Number of read operations=16
HDFS: Number of large read operations=0
HDFS: Number of write operations=8
Job Counters
Killed map tasks=1
Launched map tasks=4
Other local map tasks=4
Total time spent by all maps in occupied slots (ms)=69405
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=69405
Total vcore-milliseconds taken by all map tasks=69405
Total megabyte-milliseconds taken by all map tasks=71070720
Map-Reduce Framework
Map input records=12435
Map output records=12435
Input split bytes=487
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=1033
CPU time spent (ms)=5050
Physical memory (bytes) snapshot=544698368
Virtual memory (bytes) snapshot=6043906048
Total committed heap usage (bytes)=243007488
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=953525
22/04/02 08:29:25 INFO mapreduce.ImportJobBase: Transferred 931.1768 KB in 33.1715 seconds (28.0716 KB/sec)
22/04/02 08:29:25 INFO mapreduce.ImportJobBase: Retrieved 12435 records.
[cloudera@quickstart Desktop]$
Let’s query the folder ‘/customers_import_demo_3’ and confirm the same.
[cloudera@quickstart Desktop]$ hadoop fs -ls -h /customers_import_demo_3
Found 5 items
-rw-r--r-- 1 cloudera supergroup 0 2022-04-02 08:29 /customers_import_demo_3/_SUCCESS
-rw-r--r-- 1 cloudera supergroup 231.6 K 2022-04-02 08:29 /customers_import_demo_3/part-m-00000
-rw-r--r-- 1 cloudera supergroup 232.4 K 2022-04-02 08:29 /customers_import_demo_3/part-m-00001
-rw-r--r-- 1 cloudera supergroup 232.6 K 2022-04-02 08:29 /customers_import_demo_3/part-m-00002
-rw-r--r-- 1 cloudera supergroup 234.6 K 2022-04-02 08:29 /customers_import_demo_3/part-m-00003
From the above output, you can confirm that the load is evenly distributed again.
Note
a. If you add filter criteria using the --where option (and do not supply a custom --boundary-query), the filter is taken into account while calculating the boundary values: the WHERE clause is appended to Sqoop's default MIN/MAX boundary query.
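As a rough sketch of that behavior (the exact query text Sqoop builds may differ, and the filter value here is hypothetical):

```shell
# Assumed sketch: with --where "customer_id <= 12435" and no custom
# --boundary-query, Sqoop's default bounding query would look roughly like:
WHERE_CLAUSE="customer_id <= 12435"
BOUNDING_QUERY="SELECT MIN(\`customer_id\`), MAX(\`customer_id\`) FROM \`customers\` WHERE ( $WHERE_CLAUSE )"
echo "$BOUNDING_QUERY"
```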