Wednesday 1 June 2022

Sqoop: incremental import using append mode

In this post, I am going to explain how to import data into HDFS using Sqoop's incremental import in append mode.

 

When to use append mode?

Use this mode when you are only interested in adding newly created records to an existing dataset. Append mode assumes the check column (typically an auto-increment primary key) only ever grows; it picks up new rows but not updates to existing ones.
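Conceptually, append mode boils down to remembering the highest value of the check column seen so far and fetching only rows beyond it. Here is a minimal Python sketch of that bookkeeping (illustrative only, not Sqoop's actual implementation):

```python
def incremental_append(rows, last_value, check_column="customer_id"):
    """Keep only rows whose check column exceeds last_value, and
    return the new last_value to pass to the next run."""
    new_rows = [r for r in rows if r[check_column] > last_value]
    next_last = max((r[check_column] for r in new_rows), default=last_value)
    return new_rows, next_last

# First run: with --last-value 0, every existing row is "new".
table = [{"customer_id": i} for i in range(1, 6)]
batch, last = incremental_append(table, last_value=0)     # 5 rows, last == 5

# Later run: only the rows inserted since are picked up.
table += [{"customer_id": 6}, {"customer_id": 7}]
batch, last = incremental_append(table, last_value=last)  # 2 rows, last == 7
```

Sqoop prints the `next_last` equivalent at the end of every run so you can feed it back in as `--last-value`.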

 

How to specify incremental mode?

The --incremental option selects the incremental mode. Sqoop supports two incremental modes, append and lastmodified; this post covers append.

 

Example

sqoop import \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username "root" \
--password "cloudera" \
--table "customers" \
--warehouse-dir /incremental-append-demo \
--incremental append \
--check-column customer_id \
--last-value 0

The above command imports the data from the customers table into /incremental-append-demo. Since --last-value is 0 and every customer_id is positive, the first run brings over the entire table.
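Under the hood (visible in the job log further down), Sqoop first runs `SELECT MAX(customer_id)` to pin an upper bound, then imports only the half-open range (last-value, max]. A small sketch of that two-step query against an in-memory SQLite table (illustrative only; Sqoop runs the equivalent against MySQL):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE customers (customer_id INTEGER PRIMARY KEY, fname TEXT)")
conn.executemany("INSERT INTO customers VALUES (?, ?)",
                 [(i, "user%d" % i) for i in range(1, 11)])

last_value = 0
# Step 1: pin the upper bound, so rows inserted mid-import are not half-read.
(upper,) = conn.execute("SELECT MAX(customer_id) FROM customers").fetchone()
# Step 2: import only the rows in (last_value, upper].
rows = conn.execute(
    "SELECT * FROM customers WHERE customer_id > ? AND customer_id <= ?",
    (last_value, upper),
).fetchall()
```

Pinning the upper bound first is what makes the run repeatable: the next import starts exactly where this one's bound ended.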

 

To demonstrate, I am going to onboard the customers table to HDFS incrementally.

 

mysql> DESCRIBE customers;
+-------------------+--------------+------+-----+---------+----------------+
| Field             | Type         | Null | Key | Default | Extra          |
+-------------------+--------------+------+-----+---------+----------------+
| customer_id       | int(11)      | NO   | PRI | NULL    | auto_increment |
| customer_fname    | varchar(45)  | NO   |     | NULL    |                |
| customer_lname    | varchar(45)  | NO   |     | NULL    |                |
| customer_email    | varchar(45)  | NO   |     | NULL    |                |
| customer_password | varchar(45)  | NO   |     | NULL    |                |
| customer_street   | varchar(255) | NO   |     | NULL    |                |
| customer_city     | varchar(45)  | NO   |     | NULL    |                |
| customer_state    | varchar(45)  | NO   |     | NULL    |                |
| customer_zipcode  | varchar(45)  | NO   |     | NULL    |                |
+-------------------+--------------+------+-----+---------+----------------+
9 rows in set (0.00 sec)
mysql> 
mysql> SELECT COUNT(*) FROM customers;
+----------+
| COUNT(*) |
+----------+
|    12435 |
+----------+
1 row in set (0.02 sec)

Let’s onboard the customers data to HDFS.

[cloudera@quickstart ~]$ sqoop import \
> --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
> --username "root" \
> --password "cloudera" \
> --table "customers" \
> --warehouse-dir /incremental-append-demo \
> --incremental append \
> --check-column customer_id \
> --last-value 0
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
22/04/05 08:38:13 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
22/04/05 08:38:13 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
22/04/05 08:38:13 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
22/04/05 08:38:13 INFO tool.CodeGenTool: Beginning code generation
22/04/05 08:38:13 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
22/04/05 08:38:13 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
22/04/05 08:38:13 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/fd526416e6cd7bf163633edd634bd8e8/customers.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
22/04/05 08:38:15 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/fd526416e6cd7bf163633edd634bd8e8/customers.jar
22/04/05 08:38:16 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`customer_id`) FROM `customers`
22/04/05 08:38:16 INFO tool.ImportTool: Incremental import based on column `customer_id`
22/04/05 08:38:16 INFO tool.ImportTool: Lower bound value: 0
22/04/05 08:38:16 INFO tool.ImportTool: Upper bound value: 12435
22/04/05 08:38:16 WARN manager.MySQLManager: It looks like you are importing from mysql.
22/04/05 08:38:16 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
22/04/05 08:38:16 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
22/04/05 08:38:16 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
22/04/05 08:38:16 INFO mapreduce.ImportJobBase: Beginning import of customers
22/04/05 08:38:16 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
22/04/05 08:38:17 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
22/04/05 08:38:17 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
22/04/05 08:38:17 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/04/05 08:38:18 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 08:38:19 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 08:38:19 INFO db.DBInputFormat: Using read commited transaction isolation
22/04/05 08:38:19 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`customer_id`), MAX(`customer_id`) FROM `customers` WHERE ( `customer_id` > 0 AND `customer_id` <= 12435 )
22/04/05 08:38:19 INFO db.IntegerSplitter: Split size: 3108; Num splits: 4 from: 1 to: 12435
22/04/05 08:38:19 INFO mapreduce.JobSubmitter: number of splits:4
22/04/05 08:38:19 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1649172504056_0002
22/04/05 08:38:20 INFO impl.YarnClientImpl: Submitted application application_1649172504056_0002
22/04/05 08:38:20 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1649172504056_0002/
22/04/05 08:38:20 INFO mapreduce.Job: Running job: job_1649172504056_0002
22/04/05 08:38:29 INFO mapreduce.Job: Job job_1649172504056_0002 running in uber mode : false
22/04/05 08:38:29 INFO mapreduce.Job:  map 0% reduce 0%
22/04/05 08:38:48 INFO mapreduce.Job:  map 25% reduce 0%
22/04/05 08:38:51 INFO mapreduce.Job:  map 50% reduce 0%
22/04/05 08:38:52 INFO mapreduce.Job:  map 75% reduce 0%
22/04/05 08:38:53 INFO mapreduce.Job:  map 100% reduce 0%
22/04/05 08:38:53 INFO mapreduce.Job: Job job_1649172504056_0002 completed successfully
22/04/05 08:38:54 INFO mapreduce.Job: Counters: 30
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=689052
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=487
		HDFS: Number of bytes written=953525
		HDFS: Number of read operations=16
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=8
	Job Counters 
		Launched map tasks=4
		Other local map tasks=4
		Total time spent by all maps in occupied slots (ms)=71605
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=71605
		Total vcore-milliseconds taken by all map tasks=71605
		Total megabyte-milliseconds taken by all map tasks=73323520
	Map-Reduce Framework
		Map input records=12435
		Map output records=12435
		Input split bytes=487
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=894
		CPU time spent (ms)=5100
		Physical memory (bytes) snapshot=537309184
		Virtual memory (bytes) snapshot=6045786112
		Total committed heap usage (bytes)=243007488
	File Input Format Counters 
		Bytes Read=0
	File Output Format Counters 
		Bytes Written=953525
22/04/05 08:38:54 INFO mapreduce.ImportJobBase: Transferred 931.1768 KB in 37.1809 seconds (25.0445 KB/sec)
22/04/05 08:38:54 INFO mapreduce.ImportJobBase: Retrieved 12435 records.
22/04/05 08:38:54 INFO util.AppendUtils: Creating missing output directory - customers
22/04/05 08:38:54 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
22/04/05 08:38:54 INFO tool.ImportTool:  --incremental append
22/04/05 08:38:54 INFO tool.ImportTool:   --check-column customer_id
22/04/05 08:38:54 INFO tool.ImportTool:   --last-value 12435
22/04/05 08:38:54 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')
[cloudera@quickstart ~]$
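One line in the log worth a second look is `db.IntegerSplitter: Split size: 3108; Num splits: 4 from: 1 to: 12435`. Sqoop divides the check-column range evenly across the mappers (four by default), and each mapper writes its own part file. A rough Python approximation of that arithmetic (the real IntegerSplitter distributes remainders slightly differently):

```python
def integer_splits(lo, hi, num_mappers=4):
    """Carve the closed range [lo, hi] into num_mappers contiguous
    chunks of roughly (hi - lo) // num_mappers ids each."""
    split_size = (hi - lo) // num_mappers   # 3108 for 1..12435
    splits, start = [], lo
    while start <= hi:
        end = min(start + split_size, hi)
        splits.append((start, end))
        start = end + 1
    return splits

ranges = integer_splits(1, 12435)  # four ranges, one per part-m-0000N file
```

This is why the import below produces exactly four part files.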

Log messages to consider

22/04/05 08:38:54 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
22/04/05 08:38:54 INFO tool.ImportTool:  --incremental append
22/04/05 08:38:54 INFO tool.ImportTool:   --check-column customer_id
22/04/05 08:38:54 INFO tool.ImportTool:   --last-value 12435

From these messages, you can confirm that the incremental import completed and that the next run should sync from customer_id 12435 by passing --last-value 12435.

 

Let’s query the folder /incremental-append-demo and confirm.

[cloudera@quickstart ~]$ hadoop fs -ls /incremental-append-demo 
Found 1 items
drwxr-xr-x   - cloudera supergroup          0 2022-04-05 08:38 /incremental-append-demo/customers
[cloudera@quickstart ~]$ 
[cloudera@quickstart ~]$ hadoop fs -ls /incremental-append-demo/customers
Found 4 items
-rw-r--r--   1 cloudera cloudera     237145 2022-04-05 08:38 /incremental-append-demo/customers/part-m-00000
-rw-r--r--   1 cloudera cloudera     237965 2022-04-05 08:38 /incremental-append-demo/customers/part-m-00001
-rw-r--r--   1 cloudera cloudera     238092 2022-04-05 08:38 /incremental-append-demo/customers/part-m-00002
-rw-r--r--   1 cloudera cloudera     240323 2022-04-05 08:38 /incremental-append-demo/customers/part-m-00003
[cloudera@quickstart ~]$ 
[cloudera@quickstart ~]$ hadoop fs -cat /incremental-append-demo/customers/* | wc -l
12435

 

Let’s add 5 more records to the customers table.

INSERT INTO customers VALUES (12436,'Ram','Gurram','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
INSERT INTO customers VALUES (12437,'Krishna','Ponname','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
INSERT INTO customers VALUES (12438,'Bhoomi','Kalyan','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
INSERT INTO customers VALUES (12439,'Dev','Gowda','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
INSERT INTO customers VALUES (12440,'Ramesh','S','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');

mysql> INSERT INTO customers VALUES (12436,'Ram','Gurram','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
Query OK, 1 row affected (0.00 sec)

mysql> INSERT INTO customers VALUES (12437,'Krishna','Ponname','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
Query OK, 1 row affected (0.00 sec)

mysql> INSERT INTO customers VALUES (12438,'Bhoomi','Kalyan','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
Query OK, 1 row affected (0.00 sec)

mysql> INSERT INTO customers VALUES (12439,'Dev','Gowda','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
Query OK, 1 row affected (0.00 sec)

mysql> INSERT INTO customers VALUES (12440,'Ramesh','S','XXXXXXX','XXXXX','Chowdeswari Street','Bangalore', 'Karnataka', '560034');
Query OK, 1 row affected (0.00 sec)

mysql> SELECT COUNT(*) FROM customers;
+----------+
| COUNT(*) |
+----------+
|    12440 |
+----------+
1 row in set (0.01 sec)

Let me add these five records incrementally to the existing dataset by executing the command below.

sqoop import \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username "root" \
--password "cloudera" \
--table "customers" \
--warehouse-dir /incremental-append-demo \
--incremental append \
--check-column customer_id \
--last-value 12345
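Before running it, note what this --last-value implies. The previous run ended at 12435, so passing 12345 (note the transposed digits) makes Sqoop fetch every row with customer_id > 12345, not just the five new ones, and that is exactly what the log below reports (Retrieved 95 records). The arithmetic:

```python
prev_max   = 12435   # --last-value printed by the first import
new_max    = 12440   # MAX(customer_id) after the five inserts
last_value = 12345   # value actually passed on the command line

imported   = new_max - last_value    # rows matching customer_id > 12345
duplicates = prev_max - last_value   # of those, rows already in HDFS
# imported == 95, duplicates == 90, genuinely new == imported - duplicates == 5
```

Passing --last-value 12435 instead would have imported only the five new rows; append mode never deduplicates, so a wrong lower bound silently re-imports overlapping records.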

[cloudera@quickstart ~]$ sqoop import \
> --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
> --username "root" \
> --password "cloudera" \
> --table "customers" \
> --warehouse-dir /incremental-append-demo \
> --incremental append \
> --check-column customer_id \
> --last-value 12345
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
22/04/05 08:56:35 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
22/04/05 08:56:35 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
22/04/05 08:56:35 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
22/04/05 08:56:35 INFO tool.CodeGenTool: Beginning code generation
22/04/05 08:56:36 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
22/04/05 08:56:36 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
22/04/05 08:56:36 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/5df667e74bd381e69515c2dc51c969bc/customers.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
22/04/05 08:56:38 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/5df667e74bd381e69515c2dc51c969bc/customers.jar
22/04/05 08:56:40 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`customer_id`) FROM `customers`
22/04/05 08:56:40 INFO tool.ImportTool: Incremental import based on column `customer_id`
22/04/05 08:56:40 INFO tool.ImportTool: Lower bound value: 12345
22/04/05 08:56:40 INFO tool.ImportTool: Upper bound value: 12440
22/04/05 08:56:40 WARN manager.MySQLManager: It looks like you are importing from mysql.
22/04/05 08:56:40 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
22/04/05 08:56:40 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
22/04/05 08:56:40 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
22/04/05 08:56:40 INFO mapreduce.ImportJobBase: Beginning import of customers
22/04/05 08:56:40 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
22/04/05 08:56:40 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
22/04/05 08:56:40 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
22/04/05 08:56:40 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/04/05 08:56:42 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 08:56:42 INFO db.DBInputFormat: Using read commited transaction isolation
22/04/05 08:56:42 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`customer_id`), MAX(`customer_id`) FROM `customers` WHERE ( `customer_id` > 12345 AND `customer_id` <= 12440 )
22/04/05 08:56:42 INFO db.IntegerSplitter: Split size: 23; Num splits: 4 from: 12346 to: 12440
22/04/05 08:56:42 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 08:56:42 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 08:56:42 INFO mapreduce.JobSubmitter: number of splits:4
22/04/05 08:56:43 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1649172504056_0003
22/04/05 08:56:43 INFO impl.YarnClientImpl: Submitted application application_1649172504056_0003
22/04/05 08:56:43 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1649172504056_0003/
22/04/05 08:56:43 INFO mapreduce.Job: Running job: job_1649172504056_0003
22/04/05 08:56:51 INFO mapreduce.Job: Job job_1649172504056_0003 running in uber mode : false
22/04/05 08:56:51 INFO mapreduce.Job:  map 0% reduce 0%
22/04/05 08:57:05 INFO mapreduce.Job:  map 25% reduce 0%
22/04/05 08:57:08 INFO mapreduce.Job:  map 50% reduce 0%
22/04/05 08:57:09 INFO mapreduce.Job:  map 75% reduce 0%
22/04/05 08:57:10 INFO mapreduce.Job:  map 100% reduce 0%
22/04/05 08:57:10 INFO mapreduce.Job: Job job_1649172504056_0003 completed successfully
22/04/05 08:57:10 INFO mapreduce.Job: Counters: 30
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=689084
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=497
		HDFS: Number of bytes written=7329
		HDFS: Number of read operations=16
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=8
	Job Counters 
		Launched map tasks=4
		Other local map tasks=4
		Total time spent by all maps in occupied slots (ms)=55261
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=55261
		Total vcore-milliseconds taken by all map tasks=55261
		Total megabyte-milliseconds taken by all map tasks=56587264
	Map-Reduce Framework
		Map input records=95
		Map output records=95
		Input split bytes=497
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=691
		CPU time spent (ms)=2890
		Physical memory (bytes) snapshot=560947200
		Virtual memory (bytes) snapshot=6044770304
		Total committed heap usage (bytes)=243007488
	File Input Format Counters 
		Bytes Read=0
	File Output Format Counters 
		Bytes Written=7329
22/04/05 08:57:10 INFO mapreduce.ImportJobBase: Transferred 7.1572 KB in 30.3459 seconds (241.5152 bytes/sec)
22/04/05 08:57:10 INFO mapreduce.ImportJobBase: Retrieved 95 records.
22/04/05 08:57:10 INFO util.AppendUtils: Appending to directory customers
22/04/05 08:57:10 INFO util.AppendUtils: Using found partition 4
22/04/05 08:57:11 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
22/04/05 08:57:11 INFO tool.ImportTool:  --incremental append
22/04/05 08:57:11 INFO tool.ImportTool:   --check-column customer_id
22/04/05 08:57:11 INFO tool.ImportTool:   --last-value 12440
22/04/05 08:57:11 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')
[cloudera@quickstart ~]$

When you query the /incremental-append-demo/customers folder, you can observe that four new part files (one per mapper) were created and added to the HDFS folder alongside the four from the first run.

[cloudera@quickstart ~]$ hadoop fs -ls /incremental-append-demo/customers
Found 8 items
-rw-r--r--   1 cloudera cloudera     237145 2022-04-05 08:38 /incremental-append-demo/customers/part-m-00000
-rw-r--r--   1 cloudera cloudera     237965 2022-04-05 08:38 /incremental-append-demo/customers/part-m-00001
-rw-r--r--   1 cloudera cloudera     238092 2022-04-05 08:38 /incremental-append-demo/customers/part-m-00002
-rw-r--r--   1 cloudera cloudera     240323 2022-04-05 08:38 /incremental-append-demo/customers/part-m-00003
-rw-r--r--   1 cloudera cloudera       1865 2022-04-05 08:57 /incremental-append-demo/customers/part-m-00004
-rw-r--r--   1 cloudera cloudera       1860 2022-04-05 08:57 /incremental-append-demo/customers/part-m-00005
-rw-r--r--   1 cloudera cloudera       1742 2022-04-05 08:57 /incremental-append-demo/customers/part-m-00006
-rw-r--r--   1 cloudera cloudera       1862 2022-04-05 08:57 /incremental-append-demo/customers/part-m-00007
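As a sanity check, the sizes of the four new part files add up exactly to the job's File Output Format counter (Bytes Written=7329) from the log above:

```python
# Sizes of part-m-00004 .. part-m-00007 from the listing above
new_parts = [1865, 1860, 1742, 1862]
total = sum(new_parts)   # equals the job's "Bytes Written=7329" counter
```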

