使用RxJava实现事件总线RxBus
RxBus
目前大多数项目都会采用EventBus
或者Otto
作为事件总线通信库。由于项目引入RxJava,所以采用RxJava也可以轻松实现事件总线。
Subject
RxBus
核心使用到的是Subject
。作为事件总线并不能确定什么时候发送事件,以及有多少事件。而Subject
和FlowableProcessor
都可以解决这样的需求。Subject
继承自 Observable
并实现了 Observer
;而FlowableProcessor
继承自 Flowable
并实现了 Processor
和FlowableSubscriber
。 所以FlowableProcessor
支持背压,而Subject
不支持背压。
Subject .java
package io.reactivex.subjects;
import io.reactivex.*;
import io.reactivex.annotations.*;
/**
 * An object that is simultaneously an {@link Observable} and an {@link Observer}, so events
 * fed into it from one source are multicast to all of its child {@code Observer}s.
 * <p>
 * The event methods {@link #onSubscribe(io.reactivex.disposables.Disposable)},
 * {@link #onNext(Object)}, {@link #onError(Throwable)} and {@link #onComplete()} are NOT
 * thread-safe; every other method is. Call {@link #toSerialized()} to obtain a variant whose
 * event methods are thread-safe as well.
 *
 * @param <T> the item value type
 */
public abstract class Subject<T> extends Observable<T> implements Observer<T> {

    /**
     * Tells whether at least one Observer is attached to this subject.
     * <p>The method is thread-safe.
     *
     * @return true if the subject has any Observers
     */
    public abstract boolean hasObservers();

    /**
     * Tells whether this subject terminated because of an error event.
     * <p>The method is thread-safe.
     *
     * @return true if the subject has reached a terminal state through an error event
     * @see #getThrowable()
     * @see #hasComplete()
     */
    public abstract boolean hasThrowable();

    /**
     * Tells whether this subject terminated because of a complete event.
     * <p>The method is thread-safe.
     *
     * @return true if the subject has reached a terminal state through a complete event
     * @see #hasThrowable()
     */
    public abstract boolean hasComplete();

    /**
     * Gives access to the terminating error, if any.
     * <p>The method is thread-safe.
     *
     * @return the error that caused the Subject to terminate, or null if the Subject
     *         hasn't terminated yet
     */
    @Nullable
    public abstract Throwable getThrowable();

    /**
     * Produces a subject that serializes calls to onSubscribe, onNext, onError and
     * onComplete, which makes those methods thread-safe. A subject that is already a
     * {@code SerializedSubject} is returned as-is instead of being wrapped again.
     * <p>The method is thread-safe.
     *
     * @return the wrapped and serialized subject
     */
    @NonNull
    public final Subject<T> toSerialized() {
        return this instanceof SerializedSubject ? this : new SerializedSubject<T>(this);
    }
}
FlowableProcessor.java
package io.reactivex.processors;
import org.reactivestreams.Processor;
import io.reactivex.*;
import io.reactivex.annotations.*;
/**
 * An object that is simultaneously a Subscriber and a Flowable (Publisher), so events fed
 * into it from one source are multicast to all of its child Subscribers.
 * <p>The event methods onSubscribe, onNext, onError and onComplete are NOT thread-safe; every
 * other method is. Call {@link #toSerialized()} to obtain a variant whose event methods are
 * thread-safe as well.
 *
 * @param <T> the item value type
 */
public abstract class FlowableProcessor<T> extends Flowable<T> implements Processor<T, T>, FlowableSubscriber<T> {

    /**
     * Tells whether at least one subscriber is attached to this FlowableProcessor.
     * <p>The method is thread-safe.
     *
     * @return true if the FlowableProcessor has subscribers
     */
    public abstract boolean hasSubscribers();

    /**
     * Tells whether this FlowableProcessor terminated because of an error event.
     * <p>The method is thread-safe.
     *
     * @return true if the FlowableProcessor has reached a terminal state through an error event
     * @see #getThrowable()
     * @see #hasComplete()
     */
    public abstract boolean hasThrowable();

    /**
     * Tells whether this FlowableProcessor terminated because of a complete event.
     * <p>The method is thread-safe.
     *
     * @return true if the FlowableProcessor has reached a terminal state through a complete event
     * @see #hasThrowable()
     */
    public abstract boolean hasComplete();

