Hadoop-Yarn-NodeManager都做了什么

这篇具有很好参考价值的文章主要介绍了Hadoop-Yarn-NodeManager都做了什么。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

 一、源码下载

下面是hadoop官方源码下载地址,我下载的是hadoop-3.2.4,那就一起来看下吧

Index of /dist/hadoop/core

二、上下文

在我的<Hadoop-Yarn-启动篇>博客中已经简要的分析了NodeManager的启动过程,NodeManager是管理整个集群资源的直接角色,因此我们有必要细致的分析下NodeManager都做了什么,一般Hadoop源码中各个角色启动时都是在serviceInit()方法中初始化该角色所需要的服务并添加到服务列表,在serviceStart()中依次启动各个服务,下面我们就依次来分析下NodeManager中所有的服务已经每个服务都做了什么。

三、NodeManager服务列表

在我的<Hadoop-Yarn-启动篇>博客中的java部分>NodeManager>serviceInit中有详细的源码,下面我们依次来列举下:

1、NMLeveldbStateStoreService

2、DeletionService

3、NodeHealthCheckerService

4、NodeLabelsProvider

5、NodeAttributesProvider

6、NodeResourceMonitorImpl

7、ContainerManagerImpl

8、NMLogAggregationStatusTracker

9、WebServer

10、AsyncDispatcher

11、JvmPauseMonitor

12、NodeStatusUpdater

四、NodeManager服务源码分析

1、NMLeveldbStateStoreService

//父类
public abstract class NMStateStoreService extends AbstractService {
    //......省略......
    //初始化状态存储
    public void serviceInit(Configuration conf) throws IOException {
        initStorage(conf);
    }

    //启动状态存储以供使用
    public void serviceStart() throws IOException {
        startStorage();
    }
    //......省略......
}

//子类
public class NMLeveldbStateStoreService extends NMStateStoreService {
    //......省略......
    protected void initStorage(Configuration conf)
      throws IOException {
        //获取 yarn.nodemanager.recovery.dir 值 并依次作为根目录进行创建
        //默认值为 ${hadoop.tmp.dir}/yarn-nm-recovery 
        //启用恢复时,节点管理器将在其中存储状态的本地文件系统目录。
        //并将其中的内容封装成 LevelDB 的形式
        //LevelDB 是一种为分布式而生的键-值数据库 Chrome 浏览器中涉及的 IndexedDB(基于 HTML5 API 的数据库),就是基于 LevelDB 构建而成的
        db = openDatabase(conf);
        //校验版本
        checkVersion();
        //开始准备计时
        //获取 yarn.nodemanager.recovery.compaction-interval-secs 的值,默认值 3600
        //换算成毫秒为 3600 * 1000
        //创建一个新计时器,该计时器的关联线程具有指定的名称,并且可以指定为作为守护进程运行。
        //开始按照计时器调度任务
        startCompactionTimer(conf);
    }

    protected void startStorage() throws IOException {
    //假设我们开始时是健康的
        isHealthy = true;
    }

    //......省略......
}

2、DeletionService

public class DeletionService extends AbstractService {

    //ContainerExecutor : 用于在底层操作系统上启动容器的机制的抽象。所有执行器实现都必须扩展ContainerExecutor。
    //
    public DeletionService(ContainerExecutor containerExecutor,
      NMStateStoreService stateStore) {
        super(DeletionService.class.getName());
        this.containerExecutor = containerExecutor;
        this.debugDelay = 0;
        this.stateStore = stateStore;
    }

    protected void serviceInit(Configuration conf) throws Exception {
    ThreadFactory tf = new ThreadFactoryBuilder()
        .setNameFormat("DeletionService #%d")
        .build();
        if (conf != null) {
          //线程池:ScheduledThreadPoolExecutor的扩展,提供附加功能,主要是提供了周期任务和延迟任务相关的操作
          //获取 yarn.nodemanager.delete.thread-count 的值 默认值 4 (清理中使用的线程数)
          sched = new HadoopScheduledThreadPoolExecutor(
          conf.getInt(YarnConfiguration.NM_DELETE_THREAD_COUNT,
              YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT), tf);
          //删除资源之前延迟,以便于调试NM问题
          //获取 yarn.nodemanager.delete.debug-delay-sec 的值 默认为 0
          debugDelay = conf.getInt(YarnConfiguration.DEBUG_NM_DELETE_DELAY_SEC, 0);
        } else {
          sched = new HadoopScheduledThreadPoolExecutor(
              YarnConfiguration.DEFAULT_NM_DELETE_THREAD_COUNT, tf);
        }
        //设置是否执行现有延迟任务的策略,即使此执行器已关闭。在这种情况下,这些任务只会在shutdownNow时终止,或者在已经关闭时将策略设置为false后终止。默认情况下,此值为true。
        sched.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
        //设置线程在终止之前可以保持空闲的时间限制。如果池中当前的线程数超过核心线程数,则在等待此时间而不处理任务后,多余的线程将被终止。这将覆盖构造函数中设置的任何值。
        sched.setKeepAliveTime(60L, SECONDS);
        if (stateStore.canRecover()) {
              //恢复操作
              recover(stateStore.loadDeletionServiceState());
        }
        super.serviceInit(conf);
  }
)

3、NodeHealthCheckerService

