RxJava – sådan tæmmer du dine asynkrone processer!

I artiklen ser vi på, hvordan du kan bruge RxJava til at strukturere og implementere programmer, som har behov for at afvikle og udnytte asynkrone processer. Artiklen beskriver, hvordan RxJava sættes op, de basale byggeklodser samt giver eksempler på anvendelse.

At skrive og vedligeholde parallelle programmer er svært, desværre. Front end og back end programmer, skal i stigende grad samle input fra mange services – helst uden at blokere UI’et eller det centrale accept loop. De skal kunne håndtere, at services fejler eller er langsomme, og som om det ikke var galt nok, så skal programmerne også helst udnytte det voksende antal CPU-kerner, som hardwaren tilbyder selv på små håndholdte enheder.

Dine programmer kan nemt drukne i styring af thread pools, caching, timeouts, fejlhåndtering o.s.v. Forretningslogikken, alt det vigtige, bliver sværere og sværere at følge pga. støjen fra infrastrukturen.

Reactive streams to the rescue

”Reactive” er et af tidens store buzz-words. Termen sniger sig ind alle vegne fra front end til back end frameworks, og som det så ofte går, kommer begrebet til at dække et større og større felt af teknologier og ideer. Udgangspunktet kan dog genfindes i det reaktive manifest. Her opstilles en række målsætninger for moderne IT-systemer, som i kernen handler om evnen til at reagere på hændelser uden afbrydelse og med minimal forsinkelse.

I starten af 0’erne udviklede Eric Meijer Reactive Extensions til .NET for Microsoft for at adresserer nogle af disse problemer. Det arbejde fik Netflix øje på, da de manglede en platform til at samle og orkestrerer deres servicelag, men da Netflix er en JVM-shop, udviklede Ben Christensen (mfl.) en Java version (RxJava), som siden har dannet grundlag for versioner i andre JVM-sprog (RxScala, RxGroovy mfl.). Initiativet har nu fået sit eget liv på Reactivex.io men stadig med støtte fra en række af de største aktører.

Udover en høj grad af modenhed er projektet kendetegnet ved det meget høje niveau af dokumentationen, og så er det selvfølgelig open source.

RxJava

Lad os sætte RxJava op og se på, hvordan det kan hjælpe på nogle af udfordringerne. I et Maven projekt tilføjer du afhængigheden til din pom.xml. For at bruge den pt. seneste release:

<dependency>
    <groupId>io.reactivex</groupId>
    <artifactId>rxjava</artifactId>
    <version>1.1.1</version> 
</dependency>

Eller med Gradle:

compile 'io.reactivex:rxjava:1.1.1'

Det centrale koncept i RxJava er Observables, objekter som udsender hændelser, og Subscribers, objekter som abonnere på og reagere på disse hændelser. Mange vil kende konceptet fra designmønsteret af samme navn, men konceptet kan genfindes i mange ”message-passing” varianter.

RxJava indkapsler alle ”processer” i Observables. Output er en strøm (stream) af hændelser/elementer, som forsætter til processen afslutter. Afslutningen er i sig selv en hændelses, og hvis processen kaster en Exception, er det også en hændelse, der kan reageres på.

Lad os se et eksempel:

(1) Observable<String> strings = Observable.create(s -> {
    log.info("connecting subscriber");
    try{
       s.onStart(); // signaler start
       s.onNext("produce 1"); // send et element
       s.onNext("produce 2");
       s.onCompleted(); // signaler afslutning
     } catch (Exception e) {
(4)    s.onError(e);  // signaler fejl
     }
    });
(2) strings.subscribe(
               s -> log.info("on next: {}", s), // behandling af hvert element
               e -> log.error("boom", e),  // hvis der sker en fejl
               () -> log.info("completed no errors") // når strømmen slutter normalt
    );
(3) strings.subscribe(...)

Her dannes en simpel Observable (1), som producerer 2 strenge og afslutter normalt. Dernæst (2) subscribes og hændelserne printes til en ”log”. Hvis der subscribes mere end en gang til den samme Observable (3), udføres processen i udgangspunktet igen for hver subscriber. Skulle der ske en fejl i processen, så fanges fejlen og sendes som en error-hændelse (4).

Det virker måske ikke så imponerende, men så vent og se, hvordan man kan transformere og kombinere disse strømme af hændelser fra Observables.

Transformation og kombination af hændelses-strømme

Før Java 8 gav os streams, havde Rx allerede et ret imponerende katalog af streaming-operationer.

