دوره زبان تخصصی برای برنامه‌نویسان (هدیه ویژه ثبت‌نام در دوره‌های متخصص) (فرصت محدود ⏰)
۰ ثانیه
۰ دقیقه
۰ ساعت
۳۱ دیدگاه نظر AmirHossien Heydari
آموزش کامل RxJava در اندروید
سرفصل‌های مقاله
  •  برنامه نویسی واکنشی (Reactive Programming) چیست ؟
  • Reactive Extensions
  • RxJava چیست ؟
  • RxAndroid  چیست ؟
  • مبانی RxJava : Observable، Observer
  • مراحل اساسی
  • معرفی Disposable
  • عملگرهای RxJava Operators) RxJava) 
  • Observers چندگانه و CompositDisposable
  • Flowables
  • ساخت عملگرهای  (Operators : Create، Just، Range، Repeat)
  • ساخت عملگرهای  (Operators : Interval - Timer)
  • ساخت عملگر‌های (fromArray, fromIterable, fromCallable)
  • ساخت عملگر (fromFuture)
  • مقدمه
  • ساخت عملگر (fromPublisher)
  • عملگرهای Filter : Filter
  • عملگرهای Distinct : Filter
  • عملگرهای Take : Filter و TakeWhile
  • عملگرهای تبدیل کننده (Transformation Operators)
  • عملگرهای تبدیل کننده : Map
  • عملگرهای تبدیل کننده: Buffer
  • عملگرهای تبدیل کننده : Debounce
  • عملگرهای تبدیل کننده : ThrottleFirst
  • عملگرهای تبدیل کننده : FlatMap
  • عملگرهای تبدیل کننده : ConcatMap
  • عملگرهای تبدیل کننده : SwitchMap

RxJava مدتی است که معرفی شده و برنامه نویسان حتما با توانایی‌ها و کارکرد آن آشنایی دارند، اما بعضی از برنامه نویسان هنوز کار با Rxjava را شروع نکرده‌اند. اگر یکی از آن‌ها هستید، در مقاله آموزش کامل RxJava در اندروید ، ما قصد داریم مبانی مربوط به RxJava و RxAndroid را از مباحث مبتدی تا پیشرفته آموزش دهیم.

 برنامه نویسی واکنشی (Reactive Programming) چیست ؟

برنامه نویسی واکنشی به‌طور اساسی برنامه نویسی غیرهمزمان مبتنی بر رویداد است. هر چیزی که می‌بینید، یک جریان داده غیرهمزمان است، که می‌تواند مشاهده (Observed) شود و هنگامی که مقادیر منتشر شود، عملی صورت می‌گیرد. می‌توانید جریان داده را از هر چیزی ایجاد کنید: تغییرات متغیر، رویدادهای کلیک، فراخوانی http، ذخیره داده ها، خطاها و ... هنگامی که می‌گویم برنامه نویسی واکنشی همزمان نیست، به معنی این است که هر ماژول کد بر روی Thread خود اجرا می‌شود، بنابراین چندین بلوک کد را همزمان اجرا می‌کند. یک مزیت رویکرد غیرهمزمان به این دلیل است که هر کار بر روی Thread خاص خود اجرا می‎شود، همه کارها می‌توانند همزمان شروع شوند و مدت زمان طول کشیدن تمام کارها، معادل کار طولانی‌تر در لیست است. وقتی صحبت از برنامه‌های تلفن همراه است، با شروع کار در (Background Thread)، می‌توانید بدون مسدود کردن Thread اصلی، به تجربه کاربری یکپارچه دست پیدا کنید.

اگر به مباحث برنامه نویسی اندروید علاقه مند هستید پیشنهاد ما دوره‌های آموزش برنامه نویسی اندروید با جاوا و دوره آموزش فلاتر سون لرن است که هر دو توسط یکی از مجرب‌ترین اساتید اندروید ایران تدریس شده است.

یک مثال ساده

x می‌تواند برابر با  y + z  (x = y + z) باشد؛ که در آن مقدار y و z به x اختصاص داده می‌شود. در برنامه نویسی واکنشی (Reactive Programming)، هنگامی که مقدار y تغییر می‌کند ، مقدار x بدون اجرای مجدد عبارت x = y + z به طور خودکار به روز می‌شود. این می‌تواند با مشاهده (Observe) مقدار y یا z حاصل شود. یک لیست آرایه می‌تواند یک جریان داده باشد و می‌توان هر بخشی را که از آن منتشر می‌شود، عملی کرد. ممکن است شما بخواهید شماره‌های زوج را فیلتر کنید و اعداد فرد را نادیده بگیرید. این کار را می‌توان با استفاده از حلقه‌های معمولی و عبارات شرطی انجام داد، اما در برنامه نویسی واکنشی می‌توانید به روش کاملاً متفاوت به این هدف دست پیدا کنید. هنگامی که برنامه خود را در برنامه نویسی واکنشی (Reactive Programming) شروع می‌کنید، نحوه طراحی معماری و نحوه نوشتن کد به طور کامل تغییر می‌کند. حتی بیشتر قدرت می‌یابد وقتی با Clean Architecture ، MVP Architecture ، MVVM Architecture و سایر Design Patterns آشنا شوید.

Reactive Extensions

ReactiveX یا RX کتابخانه‌ای است که از اصول برنامه نویسی واکنش پذیر پیروی می‌کند، یعنی با استفاده از توالی قابل مشاهده (observable sequence) برنامه هایی غیرهمزمان و مبتنی بر رویدادها می‌سازد. Reactive Extensions در چندین زبان (C++ (RxCpp)، C# (Rx.NET ، جاوا (RxJava) ، کاتلین (RxKotlin)، سویفت (Swift (RxSwift) و موارد دیگر موجود است. ما در این مقاله به طور خاص به توضیح RxJava و RxAndroid می‌پردازیم.

RxJava چیست ؟

در ادامه‌ی مقاله‌ی آموزش کامل RxJava در اندروید ، به شما توضیح می‌دهیم که rxjava چیست؟ RxJava پیاده سازی جاوا از Reactive Extensions است. در واقع این کتابخانه با دنبال کردن الگوی Observer، رویدادهای غیرهمزمان را می‌سازد. می‌توانید جریان داده‌ای غیرهمزمان را در هر Thread ایجاد کنید، داده‌ها را تغییر داده و توسط هر Observer در هر Thread استفاده کنید. این کتابخانه طیف گسترده ای از عملگرهای شگفت انگیز مانند نقشه ، ترکیب ، ادغام ، فیلتر و موارد دیگر را ارائه می‌دهد که می‌توانند در جریان داده استفاده شوند. این کتابخانه طیف گسترده‌ای از عملگرهای شگفت انگیز مانند map ، combine ، merge ، filter و موارد دیگر را ارائه می‌دهد که می‌توانند در جریان داده استفاده شوند. هنگامی که شروع به کار کنید و مثال‌ها یا کد‌های واقعی بنویسید، اطلاعات شما درباره اپراتورها و تحولات بیشتر شده و بهتر آن را درک می‌کنید.

RxAndroid  چیست ؟

در این بخش مقاله‌ی آموزش کامل RxJava در اندروید ، به شما توضیح می‌دهیم که RxAndroid چیست؟ RxAndroid مخصوص به برنامه‌های اندرویدی (Android Platform) است که دارای چند کلاس اضافه در RxJava است. به طور خاص،  Schedulers در (()RxAndroid (AndroidSchedulers.mainThread  معرفی شده که نقش عمده ای در پشتیبانی از مفهوم  multithreading در برنامه‌های اندرویدی دارد. Schedulers به‌طور اساسی تصمیم می‌گیرد Thread مورد نظر که کد خاصی دارد بر روی Thread  Background اجرا شود یا بر روی Main Thread  اجرا شود. جدا از آن، هر آنچه ما استفاده می‌کنیم فقط از کتابخانه RxJava است. برنامه‌های در دسترس بسیاری وجود دارد، به‌طور گسترده در برنامه نویسی اندروید از ()Schedulers.io و ()AndroidSchedulers.mainThread استفاده می‌شود. در زیر لیستی از Schedulers موجود و معرفی مختصر آن‌ها آورده شده است.

()io:

این مورد از Schedulers برای انجام عملیات غیر فشرده Central Processing Unit) CPU) مانند برقراری تماس‌های شبکه، خواندن دیسک / فایل ها، عملیات پایگاه داده و غیره مورد استفاده قرار می‌گیرد.

()mainThread:

این مورد از Schedulers امکان دسترسی به Main Thread /Thread UI را در اندروید فراهم می‌کند. معمولاً عملیاتی مانند به روزرسانی UI، فعل و انفعالات کاربر در این موضوع اتفاق می‌افتد. ما نباید عملیات فشرده ای را در مورد این موضوع انجام دهیم زیرا در اپلیکیشن باعث glitchy یا ANR می‌شود.

()newThread:

با استفاده از این مورد از Schedulers، هر زمان که یک کار انجام می‌شود ، Thread جدیدی ایجاد می‌شود. معمولاً توصیه می‌شود از این Schedulers استفاده نکنید، مگر اینکه عملیاتی طولانی انجام شود. Threadهای ایجاد شده از طریق ()newThread: مورد استفاده مجدد قرار نمی‌گیرند.

()computation

این مورد از Schedulers می‌توان برای انجام عملیات فشرده CPU مانند پردازش داده‌های عظیم، پردازش bitmap و غیره استفاده کرد، تعداد Threadهای ایجاد شده با استفاده از این Scheduler به طور کامل به تعداد هسته‌های موجود CPU بستگی دارد.

()single:

این مورد از Schedulers همه وظایف را به ترتیبی که اضافه شده است، انجام می‌دهد. این مورد می‌تواند در صورت لزوم اجرای متوالی مورد استفاده قرار گیرد.

()immediate:

این Scheduler بلافاصله با مسدود کردن Thread اصلی (MainThread)، کار را به صورت همزمان انجام می‌دهد.

()trampoline:

این Scheduler وظایفش را به روش First In - First Out انجام می‌دهد. تمام کارهای برنامه ریزی شده با محدود کردن تعداد Background Threads به یک، به صورت یک به یک انجام می‌شوند.

()from:

این مورد از Schedulers به ما این امکان را می‌دهد که با محدود کردن تعداد Threadهایی که ایجاد می‌شود، Scheduler را از یک executor ایجاد کنیم. وقتی Thread Pool اشغال شد، وظایف صف می‌شوند. اکنون ما مفاهیم اساسی لازم را داریم. بیایید با برخی از مفاهیم اصلی RxJava که همه باید از آن آگاه باشند، شروع کنیم.

مبانی RxJava : Observable، Observer

RxJava در مورد دو components مهم است:  Observable و  Observer علاوه بر این موارد، موارد دیگری مانندSchedulers ، Operators و Subscription نیز وجود دارد. Observable :Observable یک جریان داده است که برخی از کارها را انجام می‌دهد و داده را منتشر می‌کند. Observer :Observer بخش مهمی از Observable است. Observer داده‌های منتشر شده توسط Observable را دریافت می‌کند. Subscription: پیوند بین Observable و Observer به عنوان Subscription خوانده می‌شود. چندین Observers کننده در یک Observable می‌توانند مشترک باشند. Operator / Transformation: اپراتورها داده‌های منتشر شده توسط Observable را قبل از اینکه Observer آنها را دریافت کند، تغییر می‌دهد. Schedulers :Schedulerها Threadها را تعیین و تصمیم گیری می‌کنند که Observable باید داده‌ها را منتشر کند و Observer کننده باید داده‌ها را از قبیل Background Thread ،Main Thread و... دریافت کند.

نمونه پایه‌ای از RxJava

اکنون ما در مورد برنامه نویسی Reactive ، RxJava و RxAndroid دانش نظری خوبی داریم. بیایید به چند نمونه کد برویم تا بهتر مفاهیم را درک کنیم.

اضافه کردن وابستگی‌ها (Dependencies)

برای شروع، باید وابستگی‌های RxJava و RxAndroid را به پروژه‌های خود در build.gradle اضافه کنید و پروژه را همگام سازی (sync) کنید.

// RxJava
implementation 'io.reactivex.rxjava2:rxjava:2.1.9'
// RxAndroid
implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'

مراحل اساسی

Observable -1 ایجاد می‌کنیم که داده را منتشر ‌کند. در زیر ما یک Observable ایجاد کرده‌ ایم که لیستی از اسامی حیوانات را منتشر می‌کند. در اینجا از اپراتور ()just برای انتشار چند نام حیوان استفاده می‌شود.

Observable<String> animalsObservable = Observable.just("Eagle", "Bee", "Lion", "Dog", "Wolf");

Observer -2 ایجاد کنید که به Observable گوش دهد. Observer متدهای Interface زیر را برای شناخت وضعیت Observable ارائه می‌دهد.

  • ()onSubscribe : وقتی Observer عضو Observable شود، این متد فراخوانی می‌شود.
  • ()onNext : این متد هنگامی که Observable شروع به انتشار اطلاعات می‌کند، فراخوانی می‌شود.
  • ()onError : در صورت بروز هرگونه خطا، متد onError () فراخوانی می‌شود.
  • ()onComplete : هنگامی که یک Observable کننده انتشار همه موارد را انجام می‌دهد،()onComplete فراخوانی می‌شود.

RxJava

Observer<String> animalsObserver = getAnimalsObserver();
private Observer<String> getAnimalsObserver() {
        return new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }
            @Override
            public void onNext(String s) {
                Log.d(TAG, "Name: " + s);
            }
            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "All items are emitted!");
            }
        };
    }

Observer -3 را در عضویت Observable قرار دهید تا بتواند شروع به دریافت داده کند. در اینجا، شما می‌توانید دو روش دیگر،  ()ObserveOn و ()subscribeOn را مشاهده کنید.

  • (()SubscribOn (Schedulers.io : این مورد به  Observable می‌گوید که وظیفه خود را بر روی یک                     Background Thread اجرا کند.
  • (()observeOn (AndroidSchedulers.mainThread : این مورد به Observer می‌گوید که داده‌های مربوط به Android Ui Thread را دریافت کند تا بتوانید هرگونه کار مرتبط با UI (رابط کاربری) را انجام دهید.
animalsObservable
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(animalsObserver);

اگر برنامه را اجرا کنید، می‌توانید خروجی زیر را در LogCat خود مشاهده کنید.

onSubscribe Name: Eagle Name: Bee Name: Lion Name: Dog Name: Wolf All items are emitted!

شما اولین برنامه RxJava خود را به همین راحتی نوشتید. ما در ادامه قصد داریم اطلاعات بیشتری درباره Schedulers و Observers کسب کنیم. اما در حال حاضر این اطلاعات برای شروع کافی است.

اساس Observable و Observer

در اینجا کد کاملی از مثال بالا آورده شده است. فعالیت را اجرا کنید و خروجی را در LogCat بررسی کنید.

import android.support.v7.app.AppCompatActivity;
import android.os.Bundle;
import android.util.Log;
import info.androidhive.rxandroidexamples.R;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.Disposable;
import io.reactivex.schedulers.Schedulers;
public class Example1Activity extends AppCompatActivity {
    /**
     * Basic Observable, Observer, Subscriber example
     * Observable emits list of animal names
     */
    private static final String TAG = Example1Activity.class.getSimpleName();
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_example1);
        // observable
        Observable<String> animalsObservable = getAnimalsObservable();
        // observer
        Observer<String> animalsObserver = getAnimalsObserver();
        // observer subscribing to observable
        animalsObservable
                .observeOn(Schedulers.io())
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe(animalsObserver);
    }
    private Observer<String> getAnimalsObserver() {
        return new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }
            @Override
            public void onNext(String s) {
                Log.d(TAG, "Name: " + s);
            }
            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "All items are emitted!");
            }
        };
    }
    private Observable<String> getAnimalsObservable() {
        return Observable.just("Eagle", "Bee", "Lion", "Dog", "Wolf");
    }
}

onSubscribe Name: Eagle Name: Bee Name: Lion Name: Dog Name: Wolf All items are emitted!

معرفی Disposable

در این مثال می‌خواهیم component  جدیدی به نام Disposable را معرفی کنیم. Disposable: زمانی که Observer دیگر نمی‌خواهد منتظر رسیدن آیتم‌ها از Observable باشد، به‌طوری دیگر به آن گوش ندهد، از Disposable برای این کار استفاده می‌شود. در اندروید Disposable برای جلوگیری از memory leaks بسیار مفید است. برای مثال فرض کنیم که شما در حال فراخوانی طولانی شبکه هستید و رابط کاربری (UI) را به روز می‌کنید. زمانی که فراخوانی شبکه تکمیل و کار خود را تمام می‌کند، اگر Activity / Fragment از قبل از بین رفته (Destroyed) باشد، چون اشتراک Observer هنوز باقی است، سعی می‌کند Activity از قبل Destroyed شده را به روز کند. در این حالت می‌تواند باعث memory leaks شود. بنابراین با استفاده از Disposables، می‌توان در صورت از بین رفتن Activity از این مورد جلوگیری کرد. در مثال زیر می‌توانید مشاهده کنید که از Disposable استفاده می‌شود و فراخوانی ()disposable.dispose در ()onDestroy باعث اشتراک نداشتن با Observer می‌شود.

import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import info.androidhive.rxandroidexamples.R;
import io.reactivex.Observable;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.observers.DisposableObserver;
import io.reactivex.schedulers.Schedulers;
public class Example2Activity extends AppCompatActivity {
    /**
     * Basic Observable, Observer, Subscriber example
     * Observable emits list of animal names
     * You can see Disposable introduced in this example
     */
    private static final String TAG = Example2Activity.class.getSimpleName();
    private Disposable disposable;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_example2);
        // observable
        Observable<String> animalsObservable = getAnimalsObservable();
        // observer
        Observer<String> animalsObserver = getAnimalsObserver();
        // observer subscribing to observable
        animalsObservable
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribeWith(animalsObserver);
    }
    private Observer<String> getAnimalsObserver() {
        return new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
                disposable = d;
            }
            @Override
            public void onNext(String s) {
                Log.d(TAG, "Name: " + s);
            }
            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "All items are emitted!");
            }
        };
    }
    private Observable<String> getAnimalsObservable() {
        return Observable.just("Eagle", "Bee", "Lion", "Dog", "Wolf");
    }
    @Override
    protected void onDestroy() {
        super.onDestroy();
        // don't send events once the activity is destroyed
        disposable.dispose();
    }
}