NodeManager添加服务代码

  public static NodeHealthScriptRunner getNodeHealthScriptRunner(Configuration conf) {
    //运行状况检查脚本
    //获取 yarn.nodemanager.health-checker.script.path 的值 
    String nodeHealthScript = 
        conf.get(YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_PATH);
    if(!NodeHealthScriptRunner.shouldRun(nodeHealthScript)) {
      LOG.info("Node Manager health check script is not available "
          + "or doesn't have execute permission, so not "
          + "starting the node health script runner.");
      return null;
    }
    //运行脚本的频率
    //yarn.nodemanager.health-checker.interval-ms 默认值 10 * 60 * 1000 即10分钟
    long nmCheckintervalTime = conf.getLong(
        YarnConfiguration.NM_HEALTH_CHECK_INTERVAL_MS,
        YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_INTERVAL_MS);
    //脚本超时期
    //yarn.nodemanager.health-checker.script.timeout-ms 默认值 2 * 10 * 60 * 1000 即20分钟
    long scriptTimeout = conf.getLong(
        YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS,
        YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_SCRIPT_TIMEOUT_MS);
    //脚本参数
    //yarn.nodemanager.health-checker.script.opts 
    String[] scriptArgs = conf.getStrings(
        YarnConfiguration.NM_HEALTH_CHECK_SCRIPT_OPTS, new String[] {});
    //是否在NM启动前运行节点运行状况脚本
    //yarn.nodemanager.health-checker.run-before-startup 默认值 false
    boolean runBeforeStartup = conf.getBoolean(
        YarnConfiguration.NM_HEALTH_CHECK_RUN_BEFORE_STARTUP,
        YarnConfiguration.DEFAULT_NM_HEALTH_CHECK_RUN_BEFORE_STARTUP);
    //NodeHealthScriptRunner 提供使用配置的节点运行状况脚本检查节点运行状况并向要求运行状况检查器报告的服务报告的功能的类。
    return new NodeHealthScriptRunner(nodeHealthScript,
        nmCheckintervalTime, scriptTimeout, scriptArgs, runBeforeStartup);
  }

  nodeHealthChecker =
        new NodeHealthCheckerService(
            getNodeHealthScriptRunner(conf), dirsHandler);
  addService(nodeHealthChecker);

NodeHealthScriptRunner负责运行状况脚本检查并向提供服务报告

public class NodeHealthScriptRunner extends AbstractService {

    //用于初始化脚本路径和间隔时间的值
    protected void serviceInit(Configuration conf) throws Exception {
        super.serviceInit(conf);
    }

    protected void serviceStart() throws Exception {
        nodeHealthScriptScheduler = new Timer("NodeHealthMonitor-Timer", true);
        long delay = 0;
        if (runBeforeStartup) {
          //立即启动计时器任务,等待其返回
          //构造器中赋值,NodeHealthMonitorExecutor是它的内部类,该类用于定期执行节点运行状况脚本
          //private TimerTask timer = new NodeHealthMonitorExecutor(scriptArgs);
          timer.run();
          delay = intervalTime;
        }

        //将脚本设置为每隔一段时间定期运行
        nodeHealthScriptScheduler.scheduleAtFixedRate(timer, delay,intervalTime);
        super.serviceStart();
  }

  private class NodeHealthMonitorExecutor extends TimerTask {

    String exceptionStackTrace = "";

    public NodeHealthMonitorExecutor(String[] args) {
      ArrayList<String> execScript = new ArrayList<String>();
      execScript.add(nodeHealthScript);
      if (args != null) {
        execScript.addAll(Arrays.asList(args));
      }
      //ShellCommandExecutor是一个简单的shell命令执行器
      //如果命令的输出不需要显式解析,并且命令、工作目录和环境保持不变,则应使用ShellCommandExecutor。命令的输出按原样存储,并且预计输出较小。
      shexec = new ShellCommandExecutor(execScript
          .toArray(new String[execScript.size()]), null, null, scriptTimeout);
    }

    @Override
    public void run() {
      HealthCheckerExitStatus status = HealthCheckerExitStatus.SUCCESS;
      try {
        //执行命令
        shexec.execute();
      } catch (ExitCodeException e) {
        // 忽略脚本的退出代码
        status = HealthCheckerExitStatus.FAILED_WITH_EXIT_CODE;
        //在Windows上,我们不会遇到stdout缓冲读取器为超时事件抛出的流关闭IOException。
        if (Shell.WINDOWS && shexec.isTimedOut()) {
          status = HealthCheckerExitStatus.TIMED_OUT;
        }
      } catch (Exception e) {
        LOG.warn("Caught exception : " + e.getMessage());
        if (!shexec.isTimedOut()) {
          status = HealthCheckerExitStatus.FAILED_WITH_EXCEPTION;
        } else {
          status = HealthCheckerExitStatus.TIMED_OUT;
        }
        exceptionStackTrace = StringUtils.stringifyException(e);
      } finally {
        if (status == HealthCheckerExitStatus.SUCCESS) {
          if (hasErrors(shexec.getOutput())) {
            status = HealthCheckerExitStatus.FAILED;
          }
        }
        reportHealthStatus(status);
      }
    }

    /**
     * 用于解析节点运行状况监视器的输出并发送到报告地址
     * 
     * 超时的脚本或导致IOException输出的脚本被忽略
     * 
     * 如果有以下一种情况,节点被标记为不正常
     *     1、节点运行状况脚本超时
     *     2、节点运行状况脚本输出有一行以ERROR开头
     *     3、执行脚本时引发异常
     * 
     * 如果脚本抛出{@link IOException}或{@link ExitCodeException},输出将被忽略,
     * 节点将保持健康,因为脚本可能存在语法错误。
     * @param status
     */
    void reportHealthStatus(HealthCheckerExitStatus status) {
      long now = System.currentTimeMillis();
      switch (status) {
      case SUCCESS:
        setHealthStatus(true, "", now);
        break;
      case TIMED_OUT:
        setHealthStatus(false, NODE_HEALTH_SCRIPT_TIMED_OUT_MSG);
        break;
      case FAILED_WITH_EXCEPTION:
        setHealthStatus(false, exceptionStackTrace);
        break;
      case FAILED_WITH_EXIT_CODE:
        // see Javadoc above - we don't report bad health intentionally
        setHealthStatus(true, "", now);
        break;
      case FAILED:
        setHealthStatus(false, shexec.getOutput());
        break;
      }
    }

    /**
     * 检查输出字符串是否有以ERROR开头的行
     */
    private boolean hasErrors(String output) {
      String[] splits = output.split("\n");
      for (String split : splits) {
        if (split.startsWith(ERROR_PATTERN)) {
          return true;
        }
      }
      return false;
    }
  }


}

4、NodeLabelsProvider

默认不开启启用分布式节点标签配置

