Friday, 3 June 2022

Quick guide to sqoop jobs

Sqoop job is used to create a job that can

a.   save the command

b.   Execute the command

c.    Store the past state of the command, if the command relates to incremental imports

 

Let’s create a sqoop job, execute and check the job configuration etc.,

 

How to create a sqoop job?

We can create a sqoop job with below syntax.

 

Syntax

sqoop job --create {JOB_NAME} -- {COMMAND_TO_EXECUTE}

 

Example

sqoop job \
--create customer_import \
-- import \
--connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
--username "root" \
--password "cloudera" \
--table "customers" \
--warehouse-dir /job-demo-1 \
--incremental append \
--check-column customer_id \
--last-value 0

[cloudera@quickstart ~]$ sqoop job \
> --create customer_import \
> -- import \
> --connect "jdbc:mysql://quickstart.cloudera:3306/retail_db" \
> --username "root" \
> --password "cloudera" \
> --table "customers" \
> --warehouse-dir /job-demo-1 \
> --incremental append \
> --check-column customer_id \
> --last-value 0 
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
22/04/08 18:57:46 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
22/04/08 18:57:47 WARN tool.BaseSqoopTool: Setting your password on the command-line is insecure. Consider using -P instead.

 

How to print all the sqoop jobs?

‘sqoop job --list' command print all the sqoop jobs.

[cloudera@quickstart ~]$ sqoop job --list
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
22/04/08 18:54:58 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
Available jobs:
  customer_import

 

How to execute the job?

Syntax

sqoop job --exec {job_name}

 

Example

sqoop job --exec customer_import

 

[cloudera@quickstart ~]$ sqoop job --exec customer_import
Warning: /usr/lib/sqoop/../accumulo does not exist! Accumulo imports will fail.
Please set $ACCUMULO_HOME to the root of your Accumulo installation.
22/04/08 18:58:29 INFO sqoop.Sqoop: Running Sqoop version: 1.4.6-cdh5.13.0
Enter password: 
22/04/08 18:58:40 INFO manager.MySQLManager: Preparing to use a MySQL streaming resultset.
22/04/08 18:58:40 INFO tool.CodeGenTool: Beginning code generation
22/04/08 18:58:40 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
22/04/08 18:58:40 INFO manager.SqlManager: Executing SQL statement: SELECT t.* FROM `customers` AS t LIMIT 1
22/04/08 18:58:40 INFO orm.CompilationManager: HADOOP_MAPRED_HOME is /usr/lib/hadoop-mapreduce
Note: /tmp/sqoop-cloudera/compile/d4e92057e12f71bb02c170e9c6aa826a/customers.java uses or overrides a deprecated API.
Note: Recompile with -Xlint:deprecation for details.
22/04/08 18:58:42 INFO orm.CompilationManager: Writing jar file: /tmp/sqoop-cloudera/compile/d4e92057e12f71bb02c170e9c6aa826a/customers.jar
22/04/08 18:58:43 INFO tool.ImportTool: Maximal id query for free form incremental import: SELECT MAX(`customer_id`) FROM `customers`
22/04/08 18:58:43 INFO tool.ImportTool: Incremental import based on column `customer_id`
22/04/08 18:58:43 INFO tool.ImportTool: Lower bound value: 0
22/04/08 18:58:43 INFO tool.ImportTool: Upper bound value: 12440
22/04/08 18:58:43 WARN manager.MySQLManager: It looks like you are importing from mysql.
22/04/08 18:58:43 WARN manager.MySQLManager: This transfer can be faster! Use the --direct
22/04/08 18:58:43 WARN manager.MySQLManager: option to exercise a MySQL-specific fast path.
22/04/08 18:58:43 INFO manager.MySQLManager: Setting zero DATETIME behavior to convertToNull (mysql)
22/04/08 18:58:43 INFO mapreduce.ImportJobBase: Beginning import of customers
22/04/08 18:58:43 INFO Configuration.deprecation: mapred.job.tracker is deprecated. Instead, use mapreduce.jobtracker.address
22/04/08 18:58:43 INFO Configuration.deprecation: mapred.jar is deprecated. Instead, use mapreduce.job.jar
22/04/08 18:58:43 INFO Configuration.deprecation: mapred.map.tasks is deprecated. Instead, use mapreduce.job.maps
22/04/08 18:58:43 INFO client.RMProxy: Connecting to ResourceManager at /0.0.0.0:8032
22/04/08 18:58:44 WARN hdfs.DFSClient: Caught exception 
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1281)
	at java.lang.Thread.join(Thread.java:1355)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.closeResponder(DFSOutputStream.java:967)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.endBlock(DFSOutputStream.java:705)
	at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:894)
