spark-core: sparkJob submission

Earlier we saw the runJob method in SparkContext. Every operation that triggers an action is ultimately turned into a Job:

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      resultHandler: (Int, U) => Unit): Unit = {
    if (stopped.get()) {
      throw new IllegalStateException("SparkContext has been shutdown")
    }
    val callSite = getCallSite
    val cleanedFunc = clean(func)
    logInfo("Starting job: " + callSite.shortForm)
    if (conf.getBoolean("spark.logLineage", false)) {
      logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
    }
    dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
    progressBar.foreach(_.finishAll())
    rdd.doCheckpoint()
  }

In reality, the job is handed off to the DAGScheduler for submission.
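As a reminder of how execution reaches this point: every action funnels into SparkContext.runJob. For instance, RDD.count is essentially a thin wrapper around it; the following is a simplified sketch modeled on the real method (the actual implementation delegates the per-partition counting to Utils.getIteratorSize):

    // Simplified sketch of an action: count one partition at a time via
    // sc.runJob, then sum the per-partition counts on the driver.
    def count(): Long =
      sc.runJob(this, (iter: Iterator[T]) => {
        var result = 0L
        while (iter.hasNext) { result += 1L; iter.next() }
        result
      }).sum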

Now let's look at runJob in the DAGScheduler:

def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }
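Note the pattern: submitJob returns a JobWaiter immediately, and runJob simply blocks on its completionFuture. Conceptually, the waiter is just a job listener that completes a promise once every partition has reported back. Below is a minimal sketch of the idea (SimpleJobWaiter is a made-up name; Spark's real JobWaiter additionally supports job cancellation):

    import java.util.concurrent.atomic.AtomicInteger
    import scala.concurrent.{Future, Promise}

    // Minimal sketch: complete a promise once all partitions have reported
    // a result, or fail it if the job fails.
    class SimpleJobWaiter[T](totalTasks: Int, resultHandler: (Int, T) => Unit) {
      private val finishedTasks = new AtomicInteger(0)
      private val promise = Promise[Unit]()

      def completionFuture: Future[Unit] = promise.future

      // invoked by the scheduler for each finished partition
      def taskSucceeded(index: Int, result: T): Unit = {
        resultHandler(index, result)
        if (finishedTasks.incrementAndGet() == totalTasks) promise.success(())
      }

      def jobFailed(exception: Exception): Unit = promise.tryFailure(exception)
    }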

The key work happens in submitJob, so let's look at that next:

def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }

Here, an eventProcessLoop is used to handle the JobSubmitted event.

First, the full set of events defined in DAGSchedulerEvent:

- JobSubmitted
- MapStageSubmitted
- StageCancelled
- JobCancelled
- JobGroupCancelled
- AllJobsCancelled
- ExecutorAdded
- ExecutorLost
- BeginEvent
- GettingResultEvent
- CompletionEvent
- TaskSetFailed
- ResubmitFailedStages

After eventProcessLoop receives the message, it places the event on a LinkedBlockingDeque, where it waits to be processed.

Once the EventLoop thread starts, it keeps taking messages off this queue and dispatches each one according to its type.
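EventLoop itself is a small generic utility; condensed to its essence, it looks roughly like this (a simplified sketch of org.apache.spark.util.EventLoop, omitting error handling and shutdown logic):

    import java.util.concurrent.LinkedBlockingDeque

    // Simplified sketch: a daemon thread drains a blocking queue and hands
    // every event to onReceive.
    abstract class SimpleEventLoop[E](name: String) {
      private val eventQueue = new LinkedBlockingDeque[E]()

      private val eventThread = new Thread(name) {
        setDaemon(true)
        override def run(): Unit = {
          try {
            while (true) onReceive(eventQueue.take()) // take() blocks until an event arrives
          } catch {
            case _: InterruptedException => // loop stopped
          }
        }
      }

      def start(): Unit = eventThread.start()
      def post(event: E): Unit = eventQueue.put(event)

      protected def onReceive(event: E): Unit
    }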

For JobSubmitted, it calls:

dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
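The dispatch happens in DAGSchedulerEventProcessLoop.doOnReceive, which pattern-matches on the event type (abridged; the remaining event types are dispatched the same way):

    private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
      case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
        dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
      case MapStageSubmitted(jobId, dependency, callSite, listener, properties) =>
        dagScheduler.handleMapStageSubmitted(jobId, dependency, callSite, listener, properties)
      case ResubmitFailedStages =>
        dagScheduler.resubmitFailedStages()
      // ... other cases omitted
    }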

The implementation of handleJobSubmitted:

  private[scheduler] def handleJobSubmitted(jobId: Int,
     finalRDD: RDD[_],
     func: (TaskContext, Iterator[_]) => _,
     partitions: Array[Int],
     callSite: CallSite,
     listener: JobListener,
     properties: Properties) {
   var finalStage: ResultStage = null
   try {
     // New stage creation may throw an exception if, for example, jobs are run on a
     // HadoopRDD whose underlying HDFS files have been deleted.
     finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
   } catch {
     case e: BarrierJobSlotsNumberCheckFailed =>
       logWarning(s"The job $jobId requires to run a barrier stage that requires more slots " +
         "than the total number of slots in the cluster currently.")
       // If jobId doesn't exist in the map, Scala coverts its value null to 0: Int automatically.
       val numCheckFailures = barrierJobIdToNumTasksCheckFailures.compute(jobId,
         new BiFunction[Int, Int, Int] {
           override def apply(key: Int, value: Int): Int = value + 1
         })
       if (numCheckFailures <= maxFailureNumTasksCheck) {
         messageScheduler.schedule(
           new Runnable {
             override def run(): Unit = eventProcessLoop.post(JobSubmitted(jobId, finalRDD, func,
               partitions, callSite, listener, properties))
           },
           timeIntervalNumTasksCheck,
           TimeUnit.SECONDS
         )
         return
       } else {
         // Job failed, clear internal data.
         barrierJobIdToNumTasksCheckFailures.remove(jobId)
         listener.jobFailed(e)
         return
       }

     case e: Exception =>
       logWarning("Creating new stage failed due to exception - job: " + jobId, e)
       listener.jobFailed(e)
       return
   }
   // Job submitted, clear internal data.
   barrierJobIdToNumTasksCheckFailures.remove(jobId)

   val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
   clearCacheLocs()
   logInfo("Got job %s (%s) with %d output partitions".format(
     job.jobId, callSite.shortForm, partitions.length))
   logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
   logInfo("Parents of final stage: " + finalStage.parents)
   logInfo("Missing parents: " + getMissingParentStages(finalStage))
   val jobSubmissionTime = clock.getTimeMillis()
   jobIdToActiveJob(jobId) = job
   activeJobs += job
   finalStage.setActiveJob(job)
   val stageIds = jobIdToStageIds(jobId).toArray
   val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
   listenerBus.post(
     SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
   submitStage(finalStage)
 }

There are two important methods here:

- createResultStage
- submitStage

Creating the ResultStage first finds the parent stages, then constructs the current stage from them:

  private def createResultStage(
    rdd: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    jobId: Int,
    callSite: CallSite): ResultStage = {
  checkBarrierStageWithDynamicAllocation(rdd)
  checkBarrierStageWithNumSlots(rdd)
  checkBarrierStageWithRDDChainPattern(rdd, partitions.toSet.size)
  val parents = getOrCreateParentStages(rdd, jobId)
  val id = nextStageId.getAndIncrement()
  val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
  stageIdToStage(id) = stage
  updateJobIdStageIdMaps(jobId, stage)
  stage
}

So how are the parent stages obtained?

  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
   getShuffleDependencies(rdd).map { shuffleDep =>
      getOrCreateShuffleMapStage(shuffleDep, firstJobId) // each stage ends at a shuffle-map boundary
   }.toList
 }

Clearly, it walks back up the lineage level by level, following the shuffle dependencies:

private[scheduler] def getShuffleDependencies(
     rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]] = {
   val parents = new HashSet[ShuffleDependency[_, _, _]]
   val visited = new HashSet[RDD[_]]
   val waitingForVisit = new ArrayStack[RDD[_]]
   waitingForVisit.push(rdd)
   while (waitingForVisit.nonEmpty) {
     val toVisit = waitingForVisit.pop()
     if (!visited(toVisit)) {
       visited += toVisit
       toVisit.dependencies.foreach {
         case shuffleDep: ShuffleDependency[_, _, _] =>
           parents += shuffleDep
         case dependency =>
           waitingForVisit.push(dependency.rdd)
       }
     }
   }
   parents
 }

A stack drives the traversal: the final RDD is pushed first; each popped RDD has its dependencies inspected, shuffle dependencies are collected as parents, the RDDs behind the other (narrow) dependencies are pushed back onto the stack, and this repeats until the stack is empty.
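As a concrete illustration, consider a hypothetical driver snippet with a single shuffle in the lineage; the traversal above finds exactly one ShuffleDependency, so the job splits into one ShuffleMapStage plus the final ResultStage:

    val result = sc.parallelize(1 to 100)
      .map(x => (x % 10, x))   // narrow dependency: stays in the same stage
      .reduceByKey(_ + _)      // ShuffleDependency: cuts a stage boundary here
      .mapValues(_ * 2)        // narrow dependency: same stage as the result
    result.count()             // action: triggers runJob for this lineage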

Once the stages have been built, it is time to submit them:

    finalStage.setActiveJob(job)
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    submitStage(finalStage)

These lines attach the job to the final stage, post a SparkListenerJobStart event to the listener bus for monitoring, and then submit finalStage.
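The SparkListenerJobStart posted here is the same event user code can observe through the listener API. For example (an illustrative listener of our own, registered with sc.addSparkListener):

    import org.apache.spark.scheduler.{SparkListener, SparkListenerJobStart}

    // Illustrative listener: observes the SparkListenerJobStart posted above.
    class JobStartLogger extends SparkListener {
      override def onJobStart(jobStart: SparkListenerJobStart): Unit = {
        println(s"Job ${jobStart.jobId} started with ${jobStart.stageInfos.size} stages")
      }
    }

    // in the driver: sc.addSparkListener(new JobStartLogger)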

To summarize, let's analyze how newStage() (createResultStage in the version shown above) divides the job into stages:

1. When constructing a stage, the method calls getParentStages() (here, getOrCreateParentStages) on the finalRDD.
2. getParentStages() starts from the finalRDD and visits the logical execution graph backwards: when it meets a NarrowDependency, the dependent RDD is added to the current stage; when it meets a ShuffleDependency, the stage is cut, and the process recurses into the stage that the ShuffleDependency depends on.
3. Once a ShuffleMapStage (any stage other than the one producing the final result) is formed, its last RDD is registered via MapOutputTrackerMaster.registerShuffle(shuffleDep.shuffleId, rdd.partitions.size). This step is important because the shuffle process relies on the MapOutputTrackerMaster to point tasks at the location of the ShuffleMapTask output data.