protected NodeLabelsProvider createNodeLabelsProvider(Configuration conf)
      throws IOException {
    NodeLabelsProvider provider = null;
    //获取  yarn.nodemanager.node-labels.provider 的值 默认为空
    //官方解释:
    //当在RM中将“yarn.node labels.configuration type”配置为“distributed”时,管理员可以通过配置此参数在NM中配置节点标签的提供程序。管理员可以配置“config”、“script”或提供程序的类名。配置的类需要扩展org.apache.hadoop.yarn.server.nodemanager.nodelabels。节点标签提供者。如果配置了“config”,则使用“ConfigurationNodeLabelsProvider”;如果配置“script”,则将使用“ScriptNodeLabelsProvider”。
    String providerString =
        conf.get(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG, null);
    if (providerString == null || providerString.trim().length() == 0) {
      //似乎未启用分布式节点标签配置
      return provider;
    }
    switch (providerString.trim().toLowerCase()) {
    case YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER:
      provider = new ConfigurationNodeLabelsProvider();
      break;
    case YarnConfiguration.SCRIPT_NODE_DESCRIPTOR_PROVIDER:
      provider = new ScriptBasedNodeLabelsProvider();
      break;
    default:
      try {
        Class<? extends NodeLabelsProvider> labelsProviderClass =
            conf.getClass(YarnConfiguration.NM_NODE_LABELS_PROVIDER_CONFIG,
                null, NodeLabelsProvider.class);
        provider = labelsProviderClass.newInstance();
      } catch (InstantiationException | IllegalAccessException
          | RuntimeException e) {
        LOG.error("Failed to create NodeLabelsProvider based on Configuration",
            e);
        throw new IOException(
            "Failed to create NodeLabelsProvider : " + e.getMessage(), e);
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Distributed Node Labels is enabled"
          + " with provider class as : " + provider.getClass().toString());
    }
    return provider;
  }


nodeLabelsProvider = createNodeLabelsProvider(conf);
//默认不开启启用分布式节点标签配置
if (nodeLabelsProvider != null) {
   addIfService(nodeLabelsProvider);
   nodeStatusUpdater.setNodeLabelsProvider(nodeLabelsProvider);
}

5、NodeAttributesProvider

默认不开启启用分布式节点属性配置

protected NodeAttributesProvider createNodeAttributesProvider(
      Configuration conf) throws IOException {
    NodeAttributesProvider attributesProvider = null;
    //获取 yarn.nodemanager.node-attributes.provider 的值,默认空
    //官方解释:
    //此属性确定节点管理器将插入哪个提供程序来收集节点属性。管理员可以配置“config”、“script”或提供程序的类名。配置的类需要扩展org.apache.hadoop.yarn.server.nodemanager.nodelabels。节点属性提供者。如果配置了“config”,则使用“ConfigurationNodeLabelsProvider”;如果配置“script”,则将使用“ScriptBasedNodeAttributesProvider”。
    String providerString =
        conf.get(YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_CONFIG, null);
    if (providerString == null || providerString.trim().length() == 0) {
      return attributesProvider;
    }
    switch (providerString.trim().toLowerCase()) {
    case YarnConfiguration.CONFIG_NODE_DESCRIPTOR_PROVIDER:
      attributesProvider = new ConfigurationNodeAttributesProvider();
      break;
    case YarnConfiguration.SCRIPT_NODE_DESCRIPTOR_PROVIDER:
      attributesProvider = new ScriptBasedNodeAttributesProvider();
      break;
    default:
      try {
        Class<? extends NodeAttributesProvider> labelsProviderClass =
            conf.getClass(YarnConfiguration.NM_NODE_ATTRIBUTES_PROVIDER_CONFIG,
                null, NodeAttributesProvider.class);
        attributesProvider = labelsProviderClass.newInstance();
      } catch (InstantiationException | IllegalAccessException
          | RuntimeException e) {
        LOG.error("Failed to create NodeAttributesProvider"
                + " based on Configuration", e);
        throw new IOException(
            "Failed to create NodeAttributesProvider : "
                + e.getMessage(), e);
      }
    }
    if (LOG.isDebugEnabled()) {
      LOG.debug("Distributed Node Attributes is enabled"
          + " with provider class as : "
          + attributesProvider.getClass().toString());
    }
    return attributesProvider;
  }

nodeAttributesProvider = createNodeAttributesProvider(conf);
if (nodeAttributesProvider != null) {
    addIfService(nodeAttributesProvider);
    nodeStatusUpdater.setNodeAttributesProvider(nodeAttributesProvider);
}

6、NodeResourceMonitorImpl

nodeResourceMonitor = createNodeResourceMonitor();
addService(nodeResourceMonitor);
((NMContext) context).setNodeResourceMonitor(nodeResourceMonitor);

protected NodeResourceMonitor createNodeResourceMonitor() {
   //NodeResourceMonitorImpl 是 节点资源监视器的实现。它定期跟踪节点的资源利用情况,并将其报告给NM
   return new NodeResourceMonitorImpl(context);
}

NodeResourceMonitorImpl

public class NodeResourceMonitorImpl extends AbstractService implements
    NodeResourceMonitor {

  //初始化节点资源监视器
  public NodeResourceMonitorImpl(Context context) {
    super(NodeResourceMonitorImpl.class.getName());
    this.nmContext = context;
    this.monitoringThread = new MonitoringThread();
  }

  //使用正确的参数初始化服务
  protected void serviceInit(Configuration conf) throws Exception {
    //获取 yarn.nodemanager.resource-monitor.interval-ms 的值 默认 3000
    //官方解释:监视节点和容器的频率。如果为0或为负数,则禁用监视
    this.monitoringInterval =
        conf.getLong(YarnConfiguration.NM_RESOURCE_MON_INTERVAL_MS,
            YarnConfiguration.DEFAULT_NM_RESOURCE_MON_INTERVAL_MS);

    //在节点管理器中为节点资源监视器创建ResourceCalculatorPlugin并进行配置。如果未配置插件,此方法将尝试返回可用于此系统的内存计算器插件
    //LINUX 为 SysInfoLinux() 下一篇博客我们重点详细看下它
    //WINDOWS 为 SysInfoWindows()
    //其他操作系统不支持
    this.resourceCalculatorPlugin =
        ResourceCalculatorPlugin.getNodeResourceMonitorPlugin(conf);

    LOG.info(" Using ResourceCalculatorPlugin : "
        + this.resourceCalculatorPlugin);
  }

  //启动执行节点资源利用率监视的线程
  protected void serviceStart() throws Exception {
    if (this.isEnabled()) {
      //构造器中已经初始化过了,为new MonitoringThread().start()
      this.monitoringThread.start();
    }
    super.serviceStart();
  }
}

