爱豆吧!

idouba@beta.

【hadoop代码笔记】Mapreduce shuffle过程之Map输出过程

一、概要描述

shuffle是MapReduce的一个核心过程,因此没有在前面的MapReduce作业提交的过程中描述,而是单独拿出来比较详细的描述。 根据官方的流程图示如下:

本篇文章中只是想尝试从代码分析来说明在map端是如何将map的输出保存下来等待reduce来取。 在执行每个map task时,无论map方法中执行什么逻辑,最终都是要把输出写到磁盘上。如果没有reduce阶段,则直接输出到hdfs上,如果有有reduce作业,则每个map方法的输出在写磁盘前线在内存中缓存。每个map task都有一个环状的内存缓冲区,存储着map的输出结果,默认100m,在每次当缓冲区快满的时候由一个独立的线程将缓冲区的数据以一个溢出文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有溢出文件做合并,被合并成已分区且已排序的输出文件。然后等待reduce task来拉数据。

二、 流程描述

  1. 在child进程调用到runNewMapper时,会设置output为NewOutputCollector,来负责map的输出。
  2. 在map方法的最后,不管经过什么逻辑的map处理,最终一般都要调用到TaskInputOutputContext的write方法,进而调用到设置的output即NewOutputCollector的write方法
  3. NewOutputCollector其实只是对MapOutputBuffer的一个封装,其write方法调用的是MapOutputBuffer的collect方法。
  4. MapOutputBuffer的collect方法中把key和value序列化后存储在一个环形缓存中,如果缓存满了则会调用startspill方法设置信号量,使得一个独立的线程SpillThread可以对缓存中的数据进行处理。
  5. SpillThread线程的run方法中调用sortAndSpill方法对缓存中的数据进行排序后写溢出文件。
  6. 当map输出完成后,会调用output的close方法。
  7. 在close方法中调用flush方法,对剩余的缓存进行处理,最后调用mergeParts方法,将前面过程的多个溢出文件合并为一个。
Read more →

【hadoop代码笔记】hadoop作业提交之汇总

一、概述 在本篇博文中,试图通过代码了解hadoop job执行的整个流程。即用户提交的mapreduce的jar文件、输入提交到hadoop的集群,并在集群中运行。重点在代码的角度描述整个流程,有些细节描述的并不那么详细。 汇总的代码流程图附件:hadoop_mapreduce_jobsubmit 二、主要流程 Jobclient通过RPC方式调用到jobtracker的submitJob方法提交作业,包括作业的jar、分片和作业描述。 JobTracker的submitJob方法吧job加入到内存队列中 Read more →

【hadoop代码笔记】hadoop作业提交之Child启动map任务

一、概要描述

上篇博文描述了TaskTracker启动一个独立的java进程来执行Map或Reduce任务。在本篇和下篇博文中我们会关注启动的那个入口是org.apache.hadoop.mapred.Child的这个Java进程是如何执行用户定义的map或Reduce任务的。

上篇文章,TaskRunner线程执行中,会构造一个_java –D** Child address port tasked_这 样第一个java命令,单独启动一个java进程。在Child的main函数中通过TaskUmbilicalProtocol协议,从 TaskTracker获得需要执行的Task,并调用Task的run方法来执行,而Task的run方法会通过java反射机制构造 Mapper,InputFormat,mapperContext,然后调用构造的mapper的run方法执行mapper操作。

二、 流程描述

1.Child类根据前面输入的三个参数,即tasktracher的地址、端口、taskid。通过TaskUmbilicalProtocol协议,从TaskTracker获得需要执行的Task,在Child的main函数中调用执行。

  1. 在Chilld中,执行Task的run方法。Task 的run方法。是真正执行用户定义的map或者reduce任务的入口,通过TaskUmbilicalProtocol向tasktracker上报执行进度。

  2. 在MapTask的run中执行runMapper方法来调用mapper定义的方法。

  3. 在runNewMapper方法中构造mapper实例和mapper执行的配置信息。并执行mapper.run方法来调用到用户定义的mapper的方法。

  4. mapper的run方法中,从输入数据中逐一取出调用map方法来处理每一条数据

  5. mapper的map方法是真正用户定义的处理数据的类。也是用户唯一需要定义的方法。

Read more →

【hadoop代码笔记】hadoop作业提交之TaskTracker 启动task

一、概要描述

上篇博文描 述了TaskTracker从Jobtracker如何从JobTracker获取到要执行的Task。在从JobTracker获取到 LaunchTaskAction后,执行addToTaskQueue方法来把要执行的Task加入到queue。在本篇博文中,我们来关注下该方法 后,TaskTracker怎么来处理这些Task。

实际上,TaskTracker初始化时,会初始化并启动两个TaskLauncher类型的线程,mapLauncher,reduceLauncher。在TaskTracker从JobTracher获取到任务后,对应的会把任务添加到两个 TaskLauncher的Queue中,其实是TaskLauncher维护的一个列表List tasksToLaunch。

TaskLauncher线程一直会定时检查TaskTracher上面有slot开业运行新的Task,则启动 Task。在这个过程中,先把task运行需要的文件解压到本地,并创建根据Task类型(Map或者Reduce)创建一个TaskRunner线程, 在TaskRunner中JvmManager调用JvmManagerForType、JvmRunner来启动一个java进程来执行Map或Reduce任务。