    /**
     * Gives access to the terminating error, if any.
     * <p>The method is thread-safe.
     *
     * @return the error that caused the FlowableProcessor to terminate, or null if the
     *         FlowableProcessor hasn't terminated yet
     */
    @Nullable
    public abstract Throwable getThrowable();

    /**
     * Produces a FlowableProcessor that serializes calls to onSubscribe, onNext, onError and
     * onComplete, which makes those methods thread-safe. A processor that is already a
     * {@code SerializedProcessor} is returned as-is instead of being wrapped again.
     * <p>The method is thread-safe.
     *
     * @return the wrapped and serialized FlowableProcessor
     */
    @NonNull
    @CheckReturnValue
    public final FlowableProcessor<T> toSerialized() {
        return this instanceof SerializedProcessor ? this : new SerializedProcessor<T>(this);
    }
}
See Also
Subject四个实现类
PublishSubject
发送订阅之后的所有数据
BehaviorSubject
发送订阅之前的一个数据和订阅之后的全部数据
ReplaySubject
不论什么时候订阅,都发送全部数据
AsyncSubject
不论什么时候订阅,只会发送最后一个数据
Subject | 发射行为 |
---|---|
PublishSubject | 发送订阅之后的所有数据 |
BehaviorSubject | 发送订阅之前的一个数据和订阅之后的全部数据 |
ReplaySubject | 不论什么时候订阅,都发送全部数据 |
AsyncSubject | 不论什么时候订阅,只会发送最后一个数据 |
RxBus
所以最终选择了PublishSubject
来实现RxBus
。基于事件的类型注册监听。
Bus.java
/**
 * Minimal contract of the event bus: publish events and observe them by type.
 */
public interface Bus {
/**
 * Publishes an event to the bus.
 *
 * @param event the event to publish; declared {@code @NonNull}
 */
void post(@NonNull Object event);
/**
 * Returns a stream typed to the given event class.
 *
 * @param eventType the class of the events the caller wants to observe
 * @param <T> the event type
 * @return an {@code Observable} of {@code T}
 */
<T> Observable<T> ofType(@NonNull Class<T> eventType);
/**
 * NOTE(review): presumably reports whether the bus currently has any subscribers;
 * no implementation is visible here, confirm against the implementing class.
 */
boolean hasObservers();
}
发射事件
可以发射一般事件和sticky event
// Publish a regular event through the default bus instance.
RxBus.getDefault().post(event);
// Publish a sticky event: postSticky() also stores it, so ofStickyType() can hand it to later subscribers.
RxBus.getDefault().postSticky(stickEvent);
/**
 * Publishes {@code event} to all current subscribers of the bus.
 * <p>
 * A {@code null} event is ignored, matching {@code postSticky}. RxJava 2 does not accept
 * null items, so passing null on to {@code bus.onNext} would fail instead of delivering anything.
 *
 * @param event the event to publish; ignored when null
 */
@Override
public void post(Object event) {
    if (event == null) {
        return;
    }
    bus.onNext(event);
}
private final ConcurrentMap<Class<?>, List<Object>> stickyEvents;

/**
 * Stores {@code event} under its runtime class as a sticky event and then publishes it
 * like a regular event. A null event is ignored.
 *
 * @param event the event to remember and publish
 */
public void postSticky(Object event) {
    if (event == null) {
        return;
    }
    final Class<?> type = event.getClass();
    synchronized (stickyEvents) {
        List<Object> bucket = stickyEvents.get(type);
        //noinspection Java8MapApi
        if (bucket == null) {
            bucket = new ArrayList<>();
            stickyEvents.put(type, bucket);
        }
        bucket.add(event);
    }
    // Delivery to current subscribers happens outside the lock.
    post(event);
}
由于发射的事件可能发生在不同线程,而Subject
是非线程安全的,为了避免该问题需要将其转换为SerializedSubject
。可以使用PublishSubject.create().toSerialized()
来保证发射事件线程安全。
订阅事件
提供了两种方式订阅
- 使用
ofType
或者ofStickyType
对事件类型进行订阅
// Observe String events on the Android main thread; keep the Disposable so the
// subscription can be cancelled later.
disposable = RxBus.getDefault()
        .ofType(String.class)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(s -> Loger.d(TAG, "accept() s : " + s));
