Sqoop的原理分析是什么

81次阅读
没有评论

共计 5632 个字符,预计需要花费 15 分钟才能阅读完成。

这篇文章主要为大家分析了 Sqoop 的原理分析是什么的相关知识点,内容详细易懂,操作细节合理,具有一定参考价值。如果感兴趣的话,不妨跟着跟随丸趣 TV 小编一起来看看,下面跟着丸趣 TV 小编一起深入学习“Sqoop 的原理分析是什么”的知识吧。

一简介

Sqoop 是一个用来将 Hadoop 和关系型数据库中的数据相互转移的工具,可以将一个关系型数据库(例如:MySQL ,Oracle ,Postgres 等)中的数据导进到 Hadoop 的 HDFS 中,也可以将 HDFS 的数据导进到关系型数据库中。

二特点

Sqoop 中一大亮点就是可以通过 hadoop 的 mapreduce 把数据从关系型数据库中导入数据到 HDFS。

三 Sqoop 命令

Sqoop 大约有 13 种命令, 和几种通用的参数 (都支持这 13 种命令),这里先列出这 13 种命令。

接 着列出 Sqoop 的各种通用参数, 然后针对以上 13 个命令列出他们自己的参数。Sqoop 通用参数又分 Common arguments,Incremental import arguments,Output line formatting arguments,Input parsing arguments,Hive arguments,HBase arguments,Generic Hadoop command-line arguments, 下面一一说明:

1.Common arguments

通用参数, 主要是针对关系型数据库链接的一些参数

四  sqoop 命令举例

1)列出 mysql 数据库中的所有数据库

sqoop list-databases –connect jdbc:mysql://localhost:3306/ –username root –password 123456

2) 连接 mysql 并列出 test 数据库中的表

sqoop list-tables –connect jdbc:mysql://localhost:3306/test –username root –password 123456

命令中的 test 为 mysql 数据库中的 test 数据库名称 username password 分别为 mysql 数据库的用户密码

3) 将关系型数据的表结构复制到 hive 中, 只是复制表的结构,表中的内容没有复制过去。

sqoop create-hive-table –connect jdbc:mysql://localhost:3306/test

–table sqoop_test –username root –password 123456 –hive-table

test

其中 –table sqoop_test 为 mysql 中的数据库 test 中的表 –hive-table

test 为 hive 中新建的表名称

4) 从关系数据库导入文件到 hive 中

sqoop import –connect jdbc:mysql://localhost:3306/zxtest –username

root –password 123456 –table sqoop_test –hive-import –hive-table

s_test -m 1

5) 将 hive 中的表数据导入到 mysql 中, 在进行导入之前,mysql 中的表

hive_test 必须已经提前创建好了。

sqoop export –connect jdbc:mysql://localhost:3306/zxtest –username

root –password root –table hive_test –export-dir

/user/hive/warehouse/new_test_partition/dt=2012-03-05

6)从数据库导出表的数据到 HDFS 上文件

./sqoop import –connect

jdbc:mysql://10.28.168.109:3306/compression –username=hadoop

–password=123456 –table HADOOP_USER_INFO -m 1 –target-dir

/user/test

7)从数据库增量导入表数据到 hdfs 中

./sqoop import –connect jdbc:mysql://10.28.168.109:3306/compression

–username=hadoop –password=123456 –table HADOOP_USER_INFO -m 1

–target-dir /user/test –check-column id –incremental append

–last-value 3

五 Sqoop 原理(以 import 为例)

Sqoop 在 import 时,需要制定 split-by 参数。Sqoop 根据不同的 split-by 参数值来进行切 分, 然后将切分出来的区域分配到不同 map 中。每个 map 中再处理数据库中获取的一行一行的值,写入到 HDFS 中。同时 split-by 根据不同的参数类 型有不同的切分方法,如比较简单的 int 型,Sqoop 会取最大和最小 split-by 字段值,然后根据传入的 num-mappers 来确定划分几个区 域。比如 select max(split_by),min(split-by) from 得到的 max(split-by) 和 min(split-by) 分别为 1000 和 1,而 num-mappers 为 2 的话,则会分成两个区域 (1,500) 和 (501-100), 同时也会分成 2 个 sql 给 2 个 map 去进行导入操作,分别为 select XXX from table where split-by =1 and split-by 500 和 select XXX from table where split-by =501 and split-by =1000。最后每个 map 各自获取各自 SQL 中的数据进行导入工作。

六 mapreduce job 所需要的各种参数在 Sqoop 中的实现

1) InputFormatClass

com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat

2) OutputFormatClass

1)TextFile

com.cloudera.sqoop.mapreduce.RawKeyTextOutputFormat