این کد همان خروجی مشابه نمونه قبلی را تولید می‌کند.

onSubscribe Name: Eagle Name: Bee Name: Lion Name: Dog Name: Wolf All items are emitted!

عملگرهای RxJava Operators) RxJava) 

در این بخش مقاله‌ی آموزش کامل RxJava در اندروید ، عملگرهای RxJava Operators) RxJava) را برای شما شرح می‌دهیم.

  •  Introduction
  • Map
  • Buffer
  • Debounce
  • ThrottleFirst
  • FlatMap
  • ConcatMap
  • SwitchMap

معرفی(Introduction)
کار یک اپراتور به شرح زیر است : اپراتورها مواردی را که توسط یک Observable منتشر می‌شود را تبدیل می‌کنند. در زیر برخی از متداول‌ترین اپراتورهای دگرگونی وجود دارد که فرض می‌کنیم شما از آن‌ها استفاده می‌کنید.

Map

یک تابع را برای هر مورد منتشر شده اعمال می‌کند. با استفاده از تابعی از آن، هر مورد منتشر شده را تغییر می‌دهد. (نظم ارسال حفظ می‌شود) نمودار Map : آموزش کامل RxJava در اندروید

Buffer

به‌طور دوره ای items را از یک Observable به صورت bundles جمع می‌کند و به جای اینکه یکبار items را منتشر کند، bundles را منتشر می‌کند. (نظم ارسال حفظ می‌شود) نمودار : آموزش کامل RxJava در اندروید

Debounce

اگر یک زمان خاص گذشته باشد، یک Item را از یک Observable منتشر می‌کند بدون آنکه یک Item دیگر را منتشر کند. این عملگر برای کلیک دکمه (button clicks) بسیار عالی است. اگر کاربر بارها و بارها یک دکمه را کلیک کند، دیگر نیازی به اجرای چندین بار کار نیست. می‌توانیم از اپراتور ()Debounce استفاده کنیم تا با معرفی یک بازه زمانی مجاز بین کلیک، کلیک‌های خود را کنترل کنیم. اگر مدت زمانی سپری نشده باشد، می‌توانیم از هر دو متد جلوگیری کنیم. (نظم ارسال حفظ می‌شود) نمودار :

ThrottleFirst

موارد منتشر شده توسط منبع Observable را که در یک بازه زمانی قرار دارند، فیلتر می‌کند.(نظم ارسال حفظ می‌شود) نمودار : آموزش کامل RxJava در اندروید

FlatMap

موارد منتشر شده توسط یک Observable را به Observables تبدیل می‌کند، و سپس انتشار از آن را به یک Observable Single قسمت می‌کند. اگر با LiveData آشنا باشید، MediatorLiveData می‌تواند کاری بسیار مشابه انجام دهد. در مورد ()FlatMap در ادامه بیشتر صحبت می‌کنیم. (نظم ارسال حفظ نمی‌شود) نمودار : آموزش کامل RxJava در اندروید

ConcatMap

موارد منتشر شده توسط یک Observable را به Observables تبدیل می‌کند. این در اصل همان مورد ()FlatMap است، اما نظم ارسال حفظ می‌شود. اما از آنجا که ()ConcatMap باید منتظر بماند تا هر یک از Observable کار خود را انجام دهند پس از نظر فنی آن غیر همزمان نیست. (نظم ارسال حفظ می‌شود) نمودار : آموزش کامل RxJava در اندروید

SwitchMap

()SwitchMap آیتم‌های منتشر شده توسط یک Observable را به یک Observable تبدیل می‌کند درست مثل ()ConcatMap  و ()FlatMap. تفاوت این است که به محض مشترک شدن یک Observer جدید، Observer  قبلی را لغو می‌کند. اساساً ()SwitchMap  محدودیتی را حل می‌کند که ()ConcatMap و ()FlatMap هم دارند.(نظم ارسال حفظ می‌شود) نمودار :

Observers چندگانه و CompositDisposable

یک مورد را در نظر بگیرید که دارای چندین Observables  و Observers باشد. Dispose کردن آنها یک به یک در Destroy یک کار خسته کننده است و ممکن است در معرض خطا باشد به این دلیل که شاید شما Dispose کردن را فراموش کنید. در این حالت می‌توانیم از CompositeDisposable استفاده کنیم.

CompositeDisposable 

CompositeDisposable می‌تواند لیست اشتراک‌ها (subscriptions) را در یک استخر حفظ کند و می‌تواند همه آن‌ها را یک باره Dispose کند. معمولاً ما ()compositeDisposable.clear را در ()onDestroy فراخوانی می‌کنیم اما شما می‌توانید جای دیگری که خودتان می‌خواهید آن را فراخوانی کنید. در مثال زیر، شما می‌توانید متوجه شوید که دو animalsObserver ،Observer و animalsObserverAllCaps در همان Observable مشترک هستند. هر دو Observer داده‌های یکسانی را دریافت می‌کنند اما داده‌ها با اعمال اپراتورهای مختلف در stream تغییر می‌کنند.

  • AnimalObserver : از عملگر ()filter برای فیلتر کردن نام حیوانات با حرف "b" استفاده می‌شود.
  • AnimalObserverAllCaps : عملگر ()filter برای فیلتر کردن نام حیوانات با شروع حرف "c" استفاده می‌شود. از نقشه عملگر map() بعدی برای تبدیل نام هر حیوانی به حروف بزرگ استفاده می‌شود. استفاده از چندین اپراتور روی یک observer  مفرد، زنجیره عملگرها نام (chaining of operators) دارد.
import android.os.Bundle;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import info.androidhive.rxandroidexamples.R;
import io.reactivex.Observable;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.observers.DisposableObserver;
import io.reactivex.schedulers.Schedulers;
public class Example4Activity extends AppCompatActivity {
    /**
     * Basic Observable, Observer, Subscriber example
     * Observable emits list of animal names
     * You can see filter() operator is used to filter out the
     * animal names that starts with letter `b`
     */
    private static final String TAG = Example4Activity.class.getSimpleName();
    private CompositeDisposable compositeDisposable = new CompositeDisposable();
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_example4);
        Observable<String> animalsObservable = getAnimalsObservable();
        DisposableObserver<String> animalsObserver = getAnimalsObserver();
        DisposableObserver<String> animalsObserverAllCaps = getAnimalsAllCapsObserver();
        /**
         * filter() is used to filter out the animal names starting with `b`
         * */
        compositeDisposable.add(
                animalsObservable
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .filter(new Predicate<String>() {
                            @Override
                            public boolean test(String s) throws Exception {
                                return s.toLowerCase().startsWith("b");
                            }
                        })
                        .subscribeWith(animalsObserver));
        /**
         * filter() is used to filter out the animal names starting with 'c'
         * map() is used to transform all the characters to UPPER case
         * */
        compositeDisposable.add(
                animalsObservable
                        .subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
                        .filter(new Predicate<String>() {
                            @Override
                            public boolean test(String s) throws Exception {
                                return s.toLowerCase().startsWith("c");
                            }
                        })
                        .map(new Function<String, String>() {
                            @Override
                            public String apply(String s) throws Exception {
                                return s.toUpperCase();
                            }
                        })
                        .subscribeWith(animalsObserverAllCaps));
    }
    private DisposableObserver<String> getAnimalsObserver() {
        return new DisposableObserver<String>() {
            @Override
            public void onNext(String s) {
                Log.d(TAG, "Name: " + s);
            }
            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "All items are emitted!");
            }
        };
    }
    private DisposableObserver<String> getAnimalsAllCapsObserver() {
        return new DisposableObserver<String>() {
            @Override
            public void onNext(String s) {
                Log.d(TAG, "Name: " + s);
            }
            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }
            @Override
            public void onComplete() {
                Log.d(TAG, "All items are emitted!");
            }
        };
    }
    private Observable<String> getAnimalsObservable() {
        return Observable.fromArray(
                "Ant", "Ape",
                "Bat", "Bee", "Bear", "Butterfly",
                "Cat", "Crab", "Cod",
                "Dog", "Dove",
                "Fox", "Frog");
    }
    @Override
    protected void onDestroy() {
        super.onDestroy();
        // don't send events once the activity is destroyed
        compositeDisposable.clear();
    }
}

اگر این مثال را اجرا کنید، می‌توانید خروجی زیر را مشاهده کنید. در اینجا نام حیواناتی که با "B" شروع می‌شوند توسط animalsObserver ،Observed می‌شوند و همه نام‌های شروع شده با حرف "c" و همه animalsObserverAllCaps با Observed می‌شوند.

Name: Bat Name: Bee Name: Bear Name: Butterfly All items are emitted! Name: CAT Name: CRAB Name: COD All items are emitted!

Flowables

  • Flowable چیست؟
  • Backpressure
  • Backpressure Strategies
  • Flowables and Observables

ما می‌خواهیم با Flowables شروع کنیم و به طور کلی آنچه را که باید در مورد آن‌ها بدانید تشریح دهیم.

Flowables موضوع قابل توجهی است. از آنجا که این یک دوره مبتدی است، ما نمی‌خواهیم به جزییات Flowables بپردازیم. ما در انتهای این مقاله منابعی را درج خواهیم کرد که در صورت تمایل می‌توانید اطلاعات بیشتری درباره Flowables پیدا کنید (که توصیه می‌کنیم آن‌ها را بررسی کنید).

در اصل هدف این بخش از مقاله ارائه توضیحی در مورد مهم‌ترین مفاهیم مربوط به Flowables است تا شما بدانید که چه اطلاعاتی را در کجا جستجو کنید.

Flowable چیست ؟

Flowables در RxJava2 به عنوان راه حلی برای یک مشکل معرفی شد. مشکل مربوط به backpressure بود. اگر نمی‌دانید backpressure چیست، در بخش بعدی در مورد آن صحبت خواهیم کرد.

به‌طور کلی شما در RxJava2 به جای یک کلاس از Observable دو کلاس از آن را دارید:

  1. Observable
  2. Flowable

خاصیتی که یکی را از دیگری متمایز می‌کند backpressure است.

  1. Observables are not backpressure-aware 
  2. Flowables are backpressure-aware 

backpressure

از اسناد Rx: backpressure زمانی است که در یک خط طولانی پردازشی "Flowable" نمی‌تواند مقادیر را با سرعت کافی پردازش کند و به روشی برای کاهش سرعت تولیدکننده بالادست احتیاج دارد. مانند این است که اگر داده‌های زیادی را برای Observerها پخش می‌کنید و Observerها نتوانند از پس آن برآیند، در موقعیت هایی مانند این ممکن است باعث Out of Memory شود. دستگاه به معنای واقعی کلمه نمی‌تواند اطلاعات ورودی را به اندازه کافی سریع اداره کند. در رابطه با backpressure، برخی اصطلاحات وجود دارد که باید در مورد آن‌ها بدانید: منابع داغ و منابع سرد (Hot sources و Cold sources).

  • Hot sources: به این عنوان به شکل یک فشار نسبی فکر کنید. Observables بدون در نظر گرفتن اینکه آیا آن‌ها می‌توانند به کار خود ادامه دهند یا خیر، هدایت اشیا را به سمت Observers ادامه می‌دهند. در اصل Observables اشیاء را منتشر می‌کنند و این امر به عهده Observers است که به کار خود ادامه دهند. هنگامی که Observables نتواند اشیای منتشر شده را نگهداری کند، بایدبافر شوند یا به طریقی آن‌ها handle شوند. در ادامه در مورد برخی استراتژی‌های بافر صحبت خواهیم کرد.
  • Cold sources: این مورد برخلاف Hot sources است. می‌توان گفت که اشیاء توسط Observables به صورت lazily منتشر می‌شوند. به معنی اینکه: Observables هنگامی که Observerها اشیاء را می‌خواهند و با یک نرخ مناسب برای Observer، اشیاء را منتشر می‌کنند. اشیاء منتشر شده توسط Observables  نیازی به بافر ندارند زیرا تمام فرایند به‌طور کلی در اختیار Observables است. به آن به عنوان یک رابطه کششی نگاه کنید.Observerها وقتی اشیاء یا آیتم‌ها را می‌خواهند می‌توانند از  Observables آن‌ها را دریافت کنند.

Backpressure Strategies

همان طور که در بخش بالا گفتیم، Hot sources نیاز به یک استراتژی بافر دارند. تعدادی از اپراتورهای backpressure وجود دارد که می‌توانید برای اجرای استراتژی‌های مختلف بافر، در Flowables به کار ببرید.

استراتژی 1 ()onBackpressureBuffer :

این اپراتور بافر نامحدود را بین منبع upstream و اپراتور downstream معرفی می‌کند.

حال 'نامحدود' به چه معنی است؟

"نامحدود بودن به معنای عدم وجود JVM در حافظه است، می‌تواند با هر هزینه ای که از یک منبع پراکنده حاصل می‌شود، آن را handle کند.

مثال onBackPressure:

حتی با وجود 1 میلیون عدد صحیح ، این مورد باعث خارج شدن از حافظه (Out of Memory Exception) نمی‌شود.

Flowable.range(0, 1000000)
    .onBackpressureBuffer()
    .observeOn(Schedulers.computation())
    .subscribe(new FlowableSubscriber<Integer>() {
        @Override
        public void onSubscribe(Subscription s) {
        }
        @Override
        public void onNext(Integer integer) {
            Log.d(TAG, "onNext: " + integer);
        }
        @Override
        public void onError(Throwable t) {
            Log.e(TAG, "onError: ", t);
        }
        @Override
        public void onComplete() {
        }
    });

سایر استراتژی ها: 

این مقاله RxJava 2 را در مورد backpressure  مشاهده کنید: RxJava2 Backpressure.

Observable و Flowable

Flowables قابل تبدیل به Observables و Observables قابل تبدیل به Flowables هستند.

Observable to Flowable: 

هنگام تبدیل یک Observable به یک Flowable ، باید یک استراتژی بافر را مشخص کنید.

  • BackpressureStrategy.BUFFER
  • BackpressureStrategy.DROP
  • BackpressureStrategy.ERROR
  • BackpressureStrategy.LATEST
  • BackpressureStrategy.MISSING
Observable<Integer> observable = Observable
        .just(1, 2, 3, 4, 5);
Flowable<Integer> flowable = observable.toFlowable(BackpressureStrategy.BUFFER);

Flowable to Observable: 

با فراخوانی متد ()toObservable روی Flowable می‌توانید یک Flowable را به یک Observable تبدیل کنید.

Observable<Integer> observable = Observable
        .just(1, 2, 3, 4, 5);