22/04/08 18:58:45 INFO db.DBInputFormat: Using read commited transaction isolation
22/04/08 18:58:45 INFO db.DataDrivenDBInputFormat: BoundingValsQuery: SELECT MIN(`customer_id`), MAX(`customer_id`) FROM `customers` WHERE ( `customer_id` > 0 AND `customer_id` <= 12440 )
22/04/08 18:58:45 INFO db.IntegerSplitter: Split size: 3109; Num splits: 4 from: 1 to: 12440
22/04/08 18:58:45 INFO mapreduce.JobSubmitter: number of splits:4
22/04/08 18:58:45 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1649172504056_0008
22/04/08 18:58:45 INFO impl.YarnClientImpl: Submitted application application_1649172504056_0008
22/04/08 18:58:45 INFO mapreduce.Job: The url to track the job: http://quickstart.cloudera:8088/proxy/application_1649172504056_0008/
22/04/08 18:58:45 INFO mapreduce.Job: Running job: job_1649172504056_0008
22/04/08 18:58:52 INFO mapreduce.Job: Job job_1649172504056_0008 running in uber mode : false
22/04/08 18:58:52 INFO mapreduce.Job:  map 0% reduce 0%
22/04/08 18:59:06 INFO mapreduce.Job:  map 25% reduce 0%
22/04/08 18:59:10 INFO mapreduce.Job:  map 50% reduce 0%
22/04/08 18:59:11 INFO mapreduce.Job:  map 100% reduce 0%
22/04/08 18:59:12 INFO mapreduce.Job: Job job_1649172504056_0008 completed successfully
22/04/08 18:59:12 INFO mapreduce.Job: Counters: 30
	File System Counters
		FILE: Number of bytes read=0
		FILE: Number of bytes written=690728
		FILE: Number of read operations=0
		FILE: Number of large read operations=0
		FILE: Number of write operations=0
		HDFS: Number of bytes read=487
		HDFS: Number of bytes written=953915
		HDFS: Number of read operations=16
		HDFS: Number of large read operations=0
		HDFS: Number of write operations=8
	Job Counters 
		Launched map tasks=4
		Other local map tasks=4
		Total time spent by all maps in occupied slots (ms)=52463
		Total time spent by all reduces in occupied slots (ms)=0
		Total time spent by all map tasks (ms)=52463
		Total vcore-milliseconds taken by all map tasks=52463
		Total megabyte-milliseconds taken by all map tasks=53722112
	Map-Reduce Framework
		Map input records=12440
		Map output records=12440
		Input split bytes=487
		Spilled Records=0
		Failed Shuffles=0
		Merged Map outputs=0
		GC time elapsed (ms)=682
		CPU time spent (ms)=4120
		Physical memory (bytes) snapshot=562405376
		Virtual memory (bytes) snapshot=6044942336
		Total committed heap usage (bytes)=243007488
	File Input Format Counters 
		Bytes Read=0
	File Output Format Counters 
		Bytes Written=953915
22/04/08 18:59:12 INFO mapreduce.ImportJobBase: Transferred 931.5576 KB in 29.0003 seconds (32.1223 KB/sec)
22/04/08 18:59:12 INFO mapreduce.ImportJobBase: Retrieved 12440 records.
22/04/08 18:59:12 INFO util.AppendUtils: Creating missing output directory - customers
22/04/08 18:59:12 INFO tool.ImportTool: Saving incremental import state to the metastore
22/04/08 18:59:12 INFO tool.ImportTool: Updated data for job: customer_import
[cloudera@quickstart ~]$

 

Points to note

a. Even though I supplied the password while creating the job, it still prompt me to enter the password. We can address by supplying a password file while creating the job. I will explain in later post.

b. From the log messages, you can see the statement ‘Saving incremental import state to the metastore’. Since we configured job to execute an incremental import command, sqoop job stores the state (like last-value) in the metastore.

 

Location of metastore

$HOME/.sqoop

[cloudera@quickstart ~]$ ls -lart $HOME/.sqoop
total 20
drwxrwxr-x 28 cloudera cloudera 4096 Apr  8 18:53 ..
-rw-rw-r--  1 cloudera cloudera 6757 Apr  8 18:59 metastore.db.script
-rw-rw-r--  1 cloudera cloudera  419 Apr  8 18:59 metastore.db.properties
drwxrwxr-x  2 cloudera cloudera 4096 Apr  8 18:59 .

 

‘sqoop’ use hsqldb to persist the job metadata.

 

Sample content of metastore.db.script file

