spark-core source code: RPC

In Spark 2.x the only remaining RPC implementation is the Netty-based one. Spark RPC consists of three main components:

RpcEnv: the environment for RPC calls; the RpcEnv decides which endpoint a message is routed to.

RpcEndpoint: receives and handles the various kinds of messages.

RpcEndpointRef: a reference to an RpcEndpoint. For A to send a message to B, A must first obtain B's reference from the RpcEnv, as sketched below.
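
A minimal sketch of that flow (illustration only: rpcEnv is assumed to be an already-created RpcEnv, and "echo", "b-host" and 7077 are placeholder names):

  val bRef: RpcEndpointRef =
    rpcEnv.setupEndpointRef(RpcAddress("b-host", 7077), "echo")  // A obtains B's reference from the env
  bRef.send("hello")                                             // A sends a message to B through the ref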

RpcEndpoint:

private[spark] trait RpcEndpoint {

  val rpcEnv: RpcEnv  // the RpcEnv this endpoint is registered to

  // Reference to this endpoint itself, used to send messages to itself
  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }

  // The following methods handle messages.
  // Handles messages sent via RpcEndpointRef.send or RpcCallContext.reply; no reply is expected
  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }

  // Handles messages sent via RpcEndpointRef.ask; they must be answered through the RpcCallContext
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
  }

  // Called when an error occurs while handling messages
  def onError(cause: Throwable): Unit = {
    // By default, throw e and let RpcEnv handle it
    throw cause
  }

  // Called when a remote address connects
  def onConnected(remoteAddress: RpcAddress): Unit = {
    // By default, do nothing.
  }

  // Called when a remote address is lost
  def onDisconnected(remoteAddress: RpcAddress): Unit = {
    // By default, do nothing.
  }

  // Called when a network error happens with a remote address
  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
    // By default, do nothing.
  }

  // Called before the endpoint starts handling messages
  def onStart(): Unit = {
    // By default, do nothing.
  }

  // Called when the endpoint is stopping
  def onStop(): Unit = {
    // By default, do nothing.
  }

  // Ask the RpcEnv to stop this endpoint
  final def stop(): Unit = {
    val _self = self
    if (_self != null) {
      rpcEnv.stop(_self)
    }
  }
}

The lifecycle of an RpcEndpoint is: constructor -> onStart -> receive (handle messages) -> onStop.
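
To make those hooks concrete, here is a minimal sketch of a custom endpoint (EchoEndpoint and its messages are invented for this example and are not part of Spark; since the RPC classes are private[spark], such code would have to live under the org.apache.spark package):

  import org.apache.spark.internal.Logging
  import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

  // Hypothetical endpoint, used only to illustrate the lifecycle hooks.
  class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint with Logging {

    override def onStart(): Unit = logInfo("EchoEndpoint started")

    // Messages sent with RpcEndpointRef.send end up here; no reply is expected
    override def receive: PartialFunction[Any, Unit] = {
      case msg: String => logInfo(s"got one-way message: $msg")
    }

    // Messages sent with ask/askSync end up here and must be answered via the context
    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
      case msg: String => context.reply(s"echo: $msg")
    }

    override def onStop(): Unit = logInfo("EchoEndpoint stopped")
  }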

RpcEndpointRef:

  def address: RpcAddress  // the host:port address of the remote endpoint
  def name: String         // the name of the endpoint

  // Send a one-way asynchronous message; no reply is expected
  def send(message: Any): Unit

  // Send a message and get a Future that must complete within the given timeout
  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)

  // Send a message and block until the reply arrives (or the timeout expires)
  def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
  def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
    val future = ask[T](message, timeout)
    timeout.awaitResult(future)
  }
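
A usage sketch of the three call styles (assuming endpointRef is an RpcEndpointRef already obtained from the RpcEnv and the remote endpoint replies with a String):

  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.Future

  endpointRef.send("ping")                                      // one-way: handled by receive, no reply

  val future: Future[String] = endpointRef.ask[String]("ping")  // async: handled by receiveAndReply
  future.foreach(reply => println(s"async reply: $reply"))

  val reply: String = endpointRef.askSync[String]("ping")       // blocking: waits for the reply or the timeout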

RpcEnv:

private[spark] abstract class RpcEnv(conf: SparkConf) {

  // Default timeout for endpoint lookups
  private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)

  // Return the RpcEndpointRef of an already-registered RpcEndpoint
  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef

  // The RpcAddress this RpcEnv is listening on
  def address: RpcAddress

  // Register an RpcEndpoint with this RpcEnv under the given name
  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef

  // Asynchronously retrieve an RpcEndpointRef by URI
  def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]

  def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
    defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
  }

  def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
    setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString)
  }

  // Stop the given RpcEndpoint
  def stop(endpoint: RpcEndpointRef): Unit

  // Shut down this RpcEnv
  def shutdown(): Unit

  // Block until the RpcEnv exits
  def awaitTermination(): Unit

  // Run a deserialization action that may need to look up RpcEndpointRefs
  def deserialize[T](deserializationAction: () => T): T

  // The file server used to serve files to other nodes
  def fileServer: RpcEnvFileServer

  // Open a channel to download a file from the given URI
  def openChannel(uri: String): ReadableByteChannel
}
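
Putting the pieces together, registering and looking up an endpoint might look like this (a sketch only: the names, host and port are placeholders, and EchoEndpoint is the invented endpoint from the earlier sketch):

  import org.apache.spark.{SecurityManager, SparkConf}
  import org.apache.spark.rpc.{RpcAddress, RpcEndpointRef, RpcEnv}

  val conf = new SparkConf()

  // Server side: create an RpcEnv and register an endpoint under a name.
  val env: RpcEnv = RpcEnv.create("echo-system", "localhost", 52345, conf, new SecurityManager(conf))
  val echoRef: RpcEndpointRef = env.setupEndpoint("echo", new EchoEndpoint(env))

  // Client side: look up the remote endpoint. setupEndpointRef builds the URI
  // spark://echo@localhost:52345 and resolves it through asyncSetupEndpointRefByURI.
  val remoteRef: RpcEndpointRef = env.setupEndpointRef(RpcAddress("localhost", 52345), "echo")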

Next, let's look at the Netty implementation, NettyRpcEnv:

  private[netty] val transportConf = SparkTransportConf.fromSparkConf(
    conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
    "rpc",
    conf.getInt("spark.rpc.io.threads", numUsableCores))  // transport-layer configuration
  private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)  // dispatches messages to endpoints
  private val streamManager = new NettyStreamManager(this)  // serves files, jars and directories
  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))  // manages the network transport context
  private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
    "netty-rpc-connection",
    conf.getInt("spark.rpc.connect.threads", 64))  // thread pool for establishing client connections

The clientConnectionExecutor mainly handles connection tasks, which can be seen in Outbox.launchConnectTask:

private def launchConnectTask(): Unit = {
    connectFuture = nettyEnv.clientConnectionExecutor.submit(new Callable[Unit] {

      override def call(): Unit = {
        try {
          val _client = nettyEnv.createClient(address)
          outbox.synchronized {
            client = _client
            if (stopped) {
              closeClient()
            }
          }
        } catch {
          case ie: InterruptedException =>
            // exit
            return
          case NonFatal(e) =>
            outbox.synchronized { connectFuture = null }
            handleNetworkFailure(e)
            return
        }
        outbox.synchronized { connectFuture = null }
        // It's possible that no thread is draining now. If we don't drain here, we cannot send the
        // messages until the next message arrives.
        drainOutbox()
      }
    })
  }
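
drainOutbox, called at the end of the connect task, flushes queued messages once a client is available. Roughly, it behaves like this (a simplified paraphrase, not the verbatim source):

  private def drainOutbox(): Unit = {
    var message: OutboxMessage = null
    synchronized {
      if (stopped || connectFuture != null || draining) return  // stopped, still connecting, or someone else is draining
      if (client == null) { launchConnectTask(); return }       // no connection yet: start one and bail out
      message = messages.poll()
      if (message == null) return                               // nothing queued
      draining = true
    }
    while (true) {
      message.sendWith(client)                                   // push the message over the TransportClient
      synchronized {
        message = messages.poll()
        if (message == null) { draining = false; return }        // queue drained; the next sender will drain again
      }
    }
  }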

Back to NettyRpcEnv:

  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
  // Outgoing messages for each remote RpcAddress are buffered in its own Outbox,
  // so senders are never blocked waiting on the network.
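
Before moving to the receiving side, this is roughly how NettyRpcEnv routes an outgoing message to an Outbox (a simplified paraphrase of postToOutbox, with the shutdown handling omitted):

  private def postToOutbox(receiver: NettyRpcEndpointRef, message: OutboxMessage): Unit = {
    if (receiver.client != null) {
      // An active connection already exists for this ref: send directly.
      message.sendWith(receiver.client)
    } else {
      // Otherwise get, or atomically create, the Outbox for the remote address...
      val targetOutbox = Option(outboxes.get(receiver.address)).getOrElse {
        val newOutbox = new Outbox(this, receiver.address)
        Option(outboxes.putIfAbsent(receiver.address, newOutbox)).getOrElse(newOutbox)
      }
      // ...and enqueue the message; the Outbox connects and drains asynchronously.
      targetOutbox.send(message)
    }
  }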
On the receiving side, the Dispatcher routes incoming messages to the registered endpoints.

  // Post a message to every endpoint registered with this Dispatcher
  def postToAll(message: InboxMessage): Unit = {
    val iter = endpoints.keySet().iterator()
    while (iter.hasNext) {
      val name = iter.next
      postMessage(name, message, (e) => { e match {
        case e: RpcEnvStoppedException => logDebug(s"Message $message dropped. ${e.getMessage}")
        case e: Throwable => logWarning(s"Message $message dropped. ${e.getMessage}")
      }})
    }
  }
  // The actual message-posting logic
  private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    val error = synchronized {
      val data = endpoints.get(endpointName)
      if (stopped) {
        Some(new RpcEnvStoppedException())
      } else if (data == null) {
        Some(new SparkException(s"Could not find $endpointName."))
      } else {
        data.inbox.post(message)   // put the message into the endpoint's Inbox
        receivers.offer(data)      // mark the endpoint as having pending messages
        None
      }
    }
    // Invoke the callback outside the synchronized block if an error occurred
    error.foreach(callbackIfStopped)
  }

The key calls are data.inbox.post(message) and receivers.offer(data).

Let's look at the Inbox.

The Inbox holds a

  protected val messages = new java.util.LinkedList[InboxMessage]()

which stores incoming messages; Inbox.post simply appends the message to this list, as shown in the sketch below.
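
In simplified form, Inbox.post and Inbox.process behave roughly like this (a paraphrase; error handling and the other control messages are omitted):

  def post(message: InboxMessage): Unit = inbox.synchronized {
    if (stopped) onDrop(message)  // the inbox is stopped: drop the message with a warning
    else messages.add(message)    // otherwise just append it to the LinkedList
  }

  def process(dispatcher: Dispatcher): Unit = {
    var message = inbox.synchronized { messages.poll() }
    while (message != null) {
      message match {
        case RpcMessage(_, content, context) =>
          endpoint.receiveAndReply(context)(content)  // ask-style: the handler replies via the context
        case OneWayMessage(_, content) =>
          endpoint.receive(content)                   // send-style: fire-and-forget
        case OnStart => endpoint.onStart()
        case OnStop  => endpoint.onStop()
      }
      message = inbox.synchronized { messages.poll() }
    }
  }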

receivers is a blocking queue inside the Dispatcher that holds the endpoints with pending messages; the Dispatcher runs a dedicated pool of threads that keep taking from this queue and processing the queued messages.
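
Paraphrased, each MessageLoop task run by that thread pool looks roughly like this:

  private class MessageLoop extends Runnable {
    override def run(): Unit = {
      while (true) {
        val data = receivers.take()          // blocks until some endpoint has pending messages
        if (data == PoisonPill) {
          receivers.offer(PoisonPill)        // put it back so the sibling loops also terminate
          return
        }
        data.inbox.process(Dispatcher.this)  // drain and handle that endpoint's Inbox
      }
    }
  }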

The Inbox and Outbox can be thought of as mailboxes: each endpoint (and each remote address) has one, with messages continuously being put in on one side and taken out on the other.