Flowable<Integer> flowable = observable.toFlowable(BackpressureStrategy.BUFFER);
Observable<Integer> backToObservable = flowable.toObservable(

ساخت عملگرهای  (Operators : Create، Just، Range، Repeat)

  • مقدمه
  • چرا باید این اپراتورها برای شما مهم باشد
  • ()create
  • ()just
  • ()range
  • ()repeat

مقدمه

در این بخش از مقاله می‌خواهیم به شما چهار اپراتور را آموزش دهیم که کارشان ایجاد Observables است.

  1. ()create
  2. ()just
  3. ()range
  4. ()repeat

چرا باید این اپراتورها برای شما مهم باشند

اگر می‌خواهید یک Single Observable ایجاد کنید، می‌توانید از اپراتورهای "()just" و "()create" استفاده کنید. اپراتور ()just توانایی پذیرش لیستی تا 10 ورودی را دارد. با این حال، ما امتیازی برای آن نمی‌بینیم زیرا اپراتورهای دیگری نیز وجود دارند که لیست را می‌پذیرند و فقط به 10 ورودی محدود نمی‌شوند. بنابراین ما فقط برای ایجاد یک Single Observable قابل استفاده، آن را به شما پیشنهاد می‌کنیم.

()range و ()repeat برای جایگزین کردن حلقه‌ها (loops)، هر پردازش تکراری یا متد تکرار شونده عالی هستند. می‌توانید کارها را روی یک Background Thread انجام دهید و نتایج را در Main Thread مشاهده کنید.

()create

Input : T

<Output : Observable<T

اپراتور ()create به طور مشخصی برای ایجاد Observables استفاده می‌شود. این سطحی‌ترین مورد استفاده است اما همچنین می‌تواند انعطاف پذیر باشد. اگر می‌خواهید یک Observeable ایجاد کنید و هیچ یک از اپراتورهای دیگر متناسب با نیازهای شما نیستند، اپراتور ()create را در نظر بگیرید.

ما می‌خواهیم به شما دو روش مختلف برای ایجاد یک Observable با استفاده از اپراتور ()create را نشان دهیم.

  • ایجاد یک Observable از یک single object
  • ایجاد یک Observable از لیستی از اشیاء

ایجاد یک Observable از یک single object

  1. یک شیء را برای تبدیل شدن به یک Observable تعریف کنید.
  2. یک Observable ایجاد کنید.
  3. Observable را Subscribe کرده و شیء منتشر شده را دریافت کنید.
// Instantiate the object to become an Observable
final Task task = new Task("Walk the dog", false, 4);
// Create the Observable
Observable<Task> singleTaskObservable = Observable
        .create(new ObservableOnSubscribe<Task>() {
            @Override
            public void subscribe(ObservableEmitter<Task> emitter) throws Exception {
                if(!emitter.isDisposed()){
                    emitter.onNext(task);
                    emitter.onComplete();
                }
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
// Subscribe to the Observable and get the emitted object
singleTaskObservable.subscribe(new Observer<Task>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Task task) {
        Log.d(TAG, "onNext: single task: " + task.getDescription());
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});

خروجی کد بالا به صورت زیر می‌باشد.

MainActivity: onNext: single task: Walk the dog

ایجاد یک Observable از لیستی از اشیاء

  1. Observable را ایجاد کنید.
  2. در داخل متد Subscribe از طریق لیست Taskهای تکرار شده می‌توانید ()onNext را فراخوانی کنید.
  3. پس از اتمام حلقه (Loop)، می‌توانید متد ()onComplete را فراخوانی کنید.
  4. Observable را Subscribe کرده و شیء منتشر شده را دریافت کنید.
// Create the Observable
Observable<Task> taskListObservable = Observable
    .create(new ObservableOnSubscribe<Task>() {
        @Override
        public void subscribe(ObservableEmitter<Task> emitter) throws Exception {
            // Inside the subscribe method iterate through the list of tasks and call onNext(task)
            for(Task task: DataSource.createTasksList()){
                if(!emitter.isDisposed()){
                    emitter.onNext(task);
                }
            }
            // Once the loop is complete, call the onComplete() method
            if(!emitter.isDisposed()){
                emitter.onComplete();
            }
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());
// Subscribe to the Observable and get the emitted objects
taskListObservable.subscribe(new Observer<Task>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Task task) {
    Log.d(TAG, "onNext: task list: " + task.getDescription());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});

خروجی کد بالا به صورت زیر می‌باشد.

MainActivity: onNext: task list: Take out the trash MainActivity: onNext: task list: Walk the dog MainActivity: onNext: task list: Make my bed MainActivity: onNext: task list: Unload the dishwasher MainActivity: onNext: task list: Make dinner

()just

([Input : T... (Optional Array[10 <Output : Observable<T در این مثال ما از اپراتور "()just" استفاده می‌کنیم و مجموعه داده ای از 10 رشته (String) را منتقل می‌کنیم. توجه : شما می‌توانید حداکثر 10 اشیاء را به اپراتور "()just" منتقل کنید و نمی‌توانید همیشه از آن استفاده کنید.

Observable.just("first", "second", "third", "fourth", "fifth", "sixth",
          "seventh", "eighth", "ninth", "tenth")
          .subscribeOn(Schedulers.io()) // What thread to do the work on
          .observeOn(AndroidSchedulers.mainThread()) // What thread to observe the results on
          .subscribe(new Observer<String>() { // view the results by creating a new observer
              @Override
              public void onSubscribe(Disposable d) {
                  Log.d(TAG, "onSubscribe: called");
              }
              @Override
              public void onNext(String s) {
                  Log.d(TAG, "onNext: " + s);
              }
              @Override
              public void onError(Throwable e) {
                  Log.e(TAG, "onError: ", e);
              }
              @Override
              public void onComplete() {
                  Log.d(TAG, "onComplete: done...");
              }
          });

خروجی کد بالا به صورت زیر می‌باشد.

onNext: first onNext: second onNext: third onNext: fourth onNext: fifth onNext: sixth onNext: seventh onNext: eighth onNext: ninth onNext: tenth

به خاطر داشته باشید که این مثال به هیچ وجه عملی نیست، اما همچنان دارای ارزش است زیرا تمام عملیات از همان ساختار کلی پیروی می‌کنند. بنابراین ما از آن برای نشان دادن ساختار کلی عملیات استفاده می‌کنیم.

()range

[Input: [x, x+1, ..., x + y <Output: Observable<Integer ()range بسیار ساده و به راحتی قابل درک است. محدوده ای از اشیاء را منتشر می‌کند. به عنوان ورودی، مقدار min و مقدار max را وارد می‌کنید. تمام مقادیر موجود در این محدوده را منتشر می‌کند.

Observable.range(0,11)
        .observeOn(Schedulers.io())
        .subscribeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });

خروجی کد بالا به صورت زیر می‌باشد.

onNext: 0 onNext: 1 onNext: 2 onNext: 3 onNext: 4 onNext: 5 onNext: 6 onNext: 7 onNext: 8 onNext: 9 onNext: 10

()repeat

()repeat یکی دیگر از اپراتورها است. repeat باید در رابطه با اپراتور دیگری استفاده شود. مثال خوب با اپراتور ()range است.

Observable.range(0,3)
        .repeat(2)
        .observeOn(Schedulers.io())
        .subscribeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(Integer integer) {
                Log.d(TAG, "onNext: " + integer);
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });

خروجی کد بالا به صورت زیر می‌باشد.

onNext: 0 onNext: 1 onNext: 2 onNext: 0 onNext: 1 onNext: 2

ساخت عملگرهای  (Operators : Interval - Timer)

  • مقدمه
  • ()interval
  • ()timer

مقدمه

یک مشکل بسیار رایج که ما برنامه نویسان اندروید با آن مواجه هستیم، "اجرای یک متد" یا "اجرای نوعی فرآیند" در یک بازه زمانی خاص یا دارای تأخیر است. به طور معمول، عملکردهایی مانند این با استفاده از فراخوانی Runnable و Handler با فراخوانی متد ()postDelayed  اجرا می‌شود. در اینجا مثالی آورده شده است که log کل زمان سپری شده در هر 1 ثانیه را چاپ می‌کند تا زمانی که 5 تکرار را اجرا کند:

final Handler handler = new Handler();
final Runnable runnable = new Runnable() {
    int elapsedTime = 0;
    @Override
    public void run() {
        if(elapsedTime >= 5){ // if greater than 5 seconds
            handler.removeCallbacks(this);
        }
        else{
            elapsedTime = elapsedTime + 1;
            handler.postDelayed(this, 1000);
            Log.d(TAG, "run: " + elapsedTime);
        }
    }
};
handler.postDelayed(runnable, 1000);

خروجی کد بالا به صورت زیر می‌باشد.

MainActivity: run: 1 MainActivity: run: 2 MainActivity: run: 3 MainActivity: run: 4 MainActivity: run: 5

این کار را انجام می‌دهد، اما خیلی خوب نیست و اضافه کردن ویژگی‌های بیشتر معمولاً سخت است. RxJava اپراتورهای مرتبط با زمان مختلفی را برای کمک به این نوع کارها دارد.

()interval

اپراتور Interval یک بازه Observable را با فاصله زمانی ثابت از زمان انتخاب شما در بین انتشار، یک دنباله نامحدود از اعداد صعودی را منتشر می‌کند. ما با استفاده از اپراتور ()takeWhile می‌توانیم هر مقدار منتشر شده را چک کنیم. اگر مقدار از 5 بیشتر شود، Observable نتایج را متوقف می‌کند. Observable : کدهای Observable

// emit an observable every time interval
Observable<Long> intervalObservable = Observable
        .interval(1, TimeUnit.SECONDS)
        .subscribeOn(Schedulers.io())
        .takeWhile(new Predicate<Long>() { // stop the process if more than 5 seconds passes
            @Override
            public boolean test(Long aLong) throws Exception {
                return aLong <= 5;
            }
        })
        .observeOn(AndroidSchedulers.mainThread());

Subscribe to the Observable : در این بخش Observable را Subscribe می‌کنیم.

intervalObservable.subscribe(new Observer<Long>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "onNext: interval: " + aLong);
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});

خروجی کد بالا به صورت زیر می‌باشد.

MainActivity: onNext: interval: 0 MainActivity: onNext: interval: 1 MainActivity: onNext: interval: 2 MainActivity: onNext: interval: 3 MainActivity: onNext: interval: 4 MainActivity: onNext: interval: 5

()timer

اپراتور Timer یک Observable را ایجاد می‌کند که یک آیتم خاص را بعد از مدت زمان مشخص شده، منتشر می‌کند. Observable : کدهای Observable

// emit single observable after a given delay
Observable<Long> timeObservable = Observable
        .timer(3, TimeUnit.SECONDS)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());

Subscribe to the Observable : در این بخش Observable را Subscribe می‌کنیم.

timeObservable.subscribe(new Observer<Long>() {
    long time = 0; // variable for demonstating how much time has passed
    @Override
    public void onSubscribe(Disposable d) {
        time = System.currentTimeMillis() / 1000;
    }
    @Override
    public void onNext(Long aLong) {
        Log.d(TAG, "onNext: " + ((System.currentTimeMillis() / 1000) - time) + " seconds have elapsed." );
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});

خروجی کد بالا به صورت زیر می‌باشد.

MainActivity: onNext: 3 seconds have elapsed.

ساخت عملگر‌های (fromArray, fromIterable, fromCallable)

  • مقدمه
  • ()fromArray
  • ()fromIterable
  • ()fromCallable

همان طور که در مقاله قبلی اشاره کردیم، اپراتورهای ()from خیلی مفید می‌باشند. ما توضیحی در مورد آن‌ها و سپس مثالی از هر یک ارائه خواهیم داد. توجه: همه مثال‌ها از شیء کلاس task استفاده می‌کنند :

public class Task {
    private String description;
    private boolean isComplete;
    private int priority;
    public Task(String description, boolean isComplete, int priority) {
        this.description = description;
        this.isComplete = isComplete;
        this.priority = priority;
    }
    // getter and setters ....
}

()fromarray

()fromarray آرایه ای از اشیاء را به عنوان ورودی گرفته و  خروجی یک Observable می‌باشد. این متد بلافاصله اجرا نمی‌شود. ()fromarray تنها زمانی که Subscribed ،Subscribe شود، این متد را اجرا می‌کند. چه زمانی باید از این اپراتور استفاده کنید؟ برای انتشار تعداد دلخواه از مواردی که به صورت مقدماتی شناخته شده اند. []Input: T <Ouput: Observable<T مثال ()fromarray :

ایجاد یک آرایه Observable از task objects.

Task[] list = new Task[5];
list[0] = (new Task("Take out the trash", true, 3));
list[1] = (new Task("Walk the dog", false, 2));
list[2] = (new Task("Make my bed", true, 1));
list[3] = (new Task("Unload the dishwasher", false, 0));
list[4] = (new Task("Make dinner", true, 5));
Observable<Task> taskObservable = Observable
        .fromArray(list)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
taskObservable.subscribe(new Observer<Task>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Task task) {
        Log.d(TAG, "onNext: : " + task.getDescription());
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});)

()fromIterable

()fromIterable می‌تواند به عنوان ورودی و خروجی در Observable، اشیاء قابل تکرار را بگیرد. انواع Iterable شامل: List ،ArrayList ،Set و غیره ... می‌باشد. این متد بلافاصله اجرا نمی‌شود. ()fromIterable تنها زمانی که Subscribed ،Subscribe شود، این متد را اجرا می‌کند. چه زمانی باید از این اپراتور استفاده کنید؟ برای انتشار تعداد دلخواه از مواردی که به صورت مقدماتی شناخته شده اند استفاده می‌شود. همانند اپراتور  ()fromarray اما قابل تکرار است.

...Input: List<T>, ArrayList<T>, Set<T>, etc <Ouput: Observable<T مثال ()fromIterable: ایجاد لیست Observable از task objects.

List<Task> taskList = new ArrayList<>();
taskList.add(new Task("Take out the trash", true, 3));
taskList.add(new Task("Walk the dog", false, 2));
taskList.add(new Task("Make my bed", true, 1));
taskList.add(new Task("Unload the dishwasher", false, 0));
taskList.add(new Task("Make dinner", true, 5));
Observable<Task> taskObservable = Observable
        .fromIterable(taskList)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
taskObservable.subscribe(new Observer<Task>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Task task) {
        Log.d(TAG, "onNext: : " + task.getDescription());
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});

()fromCallable

()fromCallable یک بلوک کد را اجرا می‌کند (معمولاً یک متد) و نتیجه را برمی‌گرداند. این متد بلافاصله اجرا نمی‌شود. ()fromIterable تنها زمانی که Subscribed ،Subscribe شود ، این متد را اجرا می‌کند. چه زمانی باید از این اپراتور استفاده کنید؟ برای تولید یک مورد Single Observable در صورت تقاضا استفاده می‌شود. مانند فراخوانی متد برای بازیابی برخی اشیاء یا لیستی از اشیاء. <Input: Callable<T

Output: T

مثال ()fromCallable: شما باید یک Task Object را از local SQLite database cache برگردانید. کلیه‌ی عملیات پایگاه داده باید بر روی background thread انجام شود. سپس نتیجه به Main Thread بازگردانده می‌شود.

  1. برای اجرای متد بر روی background thread می‌توانید از یک callable استفاده کنید.
  2. سپس نتایج را به Main Thread برگردانید.

بازیابی یک Task Object از local SQLite database cache:

// create Observable (method will not execute yet)
Observable<Task> callable = Observable
        .fromCallable(new Callable<Task>() {
            @Override
            public Task call() throws Exception {
                return MyDatabase.getTask();
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
// method will be executed since now something has subscribed
callable.subscribe(new Observer<Task>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Task task) {
        Log.d(TAG, "onNext: : " + task.getDescription());
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});

ساخت عملگر (fromFuture)

  • مقدمه
  • ()fromFuture
  • مثال

مقدمه

در اینجا در مورد اپراتور ()fromFuture صحبت خواهیم کرد. ما توضیح مختصری خواهیم داد و سپس به مثال بسیار مفصلی می‌پردازیم. نکته: ما انتظار داریم که شما  نحوه‌ی استفاده از معماری Model, View, ViewModel) MVVM)،Retrofit را بدانید و آن را درک کنید زیرا ()fromFuture چیزی است که برای معماری MVVM طراحی شده است.

()fromFuture

قبل از اینکه در مورد آنچه این متد انجام می‌دهد صحبت کنیم، بگذارید به طور خلاصه توضیح دهیم که Future چیست.

Future اساساً یک کار معلق است.

 این یک وعده برای نتیجه از کاری است که در آینده انجام می‌شود.

این کار می‌تواند از طریق Runnable یا Callable انجام شود (با یک Rx Callable اشتباه نمی‌شود). مثالی از چیزی که می‌تواند این runnables یا callable‌ها را اجرا کند، یک ExecutorService است.

بنابراین اگر می‌خواهید از Executor در رابطه با RxJava استفاده کنید، در اصل از این امر استفاده خواهید کرد.

<Input: Future<T

<Ouput: Observable<T

در این مثال، ما به شما نشان می‌دهیم که چگونه می‌توانید با استفاده از Retrofit و MVVM با Future Observable، یک درخواست شبکه را به REST API انجام دهید.

وابستگی (Dependencies)

ما برای این مثال به وابستگی Retrofit احتیاج داریم زیرا  دارای درخواست شبکه (network request) هستیم. همچنین وابستگی به LiveData و ViewModels داریم زیرا  این مثال از معماری MVVM استفاده می‌کند.

همچنین وابستگی به RxJava Call Adapter داریم. به طور پیش فرض، شما نمی‌توانید یک Observable یا یک Flowable را از یک درخواست Retrofit بازگردانید. این وابستگی به شما امکان می‌دهد تا Retrofit Call objects را به Flowables / Observables تبدیل کنید.

implementation "com.squareup.retrofit2:adapter-rxjava2:2.5.0"

جدید‌ترین نسخه را می‌توانید دریافت کنید.

def retrofitVersion = "2.5.0"
def rxjava_version = '2.2.7'
def rxandroid_version = '2.1.1'
def lifecycle_version = "1.1.1"
// ViewModel and LiveData
implementation "android.arch.lifecycle:extensions:$lifecycle_version"
// Retrofit
implementation "com.squareup.retrofit2:retrofit:$retrofitVersion"
implementation "com.squareup.retrofit2:converter-gson:$retrofitVersion"
// RxJava
implementation "io.reactivex.rxjava2:rxjava:$rxjava_version"
// RxAndroid
implementation "io.reactivex.rxjava2:rxandroid:$rxandroid_version"
// RxJava Call Adapter
implementation "com.squareup.retrofit2:adapter-rxjava2:2.5.0"

RequestApi :

Request Api در اینجا متدی است که Retrofit برای ایجاد query به REST API استفاده می‌کند. توجه کنید که یک Observable برمی‌گردد.

این به دلیل وابستگی RxJava Call Adapter می‌باشد که در بخش وابستگی‌ها به آن اشاره کردیم، که می‌تواند یک Observable را بازگرداند.

public interface RequestApi {
    @GET("todos/1")
    Observable<ResponseBody> makeObservableQuery();
}

ServiceGenerator :

این کلاس مسئول ایجاد نمونه Retrofit، مراجعه و گرفتن کلاس RequestApi است که ما در بالا تعریف کردیم.

نکته ای که در اینجا باید به آن توجه کنیم، فراخوانی متد addCallAdapterFactory است. بدون آن، ما نمی‌توانیم

Retrofit Call objects را به  Flowables / Observables  تبدیل کنیم.

.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
public class ServiceGenerator {
    public static final String BASE_URL = "https://jsonplaceholder.typicode.com";
    private static Retrofit.Builder retrofitBuilder =
            new Retrofit.Builder()
                    .baseUrl(BASE_URL)
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .addConverterFactory(GsonConverterFactory.create());
    private static Retrofit retrofit = retrofitBuilder.build();
    private static RequestApi requestApi = retrofit.create(RequestApi.class);
    public static RequestApi getRequestApi(){
        return requestApi;
    }
}

Repository :

در اینجا  مثالی از "bread and butter" آورده شده است. اینجاست که ما از Executor برای فراخوانی شبکه استفاده می‌کنیم و سپس Future Observable را به ViewModel باز می‌گردانیم.

public class Repository {
    private static Repository instance;
    public static Repository getInstance(){
        if(instance == null){
            instance = new Repository();
        }
        return instance;
    }
    public Future<Observable<ResponseBody>> makeFutureQuery(){
        final ExecutorService executor = Executors.newSingleThreadExecutor();
        final Callable<Observable<ResponseBody>> myNetworkCallable = new Callable<Observable<ResponseBody>>() {
            @Override
            public Observable<ResponseBody> call() throws Exception {
                return ServiceGenerator.getRequestApi().makeObservableQuery();
            }
        };
        final Future<Observable<ResponseBody>> futureObservable = new Future<Observable<ResponseBody>>(){
            @Override
            public boolean cancel(boolean mayInterruptIfRunning) {
                if(mayInterruptIfRunning){
                    executor.shutdown();
                }
                return false;
            }
            @Override
            public boolean isCancelled() {
                return executor.isShutdown();
            }
            @Override
            public boolean isDone() {
                return executor.isTerminated();
            }
            @Override
            public Observable<ResponseBody> get() throws ExecutionException, InterruptedException {
                return executor.submit(myNetworkCallable).get();
            }
            @Override
            public Observable<ResponseBody> get(long timeout, TimeUnit unit) throws ExecutionException, InterruptedException, TimeoutException {
                return executor.submit(myNetworkCallable).get(timeout, unit);
            }
        };
        return futureObservable;
    }
}

View Model :

از آنجا که ما از معماری MVVM استفاده می‌کنیم، باید یک ViewModel داشته باشیم. فقط یک متد واحد وجود دارد که به Repository دسترسی پیدا می‌کند و Future Observable را برمی گرداند.

public class MainViewModel extends ViewModel {
    private Repository repository;
    public MainViewModel() {
        repository = Repository.getInstance();
    }
    public Future<Observable<ResponseBody>> makeFutureQuery(){
        return repository.makeFutureQuery();
    }
}

MainActivity:

و سرانجام در MainActivity ،query با Subscribe کردن observable شروع می‌شود و پاسخ از سرور در Log چاپ می‌شود.

MainViewModel viewModel = ViewModelProviders.of(this).get(MainViewModel.class);
try {
    viewModel.makeFutureQuery().get()
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<ResponseBody>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.d(TAG, "onSubscribe: called.");
                }
                @Override
                public void onNext(ResponseBody responseBody) {
                    Log.d(TAG, "onNext: got the response from server!");
                    try {
                        Log.d(TAG, "onNext: " + responseBody.string());
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                }
                @Override
                public void onError(Throwable e) {
                    Log.e(TAG, "onError: ", e);
                }
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete: called.");
                }
            });
} catch (ExecutionException e) {
    e.printStackTrace();
} catch (InterruptedException e) {
    e.printStackTrace();
}