Observable.from("rx-Java is so cool! Not-only-cool .. super-COOL!".split("s"))
           .map(String::toUpperCase)
           .flatMap(s -> Observable.from(s.split("-")))
           .doOnNext(log::info) // log elementerne, når de passerer forbi
           .filter(s -> s.contains("COOL"))
           .count()
           .subscribe(n -> log.info("cool was found {} time(s)", n));

Her et eksempel, som først danner en Observable-stream fra et array af strenge. Hvert ord Upper-cases med map, hvorefter elementerne yderligere splittes omkring ”-”. Den sidstnævnte operation danner arrays af elementer, som vi flatmapper til en ny strøm af strenge. Dernæst logger vi elementerne i strømmen, inden de fortsætter ind i et filter. Elementerne tælles og resultatet logges.

Her er det værd at stoppe et øjeblik og overveje, hvad der sker, hvis ingen subscriber til sådan en kæde? Ingenting. Streams i RxJava er lazy, hvilket vil sige, at først i det øjeblik nogen abonnerer, så aktiveres operationerne i kæden. Der kommer altså ingen ”doOnNext”-log før en subscriber faktisk melder sig.

Lazy-streams gør det muligt at arbejde med principielt uendelige strømme. Du kan også pakke meget ”dyre” processer ind i Observables, uden at de bliver kørt unødvendigt o.s.v. Alt sammen rigtigt brugbart i vores færd mod bedre asynkrone programmer.

Så: Hvordan kan vi kombinere strømme? Og hvordan arbejder vi med Observables, der eksekverer i forskellige tråde? Vi starter med det sidste.

RxJava og multi-threading

Lad os forestille os et webservice-kald eller et database-opslag, som vi pakker ind i en Observable. Operationen kan fejle, og der vil være en varierende forsinkelse fra kald til svar. Yderligere kan kaldet resulterer i mellem 0, et eller flere resultat-elementer/rækker.

Fordi operationen tænkes at være IO-bundet, vil vi gerne udføre den i en thread pool. Lad os se på et mock-eksempel:

   final Random random = new Random(11345);
   //simulerer et webservice-kald til en langsom og dyr asynkron opeartion
   Observable<String> ws1 = Observable.<String>create(s -> {
       try {
           log.info("ws start");
           s.onStart();
           // tænke, tænke, tænke ...
           Thread.sleep(random.nextInt(1000));
           // 50% chance for fejl
           if (random.nextBoolean())
               throw new Exception("oh-no the service blew up!");
           for (int i = random.nextInt(5); i > 0; i--) {
               s.onNext("ws-element" + i);
           }
           log.info("ws end");
           s.onCompleted();
       } catch (Throwable e) {
           s.onError(e);
       }
   }).subscribeOn(Schedulers.io());

   ws1.subscribe(log::info, err -> log.error("bum:{}", err.getMessage()));
   ws1.subscribe(log::info, err -> log.error("bum: {}", err.getMessage()));
   ws1.subscribe(log::info, err -> log.error("bum: {}", err.getMessage()));
   log.info("ws calls done");
   Thread.sleep(5000);

Helt overordnet, har vi et ”kald” pakket ind i ws1, som vi ”kalder” 3 gange, logger en besked og venter. Vi venter, fordi vi nu med subscribeOn har skubbet eksekveringen over i den indbyggede ”IO”-threadpool.

Lad os se, på sekvensen af log-beskeder produceret af ovenstående kode, for bedre at forstå hvad det betyder:

[main] INFO dk.lsz.rxjava.TestRxJavaConcepts - ws calls done
[RxCachedThreadScheduler-1] INFO dk.lsz.rxjava.TestRxJavaConcepts - ws start
[RxCachedThreadScheduler-3] INFO dk.lsz.rxjava.TestRxJavaConcepts - ws start
[RxCachedThreadScheduler-2] INFO dk.lsz.rxjava.TestRxJavaConcepts - ws start
[RxCachedThreadScheduler-2] INFO dk.lsz.rxjava.TestRxJavaConcepts - ws end
[RxCachedThreadScheduler-3] INFO dk.lsz.rxjava.TestRxJavaConcepts - ws-element 4
[RxCachedThreadScheduler-3] INFO dk.lsz.rxjava.TestRxJavaConcepts - ws-element 3
[RxCachedThreadScheduler-3] INFO dk.lsz.rxjava.TestRxJavaConcepts - ws-element 2
[RxCachedThreadScheduler-3] INFO dk.lsz.rxjava.TestRxJavaConcepts - ws-element 1
[RxCachedThreadScheduler-3] INFO dk.lsz.rxjava.TestRxJavaConcepts - ws end
[RxCachedThreadScheduler-1] ERROR dk.lsz.rxjava.TestRxJavaConcepts - bum: oh-no the service blew up!

