2021年7月15日星期四

hadoop源码_hdfs启动流程_2_DataNode

执行start-dfs.sh脚本后,集群是如何启动的?
本文阅读并注释了start-dfs脚本,以及datanode的启动主要流程流程源码。

DataNode 启动流程

脚本代码分析

start-dfs.sh中启动datanode的代码:

#---------------------------------------------------------# datanodes (using default workers file)echo "Starting datanodes"hadoop_uservar_su hdfs datanode "${HADOOP_HDFS_HOME}/bin/hdfs" \ --workers \ --config "${HADOOP_CONF_DIR}" \ --daemon start \ datanode ${dataStartOpt}(( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))

hadoop-hdfs > src > mian > bin > hdfs中查看namenode命令:

# 命令描述:用于启动DFS datanode hadoop_add_subcommand "datanode" daemon "run a DFS datanode" # 命令处理程序 datanode)  HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"  HADOOP_SECURE_CLASSNAME="org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter"  HADOOP_CLASSNAME='org.apache.hadoop.hdfs.server.datanode.DataNode'  hadoop_deprecate_envvar HADOOP_SECURE_DN_PID_DIR HADOOP_SECURE_PID_DIR  hadoop_deprecate_envvar HADOOP_SECURE_DN_LOG_DIR HADOOP_SECURE_LOG_DIR ;;

这里定位到了具体的处理类org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarterorg.apache.hadoop.hdfs.server.namenode.NameNode

接着跟进脚本代码到hadoop-functions.sh中的hadoop_generic_java_subcmd_handler函数可以查看到以下代码:

 # do the hard work of launching a daemon or just executing our interactive # 是启动守护进程还是仅仅执行交互 # java class if [[ "${HADOOP_SUBCMD_SUPPORTDAEMONIZATION}" = true ]]; then if [[ "${HADOOP_SUBCMD_SECURESERVICE}" = true ]]; then  hadoop_secure_daemon_handler \  "${HADOOP_DAEMON_MODE}" \  "${HADOOP_SUBCMD}" \  "${HADOOP_SECURE_CLASSNAME}" \  "${daemon_pidfile}" \  "${daemon_outfile}" \  "${priv_pidfile}" \  "${priv_outfile}" \  "${priv_errfile}" \  "${HADOOP_SUBCMD_ARGS[@]}" else  hadoop_daemon_handler \  "${HADOOP_DAEMON_MODE}" \  "${HADOOP_SUBCMD}" \  "${HADOOP_CLASSNAME}" \  "${daemon_pidfile}" \  "${daemon_outfile}" \  "${HADOOP_SUBCMD_ARGS[@]}" fi exit $? else hadoop_java_exec "${HADOOP_SUBCMD}" "${HADOOP_CLASSNAME}" "${HADOOP_SUBCMD_ARGS[@]}" fi

这里需要分析一下最终走的是hadoop_secure_daemon_handler还是hadoop_daemon_handler

在满足HADOOP_SUBCMD_SUPPORTDAEMONIZATION = trueHADOOP_SUBCMD_SECURESERVICE = true两个条件时才会进行安全模式启动。

HADOOP_SUBCMD_SUPPORTDAEMONIZATIONdatanode的命令处理程序中会赋值:

# 在hdfs脚本中datanode)  HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"   HADOOP_SECURE_CLASSNAME="org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter"# ......  ;;

HADOOP_SUBCMD_SECURESERVICEhadoop-functions.sh脚本中定义的默认值为:

 HADOOP_SUBCMD_SECURESERVICE=false

在函数hadoop_generic_java_subcmd_handler(我们的脚本执行函数)中,有条件判断是否赋值为true

## @description Handle subcommands from main program entries## @audience private## @stability evolving## @replaceable yesfunction hadoop_generic_java_subcmd_handler{# ...... # The default/expected way to determine if a daemon is going to run in secure # mode is defined by hadoop_detect_priv_subcmd. If this returns true # then setup the secure user var and tell the world we're in secure mode if hadoop_detect_priv_subcmd "${HADOOP_SHELL_EXECNAME}" "${HADOOP_SUBCMD}"; then HADOOP_SUBCMD_SECURESERVICE=true# ......

进入hadoop_detect_priv_subcmd函数中:

