Portál AbcLinuxu, 1. května 2025 00:42
Proudy dat jsou v knihovně scalaz-stream reprezentovány jako procesy. V tomto článku si řekneme, co je to proces, ukážeme si, jak se procesy konstruují a jak je lze propojit pomocí rour.
V článku budeme pracovat s knihovnou scalaz-stream verze 0.7a, začneme jejím stažením:
git clone https://github.com/scalaz/scalaz-stream cd scalaz-stream git checkout series/0.7a
Knihovnu budeme používat z REPLu:
./sbt console
Příklady v článku předpokládají následující importy:
import scalaz.concurrent.Task import scalaz.stream
Knihovna scalaz-stream reprezentuje proudy dat jako procesy,
které provádí určité požadavky a na základě jejich výsledků
produkují prvky. Tyto procesy mají typ
Process[F[_], A]
, kde F
je typ požadavků
a A
je typ produkovaných prvků.
Například proud řádků ze souboru je reprezentován procesem
io.linesR("file.txt")
typu Process[Task, String]
. Tento proces dělá požadavky typu
Task
a produkuje řádky typu String
.
Task
je typ požadavků, jenž zahrnují vedlejší efekty,
zejména (asynchronní) vstup a výstup. Typ Process[Task, String]
nám tedy říká, že proces bude dělat vstup a výstup a produkovat řetězce.
Jak jsme viděli minule, s procesy lze v určitých situacích pracovat jako se sekvencemi. Například prázdnou sekvenci resp. sekvenci s prvky 1, 2, 4, 8 zkonstruujeme takto
Seq.empty // Alternativne: Seq() Seq(1, 2, 4, 8)
a analogicky zkonstruujeme proces, jenž neprodukuje žádné prvky resp. produkuje prvky 1, 2, 4, 8
Process.empty // Alternativne: Process() Process(1, 2, 4, 8)
Tyto dva procesy mají typ Process[Nothing, Nothing]
resp.
Process[Nothing, Int]
. Z typů je patrné, že první proces
neprodukuje žádné prvky a druhý proces produkuje celá čísla.
Dále je vidět, že oba procesy nedělají žádné požadavky.
Typ procesů je kovariantní v obou parametrech, tudíž máme-li
val p: Process[Nothing, Int] = Process(1)
lze provést následující přiřazení
val q: Process[Task, Int] = p val r: Process[Nothing, Any] = p val s: Process[Task, Any] = p
Chceme-li změnit typ požadavku z Nothing
na Task
,
lze použít funkci toSource
.
V modulu Process
import Process._
se nachází celá řada funkcí pro konstrukci
procesů. Oblíbené jsou například funkce emit
a emitAll
.
První vytváří proces, jenž produkuje právě jeden prvek,
druhá vytváří proces, jenž produkuje prvky z dané sekvence.
Tedy
emit(1)
se chová jako Process(1)
a
emitAll(Seq(1, 2, 4))
se chová jako Process(Seq(1, 2, 4): _*)
. Předaná sekvence
může být i nekonečná:
emitAll(Stream.continually(1))
Procesy, jenž produkují nekonečně mnoho prvků, můžeme vytvářet i bez nekonečných sekvencí. Například
Process(1, 2, 3).repeat
se chová jako Process(1, 2, 3) ++ Process(1, 2, 3) ++ Process(1, 2, 3) ++ …
,
což můžeme zapsat také pomocí rekurze
def repeated: Process[Nothing, Int] = Process(1, 2, 3) ++ repeated
Vyhodnocení výrazu repeated
skončí díky tomu, že se druhý argument ++
předává jménem – tedy nevyhodnocuje se před zavoláním funkce ++
.
Při vytváření složitějších procesů využijeme funkce s parametry:
def fib(a: Int = 0, b: Int = 1): Process[Nothing, Int] = emit(a) ++ fib(b, a + b)
Volání fib()
vytvoří proces Fibonacciho čísel,
stav procesu se předává pomocí parametrů funkce.
Chceme-li, aby program něco dělal, musíme ho spustit. Stejně to funguje i s procesy –
aby proces něco dělal, musíme ho spustit. V minulém díle jsme procesy spouštěli
dvojím voláním funkce run
:
io.linesR("file.txt").run.run
Přirozenou otázkou je, proč se run
používá dvakrát.
io.linesR("file.txt")
má typ
Process[Task, String]
a první volání run
z tohoto procesu vytvoří hodnotu typu Task[Unit]
.
První volání vlastně nahradí proces, který dělal požadavky typu
Task[_]
, jedním velkým požadavkem typu Task[Unit]
.
A jak už asi tušíte, aby se něco stalo, i tento velký požadavek typu
Task[Unit]
je třeba spustit,
což se provede druhým voláním run
. Výraz
io.linesR("file.txt").run.run
přečte řádky souboru file.txt
a vrátí Unit
.
Pokud bychom chtěli vrátit přečtené řádky,
musíme místo prvního run
použít runLog
:
io.linesR("file.txt").runLog.run
runLog
uloží prvky, jenž proces vyprodukuje.
V našem případě runLog
vrátí hodnotu
typu Task[IndexedSeq[String]]
,
jejímž spuštěním dostaneme sekvenci řádků souboru.
Stejně jako run
ani runLog
neprovádí žádné vedlejší efekty, jedná se pouze
o transformaci procesu s požadavky typu F[_]
na hodnotu typu F[Unit]
v případě run
resp. F[A]
v případě runLog
(A
je typ produkovaných prvků).
Je to až druhý run
na hodnotě typu Task[Unit]
resp. Task[A]
, jenž provede vedlejší efekty.
run
a runLog
jde použít pouze na procesy,
kde typ požadavku splňuje určité vlastnosti (přesněji: typ požadavků
musí mít instanci Monad
a Catchable
;
Process
pak můžeme chápat jako monad transformer).
Například proces, jenž neprovádí žádné požadavky, má typ požadavku
Nothing
, který tyto vlastnosti nesplňuje, proto nelze psát
emit(1).runLog
Pro procesy, jenž neprovádí žádné požadavky,
máme typový alias Process0
emit(1): Process0[Int]
a produkované prvky lze získat pomocí funkce toList
emit(1).toList
Alternativně můžeme využít kovarianci procesů a proces přetypovat,
aby měl požadavky typu Task
:
emit(1).toSource.runLog.run
Síla knihovny scalaz-stream spočívá v tom, že složitou funkcionalitu lze poskládat z malých a znovupoužitelných dílků. Například máme-li proces, jenž produkuje čísla 0, 1, 2, -3
Process(0, 1, 2, -3)
a proces, jenž filtruje kladná čísla
process1.filter[Int](_ > 0)
pak tyto procesy můžeme propojit pomocí funkce |>
Process(0, 1, 2, -3) |> process1.filter(_ > 0)
čímž získáme proces, jenž produkuje čísla 1, 2.
|>
funguje jako roura z shellu,
zápis p |> q
napojí výstup procesu p
na vstup procesu q
. process1.filter[Int](_ > 0)
má typ Process1[Int, Int]
. Typ Process1[A, B]
značí procesy, které dělají požadavky na prvky typu A
a produkují prvky typu B
. Process1[A, B]
je typový alias
pro Process[Env[A,Any]#Is, B]
– typ požadavků tedy není A
,
ale Env[A,Any]#Is
. Alternativně lze místo |>
použít funkci
pipe
, tj. p.pipe(q)
je jiný zápis p |> q
.
Modul process1
je plný procesů typu Process1
,
které jde použít pro transformaci proudů. Některé z těchto transformací
jako například filter
nebo collect
se používají tak často, že si vysloužili zkratku, například místo
Process(0, 1, 2, -3) |> process1.filter(_ > 0)
můžeme psát
Process(0, 1, 2, -3).filter(_ > 0)
Jelikož Process0[B]
je podtyp Process1[A, B]
,
lze proces typu Process0[B]
použít všude, kde je očekáván proces
typu Process1[A, B]
:
io.linesR("file.txt") |> emit(1)
Roura funguje na principu pull. Máme-li p |> q
,
roura interpretuje q
a v okamžiku, kdy q
požaduje prvek z p
, roura začne interpretovat p
,
dokud p
nevyprodukuje prvek nebo neskončí, pak pokračuje
v interpretaci q
. Jelikož proces emit(1)
žádný prvek nepožaduje, není důvod začít interpretovat proces
io.linesR("file.txt")
, tedy k otevření souboru
file.txt
nedojde.
Hodnoty typu Process1
jde kombinovat standardním způsobem. Například vezmeme-li
val p: Process1[Int, Int] = await1 val q: Process1[Int, Int] = process1.skippak
(p ++ q).repeat
je proces, jenž vypustí každý druhý prvek. process1.skip
načte jeden prvek a skončí, žádný prvek není vyprodukován.
await1
načte jeden prvek, vyprodukuje ho a skončí.
Zobecněním await1
je proces process1.take(n)
,
který načte n
prvků a skončí, přičemž každý prvek je ihned
po načtení vyprodukován. await1
je tedy totéž, co process1.take(1)
.
Například chceme-li aplikovat filtr pouze na prvních 5 prvků, můžeme psát
process1.take[Int](5).filter(_ > 0) ++ process1.id
kde process1.id
je identita – proces,
který načítá prvky a beze změn je produkuje.
Důležitými stavebními kameny při psaní procesů typu Process1
jsou funkce await1
a flatMap
. flatMap
,
známý ze sekvencí, zamění každý prvek procesu za proces.
Process(1, 2, 3).map(f)
se chová jako
emit(f(1)) ++ emit(f(2)) ++ emit(f(3))
a Process(1, 2, 3).flatMap(f)
se chová jako
f(1) ++ f(2) ++ f(3)
Mějme predikát positive
na celých číslech:
def positive(i: Int) = i > 0
Chceme-li definovat process1.filter(positive)
, začneme
procesem, který přefiltruje jeden prvek a pak skončí:
val filterOne = await1[Int].flatMap { x => if (positive(x)) emit(x) else empty }
Je-li x
kladné, je vyprodukováno (emit(x)
),
v opačném případě není vyprodukováno nic (empty
).
S pomocí filterOne
definujeme process1.filter(positive)
jako
filterOne.repeat
Ekvivalentem zápisu await1.flatMap(f)
je zápis receive1(f)
.
Proces receive1(f)
načte prvek x
a pokračuje procesem f(x)
.
Pokud se žádný prvek načíst nepodaří, pokračuje procesem empty
.
Procesy typu Process1
mohou mít stav, můžeme tak napsat například proces,
který odstraní duplikáty:
def nub[A](history: Set[A] = Set.empty[A]): Process1[A, A] = receive1 { x => if (history.contains(x)) nub(history) else emit(x) ++ nub(history + x) }
Prvky, které se objevily, ukládáme do množiny history
.
Pokud se prvek neobjevil, tak ho vyprodukujeme pomocí emit
a pokračujeme
s množinou rozšířenou o x
(rekurzivní volání nub(history + x)
).
V některých situacích receive1
nestačí.
Například chceme-li vytvořit Process1
,
jenž sečte všechna čísla a poté součet vyprodukuje:
def badSum(s: Int = 0): Process1[Int, Int] = receive1 { x => badSum(s + x) }
badSum
součet nevyprodukuje. Součet bychom chtěli
totiž vyprodukovat až poté, co jsou všechna čísla sečtena,
jenže jak to poznat? Jak poznat, které číslo je poslední?
Pro tyto případy je k dispozici funkce receive1Or
:
def sum(s: Int = 0): Process1[Int, Int] = receive1Or[Int, Int](emit(s)) { x => sum(s + x) }
První parametr funkce receive1Or
je proces, který se použije v případě, že
už byly načteny všechny prvky – v našem případě emit(s)
.
Funkce receive1(f)
je tedy zkratkou za receive1Or(empty)(f)
.
Ukázali jsme si, jak lze dva procesy propojit rourou.
Roura (pipe
) napojí jeden proces na proces typu Process1
.
Příště se budeme zabývat procesy s vedlejšími efekty a ukážeme si propojení tvaru T
(tee
), které nám dovolí napojit dva procesy na proces typu Tee
.
určitě by bylo možné s procesem provést i jiné věci (např. serializovat?)Serializace asi nebude úplně jednoduchá, neboť procesy obsahují funkce (nebo dokonce uzávěry). Například serializovat následující proces
process1.filter(f(criteria))znamená mj. serializovat i funkci
f
a hodnotu criteria
.
Není mi moc jasný ten typ Env, kterým se Process parametrizuje jinde, ale předpokládám, že k tomu se asi blíže dostaneme příště.
Env
si můžeme vysvětlit, ale uděláme to spíše až přespříště. Příště se podíváme, jak se používá Task
, tee
, Sink
a Channel
.
ISSN 1214-1267, (c) 1999-2007 Stickfish s.r.o.