In Spark 2.x only the Netty implementation of the RPC layer remains. Spark RPC consists of three main abstractions:
RpcEnv: the RPC environment; messages go through the RpcEnv, which decides which endpoint they are delivered to.
RpcEndpoint: receives and handles the different kinds of messages.
RpcEndpointRef: a reference to an RpcEndpoint. For A to send a message to B, A first obtains B's reference from the RpcEnv.
RpcEndpoint:
private[spark] trait RpcEndpoint {
  // The RpcEnv this endpoint is registered to
  val rpcEnv: RpcEnv
  // The RpcEndpointRef of this endpoint, used when it sends messages to itself
  final def self: RpcEndpointRef = {
    require(rpcEnv != null, "rpcEnv has not been initialized")
    rpcEnv.endpointRef(this)
  }
  // Message handling:
  // Handle messages sent via RpcEndpointRef.send; no reply is expected
  def receive: PartialFunction[Any, Unit] = {
    case _ => throw new SparkException(self + " does not implement 'receive'")
  }
  // Handle messages sent via RpcEndpointRef.ask; a reply is expected through the RpcCallContext
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
  }
  // Called when an error occurs while handling messages
  def onError(cause: Throwable): Unit = {
    // By default, throw e and let RpcEnv handle it
    throw cause
  }
  // Called when a remote address connects
  def onConnected(remoteAddress: RpcAddress): Unit = {
    // By default, do nothing.
  }
  // Called when the connection to a remote address is lost
  def onDisconnected(remoteAddress: RpcAddress): Unit = {
    // By default, do nothing.
  }
  // Called when a network error occurs between this endpoint and a remote address
  def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit = {
    // By default, do nothing.
  }
  // Called before the endpoint starts handling any messages
  def onStart(): Unit = {
    // By default, do nothing.
  }
  // Called when the endpoint is stopping
  def onStop(): Unit = {
    // By default, do nothing.
  }
  // Ask the RpcEnv to stop this endpoint
  final def stop(): Unit = {
    val _self = self
    if (_self != null) {
      rpcEnv.stop(_self)
    }
  }
}
The RpcEndpoint lifecycle is: constructor -> onStart -> receive (handle messages) -> onStop. A minimal concrete endpoint is sketched below.
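For illustration, a minimal sketch of a custom endpoint (a hypothetical EchoEndpoint, not from the Spark source; note that RpcEndpoint is private[spark], so real subclasses live inside the org.apache.spark namespace):
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEnv}

// Hypothetical endpoint: drops one-way messages after logging them,
// and echoes back the payload of ask messages.
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def onStart(): Unit = {
    // Runs once before any message is delivered.
    println("EchoEndpoint started at " + rpcEnv.address)
  }

  // Fire-and-forget messages (sent via RpcEndpointRef.send) arrive here.
  override def receive: PartialFunction[Any, Unit] = {
    case msg: String => println(s"got one-way message: $msg")
  }

  // Ask messages arrive here; the reply goes back through the context.
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg: String => context.reply(s"echo: $msg")
  }
}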
RpcEndpointRef:
  // The host:port RPC address of the referenced endpoint
  def address: RpcAddress
  // The name of the referenced endpoint
  def name: String
  // Fire-and-forget: send a one-way message, no reply expected
  def send(message: Any): Unit
  // Send a message and get back a Future that completes within the given timeout (asynchronous)
  def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
  def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
  // Send a message and block until the reply arrives (synchronous)
  def askSync[T: ClassTag](message: Any): T = askSync(message, defaultAskTimeout)
  def askSync[T: ClassTag](message: Any, timeout: RpcTimeout): T = {
    val future = ask[T](message, timeout)
    timeout.awaitResult(future)
  }
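A quick sketch of the three call styles, assuming `ref` is an RpcEndpointRef already obtained from the RpcEnv (e.g. for the EchoEndpoint above):
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

ref.send("hello")                                  // one-way, handled by receive

val f: Future[String] = ref.ask[String]("hello")   // async, handled by receiveAndReply
f.foreach(reply => println(reply))                 // runs when the reply arrives

val reply: String = ref.askSync[String]("hello")   // blocks until the reply arrives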
RpcEnv:
private[spark] abstract class RpcEnv(conf: SparkConf) {
  // Timeout used when looking up endpoint refs
  private[spark] val defaultLookupTimeout = RpcUtils.lookupRpcTimeout(conf)
  // Return the ref of an already registered RpcEndpoint
  private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
  // The RpcAddress this RpcEnv is listening on
  def address: RpcAddress
  // Register an endpoint with this RpcEnv under the given name
  def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
  // Asynchronously look up an RpcEndpointRef by URI
  def asyncSetupEndpointRefByURI(uri: String): Future[RpcEndpointRef]
  def setupEndpointRefByURI(uri: String): RpcEndpointRef = {
    defaultLookupTimeout.awaitResult(asyncSetupEndpointRefByURI(uri))
  }
  def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef = {
    setupEndpointRefByURI(RpcEndpointAddress(address, endpointName).toString)
  }
  // Stop an RpcEndpoint given its ref
  def stop(endpoint: RpcEndpointRef): Unit
  // Shut down this RpcEnv
  def shutdown(): Unit
  // Block until the RpcEnv exits
  def awaitTermination(): Unit
  // Deserialization of objects containing RpcEndpointRefs must run inside this method
  def deserialize[T](deserializationAction: () => T): T
  // The file server used to serve files to other nodes
  def fileServer: RpcEnvFileServer
  // Open a channel to download a file from the given URI
  def openChannel(uri: String): ReadableByteChannel
}
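Putting the three pieces together, a minimal sketch (RpcEnv.create and friends are private[spark], so this only works inside the org.apache.spark namespace; names, ports and the EchoEndpoint are illustrative):
import org.apache.spark.{SecurityManager, SparkConf}
import org.apache.spark.rpc.{RpcAddress, RpcEnv}

val conf = new SparkConf()

// Server side: create an env and register an endpoint under a name.
val serverEnv = RpcEnv.create("echo-server", "localhost", 52345, conf,
  new SecurityManager(conf))
val localRef = serverEnv.setupEndpoint("echo", new EchoEndpoint(serverEnv))

// Client side: look up the remote endpoint by (address, name) and ask it.
val clientEnv = RpcEnv.create("echo-client", "localhost", 0, conf,
  new SecurityManager(conf), clientMode = true)
val remoteRef = clientEnv.setupEndpointRef(RpcAddress("localhost", 52345), "echo")
println(remoteRef.askSync[String]("hi"))   // prints "echo: hi"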
Now let's look at the Netty implementation, NettyRpcEnv. Its key members:
  // Transport configuration
  private[netty] val transportConf = SparkTransportConf.fromSparkConf(
    conf.clone.set("spark.rpc.io.numConnectionsPerPeer", "1"),
    "rpc",
    conf.getInt("spark.rpc.io.threads", numUsableCores))
  // Dispatcher: routes inbound messages to the right endpoint
  private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
  // Serves files, jars, etc. over the RPC channel
  private val streamManager = new NettyStreamManager(this)
  // Manages the network transport context
  private val transportContext = new TransportContext(transportConf,
    new NettyRpcHandler(dispatcher, this, streamManager))
  // Thread pool used to establish client connections
  private[netty] val clientConnectionExecutor = ThreadUtils.newDaemonCachedThreadPool(
    "netty-rpc-connection",
    conf.getInt("spark.rpc.connect.threads", 64))
The client connection executor runs the tasks that establish outbound connections; its use can be seen in Outbox.launchConnectTask:
  private def launchConnectTask(): Unit = {
    connectFuture = nettyEnv.clientConnectionExecutor.submit(new Callable[Unit] {
      override def call(): Unit = {
        try {
          val _client = nettyEnv.createClient(address)
          outbox.synchronized {
            client = _client
            if (stopped) {
              closeClient()
            }
          }
        } catch {
          case ie: InterruptedException =>
            // exit
            return
          case NonFatal(e) =>
            outbox.synchronized { connectFuture = null }
            handleNetworkFailure(e)
            return
        }
        outbox.synchronized { connectFuture = null }
        // It's possible that no thread is draining now. If we don't drain here, we cannot send the
        // messages until the next message arrives.
        drainOutbox()
      }
    })
  }
Back in NettyRpcEnv, outgoing messages are tracked per remote address:
  // One Outbox per remote RpcAddress: outgoing messages are queued there,
  // so sending does not block the caller.
  private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
Inbound messages are routed by the Dispatcher:
  // Post a message to all registered endpoints
  def postToAll(message: InboxMessage): Unit = {
    val iter = endpoints.keySet().iterator()
    while (iter.hasNext) {
      val name = iter.next
      postMessage(name, message, (e) => { e match {
        case e: RpcEnvStoppedException => logDebug(s"Message $message dropped. ${e.getMessage}")
        case e: Throwable => logWarning(s"Message $message dropped. ${e.getMessage}")
      }})
    }
  }
  // Post a message to a specific endpoint
  private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    val error = synchronized {
      val data = endpoints.get(endpointName)
      if (stopped) {
        Some(new RpcEnvStoppedException())
      } else if (data == null) {
        Some(new SparkException(s"Could not find $endpointName."))
      } else {
        data.inbox.post(message)
        receivers.offer(data)
        None
      }
    }
    error.foreach(callbackIfStopped)
  }
The key lines are data.inbox.post(message) followed by receivers.offer(data). Looking at Inbox, it holds
  protected val messages = new java.util.LinkedList[InboxMessage]()
which stores the received messages; post simply appends the message to this list. receivers is a queue inside the Dispatcher, and the Dispatcher runs dedicated message-loop threads that continuously take entries from it and process the pending messages. In short, each Inbox and Outbox can be thought of as a mailbox per endpoint (or per remote address) that one side keeps filling while the other side keeps draining.
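To make the mailbox idea concrete, here is a simplified, self-contained sketch of the pattern (illustrative only, not Spark's actual code): producers post into a per-endpoint mailbox, and a dispatcher thread drains whichever mailbox has pending work.
// Simplified mailbox/dispatcher sketch (not Spark's code).
import java.util.concurrent.LinkedBlockingQueue

class Mailbox(val name: String) {
  // Pending messages, like Inbox's LinkedList[InboxMessage].
  private val messages = new java.util.LinkedList[Any]()
  def post(msg: Any): Unit = messages.synchronized { messages.add(msg) }
  def process(): Unit = messages.synchronized {
    var msg = messages.poll()
    while (msg != null) {
      println(s"[$name] handling $msg")
      msg = messages.poll()
    }
  }
}

object MiniDispatcher {
  // Mailboxes that have pending work, analogous to Dispatcher's `receivers` queue.
  private val receivers = new LinkedBlockingQueue[Mailbox]()

  def postMessage(box: Mailbox, msg: Any): Unit = {
    box.post(msg)        // enqueue; the sender never blocks on delivery
    receivers.offer(box) // mark the mailbox as having work
  }

  // One message-loop thread draining mailboxes, like Dispatcher's MessageLoop.
  new Thread(() => while (true) receivers.take().process(), "message-loop").start()
}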