在整个 Android 的源码世界里,有两大利剑,其一是 Binder IPC 机制,另一个便是消息机制 (由 Handler/Looper/MessageQueue 等构成的)。

Android 有大量的消息驱动方式来进行交互,比如 Android 的四剑客Activity, Service, Broadcast, ContentProvider的启动过程的交互,都离不开消息机制,Android 某种意义上也可以说成是一个以消息驱动的系统。消息机制涉及 MessageQueue/Message/Looper/Handler4 个类。

原文来自于 Gityuan Handler(framework) Handler(native) 源码更新于 AndroidCodeSearch 2021/1/9

Handler 是干嘛的?

  1. 推送未来某个时间点将要执行的 Message 或者 Runnable 到消息队列。
  2. 在子线程把需要在另一个线程执行的操作加入到消息队列中去。

Handler 组成

  • Message:消息分为硬件产生的消息(如按钮、触摸)和软件生成的消息;
  • MessageQueue:消息队列的主要功能向消息池投递消息(MessageQueue.enqueueMessage)和取走消息池的消息(MessageQueue.next);
  • Handler:消息辅助类,主要功能向消息池发送各种消息事件(Handler.sendMessage)和处理相应消息事件(Handler.handleMessage);
  • Looper:不断循环执行(Looper.loop),按分发机制将消息分发给目标处理者。

Handler 架构

  • Looper有一个MessageQueue消息队列;
  • MessageQueue有一组待处理的Message
  • Message中有一个用于处理消息的Handler;
  • Handler中有LooperMessageQueue

图片来源于《Android消息机制1-Handler(Java层)》

使用方式

利用 HandlerThread 创建

既然涉及多个线程的通信,会有同步的问题,Android 为了简化 Handler 的创建过程,提供了 HandlerThread 类, 很多时候,在 HandlerThread 线程中运行Loop() 方法,在其他线程中通过 Handler 发送消息到 HandlerThread 线程。通过 wait/notifyAll 的方式,有效地解决了多线程的同步问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// Step 1: 创建并启动 HandlerThread 线程,内部包含 Looper
HandlerThread handlerThread = new HandlerThread("calmCenter");// 【见 1.1】
handlerThread.start();

// Step 2: 创建Handler
Handler handler = new Handler(handlerThread.getLooper());// 【见 1.2】

// Step 3: 发送消息
handler.post(new Runnable() {
@Override
public void run() {
System.out.println("thread id="+Thread.currentThread().getId());
}
});

或者 handler.postDelayed(Runnable r, long delayMillis) 用于延迟执行。

直接创建线程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
class LooperThread extends Thread {
public Handler mHandler;

public void run() {
Looper.prepare(); // 【见 2.1.1】
// Step 1: 创建Handler
mHandler = new Handler() {
public void handleMessage(Message msg) {
//处理即将发送过来的消息
System.out.println("thread id="+Thread.currentThread().getId());
}
};

Looper.loop();
}
}

// Step 2: 创建并启动LooperThread线程,内部包含Looper
LooperThread looperThread = new LooperThread("calmCenter");
looperThread.start();

// Step 3: 发送消息
LooperThread.mHandler.sendEmptyMessage(10);

一、HandlerThread 源码分析

1.1 创建 HandlerThread 对象

HandlerThread 继承于 Thread

1
2
3
4
5
6
7
8
9
public HandlerThread(String name) {
super(name);
mPriority = Process.THREAD_PRIORITY_DEFAULT; //默认优先级
}

public HandlerThread(String name, int priority) {
super(name);
mPriority = priority;
}

1.2 获取 Looper 对象

1
2
// Step 2: 创建Handler
Handler handler = new Handler(handlerThread.getLooper());

获取 HandlerThread 线程中的 Looper 对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public Looper getLooper() {
// 当线程没有启动或者已经结束时,则返回null
if (!isAlive()) {
return null;
}

//当线程已经启动,则等待直到looper创建完成
synchronized (this) {
while (isAlive() && mLooper == null) {
try {
wait(); //休眠等待
} catch (InterruptedException e) {
}
}
}
return mLooper;
}

1.3 执行 handlerThread 的 run()

1
2
3
4
5
6
7
8
9
10
11
12
public void run() {
mTid = Process.myTid(); //获取线程的tid
Looper.prepare(); // 创建Looper对象
synchronized (this) {
mLooper = Looper.myLooper(); //获取looper对象
notifyAll(); //唤醒等待线程
}
Process.setThreadPriority(mPriority);
onLooperPrepared(); // 该方法可通过覆写,实现自己的逻辑
Looper.loop(); //进入循环模式
mTid = -1;
}

1.4 Looper 退出

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
public boolean quit() {
Looper looper = getLooper();
if (looper != null) {
looper.quit(); //普通退出
return true;
}
return false;
}

public boolean quitSafely() {
Looper looper = getLooper();
if (looper != null) {
looper.quitSafely(); //安全退出
return true;
}
return false;
}

二、Handler Java 层源码分析

2.1 Looper

2.1.1 prepare

对于无参的情况,默认调用 prepare(true),表示的是这个 Looper 允许退出,而对于 false 的情况则表示当前 Looper 不允许退出,只有在 prepareMainLooper 的时候会传入 false

1
2
3
public static void prepare() {
prepare(true); // 允许退出
}

prepareMainLooper()

我们平时使用 Handler 的时候,并不需要执行 Looper.prepare(); ,但是在上面的两种方式中,都需要执行这句话,这是为什么呢?

其实我们在 Activity 等地方直接使用 Handler 的时候,系统已经为我们执行过 prepare 了,就是这里提到的 prepareMainLooper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public final class ActivityThread {
//...
public static final void main(String[] args) {
//...
Looper.prepareMainLooper();
//...
ActivityThread thread = new ActivityThread();
thread.attach(false);
//...
Looper.loop();
//...
thread.detach();
//...
}
}

public static void prepareMainLooper() {
prepare(false); // 不允许退出
synchronized (Looper.class) {
if (sMainLooper != null) {
throw new IllegalStateException("The main Looper has already been prepared.");
}
sMainLooper = myLooper();
}
}

Looper.prepare(boolean)

1
2
3
4
5
6
7
8
private static void prepare(boolean quitAllowed) {
//每个线程只允许执行一次该方法,第二次执行时线程的 TLS 已有数据,则会抛出异常。
if (sThreadLocal.get() != null) {
throw new RuntimeException("Only one Looper may be created per thread");
}
//创建 Looper 对象,并保存到当前线程的 TLS 区域 【见 2.2】
sThreadLocal.set(new Looper(quitAllowed));
}

这里 new 了一个 Looper 并保存进了 sThreadLocal。至于为什么每个线程在使用 HandlerLooper 必须先 prepare

因为 Handler 使用到了 Looper 在使用 Handler 之前,当前线程必须先初始化 Looper 具体见 ,每个线程都需要初始化,这就涉及到了 ThreadLocal 具体见 2.2.