// Observe String events including stored sticky ones, on the Android main thread.
// An error consumer is supplied as well so stream errors are logged.
disposable = RxBus.getDefault().ofStickyType(String.class)
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(
                s -> {
                    Loger.d(TAG, "accept() s : " + s);
                },
                e -> {
                    // Fixed: the original was missing the '+' concatenation operator here.
                    Loger.d(TAG, "accept() error : " + e);
                }
        );
// RxBus.java
/**
 * Returns the bus stream filtered to {@code eventType} by delegating to {@code bus.ofType}.
 *
 * @param eventType the class of events to observe
 * @param <T> the event type
 * @return an {@code Observable} of events of the requested type
 */
@Override
public <T> Observable<T> ofType(Class<T> eventType) {
return bus.ofType(eventType);
}
private final ConcurrentMap<Class<?>, List<Object>> stickyEvents;

/**
 * Returns a stream of {@code eventType} events that first replays the sticky events stored
 * for that type (if any) and then continues with regular events from {@link #ofType(Class)}.
 * <p>
 * The stored list is copied while holding the {@code stickyEvents} lock.
 * {@code Observable.fromIterable} iterates its source when a subscriber arrives, outside
 * that lock, so handing it the live list would race with {@code postSticky} and
 * {@code removeStickyEvent} mutating it.
 *
 * @param eventType the class of events to observe
 * @param <T> the event type
 * @return sticky events stored at call time merged with subsequent events of that type
 */
public <T> Observable<T> ofStickyType(Class<T> eventType) {
    List<T> snapshot = null;
    synchronized (stickyEvents) {
        //noinspection unchecked
        List<T> events = (List<T>) stickyEvents.get(eventType);
        if (events != null && events.size() > 0) {
            snapshot = new ArrayList<>(events);
        }
    }
    if (snapshot == null) {
        return ofType(eventType);
    }
    return Observable.fromIterable(snapshot)
            .mergeWith(ofType(eventType));
}
取消订阅(否则可能会导致内存泄露)
// Cancel the subscription obtained from subscribe(); the null check covers the case
// where no subscription was ever created.
if (disposable != null) {
disposable.dispose();
}
- 采用注解的方式来自动完成订阅
使用注解@Subscribe
,支持sticky事件和指定接受事件线程
/**
 * Marks a method as an event handler. The annotation is retained at runtime and may
 * only be placed on methods.
 */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Subscribe {
/** Thread on which the handler should be invoked; defaults to {@code EventThread.MAIN}. */
EventThread observeOnThread() default EventThread.MAIN;
/** Whether the handler should also receive stored sticky events; defaults to {@code false}. */
boolean isSticky() default false;
}
订阅
// Registers this component with the default bus when it is created.
@Override
public void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
RxBus.getDefault().register(this);
}
// Handler for TestEvent1 using the annotation defaults (observeOnThread = EventThread.MAIN, isSticky = false).
@Subscribe
public void onTestEvent1(TestEvent1 event1) {
Loger.d(TAG, event1);
}
// Handler for TestEvent2 that asks to be invoked on EventThread.NEW
// (presumably a newly created thread; confirm against EventThread.getScheduler).
@Subscribe(observeOnThread = EventThread.NEW)
public void onTestEvent2(TestEvent2 event2) {
Loger.d(TAG, event2);
}
// Sticky handler for String events, invoked on EventThread.MAIN.
@Subscribe(isSticky = true, observeOnThread = EventThread.MAIN)
public void onTestStickyEvnet(String s) {
Loger.d(TAG, s);
}
// RxBus.java
private final ConcurrentMap<Object, CompositeDisposable> disposableMap;

/**
 * Subscribes every handler method that the finder reports for {@code object}'s class and
 * remembers the resulting subscriptions under {@code object} so they can be disposed later.
 * <p>
 * A null {@code object} is ignored. Registering an already registered object throws only
 * when {@code sStrictMethodVerification} is set; otherwise the call is a no-op.
 *
 * @param object the subscriber instance whose handler methods should receive events
 */