خروجی کد بالا به صورت زیر می‌باشد.

MainActivity: onNext: got the response from server! MainActivity: onNext: { "userId": 1, "id": 1, "title": "delectus aut autem", "completed": false }

ساخت عملگر (fromPublisher)

  • مقدمه
  • ()fromPublisher
  • مثال

مقدمه

در این بخش در مورد اپراتور ()fromPublisher صحبت خواهیم کرد. ما توضیح مختصری خواهیم داد و سپس به مثالی بسیار مفصل می‌پردازیم. نکته: ما انتظار داریم شما بدانید که چگونه از معماری MVVM و Retrofit استفاده کنید.

()fromPublisher

()fromPublisher برای تبدیل اشیاء LiveData به reactive streams یا از reactive streams به اشیاء LiveData استفاده می‌شود. احتمالا هرگز نمی‌خواهید LiveData را به یک reactive Observable تبدیل کنید، غیر از موارد نادری که ممکن است بخواهید از اپراتور استفاده کنید. از طرف دیگر، تبدیل یک Observable به LiveData در واقع بسیار کاربردی است. موارد زیادی برای این کار وجود دارد. مثال زیر یک درخواست شبکه با استفاده از Retrofit ایجاد می‌کند و پاسخی را به صورت یک شیء Flowable بازیابی می‌کند. Publisher interface ،Flowable را پیاده سازی می‌کند. اشیاء Publisher دقیقاً مانند subscribed ،Observable می‌شوند. <Input: LiveData<T <Ouput: Observable<T یا <Input: Observable<T <Ouput: LiveData<T

مثالی از FromPublisher:

مجوز اینترنت (Internet Permission): Internet Permission را به Manifest اضافه کنید زیرا می‌خواهیم با استفاده از Retrofit درخواستی انجام دهیم.

<uses-permission android:name="android.permission.INTERNET"/>