2.1.2 loop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public static void loop() {
final Looper me = myLooper(); //获取 TLS 存储的 Looper 对象
// loop 之前同样需要 Looper.prepare()
if (me == null) {
throw new RuntimeException("No Looper; Looper.prepare() wasn't called on this thread.");
}
final MessageQueue queue = me.mQueue; //获取Looper对象中的消息队列
//确保在权限检查时基于本地进程,而不是调用进程。
Binder.clearCallingIdentity();
final long ident = Binder.clearCallingIdentity();
//...
boolean slowDeliveryDetected = false;
//进入loop的主循环方法
for (;;) {
Message msg = queue.next(); // 可能会阻塞
if (msg == null) {
//没有消息,则退出循环
return;
}
//默认为null,可通过setMessageLogging()方法来指定输出,用于debug功能
final Printer logging = me.mLogging;
if (logging != null) {
logging.println(">>>>> Dispatching to " + msg.target + " " +
msg.callback + ": " + msg.what);
}
//...
try {
//用于分发 Message
msg.target.dispatchMessage(msg); // 【见 2.3.2】
//...
} finally {
//...
}
//...
if (logging != null) {
logging.println("<<<<< Finished to " + msg.target + " " + msg.callback);
}
//恢复调用者信息
final long newIdent = Binder.clearCallingIdentity();
//...
//将Message放入消息池
msg.recycleUnchecked();
}
}

loop() 进入循环模式,不断重复下面的操作,直到没有消息时退出循环

  • 读取 MessageQueue 的下一条 Message
  • Message 分发给相应的 target
  • 再把分发后的 Message 回收到消息池,以便重复利用。

这是这个消息处理的核心部分。另外,上面代码中可以看到有 logging 方法,这是用于 debug 的,默认情况下 logging == null,通过设置 setMessageLogging() 用来开启 debug 工作。

2.1.3 quit

1
2
3
4
5
6
7
public void quit() {
mQueue.quit(false); //消息移除
}

public void quitSafely() {
mQueue.quit(true); //安全地消息移除
}

Looper.quit() 方法的实现最终调用的是 MessageQueue.quit() 方法

MessageQueue.quit()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
void quit(boolean safe) {
// 当 mQuitAllowed 为 false,表示不运行退出,强行调用 quit() 会抛出异常
if (!mQuitAllowed) {
throw new IllegalStateException("Main thread not allowed to quit.");
}

synchronized (this) {
if (mQuitting) {//防止多次执行退出操作
return;
}
mQuitting = true;

if (safe) {
removeAllFutureMessagesLocked();//移除尚未触发的所有消息
} else {
removeAllMessagesLocked();//移除所有的消息
}
//mQuitting=false,那么认定为 mPtr != 0
nativeWake(mPtr);
}
}

消息退出的方式:

  • safe = true 时,只移除尚未触发的所有消息,对于正在触发的消息并不移除;
  • safe = flase 时,移除所有的消息

2.2 ThreadLocal

线程本地存储区(Thread Local Storage,简称为 TLS ),每个线程都有自己的私有的本地存储区域,不同线程之间彼此不能访问对方的 TLS 区域。

ThreadLocal 原理

图片来源于《ThreadLocal原理》

这张图很久之前看过,找了很久!ThreadLocal原理(简单易懂)

  • 每个 Thread 线程内部都有一个 ThreadLocalMap
  • ThreadLocalMap 里面存储线程本地对象(key)和线程的变量副本(value)。
  • Thread 内部的 ThreadLocalMap 是由 ThreadLocal 维护的,由 ThreadLocal 负责向 ThreadLocalMap 设置和获取线程的变量值。
  • 同一个 Thread 中多个 ThreadLocal 对象,共用一个 ThreadLocalMap
  • 不同 Thread,同一个 ThreadLocal 对象,对应不同的值

ThreadLocal.set(T value):将 value 存储到当前线程的 TLS 区域,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public void set(T value) {
Thread t = Thread.currentThread(); //获取当前线程
ThreadLocalMap map = getMap(t); // 查找当前线程的实际存储的数据结构
//如果存在 map 就直接 set,没有则创建 map 并 set
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

ThreadLocalMap getMap(Thread t) {
// thread 中维护了一个 ThreadLocalMap
return t.threadLocals;
}

void createMap(Thread t, T firstValue) {
t.threadLocals = new ThreadLocalMap(this, firstValue);
}

ThreadLocal.get():获取当前线程 TLS 区域的数据,源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public T get() {
Thread t = Thread.currentThread();// 获取当前线程
ThreadLocalMap map = getMap(t);// 查找当前线程的实际存储的数据结构
if (map != null) {
ThreadLocalMap.Entry e = map.getEntry(this);//从 map 中获取 ThreadLocal 为 this 的 Entry节点。
if (e != null) {
@SuppressWarnings("unchecked")
T result = (T)e.value;
return result;// 从 Entry 节点 获取存储的 Value 值返回。
}
}
// map = null 则 创建 map 新增一个 Entry, key 为 this ,value 为 null
return setInitialValue();
}

ThreadLocalget()set() 方法操作的类型都是泛型,接着回到前面提到的sThreadLocal变量,其定义如下:

1
static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>()

可见sThreadLocalget()set() 操作的类型都是 Looper 类型。

2.3 Handler

2.3.1 创建 Handler

无参构造

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
public Handler() {
this(null, false);
}

public Handler(Callback callback, boolean async) {
// 匿名类、内部类或本地类都必须申明为 static,否则会警告可能出现内存泄露
if (FIND_POTENTIAL_LEAKS) {
final Class<? extends Handler> klass = getClass();
if ((klass.isAnonymousClass() || klass.isMemberClass() || klass.isLocalClass()) &&
(klass.getModifiers() & Modifier.STATIC) == 0) {
Log.w(TAG, "The following Handler class should be static or leaks might occur: " +
klass.getCanonicalName());
}
}
//必须先执行 Looper.prepare(),才能获取 Looper 对象,否则为 null.
mLooper = Looper.myLooper();//从当前线程的 TLS 中获取 Looper 对象
if (mLooper == null) {
throw new RuntimeException(
"Can't create handler inside thread " + Thread.currentThread()
+ " that has not called Looper.prepare()");
}
mQueue = mLooper.mQueue; //消息队列,来自Looper对象
mCallback = callback; //回调方法
mAsynchronous = async; //设置消息是否为异步处理方式
}

对于 Handle 的无参构造方法,默认采用当前线程 TLS 中的 Looper 对象,并且 callback 回调方法为 null ,且消息为同步处理方式。只要执行的 Looper.prepare() 方法,那么便可以获取有效的 Looper 对象。

有参构造

1
2
3
4
5
6
7
8
9
10
public Handler(Looper looper) {
this(looper, null, false);
}

public Handler(Looper looper, Callback callback, boolean async) {
mLooper = looper;
mQueue = looper.mQueue;
mCallback = callback;
mAsynchronous = async;
}

Handler 类在构造方法中,可指定 LooperCallback 回调方法以及消息的处理方式(同步或异步),对于无参的 handler ,默认是当前线程的 Looper

2.3.2 dispatchMessage

消息分发机制,在 Looper.loop() 中,当发现有消息时,调用消息的目标 handler ,执行 dispatchMessage() 方法来分发消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public void dispatchMessage(Message msg) {
if (msg.callback != null) {
// 当 Message 存在回调方法,回调 msg.callback.run() 方法;
handleCallback(msg);
} else {
if (mCallback != null) {
// 当 Handler 存在 Callback 成员变量时,回调方法 handleMessage();
if (mCallback.handleMessage(msg)) {
return;
}
}
// Handler 自身的回调方法 handleMessage()
handleMessage(msg);
}
}

分发消息流程:

  1. Message 的回调方法不为空时,则回调方法 msg.callback.run(),其中 callBack 数据类型为 Runnable ,否则进入步骤 2
  2. HandlermCallback成员变量不为空时,则回调方法mCallback.handleMessage(msg),否则进入步骤 3
  3. 调用Handler自身的回调方法 handleMessage(),该方法默认为空,Handler 子类通过覆写该方法来完成具体的逻辑。

对于很多情况下,消息分发后的处理方法是第 3 种情况,即 Handler.handleMessage() ,一般地往往通过覆写该方法从而实现自己的业务逻辑。

2.3.3 消息发送

sendEmptyMessage()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public final boolean sendEmptyMessage(int what)
{
return sendEmptyMessageDelayed(what, 0);
}

public final boolean sendEmptyMessageDelayed(int what, long delayMillis) {
Message msg = Message.obtain();// 【见 2.5.3】
msg.what = what;
return sendMessageDelayed(msg, delayMillis);
}

public final boolean sendMessageDelayed(Message msg, long delayMillis)
{
if (delayMillis < 0) {
delayMillis = 0;
}
return sendMessageAtTime(msg, SystemClock.uptimeMillis() + delayMillis);
}

public boolean sendMessageAtTime(Message msg, long uptimeMillis) {
// 获取当前 Handler 所在的消息队列
MessageQueue queue = mQueue;
if (queue == null) {
// 消息队列为空,直接返回
return false;
}
// 将消息添加到消息队列中
return enqueueMessage(queue, msg, uptimeMillis);
}

sendMessageAtFrontOfQueue

1
2
3
4
5
6
7
public final boolean sendMessageAtFrontOfQueue(Message msg) {
MessageQueue queue = mQueue;
if (queue == null) {
return false;
}
return enqueueMessage(queue, msg, 0);
}

该方法通过设置消息的触发时间为 0 ,从而使 Message 加入到消息队列的队头。

post

1
2
3
4
5
6
7
8
9
10
public final boolean post(Runnable r)
{
return sendMessageDelayed(getPostMessage(r), 0);
}

private static Message getPostMessage(Runnable r) {
Message m = Message.obtain();// 【见 2.5.3】
m.callback = r;
return m;
}

postAtFrontOfQueue

1
2
3
4
public final boolean postAtFrontOfQueue(Runnable r)
{
return sendMessageAtFrontOfQueue(getPostMessage(r));
}

enqueueMessage

1
2
3
4
5
6
7
private boolean enqueueMessage(MessageQueue queue, Message msg, long uptimeMillis) {
msg.target = this;
if (mAsynchronous) {
msg.setAsynchronous(true);
}
return queue.enqueueMessage(msg, uptimeMillis); // 【见 2.4.3】
}

Handler.sendEmptyMessage()等系列方法最终调用MessageQueue.enqueueMessage(msg, uptimeMillis),将消息添加到消息队列中,其中 uptimeMillis 为系统当前的运行时间,不包括休眠时间。

2.3.4 obtainMessage

获取消息

1
2
3
public final Message obtainMessage() {
return Message.obtain(this);// 【见 2.5.3】
}

Handler.obtainMessage()方法,最终调用Message.obtainMessage(this),其中 this 为当前的 Handler 对象。

2.3.5 removeMessages

1
2
3
public final void removeMessages(int what) {
mQueue.removeMessages(this, what, null);// 【见 2.4.4】
}

Handler 是消息机制中非常重要的辅助类,更多的实现都是 MessageQueue, Message 中的方法,Handler 的目的是为了更加方便的使用消息机制。

2.4 MessageQueue

2.4.1 创建 MessageQueue

Looper.prepare() 的时候会调用构造函数创建 MessageQueue

1
2
3
4
 MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit(); //【见 4.1.1】
}

