OpenJS Foundation, oficiální projekt konsorcia Linux Foundation, oznámila vydání verze 22 otevřeného multiplatformního prostředí pro vývoj a běh síťových aplikací napsaných v JavaScriptu Node.js (Wikipedie). V říjnu se verze 22 stane novou aktivní LTS verzí. Podpora je plánována do dubna 2027.
Byla vydána verze 8.2 open source virtualizační platformy Proxmox VE (Proxmox Virtual Environment, Wikipedie) založené na Debianu. Přehled novinek v poznámkách k vydání a v informačním videu. Zdůrazněn je průvodce migrací hostů z VMware ESXi do Proxmoxu.
R (Wikipedie), programovací jazyk a prostředí určené pro statistickou analýzu dat a jejich grafické zobrazení, bylo vydáno ve verzi 4.4.0. Její kódové jméno je Puppy Cup.
IBM kupuje společnost HashiCorp (Terraform, Packer, Vault, Boundary, Consul, Nomad, Waypoint, Vagrant, …) za 6,4 miliardy dolarů, tj. 35 dolarů za akcii.
Byl vydán TrueNAS SCALE 24.04 “Dragonfish”. Přehled novinek této open source storage platformy postavené na Debianu v poznámkách k vydání.
Oznámeny byly nové Raspberry Pi Compute Module 4S. Vedle původní 1 GB varianty jsou nově k dispozici také varianty s 2 GB, 4 GB a 8 GB paměti. Compute Modules 4S mají na rozdíl od Compute Module 4 tvar a velikost Compute Module 3+ a předchozích. Lze tak provést snadný upgrade.
Po roce vývoje od vydání verze 1.24.0 byla vydána nová stabilní verze 1.26.0 webového serveru a reverzní proxy nginx (Wikipedie). Nová verze přináší řadu novinek. Podrobný přehled v souboru CHANGES-1.26.
Byla vydána nová verze 6.2 živé linuxové distribuce Tails (The Amnesic Incognito Live System), jež klade důraz na ochranu soukromí uživatelů a anonymitu. Přehled změn v příslušném seznamu. Tor Browser byl povýšen na verzi 13.0.14.
Byla vydána nová verze 30.0.0 frameworku pro vývoj multiplatformních desktopových aplikací pomocí JavaScriptu, HTML a CSS Electron (Wikipedie, GitHub). Chromium bylo aktualizováno na verzi 124.0.6367.49, V8 na verzi 12.4 a Node.js na verzi 20.11.1. Electron byl původně vyvíjen pro editor Atom pod názvem Atom Shell. Dnes je na Electronu postavena celá řada dalších aplikací.
Byla vydána nová verze 9.0.0 otevřeného emulátoru procesorů a virtualizačního nástroje QEMU (Wikipedie). Přispělo 220 vývojářů. Provedeno bylo více než 2 700 commitů. Přehled úprav a nových vlastností v seznamu změn.
Task
a ukážeme si,
jak konstruovat procesy s vedlejšími efekty, jenž dělají požadavky typu
Task
. Poté se budeme věnovat Sink
ům a
Channel
ům. Na závěr článku si řekneme něco o plánovaných změnách
v knihovně scalaz-stream.Používáme knihovnu scalaz-stream verze 0.7a a předpokládáme následující importy:
import scalaz.concurrent.Task import scalaz.stream._ import Process._
Při programování s knihovnou scalaz-stream je zvykem,
že vedlejší efekty dělají pouze procesy s požadavky typu
Task
. Pomocí funkce Task.delay
můžeme do
Task
u zabalit každý výraz, například:
val t = Task.delay { println("Hi") }
Jak již víme, Task
lze spustit pomocí funkce run
:
t.run
Při každém spuštění se zabalený výraz vyhodnotí
a získaná hodnota je vrácena funkcí run
.
Při každém spuštění t
se tedy vypíše řetězec Hi
a run
vrátí hodnotu ()
typu Unit
.
Další funkcí, která vytváří Task
, je
Task.now
:
val t = Task.now(5 + 3)
Rozdíl mezi funkcemi Task.now
a Task.delay
spočívá
v tom, že funkci Task.now
se argument předává hodnotou (argument
je vyhodnocen před zavoláním funkce) a funkci Task.delay
se předává jménem. V našem případě je tedy výraz 5 + 3
vyhodnocen a teprve poté je zkonstruován Task
.
Například následující Task
vrací pokaždé stejné náhodné číslo:
import scala.util.Random val badRandNumGen = Task.now { Random.nextInt }
Náhodné číslo je totiž vygenerováno ještě před zkonstruováním
Task
u, při jeho spouštění se žádné náhodné
číslo negeneruje. Použijeme-li Task.delay
místo Task.now
val randNumGen = Task.delay { Random.nextInt }
dostaneme Task
, který při každém spuštění generuje nové
náhodné číslo. Pokud výraz uvnitř Task
u vyhodí výjimku,
je tato výjimka následně vyhozena i funkcí run
.
Alternativně můžeme použít funkci attemptRun
,
která výjimku nevyhazuje, ale vrací. Výsledek funkce
attemptRun
má typ Throwable \/ A
,
kde \/
je jako Either
.
Například
Task.delay { 5 / 0 }.attemptRun
vrátí hodnotu tvaru -\/(_: ArithmeticException)
.
Chceme-li do Task
u zabalit asynchronní
výpočet, použijeme funkci Task.async
:
import scalaz.\/ def startAsyncComputation[A](cb: (Throwable \/ A) => Unit): Unit = ??? val t = Task.async(startAsyncComputation)
Funkce startAsyncComputation
spustí asynchronní výpočet a skončí.
Výsledek asynchronního výpočtu se do t
předá pomocí callbacku
cb
, který funkce startAsyncComputation
dostala jako argument. Funkce Task.async
je obzvláště užitečná
pro propojení s okolním světem. Například díky Task.async
snadno
napíšeme funkci, která převede Future
na Task
:
import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits. global import scala.util.{Failure, Success} import scalaz.{-\/, \/-} def futureToTask[A](fut: => Future[A]): Task[A] = Task.async[A](cb => fut onComplete { case Success(a) => cb(\/-(a)) case Failure(t) => cb(-\/(t)) })
Future
se funkci futureToTask
předává jménem,
aby se zamezilo jejímu spuštění před spuštěním
Task
u. Zde je příklad použití:
val t = futureToTask(Future(println("Hi"))) t.run
Na závěr našeho stručného představení Task
u poznamenejme,
že Task
je monáda, k dispozici jsou tudíž funkce
map
, flatMap
i for
notace.
Task
y
Máme-li Task
, můžeme pomocí eval
vytvořit proces,
který Task
spustí a vyprodukuje hodnotu z Task
u.
Například proces, který vyprodukuje jedno náhodné číslo:
eval(randNumGen)
Název eval
je poněkud zavádějící, neboť eval
Task
y nespouští a ani nijak nevyhodnocuje, eval
pouze zabalí Task
do procesu. Například
val randomNumbers = eval(randNumGen).repeat
je proces s mnoha náhodnými čísly, nikoliv s jedním
náhodným číslem, které se opakuje. eval
do procesu zabalí celý
Task
, tj. nedojde k tomu, že by eval
napřed spustil
Task
randNumGen
a poté do procesu zabalil pouze výsledek Task
u.
eval
a repeat
se často používají společně,
proto vznikla funkce repeatEval
, která nám výše uvedený
proces náhodných čísel dovoluje přepsat na jedno volání:
val randomNumbers = repeatEval(randNumGen)
Pokud nás výsledek Task
u nezajímá, můžeme použít
eval_
, který Task
spustí, ale jeho výsledek zahodí:
val p = eval_(Task.delay { println("Starting") }) ++ randomNumbers
p
je proces produkující čísla, kdybychom však místo
eval_
použili eval
, měli bychom proces produkující
AnyVal
– první vyprodukovaný prvek
by byl ()
vrácený println("Starting")
,
ten by byl následován náhodnými čísly.
Obecně, chceme-li, aby proces žádné prvky neprodukoval, můžeme použít
funkci drain
, ta z procesu, který produkuje prvky typu
A
, udělá proces, který neprodukuje žádné prvky
(typ produkovaných prvků je Nothing
).
eval_(t)
je zkratka za eval(t).drain
.
Nyní se pokusíme napsat proces, který přečte a vyprodukuje řádky
ze souboru – něco jako io.linesR
. Náš první pokus je:
import scala.io.Source def badLinesR(file: String): Process[Task, String] = eval(Task.delay { Source.fromFile(file) }).flatMap { src => lazy val lines = src.getLines repeatEval(Task.delay { lines.next }) ++ eval_(Task.delay { src.close }) }
První problém funkce badLinesR
je, že volá
lines.next
aniž by testovala lines.hasNext
.
Pokud je lines.haxNext
false
, není chování
lines.next
definováno. Otázkou je, co dělat, když je
lines.hasNext
false
?
Pro tyto případy existuje speciální výjimka
Cause.Terminated(Cause.End)
,
kterou můžeme vyhodit. Tato výjimka zajistí, že repeatEval
skončí
a pokračuje se další částí procesu – v našem případě
eval_(Task.delay { src.close })
.
Další problém funkce badLinesR
je, že nezaručuje uzavření souboru.
Tento problém nastane, pokud proces předčasně přerušíme, například pomocí
take
:
// Simuluje cteni souboru se dvema radky. val p = eval(Task.delay { println("Line 1") }) ++ eval(Task.delay { println("Line 2") }) ++ eval_(Task.delay { println("Closing") }) p.take(1).run.run // Closing se nevypise. p.take(2).run.run // Closing se nevypise. p.take(3).run.run
Problém s neuzavřením souboru rovněž nastane, když Task
vyhodí
jinou výjimku než Cause.Terminated(Cause.End)
:
val p = eval(Task.delay { 5 / 0 }) ++ eval_(Task.delay { println("Closing") }) p.run.run // Closing se nevypise, run vyhodi vyjimku.
S výjimkou Cause.Terminated(Cause.End)
tento problém nenastane
a soubor se uzavře:
val p = eval(Task.delay { throw Cause.Terminated(Cause.End) }) ++ eval_(Task.delay { println("Closing") }) p.run.run // Closing se vypise, run zadnou vyjimku nevyhodi.
Obecně, máme-li proces tvaru p ++ q
a skončí-li p
výjimkou jinou než Cause.Terminated(Cause.End)
, proces
q
se vůbec nevykoná.
Pokud se q
má vykonat v každém případě bez ohledu na to,
jak skončil proces p
, je třeba použít onComplete
místo ++
:
val p = eval(Task.delay { 5 / 0 }) onComplete eval_(Task.delay { println("Closing") }) p.run.run // Closing se vypise, run vyhodi vyjimku.
Nyní již známe vše, abychom napsali funkci linesR
:
def linesR(file: String): Process[Task, String] = eval(Task.delay { Source.fromFile(file) }).flatMap { src => lazy val lines = src.getLines repeatEval(Task.delay { if (lines.hasNext) lines.next else throw Cause.Terminated(Cause.End) }) onComplete eval_(Task.delay { src.close }) }
V knihovně scalaz-stream je funkce io.resource
, která zobecňuje
naši funkci linesR
pro jiné zdroje. Ostatní funkce v modulu
io
(včetně funkce io.linesR
) využívají tuto obecnou
funkci io.resource
.
Jak již víme, p onComplete q
zajistí, že se q
spustí bez ohledu na to, jak skončilo p
. Kromě toho
onComplete
zajistí, že proces q
není možné přerušit:
val p = emit(1) val q = eval_(Task.delay { println("One") }) ++ emit(2) ++ eval_(Task.delay { println("Two") }) val r = p onComplete q r.take(2).run.run // Vypise One i Two.
Ačkoliv z procesu r
načítáme pouze dva prvky, proces
q
doběhne celý – vypíše se i řetězec Two.
Pokud by q
byl proces, který nikdy neskončí (například
emit(0).repeat
), tak by ani proces r.take(2)
nikdy
neskončil. p onComplete q
se nejčastěji používá v situacích,
kdy proces q
má uklidit po procesu p
–
q
nejde přerušit, aby bylo zaručeno, že se úklid provede celý.
Sink
a Channel
V této části se budeme zabývat tím, jak
prvky produkované procesem někam uložit nebo někam zapsat.
Pokud proces produkuje málo prvků a stačí, když se zápis provede
až celý proces skončí, můžeme použít runLog
. Proces spustíme
pomocí runLog.run
a poté, co skončí, vyprodukované prvky zapíšeme.
Situace se ovšem zkomplikuje, když proces produkuje (neomezeně) mnoho
prvků a my je chceme zapisovat průběžně. V některých případech (například
logování) lze zápisy umístit do map
nebo do nějakého jiného
procesu typu Process1
. Pokud však zápisy prvků používají
Task
(například proto, že jsou asynchronní, nebo proto,
že je dobrým zvykem vedlejší efekty umístit do Task
ů),
je třeba použít i proces
s typem požadavků Task
, což Process1
není.
Naštěstí má výše nastíněný problém velmi jednoduché řešení. Tím řešením je proces funkcí. Pojďme se podívat na příklad. Mějme proces řádků ze souboru
val p: Process[Task, String] = io.linesR("file.txt")
a předpokládejme, že chceme tyto řádky zapsat na standardní výstup. K zápisu na standardní výstup použijeme proces funkcí, které zapisují na standardní výstup:
val stdOut: Process[Task, String => Unit] = constant(println)
constant(a)
je zkratka za emit(a).repeat
.
Pro každý řádek z p
zavoláme funkci
ze stdOut
– k tomu lze použít zip
a map
p.zip(stdOut).map { case (line, f) => f(line) }.run.run
nebo zipWith
:
p.zipWith(stdOut) { (line, f) => f(line) }.run.run
To funguje, ale vedlejší efekt f(line)
se stále provádí v procesu typu Process1
.
Navíc opět nepoužíváme Task
– funkce, které provádí
vedlejší efekty, mají typ String => Unit
.
Změníme tedy typ funkcí ze String => Unit
na String => Task[Unit]
:
val stdOut: Process[Task, String => Task[Unit]] = constant(s => Task.delay { println(s) })
Když spustíme náš kód
p.zipWith(stdOut) { (line, f) => f(line) }.run.run
zjistíme, že se nic nevypisuje. Problém je v tom, že proces
p.zipWith(stdOut) { (line, f) => f(line) }
produkuje Task
y, které nespouští – typ procesu je
Process[Task, Task[Unit]]
.
Aby se něco vypsalo, potřebujeme produkované Task
y spustit, což
provede funkce eval
:
p.zipWith(stdOut) { (line, f) => f(line) }.eval.run.run
eval
tedy z procesu typu Process[Task, Task[A]]
udělá proces typu Process[Task, A]
. stdOut
je Sink
. Pro typ Process[Task, A => Task[Unit]]
existuje typový alias Sink[Task, A]
.
Data do Sink
u posíláme funkcí to
:
p.to(stdOut).run.run
Funkce to
je implementována pomocí zipWith
a eval
– přesně jako v našem příkladu.
S pomocí funkce io.resource
snadno naimplementujeme Sink
, který zapisuje řádky do souboru:
import java.io.FileWriter def linesW(file: String): Sink[Task, String] = io.resource( Task.delay { new FileWriter(file) } )( fw => Task.delay { fw.close() } )( fw => Task.now(line => Task.delay { fw.write(line); fw.write("\n") }) )
to
nahrazuje produkované prvky hodnotou ()
.
Pokud chceme prvky zachovat, musíme použít funkci observe
–
ta se chová jako to
, ale nenahrazuje prvky hodnotou
()
. S pomocí observe
můžeme poslat prvky do více
Sink
ů:
p.observe(stdOut).to(linesW("file2.txt")).run.run
Do Sink
u stdOut
můžeme posílat pouze řetězce,
pokud bychom tam chtěli poslat několik náhodných
čísel, museli bychom čísla převést na řetězce:
randomNumbers.map(_.toString).observe(stdOut).take(10). runLog.run
Stinnou stránkou tohoto řešení je, že proces už neprodukuje
čísla, ale řetězce. V podobných případech se hodí funkce
contramap
, která změní změní typ vstupních prvků
Sink
u:
randomNumbers.observe(stdOut.contramap(_.toString)).take( 10).runLog.run
contramap
lze snadno implementovat pomocí
map
– v našem případě by map
transformoval funkce typu String => Task[Unit]
na funkce typu
Int => Task[Unit]
.
Sink[Task, A]
je proces, který produkuje funkce
A => Task[Unit]
.
Nahradíme-li Unit
typovým parametrem B
, dostaneme
proces, kterému se říká Channel
. Channel[Task, A, B]
je tedy alias pro Process[Task, A => Task[B]]
a Sink[Task, A]
můžeme chápat jako alias
pro Channel[Task, A, Unit]
. Channel[Task, A, B]
používáme pro transformaci prvků typu A
na prvky typu
B
. Do Channel
u se prvky posílají pomocí funkce
through
, typ vstupních prvků lze změnit pomocí funkce
contramap
.
Připomeňme si Process1
nub
pro odstranění duplikátů
z minulého dílu
def nub[A](history: Set[A] = Set.empty[A]): Process1[A, A] = receive1 { x => if (history.contains(x)) nub(history) else emit(x) ++ nub(history + x) }
a zkusme napsat Channel
, který dělá totéž. První nepříjemnost je,
že Channel
transformuje jeden prvek vstupu
na jeden prvek výstupu, jenže nub
v případě duplikátů
žádné prvky neprodukuje. Toto omezení Channel
ů obejdeme tak,
že použijeme Channel
typu
Channel[Task, A, Option[A]]
(místo Channel[Task, A, A]
) –
pro duplikáty bude Channel
vracet None
.
Další nepříjemností je, že nemůžeme použít rekurzi pro předávání
aktualizované množiny history
. Problém je v tom,
že Channel
napřed vyprodukuje funkce
A => Task[Option[A]]
a teprve poté jsou tyto funkce aplikovány a Task
y spuštěny
pomocí through
. Samotný Channel
tedy žádné
filtrování neprovádí – ten pouze produkuje funkce
A => Task[Option[A]]
. Filtrování
se provádí až pomocí through
– je to tedy funkce
through
, od níž bychom potřebovali, aby předávala množinu
history
mezi funkcemi z Channel
u. Jelikož funkce
through
nic takového neumí, musíme si pomoci sami –
po spuštění Channel
u vytvoříme měnitelnou množinu, kterou
dáme do všech produkovaných funkcí:
def nubChannel[A]: Channel[Task, A, Option[A]] = eval(Task.delay { collection.mutable.Set.empty[A] }).flatMap { history => constant((x: A) => Task.delay { if (history.contains(x)) None else { history += x Some(x) } }).toSource }
Použití nub
:
Process(1, 2, 1, 0, 1, 2).toSource |> nub()
Použití nubChannel
:
(Process(1, 2, 1, 0, 1, 2).toSource through nubChannel) |> process1.stripNone
Na rozdíl od Channel
u může Process1
vrátit
libovolný počet prvků a snáze se tam pracuje se stavem. Na rozdíl
od Process1
může Channel
provádět vedlejší efekty. Pokud nepotřebujeme vedlejší efekty,
dáváme přednost Process1
před Channel
em –
důvodem je právě flexibilita Process1
.
Hezké je, že řadu funkcí, které jsme si zde představili, může
napsat i uživatel knihovny. Například p.drain
můžeme nahradit p.flatMap { x => empty }
nebo p.run
lze nahradit funkcemi p.drain.runLog
. Už jsme také viděli,
že repeatEval(t)
je zkratka za eval(t).repeat
,
nebo ne? Skutečnost bohužel není tak růžová, jak by se mohlo zdát:
val x = eval(Task.delay { throw Cause.Terminated(Cause.End) }).repeat val y = repeatEval(Task.delay { throw Cause.Terminated(Cause.End) })
Na rozdíl od procesu y
proces x
nikdy neskončí.
V případě výjimky Cause.Terminated(Cause.End)
se tedy
repeatEval
nechová jako eval
a repeat
.
V případě jiných výjimek se x
i y
chovají stejně.
Příkladem stejného charakteru je proces p.eval
, který můžeme
zapsat jako p.flatMap(eval)
, ale opět zjistíme, že
se v případě výjimky Cause.Terminated(Cause.End)
obě konstrukce liší:
val x = constant(Task.delay { throw Cause.Terminated(Cause.End) }).flatMap(eval) val y = constant(Task.delay { throw Cause.Terminated(Cause.End) }).eval
V uvedeném příkladu x
opět neskončí a y
ano.
Pravděpodobně z tohoto důvodu bude
výjimka Cause.Terminated(Cause.End)
odstraněna
z další verze knihovny scalaz-stream.
Další plánovaná změna v knihovně scalaz-stream se týká funkcí
++
a onComplete
. Kdybychom místo ++
a onComplete
použili
syntaxi jazyka Scala, pak by se proces p ++ q
zapisoval jako
{ p; q }
a p onComplete q
jako try { p } finally { q }
.
Následující dva procesy se tedy chovají jinak:
val x = (p ++ q) onComplete r val y = p ++ (q onComplete r)
Použijeme-li syntax Scaly, je x ekvivalentní
try { p; q } finally { r }
a y ekvivalentní
p try { q } finally { r }
U x
se r
vykoná vždy, zatímco u y
se r
nevykoná, pokud p
skončí výjimkou.
Některým uživatelům knihovny se toto chování nelíbí a chtěli by, aby
se proces r
v obou případech vykonal vždy.
Příště se naučíme, jak napojit více procesů na jeden proces.
Řeč bude o procesech typu Tee
a Wye
a o funkcích tee
a wye
.
Například nám již známé funkce zip
a zipWith
lze implementovat pomocí typu Tee
a funkce tee
.
Nástroje: Tisk bez diskuse
Tiskni Sdílej:
pull based přístupu scalaz-stream (následující články řetězu tahají data od předchozích)Ano, scalaz-stream je pull based. Push lze nasimulovat například pomocí fronty:
val p = (Process(1, 2, 3).toSource ++ eval_(Task.delay(println("print")))).repeat val processItem: Sink[Task, Int] = io.stdOutLines.contramap(_.toString) // Vytvorime frontu. val q = async.unboundedQueue[Int] // Prvky produkovane procesem p posleme do fronty. p.to(q.enqueue).run.runAsync(x => println("enqueue end")) // q.dequeue je proces prvku z fronty. // Prvky z fronty posilame do Sinku processItem, ktery prvky zpracovava. // Az jsou prvky zpracovany (nebo po vyhozeni vyjimky) frontu zavreme // -- zavreni fronty zajisti ukonceni procesu, jenz dava prvky do fronty // -- tj. ukonceni p.to(q.enqueue).run.runAsync(x => println("enqueue end")). (q.dequeue.to(processItem).take(2) onComplete eval_(q.close)).run.runKód obvykle několikrát vypíše řetězec print – počet vypsaných řetězců print závisí na tom, jak rychle se prvky produkované
p
posílají do fronty q
. Kromě neomezené fronty async.unboundedQueue
existuje i omezená fronta async.boundedQueue
.
A jaký je vlastně rozdíl mezi pull-based přístupem a obyčejnými streamy?Myslím, že obyčejné streamy jsou také pull based.