Hi everyone, I'm V. Today let's take a quick look at index store types in ES.
Supported store types
ES currently supports the following store types:
fs
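The default file system implementation. It picks the best implementation for the operating environment; currently that means hybridfs by default.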
simplefs
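The Simple FS type is a straightforward implementation of file system storage using a random access file (maps to Lucene SimpleFSDirectory). It has poor concurrent performance (multiple threads will bottleneck) and disables some optimizations for heap memory usage. It is rarely used.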
niofs
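The NIO FS type stores the shard index on the file system using NIO (maps to Lucene NIOFSDirectory). It allows multiple threads to read from the same file concurrently. It is not recommended on Windows because of a bug in the Java implementation there, and it disables some optimizations for heap memory usage.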
mmapfs
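The MMap FS type stores the shard index on the file system by mapping files into memory with mmap (maps to Lucene MMapDirectory). Memory mapping uses up a portion of the process's virtual address space equal to the size of the file being mapped. Before using this type, make sure you have allowed enough virtual address space.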
hybridfs
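This type is a hybrid of niofs and mmapfs that chooses the best file system type for each kind of file based on its read access pattern. Currently only the Lucene term dictionary, norms, and doc values files are memory mapped. All other files are opened with NIOFSDirectory. As with mmapfs, make sure you have allowed enough virtual address space.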
How to change the store type
To change the store type, close the index first, update the setting, and then reopen the index:
POST /xxx/_close
PUT /xxx/_settings
{
"index.store.type": "niofs"
}
POST /xxx/_open
Store type source code walkthrough
The code that decides which store type an index uses is:
org.elasticsearch.index.store.FsDirectoryFactory#newFSDirectory
protected Directory newFSDirectory(Path location, LockFactory lockFactory, IndexSettings indexSettings) throws IOException {
final String storeType =
indexSettings.getSettings().get(IndexModule.INDEX_STORE_TYPE_SETTING.getKey(), IndexModule.Type.FS.getSettingsKey());
IndexModule.Type type;
if (IndexModule.Type.FS.match(storeType)) {
type = IndexModule.defaultStoreType(IndexModule.NODE_STORE_ALLOW_MMAP.get(indexSettings.getNodeSettings()));
} else {
type = IndexModule.Type.fromSettingsKey(storeType);
}
Set<String> preLoadExtensions = new HashSet<>(
indexSettings.getValue(IndexModule.INDEX_STORE_PRE_LOAD_SETTING));
switch (type) {
case HYBRIDFS:
// Use Lucene defaults
final FSDirectory primaryDirectory = FSDirectory.open(location, lockFactory);
if (primaryDirectory instanceof MMapDirectory) {
MMapDirectory mMapDirectory = (MMapDirectory) primaryDirectory;
return new HybridDirectory(lockFactory, setPreload(mMapDirectory, lockFactory, preLoadExtensions));
} else {
return primaryDirectory;
}
case MMAPFS:
return setPreload(new MMapDirectory(location, lockFactory), lockFactory, preLoadExtensions);
case SIMPLEFS:
return new SimpleFSDirectory(location, lockFactory);
case NIOFS:
return new NIOFSDirectory(location, lockFactory);
default:
throw new AssertionError("unexpected built-in store type [" + type + "]");
}
}
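To make the branching in the method above easier to follow, here is a minimal sketch of how the effective store type could be resolved. It is a simplified model, not the Elasticsearch code: `StoreTypeSketch`, `resolve`, and the boolean parameters are hypothetical names, and the fallback rules for the `fs` case are my reading of `IndexModule.defaultStoreType` (hybridfs when mmap is allowed and usable, otherwise simplefs on Windows and niofs elsewhere).

```java
import java.util.Locale;

public class StoreTypeSketch {
    public enum Type { HYBRIDFS, MMAPFS, SIMPLEFS, NIOFS }

    // Assumed model of IndexModule.defaultStoreType. The parameters stand in for
    // NODE_STORE_ALLOW_MMAP, "64-bit JVM with unmap support", and "running on Windows".
    public static Type defaultType(boolean allowMmap, boolean mmapUsable, boolean windows) {
        if (allowMmap && mmapUsable) {
            return Type.HYBRIDFS;
        }
        return windows ? Type.SIMPLEFS : Type.NIOFS;
    }

    // A missing setting or "fs" means "let the node decide"; any other value is taken literally.
    public static Type resolve(String setting, boolean allowMmap, boolean mmapUsable, boolean windows) {
        if (setting == null || setting.equals("fs")) {
            return defaultType(allowMmap, mmapUsable, windows);
        }
        return Type.valueOf(setting.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(resolve(null, true, true, false));    // unset, mmap allowed
        System.out.println(resolve("fs", false, true, false));   // mmap disabled on the node
        System.out.println(resolve("niofs", true, true, false)); // explicit type wins
    }
}
```

The point of the sketch is that only the explicit `mmapfs`, `niofs`, and `simplefs` settings bypass the node-level decision.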
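When index.store.type is unset, fs, or hybridfs, a Linux node ends up with hybridfs (provided mmap is allowed on the node, see NODE_STORE_ALLOW_MMAP in the code above), which mixes the two implementations below.
niofs and mmapfs
So although there appear to be many types, fs and hybridfs are both built on niofs and mmapfs, and simplefs is rarely used because of its performance. We therefore focus on the niofs and mmapfs implementations.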
niofs
niofs is implemented by org.apache.lucene.store.NIOFSDirectory
How niofs obtains its ByteBuffer
The ByteBuffer that niofs exposes to callers is a HeapByteBuffer
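For readers less familiar with the two buffer flavours that come up throughout this walkthrough, the following small sketch shows the difference using only the public java.nio API. The class name `HeapVsDirect` and the helper `describe` are made up for illustration.

```java
import java.nio.ByteBuffer;

public class HeapVsDirect {
    // Returns "heap" for buffers backed by a byte[] and "direct" for native-memory buffers.
    public static String describe(ByteBuffer b) {
        return b.isDirect() ? "direct" : "heap";
    }

    public static void main(String[] args) {
        ByteBuffer heap = ByteBuffer.allocate(1024);         // what BufferedIndexInput allocates
        ByteBuffer direct = ByteBuffer.allocateDirect(1024); // what the JDK uses for the actual read

        System.out.println(describe(heap) + " hasArray=" + heap.hasArray());
        System.out.println(describe(direct) + " hasArray=" + direct.hasArray());
    }
}
```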
Opening a file in the store returns an IndexInput that other modules use to read data
# org.apache.lucene.store.NIOFSDirectory#openInput
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
ensureCanRead(name);
Path path = getDirectory().resolve(name);
FileChannel fc = FileChannel.open(path, StandardOpenOption.READ);
boolean success = false;
try {
final NIOFSIndexInput indexInput = new NIOFSIndexInput("NIOFSIndexInput(path=\"" + path + "\")", fc, context);
success = true;
return indexInput;
} finally {
if (success == false) {
IOUtils.closeWhileHandlingException(fc);
}
}
}
The IndexInput implementation is NIOFSIndexInput
It mainly holds a FileChannel, a clone flag, and the start and end offsets
static final class NIOFSIndexInput extends BufferedIndexInput {
/**
* The maximum chunk size for reads of 16384 bytes.
*/
private static final int CHUNK_SIZE = 16384;
/** the file channel we will read from */
protected final FileChannel channel;
/** is this instance a clone and hence does not own the file to close it */
boolean isClone = false;
/** start offset: non-zero in the slice case */
protected final long off;
/** end offset (start+length) */
protected final long end;
}
The buffer in the parent class BufferedIndexInput is the object that actually serves data to callers
public abstract class BufferedIndexInput extends IndexInput implements RandomAccessInput {
private static final ByteBuffer EMPTY_BYTEBUFFER = ByteBuffer.allocate(0);
/** Default buffer size set to {@value #BUFFER_SIZE}. */
public static final int BUFFER_SIZE = 1024;
/** Minimum buffer size allowed */
public static final int MIN_BUFFER_SIZE = 8;
// The normal read buffer size defaults to 1024, but
// increasing this during merging seems to yield
// performance gains. However we don't want to increase
// it too much because there are quite a few
// BufferedIndexInputs created during merging. See
// LUCENE-888 for details.
/**
* A buffer size for merges set to {@value #MERGE_BUFFER_SIZE}.
*/
public static final int MERGE_BUFFER_SIZE = 4096;
private int bufferSize = BUFFER_SIZE;
private ByteBuffer buffer = EMPTY_BYTEBUFFER;
private long bufferStart = 0; // position in file of buffer
...
}
Other modules call readByte, readLong, readInt, and similar methods on BufferedIndexInput, the parent class of NIOFSIndexInput, and those methods in turn read from the internal buffer
@Override
public final short readShort() throws IOException {
if (Short.BYTES <= buffer.remaining()) {
return buffer.getShort();
} else {
return super.readShort();
}
}
@Override
public final int readInt() throws IOException {
if (Integer.BYTES <= buffer.remaining()) {
return buffer.getInt();
} else {
return super.readInt();
}
}
@Override
public final long readLong() throws IOException {
if (Long.BYTES <= buffer.remaining()) {
return buffer.getLong();
} else {
return super.readLong();
}
}
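These methods first try to read from the buffer. If the buffer does not hold enough remaining bytes, they fall back to the parent class method.
For example, readInt falls back to the parent class's readInt (defined in DataInput):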
public int readInt() throws IOException {
return ((readByte() & 0xFF) << 24) | ((readByte() & 0xFF) << 16)
| ((readByte() & 0xFF) << 8) | (readByte() & 0xFF);
}
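The fast path (`buffer.getInt()`) and the byte-by-byte fallback must produce the same value, which works because a ByteBuffer is big-endian by default. The following sketch checks that with plain java.nio; `BigEndianReadInt` and `readIntByteByByte` are names invented for this example.

```java
import java.nio.ByteBuffer;

public class BigEndianReadInt {
    // Same composition as the fallback readInt: most significant byte first.
    public static int readIntByteByByte(ByteBuffer buf) {
        return ((buf.get() & 0xFF) << 24) | ((buf.get() & 0xFF) << 16)
             | ((buf.get() & 0xFF) << 8)  |  (buf.get() & 0xFF);
    }

    public static void main(String[] args) {
        byte[] bytes = {0x12, 0x34, 0x56, 0x78};
        int fast = ByteBuffer.wrap(bytes).getInt();           // fast path: one getInt() call
        int slow = readIntByteByByte(ByteBuffer.wrap(bytes)); // slow path: four single-byte reads
        System.out.println(Integer.toHexString(fast) + " " + Integer.toHexString(slow));
    }
}
```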
The parent readInt in turn calls readByte, which triggers the refill logic once the buffer has no data left
@Override
public final byte readByte() throws IOException {
if (buffer.hasRemaining() == false) {
refill();
}
return buffer.get();
}
BufferedIndexInput#refill
The buffer is filled by refill, which first creates a HeapByteBuffer (lazily, on first use) and then calls readInternal to fill it
# org.apache.lucene.store.BufferedIndexInput#refill
private void refill() throws IOException {
long start = bufferStart + buffer.position();
long end = start + bufferSize; // bufferSize is 1024
if (end > length()) // don't read past EOF
end = length();
int newLength = (int)(end - start);
if (newLength <= 0)
throw new EOFException("read past EOF: " + this);
if (buffer == EMPTY_BYTEBUFFER) {
// create a HeapByteBuffer
buffer = ByteBuffer.allocate(bufferSize); // allocate buffer lazily
// check that the current position is within bounds
seekInternal(bufferStart);
}
buffer.position(0);
buffer.limit(newLength);
bufferStart = start;
// fill the current buffer
readInternal(buffer);
// Make sure sub classes don't mess up with the buffer.
assert buffer.order() == ByteOrder.BIG_ENDIAN : buffer.order();
assert buffer.remaining() == 0 : "should have thrown EOFException";
assert buffer.position() == newLength;
// switch to read mode
buffer.flip();
}
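The refill cycle can be reproduced with the public NIO API alone. The sketch below is a stripped-down stand-in for BufferedIndexInput, not Lucene's class: `RefillSketch`, its `refills` counter, and the `demo` helper are hypothetical, and slicing, seeking, and cloning are left out.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class RefillSketch {
    private static final ByteBuffer EMPTY = ByteBuffer.allocate(0);
    private final FileChannel channel;
    private final int bufferSize;
    private ByteBuffer buffer = EMPTY; // heap buffer, allocated lazily on first refill
    private long bufferStart = 0;      // file position of the first byte in the buffer
    int refills = 0;                   // counts refill() calls, for the demo only

    RefillSketch(FileChannel channel, int bufferSize) {
        this.channel = channel;
        this.bufferSize = bufferSize;
    }

    byte readByte() throws IOException {
        if (!buffer.hasRemaining()) {
            refill();
        }
        return buffer.get();
    }

    private void refill() throws IOException {
        long start = bufferStart + buffer.position();
        long end = Math.min(start + bufferSize, channel.size()); // don't read past EOF
        int newLength = (int) (end - start);
        if (newLength <= 0) throw new EOFException("read past EOF");
        if (buffer == EMPTY) buffer = ByteBuffer.allocate(bufferSize);
        buffer.position(0);
        buffer.limit(newLength);
        bufferStart = start;
        long pos = start;
        while (buffer.hasRemaining()) {        // fill the buffer with positional reads
            int n = channel.read(buffer, pos);
            if (n < 0) throw new EOFException("read past EOF");
            pos += n;
        }
        buffer.flip();                         // switch to read mode
        refills++;
    }

    // Writes `size` bytes to a temp file, reads them back one by one, returns the refill count.
    public static int demo(int size, int bufferSize) {
        try {
            Path tmp = Files.createTempFile("refill", ".bin");
            try {
                byte[] data = new byte[size];
                for (int i = 0; i < size; i++) data[i] = (byte) i;
                Files.write(tmp, data);
                try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ)) {
                    RefillSketch in = new RefillSketch(ch, bufferSize);
                    for (int i = 0; i < size; i++) {
                        if (in.readByte() != data[i]) throw new IllegalStateException("mismatch at " + i);
                    }
                    return in.refills;
                }
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(2500, 1024)); // 1024 + 1024 + 452 bytes
    }
}
```

With a 1024-byte buffer, every 1024 sequential bytes cost one trip to the FileChannel, which is why the buffer size matters for sequential workloads such as merges.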
NIOFSIndexInput#readInternal
Next, let's look at readInternal, which calls FileChannel.read to fill the ByteBuffer
@Override
protected void readInternal(ByteBuffer b) throws IOException {
long pos = getFilePointer() + off;
if (pos + b.remaining() > end) {
throw new EOFException("read past EOF: " + this);
}
try {
int readLength = b.remaining();
while (readLength > 0) {
final int toRead = Math.min(CHUNK_SIZE, readLength);
b.limit(b.position() + toRead);
assert b.remaining() == toRead;
final int i = channel.read(b, pos);
if (i < 0) { // be defensive here, even though we checked before hand, something could have changed
throw new EOFException("read past EOF: " + this + " buffer: " + b + " chunkLen: " + toRead + " end: " + end);
}
assert i > 0 : "FileChannel.read with non zero-length bb.remaining() must always read at least one byte (FileChannel is in blocking mode, see spec of ReadableByteChannel)";
pos += i;
readLength -= i;
}
assert readLength == 0;
} catch (IOException ioe) {
throw new IOException(ioe.getMessage() + ": " + this, ioe);
}
}
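The chunking loop in readInternal is easy to try out in isolation. This is a sketch under assumptions: `ChunkedRead`, `readFully`, and `demo` are illustrative names, the 16384-byte cap simply mirrors the CHUNK_SIZE constant shown earlier, and the slice offsets (`off`, `end`) are omitted.

```java
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

public class ChunkedRead {
    static final int CHUNK_SIZE = 16384; // same cap as NIOFSIndexInput.CHUNK_SIZE

    // Fills b completely from the channel, starting at file offset pos,
    // issuing positional reads of at most CHUNK_SIZE bytes each.
    public static void readFully(FileChannel channel, ByteBuffer b, long pos) throws IOException {
        int readLength = b.remaining();
        while (readLength > 0) {
            int toRead = Math.min(CHUNK_SIZE, readLength);
            b.limit(b.position() + toRead); // expose only the next chunk
            int n = channel.read(b, pos);   // positional read: the channel's own position is untouched
            if (n < 0) throw new EOFException("read past EOF");
            pos += n;
            readLength -= n;
        }
    }

    // Round-trips `size` bytes through a temp file and reports whether they match.
    public static boolean demo(int size) {
        try {
            Path tmp = Files.createTempFile("chunked", ".bin");
            try {
                byte[] data = new byte[size];
                for (int i = 0; i < size; i++) data[i] = (byte) (i * 31);
                Files.write(tmp, data);
                ByteBuffer dst = ByteBuffer.allocate(size);
                try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ)) {
                    readFully(ch, dst, 0);
                }
                return Arrays.equals(data, dst.array());
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo(40000)); // needs three chunks: 16384 + 16384 + 7232
    }
}
```

Because the read is positional, several threads can share one FileChannel without coordinating on a file pointer, which is the property the niofs description relies on.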
FileChannelImpl#read
# sun.nio.ch.FileChannelImpl#read(java.nio.ByteBuffer, long)
public int read(ByteBuffer dst, long position) throws IOException {
if (dst == null)
throw new NullPointerException();
if (position < 0)
throw new IllegalArgumentException("Negative position");
if (!readable)
throw new NonReadableChannelException();
if (direct)
Util.checkChannelPositionAligned(position, alignment);
ensureOpen();
if (nd.needsPositionLock()) {
synchronized (positionLock) {
return readInternal(dst, position);
}
} else {
return readInternal(dst, position); // this branch is taken
}
}
FileChannelImpl#readInternal
# sun.nio.ch.FileChannelImpl#readInternal
private int readInternal(ByteBuffer dst, long position) throws IOException {
assert !nd.needsPositionLock() || Thread.holdsLock(positionLock);
int n = 0;
int ti = -1;
try {
// mark the start of an operation that may block for a long time
beginBlocking();
ti = threads.add();
if (!isOpen())
return -1;
do {
n = IOUtil.read(fd, dst, position, direct, alignment, nd);
} while ((n == IOStatus.INTERRUPTED) && isOpen());
return IOStatus.normalize(n);
} finally {
threads.remove(ti);
// mark the end of the blocking operation
endBlocking(n > 0);
assert IOStatus.check(n);
}
}
IOUtil#read
Get a DirectByteBuffer from the cache, or create a new one if none is available
Read the file contents into the DirectByteBuffer
Copy all bytes from the DirectByteBuffer into the HeapByteBuffer
# sun.nio.ch.IOUtil#read(java.io.FileDescriptor, java.nio.ByteBuffer, long, boolean, int, sun.nio.ch.NativeDispatcher)
static int read(FileDescriptor fd, ByteBuffer dst, long position,
boolean directIO, int alignment, NativeDispatcher nd)
throws IOException
{
if (dst.isReadOnly())
throw new IllegalArgumentException("Read-only buffer");
if (dst instanceof DirectBuffer)
return readIntoNativeBuffer(fd, dst, position, directIO, alignment, nd);
// Substitute a native buffer
ByteBuffer bb;
int rem = dst.remaining();
if (directIO) { // directIO is false
Util.checkRemainingBufferSizeAligned(rem, alignment);
bb = Util.getTemporaryAlignedDirectBuffer(rem, alignment);
} else {
bb = Util.getTemporaryDirectBuffer(rem); // returns a java.nio.DirectByteBuffer from the cache
}
try {
// read from fd at the given position; directIO is false
int n = readIntoNativeBuffer(fd, bb, position, directIO, alignment,nd);
bb.flip();
if (n > 0)
// copy the bytes that were read into the target ByteBuffer
dst.put(bb); // dst is a HeapByteBuffer
return n;
} finally {
// release the ByteBuffer, caching it for reuse if possible
Util.offerFirstTemporaryDirectBuffer(bb);
}
}
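The three steps of IOUtil#read (borrow a direct buffer, read into it, copy into the heap buffer) can be imitated with public APIs to see the extra copy that niofs pays. This is only an imitation: `DirectBounce` and its methods are hypothetical, and a freshly allocated direct buffer stands in for the JDK's internal per-thread temporary buffer cache.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;

public class DirectBounce {
    // Reads up to len bytes from the start of the file into a heap buffer,
    // going through an intermediate direct buffer as IOUtil#read does.
    public static byte[] readViaDirect(Path file, int len) {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer dst = ByteBuffer.allocate(len);      // heap destination
            ByteBuffer bb = ByteBuffer.allocateDirect(len); // stand-in for the temporary direct buffer
            long pos = 0;
            while (bb.hasRemaining()) {                     // step 2: file -> native memory
                int n = ch.read(bb, pos);
                if (n < 0) break;                           // file shorter than len
                pos += n;
            }
            bb.flip();
            dst.put(bb);                                    // step 3: native memory -> heap
            return Arrays.copyOf(dst.array(), dst.position());
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Writes a short string to a temp file and checks that it survives the round trip.
    public static boolean demo() {
        try {
            Path tmp = Files.createTempFile("bounce", ".bin");
            try {
                byte[] content = "hello lucene".getBytes(StandardCharsets.US_ASCII);
                Files.write(tmp, content);
                return Arrays.equals(content, readViaDirect(tmp, content.length));
            } finally {
                Files.deleteIfExists(tmp);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```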
Util#getTemporaryDirectBuffer
Gets a DirectByteBuffer from the cache, or creates a new one if none is available
# sun.nio.ch.Util#getTemporaryDirectBuffer
public static ByteBuffer getTemporaryDirectBuffer(int size) {
// If a buffer of this size is too large for the cache, there
// should not be a buffer in the cache that is at least as
// large. So we'll just create a new one. Also, we don't have
// to remove the buffer from the cache (as this method does
// below) given that we won't put the new buffer in the cache.
if (isBufferTooLarge(size)) {
return ByteBuffer.allocateDirect(size);
}
BufferCache cache = bufferCache.get();
ByteBuffer buf = cache.get(size);
if (buf != null) {
return buf; // this branch may be taken
} else {
// No suitable buffer in the cache so we need to allocate a new
// one. To avoid the cache growing then we remove the first
// buffer from the cache and free it.
if (!cache.isEmpty()) {
buf = cache.removeFirst();
free(buf);
}
return ByteBuffer.allocateDirect(size); // returns a java.nio.DirectByteBuffer
}
}
IOUtil#readIntoNativeBuffer
Reads the file contents into the DirectByteBuffer
# sun.nio.ch.IOUtil#readIntoNativeBuffer
private static int readIntoNativeBuffer(FileDescriptor fd, ByteBuffer bb,
long position, boolean directIO,
int alignment, NativeDispatcher nd)
throws IOException
{
int pos = bb.position();
int lim = bb.limit();
assert (pos <= lim);
int rem = (pos <= lim ? lim - pos : 0);
if (directIO) { // directIO is false
Util.checkBufferPositionAligned(bb, pos, alignment);
Util.checkRemainingBufferSizeAligned(rem, alignment);
}
if (rem == 0)
return 0;
int n = 0;
if (position != -1) {
// this branch is taken; it ends up calling pread64 on Linux
n = nd.pread(fd, ((DirectBuffer)bb).address() + pos, rem, position);
} else {
n = nd.read(fd, ((DirectBuffer)bb).address() + pos, rem);
}
if (n > 0)
// update bb's position
bb.position(pos + n);
return n;
}
FileDispatcherImpl#pread
Calls the native method pread0
# sun.nio.ch.FileDispatcherImpl#pread
int pread(FileDescriptor fd, long address, int len, long position)
throws IOException
{
return pread0(fd, address, len, position);
}
static native int pread0(FileDescriptor fd, long address, int len,
long position) throws IOException;
FileDispatcherImpl.c#pread0
https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/19fb8f93c59dfd791f62d41f332db9e306bc1422/src/java.base/unix/native/libnio/ch/FileDispatcherImpl.c#L89
JNIEXPORT jint JNICALL
Java_sun_nio_ch_FileDispatcherImpl_pread0(JNIEnv *env, jclass clazz, jobject fdo,
jlong address, jint len, jlong offset)
{
jint fd = fdval(env, fdo);
void *buf = (void *)jlong_to_ptr(address);
return convertReturnVal(env, pread64(fd, buf, len, offset), JNI_TRUE);
}
ByteBuffer#put
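Nothing fancy: the generic implementation loops and copies one byte at a time into the target ByteBuffer. Note that in the JDK 11 sources HeapByteBuffer appears to override put(ByteBuffer) with a bulk copy, so the loop below is the generic fallback and not necessarily the exact path taken for a heap destination.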
# java.nio.ByteBuffer#put(java.nio.ByteBuffer)
public ByteBuffer put(ByteBuffer src) {
if (src == this)
throw createSameBufferException();
if (isReadOnly())
throw new ReadOnlyBufferException();
int n = src.remaining();
if (n > remaining())
throw new BufferOverflowException();
for (int i = 0; i < n; i++)
put(src.get());
return this;
}
Util#offerFirstTemporaryDirectBuffer
Tries to return the DirectByteBuffer to the cache; if it is too large or the cache is full, it is freed instead
# sun.nio.ch.Util#offerFirstTemporaryDirectBuffer
static void offerFirstTemporaryDirectBuffer(ByteBuffer buf) {
// If the buffer is too large for the cache we don't have to
// check the cache. We'll just free it.
if (isBufferTooLarge(buf)) {
free(buf);
return;
}
assert buf != null;
BufferCache cache = bufferCache.get();
if (!cache.offerFirst(buf)) { // try to cache buf
// cache is full
free(buf); // free buf
}
}
mmapfs
mmapfs is implemented by org.apache.lucene.store.MMapDirectory
How mmapfs obtains its ByteBuffer
MMapDirectory#openInput
mmapfs uses MMapDirectory's openInput to return an IndexInput for other modules to use
# org.apache.lucene.store.MMapDirectory#openInput
@Override
public IndexInput openInput(String name, IOContext context) throws IOException {
ensureOpen();
ensureCanRead(name);
Path path = directory.resolve(name);
try (FileChannel c = FileChannel.open(path, StandardOpenOption.READ)) {
final String resourceDescription = "MMapIndexInput(path=\"" + path.toString() + "\")";
final boolean useUnmap = getUseUnmap(); // useUnmap=true
// wrap in SingleBufferImpl or MultiBufferImpl depending on the number of ByteBuffers
return ByteBufferIndexInput.newInstance(resourceDescription,
map(resourceDescription, c, 0, c.size()),
c.size(), chunkSizePower, new ByteBufferGuard(resourceDescription, useUnmap ? CLEANER : null)); // useUnmap is true
}
}
ByteBufferIndexInput.newInstance is just a thin factory
public static ByteBufferIndexInput newInstance(String resourceDescription, ByteBuffer[] buffers, long length, int chunkSizePower, ByteBufferGuard guard) {
if (buffers.length == 1) {
return new SingleBufferImpl(resourceDescription, buffers[0], length, chunkSizePower, guard);
} else {
return new MultiBufferImpl(resourceDescription, buffers, 0, length, chunkSizePower, guard);
}
}
SingleBufferImpl and MultiBufferImpl are essentially proxies around ByteBuffer objects, so we skip them here. The important part is the map method
MMapDirectory#map
The ByteBuffer array returned by this method holds MappedByteBuffer instances; the concrete class is java.nio.DirectByteBufferR
maxChunkSize is chosen as Constants.JRE_IS_64BIT ? (1 << 30) : (1 << 28);
On a 64-bit JVM that is 1073741824 bytes, i.e. 1024 MB
this.chunkSizePower = 31 - Integer.numberOfLeadingZeros(maxChunkSize); which gives 30
preload is false here (the default), so buffer.load() is never called
# org.apache.lucene.store.MMapDirectory#map
final ByteBuffer[] map(String resourceDescription, FileChannel fc, long offset, long length) throws IOException {
if ((length >>> chunkSizePower) >= Integer.MAX_VALUE)
throw new IllegalArgumentException("RandomAccessFile too big for chunk size: " + resourceDescription);
final long chunkSize = 1L << chunkSizePower; // chunkSizePower is 30, so chunkSize is 1 GB
// we always allocate one more buffer, the last one may be a 0 byte one
// compute the number of buffers from the file length and the chunk size
final int nrBuffers = (int) (length >>> chunkSizePower) + 1;
ByteBuffer buffers[] = new ByteBuffer[nrBuffers];
long bufferStart = 0L;
for (int bufNr = 0; bufNr < nrBuffers; bufNr++) {
int bufSize = (int) ( (length > (bufferStart + chunkSize))
? chunkSize
: (length - bufferStart)
);
MappedByteBuffer buffer; // buffer is a java.nio.DirectByteBufferR
try {
buffer = fc.map(MapMode.READ_ONLY, offset + bufferStart, bufSize);
} catch (IOException ioe) {
throw convertMapFailedIOException(ioe, resourceDescription, bufSize);
}
if (preload) { // preload=false
buffer.load();
}
buffers[bufNr] = buffer;
bufferStart += bufSize;
}
return buffers;
}
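The chunk arithmetic in map is worth checking with concrete numbers. The sketch below redoes just the calculations with plain long arithmetic and maps nothing; `ChunkMath` and its method names are invented for the example.

```java
import java.util.Arrays;

public class ChunkMath {
    // 31 - numberOfLeadingZeros gives log2 for a power-of-two chunk size.
    public static int chunkSizePower(int maxChunkSize) {
        return 31 - Integer.numberOfLeadingZeros(maxChunkSize);
    }

    // One buffer per full chunk, plus one more that holds the remainder (possibly 0 bytes).
    public static int nrBuffers(long length, int chunkSizePower) {
        return (int) (length >>> chunkSizePower) + 1;
    }

    // Sizes of the individual buffers, computed the same way as bufSize in the loop above.
    public static long[] bufferSizes(long length, int chunkSizePower) {
        long chunkSize = 1L << chunkSizePower;
        long[] sizes = new long[nrBuffers(length, chunkSizePower)];
        long bufferStart = 0;
        for (int i = 0; i < sizes.length; i++) {
            sizes[i] = (length > bufferStart + chunkSize) ? chunkSize : (length - bufferStart);
            bufferStart += sizes[i];
        }
        return sizes;
    }

    public static void main(String[] args) {
        int power = chunkSizePower(1 << 30);
        long twoAndAHalfGiB = 5L << 29;
        System.out.println(power);
        System.out.println(Arrays.toString(bufferSizes(twoAndAHalfGiB, power)));
        System.out.println(Arrays.toString(bufferSizes(1L << 30, power)));
    }
}
```

A file of exactly one chunk still produces two buffers, the second one empty, which matches the "last one may be a 0 byte one" comment in the Lucene source.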
Now let's focus on the FileChannel.map method
FileChannelImpl#map
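beginBlocking() marks the start of an operation that may block indefinitely; it must be paired with endBlocking()
nd.size(fd) gets the file size. If the file is smaller than position + size the file is extended, but we map in read-only mode so this branch is not taken
map0(imode, mapPosition, mapSize) establishes the mapping between virtual memory addresses and the file
nd.duplicateForMapping(fd) creates a new FileDescriptor
Util.newMappedByteBufferR(isize, addr + pagePosition, mfd, um) creates the new DirectByteBufferR
endBlocking() marks the end of the blocking operation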
# sun.nio.ch.FileChannelImpl#map
// mode is MapMode.READ_ONLY
// position is the start offset
// size is the file size for small files, or the chunk size for large files
public MappedByteBuffer map(MapMode mode, long position, long size)
throws IOException
{
ensureOpen();
if (mode == null)
throw new NullPointerException("Mode is null");
if (position < 0L)
throw new IllegalArgumentException("Negative position");
if (size < 0L)
throw new IllegalArgumentException("Negative size");
if (position + size < 0)
throw new IllegalArgumentException("Position + size overflow");
if (size > Integer.MAX_VALUE)
throw new IllegalArgumentException("Size exceeds Integer.MAX_VALUE");
int imode = -1;
if (mode == MapMode.READ_ONLY)
imode = MAP_RO;
else if (mode == MapMode.READ_WRITE)
imode = MAP_RW;
else if (mode == MapMode.PRIVATE)
imode = MAP_PV;
assert (imode >= 0);
if ((mode != MapMode.READ_ONLY) && !writable)
throw new NonWritableChannelException();
if (!readable)
throw new NonReadableChannelException();
long addr = -1;
int ti = -1;
try {
// mark the start of an operation that may block indefinitely; paired with endBlocking
beginBlocking();
ti = threads.add();
if (!isOpen())
return null;
long mapSize;
int pagePosition;
synchronized (positionLock) {
long filesize;
do {
filesize = nd.size(fd); // get the file size
} while ((filesize == IOStatus.INTERRUPTED) && isOpen());
if (!isOpen())
return null;
// this branch extends the file for writable mappings; we map read-only, so it is not taken
if (filesize < position + size) { // Extend file size
if (!writable) {
throw new IOException("Channel not open for writing " +
"- cannot extend file to required size");
}
int rv;
do {
rv = nd.truncate(fd, position + size);
} while ((rv == IOStatus.INTERRUPTED) && isOpen());
if (!isOpen())
return null;
}
// size is normally greater than 0
if (size == 0) {
addr = 0;
// a valid file descriptor is not required
FileDescriptor dummy = new FileDescriptor();
if ((!writable) || (imode == MAP_RO))
return Util.newMappedByteBufferR(0, 0, dummy, null);
else
return Util.newMappedByteBuffer(0, 0, dummy, null);
}
// allocationGranularity is 16384 (16 KB) here; the value is platform dependent
pagePosition = (int)(position % allocationGranularity);
long mapPosition = position - pagePosition;
mapSize = size + pagePosition;
try {
// If map0 did not throw an exception, the address is valid
// establish the virtual memory mapping
// imode is MAP_RO
addr = map0(imode, mapPosition, mapSize);
} catch (OutOfMemoryError x) {
// An OutOfMemoryError may indicate that we've exhausted
// memory so force gc and re-attempt map
System.gc();
try {
Thread.sleep(100);
} catch (InterruptedException y) {
Thread.currentThread().interrupt();
}
try {
addr = map0(imode, mapPosition, mapSize);
} catch (OutOfMemoryError y) {
// After a second OOME, fail
throw new IOException("Map failed", y);
}
}
} // synchronized
// On Windows, and potentially other platforms, we need an open
// file descriptor for some mapping operations.
FileDescriptor mfd;
try {
mfd = nd.duplicateForMapping(fd);
} catch (IOException ioe) {
unmap0(addr, mapSize);
throw ioe;
}
assert (IOStatus.checkAll(addr));
assert (addr % allocationGranularity == 0);
int isize = (int)size;
Unmapper um = new Unmapper(addr, mapSize, isize, mfd);
if ((!writable) || (imode == MAP_RO)) {
return Util.newMappedByteBufferR(isize,
addr + pagePosition,
mfd,
um);
} else {
return Util.newMappedByteBuffer(isize,
addr + pagePosition,
mfd,
um);
}
} finally {
threads.remove(ti);
endBlocking(IOStatus.checkAll(addr));
}
}
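The alignment step before map0 is simple modular arithmetic: the requested position is rounded down to a multiple of the allocation granularity, and the mapping is enlarged by the same amount. Here is a small sketch of just that calculation; `MapAlignment` and `align` are illustrative names, and the granularity values in main are examples only.

```java
import java.util.Arrays;

public class MapAlignment {
    // Returns { pagePosition, mapPosition, mapSize } as computed in FileChannelImpl#map.
    public static long[] align(long position, long size, long granularity) {
        int pagePosition = (int) (position % granularity); // offset of position inside its page
        long mapPosition = position - pagePosition;        // aligned start handed to map0
        long mapSize = size + pagePosition;                // extra bytes mapped in front
        return new long[] { pagePosition, mapPosition, mapSize };
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(align(20000, 1000, 16384)));
        System.out.println(Arrays.toString(align(10000, 500, 4096)));
        System.out.println(Arrays.toString(align(0, 1L << 30, 16384)));
    }
}
```

The buffer handed back to the caller starts at `addr + pagePosition`, so the padding in front is invisible to Lucene. Since MMapDirectory maps at offsets that are multiples of the 1 GB chunk size, pagePosition should normally be 0 on that path.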
FileChannelImpl#map0
This code maps a region of the file into the virtual address space
# sun.nio.ch.FileChannelImpl#map0
// Creates a new mapping
private native long map0(int prot, long position, long length)
throws IOException;
https://github.com/AdoptOpenJDK/openjdk-jdk11/blob/19fb8f93c59dfd791f62d41f332db9e306bc1422/src/java.base/unix/native/libnio/ch/FileChannelImpl.c#L74
map0 source
JNIEXPORT jlong JNICALL
Java_sun_nio_ch_FileChannelImpl_map0(JNIEnv *env, jobject this,
jint prot, jlong off, jlong len)
{
void *mapAddress = 0;
jobject fdo = (*env)->GetObjectField(env, this, chan_fd);
jint fd = fdval(env, fdo);
int protections = 0;
int flags = 0;
if (prot == sun_nio_ch_FileChannelImpl_MAP_RO) {
protections = PROT_READ;
flags = MAP_SHARED;
} else if (prot == sun_nio_ch_FileChannelImpl_MAP_RW) {
protections = PROT_WRITE | PROT_READ;
flags = MAP_SHARED;
} else if (prot == sun_nio_ch_FileChannelImpl_MAP_PV) {
protections = PROT_WRITE | PROT_READ;
flags = MAP_PRIVATE;
}
mapAddress = mmap64(
0, /* Let OS decide location */
len, /* Number of bytes to map */
protections, /* File permissions */
flags, /* Changes are shared */
fd, /* File descriptor of mapped file */
off); /* Offset into file */
if (mapAddress == MAP_FAILED) {
if (errno == ENOMEM) {
JNU_ThrowOutOfMemoryError(env, "Map failed");
return IOS_THROWN;
}
return handle(env, -1, "Map failed");
}
return ((jlong) (unsigned long) mapAddress);
}
Util#newMappedByteBufferR
# sun.nio.ch.Util#newMappedByteBufferR
static MappedByteBuffer newMappedByteBufferR(int size, long addr,
FileDescriptor fd,
Runnable unmapper)
{
MappedByteBuffer dbb;
if (directByteBufferRConstructor == null)
initDBBRConstructor(); // initialize the constructor if it has not been looked up yet
try {
// create a new DirectByteBufferR instance via reflection
dbb = (MappedByteBuffer)directByteBufferRConstructor.newInstance(
new Object[] { size,
addr,
fd,
unmapper });
} catch (InstantiationException |
IllegalAccessException |
InvocationTargetException e) {
throw new InternalError(e);
}
return dbb;
}
DirectByteBufferR constructor
protected DirectByteBufferR(int cap, long addr,
FileDescriptor fd,
Runnable unmapper)
{
super(cap, addr, fd, unmapper);
this.isReadOnly = true;
}
Parent class constructors of DirectByteBufferR
protected DirectByteBuffer(int cap, long addr,
FileDescriptor fd,
Runnable unmapper)
{
super(-1, 0, cap, cap, fd);
address = addr;
cleaner = Cleaner.create(this, unmapper);
att = null;
}
MappedByteBuffer(int mark, int pos, int lim, int cap, // package-private
FileDescriptor fd)
{
super(mark, pos, lim, cap);
this.fd = fd;
}
How mmapfs reads data
@Override
public final byte readByte() throws IOException {
try {
// curBuf is a java.nio.DirectByteBufferR
return guard.getByte(curBuf); // guard is an org.apache.lucene.store.ByteBufferGuard
} catch (BufferUnderflowException e) {
do {
curBufIndex++;
if (curBufIndex >= buffers.length) {
throw new EOFException("read past EOF: " + this);
}
setCurBuf(buffers[curBufIndex]);
curBuf.position(0);
} while (!curBuf.hasRemaining());
return guard.getByte(curBuf);
} catch (NullPointerException npe) {
throw new AlreadyClosedException("Already closed: " + this);
}
}
guard.getByte(curBuf) is just a thin wrapper
# org.apache.lucene.store.ByteBufferGuard#getByte(java.nio.ByteBuffer)
public byte getByte(ByteBuffer receiver) {
ensureValid();
return receiver.get();
}
Ultimately this calls java.nio.DirectByteBuffer#get()
# java.nio.DirectByteBuffer#get()
public byte get() {
try {
return ((UNSAFE.getByte(ix(nextGetIndex()))));
} finally {
Reference.reachabilityFence(this);
}
}
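To see the whole mmapfs read path from the outside, the sketch below maps a small temp file read-only and reads it through the mapped buffer, using only the public FileChannel API. `MmapReadDemo` and `readMapped` are hypothetical names. As in MMapDirectory#openInput, the channel is closed right after mapping, which is fine because a mapping does not depend on the channel that created it.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.MappedByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class MmapReadDemo {
    // Writes content to a temp file, maps it read-only, and reads it back byte by byte.
    public static String readMapped(byte[] content) {
        try {
            Path tmp = Files.createTempFile("mmap", ".bin");
            tmp.toFile().deleteOnExit(); // a mapped file may not be deletable while mapped on some platforms
            Files.write(tmp, content);

            MappedByteBuffer buf;
            try (FileChannel ch = FileChannel.open(tmp, StandardOpenOption.READ)) {
                buf = ch.map(FileChannel.MapMode.READ_ONLY, 0, ch.size());
            } // the channel is closed here, the mapping stays valid

            StringBuilder sb = new StringBuilder();
            while (buf.hasRemaining()) {
                sb.append((char) buf.get()); // plain memory access, no read system call per byte
            }
            return sb + " direct=" + buf.isDirect() + " readOnly=" + buf.isReadOnly();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(readMapped("es".getBytes(StandardCharsets.US_ASCII)));
    }
}
```

Compared with the niofs path, there is no intermediate heap buffer and no copy through a temporary direct buffer: each get() reads straight from the mapped pages.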