2.4.2 next

提取下一条 message

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
Message next() {
final long ptr = mPtr;
if (ptr == 0) {// 当消息循环已经退出,则直接返回
return null;
}
int pendingIdleHandlerCount = -1; // 循环迭代的首次为 -1
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
// 阻塞操作,当等待 nextPollTimeoutMillis 时长,或者消息队列被唤醒,都会返回
nativePollOnce(ptr, nextPollTimeoutMillis);// 【见 4.1.3】
synchronized (this) {
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
// 当消息的 Handler 为空时,则查询异步消息
if (msg != null && msg.target == null) {
// 当查询到异步消息,则立刻退出循环
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
// 当异步消息触发时间大于当前时间,则设置下一次轮询的超时时长
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
// 获取一条消息,并返回
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
//设置消息的使用状态,即flags |= FLAG_IN_USE
msg.markInUse();
return msg; //成功地获取MessageQueue中的下一条即将要执行的消息
}
} else {
// 没有消息
nextPollTimeoutMillis = -1;
}
// 消息正在退出,返回 null
if (mQuitting) {
dispose(); // 【见 4.1.2】
return null;
}
// 当消息队列为空,或者是消息队列的第一个消息时
if (pendingIdleHandlerCount < 0
&& (mMessages == null || now < mMessages.when)) {
pendingIdleHandlerCount = mIdleHandlers.size();
}
if (pendingIdleHandlerCount <= 0) {
// 没有 idle handlers 需要运行,则循环并等待。
mBlocked = true;
continue;
}
if (mPendingIdleHandlers == null) {
mPendingIdleHandlers = new IdleHandler[Math.max(pendingIdleHandlerCount, 4)];
}
mPendingIdleHandlers = mIdleHandlers.toArray(mPendingIdleHandlers);
}
// 只有第一次循环时,会运行 idle handlers,执行完成后,重置 pendingIdleHandlerCount 为 0.
for (int i = 0; i < pendingIdleHandlerCount; i++) {
final IdleHandler idler = mPendingIdleHandlers[i];
mPendingIdleHandlers[i] = null; // 去掉 handler 的引用
boolean keep = false;
try {
keep = idler.queueIdle();// idle 时执行的方法
} catch (Throwable t) {
Log.wtf(TAG, "IdleHandler threw exception", t);
}
if (!keep) {
synchronized (this) {
mIdleHandlers.remove(idler);
}
}
}
//重置idle handler个数为0,以保证不会再次重复运行
pendingIdleHandlerCount = 0;
//当调用一个空闲handler时,一个新message能够被分发,因此无需等待可以直接查询pending message.
nextPollTimeoutMillis = 0;
}
}

nativePollOnce 是阻塞操作,其中 nextPollTimeoutMillis 代表下一个消息到来前,还需要等待的时长;当 nextPollTimeoutMillis = -1 时,表示消息队列中无消息,会一直等待下去。

当处于空闲时,往往会执行IdleHandler中的方法。当 nativePollOnce() 返回后,next()mMessages 中提取一个消息。

nativePollOnce()native 做了大量的工作。

2.4.3 enqueueMessage

