abclinuxu.cz AbcLinuxu.cz itbiz.cz ITBiz.cz HDmag.cz HDmag.cz abcprace.cz AbcPráce.cz
AbcLinuxu hledá autory!
Inzerujte na AbcPráce.cz od 950 Kč
Rozšířené hledání
×

16.11. 23:44 | IT novinky

Společnosti Dell a Canonical společně představily 5 nových počítačů Dell Precision s předinstalovaným Ubuntu. Jedná se o 4 notebooky a 1 all-in-one počítač. Cena počítačů s Ubuntu je o 100 dolarů nižší než jejich cena s Windows 10.

Ladislav Hagara | Komentářů: 8
16.11. 22:55 | Nová verze

Po pěti měsících vývoje od vydání verze 4.8 byla vydána nová verze 4.9 svobodného open source redakčního systému WordPress. Kódové označením Tipton bylo vybráno na počest amerického jazzového muzikanta a kapelníka Billyho Tiptona.

Ladislav Hagara | Komentářů: 0
16.11. 22:11 | Pozvánky

Spolek OpenAlt zve příznivce otevřených technologií a otevřeného přístupu na 146. brněnský sraz, který proběhne v pátek 17. listopadu od 18:00 hodin v restauraci Bogota na Nových Sadech.

Ladislav Hagara | Komentářů: 0
16.11. 21:55 | Nová verze

Dle plánu byla vydána nová verze 9.2.1 živé linuxové distribuce Slax. Novinkou je především přechod ze Slackware na Debian a z KDE na Fluxbox.

Ladislav Hagara | Komentářů: 2
15.11. 22:44 | Zajímavý projekt

Vítězným projektem letošního ročníku soutěže určené vývojářům open source hardwaru Hackaday Prize se stal podvodní kluzák (YouTube, Onshape). Cenu za nejlepší produkt získala braillská klávesnice pro chytré telefony Tipo (YouTube).

Ladislav Hagara | Komentářů: 0
15.11. 06:33 | Nová verze

Byla vydána verze 3.3 živé linuxové distribuce Tails (The Amnesic Incognito Live System), jež klade důraz na ochranu soukromí uživatelů a anonymitu. Přehled změn v příslušném seznamu. Řešena je také řada bezpečnostních problémů.

Ladislav Hagara | Komentářů: 3
15.11. 00:11 | Nová verze

Byla vydána beta verze Linux Mintu 18.3 s kódovým jménem Sylvia. Na blogu Linux Mintu jsou hned dvě oznámení. První o vydání Linux Mintu s prostředím MATE a druhé o vydání Linux Mintu s prostředím Cinnamon. Stejným způsobem jsou rozděleny také poznámky k vydání (MATE, Cinnamon) a přehled novinek s náhledy (MATE, Cinnamon). Linux Mint 18.3 bude podporován až do roku 2021.

Ladislav Hagara | Komentářů: 0
14.11. 21:44 | Nová verze

Byla vydána verze 5.2.0 svobodného integrovaného vývojového prostředí KDevelop. Přímo z menu KDevelopu lze nově analyzovat aplikace napsané v C/C++ pomocí nástroje Heaptrack. Vylepšena byla podpora programovacích jazyků C++, PHP a Python. Ke stažení a k vyzkoušení je binární balíček s KDevelopem 5.2.0 ve formátu AppImage.

Ladislav Hagara | Komentářů: 8
14.11. 17:33 | Nová verze

MojeFedora.cz informuje, že bylo oficiálně oznámeno vydání Fedory 27. Ve finální verzi vycházejí dvě edice: Workstation pro desktopové a Atomic pro cloudové nasazení. Fedora Server vzhledem k náročnosti přechodu na modularitu vychází pouze v betaverzi a finální verze je naplánována na leden. Vedle nich jsou k dispozici také alternativní desktopy v podobě KDE Plasma, Xfce a další a k tomu laby – upravené vydání Fedory například pro designery, robotiku, vědecké použití atd. Stahovat lze z Get Fedora.