本文只是介绍到启动一个java进程,至于是什么样的java进程,对于maptask和reducetask分别是怎么执行的,在后面的child启动maptask,和child启动reducetask 会比较详细的介绍。

二、 流程描述

  1. tasktracker的offerService方法获取到要执行的task后调用addToTaskQueue方法,其实是调用taskrunner的addToTaskQueue方法
  2. TaskLauncher内部维护了一个List tasksToLaunch,只是把task加入到该
  3. taskLauncher是一个线程,在其run方法中从tasksToLaunch集合中取出task来执行,调用Tasktracker的startNewTask方法启动task。
  4. startNewtask方法中调用localizeJob方法把job相关的配置信息和要运行的jar拷贝到tasktracker本地,然后调用taskInProgress的launchTask方法来启动task。
  5. TaskInProgress的launchTask方法先调用localizeTask(task把task相关的配置信息获取到本地。然后创建一个TaskRunner线程来启动task。
  6. 在TaskRunner的run方法中构建一个java命令的执行的条件,包括引用类,执行目录等,入口类是Child。然后调用JvmManager 的launchJvm方法来调用。
  7. JvmManager 进而调用 JvmManagerForType的reapJvm,和spawnNewJvm 方法,发起调用.
  8. 在JvmManagerForType的spawnNewJvm 方法中创建了一个JvmRunner线程类执行调用.
  9. JvmRunner线程的run反复调用runChild方法来执行 一个命令行的调用。
Read more →

【hadoop代码笔记】hadoop作业提交之Job初始化

一、概要描述 在上一篇博文中主要描述了JobTracker和其几个服务(或功能)模块的接收到提交的job后的一些处理。其中很重要的一部分就作业的初始化。因为代码片段图的表达问题,本应该在上篇描述的内容,分开在本篇描述。 二、 流程描述 代码也接上文的最后一个方法EagerTaskInitializationListener的 jobAdded方法把JobInProgress类型的job放到List类型的 jobInitQueue中,有个单独的线程会对新加入的每个job进行初始化,其初始化调用的方法就是Job Read more →

【hadoop代码笔记】hadoop作业提交之TaskTracker获取Task

一、概要描述 在上上一篇博文和上一篇博文中 分别描述了jobTracker和其服务(功能)模块初始化完成后,接收JobClient提交的作业,并进行初始化。本文着重描 述,JobTracker如何选择作业的Task分发到TaskTracker。本文只是描述一个TaskTracker如何从JobTracker获取 Task任务。Task任务在TaskTracker如何执行将在后面博文中描述。 二、 流程描述 TaskTracker在run中调用offerService()方法一直死循环的去连接Jobtracke Read more →

【hadoop代码笔记hadoop作业提交之JobTracker等相关功能模块初始化

一、概要描述 本文重点描述在JobTracker一端接收作业、调度作业等几个模块的初始化工作。想过模块的介绍会在其他文章中比较详细的描述。受理作业提交在下一篇文章中会进行描述。 为了表达的尽可能清晰一点只是摘录出影响逻辑流转的主要代码。重点强调直接的协作调用,每个内部完成的逻辑(一直可以更细的说明、有些细节可能自己也理解并不深刻:-()在后续会描述。 主要包括JobTracker、TaskScheduler(此处以FairScheduler为例)、JobInProgressListener(以用的较多 Read more →

【hadoop代码笔记】hadoop作业提交之JobTracker接收作业提交

一、概要描述 在上一篇博文中主要描述了JobTracker接收作业的几个服务(或功能)模块的初始化过程。本节将介绍这些服务(或功能)是如何接收到提交的job。本来作业的初始化也可以在本节内描述,但是涉及到JobInProgress的初始化过程放在一张图上太拥挤,就分开到下一篇文章中描述。 二、 流程描述 JobClient通过RPC的方式向JobTracker提交作业; 调用JobTracker的submitJob方法。该方法是JobTracker向外提供的供调用的提交作业的接口。 submit方法中调用J Read more →

【hadoop代码笔记】Hadoop作业提交中EagerTaskInitializationListener的作用

一、概述 继承自JobInProgressListener,实现了jobAdded,jobRemoved,jobUpdated方法。哦,不能说实现,应该说继承,JobInProgressListener居然是个抽象类,看着怎么这样的listener也应该是个interface。 在该listener被注册后,就响应jobAdded,jobRemoved,jobUpdated动作。在EagerTaskInitializationListener中,响应这三种动作来维护内部的一个job列表(Listjo Read more →

【hadoop代码笔记】通过JobClient对Jobtracker的调用详细了解Hadoop RPC

Hadoop的各个服务间,客户端和服务间的交互采用RPC方式。关于这种机制介绍的资源很多,也不难理解,这里不做背景介绍。只是尝试从Jobclient向JobTracker提交作业这个最简单的客户端服务器交互的代码中,去跟踪和了解下RPC是怎么被使用的。不同于准备发表博客时搜索的几篇博文,试图通过一种具体的场景来介绍,属于比较初级。其他DataNode和Namenode之间,Tasktracker和JobTracker之间的交互基本也都一样。为了引用的代码篇幅尽可能少,忽略了代码中写日志(包括Me Read more →