添加一条消息到消息队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
boolean enqueueMessage(Message msg, long when) {
// 每一个普通 Message 必须有一个 target
if (msg.target == null) {
throw new IllegalArgumentException("Message must have a target.");
}
if (msg.isInUse()) {
throw new IllegalStateException(msg + " This message is already in use.");
}

synchronized (this) {
if (mQuitting) {// 正在退出时,回收 msg ,加入到消息池
msg.recycle();
return false;
}

msg.markInUse();
msg.when = when;
Message p = mMessages;
boolean needWake;
if (p == null || when == 0 || when < p.when) {
// p 为 null (代表 MessageQueue 没有消息) 或者 msg 的触发时间是队列中最早的,则进入该该分支
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// 将消息按时间顺序插入到 MessageQueue。一般地,不需要唤醒事件队列,除非
// 消息队头存在 barrier,并且同时 Message是队列中最早的异步消息。
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p;
prev.next = msg;
}

//消息没有退出,我们认为此时mPtr != 0
if (needWake) {
nativeWake(mPtr);// 【4.1.4】
}
}
return true;
}

MessageQueue是按照 Message 触发时间的先后顺序排列的,队头的消息是将要最早触发的消息。当有消息需要加入消息队列时,会从队列头开始遍历,直到找到消息应该插入的合适位置,以保证所有消息的时间顺序。

往消息队列添加 Message 时,需要根据 mBlocked 情况来决定是否需要调用 nativeWake

2.4.4 removeMessages

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
void removeMessages(Handler h, int what, Object object) {
if (h == null) {
return;
}
synchronized (this) {
Message p = mMessages;
// 从消息队列的头部开始,移除所有符合条件的消息
while (p != null && p.target == h && p.what == what
&& (object == null || p.obj == object)) {
Message n = p.next;
mMessages = n;
p.recycleUnchecked();
p = n;
}
// 移除剩余的符合要求的消息
while (p != null) {
Message n = p.next;
if (n != null) {
if (n.target == h && n.what == what
&& (object == null || n.obj == object)) {
Message nn = n.next;
n.recycleUnchecked();
p.next = nn;
continue;
}
}
p = n;
}
}
}

这个移除消息的方法,采用了两个 while 循环,第一个循环是从队头开始,移除符合条件的消息,第二个循环是从头部移除完连续的满足条件的消息之后,再从队列后面继续查询是否有满足条件的消息需要被移除。

2.4.5 postSyncBarrier

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
public int postSyncBarrier() {
return postSyncBarrier(SystemClock.uptimeMillis());
}

private int postSyncBarrier(long when) {
// 输入一个新的同步障碍令牌。
// 我们不需要唤醒队列,因为barrier的目的是让它停止。
synchronized (this) {
final int token = mNextBarrierToken++;
final Message msg = Message.obtain();// 【见 2.5.3】
msg.markInUse();
msg.when = when;
msg.arg1 = token;

Message prev = null;
Message p = mMessages;
if (when != 0) {
while (p != null && p.when <= when) {
prev = p;
p = p.next;
}
}
if (prev != null) { // invariant: p == prev.next
msg.next = p;
prev.next = msg;
} else {
msg.next = p;
mMessages = msg;
}
return token;
}
}

【2.4.3】说明每一个普通 Message 必须有一个 target ,对于特殊的 message 是没有 target ,即同步 barrier token 。 这个消息的价值就是用于拦截同步消息,所以并不会唤醒 Looper .

2.5 Message

2.5.1 消息对象

每个消息用 Message 表示,Message 主要包含以下内容:

数据类型 成员变量 解释
int what 消息类别
long when 消息触发时间
int arg1 参数1
int arg2 参数2
Object obj 消息内容
Handler target 消息响应方
Runnable callback 回调方法

创建消息的过程,就是填充消息的上述内容的一项或多项。

2.5.2 消息池

1
private static Message sPool;

在代码中,可能经常看到 recycle() 方法,咋一看,可能是在做虚拟机的 gc() 相关的工作,其实不然,这是用于把消息加入到消息池的作用。这样的好处是,当消息池不为空时,可以直接从消息池中获取Message对象,而不是直接创建,提高效率。

静态变量 sPool 的数据类型为 Message ,通过 next 成员变量,维护一个消息池;静态变量 MAX_POOL_SIZE 代表消息池的可用大小;消息池的默认大小为 50

消息池常用的操作方法是 obtain()recycle()

2.5.3 obtain

从消息池中获取消息

1
2
3
4
5
6
7
8
9
10
11
12
13
public static Message obtain() {
synchronized(sPoolSync) {
if (sPool != null) {
Message m = sPool;
sPool = m.next;
m.next = null;// 从 sPool 中取出一个 Message 对象,并消息链表断开(单独的一个节点)
m.flags = 0; // 清除 in-use flag
sPoolSize--;// 消息池的可用大小进行减 1 操作
return m;
}
}
return new Message();// 当消息池为空时,直接创建Message对象
}

obtain() ,从消息池取 Message ,都是把消息池表头的 Message 取走,再把表头指向 next

2.5.4 recycle

把不再使用的消息加入消息池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
public void recycle() {
// 判断是否该消息还在使用
if (isInUse()) {
if (gCheckRecycle) {// Android 5.0 以后的版本默认为 true ,之前的版本默认为 false.
throw new IllegalStateException("This message cannot be recycled because it "
+ "is still in use.");
}
return;
}
// 清空状态,并且将消息添加到消息池中
recycleUnchecked();
}

//对于不再使用的消息,加入到消息池
void recycleUnchecked() {
//将消息标示位置为 IN_USE ,并清空消息所有的参数。
flags = FLAG_IN_USE;
what = 0;
arg1 = 0;
arg2 = 0;
obj = null;
replyTo = null;
sendingUid = -1;
when = 0;
target = null;
callback = null;
data = null;
// 当消息池没有满时,将 Message 对象加入消息池
synchronized (sPoolSync) {
if (sPoolSize < MAX_POOL_SIZE) {
next = sPool;
sPool = this;
sPoolSize++;// 消息池的可用大小进行加 1 操作
}
}
}

recycle(),将 Message加入到消息池的过程,都是把 Message 加到链表的表头;

三、 Handler Java 层总结

图片来源于《Android消息机制1-Handler(Java层)》

图解:

  • Handler 通过 sendMessage() 发送 MessageMessageQueue 队列;
  • Looper 通过 loop() ,不断提取出达到触发条件的 Message ,并将 Message 交给 target 来处理;
  • 经过 dispatchMessage() 后,交回给 HandlerhandleMessage() 来进行相应地处理。
  • Message 加入 MessageQueue 时,往管道写入字符,可以会唤醒 loop 线程;如果 MessageQueue 中没有 Message ,并处于 Idle 状态,则会执行 IdelHandler 接口中的方法,往往用于做一些清理性地工作。

消息分发的优先级:

  1. Message 的回调方法:message.callback.run(),优先级最高;
  2. Handler 的回调方法:Handler.mCallback.handleMessage(msg),优先级仅次于 1
  3. Handler 的默认方法:Handler.handleMessage(msg),优先级最低。

消息缓存:

为了提供效率,提供了一个大小为 50Message 缓存队列,减少对象不断创建与销毁的过程。

四、 Native 分析

相关源码位置

1
2
3
4
5
6
7
8
9
10
framework/base/core/java/andorid/os/MessageQueue.java
framework/base/core/jni/android_os_MessageQueue.cpp
framework/base/core/java/andorid/os/Looper.java

system/core/libutils/Looper.cpp
system/core/include/utils/Looper.h
system/core/libutils/RefBase.cpp

framework/base/native/android/looper.cpp
framework/native/include/android/looper.h

讲解了 Java 层的消息处理机制,其中 MessageQueue 类里面涉及到多个 native 方法,除了 MessageQueuenative 方法,native 层本身也有一套完整的消息机制,用于处理 native 的消息,如下图 Native 层的消息机制。

