Environment: Hadoop 3.1.3
I. Client
HDFS Write Flow Source Code Analysis (Part 1): The Client
II. NameNode
(1) create
This method creates a file.
First, locate the NameNode RPC server side and enter NameNodeRpcServer.create().
public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
throws IOException {
checkNNStartup();
// IP of the client issuing the request
String clientMachine = getClientMachine();
if (stateChangeLog.isDebugEnabled()) {
stateChangeLog.debug("*DIR* NameNode.create: file "
+src+" for "+clientName+" at "+clientMachine);
}
// check that the path length (limit 8000) and depth (limit 1000) are within bounds
if (!checkPathLength(src)) {
throw new IOException("create: Pathname too long. Limit "
+ MAX_PATH_LENGTH + " characters, " + MAX_PATH_DEPTH + " levels.");
}
// check whether the current NameNode state (active, backup, standby) supports this operation
namesystem.checkOperation(OperationCategory.WRITE);
CacheEntryWithPayload cacheEntry = RetryCache.waitForCompletion(retryCache, null);
if (cacheEntry != null && cacheEntry.isSuccess()) {
return (HdfsFileStatus) cacheEntry.getPayload();
}
HdfsFileStatus status = null;
try {
PermissionStatus perm = new PermissionStatus(getRemoteUser()
.getShortUserName(), null, masked);
// create the file
status = namesystem.startFile(src, perm, clientName, clientMachine,
flag.get(), createParent, replication, blockSize, supportedVersions,
ecPolicyName, cacheEntry != null);
} finally {
RetryCache.setState(cacheEntry, status != null, status);
}
metrics.incrFilesCreated();
metrics.incrCreateFileOps();
return status;
}
This method creates the file and returns the fileId, permissions, and other file metadata that the client needs to set up its output stream. Before digging into FSNamesystem.startFile(), a minimal client-side sketch of what drives this RPC is shown below.
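The sketch is only for context and reflects typical usage rather than the actual client internals (the NameNode address and path are made-up examples; the real logic lives in DFSClient/DFSOutputStream and is covered in Part 1).

import java.net.URI;
import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CreateDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // fs.create() leads to the create RPC above; each new block written later triggers addBlock
    try (FileSystem fs = FileSystem.get(URI.create("hdfs://nn-host:8020"), conf);
         FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"), true)) {
      out.write("hello hdfs".getBytes(StandardCharsets.UTF_8));
    } // closing the stream eventually drives the complete RPC discussed below
  }
}

Now, FSNamesystem.startFile():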
HdfsFileStatus startFile(String src, PermissionStatus permissions,
String holder, String clientMachine, EnumSet<CreateFlag> flag,
boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
boolean logRetryCache) throws IOException {
HdfsFileStatus status;
try {
// create the file
status = startFileInt(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize, supportedVersions, ecPolicyName,
logRetryCache);
} catch (AccessControlException e) {
logAuditEvent(false, "create", src);
throw e;
}
logAuditEvent(true, "create", src, status);
return status;
}
There is no need to dwell on the ecPolicy-related parameters; they support striped storage built on erasure coding, which reduces storage overhead, and we ignore them here. Continue into startFileInt().
private HdfsFileStatus startFileInt(String src,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, CryptoProtocolVersion[] supportedVersions,
String ecPolicyName, boolean logRetryCache) throws IOException {
if (NameNode.stateChangeLog.isDebugEnabled()) {
StringBuilder builder = new StringBuilder();
builder.append("DIR* NameSystem.startFile: src=").append(src)
.append(", holder=").append(holder)
.append(", clientMachine=").append(clientMachine)
.append(", createParent=").append(createParent)
.append(", replication=").append(replication)
.append(", createFlag=").append(flag)
.append(", blockSize=").append(blockSize)
.append(", supportedVersions=")
.append(Arrays.toString(supportedVersions));
NameNode.stateChangeLog.debug(builder.toString());
}
if (!DFSUtil.isValidName(src) || // is the path valid
FSDirectory.isExactReservedName(src) || // is the path a reserved name
(FSDirectory.isReservedName(src) // same as above
&& !FSDirectory.isReservedRawName(src) // is it the reserved raw name
&& !FSDirectory.isReservedInodesName(src))) { // is it the reserved inodes name
throw new InvalidPathException(src);
}
boolean shouldReplicate = flag.contains(CreateFlag.SHOULD_REPLICATE);
if (shouldReplicate &&
(!org.apache.commons.lang.StringUtils.isEmpty(ecPolicyName))) {
throw new HadoopIllegalArgumentException("SHOULD_REPLICATE flag and " +
"ecPolicyName are exclusive parameters. Set both is not allowed!");
}
INodesInPath iip = null;
boolean skipSync = true; // until we do something that might create edits
HdfsFileStatus stat = null;
BlocksMapUpdateInfo toRemoveBlocks = null;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot create file" + src);
// resolve the inodes along the path; INodesInPath holds the inode at every level from the root down to this file
iip = FSDirWriteFileOp.resolvePathForStartFile(
dir, pc, src, flag, createParent);
if (blockSize < minBlockSize) {
throw new IOException("Specified block size is less than configured" +
" minimum value (" + DFSConfigKeys.DFS_NAMENODE_MIN_BLOCK_SIZE_KEY
+ "): " + blockSize + " < " + minBlockSize);
}
if (shouldReplicate) {
blockManager.verifyReplication(src, replication, clientMachine);
} else {
final ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp
.getErasureCodingPolicy(this, ecPolicyName, iip);
if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
checkErasureCodingSupported("createWithEC");
if (blockSize < ecPolicy.getCellSize()) {
throw new IOException("Specified block size (" + blockSize
+ ") is less than the cell size (" + ecPolicy.getCellSize()
+") of the erasure coding policy (" + ecPolicy + ").");
}
} else {
// check that the replication factor is within the configured limits
blockManager.verifyReplication(src, replication, clientMachine);
}
}
FileEncryptionInfo feInfo = null;
if (!iip.isRaw() && provider != null) {
EncryptionKeyInfo ezInfo = FSDirEncryptionZoneOp.getEncryptionKeyInfo(
this, iip, supportedVersions);
// if the path has an encryption zone, the lock was released while
// generating the EDEK. re-resolve the path to ensure the namesystem
// and/or EZ has not mutated
if (ezInfo != null) {
checkOperation(OperationCategory.WRITE);
iip = FSDirWriteFileOp.resolvePathForStartFile(
dir, pc, iip.getPath(), flag, createParent);
feInfo = FSDirEncryptionZoneOp.getFileEncryptionInfo(
dir, iip, ezInfo);
}
}
skipSync = false; // following might generate edits
toRemoveBlocks = new BlocksMapUpdateInfo();
// take the FSDirectory write lock
dir.writeLock();
try {
// create the file
stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
clientMachine, flag, createParent, replication, blockSize, feInfo,
toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache);
} catch (IOException e) {
skipSync = e instanceof StandbyException;
throw e;
} finally {
dir.writeUnlock();
}
} finally {
writeUnlock("create");
// There might be transactions logged while trying to recover the lease.
// They need to be sync'ed even when an exception was thrown.
if (!skipSync) {
// flush the edit log to disk; this is effectively a write-ahead log
getEditLog().logSync();
// if an existing file was overwritten, its blocks must be cleaned up
if (toRemoveBlocks != null) {
removeBlocks(toRemoveBlocks);
toRemoveBlocks.clear();
}
}
}
return stat;
}
Focus on FSDirWriteFileOp.startFile().
static HdfsFileStatus startFile(
FSNamesystem fsn, INodesInPath iip,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize,
FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
boolean shouldReplicate, String ecPolicyName, boolean logRetryEntry)
throws IOException {
assert fsn.hasWriteLock();
boolean overwrite = flag.contains(CreateFlag.OVERWRITE);
boolean isLazyPersist = flag.contains(CreateFlag.LAZY_PERSIST);
final String src = iip.getPath();
// the directory tree
FSDirectory fsd = fsn.getFSDirectory();
// if the target file already exists
if (iip.getLastINode() != null) {
// overwrite
if (overwrite) {
List<INode> toRemoveINodes = new ChunkedArrayList<>();
List<Long> toRemoveUCFiles = new ChunkedArrayList<>();
// 1. remove the file from the namespace
// 2. delete the file's blocks
// toRemoveBlocks is cleaned up in the caller, provided the delete flow completed without error
long ret = FSDirDeleteOp.delete(fsd, iip, toRemoveBlocks,toRemoveINodes, toRemoveUCFiles, now());
if (ret >= 0) {
// drop the last inode from INodesInPath, i.e. the file being overwritten
iip = INodesInPath.replace(iip, iip.length() - 1, null);
FSDirDeleteOp.incrDeletedFileCount(ret);
// remove the lease and the inodes
fsn.removeLeasesAndINodes(toRemoveUCFiles, toRemoveINodes, true);
}
} else {
// If lease soft limit time is expired, recover the lease
fsn.recoverLeaseInternal(FSNamesystem.RecoverLeaseOp.CREATE_FILE, iip,
src, holder, clientMachine, false);
throw new FileAlreadyExistsException(src + " for client " +
clientMachine + " already exists");
}
}
// check that the number of objects (inodes and blocks) does not exceed the limit
fsn.checkFsObjectLimit();
INodeFile newNode = null;
INodesInPath parent =
FSDirMkdirOp.createAncestorDirectories(fsd, iip, permissions);
if (parent != null) {
// if the parent directory exists, create the target file
iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
replication, blockSize, holder, clientMachine, shouldReplicate,
ecPolicyName);
newNode = iip != null ? iip.getLastINode().asFile() : null;
}
if (newNode == null) {
throw new IOException("Unable to add " + src + " to namespace");
}
fsn.leaseManager.addLease( // add a lease: clientName -> files
newNode.getFileUnderConstructionFeature().getClientName(),
newNode.getId());
if (feInfo != null) {
FSDirEncryptionZoneOp.setFileEncryptionInfo(fsd, iip, feInfo,
XAttrSetFlag.CREATE);
}
// set the storage policy
setNewINodeStoragePolicy(fsd.getBlockManager(), iip, isLazyPersist);
// write-ahead log (edit log)
fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.startFile: added " +
src + " inode " + newNode.getId() + " " + holder);
}
return FSDirStatAndListingOp.getFileInfo(fsd, iip, false, false);
}
Continue into addFile().
private static INodesInPath addFile(
FSDirectory fsd, INodesInPath existing, byte[] localName,
PermissionStatus permissions, short replication, long preferredBlockSize,
String clientName, String clientMachine, boolean shouldReplicate,
String ecPolicyName) throws IOException {
Preconditions.checkNotNull(existing);
long modTime = now();
INodesInPath newiip;
fsd.writeLock();
try {
boolean isStriped = false;
ErasureCodingPolicy ecPolicy = null;
if (!shouldReplicate) {
ecPolicy = FSDirErasureCodingOp.getErasureCodingPolicy(
fsd.getFSNamesystem(), ecPolicyName, existing);
if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
isStriped = true;
}
}
final BlockType blockType = isStriped ?
BlockType.STRIPED : BlockType.CONTIGUOUS;
final Short replicationFactor = (!isStriped ? replication : null);
final Byte ecPolicyID = (isStriped ? ecPolicy.getId() : null);
// create the inode
INodeFile newNode = newINodeFile(fsd.allocateNewInodeId(), permissions,
modTime, modTime, replicationFactor, ecPolicyID, preferredBlockSize,
blockType);
newNode.setLocalName(localName);
newNode.toUnderConstruction(clientName, clientMachine);
// add the inode to the namespace
newiip = fsd.addINode(existing, newNode, permissions.getPermission());
} finally {
fsd.writeUnlock();
}
if (newiip == null) {
NameNode.stateChangeLog.info("DIR* addFile: failed to add " +
existing.getPath() + "/" + DFSUtil.bytes2String(localName));
return null;
}
if(NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* addFile: " +
DFSUtil.bytes2String(localName) + " is added");
}
return newiip;
}
This method creates the inode for the target file and adds it to the directory tree.
(2) addBlock
This method allocates a new block, then selects and orders the DataNodes that will store it.
First look at NameNodeRpcServer.addBlock(), the server-side implementation of the RPC.
public LocatedBlock addBlock(String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
throws IOException {
// has the NameNode fully started
checkNNStartup();
// allocate a block and obtain the DataNodes that will store it
LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
clientName, previous, excludedNodes, favoredNodes, addBlockFlags);
if (locatedBlock != null) {
metrics.incrAddBlockOps();
}
return locatedBlock;
}
Enter namesystem.getAdditionalBlock().
LocatedBlock getAdditionalBlock(
String src, long fileId, String clientName, ExtendedBlock previous,
DatanodeInfo[] excludedNodes, String[] favoredNodes,
EnumSet<AddBlockFlag> flags) throws IOException {
final String operationName = "getAdditionalBlock";
NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {} inodeId {}" +
" for {}", src, fileId, clientName);
// used to detect whether this request is a retry for an already-allocated block
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
FSDirWriteFileOp.ValidateAddBlockResult r;
// check whether the NameNode's current state (active, backup, standby) allows read operations
checkOperation(OperationCategory.READ);
final FSPermissionChecker pc = getPermissionChecker();
readLock();
try {
checkOperation(OperationCategory.READ);
// 1. check whether a new block may be added
// 2. check for a potential retry block
// 3. gather the information needed later to choose DataNodes
r = FSDirWriteFileOp.validateAddBlock(this, pc, src, fileId, clientName,
previous, onRetryBlock);
} finally {
readUnlock(operationName);
}
// if this is a retry, return that block directly
if (r == null) {
assert onRetryBlock[0] != null : "Retry block is null";
// This is a retry. Just return the last block.
return onRetryBlock[0];
}
// choose the target storage nodes
DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
blockManager, src, excludedNodes, favoredNodes, flags, r);
checkOperation(OperationCategory.WRITE);
writeLock();
LocatedBlock lb;
try {
checkOperation(OperationCategory.WRITE);
// add the block to the blocksMap, update the count of blocks each DataNode is currently receiving, etc.
lb = FSDirWriteFileOp.storeAllocatedBlock(
this, src, fileId, clientName, previous, targets);
} finally {
writeUnlock(operationName);
}
getEditLog().logSync();
return lb;
}
This method does quite a few things, so let's go through them one by one. First, FSDirWriteFileOp.validateAddBlock().
static ValidateAddBlockResult validateAddBlock(
FSNamesystem fsn, FSPermissionChecker pc,
String src, long fileId, String clientName,
ExtendedBlock previous, LocatedBlock[] onRetryBlock) throws IOException {
final long blockSize;
final short numTargets;
final byte storagePolicyID;
String clientMachine;
final BlockType blockType;
// resolve the inode at each level from the root down to the target file
INodesInPath iip = fsn.dir.resolvePath(pc, src, fileId);
/*
* Analyze the file state:
* 1. check that the previous block belongs to this namespace's block pool
* 2. check that the number of objects (inodes and blocks) does not exceed the limit
* 3. check the lease (single writer, multiple readers)
* 4. verify, across several cases, that the previous block is valid and whether this is a retry block
*/
FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
previous, onRetryBlock);
if (onRetryBlock[0] != null && onRetryBlock[0].getLocations().length > 0) {
// This is a retry. No need to generate new locations.
// Use the last block if it has locations.
return null;
}
final INodeFile pendingFile = fileState.inode;
// can a new block be added; this method is covered in detail under the complete RPC
if (!fsn.checkFileProgress(src, pendingFile, false)) {
throw new NotReplicatedYetException("Not replicated yet: " + src);
}
// the file has too many blocks
if (pendingFile.getBlocks().length >= fsn.maxBlocksPerFile) {
throw new IOException("File has reached the limit on maximum number of"
+ " blocks (" + DFSConfigKeys.DFS_NAMENODE_MAX_BLOCKS_PER_FILE_KEY
+ "): " + pendingFile.getBlocks().length + " >= "
+ fsn.maxBlocksPerFile);
}
blockSize = pendingFile.getPreferredBlockSize(); // block size, 128 MB by default
clientMachine = pendingFile.getFileUnderConstructionFeature() // client IP
.getClientMachine();
// block type
// CONTIGUOUS: contiguous replication, the common case
// STRIPED: striped storage with erasure coding, which reduces storage overhead
blockType = pendingFile.getBlockType();
ErasureCodingPolicy ecPolicy = null;
// erasure-coding / striped storage handling
if (blockType == BlockType.STRIPED) {
ecPolicy =
FSDirErasureCodingOp.unprotectedGetErasureCodingPolicy(fsn, iip);
numTargets = (short) (ecPolicy.getSchema().getNumDataUnits()
+ ecPolicy.getSchema().getNumParityUnits());
} else {
// number of replicas required
numTargets = pendingFile.getFileReplication();
}
storagePolicyID = pendingFile.getStoragePolicyID();
return new ValidateAddBlockResult(blockSize, numTargets, storagePolicyID,
clientMachine, blockType, ecPolicy);
}
This method mainly validates the file state (that the previous block belongs to this namespace's block pool, that the object count of inodes and blocks is within limits, that the lease is held (single writer, multiple readers), and that the previous block is valid or the request is a retry), and packages up the block-related information. Back in the caller, the next step is FSDirWriteFileOp.chooseTargetForNewBlock().
static DatanodeStorageInfo[] chooseTargetForNewBlock(
BlockManager bm, String src, DatanodeInfo[] excludedNodes,
String[] favoredNodes, EnumSet<AddBlockFlag> flags,
ValidateAddBlockResult r) throws IOException {
Node clientNode = null;
boolean ignoreClientLocality = (flags != null
&& flags.contains(AddBlockFlag.IGNORE_CLIENT_LOCALITY));
// If client locality is ignored, clientNode remains 'null' to indicate
// whether to consider the client's own host, since the client may itself be a DataNode
if (!ignoreClientLocality) {
clientNode = bm.getDatanodeManager().getDatanodeByHost(r.clientMachine);
if (clientNode == null) {
clientNode = getClientNode(bm, r.clientMachine);
}
}
// DataNodes to exclude
Set<Node> excludedNodesSet =
(excludedNodes == null) ? new HashSet<>()
: new HashSet<>(Arrays.asList(excludedNodes));
// preferred DataNodes
List<String> favoredNodesList =
(favoredNodes == null) ? Collections.emptyList()
: Arrays.asList(favoredNodes);
// choose targets for the new block to be allocated.
// choose the DataNodes
return bm.chooseTarget4NewBlock(src, r.numTargets, clientNode,
excludedNodesSet, r.blockSize,
favoredNodesList, r.storagePolicyID,
r.blockType, r.ecPolicy, flags);
}
This method chooses the DataNodes that will store the block. Both excludedNodes and favoredNodes are decided by the client. For example, when the client cannot connect to a DataNode that the NameNode assigned for a block, it adds that DataNode to excludedNodes and calls addBlock again, so an unreachable DataNode is not chosen again as a replica. A rough sketch of this retry loop is shown below, and then we enter bm.chooseTarget4NewBlock().
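This is only an illustration, not the actual DFSClient code: tryBuildPipeline() is an assumed helper that tries to connect to the returned DataNodes and returns the first unreachable one (or null on success), and the usual org.apache.hadoop.hdfs.protocol imports are assumed.

// Hypothetical sketch: retry addBlock, excluding DataNodes the client cannot reach.
static LocatedBlock addBlockWithRetry(ClientProtocol namenode, String src,
    String clientName, ExtendedBlock previous, long fileId) throws IOException {
  List<DatanodeInfo> excluded = new ArrayList<>();
  while (true) {
    LocatedBlock lb = namenode.addBlock(src, clientName, previous,
        excluded.toArray(new DatanodeInfo[0]), fileId, null, null);
    DatanodeInfo unreachable = tryBuildPipeline(lb); // assumed helper
    if (unreachable == null) {
      return lb;               // pipeline established; write to this block
    }
    excluded.add(unreachable); // avoid this DataNode in the next allocation
  }
}

Now, bm.chooseTarget4NewBlock():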
public DatanodeStorageInfo[] chooseTarget4NewBlock(final String src,
final int numOfReplicas, final Node client,
final Set<Node> excludedNodes,
final long blocksize,
final List<String> favoredNodes,
final byte storagePolicyID,
final BlockType blockType,
final ErasureCodingPolicy ecPolicy,
final EnumSet<AddBlockFlag> flags) throws IOException {
// preferred nodes
List<DatanodeDescriptor> favoredDatanodeDescriptors =
getDatanodeDescriptors(favoredNodes);
// heterogeneous storage policy, used to choose among storage media types
// the default is HOT, where all replicas go to DISK storage
final BlockStoragePolicy storagePolicy =
storagePolicySuite.getPolicy(storagePolicyID);
// block placement policy; the default policy for CONTIGUOUS blocks (BlockPlacementPolicyDefault) is:
// first replica on the client's own host (if the client is a DataNode), second replica on a random node in a different rack,
// third replica on a random node in the same rack as the second; any replicas beyond three are placed randomly
final BlockPlacementPolicy blockplacement =
placementPolicies.getPolicy(blockType);
// choose the DataNodes
final DatanodeStorageInfo[] targets = blockplacement.chooseTarget(src,
numOfReplicas, client, excludedNodes, blocksize,
favoredDatanodeDescriptors, storagePolicy, flags);
final String errorMessage = "File %s could only be written to %d of " +
"the %d %s. There are %d datanode(s) running and %s "
+ "node(s) are excluded in this operation.";
if (blockType == BlockType.CONTIGUOUS && targets.length < minReplication) {
throw new IOException(String.format(errorMessage, src,
targets.length, minReplication, "minReplication nodes",
getDatanodeManager().getNetworkTopology().getNumOfLeaves(),
(excludedNodes == null? "no": excludedNodes.size())));
} else if (blockType == BlockType.STRIPED &&
targets.length < ecPolicy.getNumDataUnits()) {
throw new IOException(
String.format(errorMessage, src, targets.length,
ecPolicy.getNumDataUnits(),
String.format("required nodes for %s", ecPolicy.getName()),
getDatanodeManager().getNetworkTopology().getNumOfLeaves(),
(excludedNodes == null ? "no" : excludedNodes.size())));
}
return targets;
}
This method picks the storage policy and the block placement policy, and uses them to choose suitable DataNodes. Enter blockplacement.chooseTarget() (BlockPlacementPolicyDefault).
private DatanodeStorageInfo[] chooseTarget(int numOfReplicas,
Node writer,
List<DatanodeStorageInfo> chosenStorage,
boolean returnChosenNodes,
Set<Node> excludedNodes,
long blocksize,
final BlockStoragePolicy storagePolicy,
EnumSet<AddBlockFlag> addBlockFlags,
EnumMap<StorageType, Integer> sTypes) {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return DatanodeStorageInfo.EMPTY_ARRAY;
}
if (excludedNodes == null) {
excludedNodes = new HashSet<>();
}
// result[0] -> number of nodes still to allocate; result[1] -> max replicas allowed on a single rack
int[] result = getMaxNodesPerRack(chosenStorage.size(), numOfReplicas);
numOfReplicas = result[0];
int maxNodesPerRack = result[1];
// add already-chosen nodes to excludedNodes to avoid choosing them again
for (DatanodeStorageInfo storage : chosenStorage) {
// add localMachine and related nodes to excludedNodes
addToExcludedNodes(storage.getDatanodeDescriptor(), excludedNodes);
}
List<DatanodeStorageInfo> results = null;
Node localNode = null;
// whether to avoid stale nodes (no heartbeat received by the NameNode for some time)
boolean avoidStaleNodes = (stats != null
&& stats.isAvoidingStaleDataNodesForWrite());
// whether to avoid the local node
boolean avoidLocalNode = (addBlockFlags != null
&& addBlockFlags.contains(AddBlockFlag.NO_LOCAL_WRITE)
&& writer != null
&& !excludedNodes.contains(writer));
// Attempt to exclude local node if the client suggests so. If no enough
// nodes can be obtained, it falls back to the default block placement
// policy.
if (avoidLocalNode) {
results = new ArrayList<>(chosenStorage);
Set<Node> excludedNodeCopy = new HashSet<>(excludedNodes);
if (writer != null) {
// exclude the local node
excludedNodeCopy.add(writer);
}
localNode = chooseTarget(numOfReplicas, writer,
excludedNodeCopy, blocksize, maxNodesPerRack, results,
avoidStaleNodes, storagePolicy,
EnumSet.noneOf(StorageType.class), results.isEmpty(), sTypes);
if (results.size() < numOfReplicas) {
// not enough nodes; discard results and fall back
results = null;
}
}
if (results == null) {
results = new ArrayList<>(chosenStorage);
// choose the nodes
localNode = chooseTarget(numOfReplicas, writer, excludedNodes,
blocksize, maxNodesPerRack, results, avoidStaleNodes,
storagePolicy, EnumSet.noneOf(StorageType.class), results.isEmpty(),
sTypes);
}
if (!returnChosenNodes) {
results.removeAll(chosenStorage);
}
// sorting nodes to form a pipeline
// sort by network distance
return getPipeline(
(writer != null && writer instanceof DatanodeDescriptor) ? writer
: localNode,
results.toArray(new DatanodeStorageInfo[results.size()]));
}
Here we focus on chooseTarget() and getPipeline(). First, chooseTarget().
private Node chooseTarget(int numOfReplicas,
Node writer,
final Set<Node> excludedNodes,
final long blocksize,
final int maxNodesPerRack,
final List<DatanodeStorageInfo> results,
final boolean avoidStaleNodes,
final BlockStoragePolicy storagePolicy,
final EnumSet<StorageType> unavailableStorages,
final boolean newBlock,
EnumMap<StorageType, Integer> storageTypes) {
if (numOfReplicas == 0 || clusterMap.getNumOfLeaves()==0) {
return (writer instanceof DatanodeDescriptor) ? writer : null;
}
final int numOfResults = results.size();
final int totalReplicasExpected = numOfReplicas + numOfResults;
if ((writer == null || !(writer instanceof DatanodeDescriptor)) && !newBlock) {
writer = results.get(0).getDatanodeDescriptor();
}
// Keep a copy of original excludedNodes
final Set<Node> oldExcludedNodes = new HashSet<>(excludedNodes);
// choose storage types; use fallbacks for unavailable storages
// get the storage types; under the default policy this returns three DISK entries (all three replicas go to DISK)
final List<StorageType> requiredStorageTypes = storagePolicy
.chooseStorageTypes((short) totalReplicasExpected,
DatanodeStorageInfo.toStorageTypes(results),
unavailableStorages, newBlock);
if (storageTypes == null) {
// convert to a type -> count map, e.g. DISK -> 3
storageTypes = getRequiredStorageTypes(requiredStorageTypes);
}
if (LOG.isTraceEnabled()) {
LOG.trace("storageTypes=" + storageTypes);
}
try {
if ((numOfReplicas = requiredStorageTypes.size()) == 0) {
throw new NotEnoughReplicasException(
"All required storage types are unavailable: "
+ " unavailableStorages=" + unavailableStorages
+ ", storagePolicy=" + storagePolicy);
}
// choose nodes in order
writer = chooseTargetInOrder(numOfReplicas, writer, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, newBlock, storageTypes);
} catch (NotEnoughReplicasException e) {
final String message = "Failed to place enough replicas, still in need of "
+ (totalReplicasExpected - results.size()) + " to reach "
+ totalReplicasExpected
+ " (unavailableStorages=" + unavailableStorages
+ ", storagePolicy=" + storagePolicy
+ ", newBlock=" + newBlock + ")";
if (LOG.isTraceEnabled()) {
LOG.trace(message, e);
} else {
LOG.warn(message + " " + e.getMessage());
}
// if stale nodes were being avoided, retry without avoiding them
if (avoidStaleNodes) {
// Retry chooseTarget again, this time not avoiding stale nodes.
// excludedNodes contains the initial excludedNodes and nodes that were
// not chosen because they were stale, decommissioned, etc.
// We need to additionally exclude the nodes that were added to the
// result list in the successful calls to choose*() above.
for (DatanodeStorageInfo resultStorage : results) {
addToExcludedNodes(resultStorage.getDatanodeDescriptor(), oldExcludedNodes);
}
// Set numOfReplicas, since it can get out of sync with the result list
// if the NotEnoughReplicasException was thrown in chooseRandom().
numOfReplicas = totalReplicasExpected - results.size();
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
newBlock, null);
}
boolean retry = false;
// simply add all the remaining types into unavailableStorages and give
// another try. No best effort is guaranteed here.
for (StorageType type : storageTypes.keySet()) {
if (!unavailableStorages.contains(type)) {
unavailableStorages.add(type);
retry = true;
}
}
if (retry) {
for (DatanodeStorageInfo resultStorage : results) {
addToExcludedNodes(resultStorage.getDatanodeDescriptor(),
oldExcludedNodes);
}
numOfReplicas = totalReplicasExpected - results.size();
return chooseTarget(numOfReplicas, writer, oldExcludedNodes, blocksize,
maxNodesPerRack, results, false, storagePolicy, unavailableStorages,
newBlock, null);
}
}
return writer;
}
Enter chooseTargetInOrder().
protected Node chooseTargetInOrder(int numOfReplicas,
Node writer,
final Set<Node> excludedNodes,
final long blocksize,
final int maxNodesPerRack,
final List<DatanodeStorageInfo> results,
final boolean avoidStaleNodes,
final boolean newBlock,
EnumMap<StorageType, Integer> storageTypes)
throws NotEnoughReplicasException {
final int numOfResults = results.size();
if (numOfResults == 0) {
/*
* 1. if the local machine is a DataNode, choose it directly;
* 2. otherwise, choose a random node on the local rack;
* 3. if the randomly chosen node does not qualify (stale, load more than twice the average (see isGoodDatanode()), insufficient space, etc.), choose a random node from the whole cluster
*/
DatanodeStorageInfo storageInfo = chooseLocalStorage(writer,
excludedNodes, blocksize, maxNodesPerRack, results, avoidStaleNodes,
storageTypes, true);
writer = (storageInfo != null) ? storageInfo.getDatanodeDescriptor()
: null;
// if only one replica is required, return right away
if (--numOfReplicas == 0) {
return writer;
}
}
final DatanodeDescriptor dn0 = results.get(0).getDatanodeDescriptor();
if (numOfResults <= 1) {
// the second node must be chosen on a different rack
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
if (--numOfReplicas == 0) {
return writer;
}
}
if (numOfResults <= 2) {
final DatanodeDescriptor dn1 = results.get(1).getDatanodeDescriptor();
if (clusterMap.isOnSameRack(dn0, dn1)) {
// if the first two nodes are on the same rack, try to place the third on a different rack
chooseRemoteRack(1, dn0, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
} else if (newBlock){
// if the first two nodes are on different racks and this is a new block, place the third on the same rack as the second
chooseLocalRack(dn1, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
} else {
// otherwise place the third node on the same rack as the first (the writer)
chooseLocalRack(writer, excludedNodes, blocksize, maxNodesPerRack,
results, avoidStaleNodes, storageTypes);
}
if (--numOfReplicas == 0) {
return writer;
}
}
// if more than three replicas are required, the rest are chosen randomly
chooseRandom(numOfReplicas, NodeBase.ROOT, excludedNodes, blocksize,
maxNodesPerRack, results, avoidStaleNodes, storageTypes);
return writer;
}
This method implements the default placement policy. In short (see the sketch after this list):
- The first node is the client's own machine; if that is not possible (the client is not a DataNode), a random node on the local rack; failing that (rare), a random node in the cluster
- The second node is a random node on a rack different from the first
- The third node is another node on the same rack as the second
- If more than three replicas are required, the remaining nodes are chosen randomly
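A standalone sketch of that decision order follows; it is not the Hadoop implementation, and chooseLocal/chooseRemoteRack/chooseSameRack/chooseRandom/sameRack are assumed helpers standing in for the real chooseLocalStorage/chooseRemoteRack/chooseLocalRack/chooseRandom calls above.

// Hypothetical sketch of the default placement order for a CONTIGUOUS block.
static List<Node> placeReplicas(Node writer, int replication) {
  List<Node> results = new ArrayList<>();
  results.add(chooseLocal(writer));                  // 1st: writer itself, else local rack, else random
  if (results.size() < replication) {
    results.add(chooseRemoteRack(results.get(0)));   // 2nd: a rack different from the 1st
  }
  if (results.size() < replication) {
    Node dn0 = results.get(0), dn1 = results.get(1);
    results.add(sameRack(dn0, dn1)                   // 3rd: spread racks if 1st and 2nd collided,
        ? chooseRemoteRack(dn0)                      //      otherwise stay on the 2nd node's rack
        : chooseSameRack(dn1));
  }
  while (results.size() < replication) {
    results.add(chooseRandom());                     // extra replicas: anywhere
  }
  return results;
}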
Then back to getPipeline(); the order it returns is the order of the replication pipeline.
private DatanodeStorageInfo[] getPipeline(Node writer,
DatanodeStorageInfo[] storages) {
if (storages.length == 0) {
return storages;
}
synchronized(clusterMap) {
int index=0;
if (writer == null || !clusterMap.contains(writer)) {
writer = storages[0].getDatanodeDescriptor();
}
/*
* Starting from the writer, this is a greedy algorithm over network topology distance:
* find the node A closest to the writer, append it, then treat A as the new writer
* and keep looking for the next closest node, and so on
*/
for(; index < storages.length; index++) {
DatanodeStorageInfo shortestStorage = storages[index];
// network topology distance from the writer (the client, i.e. the node writing the data) to the chosen DataNode,
// that is, the distance between the two nodes in the network tree; for example, two nodes on the same rack
// share the same parent (the switch), so their distance is 2
int shortestDistance = clusterMap.getDistance(writer,
shortestStorage.getDatanodeDescriptor());
int shortestIndex = index;
for(int i = index + 1; i < storages.length; i++) {
int currentDistance = clusterMap.getDistance(writer,
storages[i].getDatanodeDescriptor());
if (shortestDistance>currentDistance) {
shortestDistance = currentDistance;
shortestStorage = storages[i];
shortestIndex = i;
}
}
//switch position index & shortestIndex
if (index != shortestIndex) {
storages[shortestIndex] = storages[index];
storages[index] = shortestStorage;
}
writer = shortestStorage.getDatanodeDescriptor();
}
}
return storages;
}
In short, starting from the writer, a greedy algorithm over network topology distance finds the node A closest to the writer, appends it to the pipeline, then treats A as the new writer and looks for the next closest node, repeating until all nodes are ordered. The network topology distance is the distance between two nodes in the network tree; for example, two nodes on the same rack share the same parent (the switch), so their distance is 2. A small self-contained sketch of this greedy ordering follows.
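The sketch below reimplements the same in-place ordering; distance stands in for clusterMap.getDistance() and is a made-up functional parameter for illustration.

import java.util.function.BiFunction;

// Hypothetical sketch: order storage nodes into a pipeline by repeatedly picking
// the node closest (by network distance) to the current tail, starting from the writer.
static <T> void orderPipeline(T writer, T[] nodes, BiFunction<T, T, Integer> distance) {
  T current = writer;
  for (int i = 0; i < nodes.length; i++) {
    int best = i;
    for (int j = i + 1; j < nodes.length; j++) {
      if (distance.apply(current, nodes[j]) < distance.apply(current, nodes[best])) {
        best = j;
      }
    }
    T tmp = nodes[i]; nodes[i] = nodes[best]; nodes[best] = tmp; // move the closest node to position i
    current = nodes[i];                                          // it becomes the new tail of the pipeline
  }
}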
Finally, back in getAdditionalBlock(), continue with FSDirWriteFileOp.storeAllocatedBlock().
static LocatedBlock storeAllocatedBlock(FSNamesystem fsn, String src,
long fileId, String clientName, ExtendedBlock previous,
DatanodeStorageInfo[] targets) throws IOException {
long offset;
// Run the full analysis again, since things could have changed
// while chooseTarget() was executing.
// rerun the full file-state analysis, same as before
LocatedBlock[] onRetryBlock = new LocatedBlock[1];
INodesInPath iip = fsn.dir.resolvePath(null, src, fileId);
FileState fileState = analyzeFileState(fsn, iip, fileId, clientName,
previous, onRetryBlock);
final INodeFile pendingFile = fileState.inode;
src = fileState.path;
if (onRetryBlock[0] != null) {
if (onRetryBlock[0].getLocations().length > 0) {
// This is a retry. Just return the last block if having locations.
return onRetryBlock[0];
} else {
// add new chosen targets to already allocated block and return
BlockInfo lastBlockInFile = pendingFile.getLastBlock();
lastBlockInFile.getUnderConstructionFeature().setExpectedLocations(
lastBlockInFile, targets, pendingFile.getBlockType());
offset = pendingFile.computeFileSize();
return makeLocatedBlock(fsn, lastBlockInFile, targets, offset);
}
}
// commit the last block and complete it if it has minimum replicas
// commit or complete the previous block
// note the difference between commit and complete:
// commit: the client has reported that it finished transferring all data of the block, but the DataNodes have not yet incrementally reported the block to the NameNode
// complete: the NameNode has received reports from at least the configured minimum number of DataNodes that they hold the block
// so the condition for complete is stricter than for commit
// this method is also explained in detail under the complete RPC
fsn.commitOrCompleteLastBlock(pendingFile, fileState.iip,
ExtendedBlock.getLocalBlock(previous));
// allocate new block, record block locations in INode.
final BlockType blockType = pendingFile.getBlockType();
// allocate new block, record block locations in INode.
Block newBlock = fsn.createNewBlock(blockType);
INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
// add the block to the blocksMap and record how many blocks each DataNode is currently receiving
// note that this updates, per storage type (DISK, SSD, etc.) on the relevant DataNodes, the number of blocks in flight,
// to prevent over-allocation (when choosing DataNodes, remaining space and in-flight blocks decide whether a new block fits)
saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets, blockType);
// metadata edit log
persistNewBlock(fsn, src, pendingFile);
offset = pendingFile.computeFileSize();
// Return located block
return makeLocatedBlock(fsn, fsn.getStoredBlock(newBlock), targets, offset);
}
This concludes the addBlock() method. Before moving on, the block states referred to throughout this section are summarized in the sketch below.
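This is a simplified sketch; the real enum is BlockUCState in the NameNode code, which also has an UNDER_RECOVERY state omitted here.

// Simplified sketch of a block's lifecycle on the NameNode.
enum SketchBlockState {
  UNDER_CONSTRUCTION, // allocated by addBlock; the client is still writing to it
  COMMITTED,          // the client reported the final length and generation stamp,
                      // but fewer than the minimum number of DataNodes have reported the replica
  COMPLETE            // at least the configured minimum number of DataNodes have reported it
}
// Typical flow: UNDER_CONSTRUCTION -> COMMITTED (commitBlock) -> COMPLETE (completeBlock)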
(3) complete
This method finalizes the file. It is invoked when the client closes its output stream.
First look at NameNodeRpcServer.complete(), the server-side implementation of the RPC.
public boolean complete(String src, String clientName,
ExtendedBlock last, long fileId)
throws IOException {
checkNNStartup();
return namesystem.completeFile(src, clientName, last, fileId);
}
last is the final block of the file. Enter namesystem.completeFile().
boolean completeFile(final String src, String holder,
ExtendedBlock last, long fileId)
throws IOException {
boolean success = false;
checkOperation(OperationCategory.WRITE);
final FSPermissionChecker pc = getPermissionChecker();
writeLock();
try {
// can the NameNode's current state handle write operations
checkOperation(OperationCategory.WRITE);
checkNameNodeSafeMode("Cannot complete file " + src);
// complete the file
success = FSDirWriteFileOp.completeFile(this, pc, src, holder, last,
fileId);
} finally {
writeUnlock("completeFile");
}
// flush the edit log to disk
getEditLog().logSync();
if (success) {
NameNode.stateChangeLog.info("DIR* completeFile: " + src
+ " is closed by " + holder);
}
return success;
}
Enter FSDirWriteFileOp.completeFile().
static boolean completeFile(FSNamesystem fsn, FSPermissionChecker pc,
final String srcArg, String holder, ExtendedBlock last, long fileId)
throws IOException {
String src = srcArg;
if (NameNode.stateChangeLog.isDebugEnabled()) {
NameNode.stateChangeLog.debug("DIR* NameSystem.completeFile: " +
src + " for " + holder);
}
// check the block pool id
checkBlock(fsn, last);
// resolve the inode at each level from the root down to the target file
INodesInPath iip = fsn.dir.resolvePath(pc, src, fileId);
// complete the file
return completeFileInternal(fsn, iip, holder,
ExtendedBlock.getLocalBlock(last), fileId);
}
Enter completeFileInternal().
private static boolean completeFileInternal(
FSNamesystem fsn, INodesInPath iip,
String holder, Block last, long fileId)
throws IOException {
assert fsn.hasWriteLock();
final String src = iip.getPath();
final INodeFile pendingFile;
INode inode = null;
try {
// the inode of the target file
inode = iip.getLastINode();
// check the lease; resolve the INode to an INodeFile
pendingFile = fsn.checkLease(iip, holder, fileId);
} catch (LeaseExpiredException lee) {
if (inode != null && inode.isFile() &&
!inode.asFile().isUnderConstruction()) {
// This could be a retry RPC - i.e the client tried to close
// the file, but missed the RPC response. Thus, it is trying
// again to close the file. If the file still exists and
// the client's view of the last block matches the actual
// last block, then we'll treat it as a successful close.
// See HDFS-3031.
final Block realLastBlock = inode.asFile().getLastBlock();
if (Block.matchingIdAndGenStamp(last, realLastBlock)) {
NameNode.stateChangeLog.info("DIR* completeFile: " +
"request from " + holder + " to complete inode " + fileId +
"(" + src + ") which is already closed. But, it appears to be " +
"an RPC retry. Returning success");
return true;
}
}
throw lee;
}
// Check the state of the penultimate block. It should be completed
// before attempting to complete the last one.
// check whether the file may proceed with further operations (addBlock, complete, etc.)
// here it checks whether the second-to-last block is already COMPLETE; if not, we must not try to complete the last one
if (!fsn.checkFileProgress(src, pendingFile, false)) {
return false;
}
// commit the last block and complete it if it has minimum replicas
// commit the last block, and if possible (at least the configured minimum number of DataNodes have reported holding it) complete it
fsn.commitOrCompleteLastBlock(pendingFile, iip, last);
// the third argument is true here, meaning check whether all blocks of the file are COMPLETE
// except that when numCommittedAllowed is non-zero, the last numCommittedAllowed blocks may still be COMMITTED
if (!fsn.checkFileProgress(src, pendingFile, true)) {
return false;
}
/*
* numCommittedAllowed (set in the configuration) means that operations (addBlock, complete, etc.) may
* proceed only when at most the last numCommittedAllowed blocks are COMMITTED and every block before
* them is COMPLETE. The default is 0, i.e. the previous block must be COMPLETE before the next block
* can be allocated or the file can be completed
*/
// when numCommittedAllowed is non-zero, blocks that are COMMITTED but not yet COMPLETE must be added to pendingReconstruction
fsn.addCommittedBlocksToPending(pendingFile);
/*
* 1. finalize the inode (remove the under-construction information)
* 2. remove the lease
* 3. write the edit log
*/
fsn.finalizeINodeFileUnderConstruction(src, pendingFile,
Snapshot.CURRENT_STATE_ID, true);
return true;
}
This method mainly decides whether the file can be completed and tries to commit and complete the last block. The key methods here are fsn.checkFileProgress(), fsn.commitOrCompleteLastBlock(), and fsn.finalizeINodeFileUnderConstruction(). The first two also appeared in addBlock(), so they are covered together here. First, fsn.checkFileProgress().
boolean checkFileProgress(String src, INodeFile v, boolean checkall) {
assert hasReadLock();
if (checkall) {
// check whether all blocks are COMPLETE
return checkBlocksComplete(src, true, v.getBlocks());
} else {
// check whether the second-to-last block is COMPLETE
final BlockInfo[] blocks = v.getBlocks();
// numCommittedAllowed defaults to 0, so this index is the second-to-last block
final int i = blocks.length - numCommittedAllowed - 2;
return i < 0 || blocks[i] == null
|| checkBlocksComplete(src, false, blocks[i]);
}
}
Note that numCommittedAllowed is 0 by default; for example, with 4 blocks and numCommittedAllowed = 0, the index checked is 4 - 0 - 2 = 2, i.e. the second-to-last block. Enter checkBlocksComplete().
private boolean checkBlocksComplete(String src, boolean allowCommittedBlock,
BlockInfo... blocks) {
final int n = allowCommittedBlock? numCommittedAllowed: 0;
for(int i = 0; i < blocks.length; i++) {
final short min = blockManager.getMinStorageNum(blocks[i]);
// check each block's state is COMPLETE (the last few may be allowed to be COMMITTED)
final String err = INodeFile.checkBlockComplete(blocks, i, n, min);
if (err != null) {
final int numNodes = blocks[i].numNodes();
LOG.info("BLOCK* " + err + "(numNodes= " + numNodes
+ (numNodes < min ? " < " : " >= ")
+ " minimum = " + min + ") in file " + src);
return false;
}
}
return true;
}
Back in completeFileInternal(), enter fsn.commitOrCompleteLastBlock().
void commitOrCompleteLastBlock(
final INodeFile fileINode, final INodesInPath iip,
final Block commitBlock) throws IOException {
assert hasWriteLock();
Preconditions.checkArgument(fileINode.isUnderConstruction());
blockManager.commitOrCompleteLastBlock(fileINode, commitBlock, iip);
}
Enter blockManager.commitOrCompleteLastBlock().
public boolean commitOrCompleteLastBlock(BlockCollection bc,
Block commitBlock, INodesInPath iip) throws IOException {
if(commitBlock == null)
return false; // not committing, this is a block allocation retry
BlockInfo lastBlock = bc.getLastBlock();
if(lastBlock == null)
return false; // no blocks in file yet
if(lastBlock.isComplete())
return false; // already completed (e.g. by syncBlock)
if(lastBlock.isUnderRecovery()) {
throw new IOException("Commit or complete block " + commitBlock +
", whereas it is under recovery.");
}
// try to commit the last block; normally lastBlock and commitBlock refer to the same block
final boolean committed = commitBlock(lastBlock, commitBlock);
if (committed && lastBlock.isStriped()) {
// update scheduled size for DatanodeStorages that do not store any
// internal blocks
lastBlock.getUnderConstructionFeature()
.updateStorageScheduledSize((BlockInfoStriped) lastBlock);
}
// Count replicas on decommissioning nodes, as these will not be
// decommissioned unless recovery/completing last block has finished
// count the usable replicas of the last block that have been reported
NumberReplicas numReplicas = countNodes(lastBlock);
int numUsableReplicas = numReplicas.liveReplicas() +
numReplicas.decommissioning() +
numReplicas.liveEnteringMaintenanceReplicas();
// have at least the configured minimum number of DataNodes reported the block
if (hasMinStorage(lastBlock, numUsableReplicas)) {
if (committed) {
// if the block has not yet been reported by the expected number of DataNodes (normally 3), add it to pendingReconstruction
// a block may be completed once the configured minimum number of DataNodes have reported it, which does not mean it has reached the expected replication
// for example, by default the minimum is 1 while the expected replication is 3
addExpectedReplicasToPending(lastBlock);
}
// once the configured minimum number of DataNodes have reported it, complete the block
completeBlock(lastBlock, iip, false);
} else if (pendingRecoveryBlocks.isUnderRecovery(lastBlock)) {
// We've just finished recovery for this block, complete
// the block forcibly disregarding number of replicas.
// This is to ignore minReplication, the block will be closed
// and then replicated out.
completeBlock(lastBlock, iip, true);
updateNeededReconstructions(lastBlock, 1, 0);
}
return committed;
}
First enter commitBlock(), which commits the last block.
private boolean commitBlock(final BlockInfo block,
final Block commitBlock) throws IOException {
// already COMMITTED
if (block.getBlockUCState() == BlockUCState.COMMITTED)
return false;
assert block.getNumBytes() <= commitBlock.getNumBytes() :
"commitBlock length is less than the stored one "
+ commitBlock.getNumBytes() + " vs. " + block.getNumBytes();
if(block.getGenerationStamp() != commitBlock.getGenerationStamp()) {
throw new IOException("Commit block with mismatching GS. NN has " +
block + ", client submits " + commitBlock);
}
// change the state to COMMITTED and update numBytes;
// use the generation stamp in commitBlock to find any stale replicas recorded in block
List<ReplicaUnderConstruction> staleReplicas =
block.commitBlock(commitBlock);
// remove the stale replicas
removeStaleReplicas(staleReplicas, block);
return true;
}
There are several points worth noting here. The parameter block is the block information recorded on the NameNode side, while commitBlock is the information about the last block sent by the client. block is of type BlockInfoContiguous and carries the complete block information (BlockUnderConstructionFeature and so on), whereas commitBlock is a plain Block containing only blockId, numBytes, and generationStamp. Only after the commit is the NameNode-side numBytes updated from commitBlock, and the generationStamp in commitBlock is used to detect whether any replica recorded on the NameNode side refers to an out-of-date version of the block, in which case that replica is removed. A small illustration of the two representations follows.
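The sketch below is simplified and hypothetical; the real classes are Block, BlockInfoContiguous, and BlockUnderConstructionFeature, and the field names here only mirror their roles.

// Simplified sketch: what the client sends on commit vs. what the NameNode keeps.
class SketchBlock {                          // roughly org.apache.hadoop.hdfs.protocol.Block
  long blockId;
  long numBytes;                             // final length as reported by the client
  long generationStamp;                      // used to detect stale replicas
}
class SketchBlockInfo extends SketchBlock {  // roughly BlockInfoContiguous on the NameNode
  Object underConstructionFeature;           // replica locations, expected targets, UC state, ...
  // commitBlock(clientBlock): copy numBytes from the client's Block, mark COMMITTED,
  // and treat replicas whose generation stamp does not match as stale.
}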
Back in blockManager.commitOrCompleteLastBlock(), enter completeBlock().
private void completeBlock(BlockInfo curBlock, INodesInPath iip,
boolean force) throws IOException {
if (curBlock.isComplete()) {
return;
}
int numNodes = curBlock.numNodes();
if (!force && !hasMinStorage(curBlock, numNodes)) {
throw new IOException("Cannot complete block: "
+ "block does not satisfy minimal replication requirement.");
}
if (!force && curBlock.getBlockUCState() != BlockUCState.COMMITTED) {
throw new IOException(
"Cannot complete block: block has not been COMMITTED by the client");
}
/*
* 1. remove the BlockUnderConstructionFeature (i.e. the state becomes COMPLETE)
* 2. update the space usage in the namespace (including quotas, which limit directory space and counts)
*/
convertToCompleteBlock(curBlock, iip);
// Since safe-mode only counts complete blocks, and we now have
// one more complete block, we need to adjust the total up, and
// also count it as safe, if we have at least the minimum replica
// count. (We may not have the minimum replica count yet if this is
// a "forced" completion when a file is getting closed by an
// OP_CLOSE edit on the standby).
bmSafeMode.adjustBlockTotals(0, 1);
final int minStorage = curBlock.isStriped() ?
((BlockInfoStriped) curBlock).getRealDataBlockNum() : minReplication;
bmSafeMode.incrementSafeBlockCount(Math.min(numNodes, minStorage),
curBlock);
}
One point to note here: both the inode and the block carry an UnderConstructionFeature that records state and bookkeeping needed only while they are under construction (not yet COMPLETE), such as the lease holder (the client) on the inode and the replica-related information on the block. Once the state becomes COMPLETE this information is no longer needed, so the UnderConstructionFeature is cleared, marking the object as COMPLETE and persisted.
Back in completeFileInternal(), enter fsn.finalizeINodeFileUnderConstruction().
void finalizeINodeFileUnderConstruction(String src, INodeFile pendingFile,
int latestSnapshot, boolean allowCommittedBlock) throws IOException {
assert hasWriteLock();
FileUnderConstructionFeature uc = pendingFile.getFileUnderConstructionFeature();
if (uc == null) {
throw new IOException("Cannot finalize file " + src
+ " because it is not under construction");
}
pendingFile.recordModification(latestSnapshot);
// The file is no longer pending.
// Create permanent INode, update blocks. No need to replace the inode here
// since we just remove the uc feature from pendingFile
/*
* 1. remove the UnderConstructionFeature (marking the file as complete)
* 2. check that all blocks are COMPLETE
* 3. update the file's modification time
*/
pendingFile.toCompleteFile(now(),
allowCommittedBlock? numCommittedAllowed: 0,
blockManager.getMinReplication());
// release the lease
leaseManager.removeLease(uc.getClientName(), pendingFile);
// close file and persist block allocations for this file
// edit log
closeFile(src, pendingFile);
// if pending + live replicas of some blocks fall short of the expected replication, schedule re-replication
blockManager.checkRedundancy(pendingFile);
}
This concludes the complete() method.
III. DataNode
HDFS Write Flow Source Code Analysis (Part 3): The DataNode