Først ser vi, at ”ws calls done” fra ”main”-tråden kommer først. Dernæst ses det, at 3 andre tråde ”RxCached…” har haft det øvrige arbejde. Havde vi ikke haft sleep tilsidst, var test-processen afsluttet sammen med ”main”, og vi havde ikke fået noget output. Normalt ikke et issue, hvis du bruger Rx i en container.

De 3 ”kald” er startet cirka samtidigt (”ws start”). Det ”kald” som ”RxCachedThreadScheduler-2” fik, producerede ikke nogle output-elementer (tomt resultatsæt), 3’eren producerede 4 elementer, og 1’eren fejlede. Bemærk i øvrigt at ovenstående sekvens ikke er 100% deterministisk, da der i praksis er en race-condition på random, samt at rækkefølgen af logs kan variere.

Pointen er, at vi nu kan genbruge funktionaliteten og at subscribeOn styrer, hvor afviklingen skal foregå. Vi kan stadig transformere og arbejde med output som i det synkrone eksempel – faktisk berører det slet ikke den del af koden. Vi har abstraheret threading væk – 1. opgave gennemført!

subscribeOn har en fætter observeOn som lader efterfølgende operationer modtage og arbejde med output på en bestemt scheduler (tænk thread pool). Herved kan du placere de enkelte operationer i en stream i forskellige thread pools. Ligeledes betyder det, at den operation, som tager et element, ikke behøver blive afviklet i producent-tråden – så producenten kan f.eks. være et event-loop, som ikke må blokeres (som i asynkrone web-klienter mv.). Det kan være en hjælp, at opstille og afprøve nogle eksempler for at forstå, hvordan de to operationer virker i en stream.

Men nu vil vi gerne samle output og f.eks. foretage webservice-kald / database-opslag baseret på indholdet. Lige nu afvikles kaldene uafhængigt af hinanden og resultaterne ”lever” i hver deres tråd.

Operationer til at kombinere Observables

Og det bringer os til det sidste emne i denne introduktion: Hvordan man kombinerer streams fra forskellige observables. RxJava har en hel del forskellige operatorer, så lad os fokusere på et par af de mere almindelige.

   Observable<Integer> range = Observable.range(1, 5);
   Observable<String> repeat = Observable.just("a", "b", "c").repeat();
   Observable.concat(range, repeat)
           .take(10)
           .subscribe(value -> log.info("concat: {}", value));
   Observable
           .zip(range, repeat, repeat.skip(1),
                   (i, s1, s2) -> i + "-" + s1 + "-" + s2)
           .subscribe(value -> log.info("zip: {}", value));

Først en Observable som sender tallene fra 1 til 5. Dernæst en observable som gentager sekvensen ”a”, ”b”, ”c” – uendeligt.

Concat danner en ny Observable som sender elementerne fra parameter-observables i rækkefølge. Skulle én fejle, fejler concat med den samme fejl. Bemærk, at der findes varianter med mere end 2 parametre.

Zip parer elementerne fra flere Observables. Så for hvert onNext fra zip, er der trukket et element fra hver parameter-observable. I concat-eksemplet er vi nødt til at begrænse os til at tage 10 elementer, da repeat er uendelig. I zip-eksemplet giver range en naturlig begrænsning for det samlede output.

Det interessante i denne sammenhæng er, at det også virker med asynkrone Observables. Så du skal ikke bekymrer dig om synkronisering og låsning. Det ordner RxJava.

   // ticker, som sender tæller op asynkront
   Observable<Long> o1 = Observable.interval(100, TimeUnit.MILLISECONDS);
   Observable<Long> o2 = Observable.interval(300, TimeUnit.MILLISECONDS);
   Observable.zip(o1, o2, (e1, e2) -> e1 + e2)
           .map(String::valueOf)
           .subscribe(log::info);

Observable.interval udsender heltal med den frekvens som parametrene angiver – startende når der subscribes til den. Men selvom den ene proces her er hurtigere end den anden, så er output alligevel 0, 2, 4, 6 …. fordi at zip ”lyner” de to strømme sammen.

Her er det (igen) værd at stoppe op og tænker over, hvad der sker, når processerne producerer uafhængigt af hinanden og i forskelligt tempo? Man skal forestille sig, at der foran zip ophobes elementer i en kø, imens der ventes på input fra den anden (andre) Observable input.

