版本基于:Android R
0. 前言
在《Android 基于Handler 剖析消息机制》一文中,以 Handler 类为起点详细分析了异步通信,分析了Java 端 Handler 与Looper、MessageQueue、Message 之前的通信关系。
框架如下:
在Java 端的 Looper 中会创建一个 Java 端的 MessageQueue实例,并在loop() 函数中的死循环里通过 queue.next() 不停的获取监听到的下一个 Message,然后将其通过 dispatchMessage() 分发处理。详细看《Android 基于Handler 剖析消息机制》一文第 4.3 节。
在 Java 端的 MessageQueue 实现核心机制是通过 Native 端的 MessageQueue,该类的对象中维护了一个 native Looper,上面说到的 Java 端的next() 函数就是通过 native 端的 epoll 机制 进行阻塞监听。
之前《Android 基于Handler 剖析消息机制》一文只是对 native Looper 做了简单的介绍,本文将重点分析一下 native Looper 实现机制。
1. Looper 类
system/core/libutils/include/utils/Looper.h
class Looper : public RefBase {
protected:
virtual ~Looper();
public:
Looper(bool allowNonCallbacks);
bool getAllowNonCallbacks() const;
int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, nullptr, nullptr, nullptr);
}
int pollAll(int timeoutMillis, int* outFd, int* outEvents, void** outData);
inline int pollAll(int timeoutMillis) {
return pollAll(timeoutMillis, nullptr, nullptr, nullptr);
}
void wake();
int addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data);
int addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data);
int removeFd(int fd);
void sendMessage(const sp<MessageHandler>& handler, const Message& message);
void sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
const Message& message);
void sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message);
void removeMessages(const sp<MessageHandler>& handler);
void removeMessages(const sp<MessageHandler>& handler, int what);
bool isPolling() const;
static sp<Looper> prepare(int opts);
static void setForThread(const sp<Looper>& looper);
static sp<Looper> getForThread();
private:
...
const bool mAllowNonCallbacks; // immutable
android::base::unique_fd mWakeEventFd; // immutable
Vector<MessageEnvelope> mMessageEnvelopes; // guarded by mLock
...
};
简单了列举了下 Looper 类中的成员,后面根据流程再详细说明。
2. Looper 构造函数
system/core/libutils/Looper.cpp
Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(0),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));
AutoMutex _l(mLock);
rebuildEpollLocked();
}
参数 allowNonCallbacks 指定该 Looper 中是否允许在 addFd() 的时候可以不设定 callback,如果为true,则在addFd() 时可以不用设定callback,如果为 false,则在 addFd() 的时候需要指定 callback,否则会报错,详细可以查看 addFd() 函数。
构造函数中主要做了两件事情:
- 创建一个 wakeEventFd;
- 调用 rebuildEpollLocked() 初始化 epoll;
2.1 rebuildEpollLocked()
system/core/libutils/Looper.cpp
void Looper::rebuildEpollLocked() {
//step1,如果是重构epoll 时的逻辑,需要进行reset
if (mEpollFd >= 0) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ rebuildEpollLocked - rebuilding epoll set", this);
#endif
mEpollFd.reset();
}
//step2,分配一个新的epoll,并将wakeEventFd添加到监听
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));
struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeEventFd.get();
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
strerror(errno));
//step3,是否有其他的request,如果有,也重新加入到epoll中监听
//一般request 通过addFd 函数添加
for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
request.fd, strerror(errno));
}
}
}
大致流程如上面注释,主要是三步:
- 如果是重构 epoll 时的逻辑,需要进行reset;
- 分配一个新的 epoll,并将 wakeEventFd添加到监听;
- 是否有其他的 request,如果有,也重新加入到epoll中监听;
3. MessageHandler 类
system/core/libutils/include/utils/Looper.h
class MessageHandler : public virtual RefBase {
protected:
virtual ~MessageHandler();
public:
/**
* Handles a message.
*/
virtual void handleMessage(const Message& message) = 0;
};
在 Looper.h 中还定义了 MessageHandler 这样的类,同Java 端的 Handler类,都是用来处理消息的。
当创建好 MessageHandler 对象之后,可以通过 Looper::sendMessage() 函数发送消息到Looper 中,Looper 会在后续的 pollOnce() 中进行监听,最终通过 handleMessage() 函数进行回调处理此次消息。
3.1 sendMessage()
在 Looper.h 中我们看到三个详细的函数:
void sendMessage(const sp<MessageHandler>& handler, const Message& message);
void sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
const Message& message);
void sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message);
但其实前面两个函数最终调用的都是 sendMessageAtTime() 函数:
system/core/libutils/Looper.cpp
void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
sendMessageAtTime(now, handler, message);
}
void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
const Message& message) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
sendMessageAtTime(now + uptimeDelay, handler, message);
}
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
#if DEBUG_CALLBACKS
ALOGD("%p ~ sendMessageAtTime - uptime=%" PRId64 ", handler=%p, what=%d",
this, uptime, handler.get(), message.what);
#endif
size_t i = 0;
{ // acquire lock
AutoMutex _l(mLock);
size_t messageCount = mMessageEnvelopes.size();
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}
MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);
// Optimization: If the Looper is currently sending a message, then we can skip
// the call to wake() because the next thing the Looper will do after processing
// messages is to decide when the next wakeup time should be. In fact, it does
// not even matter whether this code is running on the Looper thread.
if (mSendingMessage) {
return;
}
} // release lock
// Wake the poll loop only when we enqueue a new message at the head.
if (i == 0) {
wake();
}
}
这里涉及到了 Looper 中的另外一个成员变量:mMessageEnvelopes,这是一个 Vector 类型变量,用以存放通过 sendMessage() 发送到 Looper 中的消息。
如代码所示,会将 message、message handler、uptime 都存放在一个信封对象中,然后将该信封添加到 mMessageEnvelopes 中。
这里还涉及另外一个成员变量:mSendingMessage,用以标记当前是否正在 handleMessage() 处理消息。当该函数执行的时候,mSendingMessage 被置 true,标记正在处理,在处理函数完成之后会将其再次置 false。
4. LooperCallback 类
system/core/libutils/include/utils/Looper.h
class LooperCallback : public virtual RefBase {
protected:
virtual ~LooperCallback();
public:
/**
* Handles a poll event for the given file descriptor.
* It is given the file descriptor it is associated with,
* a bitmask of the poll events that were triggered (typically EVENT_INPUT),
* and the data pointer that was originally supplied.
*
* Implementations should return 1 to continue receiving callbacks, or 0
* to have this file descriptor and callback unregistered from the looper.
*/
virtual int handleEvent(int fd, int events, void* data) = 0;
};
该类用以处理通过 addFd() 添加监听的 fd。在 addFd() 函数中会同步指定 callback,就是这里的 LooperCallback 对象。
4.1 addFd()
在 Looper.h 中可以看到该函数的声明:
int addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data);
int addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data);
system/core/libutils/Looper.cpp
int Looper::addFd(int fd, int ident, int events, Looper_callbackFunc callback, void* data) {
return addFd(fd, ident, events, callback ? new SimpleLooperCallback(callback) : nullptr, data);
}
int Looper::addFd(int fd, int ident, int events, const sp<LooperCallback>& callback, void* data) {
if (!callback.get()) {
if (! mAllowNonCallbacks) {
ALOGE("Invalid attempt to set NULL callback but not allowed for this looper.");
return -1;
}
if (ident < 0) {
ALOGE("Invalid attempt to set NULL callback with ident < 0.");
return -1;
}
} else {
ident = POLL_CALLBACK;
}
{ // acquire lock
AutoMutex _l(mLock);
Request request;
request.fd = fd;
request.ident = ident;
request.events = events;
request.seq = mNextRequestSeq++;
request.callback = callback;
request.data = data;
if (mNextRequestSeq == -1) mNextRequestSeq = 0; // reserve sequence number -1
struct epoll_event eventItem;
request.initEventItem(&eventItem);
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex < 0) {
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d: %s", fd, strerror(errno));
return -1;
}
mRequests.add(fd, request);
} else {
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_MOD, fd, &eventItem);
if (epollResult < 0) {
if (errno == ENOENT) {
epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error modifying or adding epoll events for fd %d: %s",
fd, strerror(errno));
return -1;
}
scheduleEpollRebuildLocked();
} else {
ALOGE("Error modifying epoll events for fd %d: %s", fd, strerror(errno));
return -1;
}
}
mRequests.replaceValueAt(requestIndex, request);
}
} // release lock
return 1;
}
这里涉及到 Looper 中的另外一个成员变量:mRequests,这是一个KeyedVector 类型的变量,用以存放通过 addFd() 添加的监听请求。
如代码所示,首先确认 mRequests 中是否已经存放该 fd 的监听,如果没有,则通过 epoll_ctl() 将对该 fd 的监听添加到 epoll 中,并将此次的请求添加到 mRequests 中;如果已经存在对该 fd 的监听,则对 epoll 进行更改,并将 mRequests 中的request 进行替换。
当 pollOnce() 中收到 epoll 的event 为该 mRequests 中的 fd 消息时,会将响应的 event 暂存在 Looper 中的另一个成员变量 mResponses 中,接着 pollOnce() 中会确认这些 fd 消息是否需要 callback 处理,如果callback 处理成功则将此次的响应消息从 mResponses 中移除。详细请查看下面第 6 节pollOnce() 函数。
另外,需要注意这里的 ident (identifier 简称)参数,可以在传入的时候指定为 POLL_CALLBACK 等枚举值(这些值都为负数),也可以设定为一个 >= 0 的其他值。但是,如果该 ident 值为 >=0 时,pollOnce() 调用时会将该次响应的数据取出并return,详细看第 6 节 pollOnce() 函数。
5. wake()
system/core/libutils/Looper.cpp
void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
ALOGD("%p ~ wake", this);
#endif
uint64_t inc = 1;
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}
通过往 mWakeEventFd 中写个数据来达到唤醒的目的。
6. pollOnce()
先来看下在Looper.h 中的声明:
int pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData);
inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, nullptr, nullptr, nullptr);
}
参数 timeoutMillis 单位为 毫秒;
注意,pollOnce() 可以只传一个 timeout 的参数,此时其他参数默认为 nullptr。
调用该函数用以等待一个 event 的产生。
- 如果timeout 为0,则不用阻塞立即返回;
- 如果timeout 为负数,则无限等待一直等到一个 event 出现;
- 如果在 timeout 到期之前,或者callback被调用之前,或者没有fd 准备好之前,epoll 被wake() 函数唤醒时,返回 POLL_WAKE,这也是默认返回类型;
- 如果是 callback 被调用处理时,返回POLL_CALLBACK;
- 如果是 timeout 到期时,返回 POLL_TIMEOUT;
- 如果出现错误时,返回 POLL_ERROR;
- 该函数直到完成所有的 fd 所有对应的合适的 callback 调用之后,才会 return;
详细的逻辑可以查看源码:
system/core/libutils/Looper.cpp
int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
//确定是否有fd 响应,如果 ident 为>=0的值,则直接返回;
// 如果ident 为负数,则进行下一次的 pollInner()等待、处理;
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
if (outFd != nullptr) *outFd = fd;
if (outEvents != nullptr) *outEvents = events;
if (outData != nullptr) *outData = data;
return ident;
}
}
//pollInner() 处理结果确定,一般不会为0
if (result != 0) {
if (outFd != nullptr) *outFd = 0;
if (outEvents != nullptr) *outEvents = 0;
if (outData != nullptr) *outData = nullptr;
return result;
}
//pollOnce 函数的核心处理部分,epoll机制监听、处理的地方
result = pollInner(timeoutMillis);
}
}
在调用 pollInner() 函数之前,会通过 while() 循环将所有 fd 的响应进行确认,如果出现了 fd 的消息没有经过callback 处理,那么该 fd 的响应还会留在 mResponses 中。设计的初衷应该是希望 fd 的消息可以通过指定的 callback(第4节中的LooperCallback 对象) 处理,如果不需要 callback 处理,则会将 ident 参数值设为 >=0 的值,这样在此处的 while 循环就可以从 pollOnce() 中 return。
6.1 pollInner()
system/core/libutils/Looper.cpp
int Looper::pollInner(int timeoutMillis) {
//step1,确定timeout
if (timeoutMillis != 0 && mNextMessageUptime != LLONG_MAX) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
int messageTimeoutMillis = toMillisecondTimeoutDelay(now, mNextMessageUptime);
if (messageTimeoutMillis >= 0
&& (timeoutMillis < 0 || messageTimeoutMillis < timeoutMillis)) {
timeoutMillis = messageTimeoutMillis;
}
}
//step2,进入真正poll处理,先清空 mResponses之前的残留
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;
// We are about to idle.
mPolling = true;
//step3,等待epoll监听到消息
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
// No longer idling.
mPolling = false;
// Acquire lock.
mLock.lock();
//step4,当fd的修改、删除出现问题时,需要重构epoll
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();
goto Done;
}
//step5, 如果event count 小于0,则表示epoll出错,答应错误并结束event处理
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
ALOGW("Poll failed with an unexpected error: %s", strerror(errno));
result = POLL_ERROR;
goto Done;
}
//step6,如果event count 为0,则表示timeout 到期,进入timeout处理
if (eventCount == 0) {
result = POLL_TIMEOUT;
goto Done;
}
//step7,收到了event,确定是wake() 触发,还是fd 消息
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
//是被调用wake() 唤醒的
if (fd == mWakeEventFd.get()) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {//是fd消息唤醒的,将event暂存mResponses 中
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;
//step8,处理messages,timeout 触发
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
{ // obtain handler
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock();
//处理message
handler->handleMessage(message);
} // release handler
mLock.lock();
mSendingMessage = false;
result = POLL_CALLBACK;
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}
// Release lock.
mLock.unlock();
//step9,fd 消息响应,进行fd 指定的callback处理
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
//如果fd 指定了callback,则调用handleEvent() 处理
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
int callbackResult = response.request.callback->handleEvent(fd, events, data);
//handleEvent 处理成功,则需要返回0,表示处理完成,退出Looper
//当然,也可以返回非0,表示继续等待callback处理,不退出Looper
if (callbackResult == 0) {
removeFd(fd, response.request.seq);
}
// Clear the callback reference in the response structure promptly because we
// will not clear the response vector itself until the next poll.
response.request.callback.clear();
result = POLL_CALLBACK;
}
}
return result;
}
流程比较长,详细的分析见代码中的注释,重点情况在addFd() 分析时或者pollOnce() 分析时已经说明,请参考上文。
7. 其他函数
7.1 removeFd()
与addFd() 相反,将 fd 监听从 mRequests 中移除,并从 epoll 监听中移除。
7.2 isPolling()
确定当前Looper 是否处于 epoll_wait() 阻塞,如果epoll_wait() 没有返回,则说明 Looper 处于 polling 状态。
至此,native Looper的原理已经基本分析完成。核心是通过 epoll 机制进行监听,针对:
- message 的timeout,通过 sendMessage() 加入到 mMessageEnvelopes 中;
- wake fd 的灵活唤醒,通过创建 wake event fd 监听,wake() 进行唤醒;
- 其他 fd 的poll event 唤醒,通过 addFd() 加入到 mRequests 中;
其他关联博文:
Android 基于Handler 剖析消息机制
Android 中Handler 详解
Android AsyncTask 详解文章来源:https://www.toymoban.com/news/detail-691240.html
Android HandlerThread 详解文章来源地址https://www.toymoban.com/news/detail-691240.html
到了这里,关于Android 中Looper机制详解的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!