وابستگی (Dependencies): ما به وابستگی‌های زیادی احتیاج داریم. انتظار داریم که شما در حال حاضر بدانید که چگونه از  Retrofit و معماری MVVM استفاده کنید. بنابراین تنها دو وابستگی زیر را می‌خواهیم توضیح دهیم: (RxJava Call Adapter (Call object to Observable: به طور پیش فرض، شما نمی‌توانید یک Observable یا یک flowable را از یک Retrofit request بازگردانید.

implementation "com.squareup.retrofit2:adapter-rxjava2:2.9.0"

جدید‌ترین نسخه این کتابخانه را می‌توانید دریافت کنید. تبدیل  LiveData به Observable: به ما این امکان را می‌دهد تا Observables (یا Flowables) را به اشیاء LiveData تبدیل کنیم.

implementation "android.arch.lifecycle:reactivestreams:1.1.1"

جدید‌ترین نسخه این کتابخانه را می‌توانید دریافت کنید.

def retrofitVersion = "2.5.0"
def rxjava_version = '2.2.7'
def rxandroid_version = '2.1.1'
def lifecycle_version = "1.1.1"
// ViewModel and LiveData
implementation "android.arch.lifecycle:extensions:$lifecycle_version"
// Retrofit
implementation "com.squareup.retrofit2:retrofit:$retrofitVersion"
implementation "com.squareup.retrofit2:converter-gson:$retrofitVersion"
// RxJava
implementation "io.reactivex.rxjava2:rxjava:$rxjava_version"
// RxAndroid
implementation "io.reactivex.rxjava2:rxandroid:$rxandroid_version"
// RxJava Call Adapter (Call object to Observable)
implementation "com.squareup.retrofit2:adapter-rxjava2:2.5.0"
// Convert Observable to LiveData
implementation "android.arch.lifecycle:reactivestreams:1.1.1"

RequestApi :

این کلاس جایی است که متدهای Retrofit request در آن هستند. در این حالت تنها یک مورد وجود دارد. توجه کنید که یک شیء Flowable را برمی گرداند.

public interface RequestApi {
    @GET("todos/1")
    Flowable<ResponseBody> makeQuery();
}

ServiceGenerator:

این کلاس مسئول ایجاد نمونه Retrofit مراجعه و گرفتن  کلاس RequestApi است که ما در بالا تعریف کردیم. نکته: موردی که در اینجا باید به آن توجه کنیم ، فراخوانی متد addCallAdapterFactory است. بدون آن، ما نمی‌توانیم Flowables / Observables را به Retrofit Call objects  تبدیل کنیم.

.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
public class ServiceGenerator {
    public static final String BASE_URL = "https://jsonplaceholder.typicode.com";
    private static Retrofit.Builder retrofitBuilder =
            new Retrofit.Builder()
                    .baseUrl(BASE_URL)
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .addConverterFactory(GsonConverterFactory.create());
    private static Retrofit retrofit = retrofitBuilder.build();
    private static RequestApi requestApi = retrofit.create(RequestApi.class);
    public static RequestApi getRequestApi(){
        return requestApi;
    }
}

Repository:

همان طور که چندین بار بیان کردیم، ما از معماری MVVM استفاده می‌کنیم. و معماری مناسب MVVM همیشه شامل یک repository (حداقل در هر زمان انجام معاملات بانک اطلاعاتی) است. این جایی است که جادو اتفاق می‌افتد. متد ()makeReactiveQuery  از متد ()fromPublisher  برای تبدیل یک Flowable به LiveData استفاده می‌کند. توجه کنید که ما در یک background thread مشترک (Subscribe) می‌شویم! این خیلی مهم است. کلیه عملیات شبکه باید بر روی یک background thread انجام شود.

public class Repository {
    private static Repository instance;
    public static Repository getInstance(){
        if(instance == null){
            instance = new Repository();
        }
        return instance;
    }
    public LiveData<ResponseBody> makeReactiveQuery(){
        return LiveDataReactiveStreams.fromPublisher(ServiceGenerator.getRequestApi()
                .makeQuery()
                .subscribeOn(Schedulers.io()));
    }
}

ViewModel:

ما یک بار دیگر که از معماری MVVM استفاده می‌کنیم باید ViewModel داشته باشیم. فقط یک متد واحد وجود دارد که به مخزن دسترسی پیدا می‌کند و LiveData را برمی گرداند.

public class MainViewModel extends ViewModel {
    private Repository repository;
    public MainViewModel() {
        repository = Repository.getInstance();
    }
    public LiveData<ResponseBody> makeQuery(){
        return repository.makeReactiveQuery();
    }
}

MainActivity:

قسمت آخر Subscirbe کردن LiveData در MainActivity است. خروجی کد بالا به صورت زیر می‌باشد.

MainActivity: onChanged: this is a live data response!MainActivity: onChanged: { "userId": 1, "id": 1, "title": "delectus aut autem", "completed": false }

عملگرهای Filter : Filter

وقتی لیستی از custom objects را در اختیار دارید و می‌خواهید بر اساس یک فیلد خاص آن را فیلتر کنید، چند بار در این وضعیت قرار گرفته‌اید؟ اگر لیست بسیار بزرگ است پس فیلتر کردن باید روی یک background thread انجام شود. اگر چنین اتفاقی برای شما بیفتد، عملگر ()filter بهترین دوست شما خواهد بود. ()filter یک اپراتور عالی است. ما حدس می‌زنیم که شما از این اپراتور زیاد استفاده خواهید کرد.

مثال 1: فیلتر کردن  یک فیلد String در یک لیست

در این مثال می‌خواهم به شما نشان دهیم چگونه لیستی از  Task POJO's) custom java objects) را در یک فیلد String خاص فیلتر کنید. بنابراین فقط Task Objects را که شامل یک پارامتر String خاص هستند، منتشر می‌کند.

  1. ما یک Observable از لیستی از Task Objects را ایجاد می‌کنیم.
  2.  Tasks را برای توضیحات خاص فیلتر می‌کنیم.
  3. سپس  Subscribe کردن و منتشر کردن کارهایی (Task) که از  Test عبور می‌کنند.
Observable<Task> taskObservable = Observable
            .fromIterable(DataSource.createTasksList())
            .filter(new Predicate<Task>() {
                @Override
                public boolean test(Task task) throws Exception {
                    if(task.getDescription().equals("Walk the dog")){
                        return true;
                    }
                    return false;
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
taskObservable.subscribe(new Observer<Task>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(Task task) {
                Log.d(TAG, "onNext: This task matches the description: " + task.getDescription());
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {
            }
});

 

public static List<Task> createTasksList(){
        List<Task> tasks = new ArrayList<>();
        tasks.add(new Task("Take out the trash", true, 3));
        tasks.add(new Task("Walk the dog", false, 2));
        tasks.add(new Task("Make my bed", true, 1));
        tasks.add(new Task("Unload the dishwasher", false, 0));
        tasks.add(new Task("Make dinner", true, 5));
        return tasks;
    }

خروجی کد بالا به صورت زیر می‌باشد.

onNext: This task matches the description: Walk the dog

مثال 2 : فیلتر کردن  یک فیلد Boolean در یک لیست

ما به شما نشان می‌دهیم که چگونه لیستی از custom objects را در یک فیلد خاص Boolean فیلتر کنید. اگر فیلد Boolean صحیح (True) باشد، Task Object در Observer منتشر می‌شود.

  1. ما یک Observable از لیستی از Task Objects را ایجاد می‌کنیم.
  2. Tasksرا در قسمت ()isComplete فیلتر می‌کنیم.
  3. سپس  Subscribe کردن و منتشر کردن کارهایی (Task) که از  Test عبور می‌کنند.
Observable<Task> taskObservable = Observable
            .fromIterable(DataSource.createTasksList())
            .filter(new Predicate<Task>() {
                @Override
                public boolean test(Task task) throws Exception {
                    return task.isComplete();
                }
            })
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread());
taskObservable.subscribe(new Observer<Task>() {
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(Task task) {
                Log.d(TAG, "onNext: This is a completed task: " + task.getDescription());
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });

 

public static List<Task> createTasksList(){
        List<Task> tasks = new ArrayList<>();
        tasks.add(new Task("Take out the trash", true, 3));
        tasks.add(new Task("Walk the dog", false, 2));
        tasks.add(new Task("Make my bed", true, 1));
        tasks.add(new Task("Unload the dishwasher", false, 0));
        tasks.add(new Task("Make dinner", true, 5));
        return tasks;
    }

خروجی کد بالا به صورت زیر می‌باشد.

onNext: : Take out the trash onNext: : Make my bed onNext: : Make dinner

عملگرهای Distinct : Filter

  • مقدمه
  • اسفاده نادرست از ()Distinct
  • استفاده درست از ()Distinct

مقدمه

اپراتور ()Distinct نوع دیگری است که شما زیاد از آن استفاده خواهید کرد. می‌توانید اشیاء سفارشی (Custom Objects) را بر اساس فیلدهای مشخص فیلتر کنید. فکر می‌کنیم بدون مثال درک این موضوع کمی سخت باشد. بنابراین اگر دچار سردرگمی شدید، به مثال‌های زیر توجه کنید. اپراتور Distinct فقط با دسترسی به آیتم هایی که از قبل منتشر نشده اند، Observable را فیلتر می‌کند.

استفاده نادرست از ()Distinct

ما فکر می‌کنیم اپراتور ()Distinct در نگاه اول خیلی قابل درک نیست. بنابراین می‌خواهیم روش نادرست استفاده از آن را به شما نشان دهیم (که ممکن است روشی باشد که شما سعی کرده اید از آن استفاده کنید).

  1. ما یک Observable از لیستی از Task Objects را ایجاد می‌کنیم.
  2. اپراتور  ()Distinct را اعمال کرده و یک custom function را پاس می‌دهیم.
public static List<Task> createTasksList(){
    List<Task> tasks = new ArrayList<>();
    tasks.add(new Task("Take out the trash", true, 3));
    tasks.add(new Task("Walk the dog", false, 2));
    tasks.add(new Task("Make my bed", true, 1));
    tasks.add(new Task("Unload the dishwasher", false, 0));
    tasks.add(new Task("Make dinner", true, 5));
    tasks.add(new Task("Make dinner", true, 5)); // duplicate for testing the distinct operator
    return tasks;
}

 

Observable<Task> taskObservable = Observable
        .fromIterable(DataSource.createTasksList())
        .distinct(new Function<Task, Task>() { // <--- WRONG 
            @Override
            public Task apply(Task task) throws Exception { 
                return task;
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
taskObservable.subscribe(new Observer<Task>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Task task) {
        Log.d(TAG, "onNext: " + task.getDescription());
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});

خروجی کد بالا به صورت زیر می‌باشد.

MainActivity: onNext: Take out the trash MainActivity: onNext: Walk the dog MainActivity: onNext: Make my bed MainActivity: onNext: Unload the dishwasher MainActivity: onNext: Make dinner MainActivity: onNext: Make dinner

توجه کنید که خروجی یک نسخه‌ی تکراری را نشان می‌دهد. این صحیح نیست. اپراتور ()Distinct برای حذف نسخه‌های تکراری است. پس کجا اشتباه کردیم؟ نوع داده‌ی دوم در عملکرد (Funtcion)  نادرست است. به مثال دوم زیر نگاهی بیندازید تا ببینید چگونه قرار است از آن استفاده شود.

استفاده درست از ()Distinct

همان‌طور که در مثال قبلی دیدید، خروجی مقادیر متمایز را بر اساس فیلد توضیحات نشان نمی‌داد.این به دلیل استفاده نادرست از عملکردهای منتقل شده به عنوان ورودی به اپراتور ()Distinct بود. در اینجا نحوه‌ی درست استفاده از آن آمده است.

  1. ما یک Observable از لیستی از Task Objects را ایجاد می‌کنیم.
  2. اعمال اپراتور ()Distinct و پاس دادن یک
  3. اپراتور  ()Distinct را اعمال کرده و یک custom function را پاس می‌دهیم.
  4. تفاوت در نوع بازگشت عملکرد است. توجه کنید که ما یک رشته را بر می‌گردانیم و در آن رشته چک کردن را انجام می‌دهیم.
public static List<Task> createTasksList(){
    List<Task> tasks = new ArrayList<>();
    tasks.add(new Task("Take out the trash", true, 3));
    tasks.add(new Task("Walk the dog", false, 2));
    tasks.add(new Task("Make my bed", true, 1));
    tasks.add(new Task("Unload the dishwasher", false, 0));
    tasks.add(new Task("Make dinner", true, 5));
    tasks.add(new Task("Make dinner", true, 5)); // duplicate for testing the distinct operator
    return tasks;
}

 

Observable<Task> taskObservable = Observable
        .fromIterable(DataSource.createTasksList())
        .distinct(new Function<Task, String>() { // <--- CORRECT 
            @Override
            public String apply(Task task) throws Exception {
                return task.getDescription();
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
taskObservable.subscribe(new Observer<Task>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Task task) {
        Log.d(TAG, "onNext: " + task.getDescription());
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});

خروجی کد بالا به صورت زیر می‌باشد.

MainActivity: onNext: Take out the trash MainActivity: onNext: Walk the dog MainActivity: onNext: Make my bed MainActivity: onNext: Unload the dishwasher MainActivity: onNext: Make dinner

اکنون خروجی صحیح است.

عملگرهای Take : Filter و TakeWhile

  • مقدمه
  • ()Take
  • ()TakeWhile

مقدمه

()take و ()takeWhile را در دسته فیلترها قرار می‌دهیم. آن‌ها شبیه به عملگر ()Filter هستند زیرا لیست‌های اشیاء را فیلتر می‌کنند. تفاوت اصلی بین عملگرهای ()take و عملگر ()Filter در این است که عملگر ()Filter تمامی اشیاء موجود در لیست را بررسی می‌کند. بنابراین می‌توان گفت عملگر ()Filter شامل تمامی آن ‌هاست. در حالی که اپراتورهای ()take منحصر به فرد در نظر گرفته می‌شوند پس لازم نیست همه‌ی موارد موجود در لیست را بررسی کنند. آنها اشیاء را منتشر می‌کنند تا زمانی که شرط عملکرد آن‌ها برآورده شود.

()take

اپراتور ()take تنها "n" آیتم را که توسط یک Observable قابل انتشار است را منتشر می‌کند، در حالی که از موارد باقیمانده کاملا غافل می‌شود.

این مثال را در نظر بگیرید:

  1. ما یک Observable از لیستی از Task Objects را ایجاد می‌کنیم.
  2. اپراتور take را اعمال کرده و 3 مقدار را پاس می‌دهیم.
public class DataSource {
    public static List<Task> createTasksList(){
        List<Task> tasks = new ArrayList<>();
        tasks.add(new Task("Take out the trash", true, 3));
        tasks.add(new Task("Walk the dog", false, 2));
        tasks.add(new Task("Make my bed", true, 1));
        tasks.add(new Task("Unload the dishwasher", false, 0));
        tasks.add(new Task("Make dinner", true, 5));
        return tasks;
    }
}

 

Observable<Task> taskObservable = Observable
        .fromIterable(DataSource.createTasksList())
        .take(3)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
taskObservable.subscribe(new Observer<Task>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Task task) {
        Log.d(TAG, "onNext: " + task.getDescription());
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});

خروجی کد بالا به صورت زیر می‌باشد.

MainActivity: onNext: Take out the trash MainActivity: onNext: Walk the dog MainActivity: onNext: Make my bed

حتی اگر 5 شیء  به لیست اضافه شده باشد، فقط 3 مورد منتشر می‌شوند.

()TakeWhile

()TakeWhile منبع Observable را منعکس می‌کند تا زمانی که شرایطی که شما تعیین می‌کنید نادرست شود.

این مثال را در نظر بگیرید:

  1. ما یک Observable از لیستی از Task Objects را ایجاد می‌کنیم.
  2. اپراتور ()TakeWhile را اعمال کرده و  از custom function استفاده می‌کنیم. این تابع در حال بررسی یک کار انجام شده است.
  3. پس از یافتن یک کار کامل شده ، Observableبه اتمام می‌رسد.
public class DataSource {
    public static List<Task> createTasksList(){
        List<Task> tasks = new ArrayList<>();
        tasks.add(new Task("Take out the trash", true, 3));
        tasks.add(new Task("Walk the dog", false, 2));
        tasks.add(new Task("Make my bed", true, 1));
        tasks.add(new Task("Unload the dishwasher", false, 0));
        tasks.add(new Task("Make dinner", true, 5));
        return tasks;
    }
}
Observable<Task> taskObservable = Observable
        .fromIterable(DataSource.createTasksList())
        .takeWhile(new Predicate<Task>() {
            @Override
            public boolean test(Task task) throws Exception {
                return task.isComplete();
            }
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread());
taskObservable.subscribe(new Observer<Task>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Task task) {
        Log.d(TAG, "onNext: " + task.getDescription());
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});

خروجی کد بالا به صورت زیر می‌باشد.

MainActivity: onNext: Take out the trash

عملگرهای تبدیل کننده (Transformation Operators)

  • Introduction
  • ()Map
  • ()Buffer
  • ()Debounce
  • ()ThrottleFirst
  • ()FlatMap
  • ()ConcatMap
  • ()SwitchMap

مقدمه(Introduction)
کار یک اپراتور به شرح زیر است : اپراتورها مواردی را که توسط یک Observable منتشر می‌شود را تبدیل می‌کنند. در زیر برخی از متداول‌ترین اپراتورهای دگرگونی وجود دارد که فرض می‌کنیم شما از آن‌ها استفاده می‌کنید.

Map

یک تابع را برای هر مورد منتشر شده اعمال می‌کند. با استفاده از تابعی از آن، هر مورد منتشر شده را تغییر می‌دهد. (نظم ارسال حفظ می‌شود.) نمودار Map :

Buffer

به‌طور دوره ای items را از یک Observable به صورت bundles جمع می‌کند و به جای اینکه یکبار items را منتشر کند، bundles را منتشر می‌کند. (نظم ارسال حفظ می‌شود) نمودار Buffer:

Debounce

اگر یک زمان خاص گذشته باشد، یک Item را از یک Observable منتشر می‌کند بدون آنکه یک Item دیگر را منتشر کند. این عملگر برای کلیک دکمه (button clicks) بسیار عالی است. اگر کاربر بارها و بارها یک دکمه را کلیک کند، دیگر نیازی به اجرای چندین بار کار نیست. می‌توانیم از اپراتور ()Debounce استفاده کنیم تا با معرفی یک بازه‌ی زمانی مجاز بین کلیک، کلیک‌های خود را کنترل کنیم. اگر مدت زمانی سپری نشده باشد، می‌توانیم از هر دو متد جلوگیری کنیم. (نظم ارسال حفظ می‌شود.) نمودار Debounce:

ThrottleFirst

موارد منتشر شده توسط منبع Observable را که در یک بازه‌ی زمانی قرار دارند، فیلتر می‌کند.(نظم ارسال حفظ می‌شود.) نمودار ThrottleFirst: آموزش کامل RxJava در اندروید

FlatMap

موارد منتشر شده توسط یک Observable را به Observables تبدیل می‌کند، و سپس انتشار از آن را به یک Observable Single قسمت می‌کند. اگر با LiveData آشنا باشید، MediatorLiveData می‌تواند کاری بسیار مشابه انجام دهد. در مورد ()FlatMap در ادامه بیشتر صحبت می‌کنیم. (نظم ارسال حفظ نمی‌شود.) نمودار FlatMap: آموزش کامل RxJava در اندروید

ConcatMap

موارد منتشر شده توسط یک Observable را به Observables تبدیل می‌کند. این در اصل همان مورد ()FlatMap است، اما نظم ارسال حفظ می‌شود. اما از آنجا که ()ConcatMap باید منتظر بماند تا هر یک از Observable کار خود را انجام دهند پس از نظر فنی غیر همزمان نیست. (نظم ارسال حفظ می‌شود.) نمودار ConcatMap: آموزش کامل RxJava در اندروید

SwitchMap

()SwitchMap آیتم‌های منتشر شده توسط یک Observable را به یک Observable تبدیل می‌کند درست مثل ()ConcatMap  و ()FlatMap. تفاوت این است که به محض مشترک شدن یک Observer جدید، Observer  قبلی را لغو می‌کند. ()SwitchMap  محدودیتی را حل می‌کند که ()ConcatMap و ()FlatMap هم دارند. (نظم ارسال حفظ می‌شود) نمودار SwitchMap:

عملگرهای تبدیل کننده : Map

یک تابع را برای هر مورد منتشر شده اعمال می‌کند. با استفاده از تابعی از آن، هر مورد منتشر شده را تغییر می‌دهد. (نظم ارسال حفظ می‌شود.) نمودار Map : آموزش کامل RxJava در اندروید

برای دو مثال زیر به این دو کلاس مراجعه می‌کنیم:

Task.java (کلاس Task)

public class Task {
    private String description;
    private boolean isComplete;
    private int priority;
    public Task(String description, boolean isComplete, int priority) {
        this.description = description;
        this.isComplete = isComplete;
        this.priority = priority;
    }
    // getter and setters ....
}

DataSource.java (کلاس DataSource)

public class DataSource {
    public static List<Task> createTasksList(){
        List<Task> tasks = new ArrayList<>();
        tasks.add(new Task("Take out the trash", true, 3));
        tasks.add(new Task("Walk the dog", false, 2));
        tasks.add(new Task("Make my bed", true, 1));
        tasks.add(new Task("Unload the dishwasher", false, 0));
        tasks.add(new Task("Make dinner", true, 5));
        return tasks;
    }
}

مثال : (Mapping (Task -> String

در این مثال ما یک  custom map function  ایجاد می‌کنیم که فیلد توضیحات را از Task Objects استخراج می‌کند و فقط آن پارامتر را منتشر می‌کند. تابع:

Function<Task, String> extractDescriptionFunction = new Function<Task, String>() {
    @Override
    public String apply(Task task) throws Exception {
        Log.d(TAG, "apply: doing work on thread: " + Thread.currentThread().getName());
        return task.getDescription();
    }
};

ایجاد Observer و Observable :

Observable<String> extractDescriptionObservable = Observable
        .fromIterable(DataSource.createTasksList())
        .subscribeOn(Schedulers.io())
        .map(extractDescriptionFunction)
        .observeOn(AndroidSchedulers.mainThread());
extractDescriptionObservable.subscribe(new Observer<String>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(String s) {
        Log.d(TAG, "onNext: extracted description: " + s);
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});

خروجی کد بالا به صورت زیر می‌باشد.

MainActivity: apply: doing work on thread: RxCachedThreadScheduler-1 ... Repeat x5 onNext: extracted description: Take out the trash onNext: extracted description: Walk the dog onNext: extracted description: Make my bed onNext: extracted description: Unload the dishwasher onNext: extracted description: Make dinner

مثال : (Mapping (Task -> Updated Task

در این مثال ما یک custom map function ایجاد می‌کنیم که یک Task object را بروزرسانی می‌کند و سپس آن کار بروزرسانی شده را منتشر می‌کند. تابع:

Function<Task, Task> completeTaskFunction = new Function<Task, Task>() {
    @Override
    public Task apply(Task task) throws Exception {
        Log.d(TAG, "apply: doing work on thread: " + Thread.currentThread().getName());
        task.setComplete(true);
        return task;
    }
};

 ایجاد Observer و Observable:

Observable<Task> completeTaskObservable = Observable
        .fromIterable(DataSource.createTasksList())
        .subscribeOn(Schedulers.io())
        .map(completeTaskFunction)
        .observeOn(AndroidSchedulers.mainThread());
completeTaskObservable.subscribe(new Observer<Task>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Task task) {
        Log.d(TAG, "onNext: is this task complete? " + task.isComplete());
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});

خروجی کد بالا به صورت زیر می‌باشد.

MainActivity: apply: doing work on thread: RxCachedThreadScheduler-1 ... Repeat x5 MainActivity: onNext: is this task complete? true ... Repeat x5

مثال : Order of Emitted Objects

همان‌طور که در ابتدا گفتیم، ترتیب اشیاء منتشر شده از اپراتور  ()Map حفظ می‌شود. این بدان معناست که همه اشیاء منتشر شده از Observable به همان ترتیب هستند که به Observable اضافه شده اند. توجه کنید که اشیاء به همان ترتیب که در کلاس DataSource.java هستند، منتشر می‌شوند. شاید در حال حاضر این موضوع برای شما جالب به نظر نرسد، اما صبر کنید تا ما به برخی Mapهای دیگر (مانند ()FlapMap ) نگاهی بیندازیم، که نظم را حفظ نمی‌کند.

// Create an Observable using the fromIterable operator
Observable<Task> mappedObservable = Observable
        .fromIterable(DataSource.createTasksList())
        .subscribeOn(Schedulers.io())
        .map(new Function<Task, Task>() {
            @Override
            public Task apply(Task task) throws Exception {
                task.setComplete(true);
                return task;
            }
        })
        .observeOn(AndroidSchedulers.mainThread());
// subscribe to the Observable and view the emitted results
mappedObservable.subscribe(new Observer<Task>() {
    @Override
    public void onSubscribe(Disposable d) {
    }
    @Override
    public void onNext(Task task) {
        Log.d(TAG, "onNext: mapped: " + task.getDescription());
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});

خروجی کد بالا به صورت زیر می‌باشد.

MainActivity: onNext: mapped: Take out the trash MainActivity: onNext: mapped: Walk the dog MainActivity: onNext: mapped: Make my bed MainActivity: onNext: mapped: Unload the dishwasher MainActivity: onNext: mapped: Make dinner

عملگرهای تبدیل کننده: Buffer

به‌طور دوره ای items را از یک Observable به صورت bundles جمع می‌کند و به جای اینکه یکبار items را منتشر کند، bundles را منتشر می‌کند. (نظم ارسال حفظ می‌شود.) نمودار Buffer: آموزش کامل RxJava در اندروید اولین و بارزترین کاربرد اپراتور ()Buffer بسته بندی (Bundle) اشیای منتشر شده به گروه‌ها است. شاید شما فقط می‌خواهید به طور همزمان 2 اشیاء منتشر کنید و یک تأخیر زمانی بین آن‌ها اضافه کنید. این همان چیزی است که در اولین مثال زیر مشاهده خواهید کرد. برنامه بسیار مفید دیگر tracking UI interactions است. این همان چیزی است که در مثال دوم در زیر مشاهده خواهید کرد. همچنین به شما نشان خواهیم داد که چگونه از یک کتابخانه بسیار مفید ساخته شده توسط Jake Wharton، کتابخانه RxBinding، استفاده کنید. می‌توانیم از کتابخانه RxBinding استفاده کنیم تا رویدادهای کلیک قابل مشاهده باشد. توجه داشته باشید که برای مثال‌های زیر به این دو کلاس مراجعه می‌کنیم: Task.java (کلاس Task):

public class Task {
    private String description;
    private boolean isComplete;
    private int priority;
    public Task(String description, boolean isComplete, int priority) {
        this.description = description;
        this.isComplete = isComplete;
        this.priority = priority;
    }
    // getter and setters ....
}

DataSource.java (کلاس DataSource):

public class DataSource {
    public static List<Task> createTasksList(){
        List<Task> tasks = new ArrayList<>();
        tasks.add(new Task("Take out the trash", true, 3));
        tasks.add(new Task("Walk the dog", false, 2));
        tasks.add(new Task("Make my bed", true, 1));
        tasks.add(new Task("Unload the dishwasher", false, 0));
        tasks.add(new Task("Make dinner", true, 5));
        return tasks;
    }
}

یک مثال ساده:

در این مثال:

  1. Observable را ایجاد می‌کنیم.
  2. اپراتور ()Buffer را به کار می‌بریم.
  3. Subscribe کردن و مشاهده کردن نتیجه منتشر شده در log.
// Create an Observable
Observable<Task> taskObservable = Observable
        .fromIterable(DataSource.createTasksList())
        .subscribeOn(Schedulers.io());
taskObservable
        .buffer(2) // Apply the Buffer() operator
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<List<Task>>() { // Subscribe and view the emitted results
            @Override
            public void onSubscribe(Disposable d) {
            }
            @Override
            public void onNext(List<Task> tasks) {
                Log.d(TAG, "onNext: bundle results: -------------------");
                for(Task task: tasks){
                    Log.d(TAG, "onNext: " + task.getDescription());
                }
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });

خروجی کد بالا به صورت زیر می‌باشد.

onNext: bundle results: ------------------- onNext: Take out the trash onNext: Walk the dog onNext: bundle results: ------------------- onNext: Make my bed onNext: Unload the dishwasher onNext: bundle results: ------------------- onNext: Make dinner

مثال : Tracking UI Interactions

در این مثال ، کلیک روی دکمه  در UI را Observe می‌کنیم. برای تعیین فاصله زمانی از بافر استفاده می‌کنیم. هر کلیک که در بازه زمانی مشخص ثبت خواهد شد. سپس در پایان بازه کلیک‌‌ها اضافه می‌شوند بنابراین می‌دانیم که تعداد آن‌ها چقدر بوده است. Dependencies: این وابستگی به کتابخانه RxBinding است. می توانید آخرین نسخه آن را دریافت کنید.


def rxbinding_version = "4.0.0"
// Rx Binding Library
implementation "com.jakewharton.rxbinding4:rxbinding:$rxbinding_version"

Activity Code: در اینجا تمام کدی که می‌خواهید به Activity خود اضافه کنید وجود دارد. البته باید یک دکمه به UI نیز اضافه کنید. ما کد UI را درج نکردیم چون بسیار ساده است. نکته: به متد ()RxView.clicks توجه کنید. این بخشی از کتابخانه RxBinding است. فراموش نکنید که Disposables را به CompositeDisposables اضافه کنید. سپس آن‌ها را با متد onDestroy پاک کنید.

// global disposables object
CompositeDisposable disposables = new CompositeDisposable();
// detect clicks to a button
RxView.clicks(findViewById(R.id.button))
.map(new Function<Unit, Integer>() { // convert the detected clicks to an integer
    @Override
    public Integer apply(Unit unit) throws Exception {
        return 1;
    }
})
.buffer(4, TimeUnit.SECONDS) // capture all the clicks during a 4 second interval
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<List<Integer>>() {
    @Override
    public void onSubscribe(Disposable d) {
        disposables.add(d); // add to disposables to you can clear in onDestroy
    }
    @Override
    public void onNext(List<Integer> integers) {
        Log.d(TAG, "onNext: You clicked " + integers.size() + " times in 4 seconds!");
    }
    @Override
    public void onError(Throwable e) {
    }
    @Override
    public void onComplete() {
    }
});
// make sure to clear disposables when the activity is destroyed
@Override
protected void onDestroy() {
    super.onDestroy();
    disposables.clear();
}

خروجی کد بالا به صورت زیر می‌باشد.

onNext: You clicked 19 times in 4 seconds!

عملگرهای تبدیل کننده : Debounce

اپراتور Debounce مواردی را که توسط منبع Observable منتشر می‌شود را فیلتر می‌کند که به سرعت یک مورد دیگر به دنبال آن منتشر می‌شود. (نظم ارسال حفظ می‌شود.) نمودار Debounce: فرض کنید SearchView را در برنامه خود دارید. از آنجا که کاربر کاراکترهایی را در SearchView وارد می‌کند، می‌خواهید Query مربوط به سرور را انجام دهید. اگر گرفتن کاراکترها را برای یک دوره زمانی محدود نکنید، هر بار که یک کاراکتر جدید را وارد SearchView می‌کنید، درخواست جدیدی ایجاد می‌شود. به طور معمول این کار غیر ضروری است و نتایج نامطلوب به همراه خواهد داشت. اجرای یک جستجوی جدید در هر 0.5 ثانیه، خوب است.

مثال : محدود کردن درخواست‌‌های سرور

ما Layout و کل Activity را برای این مثال گنجانده ایم زیرا چند قسمت مختلف و متغیرهای اضافی وجود دارد. این وابستگی به کتابخانه RxBinding است. می توانید آخرین نسخه آن را دریافت کنید.

def rxbinding_version = "4.0.0"
// Rx Binding Library
implementation "com.jakewharton.rxbinding4:rxbinding:$rxbinding_version"

کدهای زیر برای activity_main.xml است:

<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout
    xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:app="http://schemas.android.com/apk/res-auto"
    xmlns:tools="http://schemas.android.com/tools"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    tools:context=".MainActivity">
    <androidx.appcompat.widget.SearchView
        android:layout_width="match_parent"
        android:layout_height="?attr/actionBarSize"
        app:layout_constraintTop_toTopOf="parent"
        app:layout_constraintRight_toRightOf="parent"
        app:layout_constraintLeft_toLeftOf="parent"
        android:id="@+id/search_view">
    </androidx.appcompat.widget.SearchView>
  </androidx.constraintlayout.widget.ConstraintLayout>

MainActivity.java: توجه: متغیر "timeSinceLastRequest" فقط برای خروجی‌های Log است. این بخش منطقی (Logic) نیست.

  • ساخت Observable
  • گوش دادن به ورودی متن در SearchView.
  • فرستادن Query به emitter.
  • استفاده از اپراتور ()Debounce، برای محدود کردن درخواست ها.
  • Subscirbe کردن Observer
  • ارسال درخواست به سرور
  • پاک کردن disposable در ()onDestroy.

این 7 مرحله در زیر انجام شده شده است.

public class MainActivity extends AppCompatActivity {
    private static final String TAG = "MainActivity";
    //ui
    private SearchView searchView;
    // vars
    private CompositeDisposable disposables = new CompositeDisposable();
    private long timeSinceLastRequest; // for log printouts only. Not part of logic.
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        searchView = findViewById(R.id.search_view);
        timeSinceLastRequest = System.currentTimeMillis();
        // create the Observable
        Observable<String> observableQueryText = Observable
                .create(new ObservableOnSubscribe<String>() {
                    @Override
                    public void subscribe(final ObservableEmitter<String> emitter) throws Exception {
                        // Listen for text input into the SearchView
                        searchView.setOnQueryTextListener(new SearchView.OnQueryTextListener() {
                            @Override
                            public boolean onQueryTextSubmit(String query) {
                                return false;
                            }
                            @Override
                            public boolean onQueryTextChange(final String newText) {
                                if(!emitter.isDisposed()){
                                    emitter.onNext(newText); // Pass the query to the emitter
                                }
                                return false;
                            }
                        });
                    }
                })
                .debounce(500, TimeUnit.MILLISECONDS) // Apply Debounce() operator to limit requests
                .subscribeOn(Schedulers.io());
        // Subscribe an Observer
        observableQueryText.subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                disposables.add(d);
            }
            @Override
            public void onNext(String s) {
                Log.d(TAG, "onNext: time  since last request: " + (System.currentTimeMillis() - timeSinceLastRequest));
                Log.d(TAG, "onNext: search query: " + s);
                timeSinceLastRequest = System.currentTimeMillis();
                // method for sending a request to the server
                sendRequestToServer(s);
            }
            @Override
            public void onError(Throwable e) {
            }
            @Override
            public void onComplete() {
            }
        });
    }
    // Fake method for sending a request to the server
    private void sendRequestToServer(String query){
        // do nothing
    }
    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposables.clear(); // clear disposables
    }

خروجی کد بالا به صورت زیر می‌باشد.

onNext: time since last request: 3290 onNext: search query: john onNext: time since last request: 1171 onNext: search query: blake onNext: time since last request: 2360 onNext: search query: mitch

عملگرهای تبدیل کننده : ThrottleFirst

موارد منتشر شده توسط منبع Observable را که در یک بازه زمانی قرار دارند، فیلتر می‌کند. (نظم ارسال حفظ می‌شود. نمودار ThrottleFirst: آموزش کامل RxJava در اندروید اپراتور ()ThrottleFirst در توسعه اندروید بسیار مفید است. به عنوان مثال: اگر یک کاربر یک دکمه (Button) را اسپم (spamming) می‌کند و شما نمی‌خواهید هر کلیک را ثبت کنید. شما می‌توانید از اپراتور ()ThrottleFirst استفاده کنید تا فقط در هر بازه زمانی رویدادهای جدید کلیک را ثبت کنید.

مثال : اسپم شدن Button را محدود کنید

این وابستگی به کتابخانه RxBinding است. می توانید آخرین نسخه آن را دریافت کنید.

def rxbinding_version = "4.0.0"
// Rx Binding Library
implementation "com.jakewharton.rxbinding4:rxbinding:$rxbinding_version"

کدهای زیر برای activity_main.xml است:

<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout
    xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:app="http://schemas.android.com/apk/res-auto"
    xmlns:tools="http://schemas.android.com/tools"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    tools:context=".MainActivity">
    <Button
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        app:layout_constraintTop_toTopOf="parent"
        app:layout_constraintBottom_toBottomOf="parent"
        app:layout_constraintRight_toRightOf="parent"
        app:layout_constraintLeft_toLeftOf="parent"
        android:id="@+id/button"
        android:text="button"/>
  </androidx.constraintlayout.widget.ConstraintLayout>

MainActivity.java: 

توجه: متغیر "timeSinceLastRequest" فقط برای خروجی‌های Log است. این بخش منطقی (Logic) نیست.

  •  یک Click Listener برای Button با RxBinding Library ایجاد کنید.
  • با کنترل کردن کلیک پش از گذشت 500 میلی ثانیه می‌توانید یک کلیک جدید ثبت کنید.
  • هنگام ثبت یک کلیک متد هایی را اجرا کنید.
  •  Observable را Dispose کنید.

این چهار مرحله در کدهای زیر انجام شده است.

public class MainActivity extends AppCompatActivity {
    private static final String TAG = "MainActivity";
    //ui
    private Button button;
    // vars
    private CompositeDisposable disposables = new CompositeDisposable();
    private long timeSinceLastRequest;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        button = findViewById(R.id.button);
        timeSinceLastRequest = System.currentTimeMillis();
        // Set a click listener to the button with RxBinding Library
        RxView.clicks(button)
            .throttleFirst(500, TimeUnit.MILLISECONDS) // Throttle the clicks so 500 ms must pass before registering a new click
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Unit>() {
                @Override
                public void onSubscribe(Disposable d) {
                    disposables.add(d);
                }
                @Override
                public void onNext(Unit unit) {
                    Log.d(TAG, "onNext: time since last clicked: " + (System.currentTimeMillis() - timeSinceLastRequest));
                    someMethod(); // Execute some method when a click is registered
                }
                @Override
                public void onError(Throwable e) {
                }
                @Override
                public void onComplete() {
                }
            });
    }
    private void someMethod(){
        timeSinceLastRequest = System.currentTimeMillis();
        // do something
    }
    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposables.clear(); // Dispose observable
    }
}

خروجی کد بالا به صورت زیر می‌باشد.

onNext: time since last clicked: 649 onNext: time since last clicked: 830 onNext: time since last clicked: 564

عملگرهای تبدیل کننده : FlatMap

موارد منتشر شده توسط یک Observable را به Observables تبدیل می‌کند، و سپس انتشار از آن را به یک Observable Single قسمت می‌کند. اگر با LiveData آشنا باشید، MediatorLiveData می‌تواند کاری بسیار مشابه انجام دهد. در مورد ()FlatMap در ادامه بیشتر صحبت می‌کنیم. (نظم ارسال حفظ نمی‌شود.) نمودار ()FlatMap: آموزش کامل RxJava در اندروید اپراتور ()FlatMap در توسعه اندروید بسیار مفید است و کارکرد اصلی دارد:

  • ساخت Observables از اشیاء منتشر شده توسط Observables دیگر.
  • ترکیب کردن منبع چندین Observables به Single Obsevables (این چیزی است که به عنوان "flattening" شناخته می‌شود).

از آنجایی که یک Single Observable از منابع بالقوه زیادی تولید می‌شود، آخرین Observable به صورت تصادفی منتشر می‌شود.

به عبارت دیگر، نظم حفظ نمی‌شود. بسته به وضعیت شما، این ممکن است مهم باشد یا مهم نباشد. در مثال زیر نظم مهم نیست.

مثال: ()FlatMap 

فرض کنید می‌خواهید با استفاده از REST API، برخی از پست‌های وبلاگ را از یک وب سایت Query کنید. اما این همه ی کار نیست.

هر پست وبلاگ حاوی نظرات است. و نظرات از یک end-point url دیگر دریافت می‌شود. بنابراین برای بازیابی تمام داده‌ها باید دو Query ایجاد کنیم. وب سایت jsonplaceholder.typicode.com دارای یک REST API است که می‌توانم برای نشان دادن این مورد استفاده کنیم.

ما برای تعامل با REST API از Retrofit استفاده می‌کنیم.

چرا از FlatMap استفاده کنیم؟

زیرا باید اطلاعات را از بیش از یک منبع تهیه کنیم و سپس آن را در یک انتشار واحد ترکیب کنیم، یک Flatmap برای این وضعیت ایده آل است.

به شماره 1 در url توجه کنید. این به این معنی است که نظرات مربوط به پست وبلاگ را با id = 1 بازیابی می‌کند. build.gradle:

یکی از وابستگی هایی که شاید با آن آشنایی نداشته باشید، Retrofit Call Adapter است. برای تبدیل Retrofit Call objects به Observables لازم است. در کلاس ServiceGenerator.java خواهید دید که ما یک ()RxJava2CallAdapterFactory.create  را به عنوان ورودی در متد addCallAdapterFactory از شی Retrofit Builder می‌گذاریم. بدون این وابستگی در دسترس نخواهد بود.

apply plugin: 'com.android.application'
android {
    compileSdkVersion 28
    defaultConfig {
        applicationId "com.codingwithmitch.rxjavaflatmapexample"
        minSdkVersion 21
        targetSdkVersion 28
        versionCode 1
        versionName "1.0"
        testInstrumentationRunner "androidx.test.runner.AndroidJUnitRunner"
    }
    buildTypes {
        release {
            minifyEnabled false
            proguardFiles getDefaultProguardFile('proguard-android-optimize.txt'), 'proguard-rules.pro'
        }
    }
}
dependencies {
    def retrofitVersion = "2.5.0"
    def rxjava_version = '2.2.7'
    def rxandroid_version = '2.1.1'
    def recyclerview_version = "1.0.0"
    implementation fileTree(dir: 'libs', include: ['*.jar'])
    implementation 'androidx.appcompat:appcompat:1.0.0-beta01'
    implementation 'androidx.constraintlayout:constraintlayout:1.1.2'
    testImplementation 'junit:junit:4.12'
    androidTestImplementation 'androidx.test:runner:1.1.0-alpha4'
    androidTestImplementation 'androidx.test.espresso:espresso-core:3.1.0-alpha4'
    // Retrofit
    implementation "com.squareup.retrofit2:retrofit:$retrofitVersion"
    implementation "com.squareup.retrofit2:converter-gson:$retrofitVersion"
    // RxJava
    implementation "io.reactivex.rxjava2:rxjava:$rxjava_version"
    // RxJava Call Adapter
    implementation "com.squareup.retrofit2:adapter-rxjava2:2.5.0"
    // RxAndroid
    implementation "io.reactivex.rxjava2:rxandroid:$rxandroid_version"
    // Recyclerview
    implementation "androidx.recyclerview:recyclerview:$recyclerview_version"
}

AndroidManifest.xml

دسترسی به اینترنت  (internet permissions) به فایل Manifest.xml اضافه کنید.

<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
    package="com.codingwithmitch.rxjavaflatmapexample">
    <uses-permission android:name="android.permission.INTERNET"/>
    <application
        android:allowBackup="true"
        android:icon="@mipmap/ic_launcher"
        android:label="@string/app_name"
        android:roundIcon="@mipmap/ic_launcher_round"
        android:supportsRtl="true"
        android:theme="@style/AppTheme">
        <activity android:name=".MainActivity">
            <intent-filter>
                <action android:name="android.intent.action.MAIN" />
                <category android:name="android.intent.category.LAUNCHER" />
            </intent-filter>
        </activity>
    </application>
</manifest>

activity_main.xml

در کد زیر فقط یک RecyclerView ایجاد شده است.

<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:app="http://schemas.android.com/apk/res-auto"
    xmlns:tools="http://schemas.android.com/tools"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    tools:context=".MainActivity">
    <androidx.recyclerview.widget.RecyclerView
        android:layout_width="match_parent"
        android:layout_height="match_parent"
        android:id="@+id/recycler_view"
        android:orientation="vertical">
    </androidx.recyclerview.widget.RecyclerView>
</androidx.constraintlayout.widget.ConstraintLayout>

layout_post_list_item.xml

این یک Layout برای آیتم‌های RecyclerView می‌باشد.

//This is the layout for the RecyclerView list-items
<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
    android:orientation="horizontal"
    android:layout_width="match_parent"
    android:layout_height="wrap_content"
    android:weightSum="100"
    android:padding="20dp">
    <RelativeLayout
        android:layout_width="0dp"
        android:layout_height="wrap_content"
        android:layout_weight="80"
        android:layout_gravity="center_vertical" >
        <TextView
            android:layout_width="match_parent"
            android:layout_height="wrap_content"
            android:id="@+id/title"
            android:text="this is a title"
            android:textColor="#000"
            android:textSize="17sp"
            />
    </RelativeLayout>
    <RelativeLayout
        android:layout_width="0dp"
        android:layout_height="wrap_content"
        android:layout_weight="20"
        android:layout_gravity="center_vertical">
        <TextView
            android:layout_width="wrap_content"
            android:layout_height="wrap_content"
            android:id="@+id/num_comments"
            android:textSize="14sp"
            android:layout_centerInParent="true"/>
        <ProgressBar
            android:layout_width="20dp"
            android:layout_height="20dp"
            android:id="@+id/progress_bar"
            style="@style/Widget.AppCompat.ProgressBar"
            android:layout_centerInParent="true"
            android:visibility="gone"/>
    </RelativeLayout>
</LinearLayout>

Post.java

ما باید برای درخواست‌های پست، Data را مدل سازی کنیم که در کد زیر میبینید.

public class Post {
    @SerializedName("userId")
    @Expose()
    private int userId;
    @SerializedName("id")
    @Expose()
    private int id;
    @SerializedName("title")
    @Expose()
    private String title;
    @SerializedName("body")
    @Expose()
    private String body;
    private List<Comment> comments;
    public Post(int userId, int id, String title, String body, List<Comment> comments) {
        this.userId = userId;
        this.id = id;
        this.title = title;
        this.body = body;
        this.comments = comments;
    }
    public int getUserId() {
        return userId;
    }
    public void setUserId(int userId) {
        this.userId = userId;
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getTitle() {
        return title;
    }
    public void setTitle(String title) {
        this.title = title;
    }
    public String getBody() {
        return body;
    }
    public void setBody(String body) {
        this.body = body;
    }
    public List<Comment> getComments() {
        return comments;
    }
    public void setComments(List<Comment> comments) {
        this.comments = comments;
    }
    @Override
    public String toString() {
        return "Post{" +
                "userId=" + userId +
                ", id=" + id +
                ", title='" + title + '\'' +
                ", body='" + body + '\'' +
                '}';
    }
}

Comment.java

ما باید برای درخواست‌های نظرات، Data را مدل سازی کنیم که در کد زیر میبینید.

public class Comment {
    @Expose
    @SerializedName("postId")
    private int postId;
    @Expose
    @SerializedName("id")
    private int id;
    @Expose
    @SerializedName("name")
    private String name;
    @Expose
    @SerializedName("email")
    private String email;
    @Expose
    @SerializedName("body")
    private String body;
    public Comment(int postId, int id, String name, String email, String body) {
        this.postId = postId;
        this.id = id;
        this.name = name;
        this.email = email;
        this.body = body;
    }
    public int getPostId() {
        return postId;
    }
    public void setPostId(int postId) {
        this.postId = postId;
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getName() {
        return name;
    }
    public void setName(String name) {
        this.name = name;
    }
    public String getEmail() {
        return email;
    }
    public void setEmail(String email) {
        this.email = email;
    }
    public String getBody() {
        return body;
    }
    public void setBody(String body) {
        this.body = body;
    }
}

RequestApi.java

در اینجا interface methods برای اجرای درخواست‌های شبکه با استفاده از Retrofit ارائه شده است.

  • ()getPosts لیست پست‌ها را بازیابی می‌کند.
  • ()getComments لیستی از نظرات را برای یک پست خاص بازیابی می‌کند.
public interface RequestApi {
    @GET("posts")
    Observable<List<Post>> getPosts();
    @GET("posts/{id}/comments")
    Observable<List<Comment>> getComments(
            @Path("id") int id
    );
}

ServiceGenerator.java

برای استفاده از Retrofit باید از آن نمونه بگیریم که کلاس ServiceGenerator مسئول ایجاد نمونه Retrofit است.

public class ServiceGenerator {
    public static final String BASE_URL = "https://jsonplaceholder.typicode.com";
    private static Retrofit.Builder retrofitBuilder =
            new Retrofit.Builder()
                    .baseUrl(BASE_URL)
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .addConverterFactory(GsonConverterFactory.create());
    private static Retrofit retrofit = retrofitBuilder.build();
    private static RequestApi requestApi = retrofit.create(RequestApi.class);
    public static RequestApi getRequestApi(){
        return requestApi;
    }
}

RecyclerAdapter.java

در این بخش ما یک RecyclerView ایجاد می‌کنیم.

public class RecyclerAdapter extends RecyclerView.Adapter<RecyclerAdapter.MyViewHolder> {
    private static final String TAG = "RecyclerAdapter";
    private List<Post> posts = new ArrayList<>();
    @NonNull
    @Override
    public MyViewHolder onCreateViewHolder(@NonNull ViewGroup parent, int viewType) {
        View view = LayoutInflater.from(parent.getContext()).inflate(R.layout.layout_post_list_item, null, false);
        return new MyViewHolder(view);
    }
    @Override
    public void onBindViewHolder(@NonNull MyViewHolder holder, int position) {
        holder.bind(posts.get(position));
    }
    @Override
    public int getItemCount() {
        return posts.size();
    }
    public void setPosts(List<Post> posts){
        this.posts = posts;
        notifyDataSetChanged();
    }
    public void updatePost(Post post){
        posts.set(posts.indexOf(post), post);
        notifyItemChanged(posts.indexOf(post));
    }
    public List<Post> getPosts(){
        return posts;
    }
    public class MyViewHolder extends RecyclerView.ViewHolder{
        TextView title, numComments;
        ProgressBar progressBar;
        public MyViewHolder(@NonNull View itemView) {
            super(itemView);
            title = itemView.findViewById(R.id.title);
            numComments = itemView.findViewById(R.id.num_comments);
            progressBar = itemView.findViewById(R.id.progress_bar);
        }
        public void bind(Post post){
            title.setText(post.getTitle());
            if(post.getComments() == null){
                showProgressBar(true);
                numComments.setText("");
            }
            else{
                showProgressBar(false);
                numComments.setText(String.valueOf(post.getComments().size()));
            }
        }
        private void showProgressBar(boolean showProgressBar){
            if(showProgressBar) {
                progressBar.setVisibility(View.VISIBLE);
            }
            else{
                progressBar.setVisibility(View.GONE);
            }
        }
    }
}

MainActivity.java

در زیر درون ManiActivity کدهای مربوطه را اضافه می‌کنیم.

public class MainActivity extends AppCompatActivity {
    private static final String TAG = "MainActivity";
    //ui
    private RecyclerView recyclerView;
    // vars
    private CompositeDisposable disposables = new CompositeDisposable();
    private RecyclerAdapter adapter;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        recyclerView = findViewById(R.id.recycler_view);
        initRecyclerView();
        getPostsObservable()
                .subscribeOn(Schedulers.io())
                .flatMap(new Function<Post, ObservableSource<Post>>() {
                    @Override
                    public ObservableSource<Post> apply(Post post) throws Exception {
                        return getCommentsObservable(post);
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Post>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        disposables.add(d);
                    }
                    @Override
                    public void onNext(Post post) {
                        updatePost(post);
                    }
                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: ", e);
                    }
                    @Override
                    public void onComplete() {
                    }
                });
    }
    private Observable<Post> getPostsObservable(){
        return ServiceGenerator.getRequestApi()
                .getPosts()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap(new Function<List<Post>, ObservableSource<Post>>() {
                    @Override
                    public ObservableSource<Post> apply(final List<Post> posts) throws Exception {
                        adapter.setPosts(posts);
                        return Observable.fromIterable(posts)
                                .subscribeOn(Schedulers.io());
                    }
                });
    }
    private void updatePost(final Post p){
        Observable
                .fromIterable(adapter.getPosts())
                .filter(new Predicate<Post>() {
                    @Override
                    public boolean test(Post post) throws Exception {
                        return post.getId() == p.getId();
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Post>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        disposables.add(d);
                    }
                    @Override
                    public void onNext(Post post) {
                        Log.d(TAG, "onNext: updating post: " + post.getId() + ", thread: " + Thread.currentThread().getName());
                        adapter.updatePost(post);
                    }
                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: ", e);
                    }
                    @Override
                    public void onComplete() {
                    }
                });
    }
    private Observable<Post> getCommentsObservable(final Post post){
        return ServiceGenerator.getRequestApi()
                .getComments(post.getId())
                .map(new Function<List<Comment>, Post>() {
                    @Override
                    public Post apply(List<Comment> comments) throws Exception {
                        int delay = ((new Random()).nextInt(5) + 1) * 1000; // sleep thread for x ms
                        Thread.sleep(delay);
                        Log.d(TAG, "apply: sleeping thread " + Thread.currentThread().getName() + " for " + String.valueOf(delay)+ "ms");
                        post.setComments(comments);
                        return post;
                    }
                })
                .subscribeOn(Schedulers.io());
    }
    private void initRecyclerView(){
        adapter = new RecyclerAdapter();
        recyclerView.setLayoutManager(new LinearLayoutManager(this));
        recyclerView.setAdapter(adapter);
    }
    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposables.clear();
    }
}

توجه: همه کد‌های این بخش را می‌توانید ببینید.

عملگرهای تبدیل کننده : ConcatMap

موارد منتشر شده توسط یک Observable را به Observables تبدیل می‌کند. این در اصل همان مورد ()FlatMap است، اما نظم ارسال حفظ می‌شود. اما از آنجا که ()ConcatMap باید منتظر بماند تا هر یک از Observable کار خود را انجام دهند پس از نظر فنی ()ConcatMap غیر همزمان نیست. (نظم ارسال حفظ می‌شود.) آموزش کامل RxJava در اندروید برای نشان دادن این موضوع ، می‌خواهیم دقیقاً همان مثالی که در بخش Flatmap انجام دادیم را انجام بدهیم، اما به جای آن از اپراتور ()Concatmap استفاده می‌کنیم. اگر قبلاً از آن مثال استفاده کرده باشید، نتایج آن واقعاً جالب است.

مثالی از ConcatMap:

تنها کاری که شما باید انجام دهید این است که کدهای موجود در OnCreate of MainActivity را در کد زیر جایگزین کنید.

اگر می‌خواهید این مثال را درک کنید، باید بخش اپراتور Flatmap را بخوانید.

MainActivity.java

در زیر درون ManiActivity کدهای مربوطه را اضافه می‌کنیم.

package com.codingwithmitch.rxjavaflatmapexample;
import androidx.appcompat.app.AppCompatActivity;
import androidx.recyclerview.widget.LinearLayoutManager;
import androidx.recyclerview.widget.RecyclerView;
import io.reactivex.Observable;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;
import android.os.Bundle;
import android.util.Log;
import com.codingwithmitch.rxjavaflatmapexample.models.Comment;
import com.codingwithmitch.rxjavaflatmapexample.models.Post;
import com.codingwithmitch.rxjavaflatmapexample.requests.ServiceGenerator;
import java.util.List;
import java.util.Random;
public class MainActivity extends AppCompatActivity {
    private static final String TAG = "MainActivity";
    //ui
    private RecyclerView recyclerView;
    // vars
    private CompositeDisposable disposables = new CompositeDisposable();
    private RecyclerAdapter adapter;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        recyclerView = findViewById(R.id.recycler_view);
        initRecyclerView();
        getPostsObservable()
                .subscribeOn(Schedulers.io())
                .concatMap(new Function<Post, ObservableSource<Post>>() {
                    @Override
                    public ObservableSource<Post> apply(Post post) throws Exception {
                        return getCommentsObservable(post);
                    }
                })
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Post>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        disposables.add(d);
                    }
                    @Override
                    public void onNext(Post post) {
                        updatePost(post);
                    }
                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: ", e);
                    }
                    @Override
                    public void onComplete() {
                    }
                });
    }
    private Observable<Post> getPostsObservable(){
        return ServiceGenerator.getRequestApi()
                .getPosts()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .flatMap(new Function<List<Post>, ObservableSource<Post>>() {
                    @Override
                    public ObservableSource<Post> apply(final List<Post> posts) throws Exception {
                        adapter.setPosts(posts);
                        return Observable.fromIterable(posts)
                                .subscribeOn(Schedulers.io());
                    }
                });
    }
    private void updatePost(Post post){
        adapter.updatePost(post);
    }
    private Observable<Post> getCommentsObservable(final Post post){
        return ServiceGenerator.getRequestApi()
                .getComments(post.getId())
                .map(new Function<List<Comment>, Post>() {
                    @Override
                    public Post apply(List<Comment> comments) throws Exception {
                        int delay = ((new Random()).nextInt(5) + 1) * 1000; // sleep thread for x ms
                        Thread.sleep(delay);
                        Log.d(TAG, "apply: sleeping thread " + Thread.currentThread().getName() + " for " + String.valueOf(delay)+ "ms");
                        post.setComments(comments);
                        return post;
                    }
                })
                .subscribeOn(Schedulers.io());
    }
    private void initRecyclerView(){
        adapter = new RecyclerAdapter();
        recyclerView.setLayoutManager(new LinearLayoutManager(this));
        recyclerView.setAdapter(adapter);
    }
    @Override
    protected void onDestroy() {
        super.onDestroy();
        disposables.clear();
    }
}