Ladislav Hagara | Komentářů: 21
14.11. 17:22 | Pozvánky

Máš rád svobodný software a hardware nebo se o nich chceš něco dozvědět? Zajímá tě DIY, CNC, SDR nebo morseovka? Přijď na sraz spolku OpenAlt – tradičně první čtvrtek před třetím pátkem v měsíci: 16. listopadu od 18:00 v Radegastovně Perón (Stroupežnického 20, Praha 5).

xkucf03 | Komentářů: 0
Jak se vás potenciálně dotkne trend odstraňování analogového audio konektoru typu 3,5mm jack z „chytrých telefonů“?
 (9%)
 (1%)
 (1%)
 (1%)
 (73%)
 (14%)
Celkem 680 hlasů
 Komentářů: 36, poslední včera 18:43
    Rozcestník

    Funkcionální programování ve Scale 3.

    18. 5. 2015 | Redakce | Návody | 1934×

    V tomto článku si popíšeme typ Task a ukážeme si, jak konstruovat procesy s vedlejšími efekty, jenž dělají požadavky typu Task. Poté se budeme věnovat Sinkům a Channelům. Na závěr článku si řekneme něco o plánovaných změnách v knihovně scalaz-stream.

    scalaz-stream 3: Procesy s vedlejšími efekty

    Používáme knihovnu scalaz-stream verze 0.7a a předpokládáme následující importy:

    import scalaz.concurrent.Task
    import scalaz.stream._
    import Process._
    

    Při programování s knihovnou scalaz-stream je zvykem, že vedlejší efekty dělají pouze procesy s požadavky typu Task. Pomocí funkce Task.delay můžeme do Tasku zabalit každý výraz, například:

    val t = Task.delay { println("Hi") }
    

    Jak již víme, Task lze spustit pomocí funkce run:

    t.run
    

    Při každém spuštění se zabalený výraz vyhodnotí a získaná hodnota je vrácena funkcí run. Při každém spuštění t se tedy vypíše řetězec Hi a run vrátí hodnotu () typu Unit. Další funkcí, která vytváří Task, je Task.now:

    val t = Task.now(5 + 3)
    

    Rozdíl mezi funkcemi Task.now a Task.delay spočívá v tom, že funkci Task.now se argument předává hodnotou (argument je vyhodnocen před zavoláním funkce) a funkci Task.delay se předává jménem. V našem případě je tedy výraz 5 + 3 vyhodnocen a teprve poté je zkonstruován Task. Například následující Task vrací pokaždé stejné náhodné číslo:

    import scala.util.Random
    
    val badRandNumGen = Task.now { Random.nextInt }
    

    Náhodné číslo je totiž vygenerováno ještě před zkonstruováním Tasku, při jeho spouštění se žádné náhodné číslo negeneruje. Použijeme-li Task.delay místo Task.now

    val randNumGen = Task.delay { Random.nextInt }
    

    dostaneme Task, který při každém spuštění generuje nové náhodné číslo. Pokud výraz uvnitř Tasku vyhodí výjimku, je tato výjimka následně vyhozena i funkcí run. Alternativně můžeme použít funkci attemptRun, která výjimku nevyhazuje, ale vrací. Výsledek funkce attemptRun má typ Throwable \/ A, kde \/ je jako Either. Například

    Task.delay { 5 / 0 }.attemptRun
    

    vrátí hodnotu tvaru -\/(_: ArithmeticException). Chceme-li do Tasku zabalit asynchronní výpočet, použijeme funkci Task.async:

    import scalaz.\/
    
    def startAsyncComputation[A](cb: (Throwable \/ A) => Unit): Unit = ???
    
    val t = Task.async(startAsyncComputation)
    

    Funkce startAsyncComputation spustí asynchronní výpočet a skončí. Výsledek asynchronního výpočtu se do t předá pomocí callbacku cb, který funkce startAsyncComputation dostala jako argument. Funkce Task.async je obzvláště užitečná pro propojení s okolním světem. Například díky Task.async snadno napíšeme funkci, která převede Future na Task:

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}
    import scalaz.{-\/, \/-}
    
    def futureToTask[A](fut: => Future[A]): Task[A] =
      Task.async[A](cb => fut onComplete {
        case Success(a) => cb(\/-(a))
        case Failure(t) => cb(-\/(t))
      })
    

    Future se funkci futureToTask předává jménem, aby se zamezilo jejímu spuštění před spuštěním Tasku. Zde je příklad použití:

    val t = futureToTask(Future(println("Hi")))
    t.run
    

    Na závěr našeho stručného představení Tasku poznamenejme, že Task je monáda, k dispozici jsou tudíž funkce map, flatMap i for notace.

    Procesy a Tasky

    Máme-li Task, můžeme pomocí eval vytvořit proces, který Task spustí a vyprodukuje hodnotu z Tasku. Například proces, který vyprodukuje jedno náhodné číslo:

    eval(randNumGen)
    

    Název eval je poněkud zavádějící, neboť eval Tasky nespouští a ani nijak nevyhodnocuje, eval pouze zabalí Task do procesu. Například

    val randomNumbers = eval(randNumGen).repeat
    

    je proces s mnoha náhodnými čísly, nikoliv s jedním náhodným číslem, které se opakuje. eval do procesu zabalí celý Task, tj. nedojde k tomu, že by eval napřed spustil Task randNumGen a poté do procesu zabalil pouze výsledek Tasku.

    eval a repeat se často používají společně, proto vznikla funkce repeatEval, která nám výše uvedený proces náhodných čísel dovoluje přepsat na jedno volání:

    val randomNumbers = repeatEval(randNumGen)
    

    Pokud nás výsledek Tasku nezajímá, můžeme použít eval_, který Task spustí, ale jeho výsledek zahodí:

    val p = eval_(Task.delay { println("Starting") }) ++ randomNumbers
    

    p je proces produkující čísla, kdybychom však místo eval_ použili eval, měli bychom proces produkující AnyVal – první vyprodukovaný prvek by byl () vrácený println("Starting"), ten by byl následován náhodnými čísly. Obecně, chceme-li, aby proces žádné prvky neprodukoval, můžeme použít funkci drain, ta z procesu, který produkuje prvky typu A, udělá proces, který neprodukuje žádné prvky (typ produkovaných prvků je Nothing). eval_(t) je zkratka za eval(t).drain.

    Procesy se zdroji

    Nyní se pokusíme napsat proces, který přečte a vyprodukuje řádky ze souboru – něco jako io.linesR. Náš první pokus je:

    import scala.io.Source
    
    def badLinesR(file: String): Process[Task, String] =
      eval(Task.delay { Source.fromFile(file) }).flatMap { src =>
        lazy val lines = src.getLines
        repeatEval(Task.delay { lines.next }) ++ eval_(Task.delay { src.close })
      }
    

    První problém funkce badLinesR je, že volá lines.next aniž by testovala lines.hasNext. Pokud je lines.haxNext false, není chování lines.next definováno. Otázkou je, co dělat, když je lines.hasNext false? Pro tyto případy existuje speciální výjimka Cause.Terminated(Cause.End), kterou můžeme vyhodit. Tato výjimka zajistí, že repeatEval skončí a pokračuje se další částí procesu – v našem případě eval_(Task.delay { src.close }).

    Další problém funkce badLinesR je, že nezaručuje uzavření souboru. Tento problém nastane, pokud proces předčasně přerušíme, například pomocí take:

    // Simuluje cteni souboru se dvema radky.
    val p =
      eval(Task.delay { println("Line 1") }) ++
      eval(Task.delay { println("Line 2") }) ++
      eval_(Task.delay { println("Closing") })
    
    p.take(1).run.run // Closing se nevypise.
    p.take(2).run.run // Closing se nevypise.
    p.take(3).run.run
    

    Problém s neuzavřením souboru rovněž nastane, když Task vyhodí jinou výjimku než Cause.Terminated(Cause.End):

    val p =
      eval(Task.delay { 5 / 0 }) ++
      eval_(Task.delay { println("Closing") })
    
    p.run.run // Closing se nevypise, run vyhodi vyjimku.
    

    S výjimkou Cause.Terminated(Cause.End) tento problém nenastane a soubor se uzavře:

    val p =
      eval(Task.delay { throw Cause.Terminated(Cause.End) }) ++
      eval_(Task.delay { println("Closing") })
    
    p.run.run // Closing se vypise, run zadnou vyjimku nevyhodi.
    

    Obecně, máme-li proces tvaru p ++ q a skončí-li p výjimkou jinou než Cause.Terminated(Cause.End), proces q se vůbec nevykoná. Pokud se q má vykonat v každém případě bez ohledu na to, jak skončil proces p, je třeba použít onComplete místo ++:

    val p =
      eval(Task.delay { 5 / 0 }) onComplete
      eval_(Task.delay { println("Closing") })
    
    p.run.run // Closing se vypise, run vyhodi vyjimku.
    

    Nyní již známe vše, abychom napsali funkci linesR:

    def linesR(file: String): Process[Task, String] =
      eval(Task.delay { Source.fromFile(file) }).flatMap { src =>
        lazy val lines = src.getLines
        repeatEval(Task.delay {
          if (lines.hasNext) lines.next
          else throw Cause.Terminated(Cause.End)
        }) onComplete eval_(Task.delay { src.close })
      }
    

    V knihovně scalaz-stream je funkce io.resource, která zobecňuje naši funkci linesR pro jiné zdroje. Ostatní funkce v modulu io (včetně funkce io.linesR) využívají tuto obecnou funkci io.resource.

    Jak již víme, p onComplete q zajistí, že se q spustí bez ohledu na to, jak skončilo p. Kromě toho onComplete zajistí, že proces q není možné přerušit:

    val p = emit(1)
    val q =
      eval_(Task.delay { println("One") }) ++
      emit(2) ++
      eval_(Task.delay { println("Two") })
    val r = p onComplete q
    
    r.take(2).run.run // Vypise One i Two.
    

    Ačkoliv z procesu r načítáme pouze dva prvky, proces q doběhne celý – vypíše se i řetězec Two. Pokud by q byl proces, který nikdy neskončí (například emit(0).repeat), tak by ani proces r.take(2) nikdy neskončil. p onComplete q se nejčastěji používá v situacích, kdy proces q má uklidit po procesu pq nejde přerušit, aby bylo zaručeno, že se úklid provede celý.

    Sink a Channel

    V této části se budeme zabývat tím, jak prvky produkované procesem někam uložit nebo někam zapsat. Pokud proces produkuje málo prvků a stačí, když se zápis provede až celý proces skončí, můžeme použít runLog. Proces spustíme pomocí runLog.run a poté, co skončí, vyprodukované prvky zapíšeme.

    Situace se ovšem zkomplikuje, když proces produkuje (neomezeně) mnoho prvků a my je chceme zapisovat průběžně. V některých případech (například logování) lze zápisy umístit do map nebo do nějakého jiného procesu typu Process1. Pokud však zápisy prvků používají Task (například proto, že jsou asynchronní, nebo proto, že je dobrým zvykem vedlejší efekty umístit do Tasků), je třeba použít i proces s typem požadavků Task, což Process1 není.

    Naštěstí má výše nastíněný problém velmi jednoduché řešení. Tím řešením je proces funkcí. Pojďme se podívat na příklad. Mějme proces řádků ze souboru

    val p: Process[Task, String] = io.linesR("file.txt")
    

    a předpokládejme, že chceme tyto řádky zapsat na standardní výstup. K zápisu na standardní výstup použijeme proces funkcí, které zapisují na standardní výstup:

    val stdOut: Process[Task, String => Unit] = constant(println)
    

    constant(a) je zkratka za emit(a).repeat. Pro každý řádek z p zavoláme funkci ze stdOut – k tomu lze použít zip a map

    p.zip(stdOut).map { case (line, f) => f(line) }.run.run
    

    nebo zipWith:

    p.zipWith(stdOut) { (line, f) => f(line) }.run.run
    

    To funguje, ale vedlejší efekt f(line) se stále provádí v procesu typu Process1. Navíc opět nepoužíváme Task – funkce, které provádí vedlejší efekty, mají typ String => Unit. Změníme tedy typ funkcí ze String => Unit na String => Task[Unit]:

    val stdOut: Process[Task, String => Task[Unit]] =
      constant(s => Task.delay { println(s) })
    

    Když spustíme náš kód

    p.zipWith(stdOut) { (line, f) => f(line) }.run.run
    

    zjistíme, že se nic nevypisuje. Problém je v tom, že proces

    p.zipWith(stdOut) { (line, f) => f(line) }
    

    produkuje Tasky, které nespouští – typ procesu je Process[Task, Task[Unit]]. Aby se něco vypsalo, potřebujeme produkované Tasky spustit, což provede funkce eval:

    p.zipWith(stdOut) { (line, f) => f(line) }.eval.run.run
    

    eval tedy z procesu typu Process[Task, Task[A]] udělá proces typu Process[Task, A]. stdOut je Sink. Pro typ Process[Task, A => Task[Unit]] existuje typový alias Sink[Task, A]. Data do Sinku posíláme funkcí to:

    p.to(stdOut).run.run
    

    Funkce to je implementována pomocí zipWith a eval – přesně jako v našem příkladu. S pomocí funkce io.resource snadno naimplementujeme Sink, který zapisuje řádky do souboru:

    import java.io.FileWriter
    
    def linesW(file: String): Sink[Task, String] =
      io.resource(
        Task.delay { new FileWriter(file) }
      )(
        fw => Task.delay { fw.close() }
      )(
        fw => Task.now(line => Task.delay { fw.write(line); fw.write("\n") })
      )
    

    to nahrazuje produkované prvky hodnotou (). Pokud chceme prvky zachovat, musíme použít funkci observe – ta se chová jako to, ale nenahrazuje prvky hodnotou (). S pomocí observe můžeme poslat prvky do více Sinků:

    p.observe(stdOut).to(linesW("file2.txt")).run.run
    

    Do Sinku stdOut můžeme posílat pouze řetězce, pokud bychom tam chtěli poslat několik náhodných čísel, museli bychom čísla převést na řetězce:

    randomNumbers.map(_.toString).observe(stdOut).take(10).runLog.run
    

    Stinnou stránkou tohoto řešení je, že proces už neprodukuje čísla, ale řetězce. V podobných případech se hodí funkce contramap, která změní změní typ vstupních prvků Sinku:

    randomNumbers.observe(stdOut.contramap(_.toString)).take(10).runLog.run
    

    contramap lze snadno implementovat pomocí map – v našem případě by map transformoval funkce typu String => Task[Unit] na funkce typu Int => Task[Unit].

    Sink[Task, A] je proces, který produkuje funkce A => Task[Unit]. Nahradíme-li Unit typovým parametrem B, dostaneme proces, kterému se říká Channel. Channel[Task, A, B] je tedy alias pro Process[Task, A => Task[B]] a Sink[Task, A] můžeme chápat jako alias pro Channel[Task, A, Unit]. Channel[Task, A, B] používáme pro transformaci prvků typu A na prvky typu B. Do Channelu se prvky posílají pomocí funkce through, typ vstupních prvků lze změnit pomocí funkce contramap.

    Připomeňme si Process1 nub pro odstranění duplikátů z minulého dílu

    def nub[A](history: Set[A] = Set.empty[A]): Process1[A, A] =
      receive1 { x =>
        if (history.contains(x)) nub(history)
        else emit(x) ++ nub(history + x)
      }
    

    a zkusme napsat Channel, který dělá totéž. První nepříjemnost je, že Channel transformuje jeden prvek vstupu na jeden prvek výstupu, jenže nub v případě duplikátů žádné prvky neprodukuje. Toto omezení Channelů obejdeme tak, že použijeme Channel typu Channel[Task, A, Option[A]] (místo Channel[Task, A, A]) – pro duplikáty bude Channel vracet None.

    Další nepříjemností je, že nemůžeme použít rekurzi pro předávání aktualizované množiny history. Problém je v tom, že Channel napřed vyprodukuje funkce A => Task[Option[A]] a teprve poté jsou tyto funkce aplikovány a Tasky spuštěny pomocí through. Samotný Channel tedy žádné filtrování neprovádí – ten pouze produkuje funkce A => Task[Option[A]]. Filtrování se provádí až pomocí through – je to tedy funkce through, od níž bychom potřebovali, aby předávala množinu history mezi funkcemi z Channelu. Jelikož funkce through nic takového neumí, musíme si pomoci sami – po spuštění Channelu vytvoříme měnitelnou množinu, kterou dáme do všech produkovaných funkcí:

    def nubChannel[A]: Channel[Task, A, Option[A]] =
      eval(Task.delay {
        collection.mutable.Set.empty[A]
      }).flatMap { history =>
        constant((x: A) => Task.delay {
          if (history.contains(x)) None
          else {
            history += x
            Some(x)
          }
        }).toSource
      }
    

    Použití nub:

    Process(1, 2, 1, 0, 1, 2).toSource |> nub()
    

    Použití nubChannel:

    (Process(1, 2, 1, 0, 1, 2).toSource through nubChannel) |> process1.stripNone
    

    Na rozdíl od Channelu může Process1 vrátit libovolný počet prvků a snáze se tam pracuje se stavem. Na rozdíl od Process1 může Channel provádět vedlejší efekty. Pokud nepotřebujeme vedlejší efekty, dáváme přednost Process1 před Channelem – důvodem je právě flexibilita Process1.

    Výhled do budoucna

    Hezké je, že řadu funkcí, které jsme si zde představili, může napsat i uživatel knihovny. Například p.drain můžeme nahradit p.flatMap { x => empty } nebo p.run lze nahradit funkcemi p.drain.runLog. Už jsme také viděli, že repeatEval(t) je zkratka za eval(t).repeat, nebo ne? Skutečnost bohužel není tak růžová, jak by se mohlo zdát:

    val x = eval(Task.delay { throw Cause.Terminated(Cause.End) }).repeat
    val y = repeatEval(Task.delay { throw Cause.Terminated(Cause.End) })
    

    Na rozdíl od procesu y proces x nikdy neskončí. V případě výjimky Cause.Terminated(Cause.End) se tedy repeatEval nechová jako eval a repeat. V případě jiných výjimek se x i y chovají stejně. Příkladem stejného charakteru je proces p.eval, který můžeme zapsat jako p.flatMap(eval), ale opět zjistíme, že se v případě výjimky Cause.Terminated(Cause.End) obě konstrukce liší:

    val x = constant(Task.delay { throw Cause.Terminated(Cause.End) }).flatMap(eval)
    val y = constant(Task.delay { throw Cause.Terminated(Cause.End) }).eval
    

    V uvedeném příkladu x opět neskončí a y ano. Pravděpodobně z tohoto důvodu bude výjimka Cause.Terminated(Cause.End) odstraněna z další verze knihovny scalaz-stream.

    Další plánovaná změna v knihovně scalaz-stream se týká funkcí ++ a onComplete. Kdybychom místo ++ a onComplete použili syntaxi jazyka Scala, pak by se proces p ++ q zapisoval jako { p; q } a p onComplete q jako try { p } finally { q }. Následující dva procesy se tedy chovají jinak:

    val x = (p ++ q) onComplete r
    val y = p ++ (q onComplete r)
    

    Použijeme-li syntax Scaly, je x ekvivalentní

    try { p; q } finally { r }
    

    a y ekvivalentní

    p
    try { q } finally { r }
    

    U x se r vykoná vždy, zatímco u y se r nevykoná, pokud p skončí výjimkou. Některým uživatelům knihovny se toto chování nelíbí a chtěli by, aby se proces r v obou případech vykonal vždy.

    Závěr

    Příště se naučíme, jak napojit více procesů na jeden proces. Řeč bude o procesech typu Tee a Wye a o funkcích tee a wye. Například nám již známé funkce zip a zipWith lze implementovat pomocí typu Tee a funkce tee.

    Spinoco        

    Hodnocení: 100 %

            špatnédobré        

    Nástroje: Tisk bez diskuse

    Tiskni Sdílej: Linkuj Jaggni to Vybrali.sme.sk Google Del.icio.us Facebook

    Komentáře

    Vložit další komentář

    24.5.2015 12:18 ava
    Rozbalit Rozbalit vše Re: Funkcionální programování ve Scale 3.
    Díky za další parádní díl. Tohle už se četlo o trochu snadněji, než ten minulý :) Je to pro mě mimořádné srovnání s RxJava, jen si v hlavě pořád nedokážu úplně srovnat push-based přístup RxJavy (předchozí článek řetězu tlačí data do dalšího pomocí OnNext) (i když v reakci na problémy s backpressure tam přidali možnost kombinovaného push/pull based), a - asi - pull based přístupu scalaz-stream (následující články řetězu tahají data od předchozích)? Nebo je celé tohle dělení umělé? A jaký je vlastně rozdíl mezi pull-based přístupem a obyčejnými streamy? To jsou otázky, které mi v noci nedají spát :) Těším se na další díly.
    25.5.2015 16:49 Radek Miček | skóre: 23 | blog: radekm_blog
    Rozbalit Rozbalit vše Re: Funkcionální programování ve Scale 3.
    pull based přístupu scalaz-stream (následující články řetězu tahají data od předchozích)
    Ano, scalaz-stream je pull based. Push lze nasimulovat například pomocí fronty:
    val p = (Process(1, 2, 3).toSource ++ eval_(Task.delay(println("print")))).repeat
    val processItem: Sink[Task, Int] = io.stdOutLines.contramap(_.toString)
    
    // Vytvorime frontu.
    val q = async.unboundedQueue[Int]
    
    // Prvky produkovane procesem p posleme do fronty.
    p.to(q.enqueue).run.runAsync(x => println("enqueue end"))
    
    // q.dequeue je proces prvku z fronty.
    // Prvky z fronty posilame do Sinku processItem, ktery prvky zpracovava.
    // Az jsou prvky zpracovany (nebo po vyhozeni vyjimky) frontu zavreme
    // -- zavreni fronty zajisti ukonceni procesu, jenz dava prvky do fronty
    // -- tj. ukonceni p.to(q.enqueue).run.runAsync(x => println("enqueue end")).
    (q.dequeue.to(processItem).take(2) onComplete eval_(q.close)).run.run
    
    Kód obvykle několikrát vypíše řetězec print – počet vypsaných řetězců print závisí na tom, jak rychle se prvky produkované p posílají do fronty q. Kromě neomezené fronty async.unboundedQueue existuje i omezená fronta async.boundedQueue.
    A jaký je vlastně rozdíl mezi pull-based přístupem a obyčejnými streamy?
    Myslím, že obyčejné streamy jsou také pull based.
    ISSN 1214-1267   www.czech-server.cz
    © 1999-2015 Nitemedia s. r. o. Všechna práva vyhrazena.