共计 3159 个字符,预计需要花费 8 分钟才能阅读完成。
这篇文章主要介绍“sqoop 内部结构是怎样的”,在日常操作中,相信很多人在 sqoop 内部结构是怎样的问题上存在疑惑,丸趣 TV 小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”sqoop 内部结构是怎样的”的疑惑有所帮助!接下来,请跟着丸趣 TV 小编一起来学习吧!
1.1 Sqoop 内部结构
Sqoop 程序是由的主类 com.cloudera.sqoop.Sqoop 驱动。有限数量的额外的类在同一个包:SqoopOptions(如前所述),ConnFactory(即熟练操作 ManagerFactory 实例)。
1.1.1 一般程序流程
一般的程序流程如下:
com.cloudera.sqoop.Sqoop 是主类和实现了 Tool,一个新的实例 ToolRunner 被推出,Sqoop 的第一个参数是一个字符串,并在 SqoopTool 中定义并执行,SqoopTool 来执行用户的各种请求操作(如:import, export, codegen 等)。
SqoopTool 将解析其余的参数,设置相应的字段在 SqoopOptions 类,然后执行。
在 SqoopTool 的 run()方法中,import、export 或者其他正确的指令被执行。一般情况下 ConnManager 一般在 SqoopOptions 数据的基础上进行实例化。ConnFactory 被用于从 ManagerFactory 中获取一个 ConnManager。这在前面的部分已经进行了描述。Imports、exports 或者其他大型数据的迁移操作通常是一个并行的、可靠的 MapReduce 任务。Import 操作并不是一定要以 MapReduce 作业的方式运行,ConnManager.importTable()将确定如何以最佳的方式进行 import 操作。每一个主要操作实际上都有 ConnMananger 控制,除了生成代码的操作,这些操作是由 CompilationManager 和 ClassWriter 做的 (都在 com.cloudera.sqoop.orm 包中)。导入数据到 Hive 的操作是由 com.cloudera.sqoop.hive.HiveImport 的 importTable() 完成的,这样做是为了不为使用 ConnManager 的实现都担心。
ConnManager 的 importTable()方法接收一个类型为 ImportJobContext 的参数,其中包含这个方法所需的各种参数值。将来,该类可扩展附加参数,以实现功能更加强大的 import 操作。同样,exportTable()方法接收的参数类型 ExportJobContext。这些类包含一系列导入 / 导出,指向 SqoopOptions 对象,以及其他相关的数据。
1.1.2 子包
com.cloudera.sqoop 包中的子包,包括:
※ Hive:便于数据导入到 Hive
※ IO: 实现的 java.io. * 接口
※ Lib: 外部公共 API(如前所述)
※ Manager: ConnManager 和 ManagerFactory 的接口和它们的实现
※ Mapreduce: 新的(0.20 +)MapReduce 的 API 接口的类
※ Orm: 代码自动生成
※ Tool: 实现 SqoopTool
※ Util: 各种实用工具类
IO 包中的 OutputStream 和 BufferedWriter 的实现被用于直接向 HDFS 进行写入。SplittableBufferedWriter 允许为客户端打开一个单一的 BufferedWriter,在引擎之下,当达到目标值后连续的写入文件系统。这允许在 Sqoopimport 的同时使用压缩机制(如 gzip), 同时允许在 MapR 任务之后对数据集进行分割。大对象文件存储系统的代码也存在于 IO 包中。
Mapreduce 包中的代码用于直接与 Hadoop MapReduce 做接口,将在下一章节中详述。
ORM 包中的代码用于生产代码。它依赖于提供了 com.sun.tools.javac 包的 JDK 的 tools.jar 包。
UTIL 包中包含用于整个 Sqoop 的各种工具
※ ClassLoaderStack:管理由当前线程的堆栈使用的 ClassLoader 的实例,这正是当以本地模式运行 MapReduce 任务时自动生成代码写入当心线程的原理。
※ DirectImportUtils:包含一些直接进行 Hadoop import 操作的方便方法。
※ Executor:启动外部进程,并连接这些来生成由一个 AsyncSink(见下面更详细地)的流处理程序。
※ ExportException:当 exports 失败时由 ConnManagers 抛出异常。
※ ImportException:当 import 失败时由 ConnManagers 抛出异常。
※ JDBCURL:处理连接字符串的解析,这是类 URL,是不规范的、不合格的。
※ PerfCounters:被用来估计用于显示给用户的传输速率。
※ ResultSetPrinter:漂亮地打印结果集。
在不同的时候,Sqoop 从外部程序中读取 stdout,最简单的模式就是由 LocalMySQLManager 和 DirectPostgresqlManager 执行的直接模式(direct-mode)import。之后由 Runtime.exec()产生一个进程,该进程的标准输出 (Process.getInputStream()) 和潜在错误 (Process.getErrorStream()) 都需要被处理。在这些流之间无法读取更多的数据从而导致在写入更多数据之前外部进程阻塞。因此,这些都必须处理,最好以异步的方式。
按照 Sqoop 的说法,“异步接收器”是一个线程需要一个 InputStream 和读取完成。这些实现 AsyncSink 实现。com.cloudera.sqoop.util.AsyncSink 抽象类定义了这家工厂必须执行的操作。processStream()将产生另一个线程立即开始处理从 InputStream 中读取的数据参数; 它必须读这流来完成的。join()方法允许外部线程等待,直到处理完成。
一些 stock 被同步实现:LoggingAsyncSink 将重复 InputStream 上的一切在 log4j INFO 语句中。NullAsyncSink 消耗所有的输入和什么都不做。
各种 ConnManagers 使得外部进程以内部类的方式拥有它们自己的 AsyncSink 实现,它们通过数据库 tools 读取,并且促使数据流向 HDFS,有可能在此期间进行格式转换。
1.1.3 与 MapReduce 的接口
Sqoop 调度 MapReduce 作业产生 imports 和 exports。配置和执行的 MapReduce 工作如下几个共同的步骤(配置 InputFormat 配置 OutputFormat 设置映射的实施; 等等)。这些步骤是在 com.cloudera.sqoop.mapreduce.JobBase 类中的。为了使用,JobBase 允许一个用户来指定 InputFormat,OutputFormat,和映射器。
JobBase 本身就是 ImportJobBase 和 ExportJobBase 的子集,为特定的配置步骤提供更好的支持,分别由 ImportJobBase 和 ExportJobBase 的子类。,ImportJobBase.runImport()将调用的配置步骤,并运行一个工作表导入 HDFS。
到此,关于“sqoop 内部结构是怎样的”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注丸趣 TV 网站,丸趣 TV 小编会继续努力为大家带来更多实用的文章!