abclinuxu.cz AbcLinuxu.cz itbiz.cz ITBiz.cz HDmag.cz HDmag.cz abcprace.cz AbcPráce.cz
AbcLinuxu hledá autory!
Inzerujte na AbcPráce.cz od 950 Kč
Rozšířené hledání
×
dnes 13:00 | Komunita

Do 30. října se lze přihlásit do dalšího kola programu Outreachy (Wikipedie), jehož cílem je přitáhnout do světa svobodného a otevřeného softwaru lidi ze skupin, jež jsou ve světě svobodného a otevřeného softwaru málo zastoupeny. Za 3 měsíce práce, od 4. prosince 2018 do 4. března 2019, v participujících organizacích lze vydělat 5 500 USD.

Ladislav Hagara | Komentářů: 31
včera 22:22 | Komunita

Společnost Purism představila kryptografický token Librem Key. Koupit jej lze za 59 dolarů. Token byl vyvinut ve spolupráci se společností Nitrokey a poskytuje jak OpenPGP čipovou kartu, tak zabezpečení bootování notebooků Librem a také dalších notebooků s open source firmwarem Heads.

Ladislav Hagara | Komentářů: 7
včera 20:33 | Nová verze

Společnost NVIDIA oficiálně vydala verzi 10.0 toolkitu CUDA (Wikipedie) umožňujícího vývoj aplikací běžících na jejich grafických kartách. Přehled novinek v poznámkách k vydání.

Ladislav Hagara | Komentářů: 0
včera 20:00 | Upozornění

Příspěvek Jak přežít plánovanou údržbu DNS na blogu zaměstnanců CZ.NIC upozorňuje na historicky poprvé podepsání DNS root zóny novým klíčem dne 11. října 2018 v 18:00. Software, který nebude po tomto okamžiku obsahovat nový DNSSEC root klíč, nebude schopen resolvovat žádná data. Druhým důležitým datem je 1. února 2019, kdy významní výrobci DNS softwaru, také historicky poprvé, přestanou podporovat servery, které porušují DNS standard

… více »
Ladislav Hagara | Komentářů: 5
včera 15:55 | Pozvánky

Spolek OpenAlt zve příznivce otevřených řešení a přístupu na 156. brněnský sraz, který proběhne v pátek 21. září od 18:00 v restauraci Na Purkyňce na adrese Purkyňova 80.

Ladislav Hagara | Komentářů: 0
včera 13:22 | Nová verze

Alan Griffiths z Canonicalu oznámil vydání verze 1.0.0 display serveru Mir (GitHub, Wikipedie). Mir byl představen v březnu 2013 jako náhrada X serveru a alternativa k Waylandu. Dnes Mir běží nad Waylandem a cílen je na internet věcí (IoT).

Ladislav Hagara | Komentářů: 0
20.9. 22:00 | Nasazení Linuxu
Stabilní aktualizace Chrome OS 69 (resp. Chromium OS), konkrétně 69.0.3497.95, přináší mj. podporu linuxových aplikací. Implementována je pomocí virtualizace, a proto je tato funkce také omezena na zařízení s dostatkem paměti a podporou hardwarové akcelerace, tudíž nejsou podporovány chromebooky s 32bitovými architekturami ARM, či Intel Bay Trail (tzn. bez Intel VT-x).
Fluttershy, yay! | Komentářů: 5
20.9. 21:32 | Zajímavý projekt

Došlo k uvolnění linuxové distribuce CLIP OS, vyvíjené francouzským úřadem pro kybernetickou bezpečnost ANSSI, jako open source. Vznikla za účelem nasazení v úřadech, kde je potřeba omezit přístup k důvěrným datům. Je založená na Gentoo.

Fluttershy, yay! | Komentářů: 1
20.9. 16:00 | Komerce

Zjistěte více o bezpečné a flexibilní architektuře v cloudu! IBM Cloud poskytuje bezpečné úložiště pro Vaše obchodní data s možností škálovatelnosti a flexibilitou ukládání dat. Zároveň nabízí prostředky pro jejich analýzu, vizualizaci, reporting a podporu rozhodování.

… více »
Fluttershy, yay! | Komentářů: 12
20.9. 12:22 | Nová verze

V dubnu letošního roku Mozilla představila webový prohlížeč pro rozšířenou a virtuální realitu Firefox Reality (GitHub). V úterý oznámila vydání verze 1.0. Ukázka na YouTube. Firefox Reality je k dispozici pro Viveport, Oculus a Daydream.

