In this post, I am going to explain how to transfer the data from RDBMS to HDFS.
‘sqoop import’ command is used to import the data from a RDBMS table to HDFS.
Syntax
sqoop import --connect "{jdbc_connection_url}" --username "{user_name}" --password "{password}" --table "{table_name}" --target-dir {target_directory}
'target_directory' must not exist while executing 'sqoop import'.
Example
sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username "root" --password "cloudera" --table "orders" --target-dir /import_demo
Above command copies the data from ‘orders’ table to the HDFS directory import_demo.
I am going to demonstrate the example using cloudera quick start vm.
Step 1: Login to cloudera quick start vm and connect to mysql by executing below command.
mysql -u root -p
Above command prompt a password, password is cloudera.
$mysql -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 81
Server version: 5.1.73 Source distribution
Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql>
Execute the command ‘show databases’ to list all the available database.
mysql> show databases; +--------------------+ | Database | +--------------------+ | information_schema | | cm | | firehose | | hue | | metastore | | mysql | | nav | | navms | | oozie | | retail_db | | rman | | sentry | +--------------------+ 12 rows in set (0.00 sec)
Let’s use the database retail_db and print all the tables in it.
mysql> use retail_db; Reading table information for completion of table and column names You can turn off this feature to get a quicker startup with -A Database changed mysql> mysql> show tables; +---------------------+ | Tables_in_retail_db | +---------------------+ | categories | | customers | | departments | | order_items | | orders | | products | +---------------------+ 6 rows in set (0.00 sec)
Let’s import orders table data to HDFS.
Step 2: Import ‘orders’ table data to HDFS.
sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username "root" --password "cloudera" --table "orders" --target-dir /import_demo
[cloudera@quickstart Desktop]$ sqoop import --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" --username "root" --password "cloudera" --table "orders" --target-dir /import_demo
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
22/03/31 11:04:31 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
22/03/31 11:04:31 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
22/03/31 11:04:32 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
22/03/31 11:04:32 INFO tool.CodeGenTool: Beginning code generation
22/03/31 11:04:32 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `orders` AS t LIMIT 1
22/03/31 11:04:32 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `orders` AS t LIMIT 1
22/03/31 11:04:32 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/2768723de6eed159852edbba0e281f3f/orders.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
22/03/31 11:04:35 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/2768723de6eed159852edbba0e281f3f/orders.jar
22/03/31 11:04:35 WARN manager.MySQLManager: It looks like you are importing from mysql.
22/03/31 11:04:35 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
22/03/31 11:04:35 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
22/03/31 11:04:35 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
22/03/31 11:04:35 INFO mapreduce.ImportJobBase: Beginning import of orders
22/03/31 11:04:35 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
22/03/31 11:04:35 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
22/03/31 11:04:36 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
22/03/31 11:04:36 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/03/31 11:04:37 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:37 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:37 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:37 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:37 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:37 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:38 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:38 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:38 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:38 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:38 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:38 INFO db.DBInputFormat: Using read commited transaction isolation
22/03/31 11:04:38 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`order_id`), MAX(`order_id`) FROM `orders`
22/03/31 11:04:38 INFO db.IntegerSplitter: Split size: 17220; Num splits: 4 from: 1 to: 68883
22/03/31 11:04:38 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/03/31 11:04:38 INFO mapreduce.JobSubmitter: number of splits:4
22/03/31 11:04:38 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1647946797614_0001
22/03/31 11:04:39 INFO impl.YarnClientImpl: Submitted application application_1647946797614_0001
22/03/31 11:04:39 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1647946797614_0001/
22/03/31 11:04:39 INFO mapreduce.Job: Running job: job_1647946797614_0001
22/03/31 11:04:50 INFO mapreduce.Job: Job job_1647946797614_0001 running in uber mode : false
22/03/31 11:04:50 INFO mapreduce.Job: map 0% reduce 0%
22/03/31 11:05:16 INFO mapreduce.Job: map 25% reduce 0%
22/03/31 11:05:18 INFO mapreduce.Job: map 50% reduce 0%
22/03/31 11:05:21 INFO mapreduce.Job: map 75% reduce 0%
22/03/31 11:05:22 INFO mapreduce.Job: map 100% reduce 0%
22/03/31 11:05:22 INFO mapreduce.Job: Job job_1647946797614_0001 completed successfully
22/03/31 11:05:22 INFO mapreduce.Job: Counters: 31
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=685372
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=469
HDFS: Number of bytes written=2999944
HDFS: Number of read operations=16
HDFS: Number of large read operations=0
HDFS: Number of write operations=8
Job Counters
Killed map tasks=1
Launched map tasks=4
Other local map tasks=4
Total time spent by all maps in occupied slots (ms)=104457
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=104457
Total vcore-milliseconds taken by all map tasks=104457
Total megabyte-milliseconds taken by all map tasks=106963968
Map-Reduce Framework
Map input records=68883
Map output records=68883
Input split bytes=469
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=1352
CPU time spent (ms)=10700
Physical memory (bytes) snapshot=500211712
Virtual memory (bytes) snapshot=6044966912
Total committed heap usage (bytes)=243007488
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=2999944
22/03/31 11:05:22 INFO mapreduce.ImportJobBase: Transferred 2.861 MB in 46.1739 seconds (63.4478 KB/sec)
22/03/31 11:05:22 INFO mapreduce.ImportJobBase: Retrieved 68883 records.
[cloudera@quickstart Desktop]$
Step 3: Let’s query the HDFS folder /import_demo and confirm whether data is copied or not.
[cloudera@quickstart Desktop]$ hadoop fs -ls /import_demo
Found 5 items
-rw-r--r-- 1 cloudera supergroup 0 2022-03-31 11:05 /import_demo/_SUCCESS
-rw-r--r-- 1 cloudera supergroup 741614 2022-03-31 11:05 /import_demo/part-m-00000
-rw-r--r-- 1 cloudera supergroup 753022 2022-03-31 11:05 /import_demo/part-m-00001
-rw-r--r-- 1 cloudera supergroup 752368 2022-03-31 11:05 /import_demo/part-m-00002
-rw-r--r-- 1 cloudera supergroup 752940 2022-03-31 11:05 /import_demo/part-m-00003
‘/import_demo/_SUCCESS’ is an empty file used to specify the import operation is successful.
As you the output, there are 4 part-m-* files created. It is because ‘sqoop import’ command itself is a MapReduce job (MapOnly job) that run on 4 mappers. Each part file represents a mapper output.
You can query the content of part file by executing below command.
hadoop fs -cat /import_demo/part-m-00000
FAQs
Is ‘sqoop import’ a map reduce job?
Yes, it is a Map only job (no reducers). All the mapper jobs divide the data among themselves based the table primary key.
How many mappers run by default for ‘sqoop import’ command?
4 mappers.
Can I customize the number of mappers while executing ‘sqoop import’ command?
Yes
What is the behaviour of ‘sqoop import’ when the table do not have a primary key?
By default, ‘sqoop import’ command divide the data among 4 mappers using the primary key. If a table do not have a primary key, then the operation will fail with below kind of error.
No primary key could be found for table {TABLE_NAME}. Please specify one with --split-by or perform a sequential import with '-m 1'.
There are 2 ways to address this problem
a. Set the number of mappers to 1. (OR)
b. specify split-by column to divide the work. In this case, ‘sqoop import’ job use the split-by column to divide the data among mappers.
Previous Next Home
No comments:
Post a Comment