## @description autodetect whether this is a priv subcmd## @description by whether or not a priv user var exists## @description and if HADOOP_SECURE_CLASSNAME is defined## @audience  public## @stability stable## @replaceable yes## @param  command## @param  subcommand## @return  1 = not priv## @return  0 = privfunction hadoop_detect_priv_subcmd{ declare program=$1 declare command=$2 # if [[ -z "${HADOOP_SECURE_CLASSNAME}" ]]; then hadoop_debug "No secure classname defined." return 1 fi uvar=$(hadoop_build_custom_subcmd_var "${program}" "${command}" SECURE_USER) if [[ -z "${!uvar}" ]]; then hadoop_debug "No secure user defined." return 1 fi return 0}

可以看到需要HADOOP_SECURE_CLASSNAME,和两个传入参数HADOOP_SHELL_EXECNAME,HADOOP_SUBCMD都存在的情况下才会返回0(在shell脚本中if function; then 格式,function返回0即会执行then后的语句)。

HADOOP_SECURE_CLASSNAME参数与HADOOP_SUBCMD_SUPPORTDAEMONIZATION相同会在hdfs脚本中的datanode的命令处理程序中赋值。

HADOOP_SHELL_EXECNAME参数在hdfs脚本中会定义默认值:

# The name of the script being executed.HADOOP_SHELL_EXECNAME="hdfs"

HADOOP_SUBCMD参数在hdfs脚本中被定义为:HADOOP_SUBCMD=$1,即取自第二个参数,我们返回start-dfs.sh脚本中查看调用命令的完整语句如下:

#---------------------------------------------------------# datanodes (using default workers file)echo "Starting datanodes"hadoop_uservar_su hdfs datanode "${HADOOP_HDFS_HOME}/bin/hdfs" \ --workers \ --config "${HADOOP_CONF_DIR}" \ --daemon start \ datanode ${dataStartOpt}(( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))

第二个参数为workers

所以可以得出,正常执行start-dfs.sh脚本的情况下,会默认值行hadoop_secure_daemon_handler函数,即通过执行SecureDataNodeStarter类来以安全模式启动datanode。

SecureDataNodeStarter

官方注释翻译:

在安全集群中启动datanode的实用程序类,首先在主启动前获得特权资源并将它们交给datanode。

