如何进行Spark底层通信RPC源码分析

67次阅读
没有评论

共计 6027 个字符,预计需要花费 16 分钟才能阅读完成。

本篇文章给大家分享的是有关如何进行 Spark 底层通信 RPC 源码分析,丸趣 TV 小编觉得挺实用的,因此分享给大家学习,希望大家阅读完这篇文章后可以有所收获,话不多说,跟着丸趣 TV 小编一起来看看吧。

RPC 通信: 无论是 hadoop2.x 的 Rpc 通信方式还是 Spark2.x 的 Rpc 通信方式,简单通俗的来说就是两个进程之间的远程通信,比如 java 一个 A 项目里面有一个 class A,里面有一个 washA 方法一个 B 项目里面有一个 Class B 类,里面有一个方法是 washB,B 项目通过代理模式以及 java 的反射机制调用到 A 项目里面的 washA, 这种情况下就可以理解为是一个简单的 Rpc 通信方式。

Spark2.x

Spark2.x 使用基于 RPC 的通信方式,去除了 1.x 的 Akka 的实现方式,只保留了 netty 的实现方式,Spark2.x Rpc 提供了上层抽象(RpcEndpoint、RpcEnv、RpcEndPointRef),具体的实现方式只要实现了定义的抽象就可以完成 Rpc 通信,Spark2.x 之后目前版本只保留了 Netty(NettyRpcEnv、NettyRpcEndpointRef)的实现,定义抽象最大的好处相信开发的朋友都很清楚,以后不管提供了什么方式的实现只要实现了 RPCEndpoint,RpcEnv,RpcEndpointRef 就可以完成的通信功能。比如自己写一个自己版本的 Rpc 通信实现。

Spark2.x 的 Rpc 通信方式主要包括一下几个重要方面

RpcEndpoint: 消息通信体,主要是用来接收消息、处理消息,实现了 RpcEndPoint 接口就是一个消息通信体(Master、Work),RpcEndpoint 需要向 RpcEnv 注册

RpcEnv:Rpc 通信的上下文环境,消息发送过来首先经过 RpcEnv 然后路由给对应的 RpcEndPoint,得到 RpcEndPoint

RpcEndPointRef:RpcEndPoint 的引用如果要想某个 RpcEndPoint 发送消息,首先要通过 RpcEnv 得到 RpcEndPoint 的引用

RpcEndPoint 接口 里面的定义如下

val rpcEnv : RpcEnv // 得到 RpcEnv 对象

final def self: RpcEndpointRef = {// 返回一个 RpcEnpointRef 这个方法通常用来自己给自己发送消息

  rpcEnv.endpointRef(this)

  }

def receive: PartialFunction[Any, Unit]// 处理 RpcEndPointRef.send 或者 RpcEndPointRef.reply 方法,该方法不需要进行响应信息

def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]// 处理 RpcEndPointref.ask 发送的消息,处理完之后需要给调用 ask 的通信端响应消息(reply)

def onError(cause: Throwable)// 处理消息失败的时候会调用此方法

def onConnected(remoteAddress: RpcAddress)// 远程连接的当前节点的时候触发

def onDisconnected(remoteAddress: RpcAddress)// 远程连接断开时候触发

def onNetworkError(cause: Throwable, remoteAddress: RpcAddress)// 远程连接发生网络异常时触发

def onStop()// 停止 RpcEndPoint

def onStart()// 启动 RpcEndPoint,这里不仅仅是网络上说的启动 RpcEndPoint 处理任何消息,onStart 方法里面很多情况下可以写自己的 RpcEndPoint 的一些实现比如启动端口,或者创建目录

但是 RpcEndPoint 只有在 onStart 方法做一些处理之后 才可以接受 RpcEndPointRef 发送的消息

private[spark] trait ThreadSafeRpcEndpoint extends RpcEndpoint// 因为 receive 是并发操作如果要现成安全就是用 threadSafeRpcEndPoint

RpcEndPoint 的生命周期 构造 – onStart– receive — onStop, 注意 onStart 的方法是在调用 setRpcEndPoint 注册之后就会执行任何 RpcEndPoint 的 onStart 方法都是在注册之后执行的

原因后面的源码的提到

RpcEndpointRef: 抽象类

  def address: RpcAddress // 根据主机名端口返回一个 RppAddress

def name: String//name 一个字符串 暂时不知道干嘛的

def send(message: Any): Unit// 向 RpcEndPoint 发送一个消息 不需要返回结果

 def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]

  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout) // 向 RpcEndPoint 发送消息并得到返回结果

def askWithRetry[T: ClassTag](message: Any): T = askWithRetry(message, defaultAskTimeout)// 想 RpcEndPoint 发送消息并在一定时间内返回结果 失败的时候并且进行一定次数的重试

RpcEnv

  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef// 传入 RpcEndPoint 得到 RpcEndPointref 对象

  def address: RpcAddress// 根据主机名端口返回一个 RppAddress

def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef// 注册 RpcEndPoint 返回对应的 RpcEndPointRef

def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]// 通过 uri 一步获取 RpcEndPointRef

 def stop(endpoint: RpcEndpointRef): Unit// 停止 RpcEndPoint 根据 RpcEndPointRef

  def shutdown(): Unit// 关闭 RpcEndPoint

 def awaitTermination(): Unit// 等待 RpcEndPoint 退出

