In this post, I am going to explain how to import data into HDFS using Sqoop's incremental import in append mode.
When to use append mode?
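Use this mode when you are only interested in adding newly created records to the existing dataset, and the table has a column whose value increases with every new row (for example, an auto-increment id).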
How to specify incremental mode?
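Use the --incremental option to specify the incremental mode. For append mode, pass --incremental append together with --check-column (the column to examine) and --last-value (the largest value of that column already imported).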
Example
sqoop import \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username "root" \
--password "cloudera" \
--table "customers" \
--warehouse-dir /incremental-append-demo \
--incremental append \
--check-column customer_id \
--last-value 0
The above snippet imports the data from the customers table into /incremental-append-demo/customers. With --last-value 0, every row whose customer_id is greater than 0 is imported.
To demonstrate, I am going to onboard the customers data to HDFS incrementally.
mysql> DESCRIBE customers;
+-------------------+--------------+------+-----+---------+----------------+
| Field             | Type         | Null | Key | Default | Extra          |
+-------------------+--------------+------+-----+---------+----------------+
| customer_id       | int(11)      | NO   | PRI | NULL    | auto_increment |
| customer_fname    | varchar(45)  | NO   |     | NULL    |                |
| customer_lname    | varchar(45)  | NO   |     | NULL    |                |
| customer_email    | varchar(45)  | NO   |     | NULL    |                |
| customer_password | varchar(45)  | NO   |     | NULL    |                |
| customer_street   | varchar(255) | NO   |     | NULL    |                |
| customer_city     | varchar(45)  | NO   |     | NULL    |                |
| customer_state    | varchar(45)  | NO   |     | NULL    |                |
| customer_zipcode  | varchar(45)  | NO   |     | NULL    |                |
+-------------------+--------------+------+-----+---------+----------------+
9 rows in set (0.00 sec)

mysql> SELECT COUNT(*) FROM customers;
+----------+
| COUNT(*) |
+----------+
|    12435 |
+----------+
1 row in set (0.02 sec)
Let’s onboard the customers data to HDFS.
[cloudera@quickstart ~]$ sqoop import \
> --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
> --username "root" \
> --password "cloudera" \
> --table "customers" \
> --warehouse-dir /incremental-append-demo \
> --incremental append \
> --check-column customer_id \
> --last-value 0
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
22/04/05 08:38:13 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
22/04/05 08:38:13 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
22/04/05 08:38:13 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
22/04/05 08:38:13 INFO tool.CodeGenTool: Beginning code generation
22/04/05 08:38:13 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
22/04/05 08:38:13 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
22/04/05 08:38:13 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/fd526416e6cd7bf163633edd634bd8e8/customers.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
22/04/05 08:38:15 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/fd526416e6cd7bf163633edd634bd8e8/customers.jar
22/04/05 08:38:16 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`customer_id`) FROM `customers`
22/04/05 08:38:16 INFO tool.ImportTool: Incremental import based on column `customer_id`
22/04/05 08:38:16 INFO tool.ImportTool: Lower bound value: 0
22/04/05 08:38:16 INFO tool.ImportTool: Upper bound value: 12435
22/04/05 08:38:16 WARN manager.MySQLManager: It looks like you are importing from mysql.
22/04/05 08:38:16 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
22/04/05 08:38:16 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
22/04/05 08:38:16 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
22/04/05 08:38:16 INFO mapreduce.ImportJobBase: Beginning import of customers
22/04/05 08:38:16 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
22/04/05 08:38:17 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
22/04/05 08:38:17 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
22/04/05 08:38:17 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/04/05 08:38:18 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 08:38:19 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 08:38:19 INFO db.DBInputFormat: Using read commited transaction isolation
22/04/05 08:38:19 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`customer_id`), MAX(`customer_id`) FROM `customers` WHERE ( `customer_id` > 0 AND `customer_id` <= 12435 )
22/04/05 08:38:19 INFO db.IntegerSplitter: Split size: 3108; Num splits: 4 from: 1 to: 12435
22/04/05 08:38:19 INFO mapreduce.JobSubmitter: number of splits:4
22/04/05 08:38:19 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1649172504056_0002
22/04/05 08:38:20 INFO impl.YarnClientImpl: Submitted application application_1649172504056_0002
22/04/05 08:38:20 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1649172504056_0002/
22/04/05 08:38:20 INFO mapreduce.Job: Running job: job_1649172504056_0002
22/04/05 08:38:29 INFO mapreduce.Job: Job job_1649172504056_0002 running in uber mode : false
22/04/05 08:38:29 INFO mapreduce.Job: map 0% reduce 0%
22/04/05 08:38:48 INFO mapreduce.Job: map 25% reduce 0%
22/04/05 08:38:51 INFO mapreduce.Job: map 50% reduce 0%
22/04/05 08:38:52 INFO mapreduce.Job: map 75% reduce 0%
22/04/05 08:38:53 INFO mapreduce.Job: map 100% reduce 0%
22/04/05 08:38:53 INFO mapreduce.Job: Job job_1649172504056_0002 completed successfully
22/04/05 08:38:54 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=689052
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=487
HDFS: Number of bytes written=953525
HDFS: Number of read operations=16
HDFS: Number of large read operations=0
HDFS: Number of write operations=8
Job Counters
Launched map tasks=4
Other local map tasks=4
Total time spent by all maps in occupied slots (ms)=71605
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=71605
Total vcore-milliseconds taken by all map tasks=71605
Total megabyte-milliseconds taken by all map tasks=73323520
Map-Reduce Framework
Map input records=12435
Map output records=12435
Input split bytes=487
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=894
CPU time spent (ms)=5100
Physical memory (bytes) snapshot=537309184
Virtual memory (bytes) snapshot=6045786112
Total committed heap usage (bytes)=243007488
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=953525
22/04/05 08:38:54 INFO mapreduce.ImportJobBase: Transferred 931.1768 KB in 37.1809 seconds (25.0445 KB/sec)
22/04/05 08:38:54 INFO mapreduce.ImportJobBase: Retrieved 12435 records.
22/04/05 08:38:54 INFO util.AppendUtils: Creating missing output directory - customers
22/04/05 08:38:54 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
22/04/05 08:38:54 INFO tool.ImportTool: --incremental append
22/04/05 08:38:54 INFO tool.ImportTool: --check-column customer_id
22/04/05 08:38:54 INFO tool.ImportTool: --last-value 12435
22/04/05 08:38:54 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')
[cloudera@quickstart ~]$
Log messages to consider
22/04/05 08:38:54 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
22/04/05 08:38:54 INFO tool.ImportTool: --incremental append
22/04/05 08:38:54 INFO tool.ImportTool: --check-column customer_id
22/04/05 08:38:54 INFO tool.ImportTool: --last-value 12435
From the above messages, you can confirm that the incremental import completed and that the next run should pass --last-value 12435, so that only rows with customer_id greater than 12435 are imported.
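Retyping the value by hand is easy to get wrong, so it can help to read it back from the log. The following is a minimal sketch, assuming the Sqoop output was captured to a file (for example with 2>&1 | tee import.log; the file name and the function name next_last_value are hypothetical) and that the log contains the "--last-value" line printed by ImportTool, as in the messages above.

```shell
#!/usr/bin/env bash
# Print the value from the last "--last-value N" line that ImportTool logged.
next_last_value() {
  # $1: file holding captured Sqoop output (hypothetical, e.g. import.log)
  # Only lines written by tool.ImportTool are considered, so an echoed
  # command line containing --last-value is ignored.
  sed -n '/tool\.ImportTool:/s/.*--last-value \([0-9][0-9]*\).*/\1/p' "$1" | tail -n 1
}

# Illustration using two lines copied from the first run's output.
log_file="$(mktemp)"
printf '%s\n' \
  '22/04/05 08:38:54 INFO tool.ImportTool: --check-column customer_id' \
  '22/04/05 08:38:54 INFO tool.ImportTool: --last-value 12435' > "$log_file"

next_last_value "$log_file"   # prints 12435
rm -f "$log_file"
```

The printed number can then be passed to the next sqoop import as --last-value "$(next_last_value import.log)".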
Let’s query the folder /incremental-append-demo and confirm.
[cloudera@quickstart ~]$ hadoop fs -ls /incremental-append-demo
Found 1 items
drwxr-xr-x - cloudera supergroup 0 2022-04-05 08:38 /incremental-append-demo/customers
[cloudera@quickstart ~]$
[cloudera@quickstart ~]$ hadoop fs -ls /incremental-append-demo/customers
Found 4 items
-rw-r--r-- 1 cloudera cloudera 237145 2022-04-05 08:38 /incremental-append-demo/customers/part-m-00000
-rw-r--r-- 1 cloudera cloudera 237965 2022-04-05 08:38 /incremental-append-demo/customers/part-m-00001
-rw-r--r-- 1 cloudera cloudera 238092 2022-04-05 08:38 /incremental-append-demo/customers/part-m-00002
-rw-r--r-- 1 cloudera cloudera 240323 2022-04-05 08:38 /incremental-append-demo/customers/part-m-00003
[cloudera@quickstart ~]$
[cloudera@quickstart ~]$ hadoop fs -cat /incremental-append-demo/customers/* | wc -l
12435
Let’s add 5 more records to the customers table.
INSERT INTO customers VALUES (12436,'Ram','Gurram','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
INSERT INTO customers VALUES (12437,'Krishna','Ponname','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
INSERT INTO customers VALUES (12438,'Bhoomi','Kalyan','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
INSERT INTO customers VALUES (12439,'Dev','Gowda','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
INSERT INTO customers VALUES (12440,'Ramesh','S','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
mysql> INSERT INTO customers VALUES (12436,'Ram','Gurram','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO customers VALUES (12437,'Krishna','Ponname','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO customers VALUES (12438,'Bhoomi','Kalyan','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO customers VALUES (12439,'Dev','Gowda','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO customers VALUES (12440,'Ramesh','S','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
Query OK, 1 row affected (0.00 sec)
mysql> SELECT COUNT(*) FROM customers;
+----------+
| COUNT(*) |
+----------+
| 12440 |
+----------+
1 row in set (0.01 sec)
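Let me add these five records incrementally to the existing dataset by executing the command below. The first run reported --last-value 12435, so that is the value to pass.
sqoop import \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username "root" \
--password "cloudera" \
--table "customers" \
--warehouse-dir /incremental-append-demo \
--incremental append \
--check-column customer_id \
--last-value 12435
The run captured below was executed with --last-value 12345 instead, which is why its log shows a lower bound of 12345.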
[cloudera@quickstart ~]$ sqoop import \
> --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
> --username "root" \
> --password "cloudera" \
> --table "customers" \
> --warehouse-dir /incremental-append-demo \
> --incremental append \
> --check-column customer_id \
> --last-value 12345
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
22/04/05 08:56:35 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
22/04/05 08:56:35 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
22/04/05 08:56:35 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
22/04/05 08:56:35 INFO tool.CodeGenTool: Beginning code generation
22/04/05 08:56:36 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
22/04/05 08:56:36 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
22/04/05 08:56:36 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/5df667e74bd381e69515c2dc51c969bc/customers.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
22/04/05 08:56:38 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/5df667e74bd381e69515c2dc51c969bc/customers.jar
22/04/05 08:56:40 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`customer_id`) FROM `customers`
22/04/05 08:56:40 INFO tool.ImportTool: Incremental import based on column `customer_id`
22/04/05 08:56:40 INFO tool.ImportTool: Lower bound value: 12345
22/04/05 08:56:40 INFO tool.ImportTool: Upper bound value: 12440
22/04/05 08:56:40 WARN manager.MySQLManager: It looks like you are importing from mysql.
22/04/05 08:56:40 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
22/04/05 08:56:40 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
22/04/05 08:56:40 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
22/04/05 08:56:40 INFO mapreduce.ImportJobBase: Beginning import of customers
22/04/05 08:56:40 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
22/04/05 08:56:40 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
22/04/05 08:56:40 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
22/04/05 08:56:40 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/04/05 08:56:42 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 08:56:42 INFO db.DBInputFormat: Using read commited transaction isolation
22/04/05 08:56:42 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`customer_id`), MAX(`customer_id`) FROM `customers` WHERE ( `customer_id` > 12345 AND `customer_id` <= 12440 )
22/04/05 08:56:42 INFO db.IntegerSplitter: Split size: 23; Num splits: 4 from: 12346 to: 12440
22/04/05 08:56:42 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 08:56:42 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 08:56:42 INFO mapreduce.JobSubmitter: number of splits:4
22/04/05 08:56:43 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1649172504056_0003
22/04/05 08:56:43 INFO impl.YarnClientImpl: Submitted application application_1649172504056_0003
22/04/05 08:56:43 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1649172504056_0003/
22/04/05 08:56:43 INFO mapreduce.Job: Running job: job_1649172504056_0003
22/04/05 08:56:51 INFO mapreduce.Job: Job job_1649172504056_0003 running in uber mode : false
22/04/05 08:56:51 INFO mapreduce.Job: map 0% reduce 0%
22/04/05 08:57:05 INFO mapreduce.Job: map 25% reduce 0%
22/04/05 08:57:08 INFO mapreduce.Job: map 50% reduce 0%
22/04/05 08:57:09 INFO mapreduce.Job: map 75% reduce 0%
22/04/05 08:57:10 INFO mapreduce.Job: map 100% reduce 0%
22/04/05 08:57:10 INFO mapreduce.Job: Job job_1649172504056_0003 completed successfully
22/04/05 08:57:10 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=689084
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=497
HDFS: Number of bytes written=7329
HDFS: Number of read operations=16
HDFS: Number of large read operations=0
HDFS: Number of write operations=8
Job Counters
Launched map tasks=4
Other local map tasks=4
Total time spent by all maps in occupied slots (ms)=55261
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=55261
Total vcore-milliseconds taken by all map tasks=55261
Total megabyte-milliseconds taken by all map tasks=56587264
Map-Reduce Framework
Map input records=95
Map output records=95
Input split bytes=497
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=691
CPU time spent (ms)=2890
Physical memory (bytes) snapshot=560947200
Virtual memory (bytes) snapshot=6044770304
Total committed heap usage (bytes)=243007488
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=7329
22/04/05 08:57:10 INFO mapreduce.ImportJobBase: Transferred 7.1572 KB in 30.3459 seconds (241.5152 bytes/sec)
22/04/05 08:57:10 INFO mapreduce.ImportJobBase: Retrieved 95 records.
22/04/05 08:57:10 INFO util.AppendUtils: Appending to directory customers
22/04/05 08:57:10 INFO util.AppendUtils: Using found partition 4
22/04/05 08:57:11 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
22/04/05 08:57:11 INFO tool.ImportTool: --incremental append
22/04/05 08:57:11 INFO tool.ImportTool: --check-column customer_id
22/04/05 08:57:11 INFO tool.ImportTool: --last-value 12440
22/04/05 08:57:11 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')
[cloudera@quickstart ~]$
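The last log line suggests saving the import with 'sqoop job --create'. When an incremental import runs from a saved job, Sqoop keeps the last imported value in the job definition and updates it after each run, so it does not have to be typed again. Here is a minimal sketch under a few assumptions: the job name customers_append and the password file path are hypothetical, and the run wrapper is only an illustration that prints the commands unless DRY_RUN=0 is set.

```shell
#!/usr/bin/env bash
# DRY_RUN=1 (the default here) only prints each command; DRY_RUN=0 executes it.
DRY_RUN="${DRY_RUN:-1}"
JOB_NAME="customers_append"   # hypothetical job name

run() {
  if [ "$DRY_RUN" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# Create the saved job once. The import arguments mirror the ones used in
# this post; --password-file (path is an assumption) replaces --password.
run sqoop job --create "$JOB_NAME" -- import \
  --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
  --username "root" \
  --password-file /user/cloudera/.mysql-password \
  --table "customers" \
  --warehouse-dir /incremental-append-demo \
  --incremental append \
  --check-column customer_id \
  --last-value 0

# Execute the job whenever new rows should be appended.
run sqoop job --exec "$JOB_NAME"

# Inspect the stored parameters, including the current last value.
run sqoop job --show "$JOB_NAME"
```

With a saved job, each sqoop job --exec continues from the value stored by the previous run.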
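When you query the /incremental-append-demo/customers folder, you can observe that 4 new part files (part-m-00004 to part-m-00007) were created and added to the HDFS folder. Note that this run used --last-value 12345 rather than the reported 12435, so it retrieved 95 records (customer_id 12346 to 12440): the 5 new rows plus 90 rows that the first import had already loaded. With --last-value 12435, only the 5 new rows would have been appended.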
[cloudera@quickstart ~]$ hadoop fs -ls /incremental-append-demo/customers
Found 8 items
-rw-r--r-- 1 cloudera cloudera 237145 2022-04-05 08:38 /incremental-append-demo/customers/part-m-00000
-rw-r--r-- 1 cloudera cloudera 237965 2022-04-05 08:38 /incremental-append-demo/customers/part-m-00001
-rw-r--r-- 1 cloudera cloudera 238092 2022-04-05 08:38 /incremental-append-demo/customers/part-m-00002
-rw-r--r-- 1 cloudera cloudera 240323 2022-04-05 08:38 /incremental-append-demo/customers/part-m-00003
-rw-r--r-- 1 cloudera cloudera 1865 2022-04-05 08:57 /incremental-append-demo/customers/part-m-00004
-rw-r--r-- 1 cloudera cloudera 1860 2022-04-05 08:57 /incremental-append-demo/customers/part-m-00005
-rw-r--r-- 1 cloudera cloudera 1742 2022-04-05 08:57 /incremental-append-demo/customers/part-m-00006
-rw-r--r-- 1 cloudera cloudera 1862 2022-04-05 08:57 /incremental-append-demo/customers/part-m-00007
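Append mode adds new part files without checking what is already in the target directory, so a quick check for repeated keys can be useful after each run. The sketch below assumes Sqoop's default comma-delimited text output with customer_id as the first field; the function name count_duplicate_ids is hypothetical, and the sample rows are made up for illustration.

```shell
#!/usr/bin/env bash
# Count the ids that occur more than once in rows read from stdin.
count_duplicate_ids() {
  # Field 1 is assumed to be customer_id; uniq -d keeps one line per
  # repeated id, and tr strips the padding some wc versions add.
  cut -d',' -f1 | sort | uniq -d | wc -l | tr -d ' '
}

# On the cluster (not executed here):
#   hadoop fs -cat /incremental-append-demo/customers/part-m-* | count_duplicate_ids

# Local illustration with made-up rows: id 2 appears twice.
printf '%s\n' '1,a' '2,b' '2,b' '3,c' | count_duplicate_ids   # prints 1
```

A result of 0 means every customer_id appears once; any other number means some rows were imported more than once.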