public void register(Object object) {
    if (object == null) {
        return;
    }
    synchronized (disposableMap) {
        if (hasRegistered(object)) {
            if (sStrictMethodVerification) {
                throw new RuntimeException(String.format("%s has already registered.", object));
            }
            return;
        }

        List<SubscriberMethod> handlers = finder.getSubscriberMethod(object.getClass());
        CompositeDisposable subscriptions = disposableMap.get(object);
        if (handlers == null || handlers.size() == 0) {
            return;
        }

        for (SubscriberMethod handler : handlers) {
            Disposable subscription = (handler.isSticky
                    ? ofStickyType(handler.eventType)
                    : ofType(handler.eventType))
                    .observeOn(EventThread.getScheduler(handler.thread))
                    .subscribe(
                            // Deliver the event by invoking the handler reflectively.
                            o -> handler.method.invoke(object, o),
                            throwable -> {
                                // Errors are rethrown only in strict mode; otherwise dropped.
                                if (sStrictMethodVerification) {
                                    throw new RuntimeException(
                                            "register " + object + " onError() -> subscribe error : " + throwable, throwable);
                                }
                            }
                    );
            // The map entry is created lazily, once the first subscription exists.
            if (subscriptions == null) {
                subscriptions = new CompositeDisposable();
                disposableMap.put(object, subscriptions);
            }
            subscriptions.add(subscription);
        }
    }
}
订阅时查找当前类(及其父类)的所有方法是否带有@Subscribe注解
/**
 * Finds the {@link Subscribe}-annotated handler methods of a subscriber class via reflection.
 * Results are cached per class in a static map shared by all instances.
 */
public final class SubscribeAnnotationFinder implements Finder {
    private static final String TAG = "SubscribeAnnotationFinder";
    private static final ConcurrentMap<Class<?>, List<SubscriberMethod>> METHOD_CACHE = new ConcurrentHashMap<>();
    private static final int MODIFIERS_IGNORE = Modifier.ABSTRACT | Modifier.STATIC;
    private final boolean strictMethodVerification;

    /**
     * @param strictMethodVerification when true, malformed {@code @Subscribe} methods and
     *        subscriber classes without any handler cause a {@link RuntimeException}
     */
    public SubscribeAnnotationFinder(boolean strictMethodVerification) {
        this.strictMethodVerification = strictMethodVerification;
    }

    /**
     * Returns the handler methods of {@code subscriberClass}, scanning the class hierarchy on
     * the first request and serving later requests from the cache.
     *
     * @param subscriberClass the class to inspect
     * @return the handler methods found; empty when there are none (non-strict mode only)
     * @throws RuntimeException in strict mode when no handler method is found, or when an
     *         annotated method has the wrong modifiers or parameter count
     */
    @Override
    public List<SubscriberMethod> getSubscriberMethod(Class<?> subscriberClass) {
        List<SubscriberMethod> subscriberMethods = METHOD_CACHE.get(subscriberClass);
        if (subscriberMethods == null) {
            subscriberMethods = findSubscriberMethods(subscriberClass);
            METHOD_CACHE.put(subscriberClass, subscriberMethods);
        }
        // findSubscriberMethods never returns null, so "nothing found" is an empty list.
        // The check sits outside the cache miss branch because the cache is static and may
        // have been filled by a non-strict finder.
        if (strictMethodVerification && subscriberMethods.isEmpty()) {
            throw new RuntimeException("Subscribe " + subscriberClass
                    + " and its super classes have no public methods with the @Subscribe annotation");
        }
        return subscriberMethods;
    }

    /**
     * Collects handler methods declared by {@code clazz} and its superclasses, stopping at
     * the first superclass in a {@code java.}, {@code javax.} or {@code android.} package.
     */
    private List<SubscriberMethod> findSubscriberMethods(Class<?> clazz) {
        List<SubscriberMethod> subscriberMethods = new ArrayList<>();
        do {
            collectDeclaredSubscriberMethods(clazz, subscriberMethods);
            clazz = clazz.getSuperclass();
            // getSuperclass() returns null for Object and interfaces, hence the null check.
        } while (clazz != null && !isFrameworkClass(clazz.getName()));
        return subscriberMethods;
    }

