RxJava取消订阅的各种方式的实现
手动取消订阅
Consumer类型
subscribe(Consumer)会返回Disposable,通过它取消订阅
publicclassSecondActivityextendsAppCompatActivity{ privatestaticfinalStringTAG="SecondActivity"; privateDisposabledisposable; @Override protectedvoidonCreate(BundlesavedInstanceState){ super.onCreate(savedInstanceState); setContentView(R.layout.activity_second); disposable=Observable.create(newObservableOnSubscribe(){ @Override publicvoidsubscribe(ObservableEmitter emitter)throwsException{ try{ Thread.sleep(5000); }catch(InterruptedExceptione){ e.printStackTrace(); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(newConsumer (){ @Override publicvoidaccept(Strings)throwsException{ Log.d(TAG,"accept:"+s); } }); } @Override protectedvoidonDestroy(){ super.onDestroy(); Log.d(TAG,"onDestroy:"); //取消订阅 if(disposable!=null&&!disposable.isDisposed()){ disposable.dispose(); Log.d(TAG,"onDestroy:dispose"); } } }
普通类型Observer
在Observer中获取Disposable然后取消
publicclassThirdActivityextendsAppCompatActivity{ privatestaticfinalStringTAG="ThirdActivity"; Disposabledisposable; @Override protectedvoidonCreate(BundlesavedInstanceState){ super.onCreate(savedInstanceState); setContentView(R.layout.activity_third); Observable.create(newObservableOnSubscribe(){ @Override publicvoidsubscribe(ObservableEmitter emitter)throwsException{ try{ Thread.sleep(5000); emitter.onNext("testInfo"); }catch(InterruptedExceptione){ e.printStackTrace(); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(newObserver (){ @Override publicvoidonSubscribe(Disposabled){ disposable=d; } @Override publicvoidonNext(Strings){ Log.d(TAG,"onNext:"+s); } @Override publicvoidonError(Throwablee){ Log.d(TAG,"onError:"); } @Override publicvoidonComplete(){ Log.d(TAG,"onComplete:"); } }); } @Override protectedvoidonDestroy(){ super.onDestroy(); Log.d(TAG,"onDestroy:"); //然后在需要取消订阅的地方调用即可 if(disposable!=null&&!disposable.isDisposed()){ Log.d(TAG,"dispose:"); disposable.dispose(); } } }
DisposableObserver类型
利用DisposableObserver和subscribeWith直接返回Disposable,然后取消
publicclassFourthActivityextendsAppCompatActivity{ privatestaticfinalStringTAG="FourthActivity"; privateDisposableObserverobserver; @Override protectedvoidonCreate(BundlesavedInstanceState){ super.onCreate(savedInstanceState); setContentView(R.layout.activity_fourth); observer=Observable.create(newObservableOnSubscribe (){ @Override publicvoidsubscribe(ObservableEmitter emitter)throwsException{ try{ Thread.sleep(5000); emitter.onNext("testInfo"); }catch(InterruptedExceptione){ e.printStackTrace(); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribeWith(newDisposableObserver (){ @Override publicvoidonNext(Stringo){ Log.d(TAG,"onNext:"+o); } @Override publicvoidonError(Throwablee){ Log.d(TAG,"onError:"); } @Override publicvoidonComplete(){ Log.d(TAG,"onComplete:"); } }); } @Override protectedvoidonDestroy(){ super.onDestroy(); if(observer!=null&&!observer.isDisposed()){ Log.d(TAG,"dispose:"); observer.dispose(); } } }
取消多个Observer
把多个Observer的Disposable添加到CompositeDisposable,一次性取消
publicclassComDisposableActivityextendsAppCompatActivity{ privateDisposabledisposable1; privateDisposabledisposable2; privatestaticfinalStringTAG="ComDisposableActivity"; @Override protectedvoidonCreate(BundlesavedInstanceState){ super.onCreate(savedInstanceState); setContentView(R.layout.activity_com_disposable); Observable.create(newObservableOnSubscribe(){ @Override publicvoidsubscribe(ObservableEmitter emitter)throwsException{ try{ Thread.sleep(5000); emitter.onNext("testInfo"); }catch(InterruptedExceptione){ e.printStackTrace(); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .doOnDispose(newAction(){ @Override publicvoidrun()throwsException{ Log.d(TAG,"run:UnsubscribingsubscriptionfromonCreate()"); } }) .subscribe(newObserver (){ @Override publicvoidonSubscribe(Disposabled){ disposable1=d; } @Override publicvoidonNext(Strings){ Log.d(TAG,"onNext:"+s); } @Override publicvoidonError(Throwablee){ Log.d(TAG,"onError:"); } @Override publicvoidonComplete(){ Log.d(TAG,"onComplete:"); } }); Observable.create(newObservableOnSubscribe (){ @Override publicvoidsubscribe(ObservableEmitter emitter)throwsException{ try{ Thread.sleep(5000); emitter.onNext("testInfo"); }catch(InterruptedExceptione){ e.printStackTrace(); } } }).subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(newObserver (){ @Override publicvoidonSubscribe(Disposabled){ disposable2=d; } @Override publicvoidonNext(Strings){ Log.d(TAG,"onNext:"+s); } @Override publicvoidonError(Throwablee){ Log.d(TAG,"onError:"); } @Override publicvoidonComplete(){ Log.d(TAG,"onComplete:"); } }); } @Override protectedvoidonDestroy(){ super.onDestroy(); CompositeDisposablecompositeDisposable=newCompositeDisposable(); //批量添加 compositeDisposable.add(disposable1); compositeDisposable.add(disposable2); //最后一次性全部取消订阅 compositeDisposable.dispose(); } }
RxLifecycle取消
onDestroy时取消
Observable.interval(1,TimeUnit.SECONDS) .doOnDispose(newAction(){ @Override publicvoidrun()throwsException{ Log.d(TAG,"UnsubscribingbindToLifecyclefromonDestroy()"); } }) .compose(this.bindToLifecycle()) .subscribe(newConsumer (){ @Override publicvoidaccept(Longnum)throwsException{ Log.d(TAG,"accept:"+num); } });
指定生命周期取消
Observable.interval(1,TimeUnit.SECONDS) .doOnDispose(newAction(){ @Override publicvoidrun()throwsException{ Log.d(TAG,"UnsubscribingUbindUntilEventfromonPause()"); } }).compose(this.bindUntilEvent(ActivityEvent.PAUSE)) .subscribe(newConsumer (){ @Override publicvoidaccept(LongaLong)throwsException{ Log.d(TAG,"bindUntilEventaccept:"+aLong); } });
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。