MonitoringThread

监视此节点的资源利用率的线程

private class MonitoringThread extends Thread {
    /**
     * 初始化节点资源监视线程
     */
    public MonitoringThread() {
      super("Node Resource Monitor");
      this.setDaemon(true);
    }

    /**
     * 定期监视节点的资源利用率.
     */
    @Override
    public void run() {
      while (true) {
        // 获取节点利用率并将其保存为运行状况 以下指标数据都是通过SysInfoLinux或者SysInfoWindows来获取的
        //系统中的物理内存的总大小 - 系统中可用物理内存总大小
        long pmem = resourceCalculatorPlugin.getPhysicalMemorySize() -
            resourceCalculatorPlugin.getAvailablePhysicalMemorySize();
        //系统中的虚拟内存的总大小 - 系统中可用虚拟内存总大小
        long vmem =
            resourceCalculatorPlugin.getVirtualMemorySize()
                - resourceCalculatorPlugin.getAvailableVirtualMemorySize();
        //获取已用的虚拟核数
        float vcores = resourceCalculatorPlugin.getNumVCoresUsed();
        //ResourceUtilization对集群中一组计算机资源的利用率进行建模
        nodeUtilization =
            ResourceUtilization.newInstance(
                (int) (pmem >> 20), // B -> MB
                (int) (vmem >> 20), // B -> MB
                vcores); // 已使用的虚拟核心

        // 将节点利用率指标发布到 NodeManager 指标系统
        NodeManagerMetrics nmMetrics = nmContext.getNodeManagerMetrics();
        if (nmMetrics != null) {
          nmMetrics.setNodeUsedMemGB(nodeUtilization.getPhysicalMemory());
          nmMetrics.setNodeUsedVMemGB(nodeUtilization.getVirtualMemory());
          nmMetrics.setNodeCpuUtilization(nodeUtilization.getCPU());
        }

        try {
          Thread.sleep(monitoringInterval);
        } catch (InterruptedException e) {
          LOG.warn(NodeResourceMonitorImpl.class.getName()
              + " is interrupted. Exiting.");
          break;
        }
      }
    }
  }

7、ContainerManagerImpl

    containerManager =
        createContainerManager(context, exec, del, nodeStatusUpdater,
        this.aclsManager, dirsHandler);
    addService(containerManager);
    ((NMContext) context).setContainerManager(containerManager);


  protected ContainerManagerImpl createContainerManager(Context context,
      ContainerExecutor exec, DeletionService del,
      NodeStatusUpdater nodeStatusUpdater, ApplicationACLsManager aclsManager,
      LocalDirsHandlerService dirsHandler) {
    return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
        metrics, dirsHandler);
  }

ContainerManagerImpl

