Friday 22 April 2022

How sqoop distribute the data among mappers?

This is continuation to my previous post. In my previous post, I explained how to import the data from a RDBMS table to HDFS. When you execute a ‘sqoop import’ command, sqoop will start 4 mapper jobs by default and divide the data among these 4 jobs.

 

 


Let me explain it with an example. Suppose you have an employee table with 1000 records, where id is the primary key.

 

1d

name

age

email

1

Krishna

33

krishna@123.com

…..

…..

…..

….

1000

Ram

45

ram@123.com

 

When you ran a sqoop import job on employee table, sqoop will find out the minimum and maximum primary key of employee table.

 

min_primary_key = 1

max_primary_key = 1000

 

Now split_size is calculated using below formula.

 

split_size = (max_primary_key - min_primary_key) / no_of_mappers

               = (1000 – 1 ) / 4

               = 999 / 4 = 249.75 = 250 (ceiling value)

 

As per the above formula, all the 100 records are divided into 4 splits.

 

a.   Split1 contain records from 1 to 250.

b.   Split2 contain records from 251 to 500.

c.    Split2 contain records from 501 to 750.

d.   Split2 contain records from 751 to 1000.

 

Let’s run a ‘sqoop import’ job and confirm the same from output of this command.

sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username "root" --password "cloudera" --table "orders" --target-dir /import_demo

Above command copy the content of orders table to HDFS location /import_demo.

[cloudera@quickstart Desktop]$ sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username "root" --password "cloudera" --table "orders" --target-dir /import_demo
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
22/03/31 11:04:31 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
22/03/31 11:04:31 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
22/03/31 11:04:32 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
22/03/31 11:04:32 INFO tool.CodeGenTool: Beginning code generation
22/03/31 11:04:32 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `orders` AS t LIMIT 1
22/03/31 11:04:32 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `orders` AS t LIMIT 1
22/03/31 11:04:32 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/2768723de6eed159852edbba0e281f3f/orders.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
22/03/31 11:04:35 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/2768723de6eed159852edbba0e281f3f/orders.jar
22/03/31 11:04:35 WARN manager.MySQLManager: It looks like you are importing from mysql.
22/03/31 11:04:35 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
22/03/31 11:04:35 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
22/03/31 11:04:35 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
22/03/31 11:04:35 INFO mapreduce.ImportJobBase: Beginning import of orders
22/03/31 11:04:35 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
22/03/31 11:04:35 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
22/03/31 11:04:36 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
22/03/31 11:04:36 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/03/31 11:04:38 INFO db.DBInputFormat: Using read commited transaction isolation
22/03/31 11:04:38 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`order_id`), MAX(`order_id`) FROM `orders`
22/03/31 11:04:38 INFO db.IntegerSplitter: Split size: 17220; Num splits: 4 from: 1 to: 68883
22/03/31 11:04:38 INFO mapreduce.JobSubmitter: number of splits:4
22/03/31 11:04:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1647946797614_0001
22/03/31 11:04:39 INFO impl.YarnClientImpl: Submitted application application_1647946797614_0001
22/03/31 11:04:39 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1647946797614_0001/
22/03/31 11:04:39 INFO mapreduce.Job: Running job: job_1647946797614_0001
22/03/31 11:04:50 INFO mapreduce.Job: Job job_1647946797614_0001 running in uber mode : false
22/03/31 11:04:50 INFO mapreduce.Job:  map 0% reduce 0%
22/03/31 11:05:16 INFO mapreduce.Job:  map 25% reduce 0%
22/03/31 11:05:18 INFO mapreduce.Job:  map 50% reduce 0%
22/03/31 11:05:21 INFO mapreduce.Job:  map 75% reduce 0%
22/03/31 11:05:22 INFO mapreduce.Job:  map 100% reduce 0%
22/03/31 11:05:22 INFO mapreduce.Job: Job job_1647946797614_0001 completed successfully
22/03/31 11:05:22 INFO mapreduce.Job: Counters: 31
  File System Counters
    FILE: Number of bytes read=0
    FILE: Number of bytes written=685372
    FILE: Number of read operations=0
    FILE: Number of large read operations=0
    FILE: Number of write operations=0
    HDFS: Number of bytes read=469
    HDFS: Number of bytes written=2999944
    HDFS: Number of read operations=16
    HDFS: Number of large read operations=0
    HDFS: Number of write operations=8
  Job Counters 
    Killed map tasks=1
    Launched map tasks=4
    Other local map tasks=4
    Total time spent by all maps in occupied slots (ms)=104457
    Total time spent by all reduces in occupied slots (ms)=0
    Total time spent by all map tasks (ms)=104457
    Total vcore-milliseconds taken by all map tasks=104457
    Total megabyte-milliseconds taken by all map tasks=106963968
  Map-Reduce Framework
    Map input records=68883
    Map output records=68883
    Input split bytes=469
    Spilled Records=0
    Failed Shuffles=0
    Merged Map outputs=0
    GC time elapsed (ms)=1352
    CPU time spent (ms)=10700
    Physical memory (bytes) snapshot=500211712
    Virtual memory (bytes) snapshot=6044966912
    Total committed heap usage (bytes)=243007488
  File Input Format Counters 
    Bytes Read=0
  File Output Format Counters 
    Bytes Written=2999944
22/03/31 11:05:22 INFO mapreduce.ImportJobBase: Transferred 2.861 MB in 46.1739 seconds (63.4478 KB/sec)
22/03/31 11:05:22 INFO mapreduce.ImportJobBase: Retrieved 68883 records.
[cloudera@quickstart Desktop]$

 

As you observe the output.

 

22/03/31 11:04:38 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`order_id`), MAX(`order_id`) FROM `orders`

Sqoop job find the minimum and maximum order_id from the table orders.

 

22/03/31 11:04:38 INFO db.IntegerSplitter: Split size: 17220; Num splits: 4 from: 1 to: 68883

22/03/31 11:04:38 INFO mapreduce.JobSubmitter: number of splits:4

From above snippet we can understand different between maximum and minimum order_id is 68883 and these records are divided into 4 splits (68883 / 4 = 17220), where each split size is 17220.

 

 

Note

a.   If the actual values for the primary key are not uniformly distributed across its range, then this can result in unbalanced tasks. You can avoid this by explicitly choose a different column with the --split-by argument. For example, --split-by country.

b.   Sqoop cannot currently split on multi-column indices. If your table has no index column, or has a multi-column key, then you must also manually choose a splitting column.

 

 


Previous                                                    Next                                                    Home

No comments:

Post a Comment