حال به جای اینکه کامنت‌ها همه با هم بازیابی شوند و به صورت تصادفی منتشر شوند، به صورت متوالی بازیابی می‌شوند. بنابراین لیست بعدی نظرات تا منتشر نشدن فهرست قبلی قابل بازیابی نیست. ستون اول: App launch ستون دوم:  پس از گذشت 3000 میلی ثانیه ستون سوم: پس از گذشت 20.000 میلی ثانیه RxJava این خروجی مربوط به کدهای MainActivity بالا است.

apply: sleeping thread RxCachedThreadScheduler-2 for 4000ms apply: sleeping thread RxCachedThreadScheduler-2 for 2000ms apply: sleeping thread RxCachedThreadScheduler-2 for 5000ms apply: sleeping thread RxCachedThreadScheduler-2 for 4000ms apply: sleeping thread RxCachedThreadScheduler-2 for 5000ms

بنابراین به طور خلاصه، اپراتور ()Concatmap مشکلی را که اپراتور ()Flatmap دارد، حل می‌کند. اپراتور ()Flatmap به ترتیب منتشر کردن اشیاء اهمیت نمی‌دهد. فقط وقتی اشیاء آماده می‌شوند، آن‌ها را منتشر می‌کند. با این حال، اپراتور ()Concatmap باید منتظر باشد تا شیء قبلی منتشر شده سپس حرکت به قسمت بعدی را انجام دهد. بسته به وضعیت شما، این می‌تواند خوب باشد یا می‌تواند بد باشد.