图片来源于《Android消息机制1-Handler(Java层)》

4.1 MessageQueue

MessageQueue 是消息机制的 Java 层和 C++ 层的连接纽带,Java 层可以向 MessageQueue 消息队列中添加消息,Native 层也可以向 MessageQueue 消息队列中添加消息,大部分核心方法都交给 native 层来处理,其中 MessageQueue 类中涉及的 native 方法如下:

1
2
3
4
5
6
private native static long nativeInit();
private native static void nativeDestroy(long ptr);
private native void nativePollOnce(long ptr, int timeoutMillis);
private native static void nativeWake(long ptr);
private native static boolean nativeIsPolling(long ptr);
private native static void nativeSetFileDescriptorEvents(long ptr, int fd, int events);

4.1.1 nativeInit

图片来源于《Android消息机制1-Handler(Java层)》

  • new MessageQueue()
1
2
3
4
5
6
7
8
MessageQueue.java

MessageQueue(boolean quitAllowed) {
mQuitAllowed = quitAllowed;
mPtr = nativeInit(); //mPtr记录native消息队列的信息 【android_os_MessageQueue_nativeInit()】
}

private native static long nativeInit();
  • android_os_MessageQueue_nativeInit()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
android_os_MessageQueue.cpp

static jlong android_os_MessageQueue_nativeInit(JNIEnv* env, jclass clazz) {
// 初始化 native 消息队列
NativeMessageQueue* nativeMessageQueue = new NativeMessageQueue();
if (!nativeMessageQueue) {
jniThrowRuntimeException(env, "Unable to allocate native queue");
return 0;
}
// 增加引用计数
nativeMessageQueue->incStrong(env);
// reinterpret_cast 强制类型转换符
return reinterpret_cast<jlong>(nativeMessageQueue);
}
  • new NativeMessageQueue()
1
2
3
4
5
6
7
8
9
10
android_os_MessageQueue.cpp

NativeMessageQueue::NativeMessageQueue() :
mPollEnv(NULL), mPollObj(NULL), mExceptionObj(NULL) {
mLooper = Looper::getForThread();//获取 TLS 中的 Looper 对象,功能类比于 Java 层的 Looper.myLooper();
if (mLooper == NULL) {
mLooper = new Looper(false);//创建 native 层的 Looper 【new Looper()】
Looper::setForThread(mLooper);//保存 native 层的 Looper 到 TLS,功能类比于 Java 层的 ThreadLocal.set();
}
}

此处 Native 层的 LooperJava 层的 Looper 没有任何的关系,只是在 Native 层重实现了一套类似功能的逻辑。

  • new Looper()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Looper.cpp

Looper::Looper(bool allowNonCallbacks)
: mAllowNonCallbacks(allowNonCallbacks),
mSendingMessage(false),
mPolling(false),
mEpollRebuildRequired(false),
mNextRequestSeq(0),
mResponseIndex(0),
mNextMessageUptime(LLONG_MAX) {
// 构造唤醒事件的 fd
mWakeEventFd.reset(eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mWakeEventFd.get() < 0, "Could not make wake event fd: %s", strerror(errno));

AutoMutex _l(mLock);
rebuildEpollLocked();// 重建 Epoll 事件 【epoll_create/epoll_ctl】
}
  • epoll_create/epoll_ctl
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Looper.cpp

void Looper::rebuildEpollLocked() {
// Close old epoll instance if we have one.
if (mEpollFd >= 0) {
mEpollFd.reset(); // 闭旧的 poll 例
}

// 创建新的 epoll 实例,并注册 wake 管道
mEpollFd.reset(epoll_create1(EPOLL_CLOEXEC));
LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));

struct epoll_event eventItem;
memset(& eventItem, 0, sizeof(epoll_event)); // 把未使用的数据区域进行置 0 操作
eventItem.events = EPOLLIN;//可读事件
eventItem.data.fd = mWakeEventFd.get();
//将唤醒事件 (mWakeEventFd) 添加到 epoll 实例 (mEpollFd)
int result = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, mWakeEventFd.get(), &eventItem);
LOG_ALWAYS_FATAL_IF(result != 0, "Could not add wake event fd to epoll instance: %s",
strerror(errno));

for (size_t i = 0; i < mRequests.size(); i++) {
const Request& request = mRequests.valueAt(i);
struct epoll_event eventItem;
request.initEventItem(&eventItem);
// 将 request 队列的事件,分别添加到 epoll 实例
int epollResult = epoll_ctl(mEpollFd.get(), EPOLL_CTL_ADD, request.fd, &eventItem);
if (epollResult < 0) {
ALOGE("Error adding epoll events for fd %d while rebuilding epoll set: %s",
request.fd, strerror(errno));
}
}
}

Looper 对象中的 mWakeEventFd 添加到 epoll 监控,以及 mRequests 也添加到 epoll 的监控范围内。

4.1.2 nativeDestroy

图片来源于《Android消息机制1-Handler(Java层)》

  • MessageQueue.dispose()
1
2
3
4
5
6
7
8
MessageQueue.java

private void dispose() {
if (mPtr != 0) {
nativeDestroy(mPtr); //【android_os_MessageQueue_nativeDestroy()】
mPtr = 0;
}
}
  • android_os_MessageQueue_nativeDestroy()
1
2
3
4
5
6
android_os_MessageQueue.cpp

static void android_os_MessageQueue_nativeDestroy(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->decStrong(env); // 【RefBase::decStrong()】
}

nativeMessageQueue 继承自 RefBase 类,所以 decStrong 最终调用的是 RefBase.decStrong()

  • RefBase::decStrong()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
RefBase.cpp

void RefBase::decStrong(const void* id) const
{
weakref_impl* const refs = mRefs;
refs->removeStrongRef(id); // 移除强引用
const int32_t c = refs->mStrong.fetch_sub(1, std::memory_order_release);
LOG_ALWAYS_FATAL_IF(BAD_STRONG(c), "decStrong() called on %p too many times",
refs);
if (c == 1) {
std::atomic_thread_fence(std::memory_order_acquire);
refs->mBase->onLastStrongRef(id);
int32_t flags = refs->mFlags.load(std::memory_order_relaxed);
if ((flags&OBJECT_LIFETIME_MASK) == OBJECT_LIFETIME_STRONG) {
delete this;
// 在这种情况下,析构函数不删除引用。
}
}
// 移除弱引用
refs->decWeak(id);
}

4.1.3 nativePollOnce

nativePollOnce 用于提取消息队列中的消息,提取消息的调用链

图片来源于《Android消息机制1-Handler(Java层)》

  • android_os_MessageQueue_nativePollOnce()
1
2
3
4
5
6
7
8
android_os_MessageQueue.cpp

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
jlong ptr, jint timeoutMillis) {
//将 Java 层传递下来的 mPtr 转换为 nativeMessageQueue
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}
  • NativeMessageQueue::pollOnce()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
android_os_MessageQueue.cpp

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
mPollEnv = env;
mPollObj = pollObj;
mLooper->pollOnce(timeoutMillis);// 【Looper::pollOnce()】
mPollObj = NULL;
mPollEnv = NULL;

if (mExceptionObj) {
env->Throw(mExceptionObj);
env->DeleteLocalRef(mExceptionObj);
mExceptionObj = NULL;
}
}
  • Looper::pollOnce()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
