RxJava2.X 源码分析(六):变换操作符的实现原理(上)

本贴最后更新于 1934 天前,其中的信息可能已经事过境迁

一、前言

  • Ok,RxJava 用的最爽的当然还有我们的变换操作符了
  • 看到这么好用的东西,估计你也有过想一探究竟的冲动,想看下内部是如何实现的
  • 操作符的分析我打算是分成两篇讲解,先从简单的 map 入手,当了解其本质后再分析强大的 flatMap 操作符

二、从 Demo 到源码

  • 我们依然是以前的套路,先看一个 demo

      Observable observable = Observable.create(new ObservableOnSubscribe() {
      	@Override
        public void subscribe(@NonNull ObservableEmitter emitter) throws Exception {
      		emitter.onNext(1);
      		emitter.onNext(2);
      		emitter.onNext(3);
    
      	}
    
      });
    
      observable.map(new Function() {
      			@Override
        public Integer apply(@NonNull Integer integer) throws Exception {
      				return integer*integer;
      			}
      		}).subscribe(new Consumer() {
      	@Override
        public void accept(@NonNull Integer integer) throws Exception {
      		Log.i(TAG, ">>>data is : " + integer);
      	}
      });
    
  • 输出结果

      07-17 09:34:52.123 3710-3729/? I/RxJavaDemo2: >>>data is : 1
      07-17 09:34:52.123 3710-3729/? I/RxJavaDemo2: >>>data is : 4
      07-17 09:34:52.123 3710-3729/? I/RxJavaDemo2: >>>data is : 9
    
  • 当然,操作符 map 提供的能力肯定不止这样,你可以的 apply 回调里面编写需要的逻辑代码。


三、源码分析

  • OK,从 demo 中我们看到,经过 map 后,我们的结果跟我们的预期一样。

  • 我们以 map 为切入点,看下内部都做了些什么呢

      public final <R> Observable<R> map(Functionsuper T, ? extends R> mapper) {
      	ObjectHelper.requireNonNull(mapper, "mapper is null");
      	return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
      }
    
  • 果然,还是熟悉的代码,变得只有 onAssembly 参数里面的东西,这里可以注意一下 T 上游 Observable 下发的数据类型,R 为下游 Observer 将要接收的数据类型,也就是说,暂时我们可以这样理解,T 转换为 R,为什么这样说呢,因为到 flatMap 时,就不能这样简单的理解了

  • Ok,RxJavaPlugins.onAssembly 我们都知道啦,有关 hook 的,我们继续往下看

      public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
    
      	final Function<? super T, ? extends U> function;
    
      	public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
      		//1、source 为上游的Observable
      		super(source);
      		 //2、 function 为我们传入的funcation对象
      		this.function = function;
      	}
    
      	@Override
        public void subscribeActual(Observer<? super U> t) {
      		//3、t为下游的Observer对象
      		source.subscribe(new MapObserver<T, U>(t, function));
      	}
    
      	static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
      		final Functionsuper T, ? extends U> mapper;
    
      		MapObserver(Observer<? super U> actual, Functionsuper T, ? extends U> mapper) {
      			//4、actual 为下游的Observer
      			super(actual);
      			//5、mapper为我们传入的function函数对象
      			this.mapper = mapper;
      		}
    
      		@Override
        public void onNext(T t) {
      			if (done) {
      				return;
      			}
      			...
      			U v;
    
      			try {
      				//6、调用mapper的apply方法,或者apply回调的返回值
      				v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
      			} catch (Throwable ex) {
      				fail(ex);
      				return;
      			}
      			//7、回调下游Obsever的onNext方法
      			actual.onNext(v);
      		}
    
      	  .....
      	}
      }
    
  • 按照流程应该是这样的:

下游 Obsever.subscribe-> 触发 ObservableMap.subscribeActual-> 在 subscribeActual 中通过中间 Observer 订阅上游 Observable->1、上游 Observable 执行 subscribeActual、2、执行中间 Observer 的 onSubscribe ;3、执行中间 Obsever 的 onXXX 方法下发数据。-> 中间 Observer 调用:1、下游的 Observer 的 onSubscribe 以及执行 mapper 回到后将 apply 的返回值传递给 onXXX 回调完成数据的转换级数据的下发传递。

  • 但是,我们目前没发现 MapObserver 里面的 onSubscribe 方法,估计是在父类了

    public abstract class BasicFuseableObserver<T, R> implements Observer<T>, QueueDisposable<R> {
    ...
    public BasicFuseableObserver(Observersuper R> actual) {
      	  this.actual = actual;
        }
    
    
      @SuppressWarnings("unchecked")
        @Override
      public final void onSubscribe(Disposable s) {
      	  if (DisposableHelper.validate(this.s, s)) {
      		  //1、接收下游的Disposable加入管理队列
      		  this.s = s;
      		  if (s instanceof QueueDisposable) {
      			  this.qs = (QueueDisposable<T>)s;
      		  }
      		  //2、可以重写弘治onSubscribe()的回调
      		  if (beforeDownstream()) {
    
      			  actual.onSubscribe(this);
      			//3、可重写在onSubscribe调用后做一些操作
      			  afterDownstream();
      		  }
    
      	  }
        }
    
    
      @Override
      public void onError(Throwable t) {
      	  if (done) {
      		  RxJavaPlugins.onError(t);
      		  return;
      	  }
      	  done = true;
      	  actual.onError(t);
        }
    
        @Override
      public void onComplete() {
      	  if (done) {
      		  return;
      	  }
      	  done = true;
      	  actual.onComplete();
        }
    ...
    }
    
  • Ok,果然在父类中抽取了一些公共的操作减少子类的代码量。


四、总结:

  • Ok,根据上面的分析,其实对于 map 的操作过程我们已经很清楚了,其跟之前的线程切换的实现原理基本一样,通过在中间使用装饰者模式插入一个中间的 Observable 和 Observe,你可以想象为代理。
  • 代理Observable 做的事就是接收 下游Obsever 的订阅事件,然后通过 代理Obsever 订阅 上游Observer,然后在 上游Observer 下发数据給 代理Observer 时,通过先调用 mapper.apply 转换回调函数获得转换后的数据,然后下发给 下游Obsever
  • Ok,其实就是这样,在 RxJava2 中大量运用装饰者模式来实现扩展功能。
  • RxJava2 的 flatMap 高级转换函数我们将再下篇进行分析。
  • 喜欢就给我留言哦,有好的建议也可以在下方留言。

五、相关文章

  • Java

    Java 是一种可以撰写跨平台应用软件的面向对象的程序设计语言,是由 Sun Microsystems 公司于 1995 年 5 月推出的。Java 技术具有卓越的通用性、高效性、平台移植性和安全性。

    3165 引用 • 8206 回帖
  • 学习

    “梦想从学习开始,事业从实践起步” —— 习近平

    160 引用 • 470 回帖

相关帖子

欢迎来到这里!

我们正在构建一个小众社区,大家在这里相互信任,以平等 • 自由 • 奔放的价值观进行分享交流。最终,希望大家能够找到与自己志同道合的伙伴,共同成长。

注册 关于
请输入回帖内容 ...