public class ContainerManagerImpl extends CompositeService implements
    ContainerManager {

  public ContainerManagerImpl(Context context, ContainerExecutor exec,
      DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
      NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
    super(ContainerManagerImpl.class.getName());
    this.context = context;
    this.dirsHandler = dirsHandler;

    // ContainerManager级调度程序
    //在单独的线程中调度。目前只有一个线程能做到这一点。每个事件类型类可能有多个通道,并且可以使用线程池来调度事件。
    dispatcher = new AsyncDispatcher("NM ContainerManager dispatcher");
    this.deletionService = deletionContext;
    this.metrics = metrics;

    //创建ResourceLocalizationService服务 本地资源跟踪服务
    rsrcLocalizationSrvc =
        createResourceLocalizationService(exec, deletionContext, context,
            metrics);
    addService(rsrcLocalizationSrvc);

    //要使用的容器启动器实现
    //获取 yarn.nodemanager.containers-launcher.class 的值 默认为 ContainersLauncher.class
    //只有在{@link ResourceLocalizationService}启动后才能启动此服务,因为它取决于在本地文件系统上创建系统目录
    //会处理这些事件
    //    LAUNCH_CONTAINER
    //    RELAUNCH_CONTAINER
    //    RECOVER_CONTAINER
    //    RECOVER_PAUSED_CONTAINER
    //    CLEANUP_CONTAINER
    //    CLEANUP_CONTAINER_FOR_REINIT
    //    SIGNAL_CONTAINER
    //    PAUSE_CONTAINER
    //    RESUME_CONTAINER
    containersLauncher = createContainersLauncher(context, exec);
    addService(containersLauncher);

    this.nodeStatusUpdater = nodeStatusUpdater;
    this.containerScheduler = createContainerScheduler(context);
    addService(containerScheduler);

    AuxiliaryLocalPathHandler auxiliaryLocalPathHandler =
        new AuxiliaryLocalPathHandlerImpl(dirsHandler);
    //配置服务启动
    auxiliaryServices = new AuxServices(auxiliaryLocalPathHandler,
        this.context, this.deletionService);
    auxiliaryServices.registerServiceListener(this);
    addService(auxiliaryServices);

    //如果启用了时间轴服务v.2并且启用了系统发布服务器,则初始化度量发布服务器
    Configuration conf = context.getConf();
    if (YarnConfiguration.timelineServiceV2Enabled(conf)) {
      if (YarnConfiguration.systemMetricsPublisherEnabled(conf)) {
        LOG.info("YARN system metrics publishing service is enabled");
        nmMetricsPublisher = createNMTimelinePublisher(context);
        context.setNMTimelinePublisher(nmMetricsPublisher);
      }
      this.timelineServiceV2Enabled = true;
    }
    this.containersMonitor = createContainersMonitor(exec);
    addService(this.containersMonitor);

    dispatcher.register(ContainerEventType.class,
        new ContainerEventDispatcher());
    dispatcher.register(ApplicationEventType.class,
        createApplicationEventDispatcher());
    dispatcher.register(LocalizationEventType.class,
        new LocalizationEventHandlerWrapper(rsrcLocalizationSrvc,
            nmMetricsPublisher));
    dispatcher.register(AuxServicesEventType.class, auxiliaryServices);
    dispatcher.register(ContainersMonitorEventType.class, containersMonitor);
    dispatcher.register(ContainersLauncherEventType.class, containersLauncher);
    dispatcher.register(ContainerSchedulerEventType.class, containerScheduler);

    addService(dispatcher);

    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    this.readLock = lock.readLock();
    this.writeLock = lock.writeLock();
  }


  public void serviceInit(Configuration conf) throws Exception {

    logHandler =
      createLogHandler(conf, this.context, this.deletionService);
    addIfService(logHandler);
    dispatcher.register(LogHandlerEventType.class, logHandler);
    
    //添加共享缓存上载服务(如果禁用共享缓存,它将不起任何作用)
    SharedCacheUploadService sharedCacheUploader =
        createSharedCacheUploaderService();
    addService(sharedCacheUploader);
    dispatcher.register(SharedCacheUploadEventType.class, sharedCacheUploader);

    createAMRMProxyService(conf);

    waitForContainersOnShutdownMillis =
        conf.getLong(YarnConfiguration.NM_SLEEP_DELAY_BEFORE_SIGKILL_MS,
            YarnConfiguration.DEFAULT_NM_SLEEP_DELAY_BEFORE_SIGKILL_MS) +
        conf.getLong(YarnConfiguration.NM_PROCESS_KILL_WAIT_MS,
            YarnConfiguration.DEFAULT_NM_PROCESS_KILL_WAIT_MS) +
        SHUTDOWN_CLEANUP_SLOP_MS;

    super.serviceInit(conf);
    recover();
  }

  protected void serviceStart() throws Exception {

    //在删除上下文中登记用户目录

    Configuration conf = getConfig();
    final InetSocketAddress initialAddress = conf.getSocketAddr(
        YarnConfiguration.NM_BIND_HOST,
        YarnConfiguration.NM_ADDRESS,
        YarnConfiguration.DEFAULT_NM_ADDRESS,
        YarnConfiguration.DEFAULT_NM_PORT);
    boolean usingEphemeralPort = (initialAddress.getPort() == 0);
    if (context.getNMStateStore().canRecover() && usingEphemeralPort) {
      throw new IllegalArgumentException("Cannot support recovery with an "
          + "ephemeral server port. Check the setting of "
          + YarnConfiguration.NM_ADDRESS);
    }
    // 如果正在恢复,则延迟打开RPC服务,直到资源和容器的恢复完成,否则在恢复过程中来自客户端的请求可能会干扰恢复过程。
    final boolean delayedRpcServerStart =
        context.getNMStateStore().canRecover();

    Configuration serverConf = new Configuration(conf);

    //始终强制它是基于令牌的
    serverConf.set(
      CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
      SaslRpcServer.AuthMethod.TOKEN.toString());
    
    YarnRPC rpc = YarnRPC.create(conf);

    server =
        rpc.getServer(ContainerManagementProtocol.class, this, initialAddress, 
            serverConf, this.context.getNMTokenSecretManager(),
            conf.getInt(YarnConfiguration.NM_CONTAINER_MGR_THREAD_COUNT, 
                YarnConfiguration.DEFAULT_NM_CONTAINER_MGR_THREAD_COUNT));
    
    // 启用服务授权?
    if (conf.getBoolean(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHORIZATION, 
        false)) {
      refreshServiceAcls(conf, NMPolicyProvider.getInstance());
    }
    
    String bindHost = conf.get(YarnConfiguration.NM_BIND_HOST);
    String nmAddress = conf.getTrimmed(YarnConfiguration.NM_ADDRESS);
    String hostOverride = null;
    if (bindHost != null && !bindHost.isEmpty()
        && nmAddress != null && !nmAddress.isEmpty()) {
      //一个带有地址的绑定主机案例,为了支持用指定的地址重写查询主机名时发现的第一个主机名,请将指定的地址与服务器侦听的实际端口组合
      hostOverride = nmAddress.split(":")[0];
    }

    //启动 node ID
    InetSocketAddress connectAddress;
    if (delayedRpcServerStart) {
      connectAddress = NetUtils.getConnectAddress(initialAddress);
    } else {
      server.start();
      connectAddress = NetUtils.getConnectAddress(server);
    }
    NodeId nodeId = buildNodeId(connectAddress, hostOverride);
    ((NodeManager.NMContext)context).setNodeId(nodeId);
    this.context.getNMTokenSecretManager().setNodeId(nodeId);
    this.context.getContainerTokenSecretManager().setNodeId(nodeId);

    //开始剩下的服务
    super.serviceStart();

    if (delayedRpcServerStart) {
      waitForRecoveredContainers();
      server.start();

      //检查节点ID是否与之前公布的一样
      connectAddress = NetUtils.getConnectAddress(server);
      NodeId serverNode = buildNodeId(connectAddress, hostOverride);
      if (!serverNode.equals(nodeId)) {
        throw new IOException("Node mismatch after server started, expected '"
            + nodeId + "' but found '" + serverNode + "'");
      }
    }

    LOG.info("ContainerManager started at " + connectAddress);
    LOG.info("ContainerManager bound to " + initialAddress);
  }
}

8、NMLogAggregationStatusTracker

NMLogAggregationStatusTracker用于缓存已完成应用程序的日志聚合状态。它还将定期删除旧的缓存日志聚合状态。

    this.nmLogAggregationStatusTracker = createNMLogAggregationStatusTracker(
        context);
    addService(nmLogAggregationStatusTracker);
    ((NMContext)context).setNMLogAggregationStatusTracker(
        this.nmLogAggregationStatusTracker);


  private NMLogAggregationStatusTracker createNMLogAggregationStatusTracker(
      Context ctxt) {
    return new NMLogAggregationStatusTracker(ctxt);
  }