Looper.h

inline int pollOnce(int timeoutMillis) {
return pollOnce(timeoutMillis, nullptr, nullptr, nullptr);
}


Looper.cpp

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
int result = 0;
for (;;) {
// 先处理没有 Callback 方法的 Response 事件
while (mResponseIndex < mResponses.size()) {
const Response& response = mResponses.itemAt(mResponseIndex++);
int ident = response.request.ident;
if (ident >= 0) { // ident 大于 0,则表示没有 callback, 因为 POLL_CALLBACK = -2,
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
if (outFd != nullptr) *outFd = fd;
if (outEvents != nullptr) *outEvents = events;
if (outData != nullptr) *outData = data;
return ident;
}
}

if (result != 0) {
if (outFd != nullptr) *outFd = 0;
if (outEvents != nullptr) *outEvents = 0;
if (outData != nullptr) *outData = nullptr;
return result;
}
// 再处理内部轮询
result = pollInner(timeoutMillis); //【Looper::pollInner()】
}
}

参数说明:

  • timeoutMillis:超时时长
  • outFd:发生事件的文件描述符
  • outEvents:当前outFd上发生的事件,包含以下4类事件
    • EVENT_INPUT 可读
    • EVENT_OUTPUT 可写
    • EVENT_ERROR 错误
    • EVENT_HANGUP 中断
  • outData:上下文数据
  • Looper::pollInner()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
Looper.cpp

int Looper::pollInner(int timeoutMillis) {
// ...
int result = POLL_WAKE;
mResponses.clear();
mResponseIndex = 0;

mPolling = true; // 即将处于 idle 状态

struct epoll_event eventItems[EPOLL_MAX_EVENTS];// fd 最大个数为 16
// 等待事件发生或者超时,在 nativeWake() 方法,向管道写端写入字符,则该方法会返回;
int eventCount = epoll_wait(mEpollFd.get(), eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

mPolling = false;// 不再处于 idle 状态

mLock.lock();//请求锁

// 如果需要,重建 epoll 集。
if (mEpollRebuildRequired) {
mEpollRebuildRequired = false;
rebuildEpollLocked();// epoll 重建,直接跳转 Done;
goto Done;
}

// 轮询误差检查.
if (eventCount < 0) {
if (errno == EINTR) {
goto Done;
}
result = POLL_ERROR;// epoll 事件个数小于 0 ,发生错误,直接跳转 Done;
goto Done;
}

// 检查轮询超时时间。
if (eventCount == 0) {// epoll 事件个数等于0,发生超时,直接跳转 Done;
result = POLL_TIMEOUT;
goto Done;
}

// 循环遍历,处理所有的事件
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeEventFd.get()) {
if (epollEvents & EPOLLIN) {
awoken();// 已经唤醒了,则读取并清空管道数据
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake event fd.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= EVENT_HANGUP;
// 处理 request,生成对应的 reponse 对象,push 到响应数组
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
Done: ;

// 再处理 Native 的 Message,调用相应回调方法
mNextMessageUptime = LLONG_MAX;
while (mMessageEnvelopes.size() != 0) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
const MessageEnvelope& messageEnvelope = mMessageEnvelopes.itemAt(0);
if (messageEnvelope.uptime <= now) {
{
sp<MessageHandler> handler = messageEnvelope.handler;
Message message = messageEnvelope.message;
mMessageEnvelopes.removeAt(0);
mSendingMessage = true;
mLock.unlock(); // 释放锁
handler->handleMessage(message);// 处理消息事件
}
mLock.lock(); // 请求锁
mSendingMessage = false;
result = POLL_CALLBACK; // 发生回调
} else {
// The last message left at the head of the queue determines the next wakeup time.
mNextMessageUptime = messageEnvelope.uptime;
break;
}
}

mLock.unlock();// 释放锁

// 处理带有 Callback() 方法的 Response 事件,执行 Reponse 相应的回调方法
for (size_t i = 0; i < mResponses.size(); i++) {
Response& response = mResponses.editItemAt(i);
if (response.request.ident == POLL_CALLBACK) {
int fd = response.request.fd;
int events = response.events;
void* data = response.request.data;
// 处理请求的回调方法
int callbackResult = response.request.callback->handleEvent(fd, events, data);
if (callbackResult == 0) {
removeFd(fd, response.request.seq);// 移除 fd
}
response.request.callback.clear();// 清除 reponse 引用的回调方法
result = POLL_CALLBACK; // 发生回调
}
}
return result;
}

pollOnce 返回值说明:

  • POLL_WAKE: 表示由 wake() 触发,即 pipe 写端的 write 事件触发;
  • POLL_CALLBACK: 表示某个被监听 fd 被触发。
  • POLL_TIMEOUT: 表示等待超时;
  • POLL_ERROR:表示等待期间发生错误;
  • Looper::awoken()
1
2
3
4
5
6
7
Looper.cpp

void Looper::awoken() {
uint64_t counter;
// 不断读取管道数据,目的就是为了清空管道内容
TEMP_FAILURE_RETRY(read(mWakeEventFd.get(), &counter, sizeof(uint64_t)));
}

poll小结

pollInner() 方法的处理流程:

  1. 先调用 epoll_wait(),这是阻塞方法,用于等待事件发生或者超时;
  2. 对于 epoll_wait() 返回,当且仅当以下 3 种情况出现:
    • POLL_ERROR,发生错误,直接跳转到 Done
    • POLL_TIMEOUT,发生超时,直接跳转到 Done
    • 检测到管道有事件发生,则再根据情况做相应处理:
      • 如果是管道读端产生事件,则直接读取管道的数据;
      • 如果是其他事件,则处理 request ,生成对应的 reponse 对象,pushreponse 数组;
  3. 进入 Done 标记位的代码段:
    • 先处理 NativeMessage ,调用 NativeHandler 来处理该 Message
    • 再处理 Response 数组,POLL_CALLBACK 类型的事件;

从上面的流程,可以发现对于 Request 先收集,一并放入 reponse 数组,而不是马上执行。真正在 Done 开始执行的时候,是先处理 native Message,再处理 Request ,说明 native Message 的优先级高于 Request 请求的优先级。

另外 pollOnce() 方法中,先处理 Response 数组中不带 Callback 的事件,再调用了 pollInner() 方法。

4.1.4 nativeWake

nativeWake 用于唤醒功能,在添加消息到消息队列 enqueueMessage() , 或者把消息从消息队列中全部移除 quit() ,再有需要时都会调用 nativeWake 方法。包含唤醒过程的添加消息的调用链,如下:

图片来源于《Android消息机制1-Handler(Java层)》

  • android_os_MessageQueue_nativeWake()
1
2
3
4
5
6
android_os_MessageQueue.cpp

static void android_os_MessageQueue_nativeWake(JNIEnv* env, jclass clazz, jlong ptr) {
NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
nativeMessageQueue->wake();
}
  • NativeMessageQueue::wake()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
android_os_MessageQueue.cpp

void NativeMessageQueue::wake() {
mLooper->wake();
}

Looper.cpp

void Looper::wake() {
uint64_t inc = 1;
// 向管道 mWakeEventFd 写入字符 1
ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd.get(), &inc, sizeof(uint64_t)));
if (nWrite != sizeof(uint64_t)) {
if (errno != EAGAIN) {
LOG_ALWAYS_FATAL("Could not write wake signal to fd %d (returned %zd): %s",
mWakeEventFd.get(), nWrite, strerror(errno));
}
}
}