object RpcEnv

 def create(

  name: String,

  host: String,

  port: Int,

  conf: SparkConf,

  securityManager: SecurityManager,

  clientMode: Boolean = false): RpcEnv = {

  val config = RpcEnvConfig(conf, name, host, port, securityManager, clientMode)

  new NettyRpcEnvFactory().create(config)

  }

// 通过 RpcEnvFactory.create 创建 RpcEnv 环境

RpcEnvConfig

private[spark] case class RpcEnvConfig(

  conf: SparkConf,

  name: String,

  host: String,

  port: Int,

  securityManager: SecurityManager,

  clientMode: Boolean)

case 类 里面包括 SparkConf,name,host,port 等

NettyRpcEnv NettyRpcEnv 通过 NettyRpcEnvFactory 的 create 方法创建

 val nettyEnv =

  new NettyRpcEnv(sparkConf, javaSerializerInstance, config.host, config.securityManager)// 创建 nettyEnv

 private val dispatcher: Dispatcher = new Dispatcher(this)

Dispatcher 负责 RPC 消息的路由,它能够将消息路由到对应的 RpcEndpoint 进行处理, 同时存放 RpcEndPoint 与 RpcEndPointRef 的映射

NettyStreamManager 负责提供文件服务(文件、JAR 文件、目录)

TransportContext 负责管理网路传输上下文信息:创建 MessageEncoder、MessageDecoder、TransportClientFactory、TransportServer

NettyRpcHandler 负责处理网络 IO 事件,接收 RPC 调用请求,并通过 Dispatcher 派发消息

这里说一下 Dispatcher 该类主要负责 Rpc 消息路由 里面有一个内部累 EndPointData 但是有一个现成安全的 Inbox 这里面存放的时候收到的消息,非常重要后面会做具体分析

private class EndpointData(

  val name: String,

  val endpoint: RpcEndpoint,

  val ref: NettyRpcEndpointRef) {

  val inbox = new Inbox(ref, endpoint)

  }

  private val endpoints = new ConcurrentHashMap[String, EndpointData]// 存放 name- 对应的 EndPoint 的信息

  private val endpointRefs = new ConcurrentHashMap[RpcEndpoint, RpcEndpointRef]// 存放 RpcEndpoint, RpcEndpointRef 的映射关系

  private val receivers = new LinkedBlockingQueue[EndpointData]// 队列下面会有一个现成不断的从里面取出来处理

 def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {

  val addr = RpcEndpointAddress(nettyEnv.address, name)

  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)

  synchronized {

  if (stopped) {

  throw new IllegalStateException(RpcEnv has been stopped)

  }

  if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {

  throw new IllegalArgumentException(s There is already an RpcEndpoint called $name)

   }

  val data = endpoints.get(name)

  endpointRefs.put(data.endpoint, data.ref)

  receivers.offer(data)  // for the OnStart message

  }

  endpointRef

  }

// 注册 RpcEndPoint 在这里面发生 同时将 data put 到 receivers 

在 NettyRpcEndPoint 里面有一个 threadpool

private val threadpool: ThreadPoolExecutor = {

  val numThreads = nettyEnv.conf.getInt(spark.rpc.netty.dispatcher.numThreads ,

  math.max(2, Runtime.getRuntime.availableProcessors()))

  val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, dispatcher-event-loop)

  for (i – 0 until numThreads) {

  pool.execute(new MessageLoop)

  }

  pool

  }

MessageLoop 是一个实现了 Runnable 的类,里面的 run 方法里面不断从 receivers 取出来进行处理

重要代码   data.inbox.process(Dispatcher.this)

这个里面有一个非常重要的点就是 什么时候调用 onStart 的方法因为 receivers 里面存放的是 EndPoint 的信息 同时创建 EndPointData 对象

进入 Inbox 里面看一下

  inbox =   // Give this an alias so we can use it more clearly in closures.

  @GuardedBy(this)

  protected val messages = new java.util.LinkedList[InboxMessage]()

 inbox.synchronized {

  messages.add(OnStart)

  }

创建这个类的时候会有一个 messagelinkedList 的 list 集合 在创建这个结合之后 就会将 onStart 方法添加到里面,并且是现成安全的

然后 process 方法里面会不断的拿到集合的数据来进行对应的操作

 case OnStart =

  endpoint.onStart()

  if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {

  inbox.synchronized {

  if (!stopped) {

  enableConcurrent = true

  }

  }

  }

这个时候就会调用 onStart 方法

这个时候相当于 RpcEndPoint 可以接受消息并且处理了

Spark Rpc 通信方式 分为本地消息和远程消息,本地消息相当于调用的方法直接存放到 Index(中文收件箱),远程消息需要走 NettyRpcHandler

以上就是如何进行 Spark 底层通信 RPC 源码分析,丸趣 TV 小编相信有部分知识点可能是我们日常工作会见到或用到的。希望你能通过这篇文章学到更多知识。更多详情敬请关注丸趣 TV 行业资讯频道。

正文完
 
丸趣
版权声明:本站原创文章,由 丸趣 2023-08-16发表,共计6027字。
转载说明:除特殊说明外本站除技术相关以外文章皆由网络搜集发布,转载请注明出处。
评论(没有评论)