Reactive Programming (Reaktif Programlama) Nedir?
Geliştirilen birçok uygulama, olaylar (events) üzerine kuruludur. Bu olaylar, kullanıcının uygulama ile etkileşimde bulunduğu tıklama olayları olabileceği gibi, görsel bileşenlerin ekrana yüklendiği window.onload
gibi uygulamanın yaşam döngüsü olayları da olabilir. Sunucu taraflı render edilen uygulamalarda, sayfalar sürekli olarak sunucuya gidilip tekrar yüklendiği için bu durum çok da önemli değildir. Ancak tek sayfalı uygulamalar (single page applications) yoğun bir şekilde olay ağırlıklıdır. Bu gibi uygulamalarda olay yönetimi önemlidir. Örneğin aşağıdaki gibi tıklama sonrasında bir kez çalışacak bir işlemimiz olsun:
button.addEventListener(‘click’, function f(e) {
e.currentTarget.removeEventListener(e.type, f);
console.log(“Hoşgeldiniz”);
});
Burada tıklama sonrasında yapılan işlemin tekrar gerçekleşmemesi adına butondan click olayını çıkarmamız gereklidir. Buna benzer her fonksiyon için böyle bir durumu kodlamak zaman içerisinde gereksiz bir zorunluluk oluşturabilir. Bunun yerine RxJS’te yer alan first operatörü aşağıdaki gibi kullanılabilir:
import { fromEvent } from ‘rxjs’;
fromEvent(button, ‘click’)
.pipe(first())
.subscribe(x => console.log(“Hoşgeldiniz”));
Not: RxJS örneklerini Stackblitz sitesinden çalıştırabilirsiniz.
İşte reaktif programlama, bu gibi olayları deklaratif hale getirerek basitleştirdiği için, okunaklı ve daha hatasız kod yazmamızı sağlayacaktır. Reaktif programlamanın yetenekleri sadece bununla da sınırlı değildir. İsterseniz öncelikle Reaktif programlamanın ne olduğuna bakalım.
Reaktif programlama nedir?
Wikipedia tanımına bakacak olursak:
Reaktif programlama, veri akışları (data streams) ve değişikliklerin sisteme yayılması (propagation of change) ile ilgilenen deklaratif bir programlama paradigmasıdır.
Bu tanımı anlayabilmek için öncelikle veri akışının ne demek olduğunu bilmemiz gerekiyor. Reaktif programlamada her veri bir veri akışını oluşturur. Bu veri türünün sadece video veya ses olması gerekmez. Tek bir değişken bile bir akış oluşturabilir. Örneğin aşağıdaki fonksiyonu ele alalım:
import { from } from ‘rxjs’;
from([1,2,3])
.subscribe(x => console.log(x));
Burada from()
fonksiyonu, diziden bir akış oluşturmaktadır. Bu akışa abone olan (subscribe) diğer bir fonksiyon ise akışı izleyerek gelen veriler üzerinde işlem yapmaktadır. Bu bağlamda reaktif programlama, observable (izlenen) ve observer (izleyici) ilişkisi üzerine kuruludur.
Observer ve observable nedir?
Observable, oluşturulan veri akışına denir. Observer ise bu akışı izleyen bir fonksiyondur. Akışı izleme olayına subscription (abonelik) adı verilir. Kod ile örnek verecek olursak:
import { from } from ‘rxjs’;
var observable = from([1,2,3]);
var observer = x => console.log(x);
var subscription = observable.subscribe(observer);
Observable’ın sadece bir diziden oluşması zorunlu değildir. Asenkron olarak çeşitli şekillerde veri üretecek özel Observable’lar da yazılabilir:
import { Observable } from ‘rxjs’;
const observable = new Observable(subscriber => {
subscriber.next(123);
subscriber.next(‘Merhaba Dünya!’);
subscriber.next(true);
subscriber.next(null);
subscriber.next({ a: ‘test’});
setTimeout(() => {
subscriber.next(‘Son değer’);
subscriber.complete();
subscriber.next(‘Bu değeri vermeyecek.’)
}, 1000);
});
const observer = {
next: x => console.log(x),
error: err => console.error(‘Hata: ‘ + err),
complete: () => console.log(‘Veri akışı tamamlandı.’),
};
console.log(‘========== Abonelik başlatıldı ==========’);
const subscription = observable.subscribe(observer);
console.log(‘========== Senkron veriler alındı ==========’);
/*
========== Abonelik başlatıldı ==========
123
Hello World!
true
null
{a: “test”}
========== Senkron veriler alındı ==========
Son değer
Veri akışı tamamlandı.
*/
Buradaki next()
fonksiyonu, kendisine abone olan fonksiyon için bir veri sunar. complete()
fonksiyonu ise artık veri akışının sonlandığını bildirir ve aynı return
deyimi gibi kendinden sonra gelecek ifadeleri çalıştırmadan fonksiyondan çıkar. Her bir observer’ın ise next
, error
ve complete
fonksiyonları bulunabilir:
- next: Verinin alınıp üzerinde işlem yapılmasını sağlar.
- error: bir hata oluştuğunda bildirilmesini sağlar.
- complete: veri akışının sonlandığını bildirir. Observable sürekli olarak veri üretebilir. Oluşan veri akışındaki abonelikten çıkmak için ise
subscription.subscribe();
ifadesi çalıştırılabilir. Abonelikten çıkıldıktan sonra veri akışı hala devam eder fakat abone olan fonksiyon artık çalışmasını durdurmuş olur.
Operator nedir?
Operatörler en basit haliyle birer fonksiyondur. Pipeable operators ve Creation operators olmak üzere iki çeşit operatör bulunur.
Pipeable operatörler
Pipeable operatörler, birbirine arka arkaya bağlanarak veri akışı üzerinde işlem yapmayı sağlayan fonksiyonlardır. Dizilerde bulunan map
ve filter
fonksiyonları gibi çalışırlar ve varolan observable üzerinde değişiklik yapmadan yenisini üretirler:
import { of } from ‘rxjs’;
import { map, filter } from ‘rxjs/operators’;
filter(x => x == 2)(map(x => x * 2)(of(1, 2, 3)))
.subscribe((x) => console.log(x));
// 2
Burada fonksiyon f(g(x)) şeklinde içten dışa doğru çalıştığı için okunaklı olmuyor. Bunun yerine observable içerisinde yer alan pipe()
fonksiyonu kullanılabilir:
import { of } from ‘rxjs’;
import { map, filter } from ‘rxjs/operators’;
of(1, 2, 3)
.pipe(
map(x => x * 2),
filter(x => x == 2)
)
.subscribe(x => console.log(x))
// 2
Creation operatörleri
from()
, of()
gibi bir observable üreten operatörlerdir. Örneğin aşağıdaki interval()
operatörü sıfırdan başlayarak her saniye bir artan değer üretir:
import { interval } from ‘rxjs’;
import { take } from ‘rxjs/operators’;
interval(1000)
.pipe(take(4))
.subscribe(x => console.log(x));
// 1
// 2
// 3
// 4
Burada take()
fonksiyonu, observable’dan yalnızca 4 adet değerin üretilmesini sağlar.
Subject nedir?
Normalde observable.subscribe(observer)
metodu ile yalnızca bir observer, bir adet observable’a abone olmaktadır (unicast). Birden fazla observer’ın bir observer’a abone olması (multicast) için Subject
türü kullanılır. Örnek:
import { Subject } from ‘rxjs’;
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(1);
subject.next(2);
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
Her Subject’in next
, error
ve complete
fonksiyonları bulunmaktadır. Bu nedenle aynı zamanda bir observer gibi de davranabilir:
import { Subject, from } from ‘rxjs’;
const subject = new Subject<number>();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
const observable = from([1, 2, 3]);
observable.subscribe(subject);
// Logs:
// observerA: 1
// observerB: 1
// observerA: 2
// observerB: 2
// observerA: 3
// observerB: 3
BehaviorSubject
, ReplaySubject
, ve AsyncSubject
olmak üzere 3 çeşit Subject türü bulunmaktadır.
BehaviorSubject
Veri akışına abone olan observer’ın, veri akışındaki bir önceki veriden başlayarak gelen verileri almasını sağlar.
import { BehaviorSubject } from ‘rxjs’;
// En başta bir önceki değer olmadığı için ilk değer parametre
olarak geçilir:
const subject = new BehaviorSubject(“İlk değer”);
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(3);
// Çıktısı:
// observerA: İlk değer
// observerA: 1
// observerA: 2
// observerB: 2 <= next(2)’den sonra çalıştırılmasına rağmen
bu değeri alabildi
// observerA: 3
// observerB: 3
// Aynı kod Subject ile çalıştırılsaydı sadece 3 sayısını
alacaktı:
//observerA: 1
//observerA: 2
//observerA: 3
//observerB: 3
ReplaySubject
BehaviorSubject’in istenildiği kadar eski veriyi alabilen versiyonudur. Parametre olarak ön belleğe alınacak veri adedi geçilir:
import { ReplaySubject } from ‘rxjs’;
const subject = new ReplaySubject(3); // Yani aboneler için son 3 veriyi sakla
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(5);
// Logs:
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerB: 2
// observerB: 3
// observerB: 4
// observerA: 5
// observerB: 5
ReplaySubject’e ek olarak ne kadar eski tarihli verilerin alınabileceği de parametre olarak geçilebilir. Aşağıdaki örnekte, 500ms öncesinden bu yana kadar olan 100 verinin saklanması istenmiştir:
import { ReplaySubject } from ‘rxjs’;
// Son 500 ms’ye kadar olan 100 adet veriyi sakla
const subject = new ReplaySubject(100, 500);
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
let i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
}, 1000);
// Çıktısı
// observerA: 1
// observerA: 2
// observerA: 3
// observerA: 4
// observerA: 5
// observerB: 3
// observerB: 4
// observerB: 5
// observerA: 6
// observerB: 6
// …
AsyncSubject
Akıştaki son değerin alınabilmesi içindir. next()
fonksiyonu çalışsa dahi observer’a veriyi vermez. complete()
fonksiyonu çalıştığında akıştaki son veri ne ise onu verir.
import { AsyncSubject } from ‘rxjs’;
const subject = new AsyncSubject();
subject.subscribe({
next: (v) => console.log(`observerA: ${v}`)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log(`observerB: ${v}`)
});
subject.next(5);
subject.complete();
// Çıktısı
// observerA: 5
// observerB: 5
Scheduler Nedir?
Scheduler (zamanlayıcı), bir aboneliğin ne zaman başlayacağını ve verinin ne zaman observer’lara iletileceğini kontrol eder. 3 bileşenden oluşur:
- Veri yapısı (data structure): Scheduler türüne göre veriyi nasıl tutacağını ve sorgulanacağını belirler.
- Çalışma bağlamı (execution context): Observer’ın ne zaman ve nerede çalıştırılacağını belirler. Örneğin hemen de çalıştırılabilir, ya da setTimeout gibi metotlarla daha sonra da çalıştırılabilir.
- Sanal saat (virtual clock): Scheduler’daki
now()
metodu ile sanal olarak mevcut zamanın bilgisinin alınmasını sağlar. Observer’lar bu saate uyarak çalışırlar. Aşağıdaki örnekteasyncScheduler
‘ın’ kullanımı gösterilmiştir:
import { Observable, asyncScheduler } from ‘rxjs’;
import { observeOn } from ‘rxjs/operators’;
const observable = new Observable((observer) => {
observer.next(1);
observer.next(2);
observer.next(3);
observer.complete();
}).pipe(
observeOn(asyncScheduler)
);
console.log(‘Subscribe öncesi’);
observable.subscribe({
next: x => console.log(x),
error: err => console.error(‘Hata: ‘ + err),
complete: () => console.log(‘Veri akışı tamamlandı.’),
});
console.log(‘Subscribe sonrası’);
// Çıktısı:
// Subscribe öncesi
// Subscribe sonrası
// 1
// 2
// 3
// Veri akışı tamamlandı.
// asyncScheduler kullanılmasaydı:
// Subscribe öncesi
// 1
// 2
// 3
// Veri akışı tamamlandı.
// Subscribe sonrası
asyncScheduler
, setTimeout(fn, 0)
fonksiyonu ile çalıştığı için, verileri observer’a asenkron olarak iletir. 5 tane Scheduler tipi mevcuttur:
- null: Herhangi bir Scheduler kullanılmadığında yapılacak iş senkronize ve recursive olarak çalıştırılır. Anlık çalışarak bitecek işler için tasarlanmıştır.
- queueScheduler: Mevcut olay çerçevesinde yapılacak işi kuyruğa alır. Belirli bir sıra ile arka arkaya çalıştırılacak işler için kullanılır.
- asapScheduler: Yapılacak işi, promise’ler için kullanılan micro task kuyruğuna alır. Mevcut işten sonra ve sonraki işten önce çalıştırılır. Asenkron dönüşümler için kullanılır.
- asyncScheduler: setInterval ile çalıştırılmasını sağlar. Zaman bazlı işlerde kullanılır.
- animationFrameScheduler: Tarayıcının sonraki repaint evresinden hemen önce ilgili işin çalıştırılmasını sağlar. Daha akıcı animasyonlar üretmek için kullanılır.
Sonuç olarak
Reaktif programlama, fonksiyonel programlama ile birlikte sade ve yönetilebilir uygulamalar yazmamızı sağlıyor. İçerisinde birçok yararlı operatör barındırıyor. Reaktif programlama için öğrenme eğrisi yokuşlu ve engellerle dolu olabilir. Bu nedenle örnek projelerde belirli bir süre çalışarak pratik yapmaya ihtiyaç var. Örnek projeler için https://www.learnrxjs.io/recipes/ adresini ziyaret edebilirsiniz. Sıkıştığınız durumda belirli bir işlemi yapmak için hangi operatör kullanmanız gerektiğini buradaki karar ağacından tespit edebilirsiniz. Bu yazı hakkında öneri ve görüşlerinizi yorum bölümünden bize yazabilirsiniz. Bir sonraki yazıda görüşmek üzere…