Ladislav Hagara | Komentářů: 2
Na optické médium (CD, DVD, BD aj.) jsem naposledy vypaloval(a) data před méně než
 (13%)
 (14%)
 (21%)
 (23%)
 (25%)
 (4%)
 (1%)
Celkem 390 hlasů
 Komentářů: 33, poslední 16.9. 11:55
Rozcestník

Funkcionální programování ve Scale 3.

18. 5. 2015 | Redakce | Návody | 1987×

V tomto článku si popíšeme typ Task a ukážeme si, jak konstruovat procesy s vedlejšími efekty, jenž dělají požadavky typu Task. Poté se budeme věnovat Sinkům a Channelům. Na závěr článku si řekneme něco o plánovaných změnách v knihovně scalaz-stream.

scalaz-stream 3: Procesy s vedlejšími efekty

Používáme knihovnu scalaz-stream verze 0.7a a předpokládáme následující importy:

import scalaz.concurrent.Task
import scalaz.stream._
import Process._

Při programování s knihovnou scalaz-stream je zvykem, že vedlejší efekty dělají pouze procesy s požadavky typu Task. Pomocí funkce Task.delay můžeme do Tasku zabalit každý výraz, například:

val t = Task.delay { println("Hi") }

Jak již víme, Task lze spustit pomocí funkce run:

t.run

Při každém spuštění se zabalený výraz vyhodnotí a získaná hodnota je vrácena funkcí run. Při každém spuštění t se tedy vypíše řetězec Hi a run vrátí hodnotu () typu Unit. Další funkcí, která vytváří Task, je Task.now:

val t = Task.now(5 + 3)

Rozdíl mezi funkcemi Task.now a Task.delay spočívá v tom, že funkci Task.now se argument předává hodnotou (argument je vyhodnocen před zavoláním funkce) a funkci Task.delay se předává jménem. V našem případě je tedy výraz 5 + 3 vyhodnocen a teprve poté je zkonstruován Task. Například následující Task vrací pokaždé stejné náhodné číslo:

import scala.util.Random

val badRandNumGen = Task.now { Random.nextInt }

Náhodné číslo je totiž vygenerováno ještě před zkonstruováním Tasku, při jeho spouštění se žádné náhodné číslo negeneruje. Použijeme-li Task.delay místo Task.now

val randNumGen = Task.delay { Random.nextInt }

dostaneme Task, který při každém spuštění generuje nové náhodné číslo. Pokud výraz uvnitř Tasku vyhodí výjimku, je tato výjimka následně vyhozena i funkcí run. Alternativně můžeme použít funkci attemptRun, která výjimku nevyhazuje, ale vrací. Výsledek funkce attemptRun má typ Throwable \/ A, kde \/ je jako Either. Například

Task.delay { 5 / 0 }.attemptRun

vrátí hodnotu tvaru -\/(_: ArithmeticException). Chceme-li do Tasku zabalit asynchronní výpočet, použijeme funkci Task.async:

import scalaz.\/

def startAsyncComputation[A](cb: (Throwable \/ A) => Unit): Unit = ???

val t = Task.async(startAsyncComputation)

Funkce startAsyncComputation spustí asynchronní výpočet a skončí. Výsledek asynchronního výpočtu se do t předá pomocí callbacku cb, který funkce startAsyncComputation dostala jako argument. Funkce Task.async je obzvláště užitečná pro propojení s okolním světem. Například díky Task.async snadno napíšeme funkci, která převede Future na Task:

import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scalaz.{-\/, \/-}

def futureToTask[A](fut: => Future[A]): Task[A] =
  Task.async[A](cb => fut onComplete {
    case Success(a) => cb(\/-(a))
    case Failure(t) => cb(-\/(t))
  })

Future se funkci futureToTask předává jménem, aby se zamezilo jejímu spuštění před spuštěním Tasku. Zde je příklad použití:

val t = futureToTask(Future(println("Hi")))
t.run

Na závěr našeho stručného představení Tasku poznamenejme, že Task je monáda, k dispozici jsou tudíž funkce map, flatMap i for notace.

Procesy a Tasky

Máme-li Task, můžeme pomocí eval vytvořit proces, který Task spustí a vyprodukuje hodnotu z Tasku. Například proces, který vyprodukuje jedno náhodné číslo:

eval(randNumGen)

Název eval je poněkud zavádějící, neboť eval Tasky nespouští a ani nijak nevyhodnocuje, eval pouze zabalí Task do procesu. Například

