爱豆吧!

idouba@beta.

Sort源码实现比较Go&Java语言风格(3)

前面两部分分别描述了Go和java两种语言对sort的使用方式和源码实现。作为go初学者,最想做的是通过例子和源码来对新的语言有个理解。这部分就结合自己的理解整理下,可以看到,是非常主观。 4 语言语法比较 可以看到,两种语言的思路基本上是一样的,用户必须定义比较规则。在排序过程中都要考察集合长度,使用用户定义的比较规则,然后调整元素位置来达到排序的效果,对应go的interface要求的三个方法。但是还是有挺多不同。 首先从使用方式上看,go的规则(通过方法来体现)定义在集合上,并且定义了三个方法,分 Read more →

Sort源码实现比较Go&Java语言风格(2)

接上篇博文,工作中用到了go排序的功能,作为go新手,在参照着例子,并读了go的sort.go部分涉及的源码,了解了go中排序的细节和实现算法,这些在上篇博文中有介绍。作为一个java ZHONGDU*2的用户,不由自主的想到了java中对应实现的样子,在这里也非常简要的贴出来,描述下java中排序的用法,和java源码中对应部分的实现,比较好奇java是和go一样,也用的时候快速排序吗? 3 Java 排序源码解析 主要代码Looks like this: 3.1 使用例子 public class Person implements Comparable<Person> { private String name; private int age; @Override public int compareTo(Person o) { return this.age - o.age; } public static void main(String Read more →

Sort源码实现比较Go&Java语言风格(1)

1 前言 刚开始拥抱go,非常新鲜!才使用没多久,属于没有经验,有一点感受的那种。具体也说不了特别清楚,就是觉得:简单,直接,灵活,方便。作为一个 java 重度中毒(ZHANGDU*2)用户,过程中还是习惯对照着思考,至少在这个阶段。也好,发现对照着想,似乎更容易融会贯通。 对资深的go程序员来说,应该都是非常基础基本的问题,但也挡不住咱这个小白要发表下感想。 第一篇文章首先结合最近做一个小feature时用到go中元素排序的功能,顺便了解下两种语言中排序功能的使用方式,各自源码中对排序功能的实现。当然最主要的 Read more →

戏(细)说Executor框架线程池任务执行全过程(下)

归档下发表于infoq.com 2015年6月的两篇文章。本来是一篇文章,经过Infoq编辑Alice Ding建议,拆分为<上>和<下>两部分。谢谢Alice对文章的细心校对,帮我发下了其中的很多问题。添加下infoq要求的声明:本文为InfoQ中文站特供稿件,首发地址为:http://www.infoq.com/cn/articles/executor-framework-thread-pool-task-execution-part-02 。如需转载,请与InfoQ中文站 Read more →

源码剖析AQS在几个同步工具类中的使用

1. 前言 AQS(AbstractQueuedSynchronizer)是 java.util.concurrent的基础。J.U.C中宣传的封装良好的好用的同步工具类Semaphore、CountDownLatch、ReentrantLock、ReentrantReadWriteLock、FutureTask等虽然各自都有不同特征,但是简单看一下源码,每个类内部都包含一个如下的内部类定义: 同时每个类内部都包含有这样一个属性,连属性名都一样!注释已经暗示了,该类的同步机制正是通过这个AQS的子类来完成 Read more →

Java NIO 要点总结

来自Jenkov.com的比较完整但是足够brief的一个系列:Java NIO Tutorial,介绍了NIO的主要机制和其中几个重要对象的作用和工作。 1. 三个对象 NIO核心的三个对象: Channels Buffers Selectors 简单讲三个对象:Channel 像IO的流,Buffer就像名字一样,就是个缓存。 数据可以从Channel读到Buffer中,也可以从Buffer 写到Channel中。IO是面向流的,连接到一个源或者目标(对应于输入流或者输出流),如Java IO Overview中说明,比较典型的数据源和目标类型有:Files、Pipe Read more →

【hadoop代码笔记】Mapreduce shuffle过程之Map输出过程

一、概要描述

shuffle是MapReduce的一个核心过程,因此没有在前面的MapReduce作业提交的过程中描述,而是单独拿出来比较详细的描述。 根据官方的流程图示如下:

本篇文章中只是想尝试从代码分析来说明在map端是如何将map的输出保存下来等待reduce来取。 在执行每个map task时,无论map方法中执行什么逻辑,最终都是要把输出写到磁盘上。如果没有reduce阶段,则直接输出到hdfs上,如果有有reduce作业,则每个map方法的输出在写磁盘前线在内存中缓存。每个map task都有一个环状的内存缓冲区,存储着map的输出结果,默认100m,在每次当缓冲区快满的时候由一个独立的线程将缓冲区的数据以一个溢出文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有溢出文件做合并,被合并成已分区且已排序的输出文件。然后等待reduce task来拉数据。

二、 流程描述

  1. 在child进程调用到runNewMapper时,会设置output为NewOutputCollector,来负责map的输出。
  2. 在map方法的最后,不管经过什么逻辑的map处理,最终一般都要调用到TaskInputOutputContext的write方法,进而调用到设置的output即NewOutputCollector的write方法
  3. NewOutputCollector其实只是对MapOutputBuffer的一个封装,其write方法调用的是MapOutputBuffer的collect方法。
  4. MapOutputBuffer的collect方法中把key和value序列化后存储在一个环形缓存中,如果缓存满了则会调用startspill方法设置信号量,使得一个独立的线程SpillThread可以对缓存中的数据进行处理。
  5. SpillThread线程的run方法中调用sortAndSpill方法对缓存中的数据进行排序后写溢出文件。
  6. 当map输出完成后,会调用output的close方法。
  7. 在close方法中调用flush方法,对剩余的缓存进行处理,最后调用mergeParts方法,将前面过程的多个溢出文件合并为一个。
Read more →

【hadoop代码笔记】hadoop作业提交之汇总

一、概述 在本篇博文中,试图通过代码了解hadoop job执行的整个流程。即用户提交的mapreduce的jar文件、输入提交到hadoop的集群,并在集群中运行。重点在代码的角度描述整个流程,有些细节描述的并不那么详细。 汇总的代码流程图附件:hadoop_mapreduce_jobsubmit 二、主要流程 Jobclient通过RPC方式调用到jobtracker的submitJob方法提交作业,包括作业的jar、分片和作业描述。 JobTracker的submitJob方法吧job加入到内存队列中 Read more →

【hadoop代码笔记】hadoop作业提交之Child启动map任务

一、概要描述

上篇博文描述了TaskTracker启动一个独立的java进程来执行Map或Reduce任务。在本篇和下篇博文中我们会关注启动的那个入口是org.apache.hadoop.mapred.Child的这个Java进程是如何执行用户定义的map或Reduce任务的。

上篇文章,TaskRunner线程执行中,会构造一个_java –D** Child address port tasked_这 样第一个java命令,单独启动一个java进程。在Child的main函数中通过TaskUmbilicalProtocol协议,从 TaskTracker获得需要执行的Task,并调用Task的run方法来执行,而Task的run方法会通过java反射机制构造 Mapper,InputFormat,mapperContext,然后调用构造的mapper的run方法执行mapper操作。

二、 流程描述

1.Child类根据前面输入的三个参数,即tasktracher的地址、端口、taskid。通过TaskUmbilicalProtocol协议,从TaskTracker获得需要执行的Task,在Child的main函数中调用执行。

  1. 在Chilld中,执行Task的run方法。Task 的run方法。是真正执行用户定义的map或者reduce任务的入口,通过TaskUmbilicalProtocol向tasktracker上报执行进度。

  2. 在MapTask的run中执行runMapper方法来调用mapper定义的方法。

  3. 在runNewMapper方法中构造mapper实例和mapper执行的配置信息。并执行mapper.run方法来调用到用户定义的mapper的方法。

  4. mapper的run方法中,从输入数据中逐一取出调用map方法来处理每一条数据

  5. mapper的map方法是真正用户定义的处理数据的类。也是用户唯一需要定义的方法。

Read more →

【hadoop代码笔记】hadoop作业提交之TaskTracker 启动task

一、概要描述

上篇博文描 述了TaskTracker从Jobtracker如何从JobTracker获取到要执行的Task。在从JobTracker获取到 LaunchTaskAction后,执行addToTaskQueue方法来把要执行的Task加入到queue。在本篇博文中,我们来关注下该方法 后,TaskTracker怎么来处理这些Task。

实际上,TaskTracker初始化时,会初始化并启动两个TaskLauncher类型的线程,mapLauncher,reduceLauncher。在TaskTracker从JobTracher获取到任务后,对应的会把任务添加到两个 TaskLauncher的Queue中,其实是TaskLauncher维护的一个列表List tasksToLaunch。

TaskLauncher线程一直会定时检查TaskTracher上面有slot开业运行新的Task,则启动 Task。在这个过程中,先把task运行需要的文件解压到本地,并创建根据Task类型(Map或者Reduce)创建一个TaskRunner线程, 在TaskRunner中JvmManager调用JvmManagerForType、JvmRunner来启动一个java进程来执行Map或Reduce任务。

本文只是介绍到启动一个java进程,至于是什么样的java进程,对于maptask和reducetask分别是怎么执行的,在后面的child启动maptask,和child启动reducetask 会比较详细的介绍。

二、 流程描述

  1. tasktracker的offerService方法获取到要执行的task后调用addToTaskQueue方法,其实是调用taskrunner的addToTaskQueue方法
  2. TaskLauncher内部维护了一个List tasksToLaunch,只是把task加入到该
  3. taskLauncher是一个线程,在其run方法中从tasksToLaunch集合中取出task来执行,调用Tasktracker的startNewTask方法启动task。
  4. startNewtask方法中调用localizeJob方法把job相关的配置信息和要运行的jar拷贝到tasktracker本地,然后调用taskInProgress的launchTask方法来启动task。
  5. TaskInProgress的launchTask方法先调用localizeTask(task把task相关的配置信息获取到本地。然后创建一个TaskRunner线程来启动task。
  6. 在TaskRunner的run方法中构建一个java命令的执行的条件,包括引用类,执行目录等,入口类是Child。然后调用JvmManager 的launchJvm方法来调用。
  7. JvmManager 进而调用 JvmManagerForType的reapJvm,和spawnNewJvm 方法,发起调用.
  8. 在JvmManagerForType的spawnNewJvm 方法中创建了一个JvmRunner线程类执行调用.
  9. JvmRunner线程的run反复调用runChild方法来执行 一个命令行的调用。
Read more →