RxJava مدتی است که معرفی شده و برنامه نویسان حتما با تواناییها و کارکرد آن آشنایی دارند، اما بعضی از برنامه نویسان هنوز کار با Rxjava را شروع نکردهاند. اگر یکی از آنها هستید، در مقاله آموزش کامل RxJava در اندروید ، ما قصد داریم مبانی مربوط به RxJava و RxAndroid را از مباحث مبتدی تا پیشرفته آموزش دهیم.
این مقاله RxJava 2 را در مورد backpressure مشاهده کنید: RxJava2 Backpressure.
...Input: List<T>, ArrayList<T>, Set<T>, etc
<Ouput: Observable<T
مثال ()fromIterable:
ایجاد لیست Observable از task objects.
اولین و بارزترین کاربرد اپراتور ()Buffer بسته بندی (Bundle) اشیای منتشر شده به گروهها است. شاید شما فقط میخواهید به طور همزمان 2 اشیاء منتشر کنید و یک تأخیر زمانی بین آنها اضافه کنید. این همان چیزی است که در اولین مثال زیر مشاهده خواهید کرد.
برنامه بسیار مفید دیگر tracking UI interactions است. این همان چیزی است که در مثال دوم در زیر مشاهده خواهید کرد. همچنین به شما نشان خواهیم داد که چگونه از یک کتابخانه بسیار مفید ساخته شده توسط Jake Wharton، کتابخانه RxBinding، استفاده کنید. میتوانیم از کتابخانه RxBinding استفاده کنیم تا رویدادهای کلیک قابل مشاهده باشد.
توجه داشته باشید که برای مثالهای زیر به این دو کلاس مراجعه میکنیم:
Task.java (کلاس Task):
فرض کنید SearchView را در برنامه خود دارید. از آنجا که کاربر کاراکترهایی را در SearchView وارد میکند، میخواهید Query مربوط به سرور را انجام دهید. اگر گرفتن کاراکترها را برای یک دوره زمانی محدود نکنید، هر بار که یک کاراکتر جدید را وارد SearchView میکنید، درخواست جدیدی ایجاد میشود. به طور معمول این کار غیر ضروری است و نتایج نامطلوب به همراه خواهد داشت. اجرای یک جستجوی جدید در هر 0.5 ثانیه، خوب است.
اپراتور ()ThrottleFirst در توسعه اندروید بسیار مفید است. به عنوان مثال: اگر یک کاربر یک دکمه (Button) را اسپم (spamming) میکند و شما نمیخواهید هر کلیک را ثبت کنید. شما میتوانید از اپراتور ()ThrottleFirst استفاده کنید تا فقط در هر بازه زمانی رویدادهای جدید کلیک را ثبت کنید.
اپراتور ()FlatMap در توسعه اندروید بسیار مفید است و کارکرد اصلی دارد:
ما باید برای درخواستهای نظرات، Data را مدل سازی کنیم که در کد زیر میبینید.
برای نشان دادن این موضوع ، میخواهیم دقیقاً همان مثالی که در بخش Flatmap انجام دادیم را انجام بدهیم، اما به جای آن از اپراتور ()Concatmap استفاده میکنیم. اگر قبلاً از آن مثال استفاده کرده باشید، نتایج آن واقعاً جالب است.
حال به جای اینکه کامنتها همه با هم بازیابی شوند و به صورت تصادفی منتشر شوند، به صورت متوالی بازیابی میشوند. بنابراین لیست بعدی نظرات تا منتشر نشدن فهرست قبلی قابل بازیابی نیست.
ستون اول: App launch
ستون دوم: پس از گذشت 3000 میلی ثانیه
ستون سوم: پس از گذشت 20.000 میلی ثانیه
این خروجی مربوط به کدهای MainActivity بالا است.
همانطور که در تعریف فوق بیان کردیم، ()SwitchMap مشکلی را حل میکند که ()ConcatMap و ()FlatMap هر دو دارای آن هستند. این اطمینان را میدهد که فقط یک Observer میتواند در هر زمان معین Subscribe شود.
برنامه نویسی واکنشی (Reactive Programming) چیست ؟
برنامه نویسی واکنشی بهطور اساسی برنامه نویسی غیرهمزمان مبتنی بر رویداد است. هر چیزی که میبینید، یک جریان داده غیرهمزمان است، که میتواند مشاهده (Observed) شود و هنگامی که مقادیر منتشر شود، عملی صورت میگیرد. میتوانید جریان داده را از هر چیزی ایجاد کنید: تغییرات متغیر، رویدادهای کلیک، فراخوانی http، ذخیره داده ها، خطاها و ... هنگامی که میگویم برنامه نویسی واکنشی همزمان نیست، به معنی این است که هر ماژول کد بر روی Thread خود اجرا میشود، بنابراین چندین بلوک کد را همزمان اجرا میکند. یک مزیت رویکرد غیرهمزمان به این دلیل است که هر کار بر روی Thread خاص خود اجرا میشود، همه کارها میتوانند همزمان شروع شوند و مدت زمان طول کشیدن تمام کارها، معادل کار طولانیتر در لیست است. وقتی صحبت از برنامههای تلفن همراه است، با شروع کار در (Background Thread)، میتوانید بدون مسدود کردن Thread اصلی، به تجربه کاربری یکپارچه دست پیدا کنید. [note] اگر به مباحث برنامه نویسی اندروید علاقه مند هستید پیشنهاد ما دورههای آموزش برنامه نویسی اندروید با جاوا و دوره آموزش فلاتر سون لرن است که هر دو توسط یکی از مجربترین اساتید اندروید ایران تدریس شده است. [/note]یک مثال ساده
x میتواند برابر با y + z (x = y + z) باشد؛ که در آن مقدار y و z به x اختصاص داده میشود. در برنامه نویسی واکنشی (Reactive Programming)، هنگامی که مقدار y تغییر میکند ، مقدار x بدون اجرای مجدد عبارت x = y + z به طور خودکار به روز میشود. این میتواند با مشاهده (Observe) مقدار y یا z حاصل شود. یک لیست آرایه میتواند یک جریان داده باشد و میتوان هر بخشی را که از آن منتشر میشود، عملی کرد. ممکن است شما بخواهید شمارههای زوج را فیلتر کنید و اعداد فرد را نادیده بگیرید. این کار را میتوان با استفاده از حلقههای معمولی و عبارات شرطی انجام داد، اما در برنامه نویسی واکنشی میتوانید به روش کاملاً متفاوت به این هدف دست پیدا کنید. هنگامی که برنامه خود را در برنامه نویسی واکنشی (Reactive Programming) شروع میکنید، نحوه طراحی معماری و نحوه نوشتن کد به طور کامل تغییر میکند. حتی بیشتر قدرت مییابد وقتی با Clean Architecture ، MVP Architecture ، MVVM Architecture و سایر Design Patterns آشنا شوید.Reactive Extensions
ReactiveX یا RX کتابخانهای است که از اصول برنامه نویسی واکنش پذیر پیروی میکند، یعنی با استفاده از توالی قابل مشاهده (observable sequence) برنامه هایی غیرهمزمان و مبتنی بر رویدادها میسازد. Reactive Extensions در چندین زبان (C++ (RxCpp)، C# (Rx.NET ، جاوا (RxJava) ، کاتلین (RxKotlin)، سویفت (Swift (RxSwift) و موارد دیگر موجود است. ما در این مقاله به طور خاص به توضیح RxJava و RxAndroid میپردازیم.RxJava چیست ؟
در ادامهی مقالهی آموزش کامل RxJava در اندروید ، به شما توضیح میدهیم که rxjava چیست؟ RxJava پیاده سازی جاوا از Reactive Extensions است. در واقع این کتابخانه با دنبال کردن الگوی Observer، رویدادهای غیرهمزمان را میسازد. میتوانید جریان دادهای غیرهمزمان را در هر Thread ایجاد کنید، دادهها را تغییر داده و توسط هر Observer در هر Thread استفاده کنید. این کتابخانه طیف گسترده ای از عملگرهای شگفت انگیز مانند نقشه ، ترکیب ، ادغام ، فیلتر و موارد دیگر را ارائه میدهد که میتوانند در جریان داده استفاده شوند. این کتابخانه طیف گستردهای از عملگرهای شگفت انگیز مانند map ، combine ، merge ، filter و موارد دیگر را ارائه میدهد که میتوانند در جریان داده استفاده شوند. هنگامی که شروع به کار کنید و مثالها یا کدهای واقعی بنویسید، اطلاعات شما درباره اپراتورها و تحولات بیشتر شده و بهتر آن را درک میکنید.RxAndroid چیست ؟
در این بخش مقالهی آموزش کامل RxJava در اندروید ، به شما توضیح میدهیم که RxAndroid چیست؟ RxAndroid مخصوص به برنامههای اندرویدی (Android Platform) است که دارای چند کلاس اضافه در RxJava است. به طور خاص، Schedulers در (()RxAndroid (AndroidSchedulers.mainThread معرفی شده که نقش عمده ای در پشتیبانی از مفهوم multithreading در برنامههای اندرویدی دارد. Schedulers بهطور اساسی تصمیم میگیرد Thread مورد نظر که کد خاصی دارد بر روی Thread Background اجرا شود یا بر روی Main Thread اجرا شود. جدا از آن، هر آنچه ما استفاده میکنیم فقط از کتابخانه RxJava است. برنامههای در دسترس بسیاری وجود دارد، بهطور گسترده در برنامه نویسی اندروید از ()Schedulers.io و ()AndroidSchedulers.mainThread استفاده میشود. در زیر لیستی از Schedulers موجود و معرفی مختصر آنها آورده شده است.()io:
این مورد از Schedulers برای انجام عملیات غیر فشرده Central Processing Unit) CPU) مانند برقراری تماسهای شبکه، خواندن دیسک / فایل ها، عملیات پایگاه داده و غیره مورد استفاده قرار میگیرد.()mainThread:
این مورد از Schedulers امکان دسترسی به Main Thread /Thread UI را در اندروید فراهم میکند. معمولاً عملیاتی مانند به روزرسانی UI، فعل و انفعالات کاربر در این موضوع اتفاق میافتد. ما نباید عملیات فشرده ای را در مورد این موضوع انجام دهیم زیرا در اپلیکیشن باعث glitchy یا ANR میشود.()newThread:
با استفاده از این مورد از Schedulers، هر زمان که یک کار انجام میشود ، Thread جدیدی ایجاد میشود. معمولاً توصیه میشود از این Schedulers استفاده نکنید، مگر اینکه عملیاتی طولانی انجام شود. Threadهای ایجاد شده از طریق ()newThread: مورد استفاده مجدد قرار نمیگیرند.()computation
این مورد از Schedulers میتوان برای انجام عملیات فشرده CPU مانند پردازش دادههای عظیم، پردازش bitmap و غیره استفاده کرد، تعداد Threadهای ایجاد شده با استفاده از این Scheduler به طور کامل به تعداد هستههای موجود CPU بستگی دارد.()single:
این مورد از Schedulers همه وظایف را به ترتیبی که اضافه شده است، انجام میدهد. این مورد میتواند در صورت لزوم اجرای متوالی مورد استفاده قرار گیرد.()immediate:
این Scheduler بلافاصله با مسدود کردن Thread اصلی (MainThread)، کار را به صورت همزمان انجام میدهد.()trampoline:
این Scheduler وظایفش را به روش First In - First Out انجام میدهد. تمام کارهای برنامه ریزی شده با محدود کردن تعداد Background Threads به یک، به صورت یک به یک انجام میشوند.()from:
این مورد از Schedulers به ما این امکان را میدهد که با محدود کردن تعداد Threadهایی که ایجاد میشود، Scheduler را از یک executor ایجاد کنیم. وقتی Thread Pool اشغال شد، وظایف صف میشوند. اکنون ما مفاهیم اساسی لازم را داریم. بیایید با برخی از مفاهیم اصلی RxJava که همه باید از آن آگاه باشند، شروع کنیم.مبانی RxJava : Observable، Observer
RxJava در مورد دو components مهم است: Observable و Observer علاوه بر این موارد، موارد دیگری مانندSchedulers ، Operators و Subscription نیز وجود دارد. Observable :Observable یک جریان داده است که برخی از کارها را انجام میدهد و داده را منتشر میکند. Observer :Observer بخش مهمی از Observable است. Observer دادههای منتشر شده توسط Observable را دریافت میکند. Subscription: پیوند بین Observable و Observer به عنوان Subscription خوانده میشود. چندین Observers کننده در یک Observable میتوانند مشترک باشند. Operator / Transformation: اپراتورها دادههای منتشر شده توسط Observable را قبل از اینکه Observer آنها را دریافت کند، تغییر میدهد. Schedulers :Schedulerها Threadها را تعیین و تصمیم گیری میکنند که Observable باید دادهها را منتشر کند و Observer کننده باید دادهها را از قبیل Background Thread ،Main Thread و... دریافت کند.نمونه پایهای از RxJava
اکنون ما در مورد برنامه نویسی Reactive ، RxJava و RxAndroid دانش نظری خوبی داریم. بیایید به چند نمونه کد برویم تا بهتر مفاهیم را درک کنیم.اضافه کردن وابستگیها (Dependencies)
برای شروع، باید وابستگیهای RxJava و RxAndroid را به پروژههای خود در build.gradle اضافه کنید و پروژه را همگام سازی (sync) کنید.// RxJava
implementation 'io.reactivex.rxjava2:rxjava:2.1.9'
// RxAndroid
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
مراحل اساسی
Observable -1 ایجاد میکنیم که داده را منتشر کند. در زیر ما یک Observable ایجاد کرده ایم که لیستی از اسامی حیوانات را منتشر میکند. در اینجا از اپراتور ()just برای انتشار چند نام حیوان استفاده میشود.Observable<String> animalsObservable = Observable.just("Eagle", "Bee", "Lion", "Dog", "Wolf");
Observer -2 ایجاد کنید که به Observable گوش دهد. Observer متدهای Interface زیر را برای شناخت وضعیت Observable ارائه میدهد.
- ()onSubscribe : وقتی Observer عضو Observable شود، این متد فراخوانی میشود.
- ()onNext : این متد هنگامی که Observable شروع به انتشار اطلاعات میکند، فراخوانی میشود.
- ()onError : در صورت بروز هرگونه خطا، متد onError () فراخوانی میشود.
- ()onComplete : هنگامی که یک Observable کننده انتشار همه موارد را انجام میدهد،()onComplete فراخوانی میشود.

Observer<String> animalsObserver = getAnimalsObserver();
private Observer<String> getAnimalsObserver() {
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "Name: " + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "All items are emitted!");
}
};
}
Observer -3 را در عضویت Observable قرار دهید تا بتواند شروع به دریافت داده کند. در اینجا، شما میتوانید دو روش دیگر، ()ObserveOn و ()subscribeOn را مشاهده کنید.
- (()SubscribOn (Schedulers.io : این مورد به Observable میگوید که وظیفه خود را بر روی یک Background Thread اجرا کند.
- (()observeOn (AndroidSchedulers.mainThread : این مورد به Observer میگوید که دادههای مربوط به Android Ui Thread را دریافت کند تا بتوانید هرگونه کار مرتبط با UI (رابط کاربری) را انجام دهید.
animalsObservable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(animalsObserver);
اگر برنامه را اجرا کنید، میتوانید خروجی زیر را در LogCat خود مشاهده کنید.
onSubscribe
Name: Eagle
Name: Bee
Name: Lion
Name: Dog
Name: Wolf
All items are emitted!
شما اولین برنامه RxJava خود را به همین راحتی نوشتید. ما در ادامه قصد داریم اطلاعات بیشتری درباره Schedulers و Observers کسب کنیم. اما در حال حاضر این اطلاعات برای شروع کافی است.
اساس Observable و Observer
در اینجا کد کاملی از مثال بالا آورده شده است. فعالیت را اجرا کنید و خروجی را در LogCat بررسی کنید.import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import info.androidhive.rxandroidexamples.R;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
public class Example1Activity extends AppCompatActivity {
/**
* Basic Observable, Observer, Subscriber example
* Observable emits list of animal names
*/
private static final String TAG = Example1Activity.class.getSimpleName();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_example1);
// observable
Observable<String> animalsObservable = getAnimalsObservable();
// observer
Observer<String> animalsObserver = getAnimalsObserver();
// observer subscribing to observable
animalsObservable
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(animalsObserver);
}
private Observer<String> getAnimalsObserver() {
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String s) {
Log.d(TAG, "Name: " + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "All items are emitted!");
}
};
}
private Observable<String> getAnimalsObservable() {
return Observable.just("Eagle", "Bee", "Lion", "Dog", "Wolf");
}
}
onSubscribe
Name: Eagle
Name: Bee
Name: Lion
Name: Dog
Name: Wolf
All items are emitted!
معرفی Disposable
در این مثال میخواهیم component جدیدی به نام Disposable را معرفی کنیم. Disposable: زمانی که Observer دیگر نمیخواهد منتظر رسیدن آیتمها از Observable باشد، بهطوری دیگر به آن گوش ندهد، از Disposable برای این کار استفاده میشود. در اندروید Disposable برای جلوگیری از memory leaks بسیار مفید است. برای مثال فرض کنیم که شما در حال فراخوانی طولانی شبکه هستید و رابط کاربری (UI) را به روز میکنید. زمانی که فراخوانی شبکه تکمیل و کار خود را تمام میکند، اگر Activity / Fragment از قبل از بین رفته (Destroyed) باشد، چون اشتراک Observer هنوز باقی است، سعی میکند Activity از قبل Destroyed شده را به روز کند. در این حالت میتواند باعث memory leaks شود. بنابراین با استفاده از Disposables، میتوان در صورت از بین رفتن Activity از این مورد جلوگیری کرد. در مثال زیر میتوانید مشاهده کنید که از Disposable استفاده میشود و فراخوانی ()disposable.dispose در ()onDestroy باعث اشتراک نداشتن با Observer میشود.import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import info.androidhive.rxandroidexamples.R;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableObserver;
import io.reactivex.schedulers.Schedulers;
public class Example2Activity extends AppCompatActivity {
/**
* Basic Observable, Observer, Subscriber example
* Observable emits list of animal names
* You can see Disposable introduced in this example
*/
private static final String TAG = Example2Activity.class.getSimpleName();
private Disposable disposable;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_example2);
// observable
Observable<String> animalsObservable = getAnimalsObservable();
// observer
Observer<String> animalsObserver = getAnimalsObserver();
// observer subscribing to observable
animalsObservable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribeWith(animalsObserver);
}
private Observer<String> getAnimalsObserver() {
return new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe");
disposable = d;
}
@Override
public void onNext(String s) {
Log.d(TAG, "Name: " + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "All items are emitted!");
}
};
}
private Observable<String> getAnimalsObservable() {
return Observable.just("Eagle", "Bee", "Lion", "Dog", "Wolf");
}
@Override
protected void onDestroy() {
super.onDestroy();
// don't send events once the activity is destroyed
disposable.dispose();
}
}
این کد همان خروجی مشابه نمونه قبلی را تولید میکند.
onSubscribe
Name: Eagle
Name: Bee
Name: Lion
Name: Dog
Name: Wolf
All items are emitted!
عملگرهای RxJava Operators) RxJava)
در این بخش مقالهی آموزش کامل RxJava در اندروید ، عملگرهای RxJava Operators) RxJava) را برای شما شرح میدهیم.- Introduction
- Map
- Buffer
- Debounce
- ThrottleFirst
- FlatMap
- ConcatMap
- SwitchMap
Map
یک تابع را برای هر مورد منتشر شده اعمال میکند. با استفاده از تابعی از آن، هر مورد منتشر شده را تغییر میدهد. (نظم ارسال حفظ میشود) نمودار Map :
Buffer
بهطور دوره ای items را از یک Observable به صورت bundles جمع میکند و به جای اینکه یکبار items را منتشر کند، bundles را منتشر میکند. (نظم ارسال حفظ میشود) نمودار :
Debounce
اگر یک زمان خاص گذشته باشد، یک Item را از یک Observable منتشر میکند بدون آنکه یک Item دیگر را منتشر کند. این عملگر برای کلیک دکمه (button clicks) بسیار عالی است. اگر کاربر بارها و بارها یک دکمه را کلیک کند، دیگر نیازی به اجرای چندین بار کار نیست. میتوانیم از اپراتور ()Debounce استفاده کنیم تا با معرفی یک بازه زمانی مجاز بین کلیک، کلیکهای خود را کنترل کنیم. اگر مدت زمانی سپری نشده باشد، میتوانیم از هر دو متد جلوگیری کنیم. (نظم ارسال حفظ میشود) نمودار :
ThrottleFirst
موارد منتشر شده توسط منبع Observable را که در یک بازه زمانی قرار دارند، فیلتر میکند.(نظم ارسال حفظ میشود) نمودار :
FlatMap
موارد منتشر شده توسط یک Observable را به Observables تبدیل میکند، و سپس انتشار از آن را به یک Observable Single قسمت میکند. اگر با LiveData آشنا باشید، MediatorLiveData میتواند کاری بسیار مشابه انجام دهد. در مورد ()FlatMap در ادامه بیشتر صحبت میکنیم. (نظم ارسال حفظ نمیشود) نمودار :
ConcatMap
موارد منتشر شده توسط یک Observable را به Observables تبدیل میکند. این در اصل همان مورد ()FlatMap است، اما نظم ارسال حفظ میشود. اما از آنجا که ()ConcatMap باید منتظر بماند تا هر یک از Observable کار خود را انجام دهند پس از نظر فنی آن غیر همزمان نیست. (نظم ارسال حفظ میشود) نمودار :
SwitchMap
()SwitchMap آیتمهای منتشر شده توسط یک Observable را به یک Observable تبدیل میکند درست مثل ()ConcatMap و ()FlatMap. تفاوت این است که به محض مشترک شدن یک Observer جدید، Observer قبلی را لغو میکند. اساساً ()SwitchMap محدودیتی را حل میکند که ()ConcatMap و ()FlatMap هم دارند.(نظم ارسال حفظ میشود) نمودار :
Observers چندگانه و CompositDisposable
یک مورد را در نظر بگیرید که دارای چندین Observables و Observers باشد. Dispose کردن آنها یک به یک در Destroy یک کار خسته کننده است و ممکن است در معرض خطا باشد به این دلیل که شاید شما Dispose کردن را فراموش کنید. در این حالت میتوانیم از CompositeDisposable استفاده کنیم.CompositeDisposable
CompositeDisposable میتواند لیست اشتراکها (subscriptions) را در یک استخر حفظ کند و میتواند همه آنها را یک باره Dispose کند. معمولاً ما ()compositeDisposable.clear را در ()onDestroy فراخوانی میکنیم اما شما میتوانید جای دیگری که خودتان میخواهید آن را فراخوانی کنید. در مثال زیر، شما میتوانید متوجه شوید که دو animalsObserver ،Observer و animalsObserverAllCaps در همان Observable مشترک هستند. هر دو Observer دادههای یکسانی را دریافت میکنند اما دادهها با اعمال اپراتورهای مختلف در stream تغییر میکنند.- AnimalObserver : از عملگر ()filter برای فیلتر کردن نام حیوانات با حرف "b" استفاده میشود.
- AnimalObserverAllCaps : عملگر ()filter برای فیلتر کردن نام حیوانات با شروع حرف "c" استفاده میشود. از نقشه عملگر map() بعدی برای تبدیل نام هر حیوانی به حروف بزرگ استفاده میشود. استفاده از چندین اپراتور روی یک observer مفرد، زنجیره عملگرها نام (chaining of operators) دارد.
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import info.androidhive.rxandroidexamples.R;
import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.observers.DisposableObserver;
import io.reactivex.schedulers.Schedulers;
public class Example4Activity extends AppCompatActivity {
/**
* Basic Observable, Observer, Subscriber example
* Observable emits list of animal names
* You can see filter() operator is used to filter out the
* animal names that starts with letter `b`
*/
private static final String TAG = Example4Activity.class.getSimpleName();
private CompositeDisposable compositeDisposable = new CompositeDisposable();
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_example4);
Observable<String> animalsObservable = getAnimalsObservable();
DisposableObserver<String> animalsObserver = getAnimalsObserver();
DisposableObserver<String> animalsObserverAllCaps = getAnimalsAllCapsObserver();
/**
* filter() is used to filter out the animal names starting with `b`
* */
compositeDisposable.add(
animalsObservable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
return s.toLowerCase().startsWith("b");
}
})
.subscribeWith(animalsObserver));
/**
* filter() is used to filter out the animal names starting with 'c'
* map() is used to transform all the characters to UPPER case
* */
compositeDisposable.add(
animalsObservable
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.filter(new Predicate<String>() {
@Override
public boolean test(String s) throws Exception {
return s.toLowerCase().startsWith("c");
}
})
.map(new Function<String, String>() {
@Override
public String apply(String s) throws Exception {
return s.toUpperCase();
}
})
.subscribeWith(animalsObserverAllCaps));
}
private DisposableObserver<String> getAnimalsObserver() {
return new DisposableObserver<String>() {
@Override
public void onNext(String s) {
Log.d(TAG, "Name: " + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "All items are emitted!");
}
};
}
private DisposableObserver<String> getAnimalsAllCapsObserver() {
return new DisposableObserver<String>() {
@Override
public void onNext(String s) {
Log.d(TAG, "Name: " + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.d(TAG, "All items are emitted!");
}
};
}
private Observable<String> getAnimalsObservable() {
return Observable.fromArray(
"Ant", "Ape",
"Bat", "Bee", "Bear", "Butterfly",
"Cat", "Crab", "Cod",
"Dog", "Dove",
"Fox", "Frog");
}
@Override
protected void onDestroy() {
super.onDestroy();
// don't send events once the activity is destroyed
compositeDisposable.clear();
}
}
اگر این مثال را اجرا کنید، میتوانید خروجی زیر را مشاهده کنید. در اینجا نام حیواناتی که با "B" شروع میشوند توسط animalsObserver ،Observed میشوند و همه نامهای شروع شده با حرف "c" و همه animalsObserverAllCaps با Observed میشوند.
Name: Bat
Name: Bee
Name: Bear
Name: Butterfly
All items are emitted!
Name: CAT
Name: CRAB
Name: COD
All items are emitted!
Flowables
- Flowable چیست؟
- Backpressure
- Backpressure Strategies
- Flowables and Observables
Flowables موضوع قابل توجهی است. از آنجا که این یک دوره مبتدی است، ما نمیخواهیم به جزییات Flowables بپردازیم. ما در انتهای این مقاله منابعی را درج خواهیم کرد که در صورت تمایل میتوانید اطلاعات بیشتری درباره Flowables پیدا کنید (که توصیه میکنیم آنها را بررسی کنید).
در اصل هدف این بخش از مقاله ارائه توضیحی در مورد مهمترین مفاهیم مربوط به Flowables است تا شما بدانید که چه اطلاعاتی را در کجا جستجو کنید.
Flowable چیست ؟
Flowables در RxJava2 به عنوان راه حلی برای یک مشکل معرفی شد. مشکل مربوط به backpressure بود. اگر نمیدانید backpressure چیست، در بخش بعدی در مورد آن صحبت خواهیم کرد.
بهطور کلی شما در RxJava2 به جای یک کلاس از Observable دو کلاس از آن را دارید:
- Observable
- Flowable
- Observables are not backpressure-aware
- Flowables are backpressure-aware
backpressure
از اسناد Rx: backpressure زمانی است که در یک خط طولانی پردازشی "Flowable" نمیتواند مقادیر را با سرعت کافی پردازش کند و به روشی برای کاهش سرعت تولیدکننده بالادست احتیاج دارد. مانند این است که اگر دادههای زیادی را برای Observerها پخش میکنید و Observerها نتوانند از پس آن برآیند، در موقعیت هایی مانند این ممکن است باعث Out of Memory شود. دستگاه به معنای واقعی کلمه نمیتواند اطلاعات ورودی را به اندازه کافی سریع اداره کند. در رابطه با backpressure، برخی اصطلاحات وجود دارد که باید در مورد آنها بدانید: منابع داغ و منابع سرد (Hot sources و Cold sources).- Hot sources: به این عنوان به شکل یک فشار نسبی فکر کنید. Observables بدون در نظر گرفتن اینکه آیا آنها میتوانند به کار خود ادامه دهند یا خیر، هدایت اشیا را به سمت Observers ادامه میدهند. در اصل Observables اشیاء را منتشر میکنند و این امر به عهده Observers است که به کار خود ادامه دهند. هنگامی که Observables نتواند اشیای منتشر شده را نگهداری کند، بایدبافر شوند یا به طریقی آنها handle شوند. در ادامه در مورد برخی استراتژیهای بافر صحبت خواهیم کرد.
- Cold sources: این مورد برخلاف Hot sources است. میتوان گفت که اشیاء توسط Observables به صورت lazily منتشر میشوند. به معنی اینکه: Observables هنگامی که Observerها اشیاء را میخواهند و با یک نرخ مناسب برای Observer، اشیاء را منتشر میکنند. اشیاء منتشر شده توسط Observables نیازی به بافر ندارند زیرا تمام فرایند بهطور کلی در اختیار Observables است. به آن به عنوان یک رابطه کششی نگاه کنید.Observerها وقتی اشیاء یا آیتمها را میخواهند میتوانند از Observables آنها را دریافت کنند.
Backpressure Strategies
همان طور که در بخش بالا گفتیم، Hot sources نیاز به یک استراتژی بافر دارند. تعدادی از اپراتورهای backpressure وجود دارد که میتوانید برای اجرای استراتژیهای مختلف بافر، در Flowables به کار ببرید.استراتژی 1 ()onBackpressureBuffer :
این اپراتور بافر نامحدود را بین منبع upstream و اپراتور downstream معرفی میکند.حال 'نامحدود' به چه معنی است؟
"نامحدود بودن به معنای عدم وجود JVM در حافظه است، میتواند با هر هزینه ای که از یک منبع پراکنده حاصل میشود، آن را handle کند.
مثال onBackPressure:
حتی با وجود 1 میلیون عدد صحیح ، این مورد باعث خارج شدن از حافظه (Out of Memory Exception) نمیشود.
Flowable.range(0, 1000000)
.onBackpressureBuffer()
.observeOn(Schedulers.computation())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: ", t);
}
@Override
public void onComplete() {
}
});
سایر استراتژی ها:
Observable و Flowable
Flowables قابل تبدیل به Observables و Observables قابل تبدیل به Flowables هستند.
Observable to Flowable:
هنگام تبدیل یک Observable به یک Flowable ، باید یک استراتژی بافر را مشخص کنید.- BackpressureStrategy.BUFFER
- BackpressureStrategy.DROP
- BackpressureStrategy.ERROR
- BackpressureStrategy.LATEST
- BackpressureStrategy.MISSING
Observable<Integer> observable = Observable
.just(1, 2, 3, 4, 5);
Flowable<Integer> flowable = observable.toFlowable(BackpressureStrategy.BUFFER);
Flowable to Observable:
با فراخوانی متد ()toObservable روی Flowable میتوانید یک Flowable را به یک Observable تبدیل کنید.Observable<Integer> observable = Observable
.just(1, 2, 3, 4, 5);
Flowable<Integer> flowable = observable.toFlowable(BackpressureStrategy.BUFFER);
Observable<Integer> backToObservable = flowable.toObservable(
ساخت عملگرهای (Operators : Create، Just، Range، Repeat)
- مقدمه
- چرا باید این اپراتورها برای شما مهم باشد
- ()create
- ()just
- ()range
- ()repeat
مقدمه
در این بخش از مقاله میخواهیم به شما چهار اپراتور را آموزش دهیم که کارشان ایجاد Observables است.- ()create
- ()just
- ()range
- ()repeat
چرا باید این اپراتورها برای شما مهم باشند
اگر میخواهید یک Single Observable ایجاد کنید، میتوانید از اپراتورهای "()just" و "()create" استفاده کنید. اپراتور ()just توانایی پذیرش لیستی تا 10 ورودی را دارد. با این حال، ما امتیازی برای آن نمیبینیم زیرا اپراتورهای دیگری نیز وجود دارند که لیست را میپذیرند و فقط به 10 ورودی محدود نمیشوند. بنابراین ما فقط برای ایجاد یک Single Observable قابل استفاده، آن را به شما پیشنهاد میکنیم.()range و ()repeat برای جایگزین کردن حلقهها (loops)، هر پردازش تکراری یا متد تکرار شونده عالی هستند. میتوانید کارها را روی یک Background Thread انجام دهید و نتایج را در Main Thread مشاهده کنید.
()create
Input : T<Output : Observable<T
اپراتور ()create به طور مشخصی برای ایجاد Observables استفاده میشود. این سطحیترین مورد استفاده است اما همچنین میتواند انعطاف پذیر باشد. اگر میخواهید یک Observeable ایجاد کنید و هیچ یک از اپراتورهای دیگر متناسب با نیازهای شما نیستند، اپراتور ()create را در نظر بگیرید.
ما میخواهیم به شما دو روش مختلف برای ایجاد یک Observable با استفاده از اپراتور ()create را نشان دهیم.
- ایجاد یک Observable از یک single object
-
ایجاد یک Observable از لیستی از اشیاء
ایجاد یک Observable از یک single object
- یک شیء را برای تبدیل شدن به یک Observable تعریف کنید.
- یک Observable ایجاد کنید.
- Observable را Subscribe کرده و شیء منتشر شده را دریافت کنید.
// Instantiate the object to become an Observable
final Task task = new Task("Walk the dog", false, 4);
// Create the Observable
Observable<Task> singleTaskObservable = Observable
.create(new ObservableOnSubscribe<Task>() {
@Override
public void subscribe(ObservableEmitter<Task> emitter) throws Exception {
if(!emitter.isDisposed()){
emitter.onNext(task);
emitter.onComplete();
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
// Subscribe to the Observable and get the emitted object
singleTaskObservable.subscribe(new Observer<Task>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Task task) {
Log.d(TAG, "onNext: single task: " + task.getDescription());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
خروجی کد بالا به صورت زیر میباشد.
MainActivity: onNext: single task: Walk the dog
ایجاد یک Observable از لیستی از اشیاء
- Observable را ایجاد کنید.
- در داخل متد Subscribe از طریق لیست Taskهای تکرار شده میتوانید ()onNext را فراخوانی کنید.
- پس از اتمام حلقه (Loop)، میتوانید متد ()onComplete را فراخوانی کنید.
- Observable را Subscribe کرده و شیء منتشر شده را دریافت کنید.
// Create the Observable
Observable<Task> taskListObservable = Observable
.create(new ObservableOnSubscribe<Task>() {
@Override
public void subscribe(ObservableEmitter<Task> emitter) throws Exception {
// Inside the subscribe method iterate through the list of tasks and call onNext(task)
for(Task task: DataSource.createTasksList()){
if(!emitter.isDisposed()){
emitter.onNext(task);
}
}
// Once the loop is complete, call the onComplete() method
if(!emitter.isDisposed()){
emitter.onComplete();
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
// Subscribe to the Observable and get the emitted objects
taskListObservable.subscribe(new Observer<Task>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Task task) {
Log.d(TAG, "onNext: task list: " + task.getDescription());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
خروجی کد بالا به صورت زیر میباشد.
MainActivity: onNext: task list: Take out the trash
MainActivity: onNext: task list: Walk the dog
MainActivity: onNext: task list: Make my bed
MainActivity: onNext: task list: Unload the dishwasher
MainActivity: onNext: task list: Make dinner
()just
([Input : T... (Optional Array[10 <Output : Observable<T در این مثال ما از اپراتور "()just" استفاده میکنیم و مجموعه داده ای از 10 رشته (String) را منتقل میکنیم. توجه : شما میتوانید حداکثر 10 اشیاء را به اپراتور "()just" منتقل کنید و نمیتوانید همیشه از آن استفاده کنید.Observable.just("first", "second", "third", "fourth", "fifth", "sixth",
"seventh", "eighth", "ninth", "tenth")
.subscribeOn(Schedulers.io()) // What thread to do the work on
.observeOn(AndroidSchedulers.mainThread()) // What thread to observe the results on
.subscribe(new Observer<String>() { // view the results by creating a new observer
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: called");
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: " + s);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ", e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: done...");
}
});
خروجی کد بالا به صورت زیر میباشد.
onNext: first
onNext: second
onNext: third
onNext: fourth
onNext: fifth
onNext: sixth
onNext: seventh
onNext: eighth
onNext: ninth
onNext: tenth
به خاطر داشته باشید که این مثال به هیچ وجه عملی نیست، اما همچنان دارای ارزش است زیرا تمام عملیات از همان ساختار کلی پیروی میکنند. بنابراین ما از آن برای نشان دادن ساختار کلی عملیات استفاده میکنیم.
()range
[Input: [x, x+1, ..., x + y <Output: Observable<Integer ()range بسیار ساده و به راحتی قابل درک است. محدوده ای از اشیاء را منتشر میکند. به عنوان ورودی، مقدار min و مقدار max را وارد میکنید. تمام مقادیر موجود در این محدوده را منتشر میکند.Observable.range(0,11)
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
خروجی کد بالا به صورت زیر میباشد.
onNext: 0
onNext: 1
onNext: 2
onNext: 3
onNext: 4
onNext: 5
onNext: 6
onNext: 7
onNext: 8
onNext: 9
onNext: 10
()repeat
()repeat یکی دیگر از اپراتورها است. repeat باید در رابطه با اپراتور دیگری استفاده شود. مثال خوب با اپراتور ()range است.Observable.range(0,3)
.repeat(2)
.observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
خروجی کد بالا به صورت زیر میباشد.
onNext: 0
onNext: 1
onNext: 2
onNext: 0
onNext: 1
onNext: 2
ساخت عملگرهای (Operators : Interval - Timer)
- مقدمه
- ()interval
- ()timer
مقدمه
یک مشکل بسیار رایج که ما برنامه نویسان اندروید با آن مواجه هستیم، "اجرای یک متد" یا "اجرای نوعی فرآیند" در یک بازه زمانی خاص یا دارای تأخیر است. به طور معمول، عملکردهایی مانند این با استفاده از فراخوانی Runnable و Handler با فراخوانی متد ()postDelayed اجرا میشود. در اینجا مثالی آورده شده است که log کل زمان سپری شده در هر 1 ثانیه را چاپ میکند تا زمانی که 5 تکرار را اجرا کند:final Handler handler = new Handler();
final Runnable runnable = new Runnable() {
int elapsedTime = 0;
@Override
public void run() {
if(elapsedTime >= 5){ // if greater than 5 seconds
handler.removeCallbacks(this);
}
else{
elapsedTime = elapsedTime + 1;
handler.postDelayed(this, 1000);
Log.d(TAG, "run: " + elapsedTime);
}
}
};
handler.postDelayed(runnable, 1000);
خروجی کد بالا به صورت زیر میباشد.
MainActivity: run: 1
MainActivity: run: 2
MainActivity: run: 3
MainActivity: run: 4
MainActivity: run: 5
این کار را انجام میدهد، اما خیلی خوب نیست و اضافه کردن ویژگیهای بیشتر معمولاً سخت است.
RxJava اپراتورهای مرتبط با زمان مختلفی را برای کمک به این نوع کارها دارد.
()interval
اپراتور Interval یک بازه Observable را با فاصله زمانی ثابت از زمان انتخاب شما در بین انتشار، یک دنباله نامحدود از اعداد صعودی را منتشر میکند. ما با استفاده از اپراتور ()takeWhile میتوانیم هر مقدار منتشر شده را چک کنیم. اگر مقدار از 5 بیشتر شود، Observable نتایج را متوقف میکند. Observable : کدهای Observable// emit an observable every time interval
Observable<Long> intervalObservable = Observable
.interval(1, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.takeWhile(new Predicate<Long>() { // stop the process if more than 5 seconds passes
@Override
public boolean test(Long aLong) throws Exception {
return aLong <= 5;
}
})
.observeOn(AndroidSchedulers.mainThread());
Subscribe to the Observable :
در این بخش Observable را Subscribe میکنیم.
intervalObservable.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: interval: " + aLong);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
خروجی کد بالا به صورت زیر میباشد.
MainActivity: onNext: interval: 0
MainActivity: onNext: interval: 1
MainActivity: onNext: interval: 2
MainActivity: onNext: interval: 3
MainActivity: onNext: interval: 4
MainActivity: onNext: interval: 5
()timer
اپراتور Timer یک Observable را ایجاد میکند که یک آیتم خاص را بعد از مدت زمان مشخص شده، منتشر میکند. Observable : کدهای Observable// emit single observable after a given delay
Observable<Long> timeObservable = Observable
.timer(3, TimeUnit.SECONDS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Subscribe to the Observable :
در این بخش Observable را Subscribe میکنیم.
timeObservable.subscribe(new Observer<Long>() {
long time = 0; // variable for demonstating how much time has passed
@Override
public void onSubscribe(Disposable d) {
time = System.currentTimeMillis() / 1000;
}
@Override
public void onNext(Long aLong) {
Log.d(TAG, "onNext: " + ((System.currentTimeMillis() / 1000) - time) + " seconds have elapsed." );
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
خروجی کد بالا به صورت زیر میباشد.
MainActivity: onNext: 3 seconds have elapsed.
ساخت عملگرهای (fromArray, fromIterable, fromCallable)
- مقدمه
- ()fromArray
- ()fromIterable
- ()fromCallable
public class Task {
private String description;
private boolean isComplete;
private int priority;
public Task(String description, boolean isComplete, int priority) {
this.description = description;
this.isComplete = isComplete;
this.priority = priority;
}
// getter and setters ....
}
()fromarray
()fromarray آرایه ای از اشیاء را به عنوان ورودی گرفته و خروجی یک Observable میباشد. این متد بلافاصله اجرا نمیشود. ()fromarray تنها زمانی که Subscribed ،Subscribe شود، این متد را اجرا میکند. چه زمانی باید از این اپراتور استفاده کنید؟ برای انتشار تعداد دلخواه از مواردی که به صورت مقدماتی شناخته شده اند. []Input: T <Ouput: Observable<T مثال ()fromarray :ایجاد یک آرایه Observable از task objects.
Task[] list = new Task[5];
list[0] = (new Task("Take out the trash", true, 3));
list[1] = (new Task("Walk the dog", false, 2));
list[2] = (new Task("Make my bed", true, 1));
list[3] = (new Task("Unload the dishwasher", false, 0));
list[4] = (new Task("Make dinner", true, 5));
Observable<Task> taskObservable = Observable
.fromArray(list)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
taskObservable.subscribe(new Observer<Task>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Task task) {
Log.d(TAG, "onNext: : " + task.getDescription());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});)
()fromIterable
()fromIterable میتواند به عنوان ورودی و خروجی در Observable، اشیاء قابل تکرار را بگیرد. انواع Iterable شامل: List ،ArrayList ،Set و غیره ... میباشد. این متد بلافاصله اجرا نمیشود. ()fromIterable تنها زمانی که Subscribed ،Subscribe شود، این متد را اجرا میکند. چه زمانی باید از این اپراتور استفاده کنید؟ برای انتشار تعداد دلخواه از مواردی که به صورت مقدماتی شناخته شده اند استفاده میشود. همانند اپراتور ()fromarray اما قابل تکرار است.List<Task> taskList = new ArrayList<>();
taskList.add(new Task("Take out the trash", true, 3));
taskList.add(new Task("Walk the dog", false, 2));
taskList.add(new Task("Make my bed", true, 1));
taskList.add(new Task("Unload the dishwasher", false, 0));
taskList.add(new Task("Make dinner", true, 5));
Observable<Task> taskObservable = Observable
.fromIterable(taskList)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
taskObservable.subscribe(new Observer<Task>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Task task) {
Log.d(TAG, "onNext: : " + task.getDescription());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
()fromCallable
()fromCallable یک بلوک کد را اجرا میکند (معمولاً یک متد) و نتیجه را برمیگرداند. این متد بلافاصله اجرا نمیشود. ()fromIterable تنها زمانی که Subscribed ،Subscribe شود ، این متد را اجرا میکند. چه زمانی باید از این اپراتور استفاده کنید؟ برای تولید یک مورد Single Observable در صورت تقاضا استفاده میشود. مانند فراخوانی متد برای بازیابی برخی اشیاء یا لیستی از اشیاء. <Input: Callable<TOutput: T
مثال ()fromCallable: شما باید یک Task Object را از local SQLite database cache برگردانید. کلیهی عملیات پایگاه داده باید بر روی background thread انجام شود. سپس نتیجه به Main Thread بازگردانده میشود.- برای اجرای متد بر روی background thread میتوانید از یک callable استفاده کنید.
- سپس نتایج را به Main Thread برگردانید.
// create Observable (method will not execute yet)
Observable<Task> callable = Observable
.fromCallable(new Callable<Task>() {
@Override
public Task call() throws Exception {
return MyDatabase.getTask();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
// method will be executed since now something has subscribed
callable.subscribe(new Observer<Task>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Task task) {
Log.d(TAG, "onNext: : " + task.getDescription());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
ساخت عملگر (fromFuture)
- مقدمه
- ()fromFuture
- مثال
مقدمه
در اینجا در مورد اپراتور ()fromFuture صحبت خواهیم کرد. ما توضیح مختصری خواهیم داد و سپس به مثال بسیار مفصلی میپردازیم. نکته: ما انتظار داریم که شما نحوهی استفاده از معماری Model, View, ViewModel) MVVM)،Retrofit را بدانید و آن را درک کنید زیرا ()fromFuture چیزی است که برای معماری MVVM طراحی شده است.()fromFuture
قبل از اینکه در مورد آنچه این متد انجام میدهد صحبت کنیم، بگذارید به طور خلاصه توضیح دهیم که Future چیست.Future اساساً یک کار معلق است.
این یک وعده برای نتیجه از کاری است که در آینده انجام میشود.
این کار میتواند از طریق Runnable یا Callable انجام شود (با یک Rx Callable اشتباه نمیشود). مثالی از چیزی که میتواند این runnables یا callableها را اجرا کند، یک ExecutorService است.
بنابراین اگر میخواهید از Executor در رابطه با RxJava استفاده کنید، در اصل از این امر استفاده خواهید کرد.
<Input: Future<T
<Ouput: Observable<T
در این مثال، ما به شما نشان میدهیم که چگونه میتوانید با استفاده از Retrofit و MVVM با Future Observable، یک درخواست شبکه را به REST API انجام دهید.
وابستگی (Dependencies)
ما برای این مثال به وابستگی Retrofit احتیاج داریم زیرا دارای درخواست شبکه (network request) هستیم. همچنین وابستگی به LiveData و ViewModels داریم زیرا این مثال از معماری MVVM استفاده میکند.
همچنین وابستگی به RxJava Call Adapter داریم. به طور پیش فرض، شما نمیتوانید یک Observable یا یک Flowable را از یک درخواست Retrofit بازگردانید. این وابستگی به شما امکان میدهد تا Retrofit Call objects را به Flowables / Observables تبدیل کنید.
implementation "com.squareup.retrofit2:adapter-rxjava2:2.5.0"
def retrofitVersion = "2.5.0"
def rxjava_version = '2.2.7'
def rxandroid_version = '2.1.1'
def lifecycle_version = "1.1.1"
// ViewModel and LiveData
implementation "android.arch.lifecycle:extensions:$lifecycle_version"
// Retrofit
implementation "com.squareup.retrofit2:retrofit:$retrofitVersion"
implementation "com.squareup.retrofit2:converter-gson:$retrofitVersion"
// RxJava
implementation "io.reactivex.rxjava2:rxjava:$rxjava_version"
// RxAndroid
implementation "io.reactivex.rxjava2:rxandroid:$rxandroid_version"
// RxJava Call Adapter
implementation "com.squareup.retrofit2:adapter-rxjava2:2.5.0"
RequestApi :
Request Api در اینجا متدی است که Retrofit برای ایجاد query به REST API استفاده میکند. توجه کنید که یک Observable برمیگردد.
این به دلیل وابستگی RxJava Call Adapter میباشد که در بخش وابستگیها به آن اشاره کردیم، که میتواند یک Observable را بازگرداند.
public interface RequestApi {
@GET("todos/1")
Observable<ResponseBody> makeObservableQuery();
}
ServiceGenerator :
این کلاس مسئول ایجاد نمونه Retrofit، مراجعه و گرفتن کلاس RequestApi است که ما در بالا تعریف کردیم.
نکته ای که در اینجا باید به آن توجه کنیم، فراخوانی متد addCallAdapterFactory است. بدون آن، ما نمیتوانیم
Retrofit Call objects را به Flowables / Observables تبدیل کنیم.
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
public class ServiceGenerator {
public static final String BASE_URL = "https://jsonplaceholder.typicode.com";
private static Retrofit.Builder retrofitBuilder =
new Retrofit.Builder()
.baseUrl(BASE_URL)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create());
private static Retrofit retrofit = retrofitBuilder.build();
private static RequestApi requestApi = retrofit.create(RequestApi.class);
public static RequestApi getRequestApi(){
return requestApi;
}
}
Repository :
در اینجا مثالی از "bread and butter" آورده شده است. اینجاست که ما از Executor برای فراخوانی شبکه استفاده میکنیم و سپس Future Observable را به ViewModel باز میگردانیم.
public class Repository {
private static Repository instance;
public static Repository getInstance(){
if(instance == null){
instance = new Repository();
}
return instance;
}
public Future<Observable<ResponseBody>> makeFutureQuery(){
final ExecutorService executor = Executors.newSingleThreadExecutor();
final Callable<Observable<ResponseBody>> myNetworkCallable = new Callable<Observable<ResponseBody>>() {
@Override
public Observable<ResponseBody> call() throws Exception {
return ServiceGenerator.getRequestApi().makeObservableQuery();
}
};
final Future<Observable<ResponseBody>> futureObservable = new Future<Observable<ResponseBody>>(){
@Override
public boolean cancel(boolean mayInterruptIfRunning) {
if(mayInterruptIfRunning){
executor.shutdown();
}
return false;
}
@Override
public boolean isCancelled() {
return executor.isShutdown();
}
@Override
public boolean isDone() {
return executor.isTerminated();
}
@Override
public Observable<ResponseBody> get() throws ExecutionException, InterruptedException {
return executor.submit(myNetworkCallable).get();
}
@Override
public Observable<ResponseBody> get(long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
return executor.submit(myNetworkCallable).get(timeout, unit);
}
};
return futureObservable;
}
}
View Model :
از آنجا که ما از معماری MVVM استفاده میکنیم، باید یک ViewModel داشته باشیم. فقط یک متد واحد وجود دارد که به Repository دسترسی پیدا میکند و Future Observable را برمی گرداند.
public class MainViewModel extends ViewModel {
private Repository repository;
public MainViewModel() {
repository = Repository.getInstance();
}
public Future<Observable<ResponseBody>> makeFutureQuery(){
return repository.makeFutureQuery();
}
}
MainActivity:
و سرانجام در MainActivity ،query با Subscribe کردن observable شروع میشود و پاسخ از سرور در Log چاپ میشود.MainViewModel viewModel = ViewModelProviders.of(this).get(MainViewModel.class);
try {
viewModel.makeFutureQuery().get()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<ResponseBody>() {
@Override
public void onSubscribe(Disposable d) {
Log.d(TAG, "onSubscribe: called.");
}
@Override
public void onNext(ResponseBody responseBody) {
Log.d(TAG, "onNext: got the response from server!");
try {
Log.d(TAG, "onNext: " + responseBody.string());
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ", e);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete: called.");
}
});
} catch (ExecutionException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
خروجی کد بالا به صورت زیر میباشد.
MainActivity: onNext: got the response from server!
MainActivity: onNext: {
"userId": 1,
"id": 1,
"title": "delectus aut autem",
"completed": false
}
ساخت عملگر (fromPublisher)
- مقدمه
- ()fromPublisher
- مثال
مقدمه
در این بخش در مورد اپراتور ()fromPublisher صحبت خواهیم کرد. ما توضیح مختصری خواهیم داد و سپس به مثالی بسیار مفصل میپردازیم. نکته: ما انتظار داریم شما بدانید که چگونه از معماری MVVM و Retrofit استفاده کنید.()fromPublisher
()fromPublisher برای تبدیل اشیاء LiveData به reactive streams یا از reactive streams به اشیاء LiveData استفاده میشود. احتمالا هرگز نمیخواهید LiveData را به یک reactive Observable تبدیل کنید، غیر از موارد نادری که ممکن است بخواهید از اپراتور استفاده کنید. از طرف دیگر، تبدیل یک Observable به LiveData در واقع بسیار کاربردی است. موارد زیادی برای این کار وجود دارد. مثال زیر یک درخواست شبکه با استفاده از Retrofit ایجاد میکند و پاسخی را به صورت یک شیء Flowable بازیابی میکند. Publisher interface ،Flowable را پیاده سازی میکند. اشیاء Publisher دقیقاً مانند subscribed ،Observable میشوند. <Input: LiveData<T <Ouput: Observable<T یا <Input: Observable<T <Ouput: LiveData<Tمثالی از FromPublisher:
مجوز اینترنت (Internet Permission): Internet Permission را به Manifest اضافه کنید زیرا میخواهیم با استفاده از Retrofit درخواستی انجام دهیم.<uses-permission android:name="android.permission.INTERNET"/>
وابستگی (Dependencies):
ما به وابستگیهای زیادی احتیاج داریم. انتظار داریم که شما در حال حاضر بدانید که چگونه از Retrofit و معماری MVVM استفاده کنید. بنابراین تنها دو وابستگی زیر را میخواهیم توضیح دهیم:
(RxJava Call Adapter (Call object to Observable:
به طور پیش فرض، شما نمیتوانید یک Observable یا یک flowable را از یک Retrofit request بازگردانید.
implementation "com.squareup.retrofit2:adapter-rxjava2:2.9.0"
جدیدترین نسخه این کتابخانه را میتوانید دریافت کنید.
تبدیل LiveData به Observable:
به ما این امکان را میدهد تا Observables (یا Flowables) را به اشیاء LiveData تبدیل کنیم.
implementation "android.arch.lifecycle:reactivestreams:1.1.1"
جدیدترین نسخه این کتابخانه را میتوانید دریافت کنید.
def retrofitVersion = "2.5.0"
def rxjava_version = '2.2.7'
def rxandroid_version = '2.1.1'
def lifecycle_version = "1.1.1"
// ViewModel and LiveData
implementation "android.arch.lifecycle:extensions:$lifecycle_version"
// Retrofit
implementation "com.squareup.retrofit2:retrofit:$retrofitVersion"
implementation "com.squareup.retrofit2:converter-gson:$retrofitVersion"
// RxJava
implementation "io.reactivex.rxjava2:rxjava:$rxjava_version"
// RxAndroid
implementation "io.reactivex.rxjava2:rxandroid:$rxandroid_version"
// RxJava Call Adapter (Call object to Observable)
implementation "com.squareup.retrofit2:adapter-rxjava2:2.5.0"
// Convert Observable to LiveData
implementation "android.arch.lifecycle:reactivestreams:1.1.1"
RequestApi :
این کلاس جایی است که متدهای Retrofit request در آن هستند. در این حالت تنها یک مورد وجود دارد. توجه کنید که یک شیء Flowable را برمی گرداند.public interface RequestApi {
@GET("todos/1")
Flowable<ResponseBody> makeQuery();
}
ServiceGenerator:
این کلاس مسئول ایجاد نمونه Retrofit مراجعه و گرفتن کلاس RequestApi است که ما در بالا تعریف کردیم. نکته: موردی که در اینجا باید به آن توجه کنیم ، فراخوانی متد addCallAdapterFactory است. بدون آن، ما نمیتوانیم Flowables / Observables را به Retrofit Call objects تبدیل کنیم..addCallAdapterFactory(RxJava2CallAdapterFactory.create())
public class ServiceGenerator {
public static final String BASE_URL = "https://jsonplaceholder.typicode.com";
private static Retrofit.Builder retrofitBuilder =
new Retrofit.Builder()
.baseUrl(BASE_URL)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create());
private static Retrofit retrofit = retrofitBuilder.build();
private static RequestApi requestApi = retrofit.create(RequestApi.class);
public static RequestApi getRequestApi(){
return requestApi;
}
}
Repository:
همان طور که چندین بار بیان کردیم، ما از معماری MVVM استفاده میکنیم. و معماری مناسب MVVM همیشه شامل یک repository (حداقل در هر زمان انجام معاملات بانک اطلاعاتی) است. این جایی است که جادو اتفاق میافتد. متد ()makeReactiveQuery از متد ()fromPublisher برای تبدیل یک Flowable به LiveData استفاده میکند. توجه کنید که ما در یک background thread مشترک (Subscribe) میشویم! این خیلی مهم است. کلیه عملیات شبکه باید بر روی یک background thread انجام شود.public class Repository {
private static Repository instance;
public static Repository getInstance(){
if(instance == null){
instance = new Repository();
}
return instance;
}
public LiveData<ResponseBody> makeReactiveQuery(){
return LiveDataReactiveStreams.fromPublisher(ServiceGenerator.getRequestApi()
.makeQuery()
.subscribeOn(Schedulers.io()));
}
}
ViewModel:
ما یک بار دیگر که از معماری MVVM استفاده میکنیم باید ViewModel داشته باشیم. فقط یک متد واحد وجود دارد که به مخزن دسترسی پیدا میکند و LiveData را برمی گرداند.public class MainViewModel extends ViewModel {
private Repository repository;
public MainViewModel() {
repository = Repository.getInstance();
}
public LiveData<ResponseBody> makeQuery(){
return repository.makeReactiveQuery();
}
}
MainActivity:
قسمت آخر Subscirbe کردن LiveData در MainActivity است. خروجی کد بالا به صورت زیر میباشد.
MainActivity: onChanged: this is a live data response!MainActivity: onChanged: {
"userId": 1,
"id": 1,
"title": "delectus aut autem",
"completed": false
}
عملگرهای Filter : Filter
وقتی لیستی از custom objects را در اختیار دارید و میخواهید بر اساس یک فیلد خاص آن را فیلتر کنید، چند بار در این وضعیت قرار گرفتهاید؟ اگر لیست بسیار بزرگ است پس فیلتر کردن باید روی یک background thread انجام شود. اگر چنین اتفاقی برای شما بیفتد، عملگر ()filter بهترین دوست شما خواهد بود. ()filter یک اپراتور عالی است. ما حدس میزنیم که شما از این اپراتور زیاد استفاده خواهید کرد.مثال 1: فیلتر کردن یک فیلد String در یک لیست
در این مثال میخواهم به شما نشان دهیم چگونه لیستی از Task POJO's) custom java objects) را در یک فیلد String خاص فیلتر کنید. بنابراین فقط Task Objects را که شامل یک پارامتر String خاص هستند، منتشر میکند.- ما یک Observable از لیستی از Task Objects را ایجاد میکنیم.
- Tasks را برای توضیحات خاص فیلتر میکنیم.
- سپس Subscribe کردن و منتشر کردن کارهایی (Task) که از Test عبور میکنند.
Observable<Task> taskObservable = Observable
.fromIterable(DataSource.createTasksList())
.filter(new Predicate<Task>() {
@Override
public boolean test(Task task) throws Exception {
if(task.getDescription().equals("Walk the dog")){
return true;
}
return false;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
taskObservable.subscribe(new Observer<Task>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Task task) {
Log.d(TAG, "onNext: This task matches the description: " + task.getDescription());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
public static List<Task> createTasksList(){
List<Task> tasks = new ArrayList<>();
tasks.add(new Task("Take out the trash", true, 3));
tasks.add(new Task("Walk the dog", false, 2));
tasks.add(new Task("Make my bed", true, 1));
tasks.add(new Task("Unload the dishwasher", false, 0));
tasks.add(new Task("Make dinner", true, 5));
return tasks;
}
خروجی کد بالا به صورت زیر میباشد.
onNext: This task matches the description: Walk the dog
مثال 2 : فیلتر کردن یک فیلد Boolean در یک لیست
ما به شما نشان میدهیم که چگونه لیستی از custom objects را در یک فیلد خاص Boolean فیلتر کنید. اگر فیلد Boolean صحیح (True) باشد، Task Object در Observer منتشر میشود.- ما یک Observable از لیستی از Task Objects را ایجاد میکنیم.
- Tasksرا در قسمت ()isComplete فیلتر میکنیم.
- سپس Subscribe کردن و منتشر کردن کارهایی (Task) که از Test عبور میکنند.
Observable<Task> taskObservable = Observable
.fromIterable(DataSource.createTasksList())
.filter(new Predicate<Task>() {
@Override
public boolean test(Task task) throws Exception {
return task.isComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
taskObservable.subscribe(new Observer<Task>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Task task) {
Log.d(TAG, "onNext: This is a completed task: " + task.getDescription());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
public static List<Task> createTasksList(){
List<Task> tasks = new ArrayList<>();
tasks.add(new Task("Take out the trash", true, 3));
tasks.add(new Task("Walk the dog", false, 2));
tasks.add(new Task("Make my bed", true, 1));
tasks.add(new Task("Unload the dishwasher", false, 0));
tasks.add(new Task("Make dinner", true, 5));
return tasks;
}
خروجی کد بالا به صورت زیر میباشد.
onNext: : Take out the trash
onNext: : Make my bed
onNext: : Make dinner
عملگرهای Distinct : Filter
- مقدمه
- اسفاده نادرست از ()Distinct
- استفاده درست از ()Distinct
مقدمه
اپراتور ()Distinct نوع دیگری است که شما زیاد از آن استفاده خواهید کرد. میتوانید اشیاء سفارشی (Custom Objects) را بر اساس فیلدهای مشخص فیلتر کنید. فکر میکنیم بدون مثال درک این موضوع کمی سخت باشد. بنابراین اگر دچار سردرگمی شدید، به مثالهای زیر توجه کنید. اپراتور Distinct فقط با دسترسی به آیتم هایی که از قبل منتشر نشده اند، Observable را فیلتر میکند.استفاده نادرست از ()Distinct
ما فکر میکنیم اپراتور ()Distinct در نگاه اول خیلی قابل درک نیست. بنابراین میخواهیم روش نادرست استفاده از آن را به شما نشان دهیم (که ممکن است روشی باشد که شما سعی کرده اید از آن استفاده کنید).- ما یک Observable از لیستی از Task Objects را ایجاد میکنیم.
- اپراتور ()Distinct را اعمال کرده و یک custom function را پاس میدهیم.
public static List<Task> createTasksList(){
List<Task> tasks = new ArrayList<>();
tasks.add(new Task("Take out the trash", true, 3));
tasks.add(new Task("Walk the dog", false, 2));
tasks.add(new Task("Make my bed", true, 1));
tasks.add(new Task("Unload the dishwasher", false, 0));
tasks.add(new Task("Make dinner", true, 5));
tasks.add(new Task("Make dinner", true, 5)); // duplicate for testing the distinct operator
return tasks;
}
Observable<Task> taskObservable = Observable
.fromIterable(DataSource.createTasksList())
.distinct(new Function<Task, Task>() { // <--- WRONG
@Override
public Task apply(Task task) throws Exception {
return task;
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
taskObservable.subscribe(new Observer<Task>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Task task) {
Log.d(TAG, "onNext: " + task.getDescription());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
خروجی کد بالا به صورت زیر میباشد.
MainActivity: onNext: Take out the trash
MainActivity: onNext: Walk the dog
MainActivity: onNext: Make my bed
MainActivity: onNext: Unload the dishwasher
MainActivity: onNext: Make dinner
MainActivity: onNext: Make dinner
توجه کنید که خروجی یک نسخهی تکراری را نشان میدهد. این صحیح نیست. اپراتور ()Distinct برای حذف نسخههای تکراری است. پس کجا اشتباه کردیم؟
نوع دادهی دوم در عملکرد (Funtcion) نادرست است. به مثال دوم زیر نگاهی بیندازید تا ببینید چگونه قرار است از آن استفاده شود.
استفاده درست از ()Distinct
همانطور که در مثال قبلی دیدید، خروجی مقادیر متمایز را بر اساس فیلد توضیحات نشان نمیداد.این به دلیل استفاده نادرست از عملکردهای منتقل شده به عنوان ورودی به اپراتور ()Distinct بود. در اینجا نحوهی درست استفاده از آن آمده است.- ما یک Observable از لیستی از Task Objects را ایجاد میکنیم.
- اعمال اپراتور ()Distinct و پاس دادن یک
- اپراتور ()Distinct را اعمال کرده و یک custom function را پاس میدهیم.
- تفاوت در نوع بازگشت عملکرد است. توجه کنید که ما یک رشته را بر میگردانیم و در آن رشته چک کردن را انجام میدهیم.
public static List<Task> createTasksList(){
List<Task> tasks = new ArrayList<>();
tasks.add(new Task("Take out the trash", true, 3));
tasks.add(new Task("Walk the dog", false, 2));
tasks.add(new Task("Make my bed", true, 1));
tasks.add(new Task("Unload the dishwasher", false, 0));
tasks.add(new Task("Make dinner", true, 5));
tasks.add(new Task("Make dinner", true, 5)); // duplicate for testing the distinct operator
return tasks;
}
Observable<Task> taskObservable = Observable
.fromIterable(DataSource.createTasksList())
.distinct(new Function<Task, String>() { // <--- CORRECT
@Override
public String apply(Task task) throws Exception {
return task.getDescription();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
taskObservable.subscribe(new Observer<Task>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Task task) {
Log.d(TAG, "onNext: " + task.getDescription());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
خروجی کد بالا به صورت زیر میباشد.
MainActivity: onNext: Take out the trash
MainActivity: onNext: Walk the dog
MainActivity: onNext: Make my bed
MainActivity: onNext: Unload the dishwasher
MainActivity: onNext: Make dinner
اکنون خروجی صحیح است.
عملگرهای Take : Filter و TakeWhile
- مقدمه
- ()Take
- ()TakeWhile
مقدمه
()take و ()takeWhile را در دسته فیلترها قرار میدهیم. آنها شبیه به عملگر ()Filter هستند زیرا لیستهای اشیاء را فیلتر میکنند. تفاوت اصلی بین عملگرهای ()take و عملگر ()Filter در این است که عملگر ()Filter تمامی اشیاء موجود در لیست را بررسی میکند. بنابراین میتوان گفت عملگر ()Filter شامل تمامی آن هاست. در حالی که اپراتورهای ()take منحصر به فرد در نظر گرفته میشوند پس لازم نیست همهی موارد موجود در لیست را بررسی کنند. آنها اشیاء را منتشر میکنند تا زمانی که شرط عملکرد آنها برآورده شود.()take
اپراتور ()take تنها "n" آیتم را که توسط یک Observable قابل انتشار است را منتشر میکند، در حالی که از موارد باقیمانده کاملا غافل میشود.این مثال را در نظر بگیرید:
- ما یک Observable از لیستی از Task Objects را ایجاد میکنیم.
- اپراتور take را اعمال کرده و 3 مقدار را پاس میدهیم.
public class DataSource {
public static List<Task> createTasksList(){
List<Task> tasks = new ArrayList<>();
tasks.add(new Task("Take out the trash", true, 3));
tasks.add(new Task("Walk the dog", false, 2));
tasks.add(new Task("Make my bed", true, 1));
tasks.add(new Task("Unload the dishwasher", false, 0));
tasks.add(new Task("Make dinner", true, 5));
return tasks;
}
}
Observable<Task> taskObservable = Observable
.fromIterable(DataSource.createTasksList())
.take(3)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
taskObservable.subscribe(new Observer<Task>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Task task) {
Log.d(TAG, "onNext: " + task.getDescription());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
خروجی کد بالا به صورت زیر میباشد.
MainActivity: onNext: Take out the trash
MainActivity: onNext: Walk the dog
MainActivity: onNext: Make my bed
حتی اگر 5 شیء به لیست اضافه شده باشد، فقط 3 مورد منتشر میشوند.
()TakeWhile
()TakeWhile منبع Observable را منعکس میکند تا زمانی که شرایطی که شما تعیین میکنید نادرست شود.
این مثال را در نظر بگیرید:
- ما یک Observable از لیستی از Task Objects را ایجاد میکنیم.
- اپراتور ()TakeWhile را اعمال کرده و از custom function استفاده میکنیم. این تابع در حال بررسی یک کار انجام شده است.
- پس از یافتن یک کار کامل شده ، Observableبه اتمام میرسد.
public class DataSource {
public static List<Task> createTasksList(){
List<Task> tasks = new ArrayList<>();
tasks.add(new Task("Take out the trash", true, 3));
tasks.add(new Task("Walk the dog", false, 2));
tasks.add(new Task("Make my bed", true, 1));
tasks.add(new Task("Unload the dishwasher", false, 0));
tasks.add(new Task("Make dinner", true, 5));
return tasks;
}
}
Observable<Task> taskObservable = Observable
.fromIterable(DataSource.createTasksList())
.takeWhile(new Predicate<Task>() {
@Override
public boolean test(Task task) throws Exception {
return task.isComplete();
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
taskObservable.subscribe(new Observer<Task>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Task task) {
Log.d(TAG, "onNext: " + task.getDescription());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
خروجی کد بالا به صورت زیر میباشد.
MainActivity: onNext: Take out the trash
عملگرهای تبدیل کننده (Transformation Operators)
- Introduction
- ()Map
- ()Buffer
- ()Debounce
- ()ThrottleFirst
- ()FlatMap
- ()ConcatMap
- ()SwitchMap
Map
یک تابع را برای هر مورد منتشر شده اعمال میکند. با استفاده از تابعی از آن، هر مورد منتشر شده را تغییر میدهد. (نظم ارسال حفظ میشود.) نمودار Map :
Buffer
بهطور دوره ای items را از یک Observable به صورت bundles جمع میکند و به جای اینکه یکبار items را منتشر کند، bundles را منتشر میکند. (نظم ارسال حفظ میشود) نمودار Buffer:
Debounce
اگر یک زمان خاص گذشته باشد، یک Item را از یک Observable منتشر میکند بدون آنکه یک Item دیگر را منتشر کند. این عملگر برای کلیک دکمه (button clicks) بسیار عالی است. اگر کاربر بارها و بارها یک دکمه را کلیک کند، دیگر نیازی به اجرای چندین بار کار نیست. میتوانیم از اپراتور ()Debounce استفاده کنیم تا با معرفی یک بازهی زمانی مجاز بین کلیک، کلیکهای خود را کنترل کنیم. اگر مدت زمانی سپری نشده باشد، میتوانیم از هر دو متد جلوگیری کنیم. (نظم ارسال حفظ میشود.) نمودار Debounce:
ThrottleFirst
موارد منتشر شده توسط منبع Observable را که در یک بازهی زمانی قرار دارند، فیلتر میکند.(نظم ارسال حفظ میشود.) نمودار ThrottleFirst:
FlatMap
موارد منتشر شده توسط یک Observable را به Observables تبدیل میکند، و سپس انتشار از آن را به یک Observable Single قسمت میکند. اگر با LiveData آشنا باشید، MediatorLiveData میتواند کاری بسیار مشابه انجام دهد. در مورد ()FlatMap در ادامه بیشتر صحبت میکنیم. (نظم ارسال حفظ نمیشود.) نمودار FlatMap:
ConcatMap
موارد منتشر شده توسط یک Observable را به Observables تبدیل میکند. این در اصل همان مورد ()FlatMap است، اما نظم ارسال حفظ میشود. اما از آنجا که ()ConcatMap باید منتظر بماند تا هر یک از Observable کار خود را انجام دهند پس از نظر فنی غیر همزمان نیست. (نظم ارسال حفظ میشود.) نمودار ConcatMap:
SwitchMap
()SwitchMap آیتمهای منتشر شده توسط یک Observable را به یک Observable تبدیل میکند درست مثل ()ConcatMap و ()FlatMap. تفاوت این است که به محض مشترک شدن یک Observer جدید، Observer قبلی را لغو میکند. ()SwitchMap محدودیتی را حل میکند که ()ConcatMap و ()FlatMap هم دارند. (نظم ارسال حفظ میشود) نمودار SwitchMap:
عملگرهای تبدیل کننده : Map
یک تابع را برای هر مورد منتشر شده اعمال میکند. با استفاده از تابعی از آن، هر مورد منتشر شده را تغییر میدهد. (نظم ارسال حفظ میشود.) نمودار Map :
برای دو مثال زیر به این دو کلاس مراجعه میکنیم:
Task.java (کلاس Task)
public class Task {
private String description;
private boolean isComplete;
private int priority;
public Task(String description, boolean isComplete, int priority) {
this.description = description;
this.isComplete = isComplete;
this.priority = priority;
}
// getter and setters ....
}
DataSource.java (کلاس DataSource)
public class DataSource {
public static List<Task> createTasksList(){
List<Task> tasks = new ArrayList<>();
tasks.add(new Task("Take out the trash", true, 3));
tasks.add(new Task("Walk the dog", false, 2));
tasks.add(new Task("Make my bed", true, 1));
tasks.add(new Task("Unload the dishwasher", false, 0));
tasks.add(new Task("Make dinner", true, 5));
return tasks;
}
}
مثال : (Mapping (Task -> String
در این مثال ما یک custom map function ایجاد میکنیم که فیلد توضیحات را از Task Objects استخراج میکند و فقط آن پارامتر را منتشر میکند. تابع:Function<Task, String> extractDescriptionFunction = new Function<Task, String>() {
@Override
public String apply(Task task) throws Exception {
Log.d(TAG, "apply: doing work on thread: " + Thread.currentThread().getName());
return task.getDescription();
}
};
ایجاد Observer و Observable :
Observable<String> extractDescriptionObservable = Observable
.fromIterable(DataSource.createTasksList())
.subscribeOn(Schedulers.io())
.map(extractDescriptionFunction)
.observeOn(AndroidSchedulers.mainThread());
extractDescriptionObservable.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: extracted description: " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
خروجی کد بالا به صورت زیر میباشد.
MainActivity: apply: doing work on thread: RxCachedThreadScheduler-1
... Repeat x5
onNext: extracted description: Take out the trash
onNext: extracted description: Walk the dog
onNext: extracted description: Make my bed
onNext: extracted description: Unload the dishwasher
onNext: extracted description: Make dinner
مثال : (Mapping (Task -> Updated Task
در این مثال ما یک custom map function ایجاد میکنیم که یک Task object را بروزرسانی میکند و سپس آن کار بروزرسانی شده را منتشر میکند. تابع:Function<Task, Task> completeTaskFunction = new Function<Task, Task>() {
@Override
public Task apply(Task task) throws Exception {
Log.d(TAG, "apply: doing work on thread: " + Thread.currentThread().getName());
task.setComplete(true);
return task;
}
};
ایجاد Observer و Observable:
Observable<Task> completeTaskObservable = Observable
.fromIterable(DataSource.createTasksList())
.subscribeOn(Schedulers.io())
.map(completeTaskFunction)
.observeOn(AndroidSchedulers.mainThread());
completeTaskObservable.subscribe(new Observer<Task>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Task task) {
Log.d(TAG, "onNext: is this task complete? " + task.isComplete());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
خروجی کد بالا به صورت زیر میباشد.
MainActivity: apply: doing work on thread: RxCachedThreadScheduler-1
... Repeat x5
MainActivity: onNext: is this task complete? true
... Repeat x5
مثال : Order of Emitted Objects
همانطور که در ابتدا گفتیم، ترتیب اشیاء منتشر شده از اپراتور ()Map حفظ میشود. این بدان معناست که همه اشیاء منتشر شده از Observable به همان ترتیب هستند که به Observable اضافه شده اند. توجه کنید که اشیاء به همان ترتیب که در کلاس DataSource.java هستند، منتشر میشوند. شاید در حال حاضر این موضوع برای شما جالب به نظر نرسد، اما صبر کنید تا ما به برخی Mapهای دیگر (مانند ()FlapMap ) نگاهی بیندازیم، که نظم را حفظ نمیکند.// Create an Observable using the fromIterable operator
Observable<Task> mappedObservable = Observable
.fromIterable(DataSource.createTasksList())
.subscribeOn(Schedulers.io())
.map(new Function<Task, Task>() {
@Override
public Task apply(Task task) throws Exception {
task.setComplete(true);
return task;
}
})
.observeOn(AndroidSchedulers.mainThread());
// subscribe to the Observable and view the emitted results
mappedObservable.subscribe(new Observer<Task>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Task task) {
Log.d(TAG, "onNext: mapped: " + task.getDescription());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
خروجی کد بالا به صورت زیر میباشد.
MainActivity: onNext: mapped: Take out the trash
MainActivity: onNext: mapped: Walk the dog
MainActivity: onNext: mapped: Make my bed
MainActivity: onNext: mapped: Unload the dishwasher
MainActivity: onNext: mapped: Make dinner
عملگرهای تبدیل کننده: Buffer
بهطور دوره ای items را از یک Observable به صورت bundles جمع میکند و به جای اینکه یکبار items را منتشر کند، bundles را منتشر میکند. (نظم ارسال حفظ میشود.) نمودار Buffer:
public class Task {
private String description;
private boolean isComplete;
private int priority;
public Task(String description, boolean isComplete, int priority) {
this.description = description;
this.isComplete = isComplete;
this.priority = priority;
}
// getter and setters ....
}
DataSource.java (کلاس DataSource):
public class DataSource {
public static List<Task> createTasksList(){
List<Task> tasks = new ArrayList<>();
tasks.add(new Task("Take out the trash", true, 3));
tasks.add(new Task("Walk the dog", false, 2));
tasks.add(new Task("Make my bed", true, 1));
tasks.add(new Task("Unload the dishwasher", false, 0));
tasks.add(new Task("Make dinner", true, 5));
return tasks;
}
}
یک مثال ساده:
در این مثال:- Observable را ایجاد میکنیم.
- اپراتور ()Buffer را به کار میبریم.
- Subscribe کردن و مشاهده کردن نتیجه منتشر شده در log.
// Create an Observable
Observable<Task> taskObservable = Observable
.fromIterable(DataSource.createTasksList())
.subscribeOn(Schedulers.io());
taskObservable
.buffer(2) // Apply the Buffer() operator
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<Task>>() { // Subscribe and view the emitted results
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(List<Task> tasks) {
Log.d(TAG, "onNext: bundle results: -------------------");
for(Task task: tasks){
Log.d(TAG, "onNext: " + task.getDescription());
}
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
خروجی کد بالا به صورت زیر میباشد.
onNext: bundle results: -------------------
onNext: Take out the trash
onNext: Walk the dog
onNext: bundle results: -------------------
onNext: Make my bed
onNext: Unload the dishwasher
onNext: bundle results: -------------------
onNext: Make dinner
مثال : Tracking UI Interactions
در این مثال ، کلیک روی دکمه در UI را Observe میکنیم. برای تعیین فاصله زمانی از بافر استفاده میکنیم. هر کلیک که در بازه زمانی مشخص ثبت خواهد شد. سپس در پایان بازه کلیکها اضافه میشوند بنابراین میدانیم که تعداد آنها چقدر بوده است. Dependencies: این وابستگی به کتابخانه RxBinding است. می توانید آخرین نسخه آن را دریافت کنید.
def rxbinding_version = "4.0.0"
// Rx Binding Library
implementation "com.jakewharton.rxbinding4:rxbinding:$rxbinding_version"
Activity Code:
در اینجا تمام کدی که میخواهید به Activity خود اضافه کنید وجود دارد. البته باید یک دکمه به UI نیز اضافه کنید. ما کد UI را درج نکردیم چون بسیار ساده است.
نکته: به متد ()RxView.clicks توجه کنید. این بخشی از کتابخانه RxBinding است.
فراموش نکنید که Disposables را به CompositeDisposables اضافه کنید. سپس آنها را با متد onDestroy پاک کنید.
// global disposables object
CompositeDisposable disposables = new CompositeDisposable();
// detect clicks to a button
RxView.clicks(findViewById(R.id.button))
.map(new Function<Unit, Integer>() { // convert the detected clicks to an integer
@Override
public Integer apply(Unit unit) throws Exception {
return 1;
}
})
.buffer(4, TimeUnit.SECONDS) // capture all the clicks during a 4 second interval
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<Integer>>() {
@Override
public void onSubscribe(Disposable d) {
disposables.add(d); // add to disposables to you can clear in onDestroy
}
@Override
public void onNext(List<Integer> integers) {
Log.d(TAG, "onNext: You clicked " + integers.size() + " times in 4 seconds!");
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
// make sure to clear disposables when the activity is destroyed
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear();
}
خروجی کد بالا به صورت زیر میباشد.
onNext: You clicked 19 times in 4 seconds!
عملگرهای تبدیل کننده : Debounce
اپراتور Debounce مواردی را که توسط منبع Observable منتشر میشود را فیلتر میکند که به سرعت یک مورد دیگر به دنبال آن منتشر میشود. (نظم ارسال حفظ میشود.) نمودار Debounce:
مثال : محدود کردن درخواستهای سرور
ما Layout و کل Activity را برای این مثال گنجانده ایم زیرا چند قسمت مختلف و متغیرهای اضافی وجود دارد. این وابستگی به کتابخانه RxBinding است. می توانید آخرین نسخه آن را دریافت کنید.def rxbinding_version = "4.0.0"
// Rx Binding Library
implementation "com.jakewharton.rxbinding4:rxbinding:$rxbinding_version"
کدهای زیر برای activity_main.xml است:
<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout
xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto"
xmlns:tools="http://schemas.android.com/tools"
android:layout_width="match_parent"
android:layout_height="match_parent"
tools:context=".MainActivity">
<androidx.appcompat.widget.SearchView
android:layout_width="match_parent"
android:layout_height="?attr/actionBarSize"
app:layout_constraintTop_toTopOf="parent"
app:layout_constraintRight_toRightOf="parent"
app:layout_constraintLeft_toLeftOf="parent"
android:id="@+id/search_view">
</androidx.appcompat.widget.SearchView>
</androidx.constraintlayout.widget.ConstraintLayout>
MainActivity.java:
توجه: متغیر "timeSinceLastRequest" فقط برای خروجیهای Log است. این بخش منطقی (Logic) نیست.
- ساخت Observable
- گوش دادن به ورودی متن در SearchView.
- فرستادن Query به emitter.
-
استفاده از اپراتور ()Debounce، برای محدود کردن درخواست ها.
- Subscirbe کردن Observer
- ارسال درخواست به سرور
- پاک کردن disposable در ()onDestroy.
public class MainActivity extends AppCompatActivity {
private static final String TAG = "MainActivity";
//ui
private SearchView searchView;
// vars
private CompositeDisposable disposables = new CompositeDisposable();
private long timeSinceLastRequest; // for log printouts only. Not part of logic.
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
searchView = findViewById(R.id.search_view);
timeSinceLastRequest = System.currentTimeMillis();
// create the Observable
Observable<String> observableQueryText = Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
// Listen for text input into the SearchView
searchView.setOnQueryTextListener(new SearchView.OnQueryTextListener() {
@Override
public boolean onQueryTextSubmit(String query) {
return false;
}
@Override
public boolean onQueryTextChange(final String newText) {
if(!emitter.isDisposed()){
emitter.onNext(newText); // Pass the query to the emitter
}
return false;
}
});
}
})
.debounce(500, TimeUnit.MILLISECONDS) // Apply Debounce() operator to limit requests
.subscribeOn(Schedulers.io());
// Subscribe an Observer
observableQueryText.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
disposables.add(d);
}
@Override
public void onNext(String s) {
Log.d(TAG, "onNext: time since last request: " + (System.currentTimeMillis() - timeSinceLastRequest));
Log.d(TAG, "onNext: search query: " + s);
timeSinceLastRequest = System.currentTimeMillis();
// method for sending a request to the server
sendRequestToServer(s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
// Fake method for sending a request to the server
private void sendRequestToServer(String query){
// do nothing
}
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear(); // clear disposables
}
خروجی کد بالا به صورت زیر میباشد.
onNext: time since last request: 3290
onNext: search query: john
onNext: time since last request: 1171
onNext: search query: blake
onNext: time since last request: 2360
onNext: search query: mitch
عملگرهای تبدیل کننده : ThrottleFirst
موارد منتشر شده توسط منبع Observable را که در یک بازه زمانی قرار دارند، فیلتر میکند. (نظم ارسال حفظ میشود. نمودار ThrottleFirst:
مثال : اسپم شدن Button را محدود کنید
این وابستگی به کتابخانه RxBinding است. می توانید آخرین نسخه آن را دریافت کنید.def rxbinding_version = "4.0.0"
// Rx Binding Library
implementation "com.jakewharton.rxbinding4:rxbinding:$rxbinding_version"
کدهای زیر برای activity_main.xml است:
<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout
xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto"
xmlns:tools="http://schemas.android.com/tools"
android:layout_width="match_parent"
android:layout_height="match_parent"
tools:context=".MainActivity">
<Button
android:layout_width="match_parent"
android:layout_height="wrap_content"
app:layout_constraintTop_toTopOf="parent"
app:layout_constraintBottom_toBottomOf="parent"
app:layout_constraintRight_toRightOf="parent"
app:layout_constraintLeft_toLeftOf="parent"
android:id="@+id/button"
android:text="button"/>
</androidx.constraintlayout.widget.ConstraintLayout>
MainActivity.java:
توجه: متغیر "timeSinceLastRequest" فقط برای خروجیهای Log است. این بخش منطقی (Logic) نیست.
- یک Click Listener برای Button با RxBinding Library ایجاد کنید.
- با کنترل کردن کلیک پش از گذشت 500 میلی ثانیه میتوانید یک کلیک جدید ثبت کنید.
-
هنگام ثبت یک کلیک متد هایی را اجرا کنید.
- Observable را Dispose کنید.
public class MainActivity extends AppCompatActivity {
private static final String TAG = "MainActivity";
//ui
private Button button;
// vars
private CompositeDisposable disposables = new CompositeDisposable();
private long timeSinceLastRequest;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
button = findViewById(R.id.button);
timeSinceLastRequest = System.currentTimeMillis();
// Set a click listener to the button with RxBinding Library
RxView.clicks(button)
.throttleFirst(500, TimeUnit.MILLISECONDS) // Throttle the clicks so 500 ms must pass before registering a new click
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Unit>() {
@Override
public void onSubscribe(Disposable d) {
disposables.add(d);
}
@Override
public void onNext(Unit unit) {
Log.d(TAG, "onNext: time since last clicked: " + (System.currentTimeMillis() - timeSinceLastRequest));
someMethod(); // Execute some method when a click is registered
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
}
private void someMethod(){
timeSinceLastRequest = System.currentTimeMillis();
// do something
}
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear(); // Dispose observable
}
}
خروجی کد بالا به صورت زیر میباشد.
onNext: time since last clicked: 649
onNext: time since last clicked: 830
onNext: time since last clicked: 564
عملگرهای تبدیل کننده : FlatMap
موارد منتشر شده توسط یک Observable را به Observables تبدیل میکند، و سپس انتشار از آن را به یک Observable Single قسمت میکند. اگر با LiveData آشنا باشید، MediatorLiveData میتواند کاری بسیار مشابه انجام دهد. در مورد ()FlatMap در ادامه بیشتر صحبت میکنیم. (نظم ارسال حفظ نمیشود.) نمودار ()FlatMap:
- ساخت Observables از اشیاء منتشر شده توسط Observables دیگر.
- ترکیب کردن منبع چندین Observables به Single Obsevables (این چیزی است که به عنوان "flattening" شناخته میشود).
از آنجایی که یک Single Observable از منابع بالقوه زیادی تولید میشود، آخرین Observable به صورت تصادفی منتشر میشود.
به عبارت دیگر، نظم حفظ نمیشود. بسته به وضعیت شما، این ممکن است مهم باشد یا مهم نباشد. در مثال زیر نظم مهم نیست.
مثال: ()FlatMap
فرض کنید میخواهید با استفاده از REST API، برخی از پستهای وبلاگ را از یک وب سایت Query کنید. اما این همه ی کار نیست.
هر پست وبلاگ حاوی نظرات است. و نظرات از یک end-point url دیگر دریافت میشود. بنابراین برای بازیابی تمام دادهها باید دو Query ایجاد کنیم. وب سایت jsonplaceholder.typicode.com دارای یک REST API است که میتوانم برای نشان دادن این مورد استفاده کنیم.
- لیستی از پستهای وبلاگ: /jsonplaceholder.typicode.com/posts
- لیست نظرات با متغیر jsonplaceholder.typicode.com/posts/1/comments/ :post-id
ما برای تعامل با REST API از Retrofit استفاده میکنیم.
چرا از FlatMap استفاده کنیم؟
زیرا باید اطلاعات را از بیش از یک منبع تهیه کنیم و سپس آن را در یک انتشار واحد ترکیب کنیم، یک Flatmap برای این وضعیت ایده آل است.
- لیستی از پستهای وبلاگ را از URL بازیابی کنید: /jsonplaceholder.typicode.com/posts
-
با استفاده از این URL، نظرات را برای هر پست شخصی بازیابی کنید: jsonplaceholder.typicode.com/posts/1/comments
یکی از وابستگی هایی که شاید با آن آشنایی نداشته باشید، Retrofit Call Adapter است. برای تبدیل Retrofit Call objects به Observables لازم است. در کلاس ServiceGenerator.java خواهید دید که ما یک ()RxJava2CallAdapterFactory.create را به عنوان ورودی در متد addCallAdapterFactory از شی Retrofit Builder میگذاریم. بدون این وابستگی در دسترس نخواهد بود.
apply plugin: 'com.android.application'
android {
compileSdkVersion 28
defaultConfig {
applicationId "com.codingwithmitch.rxjavaflatmapexample"
minSdkVersion 21
targetSdkVersion 28
versionCode 1
versionName "1.0"
testInstrumentationRunner "androidx.test.runner.AndroidJUnitRunner"
}
buildTypes {
release {
minifyEnabled false
proguardFiles getDefaultProguardFile('proguard-android-optimize.txt'), 'proguard-rules.pro'
}
}
}
dependencies {
def retrofitVersion = "2.5.0"
def rxjava_version = '2.2.7'
def rxandroid_version = '2.1.1'
def recyclerview_version = "1.0.0"
implementation fileTree(dir: 'libs', include: ['*.jar'])
implementation 'androidx.appcompat:appcompat:1.0.0-beta01'
implementation 'androidx.constraintlayout:constraintlayout:1.1.2'
testImplementation 'junit:junit:4.12'
androidTestImplementation 'androidx.test:runner:1.1.0-alpha4'
androidTestImplementation 'androidx.test.espresso:espresso-core:3.1.0-alpha4'
// Retrofit
implementation "com.squareup.retrofit2:retrofit:$retrofitVersion"
implementation "com.squareup.retrofit2:converter-gson:$retrofitVersion"
// RxJava
implementation "io.reactivex.rxjava2:rxjava:$rxjava_version"
// RxJava Call Adapter
implementation "com.squareup.retrofit2:adapter-rxjava2:2.5.0"
// RxAndroid
implementation "io.reactivex.rxjava2:rxandroid:$rxandroid_version"
// Recyclerview
implementation "androidx.recyclerview:recyclerview:$recyclerview_version"
}
AndroidManifest.xml
دسترسی به اینترنت (internet permissions) به فایل Manifest.xml اضافه کنید.
<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
package="com.codingwithmitch.rxjavaflatmapexample">
<uses-permission android:name="android.permission.INTERNET"/>
<application
android:allowBackup="true"
android:icon="@mipmap/ic_launcher"
android:label="@string/app_name"
android:roundIcon="@mipmap/ic_launcher_round"
android:supportsRtl="true"
android:theme="@style/AppTheme">
<activity android:name=".MainActivity">
<intent-filter>
<action android:name="android.intent.action.MAIN" />
<category android:name="android.intent.category.LAUNCHER" />
</intent-filter>
</activity>
</application>
</manifest>
activity_main.xml
در کد زیر فقط یک RecyclerView ایجاد شده است.<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto"
xmlns:tools="http://schemas.android.com/tools"
android:layout_width="match_parent"
android:layout_height="match_parent"
tools:context=".MainActivity">
<androidx.recyclerview.widget.RecyclerView
android:layout_width="match_parent"
android:layout_height="match_parent"
android:id="@+id/recycler_view"
android:orientation="vertical">
</androidx.recyclerview.widget.RecyclerView>
</androidx.constraintlayout.widget.ConstraintLayout>
layout_post_list_item.xml
این یک Layout برای آیتمهای RecyclerView میباشد.//This is the layout for the RecyclerView list-items
<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
android:orientation="horizontal"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:weightSum="100"
android:padding="20dp">
<RelativeLayout
android:layout_width="0dp"
android:layout_height="wrap_content"
android:layout_weight="80"
android:layout_gravity="center_vertical" >
<TextView
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:id="@+id/title"
android:text="this is a title"
android:textColor="#000"
android:textSize="17sp"
/>
</RelativeLayout>
<RelativeLayout
android:layout_width="0dp"
android:layout_height="wrap_content"
android:layout_weight="20"
android:layout_gravity="center_vertical">
<TextView
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:id="@+id/num_comments"
android:textSize="14sp"
android:layout_centerInParent="true"/>
<ProgressBar
android:layout_width="20dp"
android:layout_height="20dp"
android:id="@+id/progress_bar"
style="@style/Widget.AppCompat.ProgressBar"
android:layout_centerInParent="true"
android:visibility="gone"/>
</RelativeLayout>
</LinearLayout>
Post.java
ما باید برای درخواستهای پست، Data را مدل سازی کنیم که در کد زیر میبینید.
public class Post {
@SerializedName("userId")
@Expose()
private int userId;
@SerializedName("id")
@Expose()
private int id;
@SerializedName("title")
@Expose()
private String title;
@SerializedName("body")
@Expose()
private String body;
private List<Comment> comments;
public Post(int userId, int id, String title, String body, List<Comment> comments) {
this.userId = userId;
this.id = id;
this.title = title;
this.body = body;
this.comments = comments;
}
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
public List<Comment> getComments() {
return comments;
}
public void setComments(List<Comment> comments) {
this.comments = comments;
}
@Override
public String toString() {
return "Post{" +
"userId=" + userId +
", id=" + id +
", title='" + title + '\'' +
", body='" + body + '\'' +
'}';
}
}
Comment.java
public class Comment {
@Expose
@SerializedName("postId")
private int postId;
@Expose
@SerializedName("id")
private int id;
@Expose
@SerializedName("name")
private String name;
@Expose
@SerializedName("email")
private String email;
@Expose
@SerializedName("body")
private String body;
public Comment(int postId, int id, String name, String email, String body) {
this.postId = postId;
this.id = id;
this.name = name;
this.email = email;
this.body = body;
}
public int getPostId() {
return postId;
}
public void setPostId(int postId) {
this.postId = postId;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getEmail() {
return email;
}
public void setEmail(String email) {
this.email = email;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
}
RequestApi.java
در اینجا interface methods برای اجرای درخواستهای شبکه با استفاده از Retrofit ارائه شده است.- ()getPosts لیست پستها را بازیابی میکند.
- ()getComments لیستی از نظرات را برای یک پست خاص بازیابی میکند.
public interface RequestApi {
@GET("posts")
Observable<List<Post>> getPosts();
@GET("posts/{id}/comments")
Observable<List<Comment>> getComments(
@Path("id") int id
);
}
ServiceGenerator.java
برای استفاده از Retrofit باید از آن نمونه بگیریم که کلاس ServiceGenerator مسئول ایجاد نمونه Retrofit است.public class ServiceGenerator {
public static final String BASE_URL = "https://jsonplaceholder.typicode.com";
private static Retrofit.Builder retrofitBuilder =
new Retrofit.Builder()
.baseUrl(BASE_URL)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create());
private static Retrofit retrofit = retrofitBuilder.build();
private static RequestApi requestApi = retrofit.create(RequestApi.class);
public static RequestApi getRequestApi(){
return requestApi;
}
}
RecyclerAdapter.java
در این بخش ما یک RecyclerView ایجاد میکنیم.public class RecyclerAdapter extends RecyclerView.Adapter<RecyclerAdapter.MyViewHolder> {
private static final String TAG = "RecyclerAdapter";
private List<Post> posts = new ArrayList<>();
@NonNull
@Override
public MyViewHolder onCreateViewHolder(@NonNull ViewGroup parent, int viewType) {
View view = LayoutInflater.from(parent.getContext()).inflate(R.layout.layout_post_list_item, null, false);
return new MyViewHolder(view);
}
@Override
public void onBindViewHolder(@NonNull MyViewHolder holder, int position) {
holder.bind(posts.get(position));
}
@Override
public int getItemCount() {
return posts.size();
}
public void setPosts(List<Post> posts){
this.posts = posts;
notifyDataSetChanged();
}
public void updatePost(Post post){
posts.set(posts.indexOf(post), post);
notifyItemChanged(posts.indexOf(post));
}
public List<Post> getPosts(){
return posts;
}
public class MyViewHolder extends RecyclerView.ViewHolder{
TextView title, numComments;
ProgressBar progressBar;
public MyViewHolder(@NonNull View itemView) {
super(itemView);
title = itemView.findViewById(R.id.title);
numComments = itemView.findViewById(R.id.num_comments);
progressBar = itemView.findViewById(R.id.progress_bar);
}
public void bind(Post post){
title.setText(post.getTitle());
if(post.getComments() == null){
showProgressBar(true);
numComments.setText("");
}
else{
showProgressBar(false);
numComments.setText(String.valueOf(post.getComments().size()));
}
}
private void showProgressBar(boolean showProgressBar){
if(showProgressBar) {
progressBar.setVisibility(View.VISIBLE);
}
else{
progressBar.setVisibility(View.GONE);
}
}
}
}
MainActivity.java
در زیر درون ManiActivity کدهای مربوطه را اضافه میکنیم.public class MainActivity extends AppCompatActivity {
private static final String TAG = "MainActivity";
//ui
private RecyclerView recyclerView;
// vars
private CompositeDisposable disposables = new CompositeDisposable();
private RecyclerAdapter adapter;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
recyclerView = findViewById(R.id.recycler_view);
initRecyclerView();
getPostsObservable()
.subscribeOn(Schedulers.io())
.flatMap(new Function<Post, ObservableSource<Post>>() {
@Override
public ObservableSource<Post> apply(Post post) throws Exception {
return getCommentsObservable(post);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Post>() {
@Override
public void onSubscribe(Disposable d) {
disposables.add(d);
}
@Override
public void onNext(Post post) {
updatePost(post);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ", e);
}
@Override
public void onComplete() {
}
});
}
private Observable<Post> getPostsObservable(){
return ServiceGenerator.getRequestApi()
.getPosts()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Function<List<Post>, ObservableSource<Post>>() {
@Override
public ObservableSource<Post> apply(final List<Post> posts) throws Exception {
adapter.setPosts(posts);
return Observable.fromIterable(posts)
.subscribeOn(Schedulers.io());
}
});
}
private void updatePost(final Post p){
Observable
.fromIterable(adapter.getPosts())
.filter(new Predicate<Post>() {
@Override
public boolean test(Post post) throws Exception {
return post.getId() == p.getId();
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Post>() {
@Override
public void onSubscribe(Disposable d) {
disposables.add(d);
}
@Override
public void onNext(Post post) {
Log.d(TAG, "onNext: updating post: " + post.getId() + ", thread: " + Thread.currentThread().getName());
adapter.updatePost(post);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ", e);
}
@Override
public void onComplete() {
}
});
}
private Observable<Post> getCommentsObservable(final Post post){
return ServiceGenerator.getRequestApi()
.getComments(post.getId())
.map(new Function<List<Comment>, Post>() {
@Override
public Post apply(List<Comment> comments) throws Exception {
int delay = ((new Random()).nextInt(5) + 1) * 1000; // sleep thread for x ms
Thread.sleep(delay);
Log.d(TAG, "apply: sleeping thread " + Thread.currentThread().getName() + " for " + String.valueOf(delay)+ "ms");
post.setComments(comments);
return post;
}
})
.subscribeOn(Schedulers.io());
}
private void initRecyclerView(){
adapter = new RecyclerAdapter();
recyclerView.setLayoutManager(new LinearLayoutManager(this));
recyclerView.setAdapter(adapter);
}
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear();
}
}
توجه: همه کدهای این بخش را میتوانید ببینید.
عملگرهای تبدیل کننده : ConcatMap
موارد منتشر شده توسط یک Observable را به Observables تبدیل میکند. این در اصل همان مورد ()FlatMap است، اما نظم ارسال حفظ میشود. اما از آنجا که ()ConcatMap باید منتظر بماند تا هر یک از Observable کار خود را انجام دهند پس از نظر فنی ()ConcatMap غیر همزمان نیست. (نظم ارسال حفظ میشود.)
مثالی از ConcatMap:
تنها کاری که شما باید انجام دهید این است که کدهای موجود در OnCreate of MainActivity را در کد زیر جایگزین کنید.اگر میخواهید این مثال را درک کنید، باید بخش اپراتور Flatmap را بخوانید.
MainActivity.java
در زیر درون ManiActivity کدهای مربوطه را اضافه میکنیم.package com.codingwithmitch.rxjavaflatmapexample;
import androidx.appcompat.app.AppCompatActivity;
import androidx.recyclerview.widget.LinearLayoutManager;
import androidx.recyclerview.widget.RecyclerView;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;
import android.os.Bundle;
import android.util.Log;
import com.codingwithmitch.rxjavaflatmapexample.models.Comment;
import com.codingwithmitch.rxjavaflatmapexample.models.Post;
import com.codingwithmitch.rxjavaflatmapexample.requests.ServiceGenerator;
import java.util.List;
import java.util.Random;
public class MainActivity extends AppCompatActivity {
private static final String TAG = "MainActivity";
//ui
private RecyclerView recyclerView;
// vars
private CompositeDisposable disposables = new CompositeDisposable();
private RecyclerAdapter adapter;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
recyclerView = findViewById(R.id.recycler_view);
initRecyclerView();
getPostsObservable()
.subscribeOn(Schedulers.io())
.concatMap(new Function<Post, ObservableSource<Post>>() {
@Override
public ObservableSource<Post> apply(Post post) throws Exception {
return getCommentsObservable(post);
}
})
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Post>() {
@Override
public void onSubscribe(Disposable d) {
disposables.add(d);
}
@Override
public void onNext(Post post) {
updatePost(post);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ", e);
}
@Override
public void onComplete() {
}
});
}
private Observable<Post> getPostsObservable(){
return ServiceGenerator.getRequestApi()
.getPosts()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(new Function<List<Post>, ObservableSource<Post>>() {
@Override
public ObservableSource<Post> apply(final List<Post> posts) throws Exception {
adapter.setPosts(posts);
return Observable.fromIterable(posts)
.subscribeOn(Schedulers.io());
}
});
}
private void updatePost(Post post){
adapter.updatePost(post);
}
private Observable<Post> getCommentsObservable(final Post post){
return ServiceGenerator.getRequestApi()
.getComments(post.getId())
.map(new Function<List<Comment>, Post>() {
@Override
public Post apply(List<Comment> comments) throws Exception {
int delay = ((new Random()).nextInt(5) + 1) * 1000; // sleep thread for x ms
Thread.sleep(delay);
Log.d(TAG, "apply: sleeping thread " + Thread.currentThread().getName() + " for " + String.valueOf(delay)+ "ms");
post.setComments(comments);
return post;
}
})
.subscribeOn(Schedulers.io());
}
private void initRecyclerView(){
adapter = new RecyclerAdapter();
recyclerView.setLayoutManager(new LinearLayoutManager(this));
recyclerView.setAdapter(adapter);
}
@Override
protected void onDestroy() {
super.onDestroy();
disposables.clear();
}
}

apply: sleeping thread RxCachedThreadScheduler-2 for 4000ms
apply: sleeping thread RxCachedThreadScheduler-2 for 2000ms
apply: sleeping thread RxCachedThreadScheduler-2 for 5000ms
apply: sleeping thread RxCachedThreadScheduler-2 for 4000ms
apply: sleeping thread RxCachedThreadScheduler-2 for 5000ms
بنابراین به طور خلاصه، اپراتور ()Concatmap مشکلی را که اپراتور ()Flatmap دارد، حل میکند. اپراتور ()Flatmap به ترتیب منتشر کردن اشیاء اهمیت نمیدهد. فقط وقتی اشیاء آماده میشوند، آنها را منتشر میکند.
با این حال، اپراتور ()Concatmap باید منتظر باشد تا شیء قبلی منتشر شده سپس حرکت به قسمت بعدی را انجام دهد. بسته به وضعیت شما، این میتواند خوب باشد یا میتواند بد باشد.
عملگرهای تبدیل کننده : SwitchMap
()SwitchMap آیتمهای منتشر شده توسط یک Observable را به یک Observable تبدیل میکند درست مثل ()ConcatMap و ()FlatMap. تفاوت این است که به محض مشترک شدن (Subscribe) یک Observer جدید، Observer قبلی را لغو میکند. ()SwitchMap محدودیتی را حل میکند که ()ConcatMap و ()FlatMap هم دارند.(نظم ارسال حفظ میشود) نمودار SwitchMap:
حالت 1:
فرض کنید کاربر بر روی دکمه UI که درخواست شبکه را اجرا میکند، کلیک میکند. اما کاربر به سرعت یک دکمه متفاوت را فشار میدهد که درخواست دوم به شبکه را قبل از اتمام اولین نسخه انجام میدهد. شما میخواهید روند اول را خاتمه دهید و observers را UnSubscribe کنید، سپس مرحله دوم را شروع کنید. اما این فقط در مورد درخواستهای شبکه صدق نمیکند. هر فرآیند در Background برای اتمام، زمان نیاز دارد، قابل لغو است. مثال دوم را در نظر بگیرید:حالت 2:
مثال دیگر انتخاب یک تصویر یا فیلم است. اگر کاربر روی آن کلیک کند، ممکن است تصویر یا فیلم به صورت تمام صفحه شود و با کیفیت بالاتر ارائه شود. اگر قبل از اینکه مورد اول به طور کامل ارائه شود، یک مورد رسانه مختلف را انتخاب کنید، میخواهید این روند را لغو کنید و یک مورد جدید را شروع کنید. SwitchMap برای موقعیت هایی مانند این مورد ایده آل است.مثالی از ()SwitchMap:
نسخهی نمایشی اپلیکیشن به شکل زیر میباشد.- مرحلهی 1:
لیستی از پستهای وبلاگ را از آدرس اینترنتی (URL) بازیابی کنید: /jsonplaceholder.typicode.com/posts
- مرحلهی 2: هنگامی که یک مورد لیست کلیک میشود، درخواست دیگری به json placeholder api ارسال میشود، اما این بار برای یک پست خاص. بنابراین آدرس اینترنتی چیزی شبیه به این خواهد بود: /jsonplaceholder.typicode.com/posts/5 با این کار پست با id = 5 بازیابی میشود.
- مرحلهی 3: قبل از اینکه درخواست انجام شود، ما شبیه سازی میکنیم که شبکه ای با سرعت عملکرد پایین یا کند چگونه باشد. ما درخواست را مجبور میکنیم قبل از اجرا 3000 ms منتظر بماند. progress bar بروزرسانی میشود که زمان سپری شده از کلیک یک آیتم لیست را نشان میدهد. سرعت آهستهی شبکه با استفاده از اپراتور ()Interval، اپراتور ()TakeWhile و ()Filter شبیه سازی میشود.
- مرحله 4: اگر کاربر قبل از اجرای درخواست شبکه، مورد جدیدی از لیست را کلیک کند، کل فرایند دوباره شروع میشود. اپراتور ()SwitchMap از این امر مراقبت میکند.
- مرحله 5: در صورت تکمیل فرآیند (پس از گذشت 3000 میلی ثانیه بدون کلیک بر روی گزینه ای جدید)، درخواست انجام میشود و کاربر به یک Activity جدید که عنوان پست را نشان میدهد، هدایت میشوید.

build.gradle
چک کنید که فایل Build.Gradle شما شبیه کدهای زیر باشد.apply plugin: 'com.android.application'
android {
compileSdkVersion 28
defaultConfig {
applicationId "com.codingwithmitch.rxjavaflatmapexample"
minSdkVersion 21
targetSdkVersion 28
versionCode 1
versionName "1.0"
testInstrumentationRunner "androidx.test.runner.AndroidJUnitRunner"
}
buildTypes {
release {
minifyEnabled false
proguardFiles getDefaultProguardFile('proguard-android-optimize.txt'), 'proguard-rules.pro'
}
}
}
dependencies {
def retrofitVersion = "2.5.0"
def rxjava_version = '2.2.7'
def rxandroid_version = '2.1.1'
def recyclerview_version = "1.0.0"
implementation fileTree(dir: 'libs', include: ['*.jar'])
implementation 'androidx.appcompat:appcompat:1.0.0-beta01'
implementation 'androidx.constraintlayout:constraintlayout:1.1.2'
testImplementation 'junit:junit:4.12'
androidTestImplementation 'androidx.test:runner:1.1.0-alpha4'
androidTestImplementation 'androidx.test.espresso:espresso-core:3.1.0-alpha4'
// Retrofit
implementation "com.squareup.retrofit2:retrofit:$retrofitVersion"
implementation "com.squareup.retrofit2:converter-gson:$retrofitVersion"
// RxJava
implementation "io.reactivex.rxjava2:rxjava:$rxjava_version"
// RxJava Call Adapter
implementation "com.squareup.retrofit2:adapter-rxjava2:2.5.0"
// RxAndroid
implementation "io.reactivex.rxjava2:rxandroid:$rxandroid_version"
// Recyclerview
implementation "androidx.recyclerview:recyclerview:$recyclerview_version"
}
AndroidManifest.xml
دسترسی به اینترنت و تعریف دو Activity را درون فایل AndroidManifest.xml اضافه کنید.<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
package="com.codingwithmitch.rxjavaflatmapexample">
<uses-permission android:name="android.permission.INTERNET"/>
<application
android:allowBackup="true"
android:icon="@mipmap/ic_launcher"
android:label="@string/app_name"
android:roundIcon="@mipmap/ic_launcher_round"
android:supportsRtl="true"
android:theme="@style/AppTheme">
<activity android:name=".MainActivity">
<intent-filter>
<action android:name="android.intent.action.MAIN" />
<category android:name="android.intent.category.LAUNCHER" />
</intent-filter>
</activity>
<activity android:name=".ViewPostActivity" />
</application>
</manifest>
activity_main.xml
در این بخش در Activity_main.xml یک RecyclerView و horizontal Progressbar ایجاد میکنیم.<?xml version="1.0" encoding="utf-8"?>
<LinearLayout
xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto"
xmlns:tools="http://schemas.android.com/tools"
android:orientation="vertical"
android:layout_width="match_parent"
android:layout_height="match_parent"
tools:context=".MainActivity">
<ProgressBar
android:id="@+id/progress_bar"
style="@style/Widget.AppCompat.ProgressBar.Horizontal"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:layout_marginTop="16dp"
android:padding="10dp"
/>
<androidx.recyclerview.widget.RecyclerView
android:id="@+id/recycler_view"
android:layout_width="match_parent"
android:layout_height="match_parent"
android:orientation="vertical">
</androidx.recyclerview.widget.RecyclerView>
</LinearLayout>
layout_post_list_item.xml
این Layout برای آیتمهای RecyclerView میباشد.//This is the layout for the RecyclerView list-items.
<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
android:orientation="horizontal"
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:padding="20dp">
<TextView
android:layout_width="match_parent"
android:layout_height="wrap_content"
android:id="@+id/title"
android:text="this is a title"
android:textColor="#000"
android:textSize="17sp"
/>
</LinearLayout>
activity_view_post.xml
کدهای Layout برای Activity دوم درون activity_view_post.xml است.<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout
xmlns:android="http://schemas.android.com/apk/res/android"
xmlns:app="http://schemas.android.com/apk/res-auto"
xmlns:tools="http://schemas.android.com/tools"
android:layout_width="match_parent"
android:layout_height="match_parent"
tools:context=".MainActivity"
android:padding="15dp">
<TextView
android:layout_width="wrap_content"
android:layout_height="wrap_content"
android:id="@+id/text"
app:layout_constraintTop_toTopOf="parent"
app:layout_constraintRight_toRightOf="parent"
app:layout_constraintLeft_toLeftOf="parent"
app:layout_constraintBottom_toBottomOf="parent"/>
</androidx.constraintlayout.widget.ConstraintLayout>
Post.java
ما باید برای درخواستهای پست، Data را مدل سازی کنیم که در کد زیر میبینید.public class Post implements Parcelable {
@SerializedName("userId")
@Expose()
private int userId;
@SerializedName("id")
@Expose()
private int id;
@SerializedName("title")
@Expose()
private String title;
@SerializedName("body")
@Expose()
private String body;
private List<Comment> comments;
public Post(int userId, int id, String title, String body, List<Comment> comments) {
this.userId = userId;
this.id = id;
this.title = title;
this.body = body;
this.comments = comments;
}
protected Post(Parcel in) {
userId = in.readInt();
id = in.readInt();
title = in.readString();
body = in.readString();
}
public static final Creator<Post> CREATOR = new Creator<Post>() {
@Override
public Post createFromParcel(Parcel in) {
return new Post(in);
}
@Override
public Post[] newArray(int size) {
return new Post[size];
}
};
public int getUserId() {
return userId;
}
public void setUserId(int userId) {
this.userId = userId;
}
public int getId() {
return id;
}
public void setId(int id) {
this.id = id;
}
public String getTitle() {
return title;
}
public void setTitle(String title) {
this.title = title;
}
public String getBody() {
return body;
}
public void setBody(String body) {
this.body = body;
}
public List<Comment> getComments() {
return comments;
}
public void setComments(List<Comment> comments) {
this.comments = comments;
}
@Override
public String toString() {
return "Post{" +
"userId=" + userId +
", id=" + id +
", title='" + title + '\'' +
", body='" + body + '\'' +
'}';
}
@Override
public int describeContents() {
return 0;
}
@Override
public void writeToParcel(Parcel dest, int flags) {
dest.writeInt(userId);
dest.writeInt(id);
dest.writeString(title);
dest.writeString(body);
}
}
RequestApi.java
در اینجا interface methods برای اجرای درخواستهای شبکه با استفاده از Retrofit ارائه شده است.- ()getPosts لیست پستها را بازیابی میکند.
- ()getComments لیستی از نظرات را برای یک پست خاص بازیابی میکند.
public interface RequestApi {
@GET("posts")
Observable<List<Post>> getPosts();
@GET("posts/{id}")
Observable<Post> getPost(
@Path("id") int id
);
}
ServiceGenerator.java
کلاس ServiceGenerator مسئول ایجاد نمونهی Retrofit است.public class ServiceGenerator {
public static final String BASE_URL = "https://jsonplaceholder.typicode.com";
private static Retrofit.Builder retrofitBuilder =
new Retrofit.Builder()
.baseUrl(BASE_URL)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(GsonConverterFactory.create());
private static Retrofit retrofit = retrofitBuilder.build();
private static RequestApi requestApi = retrofit.create(RequestApi.class);
public static RequestApi getRequestApi(){
return requestApi;
}
}
RecyclerAdapter.java
یک کلاس RecyclerViewAdapter با یک OnClick interface سفارشی برای تشخیص کلیک به موارد لیست ایجاد میکنیم.public class RecyclerAdapter extends RecyclerView.Adapter<RecyclerAdapter.MyViewHolder> {
private static final String TAG = "RecyclerAdapter";
private List<Post> posts = new ArrayList<>();
private OnPostClickListener onPostClickListener;
public RecyclerAdapter(OnPostClickListener onPostClickListener) {
this.onPostClickListener = onPostClickListener;
}
@NonNull
@Override
public MyViewHolder onCreateViewHolder(@NonNull ViewGroup parent, int viewType) {
View view = LayoutInflater.from(parent.getContext()).inflate(R.layout.layout_post_list_item, null, false);
return new MyViewHolder(view, onPostClickListener);
}
@Override
public void onBindViewHolder(@NonNull MyViewHolder holder, int position) {
holder.bind(posts.get(position));
}
@Override
public int getItemCount() {
return posts.size();
}
public void setPosts(List<Post> posts){
this.posts = posts;
notifyDataSetChanged();
}
public void updatePost(Post post){
posts.set(posts.indexOf(post), post);
notifyItemChanged(posts.indexOf(post));
}
public List<Post> getPosts(){
return posts;
}
public class MyViewHolder extends RecyclerView.ViewHolder implements View.OnClickListener {
OnPostClickListener onPostClickListener;
TextView title;
public MyViewHolder(@NonNull View itemView, OnPostClickListener onPostClickListener) {
super(itemView);
title = itemView.findViewById(R.id.title);
this.onPostClickListener = onPostClickListener;
itemView.setOnClickListener(this);
}
public void bind(Post post){
title.setText(post.getTitle());
}
@Override
public void onClick(View v) {
onPostClickListener.onPostClick(getAdapterPosition());
}
}
public interface OnPostClickListener{
void onPostClick(int position);
}
}
ViewPostActivity.java
در ViewPostActivity کدهای مربوطه را اضافه کنید.public class ViewPostActivity extends AppCompatActivity {
private static final String TAG = "ViewPostActivity";
private TextView text;
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_view_post);
text = findViewById(R.id.text);
getIncomingIntent();
}
private void getIncomingIntent(){
if(getIntent().hasExtra("post")){
Post post = getIntent().getParcelableExtra("post");
text.setText(post.getTitle());
}
}
}
MainActivity.java
- ما از یک شیء PublishSubject استفاده میکنیم تا تشخیص دهیم که یک Post Object از لیست انتخاب شده است. وقتی تنظیم شود، فرایند ایجاد Observable را شروع میکند. میتوانید متد Override شده onPostClick را مشاهده کنید.
- سپس ما از عملگر SwitchMap استفاده میکنیم و یک <Observable<Post را از یک Post Object ایجاد میکنیم. این تضمین میکند که فقط یک Observer در هر زمان معین Observable را Subscribe میکند. بنابراین اگر پست دیگری انتخاب شود، از این رو Observer قبلی از بین میرود، بنابراین این فرآیند را دوباره تنظیم میکند.
- ()takeWhile() ،Interval و ()Filter برای شبیه سازی یک اتصال ضعیف به شبکه هستند. آنها progress bar را تنظیم میکنند و زمان سپری شده را ردیابی میکنند.
- اگر زمان سپری شود، اپراتور ()FlapMap کار را با تبدیل یک Long Object (حاصل عملگر Interval) به یک Observable<Post> Object پایان میدهد.
- و در آخر، Object Post به Observer منتقل میشود.
- بنابراین هر بار که یک پست جدید از لیست انتخاب میشود، فرآیند به دلیل وجود عملگر ()SwitchMap دوباره شروع میشود.
public class MainActivity extends AppCompatActivity implements RecyclerAdapter.OnPostClickListener {
private static final String TAG = "MainActivity";
//ui
private RecyclerView recyclerView;
private ProgressBar progressBar;
// vars
private CompositeDisposable disposables = new CompositeDisposable();
private RecyclerAdapter adapter;
private PublishSubject<Post> publishSubject = PublishSubject.create(); // for selecting a post
private static final int PERIOD = 100;
@Override
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
recyclerView = findViewById(R.id.recycler_view);
progressBar = findViewById(R.id.progress_bar);
initRecyclerView();
retrievePosts();
}
private void initSwitchMapDemo(){
publishSubject
// apply switchmap operator so only one Observable can be used at a time.
// it clears the previous one
.switchMap(new Function<Post, ObservableSource<Post>>() {
@Override
public ObservableSource<Post> apply(final Post post) throws Exception {
return Observable
// simulate slow network speed with interval + takeWhile + filter operators
.interval(PERIOD, TimeUnit.MILLISECONDS)
.subscribeOn(AndroidSchedulers.mainThread())
.takeWhile(new Predicate<Long>() { // stop the process if more than 5 seconds passes
@Override
public boolean test(Long aLong) throws Exception {
Log.d(TAG, "test: " + Thread.currentThread().getName() + ", " + aLong);
progressBar.setMax(3000 - PERIOD);
progressBar.setProgress(Integer.parseInt(String.valueOf((aLong * PERIOD) + PERIOD)));
return aLong <= (3000 / PERIOD);
}
})
.filter(new Predicate<Long>() {
@Override
public boolean test(Long aLong) throws Exception {
return aLong >= (3000 / PERIOD);
}
})
// flatmap to convert Long from the interval operator into a Observable<Post>
.subscribeOn(Schedulers.io())
.flatMap(new Function<Long, ObservableSource<Post>>() {
@Override
public ObservableSource<Post> apply(Long aLong) throws Exception {
return ServiceGenerator.getRequestApi()
.getPost(post.getId());
}
});
}
})
.subscribe(new Observer<Post>() {
@Override
public void onSubscribe(Disposable d) {
disposables.add(d);
}
@Override
public void onNext(Post post) {
Log.d(TAG, "onNext: done.");
navViewPostActivity(post);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ", e);
}
@Override
public void onComplete() {
}
});
}
private void retrievePosts(){
ServiceGenerator.getRequestApi()
.getPosts()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<Post>>() {
@Override
public void onSubscribe(Disposable d) {
disposables.add(d);
}
@Override
public void onNext(List<Post> posts) {
adapter.setPosts(posts);
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: ", e);
}
@Override
public void onComplete() {
}
});
}
@Override
protected void onResume() {
super.onResume();
progressBar.setProgress(0);
initSwitchMapDemo();
}
private void initRecyclerView(){
adapter = new RecyclerAdapter(this);
recyclerView.setLayoutManager(new LinearLayoutManager(this));
recyclerView.setAdapter(adapter);
}
private void navViewPostActivity(Post post){
Intent intent = new Intent(this, ViewPostActivity.class);
intent.putExtra("post", post);
startActivity(intent);
}
@Override
protected void onPause() {
Log.d(TAG, "onPause: called.");
disposables.clear();
super.onPause();
}
@Override
public void onPostClick(final int position) {
Log.d(TAG, "onPostClick: clicked.");
// submit the selected post object to be queried
publishSubject.onNext(adapter.getPosts().get(position));
}
}
توجه: همه کدهای این بخش را میتوانید ببینید.
جمع بندی:
در مقالهی آموزش کامل RxJava در اندروید ، به طور کلی با RxJava و RxAndroid آشنا شدیم. ما با ویژگیها و کاراییهای RxJava و RxAndroid در مثال هایی آشنا شدیم .شما با این آموزش و مثالهای آن از یک توسعه دهنده تازه کار به توسعه دهنده متوسط در RxJava تبدیل میشوید. اگر در این زمینه تجربهای یا سوالی دارید خوشحال میشویم که با ما و کاربران وب سایت سون لرن به اشتراک بگذارید.
بیشتر بدانید:
پیش نیازهای یادگیری برنامه نویسی اندروید
روش کسب درآمد از برنامه نویسی اندروید
برنامه نویسی اندروید چیست؟
منابع این مقاله:
codingwithmitch.com github.com Reactivx.ioاگر به یادگیری بیشتر در زمینهی برنامه نویسی اندروید علاقه داری، با شرکت در دورهی برنامه نویسی اندروید در کمتر از یکسال به یک توسعهدهنده اندروید همه فن حریف تبدیل میشوی که آمادهی استخدام، دریافت پروژه و حتی پیادهسازی اپلیکیشن خودت هستی.