val randomNumbers = eval(randNumGen).repeat

je proces s mnoha náhodnými čísly, nikoliv s jedním náhodným číslem, které se opakuje. eval do procesu zabalí celý Task, tj. nedojde k tomu, že by eval napřed spustil Task randNumGen a poté do procesu zabalil pouze výsledek Tasku.

eval a repeat se často používají společně, proto vznikla funkce repeatEval, která nám výše uvedený proces náhodných čísel dovoluje přepsat na jedno volání:

val randomNumbers = repeatEval(randNumGen)

Pokud nás výsledek Tasku nezajímá, můžeme použít eval_, který Task spustí, ale jeho výsledek zahodí:

val p = eval_(Task.delay { println("Starting") }) ++ randomNumbers

p je proces produkující čísla, kdybychom však místo eval_ použili eval, měli bychom proces produkující AnyVal – první vyprodukovaný prvek by byl () vrácený println("Starting"), ten by byl následován náhodnými čísly. Obecně, chceme-li, aby proces žádné prvky neprodukoval, můžeme použít funkci drain, ta z procesu, který produkuje prvky typu A, udělá proces, který neprodukuje žádné prvky (typ produkovaných prvků je Nothing). eval_(t) je zkratka za eval(t).drain.

Procesy se zdroji

Nyní se pokusíme napsat proces, který přečte a vyprodukuje řádky ze souboru – něco jako io.linesR. Náš první pokus je:

import scala.io.Source

def badLinesR(file: String): Process[Task, String] =
  eval(Task.delay { Source.fromFile(file) }).flatMap { src =>
    lazy val lines = src.getLines
    repeatEval(Task.delay { lines.next }) ++ eval_(Task.delay { src.close })
  }

První problém funkce badLinesR je, že volá lines.next aniž by testovala lines.hasNext. Pokud je lines.haxNext false, není chování lines.next definováno. Otázkou je, co dělat, když je lines.hasNext false? Pro tyto případy existuje speciální výjimka Cause.Terminated(Cause.End), kterou můžeme vyhodit. Tato výjimka zajistí, že repeatEval skončí a pokračuje se další částí procesu – v našem případě eval_(Task.delay { src.close }).

Další problém funkce badLinesR je, že nezaručuje uzavření souboru. Tento problém nastane, pokud proces předčasně přerušíme, například pomocí take:

// Simuluje cteni souboru se dvema radky.
val p =
  eval(Task.delay { println("Line 1") }) ++
  eval(Task.delay { println("Line 2") }) ++
  eval_(Task.delay { println("Closing") })

p.take(1).run.run // Closing se nevypise.
p.take(2).run.run // Closing se nevypise.
p.take(3).run.run

Problém s neuzavřením souboru rovněž nastane, když Task vyhodí jinou výjimku než Cause.Terminated(Cause.End):

val p =
  eval(Task.delay { 5 / 0 }) ++
  eval_(Task.delay { println("Closing") })

p.run.run // Closing se nevypise, run vyhodi vyjimku.

S výjimkou Cause.Terminated(Cause.End) tento problém nenastane a soubor se uzavře:

val p =
  eval(Task.delay { throw Cause.Terminated(Cause.End) }) ++
  eval_(Task.delay { println("Closing") })

p.run.run // Closing se vypise, run zadnou vyjimku nevyhodi.

Obecně, máme-li proces tvaru p ++ q a skončí-li p výjimkou jinou než Cause.Terminated(Cause.End), proces q se vůbec nevykoná. Pokud se q má vykonat v každém případě bez ohledu na to, jak skončil proces p, je třeba použít onComplete místo ++:

val p =
  eval(Task.delay { 5 / 0 }) onComplete
  eval_(Task.delay { println("Closing") })

p.run.run // Closing se vypise, run vyhodi vyjimku.

Nyní již známe vše, abychom napsali funkci linesR:

def linesR(file: String): Process[Task, String] =
  eval(Task.delay { Source.fromFile(file) }).flatMap { src =>
    lazy val lines = src.getLines
    repeatEval(Task.delay {
      if (lines.hasNext) lines.next
      else throw Cause.Terminated(Cause.End)
    }) onComplete eval_(Task.delay { src.close })
  }

V knihovně scalaz-stream je funkce io.resource, která zobecňuje naši funkci linesR pro jiné zdroje. Ostatní funkce v modulu io (včetně funkce io.linesR) využívají tuto obecnou funkci io.resource.

