Anxin: A mind undisturbed, a heart unentangled, no fear of the future, no dwelling on the past. That way, all is well.

RxBus

2019-09-07

Implementing the RxBus event bus with RxJava

RxBus

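Most projects today use EventBus or Otto as their event bus library. Since this project already depends on RxJava, an event bus can be implemented just as easily with RxJava itself.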

Subject

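The core of RxBus is Subject. An event bus cannot know in advance when events will be posted or how many there will be, and both Subject and FlowableProcessor handle that requirement. Subject extends Observable and implements Observer, while FlowableProcessor extends Flowable and implements Processor and FlowableSubscriber. As a result, FlowableProcessor supports backpressure and Subject does not.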

Subject.java

package io.reactivex.subjects;

import io.reactivex.*;
import io.reactivex.annotations.*;

/**
 * Represents an {@link Observer} and an {@link Observable} at the same time, allowing
 * multicasting events from a single source to multiple child {@code Observer}s.
 * <p>
 * All methods except the {@link #onSubscribe(io.reactivex.disposables.Disposable)}, {@link #onNext(Object)},
 * {@link #onError(Throwable)} and {@link #onComplete()} are thread-safe.
 * Use {@link #toSerialized()} to make these methods thread-safe as well.
 *
 * @param <T> the item value type
 */
public abstract class Subject<T> extends Observable<T> implements Observer<T> {
    /**
     * Returns true if the subject has any Observers.
     * <p>The method is thread-safe.
     * @return true if the subject has any Observers
     */
    public abstract boolean hasObservers();

    /**
     * Returns true if the subject has reached a terminal state through an error event.
     * <p>The method is thread-safe.
     * @return true if the subject has reached a terminal state through an error event
     * @see #getThrowable()
     * @see #hasComplete()
     */
    public abstract boolean hasThrowable();

    /**
     * Returns true if the subject has reached a terminal state through a complete event.
     * <p>The method is thread-safe.
     * @return true if the subject has reached a terminal state through a complete event
     * @see #hasThrowable()
     */
    public abstract boolean hasComplete();

    /**
     * Returns the error that caused the Subject to terminate or null if the Subject
     * hasn't terminated yet.
     * <p>The method is thread-safe.
     * @return the error that caused the Subject to terminate or null if the Subject
     * hasn't terminated yet
     */
    @Nullable
    public abstract Throwable getThrowable();

    /**
     * Wraps this Subject and serializes the calls to the onSubscribe, onNext, onError and
     * onComplete methods, making them thread-safe.
     * <p>The method is thread-safe.
     * @return the wrapped and serialized subject
     */
    @NonNull
    public final Subject<T> toSerialized() {
        if (this instanceof SerializedSubject) {
            return this;
        }
        return new SerializedSubject<T>(this);
    }
}

FlowableProcessor.java

package io.reactivex.processors;

import org.reactivestreams.Processor;

import io.reactivex.*;
import io.reactivex.annotations.*;

/**
 * Represents a Subscriber and a Flowable (Publisher) at the same time, allowing
 * multicasting events from a single source to multiple child Subscribers.
 * <p>All methods except the onSubscribe, onNext, onError and onComplete are thread-safe.
 * Use {@link #toSerialized()} to make these methods thread-safe as well.
 *
 * @param <T> the item value type
 */
public abstract class FlowableProcessor<T> extends Flowable<T> implements Processor<T, T>, FlowableSubscriber<T> {

    /**
     * Returns true if the FlowableProcessor has subscribers.
     * <p>The method is thread-safe.
     * @return true if the FlowableProcessor has subscribers
     */
    public abstract boolean hasSubscribers();

    /**
     * Returns true if the FlowableProcessor has reached a terminal state through an error event.
     * <p>The method is thread-safe.
     * @return true if the FlowableProcessor has reached a terminal state through an error event
     * @see #getThrowable()
     * @see #hasComplete()
     */
    public abstract boolean hasThrowable();

    /**
     * Returns true if the FlowableProcessor has reached a terminal state through a complete event.
     * <p>The method is thread-safe.
     * @return true if the FlowableProcessor has reached a terminal state through a complete event
     * @see #hasThrowable()
     */
    public abstract boolean hasComplete();

    /**
     * Returns the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor
     * hasn't terminated yet.
     * <p>The method is thread-safe.
     * @return the error that caused the FlowableProcessor to terminate or null if the FlowableProcessor
     * hasn't terminated yet
     */
    @Nullable
    public abstract Throwable getThrowable();

    /**
     * Wraps this FlowableProcessor and serializes the calls to the onSubscribe, onNext, onError and
     * onComplete methods, making them thread-safe.
     * <p>The method is thread-safe.
     * @return the wrapped and serialized FlowableProcessor
     */
    @NonNull
    @CheckReturnValue
    public final FlowableProcessor<T> toSerialized() {
        if (this instanceof SerializedProcessor) {
            return this;
        }
        return new SerializedProcessor<T>(this);
    }
}


The four Subject implementations

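Subject          Emission behavior
PublishSubject   Emits every item sent after subscription
BehaviorSubject  Emits the most recent item sent before subscription, plus every item sent afterwards
ReplaySubject    Emits every item, no matter when the observer subscribes
AsyncSubject     Emits only the last item (once the source completes), no matter when the observer subscribes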
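To make the four behaviors concrete, here is a minimal plain-Java model of them. It deliberately does not use RxJava: the class SubjectModel and the method received are hypothetical names made up for this illustration. It assumes a source that emits a fixed list of items and then completes, a BehaviorSubject created without a default value, and an observer that subscribes after a given number of items has already been emitted.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of the emission behaviors; hypothetical, not RxJava code.
final class SubjectModel {
    enum Kind { PUBLISH, BEHAVIOR, REPLAY, ASYNC }

    /**
     * Items an observer receives when it subscribes after `subscribedAfter`
     * items were already emitted and the source then emits the rest and completes.
     * Assumes 0 <= subscribedAfter <= items.size().
     */
    static <T> List<T> received(Kind kind, List<T> items, int subscribedAfter) {
        int n = items.size();
        switch (kind) {
            case PUBLISH:
                // Only what is emitted after subscription.
                return new ArrayList<T>(items.subList(subscribedAfter, n));
            case BEHAVIOR:
                // The latest earlier item (if any), then everything afterwards.
                return new ArrayList<T>(items.subList(Math.max(subscribedAfter - 1, 0), n));
            case REPLAY:
                // Everything, regardless of subscription time.
                return new ArrayList<T>(items);
            case ASYNC: {
                // Only the final item, delivered when the source completes.
                List<T> last = new ArrayList<T>();
                if (n > 0) {
                    last.add(items.get(n - 1));
                }
                return last;
            }
            default:
                throw new IllegalArgumentException("unknown kind: " + kind);
        }
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b", "c", "d");
        for (Kind kind : Kind.values()) {
            // The observer subscribes after "a" and "b" were emitted.
            System.out.println(kind + " -> " + received(kind, items, 2));
        }
    }
}
```

With the observer subscribing after "a" and "b", the model yields [c, d] for PUBLISH, [b, c, d] for BEHAVIOR, [a, b, c, d] for REPLAY and [d] for ASYNC.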

RxBus

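A bus should normally deliver only the events posted after a listener subscribes, so PublishSubject was chosen to implement RxBus. Listeners are registered by event type.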

Bus.java

public interface Bus {

    void post(@NonNull Object event);

    <T> Observable<T> ofType(@NonNull Class<T> eventType);

    boolean hasObservers();
}

Posting events

Both ordinary events and sticky events can be posted.

RxBus.getDefault().post(event);

RxBus.getDefault().postSticky(stickEvent);

// RxBus.java
    @Override
    public void post(Object event) {
        bus.onNext(event);
    }
    
    private final ConcurrentMap<Class<?>, List<Object>> stickyEvents;
    
    public void postSticky(Object event) {
        if (event == null) return;
        synchronized (stickyEvents) {
            Class<?> clazz = event.getClass();
            List<Object> events = stickyEvents.get(clazz);
            //noinspection Java8MapApi
            if (events == null) {
                events = new ArrayList<>();
                stickyEvents.put(clazz, events);
            }
            events.add(event);
        }
        post(event);
    }

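Events may be posted from different threads, and a Subject's onNext is not thread-safe. To avoid problems, the subject has to be converted to a SerializedSubject: creating the bus with PublishSubject.create().toSerialized() makes posting events thread-safe.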

Subscribing to events

Two ways to subscribe are provided.

  1. Subscribe by event type with ofType or ofStickyType
disposable = RxBus.getDefault()
        .ofType(String.class)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(s -> {
            Loger.d(TAG, "accept() s : " + s);
        });

disposable = RxBus.getDefault()
        .ofStickyType(String.class)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
                s -> {
                    Loger.d(TAG, "accept() s : " + s);
                },
                e -> {
                    Loger.d(TAG, "accept() error : " + e);
                }
        );

// RxBus.java
    @Override
    public <T> Observable<T> ofType(Class<T> eventType) {
        return bus.ofType(eventType);
    }

    private final ConcurrentMap<Class<?>, List<Object>> stickyEvents;
    
    public <T> Observable<T> ofStickyType(Class<T> eventType) {
        synchronized (stickyEvents) {
            //noinspection unchecked
            List<T> events = (List<T>) stickyEvents.get(eventType);
            if (events != null && events.size() > 0) {
                // Replay a snapshot taken under the lock: the live list can be
                // modified by postSticky while the Observable iterates it later.
                return Observable.fromIterable(new ArrayList<T>(events))
                        .mergeWith(ofType(eventType));
            }
        }
        return ofType(eventType);
    }
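The difference between ofType and ofStickyType can be sketched without RxJava. The following is a minimal plain-Java model, not the article's RxBus: TinyStickyBus, its Listener class and the demo method are hypothetical names, and listeners are plain callbacks rather than Observables. It only illustrates the two ideas used above: filtering by Class.isInstance, and replaying stored sticky events of the requested type before delivering live ones.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Plain-Java model of ofType / ofStickyType; every name here is hypothetical.
final class TinyStickyBus {
    private static final class Listener<T> {
        final Class<T> type;
        final Consumer<? super T> action;

        Listener(Class<T> type, Consumer<? super T> action) {
            this.type = type;
            this.action = action;
        }

        // Deliver the event only if it is an instance of the listener's type.
        void offer(Object event) {
            if (type.isInstance(event)) {
                action.accept(type.cast(event));
            }
        }
    }

    private final List<Listener<?>> listeners = new CopyOnWriteArrayList<>();
    private final Map<Class<?>, List<Object>> stickyEvents = new HashMap<>();

    synchronized void post(Object event) {
        for (Listener<?> listener : listeners) {
            listener.offer(event);
        }
    }

    synchronized void postSticky(Object event) {
        // Remember the event under its exact class, then deliver it as usual.
        stickyEvents.computeIfAbsent(event.getClass(), k -> new ArrayList<>()).add(event);
        post(event);
    }

    synchronized <T> void ofType(Class<T> type, Consumer<? super T> action) {
        Listener<T> listener = new Listener<>(type, action);
        listeners.add(listener);
    }

    synchronized <T> void ofStickyType(Class<T> type, Consumer<? super T> action) {
        Listener<T> listener = new Listener<>(type, action);
        List<Object> stored = stickyEvents.get(type);
        if (stored != null) {
            // Replay a copy of the stored events before going live.
            for (Object event : new ArrayList<>(stored)) {
                listener.offer(event);
            }
        }
        listeners.add(listener);
    }

    // Returns what a late sticky listener and a late plain listener received.
    static String demo() {
        TinyStickyBus bus = new TinyStickyBus();
        List<String> stickyLate = new ArrayList<>();
        List<String> plainLate = new ArrayList<>();
        bus.postSticky("hello");                        // posted before anyone listens
        bus.post(42);                                   // not a String, ignored below
        bus.ofStickyType(String.class, stickyLate::add);
        bus.ofType(String.class, plainLate::add);
        bus.post("world");
        return stickyLate + " " + plainLate;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

In this model the late sticky listener sees both "hello" and "world", while the late plain listener sees only "world", which mirrors how a sticky subscription differs from an ordinary one.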

Unsubscribing (otherwise a memory leak may occur)

if (disposable != null) {
    disposable.dispose();
}
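  2. Subscribe automatically with annotations

Use the @Subscribe annotation. It supports sticky events and lets you choose the thread on which events are received.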

@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {
    EventThread observeOnThread() default EventThread.MAIN;
    boolean isSticky() default false;
}

Subscribing

    @Override
    public void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        RxBus.getDefault().register(this);
    }

    @Subscribe
    public void onTestEvent1(TestEvent1 event1) {
        Loger.d(TAG, event1);
    }

    @Subscribe(observeOnThread = EventThread.NEW)
    public void onTestEvent2(TestEvent2 event2) {
        Loger.d(TAG, event2);
    }

    @Subscribe(isSticky = true, observeOnThread = EventThread.MAIN)
    public void onTestStickyEvent(String s) {
        Loger.d(TAG, s);
    }

// RxBus.java

    private final ConcurrentMap<Object, CompositeDisposable> disposableMap;
    
    public void register(Object object) {
        if (object != null) {
            synchronized (disposableMap) {
                if (hasRegistered(object)) {
                    if (sStrictMethodVerification) {
                        throw new RuntimeException(String.format("%s has already registered.", object));
                    }
                } else {
                    Class<?> subscriberClass = object.getClass();
                    List<SubscriberMethod> subscriberMethods = finder.getSubscriberMethod(subscriberClass);
                    CompositeDisposable compositeDisposable = disposableMap.get(object);
                    if ( subscriberMethods != null && subscriberMethods.size() > 0 ) {
                        for (SubscriberMethod subscriberMethod : subscriberMethods) {
                            Disposable disposable = (subscriberMethod.isSticky 
                            ? ofStickyType(subscriberMethod.eventType) 
                            : ofType(subscriberMethod.eventType))
                                    .observeOn(EventThread.getScheduler(subscriberMethod.thread))
                                    .subscribe(
                                            o -> {
                                                subscriberMethod.method.invoke(object, o);
                                            },
                                            throwable -> {
                                                if (sStrictMethodVerification) {
                                                    throw new RuntimeException(
                                                    "register " + object + " onError() -> subscribe error : " + throwable, throwable);
                                                }
                                            }
                                    );
                            if (compositeDisposable == null) {
                                compositeDisposable = new CompositeDisposable();
                                disposableMap.put(object, compositeDisposable);
                            }
                            compositeDisposable.add(disposable);
                        }
                    }
                }
            }
        }
    }

On registration, every method of the subscriber's class (and its superclasses) is checked for the @Subscribe annotation.

public final class SubscribeAnnotationFinder implements Finder {
    private static final String TAG = "SubscribeAnnotationFinder";

    private static final ConcurrentMap<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
    private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC;
    private final boolean strictMethodVerification;

    public SubscribeAnnotationFinder(boolean strictMethodVerification) {
        this.strictMethodVerification = strictMethodVerification;
    }

    @Override
    public List<SubscriberMethod> getSubscriberMethod(Class<?> subscriberClass) {
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if ( subscriberMethods == null ) {
            subscriberMethods = findSubscriberMethods(subscriberClass);
            // findSubscriberMethods never returns null, so an empty list means nothing was found.
            if (subscriberMethods.isEmpty() && strictMethodVerification) {
                throw new RuntimeException("Subscribe " + subscriberClass
                        + " and its super classes have no public methods with the @Subscribe annotation");
            }
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
        }
        return subscriberMethods;
    }

    private List<SubscriberMethod> findSubscriberMethods(Class<?> clazz) {
        List<SubscriberMethod> subscriberMethods = new ArrayList<>();
        String clazzName;
        boolean isCompleteFound;
        do {
            Method[] methods = clazz.getDeclaredMethods();
            for (Method method : methods) {
                String methodName = method.getDeclaringClass().getName() + "." + method.getName();
                int modifiers = method.getModifiers();
                if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                    Class<?>[] parameterTypes = method.getParameterTypes();
                    if (parameterTypes.length == 1) {
                        Subscribe annotation = method.getAnnotation(Subscribe.class);
                        if (annotation != null) {
                            Class<?> eventType = ClassUtils.getEventType(parameterTypes[0]);
                            boolean isSticky = annotation.isSticky();
                            EventThread eventThread = annotation.observeOnThread();
                            subscriberMethods.add(new SubscriberMethod(method, eventType, isSticky, eventThread));
                        }
                    } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                        throw new RuntimeException("@Subscribe method " + methodName + 
                        " must have exactly 1 parameter but has " + parameterTypes.length);
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    throw new RuntimeException("@Subscribe method " + methodName +
                            " must be public, non-static, and non-abstract");
                }
            }
            clazz = clazz.getSuperclass();
            clazzName = clazz.getName();
            isCompleteFound =
             clazzName.startsWith("java.") || clazzName.startsWith("javax.") || clazzName.startsWith("android.");
        } while (!isCompleteFound);
        return subscriberMethods;
    }
}
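The scan performed by the finder can be tried out in isolation. Below is a minimal, self-contained sketch using only java.lang.reflect. The nested Subscribe annotation is a simplified stand-in for the article's annotation (no thread or sticky options), and AnnotationScanDemo, Sample, findHandlers and dispatch are hypothetical names. It handles only reference-typed parameters and does not walk up the superclass chain.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.lang.reflect.Modifier;
import java.util.ArrayList;
import java.util.List;

// Reflection sketch; all names are hypothetical and simplified.
final class AnnotationScanDemo {
    // Simplified stand-in for the article's @Subscribe annotation.
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.METHOD)
    @interface Subscribe {}

    static class Sample {
        final List<String> log = new ArrayList<>();

        @Subscribe public void onText(String s) { log.add("text:" + s); }
        @Subscribe public void onNumber(Integer n) { log.add("number:" + n); }
        public void notAnnotated(String s) { log.add("ignored:" + s); }
        @Subscribe void notPublic(String s) { log.add("ignored:" + s); }
    }

    // Collect public, non-static, non-abstract, one-parameter methods marked @Subscribe.
    static List<Method> findHandlers(Class<?> clazz) {
        List<Method> found = new ArrayList<>();
        for (Method method : clazz.getDeclaredMethods()) {
            int modifiers = method.getModifiers();
            boolean eligible = Modifier.isPublic(modifiers)
                    && !Modifier.isStatic(modifiers)
                    && !Modifier.isAbstract(modifiers);
            if (eligible && method.getParameterCount() == 1
                    && method.isAnnotationPresent(Subscribe.class)) {
                found.add(method);
            }
        }
        return found;
    }

    // Invoke every handler whose parameter type accepts the event.
    static void dispatch(Object target, List<Method> handlers, Object event) {
        for (Method method : handlers) {
            if (method.getParameterTypes()[0].isInstance(event)) {
                try {
                    method.invoke(target, event);
                } catch (ReflectiveOperationException e) {
                    throw new IllegalStateException("handler failed: " + method, e);
                }
            }
        }
    }

    static List<String> demo() {
        Sample sample = new Sample();
        List<Method> handlers = findHandlers(Sample.class);
        dispatch(sample, handlers, "hi");
        dispatch(sample, handlers, 7);
        return sample.log;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Only onText and onNumber qualify here: notAnnotated lacks the annotation and notPublic fails the visibility check, which is the same filtering the finder applies before caching its result.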

Unsubscribing

    @Override
    public void onDestroy() {
        super.onDestroy();
        RxBus.getDefault().unregister(this);
    }

// RxBus.java

    private final ConcurrentMap<Object, CompositeDisposable> disposableMap;
    
    public void unregister(Object object) {
        LoggerUtil.d(TAG, "unregister() called with: object = [" + object + "]");
        if (object != null) {
            synchronized (disposableMap) {
                if (hasRegistered(object)) {
                    CompositeDisposable compositeDisposable = disposableMap.get(object);
                    if (compositeDisposable != null) {
                        compositeDisposable.dispose();
                        disposableMap.remove(object);
                    }
                }
            }
        }
    }
    
    public boolean hasRegistered(Object object) {
        return object != null && disposableMap.containsKey(object);
    }

Removing sticky events that have already been posted

//RxBus.java

    public void removeStickyEvents(Class<?> clazz) {
        LoggerUtil.d(TAG, "removeStickyEvents() called with: clazz = [" + clazz + "]");
        synchronized (stickyEvents) {
            stickyEvents.remove(ClassUtils.getEventType(clazz));
        }
    }

    public void removeStickyEvent(Object event) {
        LoggerUtil.d(TAG, "removeStickyEvent() called with: event = [" + event + "]");
        synchronized (stickyEvents) {
            List<Object> objects = stickyEvents.get(event.getClass());
            if (objects != null) {
                objects.remove(event);
            }
        }
    }

    public void removeAllStickyEvent() {
        LoggerUtil.d(TAG, "removeAllStickyEvent() called");
        synchronized (stickyEvents) {
            stickyEvents.clear();
        }
    }

