执行start-dfs.sh脚本后,集群是如何启动的?
本文阅读并注释了start-dfs脚本,以及datanode的启动主要流程流程源码。
DataNode 启动流程
脚本代码分析
start-dfs.sh
中启动datanode的代码:
#---------------------------------------------------------# datanodes (using default workers file)echo "Starting datanodes"hadoop_uservar_su hdfs datanode "${HADOOP_HDFS_HOME}/bin/hdfs" \ --workers \ --config "${HADOOP_CONF_DIR}" \ --daemon start \ datanode ${dataStartOpt}(( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))
去hadoop-hdfs > src > mian > bin > hdfs
中查看namenode
命令:
# 命令描述:用于启动DFS datanode hadoop_add_subcommand "datanode" daemon "run a DFS datanode" # 命令处理程序 datanode) HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true" HADOOP_SECURE_CLASSNAME="org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter" HADOOP_CLASSNAME='org.apache.hadoop.hdfs.server.datanode.DataNode' hadoop_deprecate_envvar HADOOP_SECURE_DN_PID_DIR HADOOP_SECURE_PID_DIR hadoop_deprecate_envvar HADOOP_SECURE_DN_LOG_DIR HADOOP_SECURE_LOG_DIR ;;
这里定位到了具体的处理类org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter
和org.apache.hadoop.hdfs.server.namenode.NameNode
。
接着跟进脚本代码到hadoop-functions.sh
中的hadoop_generic_java_subcmd_handler
函数可以查看到以下代码:
# do the hard work of launching a daemon or just executing our interactive # 是启动守护进程还是仅仅执行交互 # java class if [[ "${HADOOP_SUBCMD_SUPPORTDAEMONIZATION}" = true ]]; then if [[ "${HADOOP_SUBCMD_SECURESERVICE}" = true ]]; then hadoop_secure_daemon_handler \ "${HADOOP_DAEMON_MODE}" \ "${HADOOP_SUBCMD}" \ "${HADOOP_SECURE_CLASSNAME}" \ "${daemon_pidfile}" \ "${daemon_outfile}" \ "${priv_pidfile}" \ "${priv_outfile}" \ "${priv_errfile}" \ "${HADOOP_SUBCMD_ARGS[@]}" else hadoop_daemon_handler \ "${HADOOP_DAEMON_MODE}" \ "${HADOOP_SUBCMD}" \ "${HADOOP_CLASSNAME}" \ "${daemon_pidfile}" \ "${daemon_outfile}" \ "${HADOOP_SUBCMD_ARGS[@]}" fi exit $? else hadoop_java_exec "${HADOOP_SUBCMD}" "${HADOOP_CLASSNAME}" "${HADOOP_SUBCMD_ARGS[@]}" fi
这里需要分析一下最终走的是hadoop_secure_daemon_handler
还是hadoop_daemon_handler
。
在满足HADOOP_SUBCMD_SUPPORTDAEMONIZATION = true
和HADOOP_SUBCMD_SECURESERVICE = true
两个条件时才会进行安全模式启动。
HADOOP_SUBCMD_SUPPORTDAEMONIZATION
在datanode
的命令处理程序中会赋值:
# 在hdfs脚本中datanode) HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true" HADOOP_SECURE_CLASSNAME="org.apache.hadoop.hdfs.server.datanode.SecureDataNodeStarter"# ...... ;;
HADOOP_SUBCMD_SECURESERVICE
在hadoop-functions.sh
脚本中定义的默认值为:
HADOOP_SUBCMD_SECURESERVICE=false
在函数hadoop_generic_java_subcmd_handler
(我们的脚本执行函数)中,有条件判断是否赋值为true
:
## @description Handle subcommands from main program entries## @audience private## @stability evolving## @replaceable yesfunction hadoop_generic_java_subcmd_handler{# ...... # The default/expected way to determine if a daemon is going to run in secure # mode is defined by hadoop_detect_priv_subcmd. If this returns true # then setup the secure user var and tell the world we're in secure mode if hadoop_detect_priv_subcmd "${HADOOP_SHELL_EXECNAME}" "${HADOOP_SUBCMD}"; then HADOOP_SUBCMD_SECURESERVICE=true# ......
进入hadoop_detect_priv_subcmd
函数中:
## @description autodetect whether this is a priv subcmd## @description by whether or not a priv user var exists## @description and if HADOOP_SECURE_CLASSNAME is defined## @audience public## @stability stable## @replaceable yes## @param command## @param subcommand## @return 1 = not priv## @return 0 = privfunction hadoop_detect_priv_subcmd{ declare program=$1 declare command=$2 # if [[ -z "${HADOOP_SECURE_CLASSNAME}" ]]; then hadoop_debug "No secure classname defined." return 1 fi uvar=$(hadoop_build_custom_subcmd_var "${program}" "${command}" SECURE_USER) if [[ -z "${!uvar}" ]]; then hadoop_debug "No secure user defined." return 1 fi return 0}
可以看到需要HADOOP_SECURE_CLASSNAME
,和两个传入参数HADOOP_SHELL_EXECNAME
,HADOOP_SUBCMD
都存在的情况下才会返回0(在shell脚本中if function; then
格式,function返回0即会执行then后的语句)。
HADOOP_SECURE_CLASSNAME
参数与HADOOP_SUBCMD_SUPPORTDAEMONIZATION
相同会在hdfs脚本中的datanode的命令处理程序中赋值。
HADOOP_SHELL_EXECNAME
参数在hdfs
脚本中会定义默认值:
# The name of the script being executed.HADOOP_SHELL_EXECNAME="hdfs"
HADOOP_SUBCMD
参数在hdfs
脚本中被定义为:HADOOP_SUBCMD=$1
,即取自第二个参数,我们返回start-dfs.sh
脚本中查看调用命令的完整语句如下:
#---------------------------------------------------------# datanodes (using default workers file)echo "Starting datanodes"hadoop_uservar_su hdfs datanode "${HADOOP_HDFS_HOME}/bin/hdfs" \ --workers \ --config "${HADOOP_CONF_DIR}" \ --daemon start \ datanode ${dataStartOpt}(( HADOOP_JUMBO_RETCOUNTER=HADOOP_JUMBO_RETCOUNTER + $? ))
第二个参数为workers
。
所以可以得出,正常执行start-dfs.sh
脚本的情况下,会默认值行hadoop_secure_daemon_handler
函数,即通过执行SecureDataNodeStarter
类来以安全模式启动datanode。
SecureDataNodeStarter
官方注释翻译:
在安全集群中启动datanode的实用程序类,首先在主启动前获得特权资源并将它们交给datanode。
SecureDataNodeStarter
实现了Daemon
,作为一个守护进程,我们先看它实现自Daemon
的方法:
@Override public void init(DaemonContext context) throws Exception { System.err.println("Initializing secure datanode resources"); // 创建一个新的HdfsConfiguration对象,以确保选中hdfs-site.
静态变量
可以看到SecureDataNodeStarter
主要作用就是获取配置信息并存储起来,然后正常的初始化DateNode
时再作为参数传递。接下来看看除了命令行参数外都还初始化了哪些参数:
// 命令行参数 private String [] args; private SecureResources resources; // 在安全的环境中存储datanode操作所需的资源 public static class SecureResources { // 是否启用sasl private final boolean isSaslEnabled; // rpc 端口是否为特权端口(端口号小于1024,不允许普通用户在其上运行服务器) // 详见https://www.w3.org/Daemon/User/Installation/PrivilegedPorts.html private final boolean isRpcPortPrivileged; // http 端口是否为特权端口 private final boolean isHttpPortPrivileged; // 监听dfs.datanode.address配置的端口的服务器套接字 private final ServerSocket streamingSocket; // 监听dfs.datanode.http.address配置的端口的服务器套接字通道 private final ServerSocketChannel httpServerSocket; public SecureResources(ServerSocket streamingSocket, ServerSocketChannel httpServerSocket, boolean saslEnabled, boolean rpcPortPrivileged, boolean httpPortPrivileged) { this.streamingSocket = streamingSocket; this.httpServerSocket = httpServerSocket; this.isSaslEnabled = saslEnabled; this.isRpcPortPrivileged = rpcPortPrivileged; this.isHttpPortPrivileged = httpPortPrivileged; } // getter / setter .... 略 }
getSecureResources(conf)
接下来看init()
中调用的方法getSecureResources(conf)
,看看SecureResources
中的参数都是从哪获取的。
// 获取数据节点的特权资源(即特权端口)。 // 特权资源由RPC服务器的端口和HTTP(不是HTTPS)服务器的端口组成。 @VisibleForTesting public static SecureResources getSecureResources(Configuration conf) throws Exception { // 获取http访问协议,HTTP_ONLY, HTTPS_ONLY, HTTP_AND_HTTPS HttpConfig.Policy policy = DFSUtil.getHttpPolicy(conf); // 尝试构建SaslPropertiesResolver,如果可以即为开启sasl boolean isSaslEnabled = DataTransferSaslUtil.getSaslPropertiesResolver(conf) != null; boolean isRpcPrivileged; boolean isHttpPrivileged = false; System.err.println("isSaslEnabled:" + isSaslEnabled); // 获取数据流到datanode的安全端口,创建IP套接字地址 // 会通过配置项dfs.datanode.address来创建,配置的默认值为:0.0.0.0:9866 InetSocketAddress streamingAddr = DataNode.getStreamingAddr(conf); // 获取socket 写超时时间 // 配置项为:dfs.datanode.socket.write.timeout, 默认值为:8 * 60 秒 int socketWriteTimeout = conf.getInt( DFSConfigKeys.DFS_DATANODE_SOCKET_WRITE_TIMEOUT_KEY, HdfsConstants.WRITE_TIMEOUT); // 获取请求的传入连接队列的最大长度。 // 配置项为ipc.server.listen.queue.size, 默认值为256 int backlogLength = conf.getInt( CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_KEY, CommonConfigurationKeysPublic.IPC_SERVER_LISTEN_QUEUE_SIZE_DEFAULT); // 默认打开ServerSocketChannel进行datanode端口监听 ServerSocket ss = (socketWriteTimeout > 0) ? ServerSocketChannel.open().socket() : new ServerSocket(); try { // 绑定端口,设置请求的传入连接队列的最大长度 ss.bind(streamingAddr, backlogLength); } catch (BindException e) { BindException newBe = appendMessageToBindException(e, streamingAddr.toString()); throw newBe; } // 检查是否绑定到了正确 if (ss.getLocalPort() != streamingAddr.getPort()) { throw new RuntimeException( "Unable to bind on specified streaming port in secure " + "context. Needed " + streamingAddr.getPort() + ", got " + ss.getLocalPort()); } // 检查给定端口是否为特权端口。 // 在unix/linux系统中,小于1024的端口被视为特权端口。 // 对于其他操作系统,请谨慎使用此方法。 // 例如,Windows没有特权端口的概念。 // 但是,在Windows客户端上可以用来检查linux服务器的端口。 isRpcPrivileged = SecurityUtil.isPrivilegedPort(ss.getLocalPort()); System.err.println("Opened streaming server at " + streamingAddr); // 为web服务器绑定端口。 // 该代码打算仅将HTTP服务器绑定到特权端口,因为如果服务器通过SSL进行通信,客户端可以使用证书对服务器进行身份验证。 final ServerSocketChannel httpChannel; // 判断是否允许http访问 if (policy.isHttpEnabled()) { httpChannel = ServerSocketChannel.open(); // 确定http服务器的有效地址 // 通过配置项dfs.datanode.http.address来生成,默认值为:0.0.0.0:9864 InetSocketAddress infoSocAddr = DataNode.getInfoAddr(conf); try { httpChannel.socket().bind(infoSocAddr); } catch (BindException e) { BindException newBe = appendMessageToBindException(e, infoSocAddr.toString()); throw newBe; } InetSocketAddress localAddr = (InetSocketAddress) httpChannel.socket() .getLocalSocketAddress(); // 校验httpChannel绑定的地址是否正确 if (localAddr.getPort() != infoSocAddr.getPort()) { throw new RuntimeException("Unable to bind on specified info port in " + "secure context. Needed " + infoSocAddr.getPort() + ", got " + ss.getLocalPort()); } System.err.println("Successfully obtained privileged resources (streaming port = " + ss + " ) (http listener port = " + localAddr.getPort() +")"); // 判断端口号是否为特权端口(小于1024) isHttpPrivileged = SecurityUtil.isPrivilegedPort(localAddr.getPort()); System.err.println("Opened info server at " + infoSocAddr); } else { httpChannel = null; } // 将获取到的特权资源封装成SecureResources return new SecureResources(ss, httpChannel, isSaslEnabled, isRpcPrivileged, isHttpPrivileged); }
至此,SecureDataNodeStarter
类的init()
方法结束。
继续看start()
方法,可以看到就是正常的传入init()
方法中初始化的配置。
@Override public void start() throws Exception { System.err.println("Starting regular datanode initialization"); DataNode.secureMain(args, resources); }
resources参数在datanode中的具体作用见datanode代码分析
DataNode
dataNode官方注释反应如下:
DataNode是一个类(和程序),它为DFS部署存储一组块。
单个部署可以有一个或多个datanode。
每个DataNode定期与单个NameNode通信。
它还会不时地与客户机代码和其他datanode通信。
datanode存储一系列命名块。
DataNode允许客户端代码读取这些块,或者写入新的块数据。
DataNode也可以响应来自它的NameNode的指令,删除块或从其他DataNode复制块。
DataNode只维护一个关键表:block->这个信息存储在本地磁盘上。
DataNode会在启动时以及之后的每隔一段时间向NameNode报告表的内容。
datanode一辈子都在无止境地要求NameNode做点什么。
NameNode不能直接连接到DataNode;NameNode只是从DataNode调用的函数中返回值。
datanode维护一个开放的服务器套接字,以便客户端代码或其他datanode可以读写数据。
这个服务器的主机/端口报告给NameNode,然后NameNode将该信息发送给可能感兴趣的客户端或其他datanode。
静态代码块
dataNode的静态代码块与NameNode中相同,用于加载默认的配置文件
static{ HdfsConfiguration.init(); }
mian方法
由上文中SecureDataNodeStarter#start
方法可以看到,默认调用的是DataNode#secureMain
方法来启动datanode。而默认的main
方法也是调用DataNode#secureMain
,接下来具体看看main
和secureMain
方法的代码:
public static void main(String args[]) { // 分析传入的参数,是否是帮助参数 if (DFSUtil.parseHelpArgument(args, DataNode.USAGE, System.out, true)) { System.exit(0); } // 调用 secureMain(args, null); }
public static void secureMain(String args[], SecureResources resources) { int errorCode = 0; try { //打印一些启动日志信息 StringUtils.startupShutdownMessage(DataNode.class, args, LOG); // 创建datanode DataNode datanode = createDataNode(args, null, resources); if (datanode != null) { // join各种线程,等待执行结束 // blockPoolManager.joinAll(); -> BPOfferService#jion -> BPServiceActor#join // BPServiceActor: 每个活动或备用namenode要执行的线程: // 预注册与namenode握手, 然后登记, 定期发送心跳到namenode, 处理从namenode接收到的命令 datanode.join(); } else { errorCode = 1; } } catch (Throwable e) { LOG.error("Exception in secureMain", e); terminate(1, e); } finally { // We need to terminate the process here because either shutdown was called // or some disk related conditions like volumes tolerated or volumes required // condition was not met. Also, In secure mode, control will go to Jsvc // and Datanode process hangs if it does not exit. LOG.warn("Exiting Datanode"); terminate(errorCode); } }
DataNode#createDataNode
实例化&启动一个datanode守护进程并等待它完成。
@VisibleForTesting @InterfaceAudience.Private public static DataNode createDataNode(String args[], Configuration conf, SecureResources resources) throws IOException { // 初始化datanode DataNode dn = instantiateDataNode(args, conf, resources); if (dn != null) { // 启动datanode进程 dn.runDatanodeDaemon(); } return dn; }
先来看看初始化datanode的流程:
DataNode#instantiateDataNode
// 实例化单个datanode对象及其安全资源。这必须通过随后调用datanodedaemon()来运行。public static DataNode instantiateDataNode(String args [], Configuration conf, SecureResources resources) throws IOException { if (conf == null) conf = new HdfsConfiguration(); if (args != null) { // 解析通用hadoop选项 GenericOptionsParser hParser = new GenericOptionsParser(conf, args); args = hParser.getRemainingArgs(); } // 解析和验证命令行参数并设置配置参数。 if (!parseArguments(args, conf)) { printUsage(System.err); return null; } // 根据配置dfs.datanode.data.dir 获取实际的存储路径集合 // StorageLocation: 封装描述存储目录的URI和存储介质。如果没有指定存储介质,则假定默认存储介质为DISK。 // 详细的关于获取存储目录的解析看这篇博文: https://blog.csdn.net/Androidlushangderen/article/details/51105876 Collection<StorageLocation> dataLocations = getStorageLocations(conf); // UserGroupInformation: Hadoop的用户和组信息。 // 该类封装了一个JAAS Subject,并提供了确定用户用户名和组的方法。 // 它同时支持Windows、Unix和Kerberos登录模块。 // UserGroupInformation#setConfiguration: 设置UGI的静态配置。特别是设置安全身份验证机制和组查找服务。 UserGroupInformation.setConfiguration(conf); // 作为config中指定的主体登录。将用户的Kerberos主体名中的$host替换为主机名。 如果是非安全模式-返回。 SecurityUtil.login(conf, DFS_DATANODE_KEYTAB_FILE_KEY, DFS_DATANODE_KERBEROS_PRINCIPAL_KEY, getHostName(conf)); // 创建DataNode实例 return makeInstance(dataLocations, conf, resources);}
DataNode#makeInstance
// 在确保可以创建至少一个给定的数据目录(以及它们的父目录,如果需要的话)之后,创建DataNode实例。static DataNode makeInstance(Collection<StorageLocation> dataDirs, Configuration conf, SecureResources resources) throws IOException { List<StorageLocation> locations; // StorageLocationChecker: 在DataNode启动期间封装存储位置检查的实用程序类。其中一些代码是从DataNode类中提取的。 StorageLocationChecker storageLocationChecker = new StorageLocationChecker(conf, new Timer()); try { // 启动对提供的存储卷的检查,并返回运行正常的卷列表。 // 为了与现有单元测试兼容,storagellocations将按照与输入相同的顺序返回。 locations = storageLocationChecker.check(conf, dataDirs); } catch (InterruptedException ie) { throw new IOException("Failed to instantiate DataNode", ie); } // 初始化度量系统 DefaultMetricsSystem.initialize("DataNode"); // 检查数据目录的权限 assert locations.size() > 0 : "number of data directories should be > 0"; // 创建DataNode return new DataNode(conf, locations, storageLocationChecker, resources);}
StorageLocationChecker#check
来具体看一下都做了哪些检查:
// 启动对提供的存储卷的检查,并返回运行正常的卷列表。 // 为了与现有单元测试兼容,storagellocations将按照与输入相同的顺序返回。 // 返回运行正常的卷列表。如果没有正常运行的卷,则返回一个空列表。 public List<StorageLocation> check( final Configuration conf, final Collection<StorageLocation> dataDirs) throws InterruptedException, IOException { final HashMap<StorageLocation, Boolean> goodLocations = new LinkedHashMap<>(); final Set<StorageLocation> failedLocations = new HashSet<>(); final Map<StorageLocation, ListenableFuture<VolumeCheckResult>> futures = Maps.newHashMap(); // 获取本地文件系统。如果没有就创建一个新的 final LocalFileSystem localFS = FileSystem.getLocal(conf); final CheckContext context = new CheckContext(localFS, expectedPermission); // 在所有storagelocation上启动并行磁盘检查操作。 for (StorageLocation location : dataDirs) { goodLocations.put(location, true); // 对给定的Checkable安排异步检查。如果检查计划成功,则返回ListenableFuture。 Optional<ListenableFuture<VolumeCheckResult>> olf = delegateChecker.schedule(location, context); if (olf.isPresent()) { futures.put(location, olf.get()); } } if (maxVolumeFailuresTolerated >= dataDirs.size()) { throw new HadoopIllegalArgumentException("Invalid value configured for " + DFS_DATANODE_FAILED_VOLUMES_TOLERATED_KEY + " - " + maxVolumeFailuresTolerated + ". Value configured is >= " + "to the number of configured volumes (" + dataDirs.size() + ")."); } final long checkStartTimeMs = timer.monotonicNow(); // Retrieve the results of the disk checks. // 检索磁盘,检查磁盘状态是否健康 for (Map.Entry<StorageLocation, ListenableFuture<VolumeCheckResult>> entry : futures.entrySet()) { // Determine how much time we can allow for this check to complete. // The cumulative wait time cannot exceed maxAllowedTimeForCheck. final long waitSoFarMs = (timer.monotonicNow() - checkStartTimeMs); final long timeLeftMs = Math.max(0, maxAllowedTimeForCheckMs - waitSoFarMs); final StorageLocation location = entry.getKey(); try { final VolumeCheckResult result = entry.getValue().get(timeLeftMs, TimeUnit.MILLISECONDS); switch (result) { case HEALTHY: break; case DEGRADED: LOG.warn("StorageLocation {} appears to be degraded.", location); break; case FAILED: LOG.warn("StorageLocation {} detected as failed.", location); failedLocations.add(location); goodLocations.remove(location); break; default: LOG.error("Unexpected health check result {} for StorageLocation {}", result, location); } } catch (ExecutionException|TimeoutException e) { LOG.warn("Exception checking StorageLocation " + location, e.getCause()); failedLocations.add(location); goodLocations.remove(location); } } if (maxVolumeFailuresTolerated == DataNode.MAX_VOLUME_FAILURE_TOLERATED_LIMIT) { if (dataDirs.size() == failedLocations.size()) { throw new DiskErrorException("Too many failed volumes - " + "current valid volumes: " + goodLocations.size() + ", volumes configured: " + dataDirs.size() + ", volumes failed: " + failedLocations.size() + ", volume failures tolerated: " + maxVolumeFailuresTolerated); } } else { if (failedLocations.size() > maxVolumeFailuresTolerated) { throw new DiskErrorException("Too many failed volumes - " + "current valid volumes: " + goodLocations.size() + ", volumes configured: " + dataDirs.size() + ", volumes failed: " + failedLocations.size() + ", volume failures tolerated: " + maxVolumeFailuresTolerated); } } if (goodLocations.size() == 0) { throw new DiskErrorException("All directories in " + DFS_DATANODE_DATA_DIR_KEY + " are invalid: " + failedLocations); } return new ArrayList<>(goodLocations.keySet()); }
DataNode构造方法
// 给定一个配置、一个datadir数组和一个namenode代理,创建DataNode。 DataNode(final Configuration conf, final List<StorageLocation> dataDirs, final StorageLocationChecker storageLocationChecker, final SecureResources resources) throws IOException { // 将配置文件赋值到父类的静态变量中 super(conf); // 初始化Tracer,与NameNode中此处相比,仅传入参数有区别 this.tracer = createTracer(conf); // TracerConfigurationManager类提供了通过RPC协议在运行时管理跟踪器配置的函数。 this.tracerConfigurationManager = new TracerConfigurationManager(DATANODE_HTRACE_PREFIX, conf); // FileIoProvider这个类抽象出DataNode执行的各种文件IO操作, // 并在每个文件IO之前和之后调用概要分析(用于收集统计数据)和故障注入(用于测试)事件钩子。 // 通过DFSConfigKeys启用剖析和/或错误注入事件钩子,可以将行为注入到这些事件中。 this.fileIoProvider = new FileIoProvider(conf, this); // 初始化卷扫描,BlockScanner负责管理所有的VolumeScanner this.blockScanner = new BlockScanner(this); // 初始化各种配置参数 this.lastDiskErrorCheck = 0; this.maxNumberOfBlocksToLog = conf.getLong(DFS_MAX_NUM_BLOCKS_TO_LOG_KEY, DFS_MAX_NUM_BLOCKS_TO_LOG_DEFAULT); this.usersWithLocalPathAccess = Arrays.asList( conf.getTrimmedStrings(DFSConfigKeys.DFS_BLOCK_LOCAL_PATH_ACCESS_USER_KEY)); this.connectToDnViaHostname = conf.getBoolean( DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME, DFSConfigKeys.DFS_DATANODE_USE_DN_HOSTNAME_DEFAULT); this.supergroup = conf.get(DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_KEY, DFSConfigKeys.DFS_PERMISSIONS_SUPERUSERGROUP_DEFAULT); this.isPermissionEnabled = conf.getBoolean( DFSConfigKeys.DFS_PERMISSIONS_ENABLED_KEY, DFSConfigKeys.DFS_PERMISSIONS_ENABLED_DEFAULT); this.pipelineSupportECN = conf.getBoolean( DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED, DFSConfigKeys.DFS_PIPELINE_ECN_ENABLED_DEFAULT); confVersion = "core-" + conf.get("hadoop.common.configuration.version", "UNSPECIFIED") + ",hdfs-" + conf.get("hadoop.hdfs.configuration.version", "UNSPECIFIED"); // DatasetVolumeChecker: 对FsDatasetSpi的每个卷封装运行磁盘检查的类,并允许检索失败卷的列表。 // 这分离了最初跨DataNode、FsDatasetImpl和FsVolumeList实现的行为。 this.volumeChecker = new DatasetVolumeChecker(conf, new Timer()); // 创建了个ExecutorService,用于执行dataTransfer任务 // HadoopExecutors:ExecutorService、ScheduledExecutorService实例的工厂方法。这些执行器服务实例提供了额外的功能(例如记录未捕获的异常)。 // DataTransfer:是DataNode的内部类,用于传输一个数据块。这个类将一条数据发送到另一个DataNode。 this.xferService = HadoopExecutors.newCachedThreadPool(new Daemon.DaemonFactory()); // Determine whether we should try to pass file descriptors to clients. // 确定是否应该尝试将文件描述符传递给客户端。 if (conf.getBoolean(HdfsClientConfigKeys.Read.ShortCircuit.KEY, HdfsClientConfigKeys.Read.ShortCircuit.DEFAULT)) { String reason = DomainSocket.getLoadingFailureReason(); if (reason != null) { LOG.warn("File descriptor passing is disabled because {}", reason); this.fileDescriptorPassingDisabledReason = reason; } else { LOG.info("File descriptor passing is enabled."); this.fileDescriptorPassingDisabledReason = null; } } else { this.fileDescriptorPassingDisabledReason = "File descriptor passing was not configured."; LOG.debug(this.fileDescriptorPassingDisabledReason); } // 获取socket工厂,配置项为:hadoop.rpc.socket.factory.class.default, // 默认为:org.apache.hadoop.net.StandardSocketFactory this.socketFactory = NetUtils.getDefaultSocketFactory(conf); try { // 获取本datanode的主机名 hostName = getHostName(conf); LOG.info("Configured hostname is {}", hostName); // 启动datanode startDataNode(dataDirs, resources); } catch (IOException ie) { shutdown(); throw ie; } final int dncCacheMaxSize = conf.getInt(DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_KEY, DFS_DATANODE_NETWORK_COUNTS_CACHE_MAX_SIZE_DEFAULT) ; datanodeNetworkCounts = CacheBuilder.newBuilder() .maximumSize(dncCacheMaxSize) .build(new CacheLoader<String, Map<String, Long>>() { @Override public Map<String, Long> load(String key) throws Exception { final Map<String, Long> ret = new HashMap<String, Long>(); ret.put("networkErrors", 0L); return ret; } }); initOOBTimeout(); this.storageLocationChecker = storageLocationChecker; }
DataNode#startDataNode
// 此方法使用指定的conf启动数据节点,如果设置了conf的config_property_simulation属性,则创建一个模拟的基于存储的数据节点void startDataNode(List<StorageLocation> dataDirectories, SecureResources resources ) throws IOException { // settings global for all BPs in the Data Node this.secureResources = resources; synchronized (this) { this.dataDirs = dataDirectories; } // DNConf: 一个简单的类,封装了DataNode在启动时加载的所有配置。 this.dnConf = new DNConf(this); // 检查secure模式下的配置 checkSecureConfig(dnConf, getConf(), resources); // 检查DataNode给缓存使用的最大内存量是否在正常范围 if (dnConf.maxLockedMemory > 0) { if (!NativeIO.POSIX.getCacheManipulator().verifyCanMlock()) { throw new RuntimeException(String.format( "Cannot start datanode because the configured max locked memory" + " size (%s) is greater than zero and native code is not available.", DFS_DATANODE_MAX_LOCKE......原文转载:http://www.shaoqun.com/a/879536.html
跨境电商:https://www.ikjzd.com/
oklink:https://www.ikjzd.com/w/1362
贝贝网:https://www.ikjzd.com/w/1321
新蛋:https://www.ikjzd.com/w/79
执行start-dfs.sh脚本后,集群是如何启动的?本文阅读并注释了start-dfs脚本,以及datanode的启动主要流程流程源码。DataNode启动流程脚本代码分析start-dfs.sh中启动datanode的代码:#---------------------------------------------------------#datanodes(usingdefaultworke
2021连州地下河中高考生免费吗?连州地下河中考高考学生门票多少钱?:http://www.30bags.com/a/432216.html
2021连州全家去哪看桃花?湟川三峡2021连州桃花节怎么样?:http://www.30bags.com/a/432110.html
2021连州桃花节收费吗?连州湟川三峡桃花节怎么收费?:http://www.30bags.com/a/432109.html
2021连州中高生去哪里免费?中高考生连州免费景点?:http://www.30bags.com/a/432215.html
我被外国黑人3p过程 太粗太长弄死了我了:http://lady.shaoqun.com/a/256971.html
女班长扒开内裤让我们摸 我把女班长按到课桌上做:http://lady.shaoqun.com/a/247579.html
男朋友边摸边吃奶边做 小荡货夹得好紧太爽了:http://lady.shaoqun.com/a/247589.html
办公室里 男上司撕扯我的衣服:http://lady.shaoqun.com/m/a/106145.html
女性高潮来自阴蒂刺激占比最多,丈夫刺激阴蒂是如此重要! :http://lady.shaoqun.com/a/423161.html
床上七个小动作让你的男人晚上更爱你!尤其是第七个!:http://lady.shaoqun.com/a/423162.html
对美国士兵来说,有一夜情并呆在家里是老兵的特权:http://lady.shaoqun.com/a/423163.html
中东新电商扎堆,他们会成为中东版"阿里巴巴"吗?:https://www.ikjzd.com/articles/146650
没有评论:
发表评论