In this post, I am going to explain how to perform an incremental import using Sqoop's lastmodified mode.
With Sqoop incremental import, we can import only the delta changes made since the previous import.
For example, suppose you imported an employee table (with 50,000 records) from an RDBMS to HDFS yesterday, and 1,000 new records were added to the table today. Do you really want to re-import all 51,000 records to HDFS? No.
Using the incremental import technique, we can bring only the records that were added or updated after the previous sync into HDFS.
Sqoop incremental modes
Sqoop supports two incremental modes.
a. append mode: Use this mode when you are only interested in adding newly created records to the existing dataset.
b. lastmodified mode: Use this mode when you want to import data based on an update timestamp. It captures both newly created records and records updated since the previous sync. A general sketch of such a command is shown below.
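To make the options concrete before we start, here is a minimal sketch of a lastmodified incremental import. The connection string, table, directory, and column names are placeholders; the real values for the Cloudera QuickStart VM are used in the steps that follow.
sqoop import \
--connect <jdbc-connection-string> \
--username <db-user> \
--table <table-name> \
--warehouse-dir <hdfs-directory> \
--incremental lastmodified \
--check-column <timestamp-column> \
--last-value <value-from-previous-run>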
Step 1: Let’s create an employees table to experiment with incremental import.
Connect to the MySQL server.
[cloudera@quickstart ~]$ mysql -u root -p
Enter password:
Welcome to the MySQL monitor. Commands end with ; or \g.
Your MySQL connection id is 32
Server version: 5.1.73 Source distribution
Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
mysql>
‘cloudera’ is the password for the Cloudera QuickStart VM.
Create a database ‘incremental_demo’.
mysql> CREATE DATABASE incremental_demo;
Query OK, 1 row affected (0.00 sec)
Create the employees table in the incremental_demo database (run USE incremental_demo; first so the table is created inside that database).
CREATE TABLE employees (id INT, name VARCHAR(20), age INT, modified_time TIMESTAMP, PRIMARY KEY (id));
mysql> CREATE TABLE employees (id INT, name VARCHAR(20), age INT, modified_time TIMESTAMP, PRIMARY KEY (id));
Query OK, 0 rows affected (0.01 sec)
mysql>
mysql> DESCRIBE employees;
+---------------+-------------+------+-----+-------------------+-----------------------------+
| Field         | Type        | Null | Key | Default           | Extra                       |
+---------------+-------------+------+-----+-------------------+-----------------------------+
| id            | int(11)     | NO   | PRI | 0                 |                             |
| name          | varchar(20) | YES  |     | NULL              |                             |
| age           | int(11)     | YES  |     | NULL              |                             |
| modified_time | timestamp   | NO   |     | CURRENT_TIMESTAMP | on update CURRENT_TIMESTAMP |
+---------------+-------------+------+-----+-------------------+-----------------------------+
4 rows in set (0.00 sec)
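Notice the Default and Extra columns in the DESCRIBE output: MySQL gave modified_time a default of CURRENT_TIMESTAMP and an "on update CURRENT_TIMESTAMP" behavior even though we did not declare them, because older MySQL versions treat the first TIMESTAMP column this way. That auto-updating timestamp is exactly what lastmodified mode relies on. If you prefer to declare the same behavior explicitly, a sketch equivalent to what MySQL did for us would be:
CREATE TABLE employees (
  id INT,
  name VARCHAR(20),
  age INT,
  modified_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (id)
);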
Let’s insert some data into the employees table.
INSERT INTO employees VALUES (1, 'RAM', 34, '2008-01-01 00:00:00');
INSERT INTO employees VALUES (2, 'Rahim', 28, '2009-01-01 00:00:00');
INSERT INTO employees VALUES (3, 'Robert', 43, '2010-01-01 00:00:00');
INSERT INTO employees VALUES (4, 'Narasimha', 46, '2008-01-01 00:00:00');
INSERT INTO employees VALUES (5, 'Shankaer', 51, '2009-01-01 00:00:00');
INSERT INTO employees VALUES (6, 'Siva', 21, '2010-01-01 00:00:00');
mysql> INSERT INTO employees VALUES (1, 'RAM', 34, '2008-01-01 00:00:00');
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO employees VALUES (2, 'Rahim', 28, '2009-01-01 00:00:00');
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO employees VALUES (3, 'Robert', 43, '2010-01-01 00:00:00');
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO employees VALUES (4, 'Narasimha', 46, '2008-01-01 00:00:00');
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO employees VALUES (5, 'Shankaer', 51, '2009-01-01 00:00:00');
Query OK, 1 row affected (0.00 sec)
mysql> INSERT INTO employees VALUES (6, 'Siva', 21, '2010-01-01 00:00:00');
Query OK, 1 row affected (0.00 sec)
mysql>
mysql> SELECT * FROM employees;
+----+-----------+------+---------------------+
| id | name      | age  | modified_time       |
+----+-----------+------+---------------------+
|  1 | RAM       |   34 | 2008-01-01 00:00:00 |
|  2 | Rahim     |   28 | 2009-01-01 00:00:00 |
|  3 | Robert    |   43 | 2010-01-01 00:00:00 |
|  4 | Narasimha |   46 | 2008-01-01 00:00:00 |
|  5 | Shankaer  |   51 | 2009-01-01 00:00:00 |
|  6 | Siva      |   21 | 2010-01-01 00:00:00 |
+----+-----------+------+---------------------+
6 rows in set (0.00 sec)
Step 2: Let’s import the records from the employees table into the HDFS location ‘/last-modified-demo’ by executing the command below.
sqoop import \
--connect "jdbc:mysql://quickstart.cloudera:3306/incremental_demo" \
--username "root" \
--password "cloudera" \
--table "employees" \
--warehouse-dir /last-modified-demo \
--incremental lastmodified \
--check-column modified_time \
--last-value 0
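A side note before the run: we pass the password with --password, and Sqoop will warn that putting the password on the command line is insecure. A sketch of a safer alternative (all other options unchanged) is to prompt for it at runtime with -P, or to read it from a file with --password-file:
sqoop import \
--connect "jdbc:mysql://quickstart.cloudera:3306/incremental_demo" \
--username "root" \
-P \
--table "employees" \
--warehouse-dir /last-modified-demo \
--incremental lastmodified \
--check-column modified_time \
--last-value 0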
[cloudera@quickstart ~]$ sqoop import \
> --connect "jdbc:mysql://quickstart.cloudera:3306/incremental_demo" \
> --username "root" \
> --password "cloudera" \
> --table "employees" \
> --warehouse-dir /last-modified-demo \
> --incremental lastmodified \
> --check-column modified_time \
> --last-value 0
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
22/04/07 04:03:07 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
22/04/07 04:03:07 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
22/04/07 04:03:07 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
22/04/07 04:03:07 INFO tool.CodeGenTool: Beginning code generation
22/04/07 04:03:08 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
22/04/07 04:03:08 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
22/04/07 04:03:08 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/fe0652c6df23e9479533b7f3338604fb/employees.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
22/04/07 04:03:10 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/fe0652c6df23e9479533b7f3338604fb/employees.jar
22/04/07 04:03:12 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
22/04/07 04:03:12 INFO tool.ImportTool: Incremental import based on column `modified_time`
22/04/07 04:03:12 INFO tool.ImportTool: Lower bound value: '0'
22/04/07 04:03:12 INFO tool.ImportTool: Upper bound value: '2022-04-07 04:03:12.0'
22/04/07 04:03:12 WARN manager.MySQLManager: It looks like you are importing from mysql.
22/04/07 04:03:12 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
22/04/07 04:03:12 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
22/04/07 04:03:12 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
22/04/07 04:03:12 INFO mapreduce.ImportJobBase: Beginning import of employees
22/04/07 04:03:12 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
22/04/07 04:03:12 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
22/04/07 04:03:12 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
22/04/07 04:03:12 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/04/07 04:03:13 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/07 04:03:14 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/07 04:03:14 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/07 04:03:14 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/07 04:03:14 INFO db.DBInputFormat: Using read commited transaction isolation
22/04/07 04:03:14 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`id`), MAX(`id`) FROM `employees` WHERE ( `modified_time` >= '0' AND `modified_time` < '2022-04-07 04:03:12.0' )
22/04/07 04:03:14 INFO db.IntegerSplitter: Split size: 1; Num splits: 4 from: 1 to: 6
22/04/07 04:03:14 INFO mapreduce.JobSubmitter: number of splits:4
22/04/07 04:03:14 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1649172504056_0004
22/04/07 04:03:14 INFO impl.YarnClientImpl: Submitted application application_1649172504056_0004
22/04/07 04:03:14 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1649172504056_0004/
22/04/07 04:03:14 INFO mapreduce.Job: Running job: job_1649172504056_0004
22/04/07 04:03:22 INFO mapreduce.Job: Job job_1649172504056_0004 running in uber mode : false
22/04/07 04:03:22 INFO mapreduce.Job: map 0% reduce 0%
22/04/07 04:03:41 INFO mapreduce.Job: map 25% reduce 0%
22/04/07 04:03:42 INFO mapreduce.Job: map 50% reduce 0%
22/04/07 04:03:44 INFO mapreduce.Job: map 100% reduce 0%
22/04/07 04:03:44 INFO mapreduce.Job: Job job_1649172504056_0004 completed successfully
22/04/07 04:03:45 INFO mapreduce.Job: Counters: 31
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=688860
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=393
HDFS: Number of bytes written=203
HDFS: Number of read operations=16
HDFS: Number of large read operations=0
HDFS: Number of write operations=8
Job Counters
Killed map tasks=1
Launched map tasks=4
Other local map tasks=4
Total time spent by all maps in occupied slots (ms)=68244
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=68244
Total vcore-milliseconds taken by all map tasks=68244
Total megabyte-milliseconds taken by all map tasks=69881856
Map-Reduce Framework
Map input records=6
Map output records=6
Input split bytes=393
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=740
CPU time spent (ms)=3490
Physical memory (bytes) snapshot=496422912
Virtual memory (bytes) snapshot=6040731648
Total committed heap usage (bytes)=243007488
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=203
22/04/07 04:03:45 INFO mapreduce.ImportJobBase: Transferred 203 bytes in 32.6043 seconds (6.2262 bytes/sec)
22/04/07 04:03:45 INFO mapreduce.ImportJobBase: Retrieved 6 records.
22/04/07 04:03:45 INFO tool.ImportTool: Final destination exists, will run merge job.
22/04/07 04:03:45 INFO tool.ImportTool: Moving data from temporary directory _sqoop/7a31e9cbd6e44cc8a7adc08ff8e5c3a1_employees to final destination /last-modified-demo/employees
22/04/07 04:03:45 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
22/04/07 04:03:45 INFO tool.ImportTool: --incremental lastmodified
22/04/07 04:03:45 INFO tool.ImportTool: --check-column modified_time
22/04/07 04:03:45 INFO tool.ImportTool: --last-value 2022-04-07 04:03:12.0
22/04/07 04:03:45 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')
[cloudera@quickstart ~]$
Let’s query the folder /last-modified-demo.
[cloudera@quickstart ~]$ hadoop fs -ls /last-modified-demo
Found 1 items
drwxr-xr-x - cloudera cloudera 0 2022-04-07 04:03 /last-modified-demo/employees
[cloudera@quickstart ~]$
[cloudera@quickstart ~]$ hadoop fs -ls /last-modified-demo/employees
Found 5 items
-rw-r--r-- 1 cloudera cloudera 0 2022-04-07 04:03 /last-modified-demo/employees/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 64 2022-04-07 04:03 /last-modified-demo/employees/part-m-00000
-rw-r--r-- 1 cloudera cloudera 34 2022-04-07 04:03 /last-modified-demo/employees/part-m-00001
-rw-r--r-- 1 cloudera cloudera 37 2022-04-07 04:03 /last-modified-demo/employees/part-m-00002
-rw-r--r-- 1 cloudera cloudera 68 2022-04-07 04:03 /last-modified-demo/employees/part-m-00003
[cloudera@quickstart ~]$
[cloudera@quickstart ~]$ hadoop fs -cat /last-modified-demo/employees/*
1,RAM,34,2008-01-01 00:00:00.0
2,Rahim,28,2009-01-01 00:00:00.0
3,Robert,43,2010-01-01 00:00:00.0
4,Narasimha,46,2008-01-01 00:00:00.0
5,Shankaer,51,2009-01-01 00:00:00.0
6,Siva,21,2010-01-01 00:00:00.0
Let’s add 2 new records to the employees table and update 2 existing records.
INSERT INTO employees VALUES (7, 'Keerthi', 23, CURRENT_TIMESTAMP);
INSERT INTO employees VALUES (8, 'Maha', 27, CURRENT_TIMESTAMP);
UPDATE employees SET modified_time=CURRENT_TIMESTAMP WHERE id < 3;
mysql> INSERT INTO employees VALUES (7, 'Keerthi', 23, CURRENT_TIMESTAMP);
Query OK, 1 row affected (0.01 sec)
mysql> INSERT INTO employees VALUES (8, 'Maha', 27, CURRENT_TIMESTAMP);
Query OK, 1 row affected (0.00 sec)
mysql> UPDATE employees SET modified_time=CURRENT_TIMESTAMP WHERE id < 3;
Query OK, 2 rows affected (0.02 sec)
Rows matched: 2 Changed: 2 Warnings: 0
mysql> SELECT * FROM employees;
+----+-----------+------+---------------------+
| id | name      | age  | modified_time       |
+----+-----------+------+---------------------+
|  1 | RAM       |   34 | 2022-04-07 04:13:42 |
|  2 | Rahim     |   28 | 2022-04-07 04:13:42 |
|  3 | Robert    |   43 | 2010-01-01 00:00:00 |
|  4 | Narasimha |   46 | 2008-01-01 00:00:00 |
|  5 | Shankaer  |   51 | 2009-01-01 00:00:00 |
|  6 | Siva      |   21 | 2010-01-01 00:00:00 |
|  7 | Keerthi   |   23 | 2022-04-07 04:12:35 |
|  8 | Maha      |   27 | 2022-04-07 04:12:42 |
+----+-----------+------+---------------------+
8 rows in set (0.00 sec)
Let’s re-run the sqoop import command to import the updated data.
sqoop import \
--connect "jdbc:mysql://quickstart.cloudera:3306/incremental_demo" \
--username "root" \
--password "cloudera" \
--table "employees" \
--warehouse-dir /last-modified-demo \
--incremental lastmodified \
--check-column modified_time \
--last-value "2022-04-07 04:03:12.0" \
--append
You can get the value for the --last-value option from the output of the previous run (see the saved-job sketch below if you prefer not to track it manually).
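Tracking --last-value by hand gets tedious. As the import output itself suggests ("Consider saving this with 'sqoop job --create'"), you can save the import as a Sqoop job so that the last value is stored in the Sqoop metastore and reused automatically on each run. A rough sketch, assuming a job name of employees_sync (depending on your configuration you may be prompted for the database password at execution time, or you may prefer --password-file):
sqoop job --create employees_sync -- import \
--connect "jdbc:mysql://quickstart.cloudera:3306/incremental_demo" \
--username "root" \
--table "employees" \
--warehouse-dir /last-modified-demo \
--incremental lastmodified \
--check-column modified_time \
--last-value 0 \
--append
The job can then be run with sqoop job --exec employees_sync, and the stored last value inspected with sqoop job --show employees_sync.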
[cloudera@quickstart ~]$ sqoop import \
> --connect "jdbc:mysql://quickstart.cloudera:3306/incremental_demo" \
> --username "root" \
> --password "cloudera" \
> --table "employees" \
> --warehouse-dir /last-modified-demo \
> --incremental lastmodified \
> --check-column modified_time \
> --last-value "2022-04-07 04:03:12.0" \
> --append
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
22/04/07 04:22:21 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
22/04/07 04:22:21 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
22/04/07 04:22:21 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
22/04/07 04:22:21 INFO tool.CodeGenTool: Beginning code generation
22/04/07 04:22:22 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
22/04/07 04:22:22 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
22/04/07 04:22:22 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/a5ae4f87ccee0a0ff577c8ca66a3781a/employees.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
22/04/07 04:22:24 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/a5ae4f87ccee0a0ff577c8ca66a3781a/employees.jar
22/04/07 04:22:26 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
22/04/07 04:22:26 INFO tool.ImportTool: Incremental import based on column `modified_time`
22/04/07 04:22:26 INFO tool.ImportTool: Lower bound value: '2022-04-07 04:03:12.0'
22/04/07 04:22:26 INFO tool.ImportTool: Upper bound value: '2022-04-07 04:22:26.0'
22/04/07 04:22:26 WARN manager.MySQLManager: It looks like you are importing from mysql.
22/04/07 04:22:26 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
22/04/07 04:22:26 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
22/04/07 04:22:26 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
22/04/07 04:22:26 INFO mapreduce.ImportJobBase: Beginning import of employees
22/04/07 04:22:26 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
22/04/07 04:22:26 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
22/04/07 04:22:26 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
22/04/07 04:22:26 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/04/07 04:22:28 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/07 04:22:28 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/07 04:22:28 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/07 04:22:28 INFO db.DBInputFormat: Using read commited transaction isolation
22/04/07 04:22:28 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`id`), MAX(`id`) FROM `employees` WHERE ( `modified_time` >= '2022-04-07 04:03:12.0' AND `modified_time` < '2022-04-07 04:22:26.0' )
22/04/07 04:22:28 INFO db.IntegerSplitter: Split size: 1; Num splits: 4 from: 1 to: 8
22/04/07 04:22:28 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/07 04:22:28 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/07 04:22:28 INFO mapreduce.JobSubmitter: number of splits:4
22/04/07 04:22:29 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1649172504056_0005
22/04/07 04:22:29 INFO impl.YarnClientImpl: Submitted application application_1649172504056_0005
22/04/07 04:22:29 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1649172504056_0005/
22/04/07 04:22:29 INFO mapreduce.Job: Running job: job_1649172504056_0005
22/04/07 04:22:38 INFO mapreduce.Job: Job job_1649172504056_0005 running in uber mode : false
22/04/07 04:22:38 INFO mapreduce.Job: map 0% reduce 0%
22/04/07 04:22:58 INFO mapreduce.Job: map 25% reduce 0%
22/04/07 04:23:00 INFO mapreduce.Job: map 100% reduce 0%
22/04/07 04:23:01 INFO mapreduce.Job: Job job_1649172504056_0005 completed successfully
22/04/07 04:23:01 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=689016
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=393
HDFS: Number of bytes written=131
HDFS: Number of read operations=16
HDFS: Number of large read operations=0
HDFS: Number of write operations=8
Job Counters
Launched map tasks=4
Other local map tasks=4
Total time spent by all maps in occupied slots (ms)=71931
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=71931
Total vcore-milliseconds taken by all map tasks=71931
Total megabyte-milliseconds taken by all map tasks=73657344
Map-Reduce Framework
Map input records=4
Map output records=4
Input split bytes=393
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=750
CPU time spent (ms)=2890
Physical memory (bytes) snapshot=453955584
Virtual memory (bytes) snapshot=6040748032
Total committed heap usage (bytes)=243007488
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=131
22/04/07 04:23:01 INFO mapreduce.ImportJobBase: Transferred 131 bytes in 35.3644 seconds (3.7043 bytes/sec)
22/04/07 04:23:01 INFO mapreduce.ImportJobBase: Retrieved 4 records.
22/04/07 04:23:01 INFO util.AppendUtils: Appending to directory employees
22/04/07 04:23:01 INFO util.AppendUtils: Using found partition 4
22/04/07 04:23:01 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
22/04/07 04:23:01 INFO tool.ImportTool: --incremental lastmodified
22/04/07 04:23:01 INFO tool.ImportTool: --check-column modified_time
22/04/07 04:23:01 INFO tool.ImportTool: --last-value 2022-04-07 04:22:26.0
22/04/07 04:23:01 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')
[cloudera@quickstart ~]$
Let’s query the folder /last-modified-demo.
[cloudera@quickstart ~]$ hadoop fs -ls /last-modified-demo
Found 1 items
drwxr-xr-x - cloudera cloudera 0 2022-04-07 04:23 /last-modified-demo/employees
[cloudera@quickstart ~]$
[cloudera@quickstart ~]$ hadoop fs -ls /last-modified-demo/employees
Found 9 items
-rw-r--r-- 1 cloudera cloudera 0 2022-04-07 04:03 /last-modified-demo/employees/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 64 2022-04-07 04:03 /last-modified-demo/employees/part-m-00000
-rw-r--r-- 1 cloudera cloudera 34 2022-04-07 04:03 /last-modified-demo/employees/part-m-00001
-rw-r--r-- 1 cloudera cloudera 37 2022-04-07 04:03 /last-modified-demo/employees/part-m-00002
-rw-r--r-- 1 cloudera cloudera 68 2022-04-07 04:03 /last-modified-demo/employees/part-m-00003
-rw-r--r-- 1 cloudera cloudera 64 2022-04-07 04:22 /last-modified-demo/employees/part-m-00004
-rw-r--r-- 1 cloudera cloudera 0 2022-04-07 04:22 /last-modified-demo/employees/part-m-00005
-rw-r--r-- 1 cloudera cloudera 0 2022-04-07 04:22 /last-modified-demo/employees/part-m-00006
-rw-r--r-- 1 cloudera cloudera 67 2022-04-07 04:22 /last-modified-demo/employees/part-m-00007
[cloudera@quickstart ~]$
[cloudera@quickstart ~]$ hadoop fs -cat /last-modified-demo/employees/*
1,RAM,34,2008-01-01 00:00:00.0
2,Rahim,28,2009-01-01 00:00:00.0
3,Robert,43,2010-01-01 00:00:00.0
4,Narasimha,46,2008-01-01 00:00:00.0
5,Shankaer,51,2009-01-01 00:00:00.0
6,Siva,21,2010-01-01 00:00:00.0
1,RAM,34,2022-04-07 04:13:42.0
2,Rahim,28,2022-04-07 04:13:42.0
7,Keerthi,23,2022-04-07 04:12:35.0
8,Maha,27,2022-04-07 04:12:42.0
As you can observe in the above output, there is a duplicate data problem: the records with ids 1 and 2 appear twice. Since we used the --append option while performing the incremental import, Sqoop keeps the old data along with the new data.
How do we remove the old data and keep only the latest records?
Use the --merge-key option.
Example
--merge-key <merge-column>
Let’s add one more record and update the record with id 3.
mysql> INSERT INTO employees VALUES (9, 'Sailu', 33, CURRENT_TIMESTAMP);
Query OK, 1 row affected (0.01 sec)
mysql> UPDATE employees SET modified_time=CURRENT_TIMESTAMP WHERE id = 3;
Query OK, 1 row affected (0.00 sec)
Rows matched: 1 Changed: 1 Warnings: 0
mysql> SELECT * FROM employees;
+----+-----------+------+---------------------+
| id | name      | age  | modified_time       |
+----+-----------+------+---------------------+
|  1 | RAM       |   34 | 2022-04-07 04:13:42 |
|  2 | Rahim     |   28 | 2022-04-07 04:13:42 |
|  3 | Robert    |   43 | 2022-04-07 04:28:38 |
|  4 | Narasimha |   46 | 2008-01-01 00:00:00 |
|  5 | Shankaer  |   51 | 2009-01-01 00:00:00 |
|  6 | Siva      |   21 | 2010-01-01 00:00:00 |
|  7 | Keerthi   |   23 | 2022-04-07 04:12:35 |
|  8 | Maha      |   27 | 2022-04-07 04:12:42 |
|  9 | Sailu     |   33 | 2022-04-07 04:28:16 |
+----+-----------+------+---------------------+
9 rows in set (0.00 sec)
Let’s run the following command to merge the old and new records.
sqoop import \
--connect "jdbc:mysql://quickstart.cloudera:3306/incremental_demo" \
--username "root" \
--password "cloudera" \
--table "employees" \
--warehouse-dir /last-modified-demo \
--incremental lastmodified \
--check-column modified_time \
--last-value "2022-04-07 04:22:26.0" \
--merge-key id
In the above example, I told Sqoop to use the id (primary key) column as the merge key.
[cloudera@quickstart ~]$ sqoop import \
> --connect "jdbc:mysql://quickstart.cloudera:3306/incremental_demo" \
> --username "root" \
> --password "cloudera" \
> --table "employees" \
> --warehouse-dir /last-modified-demo \
> --incremental lastmodified \
> --check-column modified_time \
> --last-value "2022-04-07 04:22:26.0" \
> --merge-key id
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
22/04/07 04:30:33 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
22/04/07 04:30:33 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
22/04/07 04:30:33 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
22/04/07 04:30:33 INFO tool.CodeGenTool: Beginning code generation
22/04/07 04:30:34 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
22/04/07 04:30:34 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
22/04/07 04:30:34 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/8d84ead1cd49c1c0671a137297f4f191/employees.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
22/04/07 04:30:36 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/8d84ead1cd49c1c0671a137297f4f191/employees.jar
22/04/07 04:30:37 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employees` AS t LIMIT 1
22/04/07 04:30:37 INFO tool.ImportTool: Incremental import based on column `modified_time`
22/04/07 04:30:37 INFO tool.ImportTool: Lower bound value: '2022-04-07 04:22:26.0'
22/04/07 04:30:37 INFO tool.ImportTool: Upper bound value: '2022-04-07 04:30:37.0'
22/04/07 04:30:37 WARN manager.MySQLManager: It looks like you are importing from mysql.
22/04/07 04:30:37 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
22/04/07 04:30:37 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
22/04/07 04:30:37 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
22/04/07 04:30:37 INFO mapreduce.ImportJobBase: Beginning import of employees
22/04/07 04:30:37 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
22/04/07 04:30:37 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
22/04/07 04:30:37 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
22/04/07 04:30:37 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/04/07 04:30:39 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/07 04:30:39 INFO db.DBInputFormat: Using read commited transaction isolation
22/04/07 04:30:39 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`id`), MAX(`id`) FROM `employees` WHERE ( `modified_time` >= '2022-04-07 04:22:26.0' AND `modified_time` < '2022-04-07 04:30:37.0' )
22/04/07 04:30:39 INFO db.IntegerSplitter: Split size: 1; Num splits: 4 from: 3 to: 9
22/04/07 04:30:39 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/07 04:30:39 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/07 04:30:39 INFO mapreduce.JobSubmitter: number of splits:4
22/04/07 04:30:40 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1649172504056_0006
22/04/07 04:30:40 INFO impl.YarnClientImpl: Submitted application application_1649172504056_0006
22/04/07 04:30:40 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1649172504056_0006/
22/04/07 04:30:40 INFO mapreduce.Job: Running job: job_1649172504056_0006
22/04/07 04:30:48 INFO mapreduce.Job: Job job_1649172504056_0006 running in uber mode : false
22/04/07 04:30:48 INFO mapreduce.Job: map 0% reduce 0%
22/04/07 04:31:03 INFO mapreduce.Job: map 25% reduce 0%
22/04/07 04:31:06 INFO mapreduce.Job: map 50% reduce 0%
22/04/07 04:31:07 INFO mapreduce.Job: map 100% reduce 0%
22/04/07 04:31:07 INFO mapreduce.Job: Job job_1649172504056_0006 completed successfully
22/04/07 04:31:07 INFO mapreduce.Job: Counters: 30
File System Counters
FILE: Number of bytes read=0
FILE: Number of bytes written=689640
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=393
HDFS: Number of bytes written=67
HDFS: Number of read operations=16
HDFS: Number of large read operations=0
HDFS: Number of write operations=8
Job Counters
Launched map tasks=4
Other local map tasks=4
Total time spent by all maps in occupied slots (ms)=58472
Total time spent by all reduces in occupied slots (ms)=0
Total time spent by all map tasks (ms)=58472
Total vcore-milliseconds taken by all map tasks=58472
Total megabyte-milliseconds taken by all map tasks=59875328
Map-Reduce Framework
Map input records=2
Map output records=2
Input split bytes=393
Spilled Records=0
Failed Shuffles=0
Merged Map outputs=0
GC time elapsed (ms)=648
CPU time spent (ms)=2620
Physical memory (bytes) snapshot=492769280
Virtual memory (bytes) snapshot=6040731648
Total committed heap usage (bytes)=243007488
File Input Format Counters
Bytes Read=0
File Output Format Counters
Bytes Written=67
22/04/07 04:31:07 INFO mapreduce.ImportJobBase: Transferred 67 bytes in 30.1112 seconds (2.2251 bytes/sec)
22/04/07 04:31:07 INFO mapreduce.ImportJobBase: Retrieved 2 records.
22/04/07 04:31:07 INFO tool.ImportTool: Final destination exists, will run merge job.
22/04/07 04:31:07 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
22/04/07 04:31:08 INFO Configuration.deprecation: mapred.output.key.class is deprecated. Instead, use mapreduce.job.output.key.class
22/04/07 04:31:08 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/04/07 04:31:08 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/07 04:31:08 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/07 04:31:08 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/07 04:31:09 INFO input.FileInputFormat: Total input paths to process : 12
22/04/07 04:31:09 WARN hdfs.DFSClient: Caught exception
java.lang.InterruptedException
at java.lang.Object.wait(Native Method)
at java.lang.Thread.join(Thread.java:1281)
at java.lang.Thread.join(Thread.java:1355)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/07 04:31:09 INFO mapreduce.JobSubmitter: number of splits:12
22/04/07 04:31:09 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1649172504056_0007
22/04/07 04:31:09 INFO impl.YarnClientImpl: Submitted application application_1649172504056_0007
22/04/07 04:31:09 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1649172504056_0007/
22/04/07 04:31:09 INFO mapreduce.Job: Running job: job_1649172504056_0007
22/04/07 04:31:17 INFO mapreduce.Job: Job job_1649172504056_0007 running in uber mode : false
22/04/07 04:31:17 INFO mapreduce.Job: map 0% reduce 0%
22/04/07 04:31:37 INFO mapreduce.Job: map 8% reduce 0%
22/04/07 04:31:44 INFO mapreduce.Job: map 17% reduce 0%
22/04/07 04:31:46 INFO mapreduce.Job: map 25% reduce 0%
22/04/07 04:31:49 INFO mapreduce.Job: map 33% reduce 0%
22/04/07 04:31:50 INFO mapreduce.Job: map 50% reduce 0%
22/04/07 04:32:03 INFO mapreduce.Job: map 58% reduce 0%
22/04/07 04:32:10 INFO mapreduce.Job: map 67% reduce 0%
22/04/07 04:32:12 INFO mapreduce.Job: map 75% reduce 0%
22/04/07 04:32:14 INFO mapreduce.Job: map 92% reduce 0%
22/04/07 04:32:17 INFO mapreduce.Job: map 100% reduce 31%
22/04/07 04:32:18 INFO mapreduce.Job: map 100% reduce 100%
22/04/07 04:32:18 INFO mapreduce.Job: Job job_1649172504056_0007 completed successfully
22/04/07 04:32:18 INFO mapreduce.Job: Counters: 51
File System Counters
FILE: Number of bytes read=551
FILE: Number of bytes written=2248217
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=2197
HDFS: Number of bytes written=303
HDFS: Number of read operations=39
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Killed map tasks=1
Launched map tasks=12
Launched reduce tasks=1
Other local map tasks=4
Data-local map tasks=8
Total time spent by all maps in occupied slots (ms)=274519
Total time spent by all reduces in occupied slots (ms)=31645
Total time spent by all map tasks (ms)=274519
Total time spent by all reduce tasks (ms)=31645
Total vcore-milliseconds taken by all map tasks=274519
Total vcore-milliseconds taken by all reduce tasks=31645
Total megabyte-milliseconds taken by all map tasks=281107456
Total megabyte-milliseconds taken by all reduce tasks=32404480
Map-Reduce Framework
Map input records=12
Map output records=12
Map output bytes=521
Map output materialized bytes=617
Input split bytes=1796
Combine input records=0
Combine output records=0
Reduce input groups=9
Reduce shuffle bytes=617
Reduce input records=12
Reduce output records=9
Spilled Records=24
Shuffled Maps =12
Failed Shuffles=0
Merged Map outputs=12
GC time elapsed (ms)=3724
CPU time spent (ms)=5660
Physical memory (bytes) snapshot=2641453056
Virtual memory (bytes) snapshot=19595821056
Total committed heap usage (bytes)=2048114688
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=401
File Output Format Counters
Bytes Written=303
22/04/07 04:32:18 INFO tool.ImportTool: Incremental import complete! To run another incremental import of all data following this import, supply the following arguments:
22/04/07 04:32:18 INFO tool.ImportTool: --incremental lastmodified
22/04/07 04:32:18 INFO tool.ImportTool: --check-column modified_time
22/04/07 04:32:18 INFO tool.ImportTool: --last-value 2022-04-07 04:30:37.0
22/04/07 04:32:18 INFO tool.ImportTool: (Consider saving this with 'sqoop job --create')
[cloudera@quickstart ~]$
Since we asked Sqoop to merge the records, it runs a merge job over all the part files and finally generates a single part file. Let’s confirm this by querying the folder /last-modified-demo.
[cloudera@quickstart ~]$ hadoop fs -ls /last-modified-demo
Found 1 items
drwxr-xr-x - cloudera cloudera 0 2022-04-07 04:32 /last-modified-demo/employees
[cloudera@quickstart ~]$
[cloudera@quickstart ~]$ hadoop fs -ls /last-modified-demo/employees
Found 2 items
-rw-r--r-- 1 cloudera cloudera 0 2022-04-07 04:32 /last-modified-demo/employees/_SUCCESS
-rw-r--r-- 1 cloudera cloudera 303 2022-04-07 04:32 /last-modified-demo/employees/part-r-00000
[cloudera@quickstart ~]$
[cloudera@quickstart ~]$ hadoop fs -cat /last-modified-demo/employees/*
1,RAM,34,2008-01-01 00:00:00.0
2,Rahim,28,2022-04-07 04:13:42.0
3,Robert,43,2022-04-07 04:28:38.0
4,Narasimha,46,2008-01-01 00:00:00.0
5,Shankaer,51,2009-01-01 00:00:00.0
6,Siva,21,2010-01-01 00:00:00.0
7,Keerthi,23,2022-04-07 04:12:35.0
8,Maha,27,2022-04-07 04:12:42.0
9,Sailu,33,2022-04-07 04:28:16.0
From the output, you can confirm that there are no duplicates.
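As an optional final check, the row count in HDFS should now match the MySQL table (9 rows). A quick way to verify, using the merged part file shown above:
hadoop fs -cat /last-modified-demo/employees/part-r-00000 | wc -l
This should print 9, the same count as SELECT COUNT(*) FROM employees; in MySQL.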