    /** Adds the valid handler methods declared directly by {@code clazz} to {@code out}. */
    private void collectDeclaredSubscriberMethods(Class<?> clazz, List<SubscriberMethod> out) {
        for (Method method : clazz.getDeclaredMethods()) {
            String methodName = method.getDeclaringClass().getName() + "." + method.getName();
            int modifiers = method.getModifiers();
            if ((modifiers & Modifier.PUBLIC) != 0 && (modifiers & MODIFIERS_IGNORE) == 0) {
                Class<?>[] parameterTypes = method.getParameterTypes();
                if (parameterTypes.length == 1) {
                    Subscribe annotation = method.getAnnotation(Subscribe.class);
                    if (annotation != null) {
                        Class<?> eventType = ClassUtils.getEventType(parameterTypes[0]);
                        boolean isSticky = annotation.isSticky();
                        EventThread eventThread = annotation.observeOnThread();
                        out.add(new SubscriberMethod(method, eventType, isSticky, eventThread));
                    }
                } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                    throw new RuntimeException("@Subscribe method " + methodName +
                            " must have exactly 1 parameter but has " + parameterTypes.length);
                }
            } else if (strictMethodVerification && method.isAnnotationPresent(Subscribe.class)) {
                throw new RuntimeException("@Subscribe method " + methodName +
                        " must be public, non-static, and non-abstract");
            }
        }
    }

    /** Tells whether a class name belongs to a package at which the hierarchy walk stops. */
    private static boolean isFrameworkClass(String className) {
        return className.startsWith("java.")
                || className.startsWith("javax.")
                || className.startsWith("android.");
    }
}
取消订阅
// Unregisters this component from the default bus when it is destroyed, which disposes
// the subscriptions created by register().
@Override
public void onDestroy() {
super.onDestroy();
RxBus.getDefault().unregister(this);
}
//RxBus.java
private final ConcurrentMap<Object, CompositeDisposable> disposableMap;

/**
 * Disposes all subscriptions recorded for {@code object} and forgets the object.
 * Null or unregistered objects are ignored (the call is still logged).
 *
 * @param object the subscriber instance to detach from the bus
 */
public void unregister(Object object) {
    LoggerUtil.d(TAG, "unregister() called with: object = [" + object + "]");
    if (object == null) {
        return;
    }
    synchronized (disposableMap) {
        if (!hasRegistered(object)) {
            return;
        }
        CompositeDisposable subscriptions = disposableMap.get(object);
        if (subscriptions == null) {
            return;
        }
        subscriptions.dispose();
        disposableMap.remove(object);
    }
}
/**
 * Tells whether {@code object} currently has an entry in the registration map.
 *
 * @param object the candidate subscriber; null yields {@code false}
 * @return true if the object is a key of {@code disposableMap}
 */
public boolean hasRegistered(Object object) {
    if (object == null) {
        return false;
    }
    return disposableMap.containsKey(object);
}
删除已经发射的StickyEvent
//RxBus.java
/**
 * Drops all stored sticky events for the event type that {@code ClassUtils.getEventType}
 * derives from {@code clazz}.
 *
 * @param clazz the class whose sticky events should be removed
 */
public void removeStickyEvents(Class<?> clazz) {
    LoggerUtil.d(TAG, "removeStickyEvents() called with: clazz = [" + clazz + "]");
    synchronized (stickyEvents) {
        final Class<?> key = ClassUtils.getEventType(clazz);
        stickyEvents.remove(key);
    }
}
/**
 * Removes one stored sticky event, looked up under the event's runtime class.
 * <p>
 * A null event is ignored instead of failing on {@code event.getClass()}, matching the
 * null handling in {@code postSticky}.
 *
 * @param event the sticky event to remove; ignored when null
 */
public void removeStickyEvent(Object event) {
    LoggerUtil.d(TAG, "removeStickyEvent() called with: event = [" + event + "]");
    if (event == null) {
        return;
    }
    synchronized (stickyEvents) {
        List<Object> objects = stickyEvents.get(event.getClass());
        if (objects != null) {
            objects.remove(event);
        }
    }
}
/**
 * Removes every stored sticky event of every type by clearing the sticky event map
 * while holding its lock.
 */
public void removeAllStickyEvent() {
LoggerUtil.d(TAG, "removeAllStickyEvent() called");
synchronized (stickyEvents) {
stickyEvents.clear();
}
}