[cloudera@quickstart ~]$ cat $HOME/.sqoop/metastore.db.script
CREATE SCHEMA PUBLIC AUTHORIZATION DBA
CREATE MEMORY TABLE SQOOP_ROOT(VERSION INTEGER,PROPNAME VARCHAR(128) NOT NULL,PROPVAL VARCHAR(256),CONSTRAINT SQOOP_ROOT_UNQ UNIQUE(VERSION,PROPNAME))
CREATE MEMORY TABLE SQOOP_SESSIONS(JOB_NAME VARCHAR(64) NOT NULL,PROPNAME VARCHAR(128) NOT NULL,PROPVAL VARCHAR(1024),PROPCLASS VARCHAR(32) NOT NULL,CONSTRAINT SQOOP_SESSIONS_UNQ UNIQUE(JOB_NAME,PROPNAME,PROPCLASS))
CREATE USER SA PASSWORD ""
GRANT DBA TO SA
SET WRITE_DELAY 10
SET SCHEMA PUBLIC
INSERT INTO SQOOP_ROOT VALUES(NULL,'sqoop.hsqldb.job.storage.version','0')
INSERT INTO SQOOP_ROOT VALUES(0,'sqoop.hsqldb.job.info.table','SQOOP_SESSIONS')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','sqoop.tool','import','schema')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','sqoop.property.set.id','0','schema')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','verbose','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','hcatalog.drop.and.create.table','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','incremental.last.value','12440','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','db.connect.string','jdbc:mysql://quickstart.cloudera:3306/retail_db','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','codegen.output.delimiters.escape','0','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','codegen.output.delimiters.enclose.required','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','codegen.input.delimiters.field','0','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','mainframe.input.dataset.type','p','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','split.limit','null','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','hbase.create.table','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','db.require.password','true','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','hdfs.append.dir','true','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','db.table','customers','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','codegen.input.delimiters.escape','0','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','accumulo.create.table','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','import.fetch.size','null','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','codegen.input.delimiters.enclose.required','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','db.username','root','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','reset.onemapper','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','codegen.output.delimiters.record','10','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','import.max.inline.lob.size','16777216','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','sqoop.throwOnError','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','hbase.bulk.load.enabled','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','hcatalog.create.table','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','db.clear.staging.table','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','incremental.col','customer_id','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','codegen.input.delimiters.record','0','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','hdfs.warehouse.dir','/job-demo-1','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','enable.compression','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','hive.overwrite.table','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','hive.import','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','codegen.input.delimiters.enclose','0','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','accumulo.batch.size','10240000','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','hive.drop.delims','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','customtool.options.jsonmap','{}','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','codegen.output.delimiters.enclose','0','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','hdfs.delete-target.dir','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','codegen.output.dir','.','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','codegen.auto.compile.dir','true','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','relaxed.isolation','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','mapreduce.num.mappers','4','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','accumulo.max.latency','5000','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','import.direct.split.size','0','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','sqlconnection.metadata.transaction.isolation.level','2','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','codegen.output.delimiters.field','44','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','export.new.update','UpdateOnly','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','incremental.mode','AppendRows','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','hdfs.file.format','TextFile','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','sqoop.oracle.escaping.disabled','true','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','codegen.compile.dir','/tmp/sqoop-cloudera/compile/d4e92057e12f71bb02c170e9c6aa826a','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','direct.import','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','temporary.dirRoot','_sqoop','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','hive.fail.table.exists','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','db.batch','false','SqoopOptions')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','sqoop.job.storage.implementations','com.cloudera.sqoop.metastore.hsqldb.HsqldbJobStorage,com.cloudera.sqoop.metastore.hsqldb.AutoHsqldbStorage','config')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','mapreduce.client.genericoptionsparser.used','true','config')
INSERT INTO SQOOP_SESSIONS VALUES('customer_import','sqoop.oracle.escaping.disabled','true','config')

 

Sample content of metastore.db.properties file

[cloudera@quickstart ~]$ cat $HOME/.sqoop/metastore.db.properties
#HSQL Database Engine 1.8.0.10
#Fri Apr 08 18:59:12 PDT 2022
hsqldb.script_format=0
runtime.gc_interval=0
sql.enforce_strict_size=false
hsqldb.cache_size_scale=8
readonly=false
hsqldb.nio_data_file=true
hsqldb.cache_scale=14
version=1.8.0
hsqldb.default_table_type=memory
hsqldb.cache_file_scale=1
hsqldb.log_size=200
modified=no
hsqldb.cache_version=1.7.0
hsqldb.original_version=1.8.0
hsqldb.compatible_version=1.8.0
[cloudera@quickstart ~]$

 

At last let’s confirm whether data is imported to HDFS or not by comparing the count of records.

 

Number of records in customers table.

mysql> select count(*) from customers;
+----------+
| count(*) |
+----------+
|    12440 |
+----------+
1 row in set (0.00 sec)

Let’s count the same in HDFS

[cloudera@quickstart ~]$ hadoop fs -ls /job-demo-1
Found 1 items
drwxr-xr-x   - cloudera supergroup          0 2022-04-08 18:59 /job-demo-1/customers
[cloudera@quickstart ~]$ 
[cloudera@quickstart ~]$ hadoop fs -ls /job-demo-1/customers
Found 4 items
-rw-r--r--   1 cloudera cloudera     237222 2022-04-08 18:59 /job-demo-1/customers/part-m-00000
-rw-r--r--   1 cloudera cloudera     238044 2022-04-08 18:59 /job-demo-1/customers/part-m-00001
-rw-r--r--   1 cloudera cloudera     238238 2022-04-08 18:59 /job-demo-1/customers/part-m-00002
-rw-r--r--   1 cloudera cloudera     240411 2022-04-08 18:59 /job-demo-1/customers/part-m-00003
[cloudera@quickstart ~]$ 
[cloudera@quickstart ~]$ hadoop fs -cat /job-demo-1/customers/* | wc -l
12440

 

 

Previous                                                    Next                                                    Home

No comments:

Post a Comment