public class NMLogAggregationStatusTracker extends CompositeService {
  public NMLogAggregationStatusTracker(Context context) {
    super(NMLogAggregationStatusTracker.class.getName());
    this.nmContext = context;
    Configuration conf = context.getConf();
    //是否启用日志聚合
    //获取 yarn.log-aggregation-enable 的值 默认false
    //这也说明 disabled 默认值是 true
    if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
        YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
      disabled = true;
    }
    this.recoveryStatuses = new ConcurrentHashMap<>();
    //ReadWriteLock的一种实现,支持与ReentrantLock类似的语义
    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    this.readLocker = lock.readLock();
    this.writeLocker = lock.writeLock();
    this.timer = new Timer();
    //获取 yarn.log-aggregation-status.time-out.ms 的值 默认值 10 * 60 * 1000
    //ResourceManager等待NodeManager报告其日志聚合状态的时间。如果从NodeManager报告日志聚合状态的等待时间超过配置的值,RM将报告此NodeManager的日志聚合状态为超时。
    //NodeManager中也将使用此配置来决定是否以及何时删除缓存的日志聚合状态。
    long configuredRollingInterval = conf.getLong(
        YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
        YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
    if (configuredRollingInterval <= 0) {
      this.rollingInterval = YarnConfiguration
          .DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS;
      LOG.warn("The configured log-aggregation-status.time-out.ms is "
          + configuredRollingInterval + " which should be larger than 0. "
          + "Using the default value:" + this.rollingInterval + " instead.");
    } else {
      this.rollingInterval = configuredRollingInterval;
    }
    LOG.info("the rolling interval seconds for the NodeManager Cached Log "
        + "aggregation status is " + (rollingInterval/1000));
  }

  @Override
  protected void serviceStart() throws Exception {
    if (disabled) {
      LOG.warn("Log Aggregation is disabled."
          + "So is the LogAggregationStatusTracker.");
    } else {
      //开启日志聚合才走这一步
      //LogAggregationStatusRoller 是内部类
      this.timer.scheduleAtFixedRate(new LogAggregationStatusRoller(),
          rollingInterval, rollingInterval);
    }
  }

  private class LogAggregationStatusRoller extends TimerTask {
    @Override
    public void run() {
      rollLogAggregationStatus();
    }
  }

  private void rollLogAggregationStatus() {
    //当我们调用rollLogAggregationStatus,基本上获取所有缓存的日志聚合状态,并删除超时期外的日志聚合状况时,我们应该阻止rollLogAg胶gationStatus调用以及pullCachedLogAggregateReports调用。所以,这里使用了writeLocker。
    this.writeLocker.lock();
    try {
      long currentTimeStamp = System.currentTimeMillis();
      LOG.info("Rolling over the cached log aggregation status.");
      Iterator<Entry<ApplicationId, AppLogAggregationStatusForRMRecovery>> it
          = recoveryStatuses.entrySet().iterator();
      while (it.hasNext()) {
        Entry<ApplicationId, AppLogAggregationStatusForRMRecovery> tracker =
            it.next();
        //应用已经完成了
        if (nmContext.getApplications().get(tracker.getKey()) == null) {
          if (currentTimeStamp - tracker.getValue().getLastModifiedTime()
              > rollingInterval) {
            it.remove();
          }
        }
      }
    } finally {
      this.writeLocker.unlock();
    }
  }
}

9、WebServer

    WebServer webServer = createWebServer(context, containerManager
        .getContainersMonitor(), this.aclsManager, dirsHandler);
    addService(webServer);
    ((NMContext) context).setWebServer(webServer);

  protected WebServer createWebServer(Context nmContext,
      ResourceView resourceView, ApplicationACLsManager aclsManager,
      LocalDirsHandlerService dirsHandler) {
    return new WebServer(nmContext, resourceView, aclsManager, dirsHandler);
  }
public class WebServer extends AbstractService {
  public WebServer(Context nmContext, ResourceView resourceView,
      ApplicationACLsManager aclsManager,
      LocalDirsHandlerService dirsHandler) {
    super(WebServer.class.getName());
    this.nmContext = nmContext;
    this.nmWebApp = new NMWebApp(resourceView, aclsManager, dirsHandler);
  }

  protected void serviceStart() throws Exception {
    Configuration conf = getConfig();
    //获取用于绑定的URL,其中可以指定绑定主机名以覆盖webAppURLWithoutScheme中的主机名。将使用webAppURLWithoutScheme中指定的端口。
    //NM_BIND_HOST NM的实际绑定地址
    //yarn.nodemanager.bind-host 的值
    //WebAppUtils.getNMWebAppURLWithoutScheme(conf)) 是获取 NM Webapp 地址
    //    获取 yarn.nodemanager.webapp.address 的值 默认值 0.0.0.0:8042
    String bindAddress = WebAppUtils.getWebAppBindURL(conf,
                          YarnConfiguration.NM_BIND_HOST,
                          WebAppUtils.getNMWebAppURLWithoutScheme(conf));
    //获取 CORS过滤器状态(启用/禁用)
    // yarn.nodemanager.webapp.cross-origin.enabled 默认 false
    //
    boolean enableCors = conf
        .getBoolean(YarnConfiguration.NM_WEBAPP_ENABLE_CORS_FILTER,
            YarnConfiguration.DEFAULT_NM_WEBAPP_ENABLE_CORS_FILTER);
    if (enableCors) {
      getConfig().setBoolean(HttpCrossOriginFilterInitializer.PREFIX
          + HttpCrossOriginFilterInitializer.ENABLED_SUFFIX, true);
    }

    //总是加载伪身份验证过滤器来解析URL中的“user.name”,以识别HTTP请求的用户。
    boolean hasHadoopAuthFilterInitializer = false;
    String filterInitializerConfKey = "hadoop.http.filter.initializers";
    Class<?>[] initializersClasses =
            conf.getClasses(filterInitializerConfKey);
    List<String> targets = new ArrayList<String>();
    if (initializersClasses != null) {
      for (Class<?> initializer : initializersClasses) {
        if (initializer.getName().equals(
            AuthenticationFilterInitializer.class.getName())) {
          hasHadoopAuthFilterInitializer = true;
          break;
        }
        targets.add(initializer.getName());
      }
    }
    if (!hasHadoopAuthFilterInitializer) {
      targets.add(AuthenticationFilterInitializer.class.getName());
      conf.set(filterInitializerConfKey, StringUtils.join(",", targets));
    }
    LOG.info("Instantiating NMWebApp at " + bindAddress);
    try {
      this.webApp =
          WebApps
            .$for("node", Context.class, this.nmContext, "ws")
            .at(bindAddress)
            .with(conf)
            .withHttpSpnegoPrincipalKey(
              YarnConfiguration.NM_WEBAPP_SPNEGO_USER_NAME_KEY)
            .withHttpSpnegoKeytabKey(
                YarnConfiguration.NM_WEBAPP_SPNEGO_KEYTAB_FILE_KEY)
              .withCSRFProtection(YarnConfiguration.NM_CSRF_PREFIX)
              .withXFSProtection(YarnConfiguration.NM_XFS_PREFIX)
            .start(this.nmWebApp);
      this.port = this.webApp.httpServer().getConnectorAddress(0).getPort();
    } catch (Exception e) {
      String msg = "NMWebapps failed to start.";
      LOG.error(msg, e);
      throw new YarnRuntimeException(msg, e);
    }
    super.serviceStart();
  }