عملگرهای تبدیل کننده : SwitchMap

()SwitchMap آیتم‌های منتشر شده توسط یک Observable را به یک Observable تبدیل می‌کند درست مثل ()ConcatMap  و ()FlatMap. تفاوت این است که به محض مشترک شدن (Subscribe) یک Observer جدید، Observer  قبلی را لغو می‌کند.  ()SwitchMap  محدودیتی را حل می‌کند که ()ConcatMap و ()FlatMap هم دارند.(نظم ارسال حفظ می‌شود) نمودار SwitchMap: همان‌طور که در تعریف فوق بیان کردیم، ()SwitchMap مشکلی را حل می‌کند که ()ConcatMap و ()FlatMap هر دو دارای آن هستند. این اطمینان را می‌دهد که فقط یک Observer می‌تواند در هر زمان معین Subscribe شود.

حالت 1:

فرض کنید کاربر بر روی دکمه UI که درخواست شبکه را اجرا می‌کند، کلیک می‌کند. اما کاربر به سرعت یک دکمه متفاوت را فشار می‌دهد که درخواست دوم به شبکه را قبل از اتمام اولین نسخه انجام می‌دهد. شما می‌خواهید روند اول را خاتمه دهید و observers را UnSubscribe کنید، سپس مرحله دوم را شروع کنید. اما این فقط در مورد درخواست‌های شبکه صدق نمی‌کند. هر فرآیند در Background برای اتمام، زمان نیاز دارد، قابل لغو است. مثال دوم را در نظر بگیرید:

حالت 2:

مثال دیگر انتخاب یک تصویر یا فیلم  است. اگر کاربر روی آن کلیک کند، ممکن است تصویر یا فیلم به صورت تمام صفحه شود و با کیفیت بالاتر ارائه شود. اگر قبل از اینکه مورد اول به طور کامل ارائه شود، یک مورد رسانه مختلف را انتخاب کنید، می‌خواهید این روند را لغو کنید و یک مورد جدید را شروع کنید. SwitchMap برای موقعیت هایی مانند این مورد ایده آل است.

مثالی از ()SwitchMap:

نسخه‌ی نمایشی اپلیکیشن به شکل زیر می‌باشد.

  • مرحله‌ی 1:

    لیستی از پست‌های وبلاگ را از آدرس اینترنتی (URL) بازیابی کنید: /jsonplaceholder.typicode.com/posts

  • مرحله‌ی 2: هنگامی که یک مورد لیست کلیک می‌شود، درخواست دیگری به json placeholder api ارسال می‌شود، اما این بار برای یک پست خاص. بنابراین آدرس اینترنتی چیزی شبیه به این خواهد بود: /jsonplaceholder.typicode.com/posts/5 با این کار پست با id = 5 بازیابی می‌شود.
  • مرحله‌ی 3: قبل از اینکه درخواست انجام شود‌، ما شبیه سازی می‌کنیم که شبکه ای با سرعت عملکرد پایین یا کند چگونه باشد. ما درخواست را مجبور می‌کنیم قبل از اجرا 3000 ms منتظر بماند. progress bar بروزرسانی می‌شود که زمان سپری شده از کلیک یک آیتم لیست را نشان می‌دهد. سرعت آهسته‌ی شبکه با استفاده از اپراتور ()Interval، اپراتور ()TakeWhile و ()Filter شبیه سازی می‌شود.
  • مرحله 4: اگر کاربر قبل از اجرای درخواست شبکه، مورد جدیدی از لیست را کلیک کند، کل فرایند دوباره شروع می‌شود. اپراتور ()SwitchMap  از این امر مراقبت می‌کند.
  • مرحله 5: در صورت تکمیل فرآیند (پس از گذشت 3000 میلی ثانیه بدون کلیک بر روی گزینه ای جدید)، درخواست انجام می‌شود و کاربر به یک Activity جدید که عنوان پست را نشان می‌دهد، هدایت می‌شوید.

RxJava

build.gradle

چک کنید که فایل Build.Gradle شما شبیه کدهای زیر باشد.

apply plugin: 'com.android.application'
android {
    compileSdkVersion 28
    defaultConfig {
        applicationId "com.codingwithmitch.rxjavaflatmapexample"
        minSdkVersion 21
        targetSdkVersion 28
        versionCode 1
        versionName "1.0"
        testInstrumentationRunner "androidx.test.runner.AndroidJUnitRunner"
    }
    buildTypes {
        release {
            minifyEnabled false
            proguardFiles getDefaultProguardFile('proguard-android-optimize.txt'), 'proguard-rules.pro'
        }
    }
}
dependencies {
    def retrofitVersion = "2.5.0"
    def rxjava_version = '2.2.7'
    def rxandroid_version = '2.1.1'
    def recyclerview_version = "1.0.0"
    implementation fileTree(dir: 'libs', include: ['*.jar'])
    implementation 'androidx.appcompat:appcompat:1.0.0-beta01'
    implementation 'androidx.constraintlayout:constraintlayout:1.1.2'
    testImplementation 'junit:junit:4.12'
    androidTestImplementation 'androidx.test:runner:1.1.0-alpha4'
    androidTestImplementation 'androidx.test.espresso:espresso-core:3.1.0-alpha4'
    // Retrofit
    implementation "com.squareup.retrofit2:retrofit:$retrofitVersion"
    implementation "com.squareup.retrofit2:converter-gson:$retrofitVersion"
    // RxJava
    implementation "io.reactivex.rxjava2:rxjava:$rxjava_version"
    // RxJava Call Adapter
    implementation "com.squareup.retrofit2:adapter-rxjava2:2.5.0"
    // RxAndroid
    implementation "io.reactivex.rxjava2:rxandroid:$rxandroid_version"
    // Recyclerview
    implementation "androidx.recyclerview:recyclerview:$recyclerview_version"
}

AndroidManifest.xml

دسترسی به اینترنت و تعریف دو Activity را درون فایل AndroidManifest.xml اضافه کنید.

<?xml version="1.0" encoding="utf-8"?>
<manifest xmlns:android="http://schemas.android.com/apk/res/android"
    package="com.codingwithmitch.rxjavaflatmapexample">
    <uses-permission android:name="android.permission.INTERNET"/>
    <application
        android:allowBackup="true"
        android:icon="@mipmap/ic_launcher"
        android:label="@string/app_name"
        android:roundIcon="@mipmap/ic_launcher_round"
        android:supportsRtl="true"
        android:theme="@style/AppTheme">
        <activity android:name=".MainActivity">
            <intent-filter>
                <action android:name="android.intent.action.MAIN" />
                <category android:name="android.intent.category.LAUNCHER" />
            </intent-filter>
        </activity>
        <activity android:name=".ViewPostActivity" />
    </application>
</manifest>

activity_main.xml

در این بخش در Activity_main.xml یک RecyclerView و horizontal Progressbar ایجاد می‌کنیم.

<?xml version="1.0" encoding="utf-8"?>
<LinearLayout
    xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:app="http://schemas.android.com/apk/res-auto"
    xmlns:tools="http://schemas.android.com/tools"
    android:orientation="vertical"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    tools:context=".MainActivity">
    <ProgressBar
        android:id="@+id/progress_bar"
        style="@style/Widget.AppCompat.ProgressBar.Horizontal"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:layout_marginTop="16dp"
        android:padding="10dp"
        />
    <androidx.recyclerview.widget.RecyclerView
        android:id="@+id/recycler_view"
        android:layout_width="match_parent"
        android:layout_height="match_parent"
        android:orientation="vertical">
    </androidx.recyclerview.widget.RecyclerView>
</LinearLayout>

layout_post_list_item.xml

این Layout برای آیتم‌های RecyclerView می‌باشد.

//This is the layout for the RecyclerView list-items.
<?xml version="1.0" encoding="utf-8"?>
<LinearLayout xmlns:android="http://schemas.android.com/apk/res/android"
    android:orientation="horizontal"
    android:layout_width="match_parent"
    android:layout_height="wrap_content"
    android:padding="20dp">
    <TextView
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:id="@+id/title"
        android:text="this is a title"
        android:textColor="#000"
        android:textSize="17sp"
        />
</LinearLayout>

activity_view_post.xml

کد‌های Layout برای Activity دوم درون activity_view_post.xml است.

<?xml version="1.0" encoding="utf-8"?>
<androidx.constraintlayout.widget.ConstraintLayout
    xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:app="http://schemas.android.com/apk/res-auto"
    xmlns:tools="http://schemas.android.com/tools"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    tools:context=".MainActivity"
    android:padding="15dp">
    <TextView
        android:layout_width="wrap_content"
        android:layout_height="wrap_content"
        android:id="@+id/text"
        app:layout_constraintTop_toTopOf="parent"
        app:layout_constraintRight_toRightOf="parent"
        app:layout_constraintLeft_toLeftOf="parent"
        app:layout_constraintBottom_toBottomOf="parent"/>
</androidx.constraintlayout.widget.ConstraintLayout>

Post.java

ما باید برای درخواست‌های پست، Data را مدل سازی کنیم که در کد زیر می‌بینید.

public class Post implements Parcelable {
    @SerializedName("userId")
    @Expose()
    private int userId;
    @SerializedName("id")
    @Expose()
    private int id;
    @SerializedName("title")
    @Expose()
    private String title;
    @SerializedName("body")
    @Expose()
    private String body;
    private List<Comment> comments;
    public Post(int userId, int id, String title, String body, List<Comment> comments) {
        this.userId = userId;
        this.id = id;
        this.title = title;
        this.body = body;
        this.comments = comments;
    }
    protected Post(Parcel in) {
        userId = in.readInt();
        id = in.readInt();
        title = in.readString();
        body = in.readString();
    }
    public static final Creator<Post> CREATOR = new Creator<Post>() {
        @Override
        public Post createFromParcel(Parcel in) {
            return new Post(in);
        }
        @Override
        public Post[] newArray(int size) {
            return new Post[size];
        }
    };
    public int getUserId() {
        return userId;
    }
    public void setUserId(int userId) {
        this.userId = userId;
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getTitle() {
        return title;
    }
    public void setTitle(String title) {
        this.title = title;
    }
    public String getBody() {
        return body;
    }
    public void setBody(String body) {
        this.body = body;
    }
    public List<Comment> getComments() {
        return comments;
    }
    public void setComments(List<Comment> comments) {
        this.comments = comments;
    }
    @Override
    public String toString() {
        return "Post{" +
                "userId=" + userId +
                ", id=" + id +
                ", title='" + title + '\'' +
                ", body='" + body + '\'' +
                '}';
    }
    @Override
    public int describeContents() {
        return 0;
    }
    @Override
    public void writeToParcel(Parcel dest, int flags) {
        dest.writeInt(userId);
        dest.writeInt(id);
        dest.writeString(title);
        dest.writeString(body);
    }
}

RequestApi.java

در اینجا interface methods برای اجرای درخواست‌های شبکه با استفاده از Retrofit ارائه شده است.