2)SequenceFile

org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat

3)AvroDataFile

com.cloudera.sqoop.mapreduce.AvroOutputFormat

3)Mapper

1)TextFile

com.cloudera.sqoop.mapreduce.TextImportMapper           

2)SequenceFile

com.cloudera.sqoop.mapreduce.SequenceFileImportMapper     

3)AvroDataFile

com.cloudera.sqoop.mapreduce.AvroImportMapper

4)taskNumbers

1)mapred.map.tasks(对应 num-mappers 参数)   

2)job.setNumReduceTasks(0);

这里以命令行:import –connect jdbc:mysql://localhost/test –username root –password 123456 –query“select sqoop_1.id as foo_id, sqoop_2.id as bar_id from sqoop_1 ,sqoop_2 WHERE $CONDITIONS”–target-dir /user/sqoop/test -split-by sqoop_1.id   –hadoop-home=/home/hdfs/hadoop-0.20.2-CDH3B3 –num-mappers 2

1)设置 Input

DataDrivenImportJob.configureInputFormat(Job job, String tableName,String tableClassName, String splitByCol)

a)DBConfiguration.configureDB(Configuration conf, String driverClass,   String dbUrl, String userName, String passwd, Integer fetchSize)

1).mapreduce.jdbc.driver.class com.mysql.jdbc.Driver

2).mapreduce.jdbc.url jdbc:mysql://localhost/test         

3).mapreduce.jdbc.username root

4).mapreduce.jdbc.password 123456

5).mapreduce.jdbc.fetchsize -2147483648

b)DataDrivenDBInputFormat.setInput(Job job,Class ? extends DBWritable inputClass, String inputQuery, String inputBoundingQuery)

1)job.setInputFormatClass(DBInputFormat.class);           

2)mapred.jdbc.input.bounding.query SELECT MIN(sqoop_1.id), MAX(sqoop_2.id) FROM (select sqoop_1.id as foo_id, sqoop_2.id as bar_id from sqoop_1 ,sqoop_2 WHERE (1 = 1) ) AS t1

3)job.setInputFormatClass(com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat.class);

4)mapreduce.jdbc.input.orderby sqoop_1.id

c)mapreduce.jdbc.input.class QueryResult

d)sqoop.inline.lob.length.max 16777216

2)设置 Output

ImportJobBase.configureOutputFormat(Job job, String tableName,String tableClassName)

a)job.setOutputFormatClass(getOutputFormatClass());           

b)FileOutputFormat.setOutputCompressorClass(job, codecClass);

c)SequenceFileOutputFormat.setOutputCompressionType(job,CompressionType.BLOCK);

d)FileOutputFormat.setOutputPath(job, outputPath);

3)设置 Map

DataDrivenImportJob.configureMapper(Job job, String tableName,String tableClassName)

   a)job.setOutputKeyClass(Text.class);
   b)job.setOutputValueClass(NullWritable.class);
c)job.setMapperClass(com.cloudera.sqoop.mapreduce.TextImportMapper);

4)设置 task number

JobBase.configureNumTasks(Job job)

mapred.map.tasks 4

job.setNumReduceTasks(0);

七 大概流程

1. 读取要导入数据的表结构,生成运行类,默认是 QueryResult,打成 jar 包,然后提交给 Hadoop

2. 设置好 job,主要也就是设置好以上第六章中的各个参数

3. 这里就由 Hadoop 来执行 MapReduce 来执行 Import 命令了,

1)首先要对数据进行切分,也就是 DataSplit

DataDrivenDBInputFormat.getSplits(JobContext job)

2)切分好范围后,写入范围,以便读取

DataDrivenDBInputFormat.write(DataOutput output) 这里是 lowerBoundQuery and upperBoundQuery

3)读取以上 2)写入的范围

DataDrivenDBInputFormat.readFields(DataInput input)

4)然后创建 RecordReader 从数据库中读取数据

DataDrivenDBInputFormat.createRecordReader(InputSplit split,TaskAttemptContext context)

5)创建 Map

TextImportMapper.setup(Context context)

6)RecordReader 一行一行从关系型数据库中读取数据,设置好 Map 的 Key 和 Value,交给 Map

DBRecordReader.nextKeyValue()

7)运行 map

TextImportMapper.map(LongWritable key, SqoopRecord val, Context context)

最后生成的 Key 是行数据,由 QueryResult 生成,Value 是 NullWritable.get()

关于“Sqoop 的原理分析是什么”就介绍到这了, 更多相关内容可以搜索丸趣 TV 以前的文章,希望能够帮助大家答疑解惑,请多多支持丸趣 TV 网站!

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-04发表,共计5632字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)