Jak již víme, p onComplete q zajistí, že se q spustí bez ohledu na to, jak skončilo p. Kromě toho onComplete zajistí, že proces q není možné přerušit:

val p = emit(1)
val q =
  eval_(Task.delay { println("One") }) ++
  emit(2) ++
  eval_(Task.delay { println("Two") })
val r = p onComplete q

r.take(2).run.run // Vypise One i Two.

Ačkoliv z procesu r načítáme pouze dva prvky, proces q doběhne celý – vypíše se i řetězec Two. Pokud by q byl proces, který nikdy neskončí (například emit(0).repeat), tak by ani proces r.take(2) nikdy neskončil. p onComplete q se nejčastěji používá v situacích, kdy proces q má uklidit po procesu pq nejde přerušit, aby bylo zaručeno, že se úklid provede celý.

Sink a Channel

V této části se budeme zabývat tím, jak prvky produkované procesem někam uložit nebo někam zapsat. Pokud proces produkuje málo prvků a stačí, když se zápis provede až celý proces skončí, můžeme použít runLog. Proces spustíme pomocí runLog.run a poté, co skončí, vyprodukované prvky zapíšeme.

Situace se ovšem zkomplikuje, když proces produkuje (neomezeně) mnoho prvků a my je chceme zapisovat průběžně. V některých případech (například logování) lze zápisy umístit do map nebo do nějakého jiného procesu typu Process1. Pokud však zápisy prvků používají Task (například proto, že jsou asynchronní, nebo proto, že je dobrým zvykem vedlejší efekty umístit do Tasků), je třeba použít i proces s typem požadavků Task, což Process1 není.

Naštěstí má výše nastíněný problém velmi jednoduché řešení. Tím řešením je proces funkcí. Pojďme se podívat na příklad. Mějme proces řádků ze souboru

val p: Process[Task, String] = io.linesR("file.txt")

a předpokládejme, že chceme tyto řádky zapsat na standardní výstup. K zápisu na standardní výstup použijeme proces funkcí, které zapisují na standardní výstup:

val stdOut: Process[Task, String => Unit] = constant(println)

constant(a) je zkratka za emit(a).repeat. Pro každý řádek z p zavoláme funkci ze stdOut – k tomu lze použít zip a map

p.zip(stdOut).map { case (line, f) => f(line) }.run.run

nebo zipWith:

p.zipWith(stdOut) { (line, f) => f(line) }.run.run

To funguje, ale vedlejší efekt f(line) se stále provádí v procesu typu Process1. Navíc opět nepoužíváme Task – funkce, které provádí vedlejší efekty, mají typ String => Unit. Změníme tedy typ funkcí ze String => Unit na String => Task[Unit]:

val stdOut: Process[Task, String => Task[Unit]] =
  constant(s => Task.delay { println(s) })

Když spustíme náš kód

p.zipWith(stdOut) { (line, f) => f(line) }.run.run

zjistíme, že se nic nevypisuje. Problém je v tom, že proces

p.zipWith(stdOut) { (line, f) => f(line) }

produkuje Tasky, které nespouští – typ procesu je Process[Task, Task[Unit]]. Aby se něco vypsalo, potřebujeme produkované Tasky spustit, což provede funkce eval:

p.zipWith(stdOut) { (line, f) => f(line) }.eval.run.run

eval tedy z procesu typu Process[Task, Task[A]] udělá proces typu Process[Task, A]. stdOut je Sink. Pro typ Process[Task, A => Task[Unit]] existuje typový alias Sink[Task, A]. Data do Sinku posíláme funkcí to:

p.to(stdOut).run.run

Funkce to je implementována pomocí zipWith a eval – přesně jako v našem příkladu. S pomocí funkce io.resource snadno naimplementujeme Sink, který zapisuje řádky do souboru:

import java.io.FileWriter

def linesW(file: String): Sink[Task, String] =
  io.resource(
    Task.delay { new FileWriter(file) }
  )(
    fw => Task.delay { fw.close() }
  )(
    fw => Task.now(line => Task.delay { fw.write(line); fw.write("\n") })
  )

to nahrazuje produkované prvky hodnotou (). Pokud chceme prvky zachovat, musíme použít funkci observe – ta se chová jako to, ale nenahrazuje prvky hodnotou (). S pomocí observe můžeme poslat prvky do více Sinků:

p.observe(stdOut).to(linesW("file2.txt")).run.run

