EventBus源码解析(下)

Posted by CoXier on April 7, 2016

这篇来分析post过程

一、post

接着上面的我们先看看postToSubscription

 private void postToSubscription(Subscription subscription, Object event, boolean isMainThread) {
        switch (subscription.subscriberMethod.threadMode) {
            case POSTING:
                invokeSubscriber(subscription, event);
                break;
            case MAIN:
                if (isMainThread) {
                    invokeSubscriber(subscription, event);
                } else {
                    mainThreadPoster.enqueue(subscription, event);
                }
                break;
            case BACKGROUND:
                if (isMainThread) {
                    backgroundPoster.enqueue(subscription, event);
                } else {
                    invokeSubscriber(subscription, event);
                }
                break;
            case ASYNC:
                asyncPoster.enqueue(subscription, event);
                break;
            default:
                throw new IllegalStateException("Unknown thread mode: " + subscription.subscriberMethod.threadMode);
        }
    }

由于需要分发事件,所以之前没讲的ThreadMode此时起到作用了。在几个分支中都会有invokeSubscriber(subscription, event)

void invokeSubscriber(Subscription subscription, Object event) {
        try {
            subscription.subscriberMethod.method.invoke(subscription.subscriber, event);
        } catch (InvocationTargetException e) {
            handleSubscriberException(subscription, event, e.getCause());
        } catch (IllegalAccessException e) {
            throw new IllegalStateException("Unexpected exception", e);
        }
    }

很显然用的反射来调用method,第一个参数是订阅此event的对象object,第二个参数是event。

第一个分支POSTING:直接调用invokeSubscriber,method.invoke和EventBus.getDefault.post(xxx)在同一个线程中。

第二个分支MAIN:如果post是在MainThread则直接调用上面的方法;如果不是则向mainThreadPoster加入队列。mainThreadPoster类型是HandlerPoster,其继承自Handler。到这里我们应该回顾一下Android的消息机制了,简单说一下:handler 涉及到发送和处理message,sendMessage 之后,message被加入到MessageQueue中,最终的message会由Looper交给handler处理。这里先看看HandlerPoster的enqueue:

 void enqueue(Subscription subscription, Object event) {
        PendingPost pendingPost = PendingPost.obtainPendingPost(subscription, event);
        synchronized (this) {
            queue.enqueue(pendingPost);
            if (!handlerActive) {
                handlerActive = true;
                if (!sendMessage(obtainMessage())) {
                    throw new EventBusException("Could not send handler message");
                }
            }
        }
    }

找到sendMessage方法了吧。发送之后,Handler就会有相应的处理方法:handlerMessage

 @Override
    public void handleMessage(Message msg) {
        boolean rescheduled = false;
        try {
            long started = SystemClock.uptimeMillis();
            while (true) {
                PendingPost pendingPost = queue.poll();
                if (pendingPost == null) {
                    synchronized (this) {
                        // Check again, this time in synchronized
                        pendingPost = queue.poll();
                        if (pendingPost == null) {
                            handlerActive = false;
                            return;
                        }
                    }
                }
                eventBus.invokeSubscriber(pendingPost);
                long timeInMethod = SystemClock.uptimeMillis() - started;
                if (timeInMethod >= maxMillisInsideHandleMessage) {
                    if (!sendMessage(obtainMessage())) {
                        throw new EventBusException("Could not send handler message");
                    }
                    rescheduled = true;
                    return;
                }
            }
        } finally {
            handlerActive = rescheduled;
        }
    }

这里不追究细节,只需要知道在这个方法里是怎么处理接收到的消息的。这里调用的是eventBus.invokeSubscriber(pendingPost);。源码:

 void invokeSubscriber(PendingPost pendingPost) {
        Object event = pendingPost.event;
        Subscription subscription = pendingPost.subscription;
        PendingPost.releasePendingPost(pendingPost);
        if (subscription.active) {
            invokeSubscriber(subscription, event);
        }
    }

还记得之前介绍Subscription时说过的active变量吗? 其实我之前很仔细看上面的enqueue和hadnleMessage时,看到PendingPost这个类时,很是懵,但是到这我就懂了一点:为了包装,不然你又要传subcription和event,不仅如此,上面的入队操作也会变难。其实PendingPost的设计上还有一个很好的地方:

static PendingPost obtainPendingPost(Subscription subscription, Object event) {
        synchronized (pendingPostPool) {
            int size = pendingPostPool.size();
            if (size > 0) {
                PendingPost pendingPost = pendingPostPool.remove(size - 1);
                pendingPost.event = event;
                pendingPost.subscription = subscription;
                pendingPost.next = null;
                return pendingPost;
            }
        }
        return new PendingPost(event, subscription);
    }

    static void releasePendingPost(PendingPost pendingPost) {
        pendingPost.event = null;
        pendingPost.subscription = null;
        pendingPost.next = null;
        synchronized (pendingPostPool) {
            // Don't let the pool grow indefinitely
            if (pendingPostPool.size() < 10000) {
                pendingPostPool.add(pendingPost);
            }
        }
    }