  • ()getPosts لیست پست‌ها را بازیابی می‌کند.
  • ()getComments لیستی از نظرات را برای یک پست خاص بازیابی می‌کند.
public interface RequestApi {
    @GET("posts")
    Observable<List<Post>> getPosts();
    @GET("posts/{id}")
    Observable<Post> getPost(
            @Path("id") int id
    );
}

ServiceGenerator.java

کلاس ServiceGenerator مسئول ایجاد نمونه‌ی Retrofit است.

public class ServiceGenerator {
    public static final String BASE_URL = "https://jsonplaceholder.typicode.com";
    private static Retrofit.Builder retrofitBuilder =
            new Retrofit.Builder()
                    .baseUrl(BASE_URL)
                    .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
                    .addConverterFactory(GsonConverterFactory.create());
    private static Retrofit retrofit = retrofitBuilder.build();
    private static RequestApi requestApi = retrofit.create(RequestApi.class);
    public static RequestApi getRequestApi(){
        return requestApi;
    }
}

RecyclerAdapter.java

یک کلاس RecyclerViewAdapter با یک OnClick interface سفارشی برای تشخیص کلیک به موارد لیست ایجاد می‌کنیم.

public class RecyclerAdapter extends RecyclerView.Adapter<RecyclerAdapter.MyViewHolder> {
    private static final String TAG = "RecyclerAdapter";
    private List<Post> posts = new ArrayList<>();
    private OnPostClickListener onPostClickListener;
    public RecyclerAdapter(OnPostClickListener onPostClickListener) {
        this.onPostClickListener = onPostClickListener;
    }
    @NonNull
    @Override
    public MyViewHolder onCreateViewHolder(@NonNull ViewGroup parent, int viewType) {
        View view = LayoutInflater.from(parent.getContext()).inflate(R.layout.layout_post_list_item, null, false);
        return new MyViewHolder(view, onPostClickListener);
    }
    @Override
    public void onBindViewHolder(@NonNull MyViewHolder holder, int position) {
        holder.bind(posts.get(position));
    }
    @Override
    public int getItemCount() {
        return posts.size();
    }
    public void setPosts(List<Post> posts){
        this.posts = posts;
        notifyDataSetChanged();
    }
    public void updatePost(Post post){
        posts.set(posts.indexOf(post), post);
        notifyItemChanged(posts.indexOf(post));
    }
    public List<Post> getPosts(){
        return posts;
    }
    public class MyViewHolder extends RecyclerView.ViewHolder implements View.OnClickListener {
        OnPostClickListener onPostClickListener;
        TextView title;
        public MyViewHolder(@NonNull View itemView, OnPostClickListener onPostClickListener) {
            super(itemView);
            title = itemView.findViewById(R.id.title);
            this.onPostClickListener = onPostClickListener;
            itemView.setOnClickListener(this);
        }
        public void bind(Post post){
            title.setText(post.getTitle());
        }
        @Override
        public void onClick(View v) {
            onPostClickListener.onPostClick(getAdapterPosition());
        }
    }
    public interface OnPostClickListener{
        void onPostClick(int position);
    }
}

ViewPostActivity.java

در ViewPostActivity کدهای مربوطه را اضافه کنید.

public class ViewPostActivity extends AppCompatActivity {
    private static final String TAG = "ViewPostActivity";
    private TextView text;
    @Override
    protected void onCreate(@Nullable Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_view_post);
        text = findViewById(R.id.text);
        getIncomingIntent();
    }
    private void getIncomingIntent(){
        if(getIntent().hasExtra("post")){
            Post post = getIntent().getParcelableExtra("post");
            text.setText(post.getTitle());
        }
    }
}

MainActivity.java

  1. ما از یک شیء PublishSubject استفاده می‌کنیم تا تشخیص دهیم که یک Post Object از لیست انتخاب شده است. وقتی تنظیم شود، فرایند ایجاد Observable را شروع می‌کند. می‌توانید متد Override شده onPostClick را مشاهده کنید.
  2. سپس ما از عملگر SwitchMap استفاده می‌کنیم و یک <Observable<Post را از یک Post Object ایجاد می‌کنیم. این تضمین می‌کند که فقط یک Observer در هر زمان معین Observable را Subscribe می‌کند. بنابراین اگر پست دیگری انتخاب شود، از این رو  Observer قبلی از بین می‌رود، بنابراین این فرآیند را دوباره تنظیم می‌کند.
  3. ()takeWhile() ،Interval و ()Filter برای شبیه سازی یک اتصال ضعیف به شبکه هستند. آن‌ها progress bar را تنظیم می‌کنند و زمان سپری شده را ردیابی می‌کنند.
  4. اگر زمان سپری شود، اپراتور ()FlapMap کار را با تبدیل یک Long Object (حاصل عملگر Interval) به یک  Observable<Post> Object پایان می‌دهد.
  5. و در آخر، Object Post به Observer منتقل می‌شود.
  6. بنابراین هر بار که یک پست جدید از لیست انتخاب می‌شود، فرآیند به دلیل وجود عملگر ()SwitchMap دوباره شروع می‌شود.
public class MainActivity extends AppCompatActivity implements RecyclerAdapter.OnPostClickListener {
    private static final String TAG = "MainActivity";
    //ui
    private RecyclerView recyclerView;
    private ProgressBar progressBar;
    // vars
    private CompositeDisposable disposables = new CompositeDisposable();
    private RecyclerAdapter adapter;
    private PublishSubject<Post> publishSubject = PublishSubject.create(); // for selecting a post
    private static final int PERIOD = 100;
    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        recyclerView = findViewById(R.id.recycler_view);
        progressBar = findViewById(R.id.progress_bar);
        initRecyclerView();
        retrievePosts();
    }
    private void initSwitchMapDemo(){
        publishSubject
                // apply switchmap operator so only one Observable can be used at a time.
                // it clears the previous one
                .switchMap(new Function<Post, ObservableSource<Post>>() {
                    @Override
                    public ObservableSource<Post> apply(final Post post) throws Exception {
                        return Observable
                                // simulate slow network speed with interval + takeWhile + filter operators
                                .interval(PERIOD, TimeUnit.MILLISECONDS)
                                .subscribeOn(AndroidSchedulers.mainThread())
                                .takeWhile(new Predicate<Long>() { // stop the process if more than 5 seconds passes
                                    @Override
                                    public boolean test(Long aLong) throws Exception {
                                        Log.d(TAG, "test: " + Thread.currentThread().getName() + ", " + aLong);
                                        progressBar.setMax(3000 - PERIOD);
                                        progressBar.setProgress(Integer.parseInt(String.valueOf((aLong * PERIOD) + PERIOD)));
                                        return aLong <= (3000 / PERIOD);
                                    }
                                })
                                .filter(new Predicate<Long>() {
                                    @Override
                                    public boolean test(Long aLong) throws Exception {
                                        return aLong >= (3000 / PERIOD);
                                    }
                                })
                                // flatmap to convert Long from the interval operator into a Observable<Post>
                                .subscribeOn(Schedulers.io())
                                .flatMap(new Function<Long, ObservableSource<Post>>() {
                                    @Override
                                    public ObservableSource<Post> apply(Long aLong) throws Exception {
                                        return ServiceGenerator.getRequestApi()
                                                .getPost(post.getId());
                                    }
                                });
                    }
                })
                .subscribe(new Observer<Post>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        disposables.add(d);
                    }
                    @Override
                    public void onNext(Post post) {
                        Log.d(TAG, "onNext: done.");
                        navViewPostActivity(post);
                    }
                    @Override
                    public void onError(Throwable e) {
                        Log.e(TAG, "onError: ", e);
                    }
                    @Override
                    public void onComplete() {
                    }
                });
    }
    private void retrievePosts(){
        ServiceGenerator.getRequestApi()
                .getPosts()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<List<Post>>() {
            @Override
            public void onSubscribe(Disposable d) {
                disposables.add(d);
            }
            @Override
            public void onNext(List<Post> posts) {
                adapter.setPosts(posts);
            }
            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: ", e);
            }
            @Override
            public void onComplete() {
            }
        });
    }
    @Override
    protected void onResume() {
        super.onResume();
        progressBar.setProgress(0);
        initSwitchMapDemo();
    }
    private void initRecyclerView(){
        adapter = new RecyclerAdapter(this);
        recyclerView.setLayoutManager(new LinearLayoutManager(this));
        recyclerView.setAdapter(adapter);
    }
    private void navViewPostActivity(Post post){
        Intent intent = new Intent(this, ViewPostActivity.class);
        intent.putExtra("post", post);
        startActivity(intent);
    }
    @Override
    protected void onPause() {
        Log.d(TAG, "onPause: called.");
        disposables.clear();
        super.onPause();
    }
    @Override
    public void onPostClick(final int position) {
        Log.d(TAG, "onPostClick: clicked.");
        // submit the selected post object to be queried
        publishSubject.onNext(adapter.getPosts().get(position));
    }
}

توجه: همه کد‌های این بخش را می‌توانید ببینید. جمع بندی: در مقاله‌ی آموزش کامل RxJava در اندروید ، به طور کلی با RxJava و RxAndroid آشنا شدیم. ما با ویژگی‌ها و کارایی‌های RxJava و RxAndroid در مثال هایی آشنا شدیم  .شما با این آموزش و مثال‌های آن از یک توسعه دهنده تازه کار به توسعه دهنده متوسط در RxJava تبدیل می‌شوید. اگر در این زمینه تجربه‌ای یا سوالی دارید خوشحال می‌شویم که با ما و کاربران وب سایت سون لرن به اشتراک بگذارید. بیشتر بدانید: پیش نیاز‌های یادگیری برنامه نویسی اندروید روش کسب درآمد از برنامه نویسی اندروید برنامه نویسی اندروید چیست؟

منابع این مقاله:

codingwithmitch.com github.com Reactivx.io

اگر به یادگیری بیشتر در زمینه‌ی برنامه نویسی اندروید علاقه داری، با شرکت در دوره‌ی برنامه نویسی اندروید در کمتر از یکسال به یک توسعه‌دهنده اندروید همه فن حریف تبدیل می‌شوی که آماده‌ی استخدام، دریافت پروژه و حتی پیاده‌سازی اپلیکیشن خودت هستی.

 

 

۳۱ دیدگاه
ما همه سوالات و دیدگاه‌ها رو می‌خونیم و پاسخ میدیم
۲۲ دی ۱۴۰۰، ۰۶:۰۹

واقعا خیلی نقاله خوبی بود، متشکرم

شادی ۰۷ مهر ۱۴۰۰، ۰۸:۳۵

عالی ی ی ی ممنون ب خاطر آموزش‌های خوبتون

نازنین کریمی مقدم ۰۷ مهر ۱۴۰۰، ۱۷:۱۸

ممنون که با ما همراه هستید.

amir ۳۱ شهریور ۱۴۰۰، ۰۹:۰۲

فقط میتونم بگم عالی و کامل بود

نازنین کریمی مقدم ۳۱ شهریور ۱۴۰۰، ۱۰:۱۸

ممنون که با ما همراه هستید :)

Zahra Hosseini ۰۹ شهریور ۱۴۰۰، ۰۴:۲۸

من کلا با آموزشی که مثال زیاد داشته باشه خیلی راحت یاد میگیرم و این مقاله از این نظر خیلی عالی بود و واقعا مفید بود واسم مرسی

نازنین کریمی مقدم ۱۳ شهریور ۱۴۰۰، ۱۲:۵۳

درود خوشحالیم که مقاله براتون مفید بوده دوست عزیز :)

مهدی جمشیدی ۰۱ شهریور ۱۴۰۰، ۱۹:۰۹

الان به نظرتون مثال هاتون به درد کسی هم میخوره ؟ چرا flatmap انقدر ناقص توضیح داده شده ؟ توی انتخاب تولید کننده‌های محتوا تون تجدید نظر کنید

نازنین کریمی مقدم ۱۳ شهریور ۱۴۰۰، ۱۷:۵۹

درود سعی میکنیم از نظرات تون در تولید محتواهای بعدی استفاده کنیم. ممنون که با ما همراه هستید.

قاسمی ۰۵ فروردین ۱۴۰۰، ۰۸:۳۴

با تشکر از مقاله ارزشمندتون که معلومه هم برای خودتون و هم برای مخاطب ارزش قائل هستید.

نازنین کریمی مقدم ۰۵ فروردین ۱۴۰۰، ۱۴:۱۰

درود خوشحالیم مقاله براتون مفید بوده. ممنون که با ما همراه هستید.

ساقی ۲۴ مهر ۱۳۹۹، ۰۸:۰۷

عالی بود خلاصه یک کتاب ۵۰۰ صفحه ای رو یکجا گذاشتید از این مدل مقالات جامع بیشتر بزارید

امیر حسین حیدری ۲۶ مهر ۱۳۹۹، ۰۹:۵۶

سلام، حتما دوست عزیز

علی رحمانی ۰۳ مهر ۱۳۹۹، ۱۵:۴۶

خیلی عالی بود با اینکه دیگه جاوا کار نمیکنم و به دارت سوییچ کردم مباحت مروبط با ری اکت عالی بود و مفاهیمش خیلی به دردم خورد مچکر

امیر حسین حیدری ۰۷ مهر ۱۳۹۹، ۰۷:۰۸

سلام دوست عزیز، ممنون از نظرتون موفق باشید.

محمد ۲۰ شهریور ۱۳۹۹، ۱۸:۳۷

سلام. با وجود تجربه زیادم توی کد نویسی، همیشه فکر میکردم RX جاوا یه چیز غوله و بسیار سخت و همش ازش فرار میکردم. ولی امروز وقتی این سایت رو پیدا کردم که دیدم رایگان گذاشته و انقدر روان، خیلی خوب یادگرفتم. حداقل مفاهیمش رو. و این باعث شد که اسم سایت شما همیشه توی ذهنم باشه تا هر وقت منبع آموزش فارسی در مورد چیزی خواستم، اول به اینجا سر بزنم. همین روند رو ادامه بدید چون واقعا سورس‌های فارسی و سایت هایی که سرشون به تنشون ارزش داشته باشه(!) خیلی کمه. به یکه تازی تون ادامه بدید. SEO تون رو هم قوی و قوی‌تر کنید تا مثل برخی سایت‌های فارسی، به ورود به سایت تون عادت کنیم و کنن از بس اولید. موفق باشید

امیر حسین حیدری ۰۱ مهر ۱۳۹۹، ۰۷:۱۰

سلام . از این که از مقاله لذت بردین و واستون مفید بوده واقعا خوش‌حالیم، انشالله در ادامه راه موفق باشی

مهدیار ۰۶ مرداد ۱۳۹۹، ۱۰:۱۱

سلام منبع آموزش درست ذکر نشده است منبع صحیح سایت codingwithmitch.com میباشد با تشکر از سایت خوبتون

امیر حسین حیدری ۰۸ مرداد ۱۳۹۹، ۰۸:۱۷

سلام مهدیار جان ممنون بابت یاد آوری که کردی فراموش شده بود تصحیح شد.

وحید گروسی ۰۸ تیر ۱۳۹۹، ۰۷:۵۸

عالی بود امیرحسین جان

امیر حسین حیدری ۰۸ تیر ۱۳۹۹، ۱۰:۵۶

سلامت باشی وحید جان

اصی ۰۷ تیر ۱۳۹۹، ۱۴:۱۵

واقعا عالی بود

امیر حسین حیدری ۰۷ تیر ۱۳۹۹، ۲۰:۱۱

ممنون از نظرتون از اینکه واستون مفید بوده خوشحالیم

محمدباقر ۰۷ تیر ۱۳۹۹، ۰۶:۳۰

به بدترین شکل ممکن ترجمه شده کل محتوا انگلیسیه فقط کلماتش فارسی هستن تا حدودی متوجه شدم چون از قبل چیزایی بلد بودم کسی که هیچی ازش ندونه حتما متوجه هیچیش نمیشه !

امیر حسین حیدری ۰۷ تیر ۱۳۹۹، ۲۰:۱۱

ممنون از نظرتون دوست عزیر این مقاله خیلی از کلمات انگلیسیش مخصوص هستش و معنای درستی در صورت ترجمه به جمله نمیده سعی میکنیم در بروز رسانی‌های بعدی اگه مشکلی بود برطرف کنیم در ضمن کسانی بودند که هیچی نمیدونستند و با همین مقاله یاد گرفتند

Behrooz Pahlavan ۲۶ خرداد ۱۳۹۹، ۲۳:۱۷

عالی بود ممنون از شما

امیر حسین حیدری ۲۸ خرداد ۱۳۹۹، ۰۸:۱۰

خواهش میکنم نظر لطفتون هست

Parsa Dadras ۰۲ خرداد ۱۳۹۹، ۲۳:۰۶

بسیار عالی ، ممنونم

امیر حسین حیدری ۰۸ خرداد ۱۳۹۹، ۱۶:۱۰

ممنون از نظرتون

ماندانا مهرپور ۰۲ خرداد ۱۳۹۹، ۱۷:۰۰

سلام و عرض ادب عالی بخدااا من الان فهمیدم چی به چیه.انشاالله ادامه رو سریعا بگذارید .خدا خیرتون بده. بازم ممنون

امیر حسین حیدری ۰۲ خرداد ۱۳۹۹، ۲۱:۴۷

سلام خدمت شما ممنون نظر لطفتون هست سعی میکنم هر چی سریع‌تر به دستتون برسه

دوره الفبای برنامه نویسی با هدف انتخاب زبان برنامه نویسی مناسب برای شما و پاسخگویی به سوالات متداول در شروع یادگیری موقتا رایگان شد:

۲۰۰ هزار تومان رایگان
دریافت دوره الفبای برنامه نویسی