A brief introduction to HDFS
HDFS has two core components: the NameNode (NN) and the DataNode (DN).
The HDFS file system is, in essence, the result of the client, the NN, and the DNs -- several services (threads) -- interacting with one another.
Hadoop uses several kinds of network communication:
1) Method calls between services (registration, heartbeats) go over Hadoop RPC.
2) Metadata synchronization uses HTTP.
3) Writing data uses raw sockets (the streaming interface).
The client sends the write request by calling DistributedFileSystem#create (client-to-NN protocol, an RPC dynamic proxy). After checking the user's permissions, whether the file already exists, and so on, the NN creates the file and adds a lease for it.
The client's DataStreamer thread then asks the NN for a block. Using rack awareness, the NN returns the list of DNs that will store the replicas. The client connects to the first DN in the list and sets up the data pipeline, which holds the input and output streams used to transfer data and return acks (over sockets).
The client triggers a write-block request by calling Sender.writeBlock(); the request is forwarded to every DataNode in the pipeline, the last DataNode replies with a confirmation, and that confirmation travels back through the pipeline to the client.
Once the client receives the confirmation, FSDataOutputStream splits the data to be written into packets (each packet is 64KB and is filled chunk by chunk; a chunk is 512 bytes of data plus a 4-byte checksum, so a full packet holds roughly 127 chunks; once a block is full the client waits for a new pipeline to be established). A full packet is appended to the data queue and sent down the pipeline (in practice it is sent only to dn1, whose DataXceiver forwards it to dn2). A packet first travels from the DFSClient to the first DataNode; after Datanode1 receives it successfully, it writes the packet to disk and forwards it to the second node (Datanode2), and so on. When the packet reaches the last node in the pipeline (Datanode3), Datanode3 verifies it and, if the check passes, sends a packet acknowledgement, which travels back through the pipeline to the DFSClient. When every packet of a block has been sent and acknowledged, the DFSClient sends an empty packet to mark the end of the block. At that point the block transfer is complete.
Note: write-flow diagrams have already been drawn up by others online, so one is not reproduced here.
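Before diving into the source, here is a minimal client-side sketch of the path traced in this article. The NameNode address and file path are assumptions, not values taken from the walkthrough.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import java.nio.charset.StandardCharsets;

public class HdfsWriteDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "hdfs://nn-host:8020");          // assumed NameNode RPC address
    try (FileSystem fs = FileSystem.get(conf);                 // a DistributedFileSystem instance
         FSDataOutputStream out = fs.create(new Path("/tmp/demo.txt"))) { // DistributedFileSystem#create
      out.write("hello hdfs".getBytes(StandardCharsets.UTF_8)); // goes through FSOutputSummer#write
    }                                                           // close() flushes remaining packets and releases the lease
  }
}
Everything below walks through what happens underneath these few lines.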
The client initiates the write by calling DistributedFileSystem.create:
@Override
public FSDataOutputStream create(final Path f, final FsPermission permission,
final EnumSet<CreateFlag> cflags, final int bufferSize,
final short replication, final long blockSize,
final Progressable progress, final ChecksumOpt checksumOpt)
throws IOException {
return new FileSystemLinkResolver<FSDataOutputStream>() {
@Override
public FSDataOutputStream doCall(final Path p) throws IOException {
//create a DFSOutputStream
final DFSOutputStream dfsos = dfs.create(getPathName(p), permission,
cflags, replication, blockSize, progress, bufferSize,
checksumOpt);
return dfs.createWrappedOutputStream(dfsos, statistics);
}
@Override
public FSDataOutputStream next(final FileSystem fs, final Path p)
throws IOException {
return fs.create(p, permission, cflags, bufferSize,
replication, blockSize, progress, checksumOpt);
}
}.resolve(this, absF);
}
Entering dfs.create()
It chains through the other create overloads in the same class:
/**
* Call {@link #create(String, FsPermission, EnumSet, boolean, short,
* long, Progressable, int, ChecksumOpt)} with <code>createParent</code>
* set to true.
*/
public DFSOutputStream create(String src, FsPermission permission,
EnumSet<CreateFlag> flag, short replication, long blockSize,
Progressable progress, int buffersize, ChecksumOpt checksumOpt)
throws IOException {
return create(src, permission, flag, true,
replication, blockSize, progress, buffersize, checksumOpt, null);
}
public DFSOutputStream create(String src, FsPermission permission,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, Progressable progress, int buffersize,
ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes)
throws IOException {
return create(src, permission, flag, createParent, replication, blockSize,
progress, buffersize, checksumOpt, favoredNodes, null);
}
public DFSOutputStream create(String src, FsPermission permission,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, Progressable progress, int buffersize,
ChecksumOpt checksumOpt, InetSocketAddress[] favoredNodes,
String ecPolicyName) throws IOException {
checkOpen();
//apply the umask to the requested permission
final FsPermission masked = applyUMask(permission);
LOG.debug("{}: masked={}", src, masked);
//newStreamForCreate does three main things:
// 1. adds an INodeFile to the INodeDirectory tree on the NameNode
// 2. adds a file lease
// 3. starts the DataStreamer, which talks to NameNodeRpcServer
final DFSOutputStream result = DFSOutputStream.newStreamForCreate(this,
src, masked, flag, createParent, replication, blockSize, progress,
dfsClientConf.createChecksum(checksumOpt),
getFavoredNodesStr(favoredNodes), ecPolicyName);
//start the file lease and renew it automatically
beginFileLease(result.getFileId(), result);
return result;
}
Entering DFSOutputStream.newStreamForCreate():
static DFSOutputStream newStreamForCreate(DFSClient dfsClient, String src,
FsPermission masked, EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize, Progressable progress,
DataChecksum checksum, String[] favoredNodes, String ecPolicyName)
throws IOException {
try (TraceScope ignored =
dfsClient.newPathTraceScope("newStreamForCreate", src)) {
HdfsFileStatus stat = null;
// Retry the create if we get a RetryStartFileException up to a maximum
// number of times
boolean shouldRetry = true;
int retryCount = CREATE_RETRY_COUNT;
//keep retrying until the file has been created successfully
while (shouldRetry) {
shouldRetry = false;
try {
//RPC to the NN to create the file
stat = dfsClient.namenode.create(src, masked, dfsClient.clientName,
new EnumSetWritable<>(flag), createParent, replication,
blockSize, SUPPORTED_CRYPTO_VERSIONS, ecPolicyName);
break;
} ...
final DFSOutputStream out;
if(stat.getErasureCodingPolicy() != null) {
out = new DFSStripedOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes);
} else {
//constructs the DFSOutputStream, which initializes a DataStreamer
out = new DFSOutputStream(dfsClient, src, stat,
flag, progress, checksum, favoredNodes, true);
}
//start the DataStreamer thread
out.start();
return out;
}
}
#The DFSOutputStream constructor
/** Construct a new output stream for creating a file. */
protected DFSOutputStream(DFSClient dfsClient, String src,
HdfsFileStatus stat, EnumSet<CreateFlag> flag, Progressable progress,
DataChecksum checksum, String[] favoredNodes, boolean createStreamer) {
this(dfsClient, src, flag, progress, stat, checksum);
this.shouldSyncBlock = flag.contains(CreateFlag.SYNC_BLOCK);
/**
* Directory -> File -> Block(128M) -> packet(64k)-> chunk(516byte)
*/
//compute the sizes of the data units (packet/chunk)
computePacketChunkSize(dfsClient.getConf().getWritePacketSize(),
bytesPerChecksum);
if (createStreamer) {
//create the DataStreamer
streamer = new DataStreamer(stat, null, dfsClient, src, progress,
checksum, cachingStrategy, byteArrayManager, favoredNodes,
addBlockFlags);
}
}
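The Directory -> File -> Block -> packet -> chunk comment above can be checked with a little arithmetic. The sketch below only reproduces the default sizes (512-byte checksum chunks, 4-byte CRC, 64KB packets, 128MB blocks); it is not the real computePacketChunkSize implementation, which also accounts for the packet header.
public class PacketChunkMath {
  public static void main(String[] args) {
    int bytesPerChecksum = 512;                        // dfs.bytes-per-checksum (default)
    int checksumSize = 4;                              // CRC32C checksum length
    int chunkSize = bytesPerChecksum + checksumSize;   // 516 bytes on the wire
    int writePacketSize = 64 * 1024;                   // dfs.client-write-packet-size (default)
    int chunksPerPacket = writePacketSize / chunkSize; // ~127 chunks (packet header ignored)
    long blockSize = 128L * 1024 * 1024;               // dfs.blocksize (default)
    System.out.println("chunkSize=" + chunkSize
        + ", chunksPerPacket=" + chunksPerPacket
        + ", dataBytesPerPacket=" + chunksPerPacket * bytesPerChecksum
        + ", blockSize=" + blockSize);
  }
}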
#Entering out.start():
protected synchronized void start() {
//this actually starts the DataStreamer thread
getStreamer().start();
}
Digging further into the create call: it goes through a dynamic proxy, NameNodeRpcServer receives the RPC request, and the file is created on the NameNode.
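The "dynamic proxy" here is the standard java.lang.reflect.Proxy mechanism: the client holds an object implementing ClientProtocol, and each method call on it is marshalled into a protobuf request and shipped to the NameNode. A stripped-down sketch of the idea follows; the interface, class and strings are made up for illustration, and the real proxy is built by Hadoop's ProtobufRpcEngine rather than like this.
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;

interface DemoProtocol {               // stands in for ClientProtocol
  String create(String src);
}

public class RpcProxyDemo {
  public static void main(String[] args) {
    DemoProtocol proxy = (DemoProtocol) Proxy.newProxyInstance(
        DemoProtocol.class.getClassLoader(),
        new Class<?>[]{DemoProtocol.class},
        new InvocationHandler() {
          @Override
          public Object invoke(Object p, Method method, Object[] a) {
            // In Hadoop this is where the call is turned into a protobuf
            // request and sent to NameNodeRpcServer over the wire.
            return "sent <" + method.getName() + "(" + a[0] + ")> to NameNode";
          }
        });
    System.out.println(proxy.create("/user/demo/file.txt"));
  }
}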
#Entering ClientProtocol#create
@AtMostOnce
HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
throws IOException;
#ClientNamenodeProtocolTranslatorPB#create
@Override
public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
throws IOException {
CreateRequestProto.Builder builder = CreateRequestProto.newBuilder()
.setSrc(src)
.setMasked(PBHelperClient.convert(masked))
.setClientName(clientName)
.setCreateFlag(PBHelperClient.convertCreateFlag(flag))
.setCreateParent(createParent)
.setReplication(replication)
.setBlockSize(blockSize);
if (ecPolicyName != null) {
builder.setEcPolicyName(ecPolicyName);
}
FsPermission unmasked = masked.getUnmasked();
if (unmasked != null) {
builder.setUnmasked(PBHelperClient.convert(unmasked));
}
builder.addAllCryptoProtocolVersion(
PBHelperClient.convert(supportedVersions));
CreateRequestProto req = builder.build();
try {
CreateResponseProto res = rpcProxy.create(null, req);
return res.hasFs() ? PBHelperClient.convert(res.getFs()) : null;
} catch (ServiceException e) {
throw ProtobufHelper.getRemoteException(e);
}
}
#The dynamic-proxy service definition in ClientNamenodeProtocolProtos.java
public org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateResponseProto create(
com.google.protobuf.RpcController controller,
org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos.CreateRequestProto request)
throws com.google.protobuf.ServiceException;
#ClientNamenodeProtocolServerSideTranslatorPB.java
@Override
public CreateResponseProto create(RpcController controller,
CreateRequestProto req) throws ServiceException {
try {
FsPermission masked = req.hasUnmasked() ?
FsCreateModes.create(PBHelperClient.convert(req.getMasked()),
PBHelperClient.convert(req.getUnmasked())) :
PBHelperClient.convert(req.getMasked());
HdfsFileStatus result = server.create(req.getSrc(),
masked, req.getClientName(),
PBHelperClient.convertCreateFlag(req.getCreateFlag()), req.getCreateParent(),
(short) req.getReplication(), req.getBlockSize(),
PBHelperClient.convertCryptoProtocolVersions(
req.getCryptoProtocolVersionList()),
req.getEcPolicyName());
if (result != null) {
return CreateResponseProto.newBuilder().setFs(PBHelperClient.convert(result))
.build();
}
return VOID_CREATE_RESPONSE;
} catch (IOException e) {
throw new ServiceException(e);
}
}
DataStreamer is a thread; once started it executes the logic in its run() method, which is covered below as part of the write flow.
Back in DFSClient#create, the next step is beginFileLease, which opens the file lease:
/** Get a lease and start automatic renewal */
private void beginFileLease(final long inodeId, final DFSOutputStream out)
throws IOException {
synchronized (filesBeingWritten) {
putFileBeingWritten(inodeId, out);
//the lease renewer thread is not started until the first output stream is created
getLeaseRenewer().put(this);
}
}
//put() is synchronized so that registering DFSClients with the shared LeaseRenewer is thread-safe; the single-writer-per-file guarantee itself comes from the lease held on the NameNode, not from this lock
public synchronized void put(final DFSClient dfsc) {
if (dfsc.isClientRunning()) {
if (!isRunning() || isRenewerExpired()) {
//start a new daemon with a new id.
final int id = ++currentId;
daemon = new Daemon(new Runnable() {
@Override
public void run() {
LeaseRenewer.this.run(id);
}
});
daemon.start();
}
}
}
/**
* Periodically check in with the namenode and renew all the leases
* when the lease period is half over.
*/
private void run(final int id) throws InterruptedException {
//the loop wakes up periodically (sleeping getSleepPeriod() between checks)
for(long lastRenewed = Time.monotonicNow(); !Thread.interrupted();
Thread.sleep(getSleepPeriod())) {
//time elapsed since the last renewal
final long elapsed = Time.monotonicNow() - lastRenewed;
//if more than the renewal interval (half of the 60-second soft limit, i.e. 30 seconds) has passed without a renewal
if (elapsed >= getRenewalTime()) {
//renew
renew();
lastRenewed = Time.monotonicNow();
}
}
}
//renew the leases
private void renew() throws IOException {
final List<DFSClient> copies;
for (final DFSClient c : copies) {
//skip if current client name is the same as the previous name.
//the key part
if (!c.getClientName().equals(previousName)) {
if (!c.renewLease()) {
LOG.debug("Did not renew lease for client {}", c);
continue;
}
previousName = c.getClientName();
LOG.debug("Lease renewed for client {}", previousName);
}
}
}
/**
* Renew leases.
* @return true if lease was renewed. May return false if this
* client has been closed or has no files open.
**/
public boolean renewLease() throws IOException {
if (clientRunning && !isFilesBeingWrittenEmpty()) {
try {
//renew through the NameNode proxy
namenode.renewLease(clientName);
//record the time of this renewal
updateLastLeaseRenewal();
return true;
}
}
return false;
}
NameNodeRpcServer creates the file:
@Override // ClientProtocol
public HdfsFileStatus create(String src, FsPermission masked,
String clientName, EnumSetWritable<CreateFlag> flag,
boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions, String ecPolicyName)
throws IOException {
checkNNStartup();
String clientMachine = getClientMachine();
//check whether the operation is currently allowed
namesystem.checkOperation(OperationCategory.WRITE);
HdfsFileStatus status = null;
try {
//permissions
PermissionStatus perm = new PermissionStatus(getRemoteUser()
.getShortUserName(), null, masked);
//create the file
status = namesystem.startFile(src, perm, clientName, clientMachine,
flag.get(), createParent, replication, blockSize, supportedVersions,
ecPolicyName, cacheEntry != null);
} finally {
RetryCache.setState(cacheEntry, status != null, status);
}
metrics.incrFilesCreated();
metrics.incrCreateFileOps();
return status;
}
Creating the file entry in the namespace:
/**
* Create a new file entry in the namespace.
*
* For description of parameters and exceptions thrown see
* {@link ClientProtocol#create}, except it returns valid file status upon
* success
*/
HdfsFileStatus startFile(String src, PermissionStatus permissions,
String holder, String clientMachine, EnumSet<CreateFlag> flag,
boolean createParent, short replication, long blockSize,
CryptoProtocolVersion[] supportedVersions, String ecPolicyName,
boolean logRetryCache) throws IOException {
HdfsFileStatus status;
//delegates to startFileInt in the same class
status = startFileInt(src, permissions, holder, clientMachine, flag,
createParent, replication, blockSize, supportedVersions, ecPolicyName,
logRetryCache);
logAuditEvent(true, "create", src, status);
return status;
}
private HdfsFileStatus startFileInt(String src,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent, short replication,
long blockSize, CryptoProtocolVersion[] supportedVersions,
String ecPolicyName, boolean logRetryCache) throws IOException {
//validate the path
if (!DFSUtil.isValidName(src) ||
FSDirectory.isExactReservedName(src) ||
(FSDirectory.isReservedName(src)
&& !FSDirectory.isReservedRawName(src)
&& !FSDirectory.isReservedInodesName(src))) {
throw new InvalidPathException(src);
}
INodesInPath iip = null;
writeLock();
try {
checkOperation(OperationCategory.WRITE);
//check safe mode
checkNameNodeSafeMode("Cannot create file" + src);
//check whether it is a directory / an existing file / whether overwrite is allowed / whether the current user may access the path
iip = FSDirWriteFileOp.resolvePathForStartFile(
dir, pc, src, flag, createParent);
//replication policy
if (shouldReplicate) {
blockManager.verifyReplication(src, replication, clientMachine);
} else {
final ErasureCodingPolicy ecPolicy = FSDirErasureCodingOp
.getErasureCodingPolicy(this, ecPolicyName, iip);
if (ecPolicy != null && (!ecPolicy.isReplicationPolicy())) {
checkErasureCodingSupported("createWithEC");
if (blockSize < ecPolicy.getCellSize()) {
throw new IOException("Specified block size (" + blockSize
+ ") is less than the cell size (" + ecPolicy.getCellSize()
+") of the erasure coding policy (" + ecPolicy + ").");
}
} else {
blockManager.verifyReplication(src, replication, clientMachine);
}
}
skipSync = false; // following might generate edits
toRemoveBlocks = new BlocksMapUpdateInfo();
dir.writeLock();
try {
stat = FSDirWriteFileOp.startFile(this, iip, permissions, holder,
clientMachine, flag, createParent, replication, blockSize, feInfo,
toRemoveBlocks, shouldReplicate, ecPolicyName, logRetryCache);
}
return stat;
}
Entering FSDirWriteFileOp#startFile:
/**
* Create a new file or overwrite an existing file<br>
*
* Once the file is create the client then allocates a new block with the next
* call using {@link ClientProtocol#addBlock}.
* <p>
* For description of parameters and exceptions thrown see
* {@link ClientProtocol#create}
*/
static HdfsFileStatus startFile(
FSNamesystem fsn, INodesInPath iip,
PermissionStatus permissions, String holder, String clientMachine,
EnumSet<CreateFlag> flag, boolean createParent,
short replication, long blockSize,
FileEncryptionInfo feInfo, INode.BlocksMapUpdateInfo toRemoveBlocks,
boolean shouldReplicate, String ecPolicyName, boolean logRetryEntry)
throws IOException {
//add the INode to the directory tree
iip = addFile(fsd, parent, iip.getLastLocalName(), permissions,
replication, blockSize, holder, clientMachine, shouldReplicate,
ecPolicyName);
newNode = iip != null ? iip.getLastINode().asFile() : null;
//add the lease (renewed automatically by the client)
fsn.leaseManager.addLease(
newNode.getFileUnderConstructionFeature().getClientName(),
newNode.getId());
//write the operation to the edit log
fsd.getEditLog().logOpenFile(src, newNode, overwrite, logRetryEntry);
return FSDirStatAndListingOp.getFileInfo(fsd, iip, false, false);
}
At this point the HDFS file has been created and control returns to DFSOutputStream#newStreamForCreate().
DFSOutputStream does not define a write method itself, so we look at its parent class FSOutputSummer:
/** Write one byte */
@Override
public synchronized void write(int b) throws IOException {
buf[count++] = (byte)b;
if(count == buf.length) {
//buffer is full: flush it
flushBuffer();
}
}
/* Forces any buffered output bytes to be checksumed and written out to
* the underlying output stream.
*/
protected synchronized void flushBuffer() throws IOException {
flushBuffer(false, true);
}
/* Forces buffered output bytes to be checksummed and written out to
* the underlying output stream. If there is a trailing partial chunk in the
* buffer,
* 1) flushPartial tells us whether to flush that chunk
* 2) if flushPartial is true, keep tells us whether to keep that chunk in the
* buffer (if flushPartial is false, it is always kept in the buffer)
*
* Returns the number of bytes that were flushed but are still left in the
* buffer (can only be non-zero if keep is true).
*/
protected synchronized int flushBuffer(boolean keep,
boolean flushPartial) throws IOException {
int bufLen = count;
int partialLen = bufLen % sum.getBytesPerChecksum();
int lenToFlush = flushPartial ? bufLen : bufLen - partialLen;
if (lenToFlush != 0) {
//the core path:
//HDFS File -> Block (128MB) -> packet (64KB, roughly 127 chunks) -> chunk (512 bytes of data + 4-byte checksum = 516 bytes)
writeChecksumChunks(buf, 0, lenToFlush);
if (!flushPartial || keep) {
count = partialLen;
System.arraycopy(buf, bufLen - count, buf, 0, count);
} else {
count = 0;
}
}
// total bytes left minus unflushed bytes left
return count - (bufLen - lenToFlush);
}
private void writeChecksumChunks(byte b[], int off, int len)
throws IOException {
//compute the checksums for the chunks
sum.calculateChunkedSums(b, off, len, checksum, 0);
TraceScope scope = createWriteTraceScope();
try {
//walk the data one chunk at a time
for (int i = 0; i < len; i += sum.getBytesPerChecksum()) {
int chunkLen = Math.min(sum.getBytesPerChecksum(), len - i);
int ckOffset = i / sum.getBytesPerChecksum() * getChecksumSize();
writeChunk(b, off + i, chunkLen, checksum, ckOffset,
getChecksumSize());
}
} finally {
if (scope != null) {
scope.close();
}
}
}
FSOutputSummer#writeChunk actually dispatches to DFSOutputStream#writeChunk:
// @see FSOutputSummer#writeChunk()
@Override
protected synchronized void writeChunk(byte[] b, int offset, int len,
byte[] checksum, int ckoff, int cklen) throws IOException {
//create a packet if there is no current one
writeChunkPrepare(len, ckoff, cklen);
//write the 4-byte chunk checksum into the packet
currentPacket.writeChecksum(checksum, ckoff, cklen);
//write the 512-byte chunk of data into the packet
currentPacket.writeData(b, offset, len);
//count the chunks in the packet; 127 chunks make a full packet
currentPacket.incNumChunks();
//track how many bytes of the current block have been written; 128MB means the block is full
getStreamer().incBytesCurBlock(len);
// If packet is full, enqueue it for transmission
//TODO two conditions trigger enqueueing:
//1: the current packet is full (127 chunks)
//2: the current block is full (128MB, roughly 2048 packets)
if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
getStreamer().getBytesCurBlock() == blockSize) {
enqueueCurrentPacketFull();
}
}
synchronized void enqueueCurrentPacketFull() throws IOException {
LOG.debug("enqueue full {}, src={}, bytesCurBlock={}, blockSize={},"
+ " appendChunk={}, {}", currentPacket, src, getStreamer()
.getBytesCurBlock(), blockSize, getStreamer().getAppendChunk(),
getStreamer());
//the packet is full: enqueue it and wake up the queue
enqueueCurrentPacket();
adjustChunkBoundary();
//if the block is now full, endBlock() sends an empty packet marking the end of the block
endBlock();
}
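To keep the chunk-to-packet bookkeeping above in one place, here is a deliberately simplified model of it. The class and field names are invented for illustration; this is not the HDFS implementation, which also writes checksums, packet headers and heartbeats.
import java.io.ByteArrayOutputStream;
import java.util.ArrayDeque;
import java.util.Deque;

public class MiniChunkedWriter {
  static final int BYTES_PER_CHUNK = 512;    // data bytes per checksum chunk
  static final int CHUNKS_PER_PACKET = 127;  // roughly one 64KB packet

  private final Deque<byte[]> dataQueue = new ArrayDeque<>(); // stands in for DataStreamer's dataQueue
  private final ByteArrayOutputStream packet = new ByteArrayOutputStream();
  private int chunksInPacket = 0;

  void writeChunk(byte[] chunk) {
    packet.write(chunk, 0, chunk.length);      // the real code writes the 4-byte checksum first
    if (++chunksInPacket == CHUNKS_PER_PACKET) {
      dataQueue.addLast(packet.toByteArray()); // a full packet is queued for the streamer thread
      packet.reset();
      chunksInPacket = 0;
    }
  }

  int queuedPackets() {
    return dataQueue.size();
  }
}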
DataStreamer is a thread; when it starts it runs the logic in its run() method:
/*
* streamer thread is the only thread that opens streams to datanode,
* and closes them. Any error recovery is also done by this thread.
*/
@Override
public void run() {
long lastPacket = Time.monotonicNow();
TraceScope scope = null;
while (!streamerClosed && dfsClient.clientRunning) {
// if the Responder encountered an error, shutdown Responder
//if an error occurred, close the responder
if (errorState.hasError()) {
closeResponder();
}
DFSPacket one;
try {
// process datanode IO errors if any
boolean doSleep = processDatanodeOrExternalError();
final int halfSocketTimeout = dfsClient.getConf().getSocketTimeout()/2;
synchronized (dataQueue) {
// wait for a packet to be sent.
long now = Time.monotonicNow();
// right after the file is created, dataQueue.size() == 0
while ((!shouldStop() && dataQueue.size() == 0 &&
(stage != BlockConstructionStage.DATA_STREAMING ||
now - lastPacket < halfSocketTimeout)) || doSleep) {
long timeout = halfSocketTimeout - (now-lastPacket);
timeout = timeout <= 0 ? 1000 : timeout;
timeout = (stage == BlockConstructionStage.DATA_STREAMING)?
timeout : 1000;
try {
//if dataQueue is empty the code blocks here, waiting to be woken up
dataQueue.wait(timeout);
} catch (InterruptedException e) {
LOG.debug("Thread interrupted", e);
}
doSleep = false;
now = Time.monotonicNow();
}
if (shouldStop()) {
continue;
}
// get packet to be sent.
if (dataQueue.isEmpty()) {
one = createHeartbeatPacket();
} else {
try {
backOffIfNecessary();
} catch (InterruptedException e) {
LOG.debug("Thread interrupted", e);
}
//the queue is not empty: take the first packet from it
one = dataQueue.getFirst(); // regular data packet
SpanId[] parents = one.getTraceParents();
if (parents.length > 0) {
scope = dfsClient.getTracer().
newScope("dataStreamer", parents[0]);
scope.getSpan().setParents(parents);
}
}
}
// get new block from namenode.
LOG.debug("stage={}, {}", stage, this);
// set up the data pipeline and ask the NameNode for a block
if (stage == BlockConstructionStage.PIPELINE_SETUP_CREATE) {
LOG.debug("Allocating new block: {}", this);
//set up the data pipeline and record the block's storage info (locations, storage types and IDs)
//nextBlockOutputStream() is the important call: it asks the NameNode for a block to write to, and the DataNode placement policy is applied inside this call
//nextBlockOutputStream does two things:
//1. asks the NameNode for a block
//2. sets up the data pipeline
setPipeline(nextBlockOutputStream());
//initialize data streaming: start the ResponseProcessor, which listens for the status of the packets we send
initDataStreaming();
} else if (stage == BlockConstructionStage.PIPELINE_SETUP_APPEND) {
LOG.debug("Append to block {}", block);
setupPipelineForAppendOrRecovery();
if (streamerClosed) {
continue;
}
initDataStreaming();
}
long lastByteOffsetInBlock = one.getLastByteOffsetBlock();
if (lastByteOffsetInBlock > stat.getBlockSize()) {
throw new IOException("BlockSize " + stat.getBlockSize() +
" < lastByteOffsetInBlock, " + this + ", " + one);
}
//if this is the last packet of the block, wait for the acks to come back
if (one.isLastPacketInBlock()) {
// wait for all data packets have been successfully acked
synchronized (dataQueue) {
while (!shouldStop() && ackQueue.size() != 0) {
try {
// wait for acks to arrive from datanodes
dataQueue.wait(1000);
} catch (InterruptedException e) {
LOG.debug("Thread interrupted", e);
}
}
}
if (shouldStop()) {
continue;
}
stage = BlockConstructionStage.PIPELINE_CLOSE;
}
// send the packet
SpanId spanId = SpanId.INVALID;
synchronized (dataQueue) {
// move packet from dataQueue to ackQueue
if (!one.isHeartbeatPacket()) {
if (scope != null) {
spanId = scope.getSpanId();
scope.detach();
one.setTraceScope(scope);
}
scope = null;
dataQueue.removeFirst();
ackQueue.addLast(one);
packetSendTime.put(one.getSeqno(), Time.monotonicNow());
dataQueue.notifyAll();
}
}
LOG.debug("{} sending {}", this, one);
// write out data to remote datanode
try (TraceScope ignored = dfsClient.getTracer().
newScope("DataStreamer#writeTo", spanId)) {
//this is the code that actually writes our data out
one.writeTo(blockStream);
blockStream.flush();
} catch (IOException e) {
// HDFS-3398 treat primary DN is down since client is unable to
// write to primary DN. If a failed or restarting node has already
// been recorded by the responder, the following call will have no
// effect. Pipeline recovery can handle only one node error at a
// time. If the primary node fails again during the recovery, it
// will be taken out then.
//the primary DataNode is the first DataNode in the pipeline
errorState.markFirstNodeIfNotMarked();
throw e;
}
lastPacket = Time.monotonicNow();
// update bytesSent
long tmpBytesSent = one.getLastByteOffsetBlock();
if (bytesSent < tmpBytesSent) {
bytesSent = tmpBytesSent;
}
if (shouldStop()) {
continue;
}
// Is this block full?
if (one.isLastPacketInBlock()) {
// wait for the close packet has been acked
synchronized (dataQueue) {
while (!shouldStop() && ackQueue.size() != 0) {
dataQueue.wait(1000);// wait for acks to arrive from datanodes
}
}
if (shouldStop()) {
continue;
}
endBlock();
}
if (progress != null) { progress.progress(); }
// This is used by unit test to trigger race conditions.
if (artificialSlowdown != 0 && dfsClient.clientRunning) {
Thread.sleep(artificialSlowdown);
}
} catch (Throwable e) {
// Log warning if there was a real error.
if (!errorState.isRestartingNode()) {
// Since their messages are descriptive enough, do not always
// log a verbose stack-trace WARN for quota exceptions.
if (e instanceof QuotaExceededException) {
LOG.debug("DataStreamer Quota Exception", e);
} else {
LOG.warn("DataStreamer Exception", e);
}
}
lastException.set(e);
assert !(e instanceof NullPointerException);
errorState.setInternalError();
if (!errorState.isNodeMarked()) {
// Not a datanode issue
streamerClosed = true;
}
} finally {
if (scope != null) {
scope.close();
scope = null;
}
}
}
closeInternal();
}
//building the pipeline
setPipeline(nextBlockOutputStream());
/**
* Open a DataStreamer to a DataNode so that it can be written to.
* This happens when a file is created and each time a new block is allocated.
* Must get block ID and the IDs of the destinations from the namenode.
* Returns the list of target datanodes.
*/
//ask the NN for a block and get back the list of target DNs
protected LocatedBlock nextBlockOutputStream() throws IOException {
LocatedBlock lb;
DatanodeInfo[] nodes;
StorageType[] nextStorageTypes;
String[] nextStorageIDs;
int count = dfsClient.getConf().getNumBlockWriteRetry();
boolean success;
final ExtendedBlock oldBlock = block.getCurrentBlock();
/**
 * Allocating a block and building the data pipeline are critical operations
 * that must succeed, but both involve network calls, so they are retried
 * several times. HDFS uses retry loops like this in many places.
 */
do {
errorState.resetInternalError();
lastException.clear();
DatanodeInfo[] excluded = getExcludedNodes();
// TODO ask the NameNode for a block
/**
 * On the server side this:
 * 1) creates a block and attaches its info to the file in the directory tree
 * 2) records the metadata on disk
 * 3) records the block's metadata in the BlockManager
 */
lb = locateFollowingBlock(
excluded.length > 0 ? excluded : null, oldBlock);
block.setCurrentBlock(lb.getBlock());
block.setNumBytes(0);
bytesSent = 0;
accessToken = lb.getBlockToken();
nodes = lb.getLocations();
nextStorageTypes = lb.getStorageTypes();
nextStorageIDs = lb.getStorageIDs();
// Connect to first DataNode in the list.
// TODO this is the code that actually establishes the HDFS pipeline.
success = createBlockOutputStream(nodes, nextStorageTypes, nextStorageIDs,
0L, false);
if (!success) {
LOG.warn("Abandoning " + block);
//TODO if the pipeline could not be established, abandon this block
dfsClient.namenode.abandonBlock(block.getCurrentBlock(),
stat.getFileId(), src, dfsClient.clientName);
block.setCurrentBlock(null);
final DatanodeInfo badNode = nodes[errorState.getBadNodeIndex()];
LOG.warn("Excluding datanode " + badNode);
excludedNodes.put(badNode, badNode);
}
} while (!success && --count >= 0);
if (!success) {
throw new IOException("Unable to create new block.");
}
return lb;
}
//allocate the block; this goes through the RPC proxy
private LocatedBlock locateFollowingBlock(DatanodeInfo[] excluded,
ExtendedBlock oldBlock) throws IOException {
return DFSOutputStream.addBlock(excluded, dfsClient, src, oldBlock,
stat.getFileId(), favoredNodes, addBlockFlags);
}
//add a block
static LocatedBlock addBlock(DatanodeInfo[] excludedNodes,
DFSClient dfsClient, String src, ExtendedBlock prevBlock, long fileId,
String[] favoredNodes, EnumSet<AddBlockFlag> allocFlags)
throws IOException {
final DfsClientConf conf = dfsClient.getConf();
int retries = conf.getNumBlockWriteLocateFollowingRetry();
long sleeptime = conf.getBlockWriteLocateFollowingInitialDelayMs();
long localstart = Time.monotonicNow();
while (true) {
//TODO call the NameNode server side over RPC
return dfsClient.namenode.addBlock(src, dfsClient.clientName, prevBlock,
excludedNodes, fileId, favoredNodes, allocFlags);
...
}
}
boolean createBlockOutputStream(DatanodeInfo[] nodes,
StorageType[] nodeStorageTypes, String[] nodeStorageIDs,
long newGS, boolean recoveryFlag) {
if (nodes.length == 0) {
LOG.info("nodes are empty for write pipeline of " + block);
return false;
}
String firstBadLink = "";
boolean checkRestart = false;
if (LOG.isDebugEnabled()) {
LOG.debug("pipeline = " + Arrays.toString(nodes) + ", " + this);
}
// persist blocks on namenode on next flush
persistBlocks.set(true);
int refetchEncryptionKey = 1;
while (true) {
boolean result = false;
DataOutputStream out = null;
try {
assert null == s : "Previous socket unclosed";
assert null == blockReplyStream : "Previous blockReplyStream unclosed";
//connect to the first DN over a socket (the streaming interface, not RPC/HTTP)
s = createSocketForPipeline(nodes[0], nodes.length, dfsClient);
long writeTimeout = dfsClient.getDatanodeWriteTimeout(nodes.length);
long readTimeout = dfsClient.getDatanodeReadTimeout(nodes.length);
//output stream
OutputStream unbufOut = NetUtils.getOutputStream(s, writeTimeout);
//input stream
InputStream unbufIn = NetUtils.getInputStream(s, readTimeout);
//wrap the socket streams with SASL if required
IOStreamPair saslStreams = dfsClient.saslClient.socketSend(s,
unbufOut, unbufIn, dfsClient, accessToken, nodes[0]);
unbufOut = saslStreams.out;
unbufIn = saslStreams.in;
//this output stream writes the client's data to the DataNode
out = new DataOutputStream(new BufferedOutputStream(unbufOut,
DFSUtilClient.getSmallBufferSize(dfsClient.getConfiguration())));
//the client reads the DataNode's replies through this input stream
blockReplyStream = new DataInputStream(unbufIn);
//
// Xmit header info to datanode
//
BlockConstructionStage bcs = recoveryFlag ?
stage.getRecoveryStage() : stage;
// We cannot change the block length in 'block' as it counts the number
// of bytes ack'ed.
ExtendedBlock blockCopy = block.getCurrentBlock();
blockCopy.setNumBytes(stat.getBlockSize());
boolean[] targetPinnings = getPinnings(nodes);
// send the request
//send the write-block request
//on the DataNode side a DataXceiver is spawned to handle this socket request
new Sender(out).writeBlock(blockCopy, nodeStorageTypes[0], accessToken,
dfsClient.clientName, nodes, nodeStorageTypes, null, bcs,
nodes.length, block.getNumBytes(), bytesSent, newGS,
checksum4WriteBlock, cachingStrategy.get(), isLazyPersistFile,
(targetPinnings != null && targetPinnings[0]), targetPinnings,
nodeStorageIDs[0], nodeStorageIDs);
// receive ack for connect
//receive the ack for the connection setup
BlockOpResponseProto resp = BlockOpResponseProto.parseFrom(
PBHelperClient.vintPrefixed(blockReplyStream));
Status pipelineStatus = resp.getStatus();
firstBadLink = resp.getFirstBadLink();
// Got an restart OOB ack.
// If a node is already restarting, this status is not likely from
// the same node. If it is from a different node, it is not
// from the local datanode. Thus it is safe to treat this as a
// regular node error.
if (PipelineAck.isRestartOOBStatus(pipelineStatus) &&
!errorState.isRestartingNode()) {
checkRestart = true;
throw new IOException("A datanode is restarting.");
}
String logInfo = "ack with firstBadLink as " + firstBadLink;
DataTransferProtoUtil.checkBlockOpStatus(resp, logInfo);
assert null == blockStream : "Previous blockStream unclosed";
blockStream = out;
result = true; // success
errorState.resetInternalError();
lastException.clear();
// remove all restarting nodes from failed nodes list
failed.removeAll(restartingNodes);
restartingNodes.clear();
} catch (IOException ie) {
if (!errorState.isRestartingNode()) {
LOG.info("Exception in createBlockOutputStream " + this, ie);
}
if (ie instanceof InvalidEncryptionKeyException &&
refetchEncryptionKey > 0) {
LOG.info("Will fetch a new encryption key and retry, "
+ "encryption key was invalid when connecting to "
+ nodes[0] + " : " + ie);
// The encryption key used is invalid.
refetchEncryptionKey--;
dfsClient.clearDataEncryptionKey();
// Don't close the socket/exclude this node just yet. Try again with
// a new encryption key.
continue;
}
// find the datanode that matches
if (firstBadLink.length() != 0) {
for (int i = 0; i < nodes.length; i++) {
// NB: Unconditionally using the xfer addr w/o hostname
if (firstBadLink.equals(nodes[i].getXferAddr())) {
errorState.setBadNodeIndex(i);
break;
}
}
} else {
assert !checkRestart;
errorState.setBadNodeIndex(0);
}
final int i = errorState.getBadNodeIndex();
// Check whether there is a restart worth waiting for.
if (checkRestart) {
errorState.initRestartingNode(i,
"Datanode " + i + " is restarting: " + nodes[i],
shouldWaitForRestart(i));
}
errorState.setInternalError();
lastException.set(ie);
result = false; // error
} finally {
if (!result) {
IOUtils.closeSocket(s);
s = null;
IOUtils.closeStream(out);
IOUtils.closeStream(blockReplyStream);
blockReplyStream = null;
}
}
return result;
}
}
/**
* Initialize for data streaming
*/
private void initDataStreaming() {
this.setName("DataStreamer for file " + src +
" block " + block);
if (LOG.isDebugEnabled()) {
LOG.debug("nodes {} storageTypes {} storageIDs {}",
Arrays.toString(nodes),
Arrays.toString(storageTypes),
Arrays.toString(storageIDs));
}
response = new ResponseProcessor(nodes);
response.start();
stage = BlockConstructionStage.DATA_STREAMING;
}
The ResponseProcessor is started to monitor whether each packet we send is acknowledged successfully:
@Override
public void run() {
setName("ResponseProcessor for block " + block);
//pipeline ack: carries the result of the DNs' writes
PipelineAck ack = new PipelineAck();
TraceScope scope = null;
while (!responderClosed && dfsClient.clientRunning && !isLastPacketInBlock) {
// process responses from datanodes.
try {
// read an ack from the pipeline
//read the processing result from downstream
ack.readFields(blockReplyStream);
if (ack.getSeqno() != DFSPacket.HEART_BEAT_SEQNO) {
Long begin = packetSendTime.get(ack.getSeqno());
if (begin != null) {
long duration = Time.monotonicNow() - begin;
if (duration > dfsclientSlowLogThresholdMs) {
LOG.info("Slow ReadProcessor read fields for block " + block
+ " took " + duration + "ms (threshold="
+ dfsclientSlowLogThresholdMs + "ms); ack: " + ack
+ ", targets: " + Arrays.asList(targets));
}
}
}
LOG.debug("DFSClient {}", ack);
long seqno = ack.getSeqno();
// processes response status from datanodes.
//process all the replies contained in the ack
ArrayList<DatanodeInfo> congestedNodesFromAck = new ArrayList<>();
for (int i = ack.getNumOfReplies()-1; i >=0 && dfsClient.clientRunning; i--) {
final Status reply = PipelineAck.getStatusFromHeader(ack
.getHeaderFlag(i));
if (PipelineAck.getECNFromHeader(ack.getHeaderFlag(i)) ==
PipelineAck.ECN.CONGESTED) {
congestedNodesFromAck.add(targets[i]);
}
// Restart will not be treated differently unless it is
// the local node or the only one in the pipeline.
if (PipelineAck.isRestartOOBStatus(reply)) {
final String message = "Datanode " + i + " is restarting: "
+ targets[i];
errorState.initRestartingNode(i, message,
shouldWaitForRestart(i));
throw new IOException(message);
}
// node error
if (reply != SUCCESS) {
errorState.setBadNodeIndex(i); // mark bad datanode
throw new IOException("Bad response " + reply +
" for " + block + " from datanode " + targets[i]);
}
}
if (!congestedNodesFromAck.isEmpty()) {
synchronized (congestedNodes) {
congestedNodes.clear();
congestedNodes.addAll(congestedNodesFromAck);
}
} else {
synchronized (congestedNodes) {
congestedNodes.clear();
lastCongestionBackoffTime = 0;
}
}
assert seqno != PipelineAck.UNKOWN_SEQNO :
"Ack for unknown seqno should be a failed ack: " + ack;
if (seqno == DFSPacket.HEART_BEAT_SEQNO) { // a heartbeat ack
continue;
}
// a success ack for a data packet
DFSPacket one;
synchronized (dataQueue) {
one = ackQueue.getFirst();
}
if (one.getSeqno() != seqno) {
throw new IOException("ResponseProcessor: Expecting seqno " +
" for block " + block +
one.getSeqno() + " but received " + seqno);
}
isLastPacketInBlock = one.isLastPacketInBlock();
// Fail the packet write for testing in order to force a
// pipeline recovery.
if (DFSClientFaultInjector.get().failPacket() &&
isLastPacketInBlock) {
failPacket = true;
throw new IOException(
"Failing the last packet for testing.");
}
// update bytesAcked
block.setNumBytes(one.getLastByteOffsetBlock());
synchronized (dataQueue) {
scope = one.getTraceScope();
if (scope != null) {
scope.reattach();
one.setTraceScope(null);
}
lastAckedSeqno = seqno;
pipelineRecoveryCount = 0;
//the ack succeeded, so remove the packet from ackQueue
ackQueue.removeFirst();
packetSendTime.remove(seqno);
dataQueue.notifyAll();
one.releaseBuffer(byteArrayManager);
}
} catch (Throwable e) {
if (!responderClosed) {
lastException.set(e);
errorState.setInternalError();
errorState.markFirstNodeIfNotMarked();
synchronized (dataQueue) {
dataQueue.notifyAll();
}
if (!errorState.isRestartingNode()) {
LOG.warn("Exception for " + block, e);
}
responderClosed = true;
}
} finally {
if (scope != null) {
scope.close();
}
scope = null;
}
}
}
@Override
public LocatedBlock addBlock(String src, String clientName,
ExtendedBlock previous, DatanodeInfo[] excludedNodes, long fileId,
String[] favoredNodes, EnumSet<AddBlockFlag> addBlockFlags)
throws IOException {
checkNNStartup();
//TODO add a block
/**
 * 1) choose the DataNodes for the replicas (3 by default)
 * 2) update the directory tree
 * 3) store the metadata
 */
LocatedBlock locatedBlock = namesystem.getAdditionalBlock(src, fileId,
clientName, previous, excludedNodes, favoredNodes, addBlockFlags);
if (locatedBlock != null) {
metrics.incrAddBlockOps();
}
return locatedBlock;
}
FSNamesystem#getAdditionalBlock allocates the block:
LocatedBlock getAdditionalBlock(
String src, long fileId, String clientName, ExtendedBlock previous,
DatanodeInfo[] excludedNodes, String[] favoredNodes,
EnumSet<AddBlockFlag> flags) throws IOException {
final String operationName = "getAdditionalBlock";
NameNode.stateChangeLog.debug("BLOCK* getAdditionalBlock: {} inodeId {}" +
" for {}", src, fileId, clientName);
//validation
r = FSDirWriteFileOp.validateAddBlock();
//rack awareness: choose suitable DNs for the block
DatanodeStorageInfo[] targets = FSDirWriteFileOp.chooseTargetForNewBlock(
blockManager, src, excludedNodes, favoredNodes, flags, r);
//allocate and store the block
lb = FSDirWriteFileOp.storeAllocatedBlock();
return lb;
}
Using rack awareness, the NN chooses suitable DNs for the block, creates the block, and updates the metadata both in memory and on disk.
static LocatedBlock storeAllocatedBlock(FSNamesystem fsn, String src,
long fileId, String clientName, ExtendedBlock previous,
DatanodeStorageInfo[] targets) throws IOException {
long offset;
//create the new block
Block newBlock = fsn.createNewBlock(blockType);
INodesInPath inodesInPath = INodesInPath.fromINode(pendingFile);
//update the in-memory directory tree (the in-memory metadata)
saveAllocatedBlock(fsn, src, inodesInPath, newBlock, targets, blockType);
//persist the metadata to disk
persistNewBlock(fsn, src, pendingFile);
// Return located block
return makeLocatedBlock(fsn, fsn.getStoredBlock(newBlock), targets, offset);
}
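For reference, the default rack-aware rule applied when chooseTargetForNewBlock asks BlockPlacementPolicyDefault for three replicas is: first replica on the writer's local DataNode (if the writer runs on one), second replica on a node in a different rack, third replica on another node in the same rack as the second. A purely illustrative sketch with made-up node and rack names:
public class RackAwarePlacementDemo {
  public static void main(String[] args) {
    String writer = "/rack1/dn1";      // the client happens to run on a DataNode
    String replica1 = writer;          // 1st replica: the local node (otherwise a random node)
    String replica2 = "/rack2/dn5";    // 2nd replica: a node on a different rack
    String replica3 = "/rack2/dn6";    // 3rd replica: a different node on the same rack as the 2nd
    System.out.println(replica1 + " -> " + replica2 + " -> " + replica3);
  }
}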
Since this part of the source was covered in the DataNode chapter, only a brief description is given here.
DataXceiverServer listens for streaming-interface requests on the Datanode. Whenever a client issues a streaming request through the Sender class, DataXceiverServer accepts it and creates a DataXceiver object to serve the request and carry out the corresponding operation. Receiver.processOp() handles a streaming request: it first reads the serialized parameters from the stream, deserializes them, and then invokes the DataTransferProtocol method that matches the operation code; those methods are implemented in DataXceiver.
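A stripped-down sketch of that opcode-dispatch idea follows. The class is hypothetical, and the opcode values are assumptions based on DataTransferProtocol's Op enum; the real Receiver also reads a protocol version first and decodes a protobuf message for each op.
import java.io.DataInputStream;
import java.io.IOException;

abstract class MiniReceiver {
  static final byte OP_WRITE_BLOCK = 80;   // assumed WRITE_BLOCK opcode value
  static final byte OP_READ_BLOCK = 81;    // assumed READ_BLOCK opcode value

  void processOp(DataInputStream in) throws IOException {
    byte op = in.readByte();               // the operation code sent by Sender
    switch (op) {
      case OP_WRITE_BLOCK: writeBlock(in); break;   // implemented by DataXceiver
      case OP_READ_BLOCK:  readBlock(in);  break;
      default: throw new IOException("Unknown op " + op);
    }
  }

  abstract void writeBlock(DataInputStream in) throws IOException;
  abstract void readBlock(DataInputStream in) throws IOException;
}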
processOp ends up calling writeBlock():
@Override
public void writeBlock() throws IOException {
if (isDatanode ||
stage != BlockConstructionStage.PIPELINE_CLOSE_RECOVERY) {
// open a block receiver
//create a BlockReceiver
setCurrentBlockReceiver(getBlockReceiver(block, storageType, in,
peer.getRemoteAddressString(),
peer.getLocalAddressString(),
stage, latestGenerationStamp, minBytesRcvd, maxBytesRcvd,
clientname, srcDataNode, datanode, requestedChecksum,
cachingStrategy, allowLazyPersist, pinning, storageId));
replica = blockReceiver.getReplica();
}
//
// Connect to downstream machine, if appropriate
//
//connect to the downstream DN
if (targets.length > 0) {
InetSocketAddress mirrorTarget = null;
// Connect to backup machine
mirrorNode = targets[0].getXferAddr(connectToDnViaHostname);
LOG.debug("Connecting to datanode {}", mirrorNode);
mirrorTarget = NetUtils.createSocketAddr(mirrorNode);
//socket to the mirror (downstream) DataNode
mirrorSock = datanode.newSocket();
if (targetPinnings != null && targetPinnings.length > 0) {
//forward the write-block request downstream over the socket
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes,
srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy,
allowLazyPersist, targetPinnings[0], targetPinnings,
targetStorageId, targetStorageIds);
} else {
new Sender(mirrorOut).writeBlock(originalBlock, targetStorageTypes[0],
blockToken, clientname, targets, targetStorageTypes,
srcDataNode, stage, pipelineSize, minBytesRcvd, maxBytesRcvd,
latestGenerationStamp, requestedChecksum, cachingStrategy,
allowLazyPersist, false, targetPinnings,
targetStorageId, targetStorageIds);
}
mirrorOut.flush();
// receive the block and mirror to the next target
if (blockReceiver != null) {
String mirrorAddr = (mirrorSock == null) ? null : mirrorNode;
//receive the block and mirror it to the next target
blockReceiver.receiveBlock(mirrorOut, mirrorIn, replyOut,
mirrorAddr, null, targets, false);
}
}
}
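The mirroring above repeats on every node of the pipeline: each DataNode persists the packet, forwards it downstream, and relays the ack back upstream (in HDFS the ack path is handled by a PacketResponder thread). Below is a conceptual, self-contained sketch with simplified packet framing; it is not the BlockReceiver implementation.
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class MiniPipelineNode {
  private final DataInputStream fromUpstream;   // client or previous DataNode
  private final DataOutputStream toDownstream;  // next DataNode; null for the tail of the pipeline
  private final DataOutputStream ackToUpstream;

  MiniPipelineNode(DataInputStream in, DataOutputStream down, DataOutputStream ackOut) {
    this.fromUpstream = in;
    this.toDownstream = down;
    this.ackToUpstream = ackOut;
  }

  void relayOnePacket() throws IOException {
    int len = fromUpstream.readInt();            // simplified packet framing
    byte[] packet = new byte[len];
    fromUpstream.readFully(packet);
    // 1. persist locally (the real code verifies checksums and writes to disk)
    // 2. mirror the packet to the next node in the pipeline, if any
    if (toDownstream != null) {
      toDownstream.writeInt(len);
      toDownstream.write(packet);
      toDownstream.flush();
    }
    // 3. the tail node generates an ack; intermediate nodes merge it with their
    //    own status and send it back upstream
    ackToUpstream.writeByte(0 /* SUCCESS */);
    ackToUpstream.flush();
  }
}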