Do Sinku stdOut můžeme posílat pouze řetězce, pokud bychom tam chtěli poslat několik náhodných čísel, museli bychom čísla převést na řetězce:

randomNumbers.map(_.toString).observe(stdOut).take(10).runLog.run

Stinnou stránkou tohoto řešení je, že proces už neprodukuje čísla, ale řetězce. V podobných případech se hodí funkce contramap, která změní změní typ vstupních prvků Sinku:

randomNumbers.observe(stdOut.contramap(_.toString)).take(10).runLog.run

contramap lze snadno implementovat pomocí map – v našem případě by map transformoval funkce typu String => Task[Unit] na funkce typu Int => Task[Unit].

Sink[Task, A] je proces, který produkuje funkce A => Task[Unit]. Nahradíme-li Unit typovým parametrem B, dostaneme proces, kterému se říká Channel. Channel[Task, A, B] je tedy alias pro Process[Task, A => Task[B]] a Sink[Task, A] můžeme chápat jako alias pro Channel[Task, A, Unit]. Channel[Task, A, B] používáme pro transformaci prvků typu A na prvky typu B. Do Channelu se prvky posílají pomocí funkce through, typ vstupních prvků lze změnit pomocí funkce contramap.

Připomeňme si Process1 nub pro odstranění duplikátů z minulého dílu

def nub[A](history: Set[A] = Set.empty[A]): Process1[A, A] =
  receive1 { x =>
    if (history.contains(x)) nub(history)
    else emit(x) ++ nub(history + x)
  }

a zkusme napsat Channel, který dělá totéž. První nepříjemnost je, že Channel transformuje jeden prvek vstupu na jeden prvek výstupu, jenže nub v případě duplikátů žádné prvky neprodukuje. Toto omezení Channelů obejdeme tak, že použijeme Channel typu Channel[Task, A, Option[A]] (místo Channel[Task, A, A]) – pro duplikáty bude Channel vracet None.

Další nepříjemností je, že nemůžeme použít rekurzi pro předávání aktualizované množiny history. Problém je v tom, že Channel napřed vyprodukuje funkce A => Task[Option[A]] a teprve poté jsou tyto funkce aplikovány a Tasky spuštěny pomocí through. Samotný Channel tedy žádné filtrování neprovádí – ten pouze produkuje funkce A => Task[Option[A]]. Filtrování se provádí až pomocí through – je to tedy funkce through, od níž bychom potřebovali, aby předávala množinu history mezi funkcemi z Channelu. Jelikož funkce through nic takového neumí, musíme si pomoci sami – po spuštění Channelu vytvoříme měnitelnou množinu, kterou dáme do všech produkovaných funkcí:

def nubChannel[A]: Channel[Task, A, Option[A]] =
  eval(Task.delay {
    collection.mutable.Set.empty[A]
  }).flatMap { history =>
    constant((x: A) => Task.delay {
      if (history.contains(x)) None
      else {
        history += x
        Some(x)
      }
    }).toSource
  }

Použití nub:

Process(1, 2, 1, 0, 1, 2).toSource |> nub()

Použití nubChannel:

(Process(1, 2, 1, 0, 1, 2).toSource through nubChannel) |> process1.stripNone

Na rozdíl od Channelu může Process1 vrátit libovolný počet prvků a snáze se tam pracuje se stavem. Na rozdíl od Process1 může Channel provádět vedlejší efekty. Pokud nepotřebujeme vedlejší efekty, dáváme přednost Process1 před Channelem – důvodem je právě flexibilita Process1.

Výhled do budoucna

Hezké je, že řadu funkcí, které jsme si zde představili, může napsat i uživatel knihovny. Například p.drain můžeme nahradit p.flatMap { x => empty } nebo p.run lze nahradit funkcemi p.drain.runLog. Už jsme také viděli, že repeatEval(t) je zkratka za eval(t).repeat, nebo ne? Skutečnost bohužel není tak růžová, jak by se mohlo zdát:

val x = eval(Task.delay { throw Cause.Terminated(Cause.End) }).repeat
val y = repeatEval(Task.delay { throw Cause.Terminated(Cause.End) })

Na rozdíl od procesu y proces x nikdy neskončí. V případě výjimky Cause.Terminated(Cause.End) se tedy repeatEval nechová jako eval a repeat. V případě jiných výjimek se x i y chovají stejně. Příkladem stejného charakteru je proces p.eval, který můžeme zapsat jako p.flatMap(eval), ale opět zjistíme, že se v případě výjimky Cause.Terminated(Cause.End) obě konstrukce liší:

