Friday 22 April 2022

Sqoop import: transfer data from an RDBMS to HDFS

In this post, I am going to explain how to transfer data from an RDBMS table to HDFS using Sqoop.

 

The ‘sqoop import’ command imports data from an RDBMS table into HDFS.

 

Syntax

sqoop import --connect "{jdbc_connection_url}" --username "{user_name}" --password "{password}" --table "{table_name}" --target-dir {target_directory}

 

The directory given as 'target_directory' must not already exist when 'sqoop import' runs; otherwise the import fails.

 

Example

sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username "root" --password "cloudera" --table "orders" --target-dir /import_demo

The above command copies the data from the ‘orders’ table to the HDFS directory /import_demo.
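
Since the target directory must not exist beforehand, it can help to check for it before starting the import. Below is a minimal sketch, assuming the hadoop and sqoop commands are on the PATH. The function name import_if_absent is my own (hypothetical), and the connection details are the ones from the example above.

```shell
# Hypothetical helper: run the import only if the HDFS target directory is absent.
import_if_absent() {
  target_dir="$1"

  # 'hadoop fs -test -d' exits with status 0 when the path is an existing directory.
  if hadoop fs -test -d "$target_dir"; then
    echo "Target directory $target_dir already exists; skipping import." >&2
    return 1
  fi

  sqoop import \
    --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
    --username "root" \
    --password "cloudera" \
    --table "orders" \
    --target-dir "$target_dir"
}

# Usage: import_if_absent /import_demo
```

The Sqoop 1.4.x user guide also documents a --delete-target-dir option that removes an existing target directory before importing. Use it only when overwriting the earlier data is really intended.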

 

I am going to demonstrate the example using the Cloudera QuickStart VM.

 

Step 1: Log in to the Cloudera QuickStart VM and connect to MySQL by executing the command below.

mysql -u root -p

The above command prompts for a password; the password is cloudera.

$ mysql -u root -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 81
Server version: 5.1.73 Source distribution

Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> 

Execute the command ‘show databases’ to list all the available databases.

mysql> show databases;
+--------------------+
| Database           |
+--------------------+
| information_schema |
| cm                 |
| firehose           |
| hue                |
| metastore          |
| mysql              |
| nav                |
| navms              |
| oozie              |
| retail_db          |
| rman               |
| sentry             |
+--------------------+
12 rows in set (0.00 sec)

Let’s use the database retail_db and print all the tables in it.

mysql> use retail_db;
Reading table information for completion of table and column names
You can turn off this feature to get a quicker startup with -A

Database changed
mysql> 
mysql> show tables;
+---------------------+
| Tables_in_retail_db |
+---------------------+
| categories          |
| customers           |
| departments         |
| order_items         |
| orders              |
| products            |
+---------------------+
6 rows in set (0.00 sec)

Let’s import the data of the orders table into HDFS.

 

Step 2: Import the ‘orders’ table data to HDFS.

sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username "root" --password "cloudera" --table "orders" --target-dir /import_demo

[cloudera@quickstart Desktop]$ sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username "root" --password "cloudera" --table "orders" --target-dir /import_demo
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
22/03/31 11:04:31 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
22/03/31 11:04:31 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
22/03/31 11:04:32 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
22/03/31 11:04:32 INFO tool.CodeGenTool: Beginning code generation
22/03/31 11:04:32 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `orders` AS t LIMIT 1
22/03/31 11:04:32 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `orders` AS t LIMIT 1
22/03/31 11:04:32 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/2768723de6eed159852edbba0e281f3f/orders.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
22/03/31 11:04:35 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/2768723de6eed159852edbba0e281f3f/orders.jar
22/03/31 11:04:35 WARN manager.MySQLManager: It looks like you are importing from mysql.
22/03/31 11:04:35 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
22/03/31 11:04:35 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
22/03/31 11:04:35 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
22/03/31 11:04:35 INFO mapreduce.ImportJobBase: Beginning import of orders
22/03/31 11:04:35 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
22/03/31 11:04:35 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
22/03/31 11:04:36 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
22/03/31 11:04:36 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/03/31 11:04:37 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:37 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:37 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:37 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:37 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:37 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:38 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:38 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:38 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:38 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:38 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:38 INFO db.DBInputFormat: Using read commited transaction isolation
22/03/31 11:04:38 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`order_id`), MAX(`order_id`) FROM `orders`
22/03/31 11:04:38 INFO db.IntegerSplitter: Split size: 17220; Num splits: 4 from: 1 to: 68883
22/03/31 11:04:38 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:38 INFO mapreduce.JobSubmitter: number of splits:4
22/03/31 11:04:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1647946797614_0001
22/03/31 11:04:39 INFO impl.YarnClientImpl: Submitted application application_1647946797614_0001
22/03/31 11:04:39 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1647946797614_0001/
22/03/31 11:04:39 INFO mapreduce.Job: Running job: job_1647946797614_0001
22/03/31 11:04:50 INFO mapreduce.Job: Job job_1647946797614_0001 running in uber mode : false
22/03/31 11:04:50 INFO mapreduce.Job:  map 0% reduce 0%
22/03/31 11:05:16 INFO mapreduce.Job:  map 25% reduce 0%
22/03/31 11:05:18 INFO mapreduce.Job:  map 50% reduce 0%
22/03/31 11:05:21 INFO mapreduce.Job:  map 75% reduce 0%
22/03/31 11:05:22 INFO mapreduce.Job:  map 100% reduce 0%
22/03/31 11:05:22 INFO mapreduce.Job: Job job_1647946797614_0001 completed successfully
22/03/31 11:05:22 INFO mapreduce.Job: Counters: 31
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=685372
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=469
		HDFS: Number of bytes written=2999944
		HDFS: Number of read operations=16
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=8
	Job Counters 
		Killed map tasks=1
		Launched map tasks=4
		Other local map tasks=4
		Total time spent by all maps in occupied slots (ms)=104457
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=104457
		Total vcore-milliseconds taken by all map tasks=104457
		Total megabyte-milliseconds taken by all map tasks=106963968
	Map-Reduce Framework
		Map input records=68883
		Map output records=68883
		Input split bytes=469
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=1352
		CPU time spent (ms)=10700
		Physical memory (bytes) snapshot=500211712
		Virtual memory (bytes) snapshot=6044966912
		Total committed heap usage (bytes)=243007488
	File Input Format Counters 
		Bytes Read=0
	File Output Format Counters 
		Bytes Written=2999944
22/03/31 11:05:22 INFO mapreduce.ImportJobBase: Transferred 2.861 MB in 46.1739 seconds (63.4478 KB/sec)
22/03/31 11:05:22 INFO mapreduce.ImportJobBase: Retrieved 68883 records.
[cloudera@quickstart Desktop]$
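
The log above contains a warning that setting the password on the command line is insecure and suggests using -P instead. Below is a minimal sketch of the same import with -P, which makes Sqoop prompt for the password on the console. The function name import_orders_prompt is hypothetical, and the target directory is passed as an argument because /import_demo already exists after the run above.

```shell
# Hypothetical wrapper: same import as above, but Sqoop prompts for the password (-P)
# instead of reading it from the command line.
import_orders_prompt() {
  target_dir="$1"

  sqoop import \
    --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
    --username "root" \
    -P \
    --table "orders" \
    --target-dir "$target_dir"
}

# Usage: import_orders_prompt /import_demo_prompt
```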

Step 3: Let’s list the HDFS folder /import_demo and confirm that the data was copied.

[cloudera@quickstart Desktop]$ hadoop fs -ls /import_demo
Found 5 items
-rw-r--r--   1 cloudera supergroup          0 2022-03-31 11:05 /import_demo/_SUCCESS
-rw-r--r--   1 cloudera supergroup     741614 2022-03-31 11:05 /import_demo/part-m-00000
-rw-r--r--   1 cloudera supergroup     753022 2022-03-31 11:05 /import_demo/part-m-00001
-rw-r--r--   1 cloudera supergroup     752368 2022-03-31 11:05 /import_demo/part-m-00002
-rw-r--r--   1 cloudera supergroup     752940 2022-03-31 11:05 /import_demo/part-m-00003

‘/import_demo/_SUCCESS’ is an empty marker file that indicates the import job completed successfully.

 


As you can see in the output, 4 part-m-* files were created. This is because the ‘sqoop import’ command itself runs as a MapReduce job (a map-only job) with 4 mappers. Each part file holds the output of one mapper.
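
To double-check that the part files together hold every row, one option is to count the lines across all of them. Below is a minimal sketch, assuming a default text import where each record is written on one line (no field contains an embedded newline). The function name count_imported_records is hypothetical.

```shell
# Hypothetical helper: count the records across all mapper output files in a directory.
count_imported_records() {
  # The glob is quoted so that 'hadoop fs' expands it on HDFS, not the local shell.
  # 'tr' strips the padding that some 'wc' implementations add.
  hadoop fs -cat "$1/part-m-*" | wc -l | tr -d ' '
}

# Usage: count_imported_records /import_demo
```

For the import in this post, the result should match the "Retrieved 68883 records." line of the Sqoop log.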

 

You can print the content of a part file by executing the command below.

hadoop fs -cat /import_demo/part-m-00000

 

FAQs

Is ‘sqoop import’ a MapReduce job?

Yes, it is a map-only job (no reducers). The mappers divide the data among themselves based on the table's primary key.
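
The import log in this post shows this division: Sqoop queried MIN(order_id) and MAX(order_id), got 1 and 68883, and reported "Split size: 17220; Num splits: 4". The arithmetic can be sketched as below. This is a simplified illustration, not Sqoop's actual splitter code: here the last mapper simply takes the leftover values, while Sqoop may distribute the remainder differently.

```shell
# Values taken from the BoundingValsQuery and IntegerSplitter lines of the import log.
min_id=1
max_id=68883
num_mappers=4

# Integer division, matching the logged "Split size: 17220".
split_size=$(( (max_id - min_id) / num_mappers ))
echo "Split size: $split_size"

# Simplified illustration: equal-width ranges, the last mapper takes the remainder.
i=0
lo=$min_id
while [ "$i" -lt "$num_mappers" ]; do
  if [ "$i" -eq $(( num_mappers - 1 )) ]; then
    echo "mapper $i: order_id >= $lo AND order_id <= $max_id"
  else
    hi=$(( lo + split_size ))
    echo "mapper $i: order_id >= $lo AND order_id < $hi"
    lo=$hi
  fi
  i=$(( i + 1 ))
done
```

Each mapper then reads only the rows in its own range, which is why the four part files are of similar size.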

 

How many mappers run by default for the ‘sqoop import’ command?

4 mappers.

 

Can I customize the number of mappers while executing the ‘sqoop import’ command?

Yes, by using the -m (or --num-mappers) option.
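
As a minimal sketch, the number of mappers can be passed with the -m option of ‘sqoop import’. The function name import_orders_with_mappers and the target directory in the usage line are hypothetical; the connection details are the ones used earlier in this post.

```shell
# Hypothetical wrapper: import the 'orders' table with a chosen number of mappers.
import_orders_with_mappers() {
  num_mappers="$1"
  target_dir="$2"

  sqoop import \
    --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
    --username "root" \
    --password "cloudera" \
    --table "orders" \
    --target-dir "$target_dir" \
    -m "$num_mappers"
}

# Usage: import_orders_with_mappers 2 /import_demo_2_mappers
```

With 2 mappers, I would expect two part files (part-m-00000 and part-m-00001) in the target directory instead of four.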

 

What is the behaviour of ‘sqoop import’ when the table does not have a primary key?

By default, the ‘sqoop import’ command divides the data among 4 mappers using the primary key. If a table does not have a primary key, the operation fails with an error like the one below.

 

No primary key could be found for table {TABLE_NAME}. Please specify one with --split-by or perform a sequential import with '-m 1'.

 

There are 2 ways to address this problem:

a.   Set the number of mappers to 1 (-m 1).    (OR)

b.   Specify a split-by column (--split-by) to divide the work. In this case, the ‘sqoop import’ job uses the split-by column to divide the data among the mappers.
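
Both options can be sketched as below. The options -m 1 and --split-by are the ones named in the error message; the function names and the table and column names in the usage lines are hypothetical, since retail_db in this post has no example table without a primary key.

```shell
# Hypothetical wrapper for option (a): sequential import with a single mapper.
import_sequential() {
  sqoop import \
    --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
    --username "root" \
    --password "cloudera" \
    --table "$1" \
    --target-dir "$2" \
    -m 1
}

# Hypothetical wrapper for option (b): parallel import split on a chosen column.
import_split_by() {
  sqoop import \
    --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
    --username "root" \
    --password "cloudera" \
    --table "$1" \
    --split-by "$2" \
    --target-dir "$3"
}

# Usage (table and column names are made up for illustration):
#   import_sequential orders_no_pk /import_demo_sequential
#   import_split_by orders_no_pk order_id /import_demo_split
```

A split-by column generally works best when its values are spread evenly, so that each mapper receives a similar share of the rows.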

  
