Tuesday, 31 May 2022

Sqoop: staging table

In this post, I am going to explain the usage of a staging table in Sqoop export.

 

Suppose you are exporting a CSV file with 1 million records to an RDBMS table using the ‘sqoop export’ command. Since Sqoop divides the work among multiple mappers, a failure of one or more mapper tasks can leave the data only partially exported. We can address this partial-export problem using a staging table.

 

How to specify a staging table?

You can specify a staging table with the option ‘--staging-table’.

sqoop export \
--connect "jdbc:mysql://quickstart.cloudera:3306/staging_demo" \
--username "root" \
--password "cloudera" \
--table "employee" \
--staging-table "employee_stg" \
--export-dir /staging_table_demo/emp.csv

 

The staging table must have the same structure as the target table. It should either be empty before the export job runs, or the --clear-staging-table option must be specified to empty it before the job runs.
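
For example, if the staging table might contain leftover rows from a previous failed run, you can ask Sqoop to empty it first. This is the same export command used later in this post, with the extra flag added:

sqoop export \
--connect "jdbc:mysql://quickstart.cloudera:3306/staging_demo" \
--username "root" \
--password "cloudera" \
--table "employee" \
--staging-table "employee_stg" \
--clear-staging-table \
--export-dir /staging_table_demo/emp.csv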

 

When you specify a staging table while exporting data, the data is first copied to the staging table. If everything is copied successfully, the staged data is then moved to the target table in a single transaction.
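
Conceptually, that final migration step behaves like moving all rows from the staging table to the target table and then clearing the staging table, all inside one transaction. A rough SQL sketch of the idea (not the exact statements Sqoop issues):

START TRANSACTION;
INSERT INTO employee SELECT * FROM employee_stg;
DELETE FROM employee_stg;
COMMIT;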

 

 


Step 1: Create the staging_demo database and the employee and employee_stg tables.

[cloudera@quickstart ~]$ mysql -u root -p
Enter password: 
Welcome to the MySQL monitor.  Commands end with ; or \g.
Your MySQL connection id is 116
Server version: 5.1.73 Source distribution

Copyright (c) 2000, 2013, Oracle and/or its affiliates. All rights reserved.

Oracle is a registered trademark of Oracle Corporation and/or its
affiliates. Other names may be trademarks of their respective
owners.

Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.

mysql> 

 

‘cloudera’ is the MySQL root password on the Cloudera QuickStart VM.

 

Create a database ‘staging_demo’.

mysql> CREATE DATABASE staging_demo;
Query OK, 1 row affected (0.00 sec)

 

Create employee and employee_stg tables.

CREATE TABLE employee (id INT, name VARCHAR(20), age INT, PRIMARY KEY (id));

CREATE TABLE employee_stg (id INT, name VARCHAR(20), age INT, PRIMARY KEY (id));

mysql> USE staging_demo;
Database changed
mysql> 
mysql> CREATE TABLE employee (id INT, name VARCHAR(20), age INT, PRIMARY KEY (id));
Query OK, 0 rows affected (0.02 sec)
mysql> 
mysql> CREATE TABLE employee_stg (id INT, name VARCHAR(20), age INT, PRIMARY KEY (id));
Query OK, 0 rows affected (0.01 sec)
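
Since the staging table must have the same structure as the target table, you can optionally compare the two definitions before exporting:

mysql> DESCRIBE employee;
mysql> DESCRIBE employee_stg;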

 

Step 2: Copy the emp.csv file to HDFS.

 

emp.csv

 

1,Ram,31
2,Krishna,32
3,Joel,41
4,Shankar,38
5,Shanthi,48
6,Sameer,29

[cloudera@quickstart ~]$ hadoop fs -mkdir /staging_table_demo
[cloudera@quickstart ~]$ 
[cloudera@quickstart ~]$ hadoop fs -copyFromLocal emp.csv /staging_table_demo
[cloudera@quickstart ~]$ 
[cloudera@quickstart ~]$ hadoop fs -ls /staging_table_demo
Found 1 items
-rw-r--r--   1 cloudera supergroup         71 2022-04-05 04:32 /staging_table_demo/emp.csv
[cloudera@quickstart ~]$ 
[cloudera@quickstart ~]$ hadoop fs -cat /staging_table_demo/*
1,Ram,31
2,Krishna,32
3,Joel,41
4,Shankar,38
5,Shanthi,48
6,Sameer,29

 

Step 3: Execute the ‘sqoop export’ command, specifying the staging table.

sqoop export \
--connect "jdbc:mysql://quickstart.cloudera:3306/staging_demo" \
--username "root" \
--password "cloudera" \
--table "employee" \
--staging-table "employee_stg" \
--export-dir /staging_table_demo/emp.csv

[cloudera@quickstart ~]$ sqoop export \
> --connect "jdbc:mysql://quickstart.cloudera:3306/staging_demo" \
> --username "root" \
> --password "cloudera" \
> --table "employee" \
> --staging-table "employee_stg" \
> --export-dir /staging_table_demo/emp.csv
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
22/04/05 04:54:45 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
22/04/05 04:54:45 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.
22/04/05 04:54:45 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
22/04/05 04:54:45 INFO tool.CodeGenTool: Beginning code generation
22/04/05 04:54:46 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employee` AS t LIMIT 1
22/04/05 04:54:46 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `employee` AS t LIMIT 1
22/04/05 04:54:46 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/9bda8fb4f474836f64147d513cfe5e04/employee.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
22/04/05 04:54:48 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/9bda8fb4f474836f64147d513cfe5e04/employee.jar
22/04/05 04:54:48 INFO mapreduce.ExportJobBase: Data will be staged in the table: employee_stg
22/04/05 04:54:48 INFO mapreduce.ExportJobBase: Beginning export of employee
22/04/05 04:54:48 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
22/04/05 04:54:48 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
22/04/05 04:54:50 INFO Configuration.deprecation: mapred.reduce.tasks.speculative.execution is deprecated. Instead, use mapreduce.reduce.speculative
22/04/05 04:54:50 INFO Configuration.deprecation: mapred.map.tasks.speculative.execution is deprecated. Instead, use mapreduce.map.speculative
22/04/05 04:54:50 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
22/04/05 04:54:50 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/04/05 04:54:50 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 04:54:51 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 04:54:51 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 04:54:51 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 04:54:51 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 04:54:51 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 04:54:51 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 04:54:51 INFO input.FileInputFormat: Total input paths to process : 1
22/04/05 04:54:51 INFO input.FileInputFormat: Total input paths to process : 1
22/04/05 04:54:51 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 04:54:51 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/05 04:54:51 INFO mapreduce.JobSubmitter: number of splits:4
22/04/05 04:54:51 INFO Configuration.deprecation: mapred.map.tasks.speculative.execution is deprecated. Instead, use mapreduce.map.speculative
22/04/05 04:54:51 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1649003113144_0016
22/04/05 04:54:52 INFO impl.YarnClientImpl: Submitted application application_1649003113144_0016
22/04/05 04:54:52 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1649003113144_0016/
22/04/05 04:54:52 INFO mapreduce.Job: Running job: job_1649003113144_0016
22/04/05 04:55:00 INFO mapreduce.Job: Job job_1649003113144_0016 running in uber mode : false
22/04/05 04:55:00 INFO mapreduce.Job:  map 0% reduce 0%
22/04/05 04:55:16 INFO mapreduce.Job:  map 25% reduce 0%
22/04/05 04:55:18 INFO mapreduce.Job:  map 50% reduce 0%
22/04/05 04:55:20 INFO mapreduce.Job:  map 100% reduce 0%
22/04/05 04:55:20 INFO mapreduce.Job: Job job_1649003113144_0016 completed successfully
22/04/05 04:55:20 INFO mapreduce.Job: Counters: 30
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=684480
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=834
		HDFS: Number of bytes written=0
		HDFS: Number of read operations=19
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=0
	Job Counters 
		Launched map tasks=4
		Data-local map tasks=4
		Total time spent by all maps in occupied slots (ms)=61940
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=61940
		Total vcore-milliseconds taken by all map tasks=61940
		Total megabyte-milliseconds taken by all map tasks=63426560
	Map-Reduce Framework
		Map input records=6
		Map output records=6
		Input split bytes=631
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=700
		CPU time spent (ms)=2300
		Physical memory (bytes) snapshot=493535232
		Virtual memory (bytes) snapshot=6032359424
		Total committed heap usage (bytes)=243007488
	File Input Format Counters 
		Bytes Read=0
	File Output Format Counters 
		Bytes Written=0
22/04/05 04:55:20 INFO mapreduce.ExportJobBase: Transferred 834 bytes in 30.7745 seconds (27.1003 bytes/sec)
22/04/05 04:55:20 INFO mapreduce.ExportJobBase: Exported 6 records.
22/04/05 04:55:20 INFO mapreduce.ExportJobBase: Starting to migrate data from staging table to destination.
22/04/05 04:55:20 INFO manager.SqlManager: Migrated 6 records from `employee_stg` to `employee`
[cloudera@quickstart ~]$

 

Let’s query the employee table.

mysql> SELECT * FROM employee;
+----+---------+------+
| id | name    | age  |
+----+---------+------+
|  1 | Ram     |   31 |
|  2 | Krishna |   32 |
|  3 | Joel    |   41 |
|  4 | Shankar |   38 |
|  5 | Shanthi |   48 |
|  6 | Sameer  |   29 |
+----+---------+------+
6 rows in set (0.01 sec)

Since the data is moved from the staging table to the target table once the job succeeds, the staging table will be empty.

mysql> SELECT * FROM employee_stg;
Empty set (0.00 sec)

 

If the job fails in between, you might see some partial records in the staging table.
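
In that case, empty the staging table before re-running the export, either by adding the --clear-staging-table option shown earlier or manually in MySQL, for example:

mysql> TRUNCATE TABLE employee_stg;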

 


 