val x = constant(Task.delay { throw Cause.Terminated(Cause.End) }).flatMap(eval)
val y = constant(Task.delay { throw Cause.Terminated(Cause.End) }).eval

V uvedeném příkladu x opět neskončí a y ano. Pravděpodobně z tohoto důvodu bude výjimka Cause.Terminated(Cause.End) odstraněna z další verze knihovny scalaz-stream.

Další plánovaná změna v knihovně scalaz-stream se týká funkcí ++ a onComplete. Kdybychom místo ++ a onComplete použili syntaxi jazyka Scala, pak by se proces p ++ q zapisoval jako { p; q } a p onComplete q jako try { p } finally { q }. Následující dva procesy se tedy chovají jinak:

val x = (p ++ q) onComplete r
val y = p ++ (q onComplete r)

Použijeme-li syntax Scaly, je x ekvivalentní

try { p; q } finally { r }

a y ekvivalentní

p
try { q } finally { r }

U x se r vykoná vždy, zatímco u y se r nevykoná, pokud p skončí výjimkou. Některým uživatelům knihovny se toto chování nelíbí a chtěli by, aby se proces r v obou případech vykonal vždy.

Závěr

Příště se naučíme, jak napojit více procesů na jeden proces. Řeč bude o procesech typu Tee a Wye a o funkcích tee a wye. Například nám již známé funkce zip a zipWith lze implementovat pomocí typu Tee a funkce tee.

Spinoco        

Hodnocení: 100 %

        špatnédobré        

Nástroje: Tisk bez diskuse

Tiskni Sdílej: Linkuj Jaggni to Vybrali.sme.sk Google Del.icio.us Facebook

Komentáře

Vložit další komentář

24.5.2015 12:18 ava
Rozbalit Rozbalit vše Re: Funkcionální programování ve Scale 3.
Díky za další parádní díl. Tohle už se četlo o trochu snadněji, než ten minulý :) Je to pro mě mimořádné srovnání s RxJava, jen si v hlavě pořád nedokážu úplně srovnat push-based přístup RxJavy (předchozí článek řetězu tlačí data do dalšího pomocí OnNext) (i když v reakci na problémy s backpressure tam přidali možnost kombinovaného push/pull based), a - asi - pull based přístupu scalaz-stream (následující články řetězu tahají data od předchozích)? Nebo je celé tohle dělení umělé? A jaký je vlastně rozdíl mezi pull-based přístupem a obyčejnými streamy? To jsou otázky, které mi v noci nedají spát :) Těším se na další díly.
25.5.2015 16:49 Radek Miček | skóre: 23 | blog: radekm_blog
Rozbalit Rozbalit vše Re: Funkcionální programování ve Scale 3.
pull based přístupu scalaz-stream (následující články řetězu tahají data od předchozích)
Ano, scalaz-stream je pull based. Push lze nasimulovat například pomocí fronty:
val p = (Process(1, 2, 3).toSource ++ eval_(Task.delay(println("print")))).repeat
val processItem: Sink[Task, Int] = io.stdOutLines.contramap(_.toString)

// Vytvorime frontu.
val q = async.unboundedQueue[Int]

// Prvky produkovane procesem p posleme do fronty.
p.to(q.enqueue).run.runAsync(x => println("enqueue end"))

// q.dequeue je proces prvku z fronty.
// Prvky z fronty posilame do Sinku processItem, ktery prvky zpracovava.
// Az jsou prvky zpracovany (nebo po vyhozeni vyjimky) frontu zavreme
// -- zavreni fronty zajisti ukonceni procesu, jenz dava prvky do fronty
// -- tj. ukonceni p.to(q.enqueue).run.runAsync(x => println("enqueue end")).
(q.dequeue.to(processItem).take(2) onComplete eval_(q.close)).run.run
Kód obvykle několikrát vypíše řetězec print – počet vypsaných řetězců print závisí na tom, jak rychle se prvky produkované p posílají do fronty q. Kromě neomezené fronty async.unboundedQueue existuje i omezená fronta async.boundedQueue.
A jaký je vlastně rozdíl mezi pull-based přístupem a obyčejnými streamy?
Myslím, že obyčejné streamy jsou také pull based.
ISSN 1214-1267   www.czech-server.cz
© 1999-2015 Nitemedia s. r. o. Všechna práva vyhrazena.