SecureDataNodeStarter实现了Daemon,作为一个守护进程,我们先看它实现自Daemon的方法:

 @Override public void init(DaemonContext context) throws Exception { System.err.println("Initializing secure datanode resources"); // 创建一个新的HdfsConfiguration对象,以确保选中hdfs-site.

静态变量

可以看到SecureDataNodeStarter主要作用就是获取配置信息并存储起来,然后正常的初始化DateNode时再作为参数传递。接下来看看除了命令行参数外都还初始化了哪些参数:

	// 命令行参数 private String [] args; private SecureResources resources;	// 在安全的环境中存储datanode操作所需的资源	public static class SecureResources { // 是否启用sasl private final boolean isSaslEnabled; // rpc 端口是否为特权端口(端口号小于1024,不允许普通用户在其上运行服务器) // 详见https://www.w3.org/Daemon/User/Installation/PrivilegedPorts.html private final boolean isRpcPortPrivileged; // http 端口是否为特权端口 private final boolean isHttpPortPrivileged;		 	// 监听dfs.datanode.address配置的端口的服务器套接字 private final ServerSocket streamingSocket; 	// 监听dfs.datanode.http.address配置的端口的服务器套接字通道 private final ServerSocketChannel httpServerSocket; public SecureResources(ServerSocket streamingSocket, ServerSocketChannel  httpServerSocket, boolean saslEnabled, boolean rpcPortPrivileged,  boolean httpPortPrivileged) {  this.streamingSocket = streamingSocket;  this.httpServerSocket = httpServerSocket;  this.isSaslEnabled = saslEnabled;  this.isRpcPortPrivileged = rpcPortPrivileged;  this.isHttpPortPrivileged = httpPortPrivileged; } // getter / setter .... 略 }

getSecureResources(conf)

接下来看init()中调用的方法getSecureResources(conf),看看SecureResources中的参数都是从哪获取的。

 // 获取数据节点的特权资源(即特权端口)。 // 特权资源由RPC服务器的端口和HTTP(不是HTTPS)服务器的端口组成。 @VisibleForTesting public static SecureResources getSecureResources(Configuration conf)  throws Exception { // 获取http访问协议,HTTP_ONLY, HTTPS_ONLY, HTTP_AND_HTTPS HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf); // 尝试构建SaslPropertiesResolver,如果可以即为开启sasl boolean isSaslEnabled =  DataTransferSaslUtil.getSaslPropertiesResolver(conf) != null; boolean isRpcPrivileged; boolean isHttpPrivileged = false; System.err.println("isSaslEnabled:" + isSaslEnabled); // 获取数据流到datanode的安全端口,创建IP套接字地址 // 会通过配置项dfs.datanode.address来创建,配置的默认值为:0.0.0.0:9866 InetSocketAddress streamingAddr = DataNode.getStreamingAddr(conf); // 获取socket 写超时时间 // 配置项为:dfs.datanode.socket.write.timeout, 默认值为:8 * 60 秒 int socketWriteTimeout = conf.getInt(  DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY,  HdfsConstants.WRITE_TIMEOUT); // 获取请求的传入连接队列的最大长度。 // 配置项为ipc.server.listen.queue.size, 默认值为256 int backlogLength = conf.getInt(  CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY,  CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT); // 默认打开ServerSocketChannel进行datanode端口监听 ServerSocket ss = (socketWriteTimeout > 0) ?  ServerSocketChannel.open().socket() : new ServerSocket(); try {  // 绑定端口,设置请求的传入连接队列的最大长度  ss.bind(streamingAddr, backlogLength); } catch (BindException e) {  BindException newBe = appendMessageToBindException(e,   streamingAddr.toString());  throw newBe; } // 检查是否绑定到了正确 if (ss.getLocalPort() != streamingAddr.getPort()) {  throw new RuntimeException(   "Unable to bind on specified streaming port in secure "    + "context. Needed " + streamingAddr.getPort() + ", got "    + ss.getLocalPort()); } // 检查给定端口是否为特权端口。 // 在unix/linux系统中,小于1024的端口被视为特权端口。 // 对于其他操作系统,请谨慎使用此方法。 // 例如,Windows没有特权端口的概念。 // 但是,在Windows客户端上可以用来检查linux服务器的端口。 isRpcPrivileged = SecurityUtil.isPrivilegedPort(ss.getLocalPort()); System.err.println("Opened streaming server at " + streamingAddr); // 为web服务器绑定端口。 // 该代码打算仅将HTTP服务器绑定到特权端口,因为如果服务器通过SSL进行通信,客户端可以使用证书对服务器进行身份验证。 final ServerSocketChannel httpChannel; // 判断是否允许http访问 if (policy.isHttpEnabled()) {  httpChannel = ServerSocketChannel.open();  // 确定http服务器的有效地址  // 通过配置项dfs.datanode.http.address来生成,默认值为:0.0.0.0:9864  InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf);  try {  httpChannel.socket().bind(infoSocAddr);  } catch (BindException e) {  BindException newBe = appendMessageToBindException(e,   infoSocAddr.toString());  throw newBe;  }  InetSocketAddress localAddr = (InetSocketAddress) httpChannel.socket()  .getLocalSocketAddress();  // 校验httpChannel绑定的地址是否正确  if (localAddr.getPort() != infoSocAddr.getPort()) {  throw new RuntimeException("Unable to bind on specified info port in " +   "secure context. Needed " + infoSocAddr.getPort() + ", got " +    ss.getLocalPort());  }  System.err.println("Successfully obtained privileged resources (streaming port = "   + ss + " ) (http listener port = " + localAddr.getPort() +")");  // 判断端口号是否为特权端口(小于1024)  isHttpPrivileged = SecurityUtil.isPrivilegedPort(localAddr.getPort());  System.err.println("Opened info server at " + infoSocAddr); } else {  httpChannel = null; } // 将获取到的特权资源封装成SecureResources return new SecureResources(ss, httpChannel, isSaslEnabled,  isRpcPrivileged, isHttpPrivileged); }

至此,SecureDataNodeStarter类的init()方法结束。

继续看start()方法,可以看到就是正常的传入init()方法中初始化的配置。

 @Override public void start() throws Exception { System.err.println("Starting regular datanode initialization"); DataNode.secureMain(args, resources); }

resources参数在datanode中的具体作用见datanode代码分析

DataNode

dataNode官方注释反应如下:

DataNode是一个类(和程序),它为DFS部署存储一组块。
单个部署可以有一个或多个datanode。
每个DataNode定期与单个NameNode通信。
它还会不时地与客户机代码和其他datanode通信。
datanode存储一系列命名块。
DataNode允许客户端代码读取这些块,或者写入新的块数据。
DataNode也可以响应来自它的NameNode的指令,删除块或从其他DataNode复制块。
DataNode只维护一个关键表:block->这个信息存储在本地磁盘上。
DataNode会在启动时以及之后的每隔一段时间向NameNode报告表的内容。
datanode一辈子都在无止境地要求NameNode做点什么。
NameNode不能直接连接到DataNode;NameNode只是从DataNode调用的函数中返回值。
datanode维护一个开放的服务器套接字,以便客户端代码或其他datanode可以读写数据。
这个服务器的主机/端口报告给NameNode,然后NameNode将该信息发送给可能感兴趣的客户端或其他datanode。

静态代码块

dataNode的静态代码块与NameNode中相同,用于加载默认的配置文件

 static{ HdfsConfiguration.init(); }

mian方法

由上文中SecureDataNodeStarter#start方法可以看到,默认调用的是DataNode#secureMain方法来启动datanode。而默认的main方法也是调用DataNode#secureMain,接下来具体看看mainsecureMain方法的代码:

 public static void main(String args[]) { // 分析传入的参数,是否是帮助参数 if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) {  System.exit(0); }		// 调用 secureMain(args, null); }
 public static void secureMain(String args[], SecureResources resources) { int errorCode = 0; try {  //打印一些启动日志信息  StringUtils.startupShutdownMessage(DataNode.class, args, LOG);  // 创建datanode  DataNode datanode = createDataNode(args, null, resources);  if (datanode != null) {  // join各种线程,等待执行结束  // blockPoolManager.joinAll(); -> BPOfferService#jion -> BPServiceActor#join  // BPServiceActor: 每个活动或备用namenode要执行的线程:  // 预注册与namenode握手, 然后登记, 定期发送心跳到namenode, 处理从namenode接收到的命令  datanode.join();  } else {  errorCode = 1;  } } catch (Throwable e) {  LOG.error("Exception in secureMain", e);  terminate(1, e); } finally {  // We need to terminate the process here because either shutdown was called  // or some disk related conditions like volumes tolerated or volumes required  // condition was not met. Also, In secure mode, control will go to Jsvc  // and Datanode process hangs if it does not exit.  LOG.warn("Exiting Datanode");  terminate(errorCode); } }

DataNode#createDataNode

实例化&启动一个datanode守护进程并等待它完成。

 @VisibleForTesting @InterfaceAudience.Private public static DataNode createDataNode(String args[], Configuration conf,  SecureResources resources) throws IOException { // 初始化datanode DataNode dn = instantiateDataNode(args, conf, resources); if (dn != null) {  // 启动datanode进程  dn.runDatanodeDaemon(); } return dn; }

先来看看初始化datanode的流程:

DataNode#instantiateDataNode

// 实例化单个datanode对象及其安全资源。这必须通过随后调用datanodedaemon()来运行。public static DataNode instantiateDataNode(String args [], Configuration conf, SecureResources resources) throws IOException { if (conf == null) conf = new HdfsConfiguration(); if (args != null) { // 解析通用hadoop选项 GenericOptionsParser hParser = new GenericOptionsParser(conf, args); args = hParser.getRemainingArgs(); } // 解析和验证命令行参数并设置配置参数。 if (!parseArguments(args, conf)) { printUsage(System.err); return null; } // 根据配置dfs.datanode.data.dir 获取实际的存储路径集合 // StorageLocation: 封装描述存储目录的URI和存储介质。如果没有指定存储介质,则假定默认存储介质为DISK。 // 详细的关于获取存储目录的解析看这篇博文: https://blog.csdn.net/Androidlushangderen/article/details/51105876 Collection<StorageLocation> dataLocations = getStorageLocations(conf); // UserGroupInformation: Hadoop的用户和组信息。 // 该类封装了一个JAAS Subject,并提供了确定用户用户名和组的方法。 // 它同时支持Windows、Unix和Kerberos登录模块。 // UserGroupInformation#setConfiguration: 设置UGI的静态配置。特别是设置安全身份验证机制和组查找服务。 UserGroupInformation.setConfiguration(conf); // 作为config中指定的主体登录。将用户的Kerberos主体名中的$host替换为主机名。 如果是非安全模式-返回。 SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY,  DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, getHostName(conf)); // 创建DataNode实例 return makeInstance(dataLocations, conf, resources);}

DataNode#makeInstance

// 在确保可以创建至少一个给定的数据目录(以及它们的父目录,如果需要的话)之后,创建DataNode实例。static DataNode makeInstance(Collection<StorageLocation> dataDirs, Configuration conf, SecureResources resources) throws IOException { List<StorageLocation> locations; // StorageLocationChecker: 在DataNode启动期间封装存储位置检查的实用程序类。其中一些代码是从DataNode类中提取的。 StorageLocationChecker storageLocationChecker =  new StorageLocationChecker(conf, new Timer()); try { // 启动对提供的存储卷的检查,并返回运行正常的卷列表。 // 为了与现有单元测试兼容,storagellocations将按照与输入相同的顺序返回。 locations = storageLocationChecker.check(conf, dataDirs); } catch (InterruptedException ie) { throw new IOException("Failed to instantiate DataNode", ie); } // 初始化度量系统 DefaultMetricsSystem.initialize("DataNode"); // 检查数据目录的权限 assert locations.size() > 0 : "number of data directories should be > 0"; // 创建DataNode return new DataNode(conf, locations, storageLocationChecker, resources);}

StorageLocationChecker#check

来具体看一下都做了哪些检查:

 // 启动对提供的存储卷的检查,并返回运行正常的卷列表。 // 为了与现有单元测试兼容,storagellocations将按照与输入相同的顺序返回。 // 返回运行正常的卷列表。如果没有正常运行的卷,则返回一个空列表。 public List<StorageLocation> check(  final Configuration conf,  final Collection<StorageLocation> dataDirs)  throws InterruptedException, IOException { final HashMap<StorageLocation, Boolean> goodLocations =  new LinkedHashMap<>(); final Set<StorageLocation> failedLocations = new HashSet<>(); final Map<StorageLocation, ListenableFuture<VolumeCheckResult>> futures =  Maps.newHashMap(); // 获取本地文件系统。如果没有就创建一个新的 final LocalFileSystem localFS = FileSystem.getLocal(conf); final CheckContext context = new CheckContext(localFS, expectedPermission); // 在所有storagelocation上启动并行磁盘检查操作。 for (StorageLocation location : dataDirs) {  goodLocations.put(location, true);  // 对给定的Checkable安排异步检查。如果检查计划成功,则返回ListenableFuture。  Optional<ListenableFuture<VolumeCheckResult>> olf =   delegateChecker.schedule(location, context);  if (olf.isPresent()) {  futures.put(location, olf.get());  } } if (maxVolumeFailuresTolerated >= dataDirs.size()) {  throw new HadoopIllegalArgumentException("Invalid value configured for "   + DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY + " - "   + maxVolumeFailuresTolerated + ". Value configured is >= "   + "to the number of configured volumes (" + dataDirs.size() + ")."); } final long checkStartTimeMs = timer.monotonicNow(); // Retrieve the results of the disk checks. // 检索磁盘,检查磁盘状态是否健康 for (Map.Entry<StorageLocation,    ListenableFuture<VolumeCheckResult>> entry : futures.entrySet()) {  // Determine how much time we can allow for this check to complete.  // The cumulative wait time cannot exceed maxAllowedTimeForCheck.  final long waitSoFarMs = (timer.monotonicNow() - checkStartTimeMs);  final long timeLeftMs = Math.max(0,   maxAllowedTimeForCheckMs - waitSoFarMs);  final StorageLocation location = entry.getKey();  try {  final VolumeCheckResult result =   entry.getValue().get(timeLeftMs, TimeUnit.MILLISECONDS);  switch (result) {  case HEALTHY:   break;  case DEGRADED:   LOG.warn("StorageLocation {} appears to be degraded.", location);   break;  case FAILED:   LOG.warn("StorageLocation {} detected as failed.", location);   failedLocations.add(location);   goodLocations.remove(location);   break;  default:   LOG.error("Unexpected health check result {} for StorageLocation {}",    result, location);  }  } catch (ExecutionException|TimeoutException e) {  LOG.warn("Exception checking StorageLocation " + location,   e.getCause());  failedLocations.add(location);  goodLocations.remove(location);  } } if (maxVolumeFailuresTolerated == DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT) {  if (dataDirs.size() == failedLocations.size()) {  throw new DiskErrorException("Too many failed volumes - "   + "current valid volumes: " + goodLocations.size()   + ", volumes configured: " + dataDirs.size()   + ", volumes failed: " + failedLocations.size()   + ", volume failures tolerated: " + maxVolumeFailuresTolerated);  } } else {  if (failedLocations.size() > maxVolumeFailuresTolerated) {  throw new DiskErrorException("Too many failed volumes - "   + "current valid volumes: " + goodLocations.size()   + ", volumes configured: " + dataDirs.size()   + ", volumes failed: " + failedLocations.size()   + ", volume failures tolerated: " + maxVolumeFailuresTolerated);  } } if (goodLocations.size() == 0) {  throw new DiskErrorException("All directories in "   + DFS_DATANODE_DATA_DIR_KEY + " are invalid: "   + failedLocations); } return new ArrayList<>(goodLocations.keySet()); }

DataNode构造方法

// 给定一个配置、一个datadir数组和一个namenode代理,创建DataNode。 DataNode(final Configuration conf,   final List<StorageLocation> dataDirs,   final StorageLocationChecker storageLocationChecker,   final SecureResources resources) throws IOException { // 将配置文件赋值到父类的静态变量中 super(conf); // 初始化Tracer,与NameNode中此处相比,仅传入参数有区别 this.tracer = createTracer(conf); // TracerConfigurationManager类提供了通过RPC协议在运行时管理跟踪器配置的函数。 this.tracerConfigurationManager =  new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf); // FileIoProvider这个类抽象出DataNode执行的各种文件IO操作, // 并在每个文件IO之前和之后调用概要分析(用于收集统计数据)和故障注入(用于测试)事件钩子。 // 通过DFSConfigKeys启用剖析和/或错误注入事件钩子,可以将行为注入到这些事件中。 this.fileIoProvider = new FileIoProvider(conf, this); // 初始化卷扫描,BlockScanner负责管理所有的VolumeScanner this.blockScanner = new BlockScanner(this); // 初始化各种配置参数 this.lastDiskErrorCheck = 0; this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY,  DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT); this.usersWithLocalPathAccess = Arrays.asList(  conf.getTrimmedStrings(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY)); this.connectToDnViaHostname = conf.getBoolean(  DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME,  DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT); this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY,  DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); this.isPermissionEnabled = conf.getBoolean(  DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY,  DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT); this.pipelineSupportECN = conf.getBoolean(  DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED,  DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT); confVersion = "core-" +  conf.get("hadoop.common.configuration.version", "UNSPECIFIED") +  ",hdfs-" +  conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED"); // DatasetVolumeChecker: 对FsDatasetSpi的每个卷封装运行磁盘检查的类,并允许检索失败卷的列表。 // 这分离了最初跨DataNode、FsDatasetImpl和FsVolumeList实现的行为。 this.volumeChecker = new DatasetVolumeChecker(conf, new Timer()); // 创建了个ExecutorService,用于执行dataTransfer任务 // HadoopExecutors:ExecutorService、ScheduledExecutorService实例的工厂方法。这些执行器服务实例提供了额外的功能(例如记录未捕获的异常)。 // DataTransfer:是DataNode的内部类,用于传输一个数据块。这个类将一条数据发送到另一个DataNode。 this.xferService =  HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory()); // Determine whether we should try to pass file descriptors to clients. // 确定是否应该尝试将文件描述符传递给客户端。 if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY,    HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT)) {  String reason = DomainSocket.getLoadingFailureReason();  if (reason != null) {  LOG.warn("File descriptor passing is disabled because {}", reason);  this.fileDescriptorPassingDisabledReason = reason;  } else {  LOG.info("File descriptor passing is enabled.");  this.fileDescriptorPassingDisabledReason = null;  } } else {  this.fileDescriptorPassingDisabledReason =   "File descriptor passing was not configured.";  LOG.debug(this.fileDescriptorPassingDisabledReason); } // 获取socket工厂,配置项为:hadoop.rpc.socket.factory.class.default, // 默认为:org.apache.hadoop.net.StandardSocketFactory this.socketFactory = NetUtils.getDefaultSocketFactory(conf); try {  // 获取本datanode的主机名  hostName = getHostName(conf);  LOG.info("Configured hostname is {}", hostName);  // 启动datanode  startDataNode(dataDirs, resources); } catch (IOException ie) {  shutdown();  throw ie; } final int dncCacheMaxSize =  conf.getInt(DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY,   DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT) ; datanodeNetworkCounts =  CacheBuilder.newBuilder()   .maximumSize(dncCacheMaxSize)   .build(new CacheLoader<String, Map<String, Long>>() {    @Override    public Map<String, Long> load(String key) throws Exception {    final Map<String, Long> ret = new HashMap<String, Long>();    ret.put("networkErrors", 0L);    return ret;    }   }); initOOBTimeout(); this.storageLocationChecker = storageLocationChecker; }

DataNode#startDataNode

// 此方法使用指定的conf启动数据节点,如果设置了conf的config_property_simulation属性,则创建一个模拟的基于存储的数据节点void startDataNode(List<StorageLocation> dataDirectories,     SecureResources resources     ) throws IOException { // settings global for all BPs in the Data Node this.secureResources = resources; synchronized (this) { this.dataDirs = dataDirectories; } // DNConf: 一个简单的类,封装了DataNode在启动时加载的所有配置。 this.dnConf = new DNConf(this); // 检查secure模式下的配置 checkSecureConfig(dnConf, getConf(), resources); // 检查DataNode给缓存使用的最大内存量是否在正常范围 if (dnConf.maxLockedMemory > 0) { if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) {  throw new RuntimeException(String.format(   "Cannot start datanode because the configured max locked memory" +   " size (%s) is greater than zero and native code is not available.",   DFS_DATANODE_MAX_LOCKE......

原文转载:http://www.shaoqun.com/a/879536.html

跨境电商:https://www.ikjzd.com/

oklink:https://www.ikjzd.com/w/1362

贝贝网:https://www.ikjzd.com/w/1321

新蛋:https://www.ikjzd.com/w/79


执行start-dfs.sh脚本后,集群是如何启动的?本文阅读并注释了start-dfs脚本,以及datanode的启动主要流程流程源码。DataNode启动流程脚本代码分析start-dfs.sh中启动datanode的代码:#---------------------------------------------------------#datanodes(usingdefaultworke
2021连州地下河中高考生免费吗?连州地下河中考高考学生门票多少钱?:http://www.30bags.com/a/432216.html
2021连州全家去哪看桃花?湟川三峡2021连州桃花节怎么样?:http://www.30bags.com/a/432110.html
2021连州桃花节收费吗?连州湟川三峡桃花节怎么收费?:http://www.30bags.com/a/432109.html
2021连州中高生去哪里免费?中高考生连州免费景点?:http://www.30bags.com/a/432215.html
我被外国黑人3p过程 太粗太长弄死了我了:http://lady.shaoqun.com/a/256971.html
女班长扒开内裤让我们摸 我把女班长按到课桌上做:http://lady.shaoqun.com/a/247579.html
男朋友边摸边吃奶边做 小荡货夹得好紧太爽了:http://lady.shaoqun.com/a/247589.html
办公室里 男上司撕扯我的衣服:http://lady.shaoqun.com/m/a/106145.html
女性高潮来自阴蒂刺激占比最多,丈夫刺激阴蒂是如此重要! :http://lady.shaoqun.com/a/423161.html
床上七个小动作让你的男人晚上更爱你!尤其是第七个!:http://lady.shaoqun.com/a/423162.html
对美国士兵来说,有一夜情并呆在家里是老兵的特权:http://lady.shaoqun.com/a/423163.html
中东新电商扎堆,他们会成为中东版"阿里巴巴"吗?:https://www.ikjzd.com/articles/146650

没有评论:

发表评论