RxJava har forskellige løsnings-stragtegier her: back-pressure – forsøger at bremse producenten, eller aggregerende operationer, der f.eks. sampler elementer (og altså smider andre væk) m.v. Her vil vi ikke gå så meget ind i den problematik.

Sådan tæmmer du dine asynkrone processer

Så lad os samle stumperne i et (tænkt) eksempel, som kan illustrere hvordan RxJava kan bidrage til at sænke kompleksiteten i din kode, og samtidig åbner for brugen af et stort katalog af streaming-operatorer:

   // db-kald som funktion der returner en Observable range fra 1 til param
   Func1<Integer, Observable<Integer>> db1 = param -> Observable.range(1, param);
   // ws-kald som funktion der returner en Observable – som bare giver param tilbage
   Func1<Integer, Observable<Integer>> ws1 = param -> Observable.just(param);
   // ws-kald som funktion der returner en Observable – som bare giver param tilbage
   Func1<Integer, Observable<Integer>> ws2 = param -> Observable.just(param);
   // ws-kald som funktion der returner en Observable – som ikke returnere noget
   Func1<List<Integer>, Observable<Object>> ws3 = param -> Observable.empty();

   // start med at ”kalde” databasen med en parameter ...
   db1.call(5)
           .flatMap(row -> {
               if (row % 2 == 0){
                   //ws2 kaldes kun, hvis ws1 ikke var tom
                   return Observable.concat(ws1.call(row), ws2.call(row)).first();
               } else {
                   // ws1 og ws2 udføres parallelt
                   return Observable.zip(ws1.call(10), ws2.call(20), (e1, e2) -> e1 + e2);
               }
           })
           .onErrorResumeNext(err -> {
               log.error("error - resume next ", err);
               return Observable.empty();
           })
           .doOnNext(n -> log.info("element {}", n))
           //samle/batche 5 elementer i en liste (men højst vente 1 sekund)
           .buffer(5, 1, TimeUnit.SECONDS)
           //output sendes til en ws3
           .flatMap(ws3)
           //subscribe starter kæden
           .subscribe();

Vi forestiller os, at vi har et database-kald (db1), som givet en parameter returnerer et (ukendt) antal rækker (potentielt ingen). For hver række, ønsker vi at udføre ”kompliceret forretningslogik” og i nogle tilfælde prøve ws1, og hvis den ikke giver et resultat tilbage, så kalde ws2. I andre tilfælde ønsker vi at kalde ws1 og ws2 parallelt og samle (synkronisere) outputtet.

Skulle en operation i behandlingen af en ”række” fejle, logger vi det, men går så videre med den næste række fra database-kaldet (onErrorResumeNext). Tilsidst samles output i lister (buffer) med op til 5 elementer – eller så mange elementer vi kan samle på et sekund – inden de sendes til ws3.

For eksemplets skyld er operationerne her implementeret som (RxJava) funktioner. Som illustreret tidligere kunne de udløse asynkrone processer, men her er de for overskuelighedens skyld bare dummy og synkrone.

RxJava – en enkel måde at skrive asynkrone, reaktive programmer

Vi har nu set nogle eksempler på, hvordan RxJava ved at indkapsle processer i Observables gør det muligt at abstrahere fra den underliggende afvikling (threading, eventloop) – og i stedet betragte alt som output-strømme, der kan kombineres og manipuleres med operatorer. I eksemplerne så vi nogle af disse operatorer anvendt. Men der findes mange flere, som er velbeskrevet bl.a. med illustrationer i dokumentationen.

Helt grundlæggende gør RxJava det muligt at skille tingene ad, så det igen er muligt at få øje på forretningsfunktionaliteten. Udviklerne slipper for at beskæftige sig så meget med synkronisering og parallelisering, som kan være svært at gøre rigtigt.

Hvis du vil vide mere

RxJava har som sagt et meget rigt katalog af operationer og virkelig god dokumentation. Start på http://reactivex.io/. Mange store programmerings-sprog er dækket.

Netflix og andre har også udviklet andre frameworks som bygger på eller ”forlænger” Rx-pakkerne. Se f.eks. Hystrix fra Netflix mfl. Lidt flere her https://github.com/ReactiveX/RxJava/wiki.

RxJava findes ligeledes i en Android-specifik udgave, hvilket yderligere understreger de brede anvendelses muligheder.

Du finder eksemplerne fra artiklen i eksekver-bar form her https://github.com/zuwalski/rxjava-examples.