  public static class NMWebApp extends WebApp implements YarnWebParams {

    private final ResourceView resourceView;
    private final ApplicationACLsManager aclsManager;
    private final LocalDirsHandlerService dirsHandler;

    public NMWebApp(ResourceView resourceView,
        ApplicationACLsManager aclsManager,
        LocalDirsHandlerService dirsHandler) {
      this.resourceView = resourceView;
      this.aclsManager = aclsManager;
      this.dirsHandler = dirsHandler;
    }

    @Override
    //设置资源绑定合访问路径
    public void setup() {
      bind(NMWebServices.class);
      bind(GenericExceptionHandler.class);
      bind(JAXBContextResolver.class);
      bind(ResourceView.class).toInstance(this.resourceView);
      bind(ApplicationACLsManager.class).toInstance(this.aclsManager);
      bind(LocalDirsHandlerService.class).toInstance(dirsHandler);
      route("/", NMController.class, "info");
      route("/node", NMController.class, "node");
      route("/allApplications", NMController.class, "allApplications");
      route("/allContainers", NMController.class, "allContainers");
      route(pajoin("/application", APPLICATION_ID), NMController.class,
          "application");
      route(pajoin("/container", CONTAINER_ID), NMController.class,
          "container");
      route(
          pajoin("/containerlogs", CONTAINER_ID, APP_OWNER, CONTAINER_LOG_TYPE),
          NMController.class, "logs");
      route("/errors-and-warnings", NMController.class, "errorsAndWarnings");
    }

    @Override
    protected Class<? extends GuiceContainer> getWebAppFilterClass() {
      return NMWebAppFilter.class;
    }
  }

}

10、AsyncDispatcher

Dispatches{@link Event}在一个单独的线程中。目前只有一个线程能做到这一点。每个事件类型类可能有多个通道,并且可以使用线程池来调度事件。

public class AsyncDispatcher extends AbstractService implements Dispatcher {
  protected void serviceInit(Configuration conf) throws Exception{
    super.serviceInit(conf);
    //获取 yarn.dispatcher.print-events-info.threshold  的值 默认 5000
    // 用于触发在RM的主事件调度程序中记录事件类型和计数的阈值。默认值为5000,这意味着RM将在每次队列大小累计达到5000时打印事件信息。这些信息可以用来揭示RM主要处理哪类事件,这有助于缩小某些性能问题的范围。
    this.detailsInterval = getConfig().getInt(YarnConfiguration.
                    YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD,
            YarnConfiguration.
                    DEFAULT_YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD);
  }

  @Override
  protected void serviceStart() throws Exception {
    //启动所有组件
    super.serviceStart();
    eventHandlingThread = new Thread(createThread());
    eventHandlingThread.setName(dispatcherThreadName);
    eventHandlingThread.start();
  }

 Runnable createThread() {
    return new Runnable() {
      @Override
      public void run() {
        while (!stopped && !Thread.currentThread().isInterrupted()) {
          drained = eventQueue.isEmpty();
          //blockNewEvents仅在调度程序要停止时设置,添加此检查是为了避免在循环的正常运行中每次获取锁和调用notify的开销。
          if (blockNewEvents) {
            synchronized (waitForDrained) {
              if (drained) {
                waitForDrained.notify();
              }
            }
          }
          Event event;
          try {
            event = eventQueue.take();
          } catch(InterruptedException ie) {
            if (!stopped) {
              LOG.warn("AsyncDispatcher thread interrupted", ie);
            }
            return;
          }
          if (event != null) {
            dispatch(event);
            if (printTrigger) {
              //记录最新的调度事件类型
              //可能导致排队的事件太多
              LOG.info("Latest dispatch event type: " + event.getType());
              printTrigger = false;
            }
          }
        }
      }
    };
  }
}

11、JvmPauseMonitor

    pauseMonitor = new JvmPauseMonitor();
    addService(pauseMonitor);
    metrics.getJvmMetrics().setPauseMonitor(pauseMonitor);

该类(JvmPauseMonitor)设置一个简单的线程,该线程在睡眠短时间间隔的循环中运行。如果睡眠时间明显长于其目标时间,则表示JVM或主机已暂停处理,这可能会导致其他问题。如果检测到这样的暂停,线程将记录一条消息。

public class JvmPauseMonitor extends AbstractService {
  protected void serviceInit(Configuration conf) throws Exception {
    //如果检测到暂停时间超过此阈值,则记录警告
    //获取 jvm.pause.warn-threshold.ms 的值 默认 10000
    this.warnThresholdMs = conf.getLong(WARN_THRESHOLD_KEY, WARN_THRESHOLD_DEFAULT);
    //如果检测到暂停时间超过此阈值,则记录信息
    //获取 jvm.pause.info-threshold.ms 的值 默认 1000
    this.infoThresholdMs = conf.getLong(INFO_THRESHOLD_KEY, INFO_THRESHOLD_DEFAULT);
    super.serviceInit(conf);
  }

  @Override
  protected void serviceStart() throws Exception {
    //构造守护进程线程
    monitorThread = new Daemon(new Monitor());
    monitorThread.start();
    super.serviceStart();
  }
}

