abclinuxu.cz AbcLinuxu.cz itbiz.cz ITBiz.cz HDmag.cz HDmag.cz abcprace.cz AbcPráce.cz
AbcLinuxu hledá autory!
Inzerujte na AbcPráce.cz od 950 Kč
Rozšířené hledání
×
    dnes 23:33 | Nová verze

    Immich byl vydán v první stabilní verzi 2.0.0 (YouTube). Jedná se o alternativu k výchozím aplikacím od Googlu a Applu pro správu fotografií a videí umožňující vlastní hosting serveru Immich. K vyzkoušení je demo. Immich je součástí balíčků open source aplikací FUTO. Zdrojové kódy jsou k dispozici na GitHubu pod licencí AGPL-3.0.

    Ladislav Hagara | Komentářů: 0
    dnes 22:33 | IT novinky

    Český telekomunikační úřad vydal zprávy o vývoji cen a trhu elektronických komunikací se zaměřením na rok 2024. Jaká jsou hlavní zjištění? V roce 2024 bylo v ČR v rámci služeb přístupu k internetu v pevném místě přeneseno v průměru téměř 366 GB dat na jednu aktivní přípojku měsíčně – celkově jich tak uživateli bylo přeneseno přes 18 EB (Exabyte). Nejvyužívanějším způsobem přístupu k internetu v pevném místě zůstal v roce 2024 bezdrátový

    … více »
    Ladislav Hagara | Komentářů: 0
    dnes 12:11 | Nová verze

    Raspberry Pi OS, oficiální operační systém pro Raspberry Pi, byl vydán v nové verzi 2025-10-01. Přehled novinek v příspěvku na blogu Raspberry Pi a poznámkách k vydání. Jedná o první verzi postavenou na Debianu 13 Trixie.

    Ladislav Hagara | Komentářů: 0
    dnes 05:22 | Nová verze

    Byla vydána nová verze 4.6 svobodného notačního programu MuseScore Studio (Wikipedie). Představení novinek v oznámení v diskusním fóru a také na YouTube.

    Ladislav Hagara | Komentářů: 0
    dnes 02:22 | Komunita

    Společnost DuckDuckGo stojící za stejnojmenným vyhledávačem věnovala 1,1 milionu dolarů (stejně jako loni) na podporu digitálních práv, online soukromí a lepšího internetového ekosystému. Rozdělila je mezi 29 organizací a projektů. Za 15 let rozdala 8 050 000 dolarů.

    Ladislav Hagara | Komentářů: 4
    včera 20:11 | Nová verze

    Svobodný multiplatformní herní engine Bevy napsaný v Rustu byl vydán ve verzi 0.17. Díky 278 přispěvatelům.

    Ladislav Hagara | Komentářů: 0
    včera 16:11 | Nová verze

    Bylo vydáno openSUSE Leap 16 (cs). Ve výchozím nastavení přichází s vypnutou 32bitovou (ia32) podporou. Uživatelům však poskytuje možnost ji ručně povolit a užívat si tak hraní her ve Steamu, který stále závisí na 32bitových knihovnách. Změnily se požadavky na hardware. Leap 16 nyní vyžaduje jako minimální úroveň architektury procesoru x86-64-v2, což obecně znamená procesory zakoupené v roce 2008 nebo později. Uživatelé se starším hardwarem mohou migrovat na Slowroll nebo Tumbleweed.

    Ladislav Hagara | Komentářů: 3
    včera 16:00 | IT novinky

    Ministerstvo průmyslu a obchodu (MPO) ve spolupráci s Národní rozvojovou investiční (NRI) připravuje nový investiční nástroj zaměřený na podporu špičkových technologií – DeepTech fond. Jeho cílem je posílit inovační ekosystém české ekonomiky, rozvíjet projekty s vysokou přidanou hodnotou, podpořit vznik nových technologických lídrů a postupně zařadit Českou republiku mezi země s nejvyspělejší technologickou základnou.

    … více »
    Ladislav Hagara | Komentářů: 3
    včera 12:55 | Nová verze

    Radicle byl vydán ve verzi 1.5.0 s kódovým jménem Hibiscus. Jedná se o distribuovanou alternativu k softwarům pro spolupráci jako např. GitLab.

    Ladislav Hagara | Komentářů: 3
    včera 03:22 | IT novinky

    Společnost OpenAI představila text-to-video AI model Sora 2 pro generování realistických videí z textového popisu. Přesnější, realističtější a lépe ovladatelný než předchozí modely. Nabízí také synchronizované dialogy a zvukové efekty.

    Ladislav Hagara | Komentářů: 4
    Jaké řešení používáte k vývoji / práci?
     (41%)
     (47%)
     (15%)
     (16%)
     (18%)
     (14%)
     (17%)
     (14%)
     (14%)
    Celkem 158 hlasů
     Komentářů: 9, poslední 24.9. 17:28
    Rozcestník

    Funkcionální programování ve Scale 3.

    18. 5. 2015 | Redakce | Návody | 9979×

    V tomto článku si popíšeme typ Task a ukážeme si, jak konstruovat procesy s vedlejšími efekty, jenž dělají požadavky typu Task. Poté se budeme věnovat Sinkům a Channelům. Na závěr článku si řekneme něco o plánovaných změnách v knihovně scalaz-stream.

    scalaz-stream 3: Procesy s vedlejšími efekty

    Používáme knihovnu scalaz-stream verze 0.7a a předpokládáme následující importy:

    import scalaz.concurrent.Task
    import scalaz.stream._
    import Process._
    

    Při programování s knihovnou scalaz-stream je zvykem, že vedlejší efekty dělají pouze procesy s požadavky typu Task. Pomocí funkce Task.delay můžeme do Tasku zabalit každý výraz, například:

    val t = Task.delay { println("Hi") }
    

    Jak již víme, Task lze spustit pomocí funkce run:

    t.run
    

    Při každém spuštění se zabalený výraz vyhodnotí a získaná hodnota je vrácena funkcí run. Při každém spuštění t se tedy vypíše řetězec Hi a run vrátí hodnotu () typu Unit. Další funkcí, která vytváří Task, je Task.now:

    val t = Task.now(5 + 3)
    

    Rozdíl mezi funkcemi Task.now a Task.delay spočívá v tom, že funkci Task.now se argument předává hodnotou (argument je vyhodnocen před zavoláním funkce) a funkci Task.delay se předává jménem. V našem případě je tedy výraz 5 + 3 vyhodnocen a teprve poté je zkonstruován Task. Například následující Task vrací pokaždé stejné náhodné číslo:

    import scala.util.Random
    
    val badRandNumGen = Task.now { Random.nextInt }
    

    Náhodné číslo je totiž vygenerováno ještě před zkonstruováním Tasku, při jeho spouštění se žádné náhodné číslo negeneruje. Použijeme-li Task.delay místo Task.now

    val randNumGen = Task.delay { Random.nextInt }
    

    dostaneme Task, který při každém spuštění generuje nové náhodné číslo. Pokud výraz uvnitř Tasku vyhodí výjimku, je tato výjimka následně vyhozena i funkcí run. Alternativně můžeme použít funkci attemptRun, která výjimku nevyhazuje, ale vrací. Výsledek funkce attemptRun má typ Throwable \/ A, kde \/ je jako Either. Například

    Task.delay { 5 / 0 }.attemptRun
    

    vrátí hodnotu tvaru -\/(_: ArithmeticException). Chceme-li do Tasku zabalit asynchronní výpočet, použijeme funkci Task.async:

    import scalaz.\/
    
    def startAsyncComputation[A](cb: (Throwable \/ A) => Unit): Unit = ???
    
    val t = Task.async(startAsyncComputation)
    

    Funkce startAsyncComputation spustí asynchronní výpočet a skončí. Výsledek asynchronního výpočtu se do t předá pomocí callbacku cb, který funkce startAsyncComputation dostala jako argument. Funkce Task.async je obzvláště užitečná pro propojení s okolním světem. Například díky Task.async snadno napíšeme funkci, která převede Future na Task:

    import scala.concurrent.Future
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.util.{Failure, Success}
    import scalaz.{-\/, \/-}
    
    def futureToTask[A](fut: => Future[A]): Task[A] =
      Task.async[A](cb => fut onComplete {
        case Success(a) => cb(\/-(a))
        case Failure(t) => cb(-\/(t))
      })
    

    Future se funkci futureToTask předává jménem, aby se zamezilo jejímu spuštění před spuštěním Tasku. Zde je příklad použití:

    val t = futureToTask(Future(println("Hi")))
    t.run
    

    Na závěr našeho stručného představení Tasku poznamenejme, že Task je monáda, k dispozici jsou tudíž funkce map, flatMap i for notace.

    Procesy a Tasky

    Máme-li Task, můžeme pomocí eval vytvořit proces, který Task spustí a vyprodukuje hodnotu z Tasku. Například proces, který vyprodukuje jedno náhodné číslo:

    eval(randNumGen)
    

    Název eval je poněkud zavádějící, neboť eval Tasky nespouští a ani nijak nevyhodnocuje, eval pouze zabalí Task do procesu. Například

    val randomNumbers = eval(randNumGen).repeat
    

    je proces s mnoha náhodnými čísly, nikoliv s jedním náhodným číslem, které se opakuje. eval do procesu zabalí celý Task, tj. nedojde k tomu, že by eval napřed spustil Task randNumGen a poté do procesu zabalil pouze výsledek Tasku.

    eval a repeat se často používají společně, proto vznikla funkce repeatEval, která nám výše uvedený proces náhodných čísel dovoluje přepsat na jedno volání:

    val randomNumbers = repeatEval(randNumGen)
    

    Pokud nás výsledek Tasku nezajímá, můžeme použít eval_, který Task spustí, ale jeho výsledek zahodí:

    val p = eval_(Task.delay { println("Starting") }) ++ randomNumbers
    

    p je proces produkující čísla, kdybychom však místo eval_ použili eval, měli bychom proces produkující AnyVal – první vyprodukovaný prvek by byl () vrácený println("Starting"), ten by byl následován náhodnými čísly. Obecně, chceme-li, aby proces žádné prvky neprodukoval, můžeme použít funkci drain, ta z procesu, který produkuje prvky typu A, udělá proces, který neprodukuje žádné prvky (typ produkovaných prvků je Nothing). eval_(t) je zkratka za eval(t).drain.

    Procesy se zdroji

    Nyní se pokusíme napsat proces, který přečte a vyprodukuje řádky ze souboru – něco jako io.linesR. Náš první pokus je:

    import scala.io.Source
    
    def badLinesR(file: String): Process[Task, String] =
      eval(Task.delay { Source.fromFile(file) }).flatMap { src =>
        lazy val lines = src.getLines
        repeatEval(Task.delay { lines.next }) ++ eval_(Task.delay { src.close })
      }
    

    První problém funkce badLinesR je, že volá lines.next aniž by testovala lines.hasNext. Pokud je lines.haxNext false, není chování lines.next definováno. Otázkou je, co dělat, když je lines.hasNext false? Pro tyto případy existuje speciální výjimka Cause.Terminated(Cause.End), kterou můžeme vyhodit. Tato výjimka zajistí, že repeatEval skončí a pokračuje se další částí procesu – v našem případě eval_(Task.delay { src.close }).

    Další problém funkce badLinesR je, že nezaručuje uzavření souboru. Tento problém nastane, pokud proces předčasně přerušíme, například pomocí take:

    // Simuluje cteni souboru se dvema radky.
    val p =
      eval(Task.delay { println("Line 1") }) ++
      eval(Task.delay { println("Line 2") }) ++
      eval_(Task.delay { println("Closing") })
    
    p.take(1).run.run // Closing se nevypise.
    p.take(2).run.run // Closing se nevypise.
    p.take(3).run.run
    

    Problém s neuzavřením souboru rovněž nastane, když Task vyhodí jinou výjimku než Cause.Terminated(Cause.End):

    val p =
      eval(Task.delay { 5 / 0 }) ++
      eval_(Task.delay { println("Closing") })
    
    p.run.run // Closing se nevypise, run vyhodi vyjimku.
    

    S výjimkou Cause.Terminated(Cause.End) tento problém nenastane a soubor se uzavře:

    val p =
      eval(Task.delay { throw Cause.Terminated(Cause.End) }) ++
      eval_(Task.delay { println("Closing") })
    
    p.run.run // Closing se vypise, run zadnou vyjimku nevyhodi.
    

    Obecně, máme-li proces tvaru p ++ q a skončí-li p výjimkou jinou než Cause.Terminated(Cause.End), proces q se vůbec nevykoná. Pokud se q má vykonat v každém případě bez ohledu na to, jak skončil proces p, je třeba použít onComplete místo ++:

    val p =
      eval(Task.delay { 5 / 0 }) onComplete
      eval_(Task.delay { println("Closing") })
    
    p.run.run // Closing se vypise, run vyhodi vyjimku.
    

    Nyní již známe vše, abychom napsali funkci linesR:

    def linesR(file: String): Process[Task, String] =
      eval(Task.delay { Source.fromFile(file) }).flatMap { src =>
        lazy val lines = src.getLines
        repeatEval(Task.delay {
          if (lines.hasNext) lines.next
          else throw Cause.Terminated(Cause.End)
        }) onComplete eval_(Task.delay { src.close })
      }
    

    V knihovně scalaz-stream je funkce io.resource, která zobecňuje naši funkci linesR pro jiné zdroje. Ostatní funkce v modulu io (včetně funkce io.linesR) využívají tuto obecnou funkci io.resource.

    Jak již víme, p onComplete q zajistí, že se q spustí bez ohledu na to, jak skončilo p. Kromě toho onComplete zajistí, že proces q není možné přerušit:

    val p = emit(1)
    val q =
      eval_(Task.delay { println("One") }) ++
      emit(2) ++
      eval_(Task.delay { println("Two") })
    val r = p onComplete q
    
    r.take(2).run.run // Vypise One i Two.
    

    Ačkoliv z procesu r načítáme pouze dva prvky, proces q doběhne celý – vypíše se i řetězec Two. Pokud by q byl proces, který nikdy neskončí (například emit(0).repeat), tak by ani proces r.take(2) nikdy neskončil. p onComplete q se nejčastěji používá v situacích, kdy proces q má uklidit po procesu pq nejde přerušit, aby bylo zaručeno, že se úklid provede celý.

    Sink a Channel

    V této části se budeme zabývat tím, jak prvky produkované procesem někam uložit nebo někam zapsat. Pokud proces produkuje málo prvků a stačí, když se zápis provede až celý proces skončí, můžeme použít runLog. Proces spustíme pomocí runLog.run a poté, co skončí, vyprodukované prvky zapíšeme.

    Situace se ovšem zkomplikuje, když proces produkuje (neomezeně) mnoho prvků a my je chceme zapisovat průběžně. V některých případech (například logování) lze zápisy umístit do map nebo do nějakého jiného procesu typu Process1. Pokud však zápisy prvků používají Task (například proto, že jsou asynchronní, nebo proto, že je dobrým zvykem vedlejší efekty umístit do Tasků), je třeba použít i proces s typem požadavků Task, což Process1 není.

    Naštěstí má výše nastíněný problém velmi jednoduché řešení. Tím řešením je proces funkcí. Pojďme se podívat na příklad. Mějme proces řádků ze souboru

    val p: Process[Task, String] = io.linesR("file.txt")
    

    a předpokládejme, že chceme tyto řádky zapsat na standardní výstup. K zápisu na standardní výstup použijeme proces funkcí, které zapisují na standardní výstup:

    val stdOut: Process[Task, String => Unit] = constant(println)
    

    constant(a) je zkratka za emit(a).repeat. Pro každý řádek z p zavoláme funkci ze stdOut – k tomu lze použít zip a map

    p.zip(stdOut).map { case (line, f) => f(line) }.run.run
    

    nebo zipWith:

    p.zipWith(stdOut) { (line, f) => f(line) }.run.run
    

    To funguje, ale vedlejší efekt f(line) se stále provádí v procesu typu Process1. Navíc opět nepoužíváme Task – funkce, které provádí vedlejší efekty, mají typ String => Unit. Změníme tedy typ funkcí ze String => Unit na String => Task[Unit]:

    val stdOut: Process[Task, String => Task[Unit]] =
      constant(s => Task.delay { println(s) })
    

    Když spustíme náš kód

    p.zipWith(stdOut) { (line, f) => f(line) }.run.run
    

    zjistíme, že se nic nevypisuje. Problém je v tom, že proces

    p.zipWith(stdOut) { (line, f) => f(line) }
    

    produkuje Tasky, které nespouští – typ procesu je Process[Task, Task[Unit]]. Aby se něco vypsalo, potřebujeme produkované Tasky spustit, což provede funkce eval:

    p.zipWith(stdOut) { (line, f) => f(line) }.eval.run.run
    

    eval tedy z procesu typu Process[Task, Task[A]] udělá proces typu Process[Task, A]. stdOut je Sink. Pro typ Process[Task, A => Task[Unit]] existuje typový alias Sink[Task, A]. Data do Sinku posíláme funkcí to:

    p.to(stdOut).run.run
    

    Funkce to je implementována pomocí zipWith a eval – přesně jako v našem příkladu. S pomocí funkce io.resource snadno naimplementujeme Sink, který zapisuje řádky do souboru:

    import java.io.FileWriter
    
    def linesW(file: String): Sink[Task, String] =
      io.resource(
        Task.delay { new FileWriter(file) }
      )(
        fw => Task.delay { fw.close() }
      )(
        fw => Task.now(line => Task.delay { fw.write(line); fw.write("\n") })
      )
    

    to nahrazuje produkované prvky hodnotou (). Pokud chceme prvky zachovat, musíme použít funkci observe – ta se chová jako to, ale nenahrazuje prvky hodnotou (). S pomocí observe můžeme poslat prvky do více Sinků:

    p.observe(stdOut).to(linesW("file2.txt")).run.run
    

    Do Sinku stdOut můžeme posílat pouze řetězce, pokud bychom tam chtěli poslat několik náhodných čísel, museli bychom čísla převést na řetězce:

    randomNumbers.map(_.toString).observe(stdOut).take(10).runLog.run
    

    Stinnou stránkou tohoto řešení je, že proces už neprodukuje čísla, ale řetězce. V podobných případech se hodí funkce contramap, která změní změní typ vstupních prvků Sinku:

    randomNumbers.observe(stdOut.contramap(_.toString)).take(10).runLog.run
    

    contramap lze snadno implementovat pomocí map – v našem případě by map transformoval funkce typu String => Task[Unit] na funkce typu Int => Task[Unit].

    Sink[Task, A] je proces, který produkuje funkce A => Task[Unit]. Nahradíme-li Unit typovým parametrem B, dostaneme proces, kterému se říká Channel. Channel[Task, A, B] je tedy alias pro Process[Task, A => Task[B]] a Sink[Task, A] můžeme chápat jako alias pro Channel[Task, A, Unit]. Channel[Task, A, B] používáme pro transformaci prvků typu A na prvky typu B. Do Channelu se prvky posílají pomocí funkce through, typ vstupních prvků lze změnit pomocí funkce contramap.

    Připomeňme si Process1 nub pro odstranění duplikátů z minulého dílu

    def nub[A](history: Set[A] = Set.empty[A]): Process1[A, A] =
      receive1 { x =>
        if (history.contains(x)) nub(history)
        else emit(x) ++ nub(history + x)
      }
    

    a zkusme napsat Channel, který dělá totéž. První nepříjemnost je, že Channel transformuje jeden prvek vstupu na jeden prvek výstupu, jenže nub v případě duplikátů žádné prvky neprodukuje. Toto omezení Channelů obejdeme tak, že použijeme Channel typu Channel[Task, A, Option[A]] (místo Channel[Task, A, A]) – pro duplikáty bude Channel vracet None.

    Další nepříjemností je, že nemůžeme použít rekurzi pro předávání aktualizované množiny history. Problém je v tom, že Channel napřed vyprodukuje funkce A => Task[Option[A]] a teprve poté jsou tyto funkce aplikovány a Tasky spuštěny pomocí through. Samotný Channel tedy žádné filtrování neprovádí – ten pouze produkuje funkce A => Task[Option[A]]. Filtrování se provádí až pomocí through – je to tedy funkce through, od níž bychom potřebovali, aby předávala množinu history mezi funkcemi z Channelu. Jelikož funkce through nic takového neumí, musíme si pomoci sami – po spuštění Channelu vytvoříme měnitelnou množinu, kterou dáme do všech produkovaných funkcí:

    def nubChannel[A]: Channel[Task, A, Option[A]] =
      eval(Task.delay {
        collection.mutable.Set.empty[A]
      }).flatMap { history =>
        constant((x: A) => Task.delay {
          if (history.contains(x)) None
          else {
            history += x
            Some(x)
          }
        }).toSource
      }
    

    Použití nub:

    Process(1, 2, 1, 0, 1, 2).toSource |> nub()
    

    Použití nubChannel:

    (Process(1, 2, 1, 0, 1, 2).toSource through nubChannel) |> process1.stripNone
    

    Na rozdíl od Channelu může Process1 vrátit libovolný počet prvků a snáze se tam pracuje se stavem. Na rozdíl od Process1 může Channel provádět vedlejší efekty. Pokud nepotřebujeme vedlejší efekty, dáváme přednost Process1 před Channelem – důvodem je právě flexibilita Process1.

    Výhled do budoucna

    Hezké je, že řadu funkcí, které jsme si zde představili, může napsat i uživatel knihovny. Například p.drain můžeme nahradit p.flatMap { x => empty } nebo p.run lze nahradit funkcemi p.drain.runLog. Už jsme také viděli, že repeatEval(t) je zkratka za eval(t).repeat, nebo ne? Skutečnost bohužel není tak růžová, jak by se mohlo zdát:

    val x = eval(Task.delay { throw Cause.Terminated(Cause.End) }).repeat
    val y = repeatEval(Task.delay { throw Cause.Terminated(Cause.End) })
    

    Na rozdíl od procesu y proces x nikdy neskončí. V případě výjimky Cause.Terminated(Cause.End) se tedy repeatEval nechová jako eval a repeat. V případě jiných výjimek se x i y chovají stejně. Příkladem stejného charakteru je proces p.eval, který můžeme zapsat jako p.flatMap(eval), ale opět zjistíme, že se v případě výjimky Cause.Terminated(Cause.End) obě konstrukce liší:

    val x = constant(Task.delay { throw Cause.Terminated(Cause.End) }).flatMap(eval)
    val y = constant(Task.delay { throw Cause.Terminated(Cause.End) }).eval
    

    V uvedeném příkladu x opět neskončí a y ano. Pravděpodobně z tohoto důvodu bude výjimka Cause.Terminated(Cause.End) odstraněna z další verze knihovny scalaz-stream.

    Další plánovaná změna v knihovně scalaz-stream se týká funkcí ++ a onComplete. Kdybychom místo ++ a onComplete použili syntaxi jazyka Scala, pak by se proces p ++ q zapisoval jako { p; q } a p onComplete q jako try { p } finally { q }. Následující dva procesy se tedy chovají jinak:

    val x = (p ++ q) onComplete r
    val y = p ++ (q onComplete r)
    

    Použijeme-li syntax Scaly, je x ekvivalentní

    try { p; q } finally { r }
    

    a y ekvivalentní

    p
    try { q } finally { r }
    

    U x se r vykoná vždy, zatímco u y se r nevykoná, pokud p skončí výjimkou. Některým uživatelům knihovny se toto chování nelíbí a chtěli by, aby se proces r v obou případech vykonal vždy.

    Závěr

    Příště se naučíme, jak napojit více procesů na jeden proces. Řeč bude o procesech typu Tee a Wye a o funkcích tee a wye. Například nám již známé funkce zip a zipWith lze implementovat pomocí typu Tee a funkce tee.

    Spinoco        

    Hodnocení: 100 %

            špatnédobré        

    Nástroje: Tisk bez diskuse

    Tiskni Sdílej: Linkuj Jaggni to Vybrali.sme.sk Google Del.icio.us Facebook

    Komentáře

    Vložit další komentář

    24.5.2015 12:18 ava
    Rozbalit Rozbalit vše Re: Funkcionální programování ve Scale 3.
    Díky za další parádní díl. Tohle už se četlo o trochu snadněji, než ten minulý :) Je to pro mě mimořádné srovnání s RxJava, jen si v hlavě pořád nedokážu úplně srovnat push-based přístup RxJavy (předchozí článek řetězu tlačí data do dalšího pomocí OnNext) (i když v reakci na problémy s backpressure tam přidali možnost kombinovaného push/pull based), a - asi - pull based přístupu scalaz-stream (následující články řetězu tahají data od předchozích)? Nebo je celé tohle dělení umělé? A jaký je vlastně rozdíl mezi pull-based přístupem a obyčejnými streamy? To jsou otázky, které mi v noci nedají spát :) Těším se na další díly.
    25.5.2015 16:49 Radek Miček | skóre: 23 | blog: radekm_blog
    Rozbalit Rozbalit vše Re: Funkcionální programování ve Scale 3.
    pull based přístupu scalaz-stream (následující články řetězu tahají data od předchozích)
    Ano, scalaz-stream je pull based. Push lze nasimulovat například pomocí fronty:
    val p = (Process(1, 2, 3).toSource ++ eval_(Task.delay(println("print")))).repeat
    val processItem: Sink[Task, Int] = io.stdOutLines.contramap(_.toString)
    
    // Vytvorime frontu.
    val q = async.unboundedQueue[Int]
    
    // Prvky produkovane procesem p posleme do fronty.
    p.to(q.enqueue).run.runAsync(x => println("enqueue end"))
    
    // q.dequeue je proces prvku z fronty.
    // Prvky z fronty posilame do Sinku processItem, ktery prvky zpracovava.
    // Az jsou prvky zpracovany (nebo po vyhozeni vyjimky) frontu zavreme
    // -- zavreni fronty zajisti ukonceni procesu, jenz dava prvky do fronty
    // -- tj. ukonceni p.to(q.enqueue).run.runAsync(x => println("enqueue end")).
    (q.dequeue.to(processItem).take(2) onComplete eval_(q.close)).run.run
    
    Kód obvykle několikrát vypíše řetězec print – počet vypsaných řetězců print závisí na tom, jak rychle se prvky produkované p posílají do fronty q. Kromě neomezené fronty async.unboundedQueue existuje i omezená fronta async.boundedQueue.
    A jaký je vlastně rozdíl mezi pull-based přístupem a obyčejnými streamy?
    Myslím, že obyčejné streamy jsou také pull based.
    ISSN 1214-1267   www.czech-server.cz
    © 1999-2015 Nitemedia s. r. o. Všechna práva vyhrazena.