其中 TEMP_FAILURE_RETRY 是一个宏定义, 当执行 write 失败后,会不断重复执行,直到执行成功为止。

4.1.5 sendMessage

这里主要讲 Native 层如何向 MessageQueue 发送消息

sendMessage

1
2
3
4
5
6
Looper.cpp

void Looper::sendMessage(const sp<MessageHandler>& handler, const Message& message) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
sendMessageAtTime(now, handler, message);
}

sendMessageDelayed

1
2
3
4
5
6
7
Looper.cpp

void Looper::sendMessageDelayed(nsecs_t uptimeDelay, const sp<MessageHandler>& handler,
const Message& message) {
nsecs_t now = systemTime(SYSTEM_TIME_MONOTONIC);
sendMessageAtTime(now + uptimeDelay, handler, message);
}

sendMessageAtTime

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
void Looper::sendMessageAtTime(nsecs_t uptime, const sp<MessageHandler>& handler,
const Message& message) {
size_t i = 0;
{ //请求锁
AutoMutex _l(mLock);

size_t messageCount = mMessageEnvelopes.size();
// 找到 message 应该插入的位置 i
while (i < messageCount && uptime >= mMessageEnvelopes.itemAt(i).uptime) {
i += 1;
}

MessageEnvelope messageEnvelope(uptime, handler, message);
mMessageEnvelopes.insertAt(messageEnvelope, i, 1);

// 如果当前正在发送消息,那么不再调用 wake(),直接返回。
if (mSendingMessage) {
return;
}
} // 释放锁
// 当把消息加入到消息队列的头部时,需要唤醒 poll 循环。
if (i == 0) {
wake();
}

4.1.6 总结

  • nativeInit():
    • 创建了 NativeMessageQueue 对象,增加其引用计数,并将 NativeMessageQueue 指针 mPtr 保存在 Java 层的 MessageQueue
    • 创建了 Native Looper 对象
    • 调用 epollepoll_create()/epoll_ctl() 来完成对 mWakeEventFdmRequests 的可读事件监听
  • nativeDestroy():
    • 调用 RefBase::decStrong() 来减少对象的引用计数
    • 当引用计数为 0 时,则删除 NativeMessageQueue 对象
  • nativePollOnce():
    • 调用 Looper::pollOnce() 来完成,空闲时停留在 epoll_wait() 方法,用于等待事件发生或者超时
  • nativeWake():
    • 调用 Looper::wake() 来完成,向管道 mWakeEventfd 写入字符;

4.2 认识 Native 结构体和类

Looper.h/Looper.cpp 文件中,定义了 Message 结构体,消息处理类,回调类,Looper 类。

4.2.1 Message 结构体

1
2
3
4
5
6
7
8
9
Looper.h

struct Message {
Message() : what(0) { }
Message(int w) : what(w) { }

/* The message type. (interpretation is left up to the handler) */
int what;
};

4.2.2 消息处理类

MessageHandler

1
2
3
4
5
6
7
8
9
Looper.h

class MessageHandler : public virtual RefBase {
protected:
virtual ~MessageHandler();

public:
virtual void handleMessage(const Message& message) = 0;
};

WeakMessageHandler 类,继承于 MessageHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Looper.h

class WeakMessageHandler : public MessageHandler {
protected:
virtual ~WeakMessageHandler();

public:
WeakMessageHandler(const wp<MessageHandler>& handler);
virtual void handleMessage(const Message& message);

private:
wp<MessageHandler> mHandler;
};

Looper.cpp

void WeakMessageHandler::handleMessage(const Message& message) {
sp<MessageHandler> handler = mHandler.promote();
if (handler != nullptr) {
handler->handleMessage(message);// 调用 MessageHandler 类的处理方法
}
}

4.2.3 回调类

LooperCallback

1
2
3
4
5
6
7
8
9
Looper.h

class LooperCallback : public virtual RefBase {
protected:
virtual ~LooperCallback();
public:
// 用于处理指定的文件描述符的poll事件
virtual int handleEvent(int fd, int events, void* data) = 0;
};

SimpleLooperCallback 类, 继承于 LooperCallback

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
Looper.h

class SimpleLooperCallback : public LooperCallback {
protected:
virtual ~SimpleLooperCallback();
public:
SimpleLooperCallback(Looper_callbackFunc callback);
virtual int handleEvent(int fd, int events, void* data);
private:
Looper_callbackFunc mCallback;
};

Looper.cpp

int SimpleLooperCallback::handleEvent(int fd, int events, void* data) {
return mCallback(fd, events, data);// 调用回调方法
}

4.2.4 Looper

1
static const int EPOLL_MAX_EVENTS = 16; //轮询事件的文件描述符的个数上限

其中 Looper 类的内部定义了 RequestResponseMessageEnvelope3 个结构体

图片来源于《Android消息机制2》

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
struct Request {// 请求结构体
int fd;
int ident;
int events;
int seq;
sp<LooperCallback> callback;
void* data;

void initEventItem(struct epoll_event* eventItem) const;
};

struct Response {// 响应结构体
int events;
Request request;
};

struct MessageEnvelope {// 信封结构体
MessageEnvelope() : uptime(0) { }

MessageEnvelope(nsecs_t u, sp<MessageHandler> h, const Message& m)
: uptime(u), handler(std::move(h)), message(m) {}

nsecs_t uptime;
sp<MessageHandler> handler;
Message message;
};

MessageEnvelope 正如其名字,信封。MessageEnvelope 里面记录着收信人 (handler) ,发信时间 (uptime) ,信件内容 (message)

4.2.5 ALooper

ALooper 类定义在通过 looper.cpp/looper.h(注意此文件是小写字母开头,与 Looper.cpp 不同,具体源码路径,可通过查看 上面 Native 分析 相关源码路径分析)

1
2
3
4
5
6
7
static inline Looper* ALooper_to_Looper(ALooper* alooper) {
return reinterpret_cast<Looper*>(alooper);
}

static inline ALooper* Looper_to_ALooper(Looper* looper) {
return reinterpret_cast<ALooper*>(looper);
}

ALooper 类 与前面介绍的 Looper 类,更多的操作是通过 ALooper_to_Looper()Looper_to_ALooper() 这两个方法转换完成的,也就是说 ALooper 类中定义的所有方法,都是通过转换为 Looper 类,再执行 Looper 中的方法。

4.2.6 总结

MessageQueue 通过 mPtr 变量保存 NativeMessageQueue 对象,从而使得 MessageQueue 成为 Java 层和 Native 层的枢纽,既能处理上层消息,也能处理 native 层消息;下面列举 Java 层与 Native 层的对应图

图片来源于《Android消息机制2》

图解:

  • 红色虚线关系:Java 层和 Native 层的 MessageQueue 通过 JNI 建立关联,彼此之间能相互调用,搞明白这个互调关系,也就搞明白了 Java 如何调用 C++ 代码,C++ 代码又是如何调用 Java 代码。
  • 蓝色虚线关系:Handler/Looper/Message 这三大类 Java 层与 Native 层并没有任何的真正关联,只是分别在 Java 层和 Native 层的 handler 消息模型中具有相似的功能。都是彼此独立的,各自实现相应的逻辑。
  • WeakMessageHandler 继承于 MessageHandler 类,NativeMessageQueue 继承于 MessageQueue

另外,消息处理流程是先处理 Native Message ,再处理 Native Request ,最后处理 Java Message 。理解了该流程,也就明白有时上层消息很少,但响应时间却较长的真正原因。

五、其他要点

问题:

Android 中为什么主线程不会因为 Looper.loop() 里的死循环卡死?

没看见哪里有相关代码为这个死循环准备了一个新线程去运转?

Activity 的生命周期这些方法这些都是在主线程里执行的吧,那这些生命周期方法是怎么实现在死循环体外能够执行起来的?


要完全彻底理解这个问题,需要准备以下 4 方面的知识: Process/ThreadAndroid Binder IPCHandler/Looper/MessageQueue 消息机制,Linux pipe/epoll 机制。

(1) Android 中为什么主线程不会因为 Looper.loop() 里的死循环卡死?

这里涉及线程,先说说 进程 / 线程

进程:每个 app 运行时前首先创建一个进程,该进程是由 Zygote fork 出来的,用于承载 App 上运行的各种 Activity/Service 等组件。进程对于上层应用来说是完全透明的,这也是 google 有意为之,让 App 程序都是运行在 Android Runtime 。大多数情况一个 App 就运行在一个进程中,除非在AndroidManifest.xml 中配置 Android:process 属性,或通过 native 代码 fork 进程。

线程:线程对应用来说非常常见,比如每次 new Thread().start 都会创建一个新的线程。该线程与 App 所在进程之间资源共享,从 Linux 角度来说进程与线程除了是否共享资源外,并没有本质的区别,都是一个 task_struct 结构体,在CPU看来进程或线程无非就是一段可执行的代码,CPU 采用 CFS 调度算法,保证每个 task 都尽可能公平的享有 CPU 时间片

对于线程既然是一段可执行的代码,当可执行代码执行完成后,线程生命周期便该终止了,线程退出。

而对于主线程,我们是绝不希望会被运行一段时间,自己就退出,那么如何保证能一直存活呢?

简单做法就是可执行代码是能一直执行下去的,死循环便能保证不会被退出,例如,binder 线程也是采用死循环的方法,通过循环方式不同与 Binder 驱动进行读写操作,当然并非简单地死循环,无消息时会休眠。但这里可能又引发了另一个问题,既然是死循环又如何去处理其他事务呢(比如谁来添加 Message 然后唤醒这个休眠呢)?通过创建新线程的方式。

真正会卡死主线程的操作是在回调方法 onCreate/onStart/onResume 等操作时间过长,会导致掉帧,甚至发生 ANRlooper.loop 本身不会导致应用卡死。

(2) 没看见哪里有相关代码为这个死循环准备了一个新线程去运转?

事实上,会在进入死循环之前便创建了新 binder 线程,在代码 ActivityThread.main() 中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public static void main(String[] args) {
....

//创建Looper和MessageQueue对象,用于处理主线程的消息
Looper.prepareMainLooper();

//创建ActivityThread对象
ActivityThread thread = new ActivityThread();

//建立Binder通道 (创建新线程)
thread.attach(false);

Looper.loop(); //消息循环运行
throw new RuntimeException("Main thread loop unexpectedly exited");
}

thread.attach(false);便会创建一个 Binder 线程(具体是指 ApplicationThread,Binder 的服务端,用于接收系统服务 AMS 发送来的事件),该 Binder 线程通过 Handler 将 Message 发送给主线程

ActivityThread 实际上并非线程,不像 HandlerThread 类,ActivityThread 并没有真正继承 Thread 类,只是往往运行在主线程,该人以线程的感觉,其实承载 ActivityThread 的主线程就是由 Zygote fork 而创建的进程。

主线程的死循环一直运行是不是特别消耗CPU资源呢?

其实不然,这里就涉及到 Linux pipe/epoll 机制,简单说就是在主线程的 MessageQueue 没有消息时,便阻塞在 loopqueue.next() 中的 nativePollOnce() 方法里,此时主线程会释放 CPU 资源进入休眠状态,直到下个消息到达或者有事务发生,通过往 pipe 管道写端写入数据来唤醒主线程工作。这里采用的 epoll 机制,是一种 IO 多路复用机制,可以同时监控多个描述符,当某个描述符就绪(读或写就绪),则立刻通知相应程序进行读或写操作,本质同步 I/O ,即读写是阻塞的。 所以说,主线程大多数时候都是处于休眠状态,并不会消耗大量 CPU 资源。

(3) Activity的生命周期是怎么实现在死循环体外能够执行起来的?

ActivityThread 的内部类 H 继承于 Handler ,通过 handler 消息机制,简单说 Handler 机制用于同一个进程的线程间通信。

Activity 的生命周期都是依靠主线程的 Looper.loop,当收到不同 Message 时则采用相应措施:
H.handleMessage(msg) 方法中,根据接收到不同的 msg ,执行相应的生命周期。

比如收到 msg=H.LAUNCH_ACTIVITY,则调用 ActivityThread.handleLaunchActivity() 方法,最终会通过反射机制,创建 Activity 实例,然后再执行 Activity.onCreate() 等方法;再比如收到 msg=H.PAUSE_ACTIVITY ,则调用 ActivityThread.handlePauseActivity() 方法,最终会执行 Activity.onPause() 等方法。

主线程的消息又是哪来的呢?

当然是 App 进程中的其他线程通过 Handler 发送给主线程

图片来源于《Gityuan》

system_server 进程是系统进程java framework 框架的核心载体,里面运行了大量的系统服务,比如这里提供 ApplicationThreadProxy(简称 ATP ),ActivityManagerService(简称 AMS ),这个两个服务都运行在 system_server 进程的不同线程中,由于 ATPAMS 都是基于 IBinder 接口,都是 binder 线程,binder 线程的创建与销毁都是由 binder 驱动来决定的。

App 进程则是我们常说的应用程序,主线程主要负责 Activity/Service 等组件的生命周期以及UI相关操作都运行在这个线程; 另外,每个 App 进程中至少会有两个 binder 线程 ApplicationThread (简称 AT )和 ActivityManagerProxy(简称 AMP ),除了图中画的线程,其中还有很多线程,比如 signal catcher 线程等,这里就不一一列举。

Binder 用于不同进程之间通信,由一个进程的 Binder 客户端向另一个进程的服务端发送事务,比如图中线程 2 向线程 4 发送事务;而 handler 用于同一个进程中不同线程的通信,比如图中线程 4 向主线程发送消息。

结合图说说 Activity 生命周期,比如暂停 Activity,流程如下:

  1. 线程 1 的 AMS 中调用线程 2 的 ATP ;(由于同一个进程的线程间资源共享,可以相互直接调用,但需要注意多线程并发问题)
  2. 线程 2 通过 binder 传输到 App 进程的线程 4;
  3. 线程 4 通过 handler 消息机制,将暂停 Activity 的消息发送给主线程;
  4. 主线程在 looper.loop() 中循环遍历消息,当收到暂停 Activity 的消息时,便将消息分发给 ActivityThread.H.handleMessage() 方法,再经过方法的调用,最后便会调用到 Activity.onPause() ,当 onPause() 处理完后,继续循环 loop 下去。

原文

可能问到的问题