12、NodeStatusUpdater

    //StatusUpdater应该最后添加,这样它才能最后启动,这样我们就可以在向RM注册之前确保一切正常。
    addService(nodeStatusUpdater);
    ((NMContext) context).setNodeStatusUpdater(nodeStatusUpdater);
    nmStore.setNodeStatusUpdater(nodeStatusUpdater);

五、总结

1、初始化本地文件系统目录,并将其中的内容封装成 LevelDB 的形式,用于失败恢复

2、创建线程池(默认4个),提供周期任务和延迟任务相关操作

3、利用脚本定期检查节点健康状态,并进行上报

4、检查是否开启启用分布式节点标签配置,默认不开启

5、检查是否开启启用分布式节点属性配置,默认不开启

6、创建节点资源监视器,并启动一个线程定期跟踪节点的资源利用情况

7、创建本节点容器管理调度程序,用来处理容器的申请、创建等事件

8、检查日志聚合状态是否开启,并定期删除旧的缓存日志

9、启动WebServer 服务,提供应用查询、容器查询、容器日志查询等功能

10、创建节点守护进程

11、创建节点状态更新服务文章来源地址https://www.toymoban.com/news/detail-834365.html

到了这里,关于Hadoop-Yarn-NodeManager都做了什么的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包 赞助服务器费用

相关文章

  • 价值 1k 嵌入式面试题-单片机 main 函数之前都做了啥?

            请说下单片机(Arm)在运行到 main() 函数前,都做了哪些工作? 系统初始化工作,太泛泛 硬件初始化,比较不具体         这道题应该从两方面回答,一个是比较表面的硬件的初始化(价值 200),另一个比较深层次的 C 环境的初始化,这也是加分比较多的一点(价

    2024年02月14日
    浏览(40)
  • Spark on Yarn 最佳运行参数调优-计算方式_spark on yarn 调优 nodemanager

    先自我介绍一下,小编浙江大学毕业,去过华为、字节跳动等大厂,目前阿里P7 深知大多数程序员,想要提升技能,往往是自己摸索成长,但自己不成体系的自学效果低效又漫长,而且极易碰到天花板技术停滞不前! 因此收集整理了一份《2024年最新软件测试全套学习资料》

    2024年04月26日
    浏览(43)
  • 【深入浅出 Yarn 架构与实现】6-3 NodeManager 分布式缓存

    不要跳过这部分知识,对了解 NodeManager 本地目录结构,和熟悉 Container 启动流程有帮助。 主要作用就是将用户应用程序执行时,所需的外部文件资源下载缓存到各个节点。 YARN 分布式缓存工作流程如下: 客户端将应用程序所需的文件资源 (外部字典、JAR 包、二进制文件) 提交

    2024年02月03日
    浏览(41)
  • hadoop中ResourceManager 进程或 NodeManager 进程没有启动

    如果 ResourceManager 进程或 NodeManager 进程没有启动,可能是由于以下原因导致的: 可能是 hadoop 配置文件中的错误导致的。您可以检查 hadoop 配置文件,确保所有参数都设置正确。 可能是由于网络问题导致的。您可以检查网络连接是否正常,确保所有节点都能够连接到同一网络。

    2024年02月06日
    浏览(44)
  • hadoop集群中增加新节点服务器(DataNode + NodeManager)方案及验证

    现根据业务需要,需要在原有的3台完全分布式的集群(hadoop1、hadoop2、hadoop3仨节点)增设一台新的服务器节点(hadoop4),以下是在原有的完全分布式hadoop集群中增设新节点( DataNode + NodeManager )的部署步骤。 基础服务配置 hadoop4上依次执行以下步骤: 1)用户:重置root用户密

    2024年01月19日
    浏览(41)
  • 启动Hadoop集群,出现Cannot set priority of nodemanager(resourcemanager) process xxx问题

    (不感兴趣可以跳过背景介绍) 配置 在安装hive的过程中,初始化数据库成功后(mysql),输入 命令: ./bin/hive 启动hive时出错(忘记截图了)。后发现原因是hive3.x仅支持JDK 1.8,并不支持openJDK 11,尽管hadoop3.3.x是支持JDK 1.8和openJDK 11的。当降低JDK版本后启动集群,便出现启动

    2024年02月01日
    浏览(46)
  • mv: 无法获取“/opt/module/hadoop/logs/hadoop-atguigu-nodemanager-hadoop102.out.1“ 的文件状态(stat): 没有那个文件或目录

    最近在回顾之前做过的离线数仓项目,在启动hadoop时出现了如下错误: 经过一些资料搜集发现问题的根本原因在于/opt/module/hadoop/etc/hadoop/目录下的workers文件配置出现错误。 文件内容不能够包含localhost 我原本的文件配置如下: 修改后配置如下: 我们简单解释一下workers文件的

    2024年02月05日
    浏览(50)
  • 【Hadoop】Apache Hadoop YARN

    🦄 个人主页 — —🎐开着拖拉机回家_Linux,Java基础学习,大数据运维-CSDN博客 🎐✨🍁 感谢点赞和关注 ,每天进步一点点!加油! 目录 一、YARN概述 二、YARN基础架构 2.1 ResourceManager(RM) 2.1.1 Scheduler 2.1.2 ApplicationManager 2.2 ApplicationMaster(AM) 2.3 NodeManager(NM) 2.4 Container 三、

    2024年02月05日
    浏览(37)
  • 【Hadoop】二、Hadoop MapReduce与Hadoop YARN

    md笔记 1、Hadoop MapReduce 1.1、理解MapReduce思想 MapReduce的思想核心是“ 先分再合,分而治之 ”。 所谓“分而治之”就是把一个复杂的问题,按照一定的“分解”方法分为等价的规模较小的若干部分,然后逐个解决,分别找出各部分的结果,然后把各部分的结果组成整个问题的最

    2024年02月05日
    浏览(58)
  • Hadoop(Yarn)

    Yarn 是一个资源调度平台,负责为运算程序提供服务器运算资源,相当于一个分布式的操作系统平台,而 MapReduce 等运算程序则相当于运行于操作系统之上的应用程序。 YARN 主要由 ResourceManager、NodeManager、ApplicationMaster 和 Container 等组件构成。 1.4.1 先进先出调度器(FIFO) 1.4

    2024年02月05日
    浏览(24)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包