不难发现这里很好的控制了pendingPostPool的大小,因为当你obtain一个post时,pendingPostPool的大小会减一,然后在你release时又会增加一。不得不说,设计者在细节上还是花了一些心思的。PendingPostQueue很简单很单纯,就是入队和出队。

第三个分支BACKGROUND:看一下BackgroundPoster

final class BackgroundPoster implements Runnable {

   	......

    public void enqueue(Subscription subscription, Object event) {
       ......
	   if (!executorRunning) {
             executorRunning = true;
             eventBus.getExecutorService().execute(this);
          }
    }

    @Override
    public void run() {
        ......
    }

}

BackgroundPoster实现Runnable接口,使用的是线程池,其他的细节和上面的HandlerPoster类似。

最后一个分支ASYNC:这里只有一点和第三分支不同,他不管post event所在的线程是不是MainThread,其他的都差不多一样。

看完postToSubscription之后,我们对如何分发一个事件已经很熟悉了,有了“底层”知识,我们来看看EventBus的post入口吧:

/** Posts the given event to the event bus. */
    public void post(Object event) {
        PostingThreadState postingState = currentPostingThreadState.get();
        List<Object> eventQueue = postingState.eventQueue;
        eventQueue.add(event);

        if (!postingState.isPosting) {
            postingState.isMainThread = Looper.getMainLooper() == Looper.myLooper();
            postingState.isPosting = true;
            if (postingState.canceled) {
                throw new EventBusException("Internal error. Abort state was not reset");
            }
            try {
                while (!eventQueue.isEmpty()) {
                    postSingleEvent(eventQueue.remove(0), postingState);
                }
            } finally {
                postingState.isPosting = false;
                postingState.isMainThread = false;
            }
        }
    }

这里有一个比较有意思的东西currentPostingThreadState,类型是ThreadLocal,之前看过《Android开发艺术探索》,知道了ThreadLoacl可以在不同的线程中维护一套数据的副本,换个说法就是ThreadLoacl会根据不同的线程拿到相应的数据,彼此互不干扰。

在while循环中一直执行postSingleEvent(eventQueue.remove(0), postingState);直至事件队列为空。我们再来看看这个方法:

private void postSingleEvent(Object event, PostingThreadState postingState) throws Error {
        Class<?> eventClass = event.getClass();
        boolean subscriptionFound = false;
        if (eventInheritance) {
            List<Class<?>> eventTypes = lookupAllEventTypes(eventClass);
            int countTypes = eventTypes.size();
            for (int h = 0; h < countTypes; h++) {
                Class<?> clazz = eventTypes.get(h);
                subscriptionFound |= postSingleEventForEventType(event, postingState, clazz);
            }
        } else {
            subscriptionFound = postSingleEventForEventType(event, postingState, eventClass);
        }
        if (!subscriptionFound) {
            if (logNoSubscriberMessages) {
                Log.d(TAG, "No subscribers registered for event " + eventClass);
            }
            if (sendNoSubscriberEvent && eventClass != NoSubscriberEvent.class &&
                    eventClass != SubscriberExceptionEvent.class) {
                post(new NoSubscriberEvent(this, event));
            }
        }
    }

如果eventInheritance为true,也就是考虑event的继承关系。订阅了此event超类的method也会收到此消息。剩下的postSingleEventForEventType(eventQueue.remove(0) postingState);我不打算继续讲了,因为已经毫无难度了,其中会调用之前讲的postToSubscription。

到此大体的源码分析已经完成,unregister和subcribe过程相对应,相对而言比较简单。

二、分析整体框架

弄清楚上面的四个步骤之后,我们一起来梳理一下设计者是如何完成观察者模式中的订阅和发送事件以及取消订阅。这里我尝试用图的形式来表达我的所思所想。

我们先看subscriber。

一个subscriber可以订阅多个类型的Event,而且对于同一种类型的Event可以有多个method进行处理(尽管我们一般不会这样做)。

在subscribe方法中,引入Subscription对subscriber、event进行了封装。经过判断之后,把“合格”的subscription加入subscriptionsByEventType中看下面的图来梳理这个map的结构。

当我们分发事件时,也就是post(Object event)时,他会间接调用postSingleEventForEventType这个方法,通过传入的参数event,在subscriptionsByEventType找到event对应的value,再继续相应的操作。

为了取消订阅这里引入typesBySubscriber和上面的类似,那我们再画一个图来梳理逻辑层次。

我们取消一个subscriber的订阅时,也就是unregister(Object subscriber).我们会在typesBySubscriber中找到该subsriber对应的evnt。然后再由此event去subscriptionsByEventType找到一系列的subscription,并把他们remove。