Google Chrome 136 byl prohlášen za stabilní. Nejnovější stabilní verze 136.0.7103.59 přináší řadu novinek z hlediska uživatelů i vývojářů. Podrobný přehled v poznámkách k vydání. Opraveno bylo 8 bezpečnostních chyb. Vylepšeny byly také nástroje pro vývojáře.
Homebrew (Wikipedie), správce balíčků pro macOS a od verze 2.0.0 také pro Linux, byl vydán ve verzi 4.5.0. Na stránce Homebrew Formulae lze procházet seznamem balíčků. K dispozici jsou také různé statistiky.
Byl vydán Mozilla Firefox 138.0. Přehled novinek v poznámkách k vydání a poznámkách k vydání pro vývojáře. Řešeny jsou rovněž bezpečnostní chyby. Nový Firefox 138 je již k dispozici také na Flathubu a Snapcraftu.
Šestnáctý ročník ne-konference jOpenSpace se koná 3. – 5. října 2025 v Hotelu Antoň v Telči. Pro účast je potřeba vyplnit registrační formulář. Ne-konference neznamená, že se organizátorům nechce připravovat program, ale naopak dává prostor všem pozvaným, aby si program sami složili z toho nejzajímavějšího, čím se v poslední době zabývají nebo co je oslovilo. Obsah, který vytvářejí všichni účastníci, se skládá z desetiminutových
… více »Richard Stallman přednáší ve středu 7. května od 16:30 na Technické univerzitě v Liberci o vlivu technologií na svobodu. Přednáška je určená jak odborné tak laické veřejnosti.
Jean-Baptiste Mardelle se v příspěvku na blogu rozepsal o novinkám v nejnovější verzi 25.04.0 editoru videa Kdenlive (Wikipedie). Ke stažení také na Flathubu.
TmuxAI (GitHub) je AI asistent pro práci v terminálu. Vyžaduje účet na OpenRouter.
Byla vydána nová verze R14.1.4 desktopového prostředí Trinity Desktop Environment (TDE, fork KDE 3.5, Wikipedie). Přehled novinek i s náhledy v poznámkách k vydání. Podrobný přehled v Changelogu.
Bylo vydáno OpenBSD 7.7. Opět bez písničky.
Task
a ukážeme si,
jak konstruovat procesy s vedlejšími efekty, jenž dělají požadavky typu
Task
. Poté se budeme věnovat Sink
ům a
Channel
ům. Na závěr článku si řekneme něco o plánovaných změnách
v knihovně scalaz-stream.Používáme knihovnu scalaz-stream verze 0.7a a předpokládáme následující importy:
import scalaz.concurrent.Task import scalaz.stream._ import Process._
Při programování s knihovnou scalaz-stream je zvykem,
že vedlejší efekty dělají pouze procesy s požadavky typu
Task
. Pomocí funkce Task.delay
můžeme do
Task
u zabalit každý výraz, například:
val t = Task.delay { println("Hi") }
Jak již víme, Task
lze spustit pomocí funkce run
:
t.run
Při každém spuštění se zabalený výraz vyhodnotí
a získaná hodnota je vrácena funkcí run
.
Při každém spuštění t
se tedy vypíše řetězec Hi
a run
vrátí hodnotu ()
typu Unit
.
Další funkcí, která vytváří Task
, je
Task.now
:
val t = Task.now(5 + 3)
Rozdíl mezi funkcemi Task.now
a Task.delay
spočívá
v tom, že funkci Task.now
se argument předává hodnotou (argument
je vyhodnocen před zavoláním funkce) a funkci Task.delay
se předává jménem. V našem případě je tedy výraz 5 + 3
vyhodnocen a teprve poté je zkonstruován Task
.
Například následující Task
vrací pokaždé stejné náhodné číslo:
import scala.util.Random val badRandNumGen = Task.now { Random.nextInt }
Náhodné číslo je totiž vygenerováno ještě před zkonstruováním
Task
u, při jeho spouštění se žádné náhodné
číslo negeneruje. Použijeme-li Task.delay
místo Task.now
val randNumGen = Task.delay { Random.nextInt }
dostaneme Task
, který při každém spuštění generuje nové
náhodné číslo. Pokud výraz uvnitř Task
u vyhodí výjimku,
je tato výjimka následně vyhozena i funkcí run
.
Alternativně můžeme použít funkci attemptRun
,
která výjimku nevyhazuje, ale vrací. Výsledek funkce
attemptRun
má typ Throwable \/ A
,
kde \/
je jako Either
.
Například
Task.delay { 5 / 0 }.attemptRun
vrátí hodnotu tvaru -\/(_: ArithmeticException)
.
Chceme-li do Task
u zabalit asynchronní
výpočet, použijeme funkci Task.async
:
import scalaz.\/ def startAsyncComputation[A](cb: (Throwable \/ A) => Unit): Unit = ??? val t = Task.async(startAsyncComputation)
Funkce startAsyncComputation
spustí asynchronní výpočet a skončí.
Výsledek asynchronního výpočtu se do t
předá pomocí callbacku
cb
, který funkce startAsyncComputation
dostala jako argument. Funkce Task.async
je obzvláště užitečná
pro propojení s okolním světem. Například díky Task.async
snadno
napíšeme funkci, která převede Future
na Task
:
import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits. global import scala.util.{Failure, Success} import scalaz.{-\/, \/-} def futureToTask[A](fut: => Future[A]): Task[A] = Task.async[A](cb => fut onComplete { case Success(a) => cb(\/-(a)) case Failure(t) => cb(-\/(t)) })
Future
se funkci futureToTask
předává jménem,
aby se zamezilo jejímu spuštění před spuštěním
Task
u. Zde je příklad použití:
val t = futureToTask(Future(println("Hi"))) t.run
Na závěr našeho stručného představení Task
u poznamenejme,
že Task
je monáda, k dispozici jsou tudíž funkce
map
, flatMap
i for
notace.
Task
y
Máme-li Task
, můžeme pomocí eval
vytvořit proces,
který Task
spustí a vyprodukuje hodnotu z Task
u.
Například proces, který vyprodukuje jedno náhodné číslo:
eval(randNumGen)
Název eval
je poněkud zavádějící, neboť eval
Task
y nespouští a ani nijak nevyhodnocuje, eval
pouze zabalí Task
do procesu. Například
val randomNumbers = eval(randNumGen).repeat
je proces s mnoha náhodnými čísly, nikoliv s jedním
náhodným číslem, které se opakuje. eval
do procesu zabalí celý
Task
, tj. nedojde k tomu, že by eval
napřed spustil
Task
randNumGen
a poté do procesu zabalil pouze výsledek Task
u.
eval
a repeat
se často používají společně,
proto vznikla funkce repeatEval
, která nám výše uvedený
proces náhodných čísel dovoluje přepsat na jedno volání:
val randomNumbers = repeatEval(randNumGen)
Pokud nás výsledek Task
u nezajímá, můžeme použít
eval_
, který Task
spustí, ale jeho výsledek zahodí:
val p = eval_(Task.delay { println("Starting") }) ++ randomNumbers
p
je proces produkující čísla, kdybychom však místo
eval_
použili eval
, měli bychom proces produkující
AnyVal
– první vyprodukovaný prvek
by byl ()
vrácený println("Starting")
,
ten by byl následován náhodnými čísly.
Obecně, chceme-li, aby proces žádné prvky neprodukoval, můžeme použít
funkci drain
, ta z procesu, který produkuje prvky typu
A
, udělá proces, který neprodukuje žádné prvky
(typ produkovaných prvků je Nothing
).
eval_(t)
je zkratka za eval(t).drain
.
Nyní se pokusíme napsat proces, který přečte a vyprodukuje řádky
ze souboru – něco jako io.linesR
. Náš první pokus je:
import scala.io.Source def badLinesR(file: String): Process[Task, String] = eval(Task.delay { Source.fromFile(file) }).flatMap { src => lazy val lines = src.getLines repeatEval(Task.delay { lines.next }) ++ eval_(Task.delay { src.close }) }
První problém funkce badLinesR
je, že volá
lines.next
aniž by testovala lines.hasNext
.
Pokud je lines.haxNext
false
, není chování
lines.next
definováno. Otázkou je, co dělat, když je
lines.hasNext
false
?
Pro tyto případy existuje speciální výjimka
Cause.Terminated(Cause.End)
,
kterou můžeme vyhodit. Tato výjimka zajistí, že repeatEval
skončí
a pokračuje se další částí procesu – v našem případě
eval_(Task.delay { src.close })
.
Další problém funkce badLinesR
je, že nezaručuje uzavření souboru.
Tento problém nastane, pokud proces předčasně přerušíme, například pomocí
take
:
// Simuluje cteni souboru se dvema radky. val p = eval(Task.delay { println("Line 1") }) ++ eval(Task.delay { println("Line 2") }) ++ eval_(Task.delay { println("Closing") }) p.take(1).run.run // Closing se nevypise. p.take(2).run.run // Closing se nevypise. p.take(3).run.run
Problém s neuzavřením souboru rovněž nastane, když Task
vyhodí
jinou výjimku než Cause.Terminated(Cause.End)
:
val p = eval(Task.delay { 5 / 0 }) ++ eval_(Task.delay { println("Closing") }) p.run.run // Closing se nevypise, run vyhodi vyjimku.
S výjimkou Cause.Terminated(Cause.End)
tento problém nenastane
a soubor se uzavře:
val p = eval(Task.delay { throw Cause.Terminated(Cause.End) }) ++ eval_(Task.delay { println("Closing") }) p.run.run // Closing se vypise, run zadnou vyjimku nevyhodi.
Obecně, máme-li proces tvaru p ++ q
a skončí-li p
výjimkou jinou než Cause.Terminated(Cause.End)
, proces
q
se vůbec nevykoná.
Pokud se q
má vykonat v každém případě bez ohledu na to,
jak skončil proces p
, je třeba použít onComplete
místo ++
:
val p = eval(Task.delay { 5 / 0 }) onComplete eval_(Task.delay { println("Closing") }) p.run.run // Closing se vypise, run vyhodi vyjimku.
Nyní již známe vše, abychom napsali funkci linesR
:
def linesR(file: String): Process[Task, String] = eval(Task.delay { Source.fromFile(file) }).flatMap { src => lazy val lines = src.getLines repeatEval(Task.delay { if (lines.hasNext) lines.next else throw Cause.Terminated(Cause.End) }) onComplete eval_(Task.delay { src.close }) }
V knihovně scalaz-stream je funkce io.resource
, která zobecňuje
naši funkci linesR
pro jiné zdroje. Ostatní funkce v modulu
io
(včetně funkce io.linesR
) využívají tuto obecnou
funkci io.resource
.
Jak již víme, p onComplete q
zajistí, že se q
spustí bez ohledu na to, jak skončilo p
. Kromě toho
onComplete
zajistí, že proces q
není možné přerušit:
val p = emit(1) val q = eval_(Task.delay { println("One") }) ++ emit(2) ++ eval_(Task.delay { println("Two") }) val r = p onComplete q r.take(2).run.run // Vypise One i Two.
Ačkoliv z procesu r
načítáme pouze dva prvky, proces
q
doběhne celý – vypíše se i řetězec Two.
Pokud by q
byl proces, který nikdy neskončí (například
emit(0).repeat
), tak by ani proces r.take(2)
nikdy
neskončil. p onComplete q
se nejčastěji používá v situacích,
kdy proces q
má uklidit po procesu p
–
q
nejde přerušit, aby bylo zaručeno, že se úklid provede celý.
Sink
a Channel
V této části se budeme zabývat tím, jak
prvky produkované procesem někam uložit nebo někam zapsat.
Pokud proces produkuje málo prvků a stačí, když se zápis provede
až celý proces skončí, můžeme použít runLog
. Proces spustíme
pomocí runLog.run
a poté, co skončí, vyprodukované prvky zapíšeme.
Situace se ovšem zkomplikuje, když proces produkuje (neomezeně) mnoho
prvků a my je chceme zapisovat průběžně. V některých případech (například
logování) lze zápisy umístit do map
nebo do nějakého jiného
procesu typu Process1
. Pokud však zápisy prvků používají
Task
(například proto, že jsou asynchronní, nebo proto,
že je dobrým zvykem vedlejší efekty umístit do Task
ů),
je třeba použít i proces
s typem požadavků Task
, což Process1
není.
Naštěstí má výše nastíněný problém velmi jednoduché řešení. Tím řešením je proces funkcí. Pojďme se podívat na příklad. Mějme proces řádků ze souboru
val p: Process[Task, String] = io.linesR("file.txt")
a předpokládejme, že chceme tyto řádky zapsat na standardní výstup. K zápisu na standardní výstup použijeme proces funkcí, které zapisují na standardní výstup:
val stdOut: Process[Task, String => Unit] = constant(println)
constant(a)
je zkratka za emit(a).repeat
.
Pro každý řádek z p
zavoláme funkci
ze stdOut
– k tomu lze použít zip
a map
p.zip(stdOut).map { case (line, f) => f(line) }.run.run
nebo zipWith
:
p.zipWith(stdOut) { (line, f) => f(line) }.run.run
To funguje, ale vedlejší efekt f(line)
se stále provádí v procesu typu Process1
.
Navíc opět nepoužíváme Task
– funkce, které provádí
vedlejší efekty, mají typ String => Unit
.
Změníme tedy typ funkcí ze String => Unit
na String => Task[Unit]
:
val stdOut: Process[Task, String => Task[Unit]] = constant(s => Task.delay { println(s) })
Když spustíme náš kód
p.zipWith(stdOut) { (line, f) => f(line) }.run.run
zjistíme, že se nic nevypisuje. Problém je v tom, že proces
p.zipWith(stdOut) { (line, f) => f(line) }
produkuje Task
y, které nespouští – typ procesu je
Process[Task, Task[Unit]]
.
Aby se něco vypsalo, potřebujeme produkované Task
y spustit, což
provede funkce eval
:
p.zipWith(stdOut) { (line, f) => f(line) }.eval.run.run
eval
tedy z procesu typu Process[Task, Task[A]]
udělá proces typu Process[Task, A]
. stdOut
je Sink
. Pro typ Process[Task, A => Task[Unit]]
existuje typový alias Sink[Task, A]
.
Data do Sink
u posíláme funkcí to
:
p.to(stdOut).run.run
Funkce to
je implementována pomocí zipWith
a eval
– přesně jako v našem příkladu.
S pomocí funkce io.resource
snadno naimplementujeme Sink
, který zapisuje řádky do souboru:
import java.io.FileWriter def linesW(file: String): Sink[Task, String] = io.resource( Task.delay { new FileWriter(file) } )( fw => Task.delay { fw.close() } )( fw => Task.now(line => Task.delay { fw.write(line); fw.write("\n") }) )
to
nahrazuje produkované prvky hodnotou ()
.
Pokud chceme prvky zachovat, musíme použít funkci observe
–
ta se chová jako to
, ale nenahrazuje prvky hodnotou
()
. S pomocí observe
můžeme poslat prvky do více
Sink
ů:
p.observe(stdOut).to(linesW("file2.txt")).run.run
Do Sink
u stdOut
můžeme posílat pouze řetězce,
pokud bychom tam chtěli poslat několik náhodných
čísel, museli bychom čísla převést na řetězce:
randomNumbers.map(_.toString).observe(stdOut).take(10). runLog.run
Stinnou stránkou tohoto řešení je, že proces už neprodukuje
čísla, ale řetězce. V podobných případech se hodí funkce
contramap
, která změní změní typ vstupních prvků
Sink
u:
randomNumbers.observe(stdOut.contramap(_.toString)).take( 10).runLog.run
contramap
lze snadno implementovat pomocí
map
– v našem případě by map
transformoval funkce typu String => Task[Unit]
na funkce typu
Int => Task[Unit]
.
Sink[Task, A]
je proces, který produkuje funkce
A => Task[Unit]
.
Nahradíme-li Unit
typovým parametrem B
, dostaneme
proces, kterému se říká Channel
. Channel[Task, A, B]
je tedy alias pro Process[Task, A => Task[B]]
a Sink[Task, A]
můžeme chápat jako alias
pro Channel[Task, A, Unit]
. Channel[Task, A, B]
používáme pro transformaci prvků typu A
na prvky typu
B
. Do Channel
u se prvky posílají pomocí funkce
through
, typ vstupních prvků lze změnit pomocí funkce
contramap
.
Připomeňme si Process1
nub
pro odstranění duplikátů
z minulého dílu
def nub[A](history: Set[A] = Set.empty[A]): Process1[A, A] = receive1 { x => if (history.contains(x)) nub(history) else emit(x) ++ nub(history + x) }
a zkusme napsat Channel
, který dělá totéž. První nepříjemnost je,
že Channel
transformuje jeden prvek vstupu
na jeden prvek výstupu, jenže nub
v případě duplikátů
žádné prvky neprodukuje. Toto omezení Channel
ů obejdeme tak,
že použijeme Channel
typu
Channel[Task, A, Option[A]]
(místo Channel[Task, A, A]
) –
pro duplikáty bude Channel
vracet None
.
Další nepříjemností je, že nemůžeme použít rekurzi pro předávání
aktualizované množiny history
. Problém je v tom,
že Channel
napřed vyprodukuje funkce
A => Task[Option[A]]
a teprve poté jsou tyto funkce aplikovány a Task
y spuštěny
pomocí through
. Samotný Channel
tedy žádné
filtrování neprovádí – ten pouze produkuje funkce
A => Task[Option[A]]
. Filtrování
se provádí až pomocí through
– je to tedy funkce
through
, od níž bychom potřebovali, aby předávala množinu
history
mezi funkcemi z Channel
u. Jelikož funkce
through
nic takového neumí, musíme si pomoci sami –
po spuštění Channel
u vytvoříme měnitelnou množinu, kterou
dáme do všech produkovaných funkcí:
def nubChannel[A]: Channel[Task, A, Option[A]] = eval(Task.delay { collection.mutable.Set.empty[A] }).flatMap { history => constant((x: A) => Task.delay { if (history.contains(x)) None else { history += x Some(x) } }).toSource }
Použití nub
:
Process(1, 2, 1, 0, 1, 2).toSource |> nub()
Použití nubChannel
:
(Process(1, 2, 1, 0, 1, 2).toSource through nubChannel) |> process1.stripNone
Na rozdíl od Channel
u může Process1
vrátit
libovolný počet prvků a snáze se tam pracuje se stavem. Na rozdíl
od Process1
může Channel
provádět vedlejší efekty. Pokud nepotřebujeme vedlejší efekty,
dáváme přednost Process1
před Channel
em –
důvodem je právě flexibilita Process1
.
Hezké je, že řadu funkcí, které jsme si zde představili, může
napsat i uživatel knihovny. Například p.drain
můžeme nahradit p.flatMap { x => empty }
nebo p.run
lze nahradit funkcemi p.drain.runLog
. Už jsme také viděli,
že repeatEval(t)
je zkratka za eval(t).repeat
,
nebo ne? Skutečnost bohužel není tak růžová, jak by se mohlo zdát:
val x = eval(Task.delay { throw Cause.Terminated(Cause.End) }).repeat val y = repeatEval(Task.delay { throw Cause.Terminated(Cause.End) })
Na rozdíl od procesu y
proces x
nikdy neskončí.
V případě výjimky Cause.Terminated(Cause.End)
se tedy
repeatEval
nechová jako eval
a repeat
.
V případě jiných výjimek se x
i y
chovají stejně.
Příkladem stejného charakteru je proces p.eval
, který můžeme
zapsat jako p.flatMap(eval)
, ale opět zjistíme, že
se v případě výjimky Cause.Terminated(Cause.End)
obě konstrukce liší:
val x = constant(Task.delay { throw Cause.Terminated(Cause.End) }).flatMap(eval) val y = constant(Task.delay { throw Cause.Terminated(Cause.End) }).eval
V uvedeném příkladu x
opět neskončí a y
ano.
Pravděpodobně z tohoto důvodu bude
výjimka Cause.Terminated(Cause.End)
odstraněna
z další verze knihovny scalaz-stream.
Další plánovaná změna v knihovně scalaz-stream se týká funkcí
++
a onComplete
. Kdybychom místo ++
a onComplete
použili
syntaxi jazyka Scala, pak by se proces p ++ q
zapisoval jako
{ p; q }
a p onComplete q
jako try { p } finally { q }
.
Následující dva procesy se tedy chovají jinak:
val x = (p ++ q) onComplete r val y = p ++ (q onComplete r)
Použijeme-li syntax Scaly, je x ekvivalentní
try { p; q } finally { r }
a y ekvivalentní
p try { q } finally { r }
U x
se r
vykoná vždy, zatímco u y
se r
nevykoná, pokud p
skončí výjimkou.
Některým uživatelům knihovny se toto chování nelíbí a chtěli by, aby
se proces r
v obou případech vykonal vždy.
Příště se naučíme, jak napojit více procesů na jeden proces.
Řeč bude o procesech typu Tee
a Wye
a o funkcích tee
a wye
.
Například nám již známé funkce zip
a zipWith
lze implementovat pomocí typu Tee
a funkce tee
.
Nástroje: Tisk bez diskuse
Tiskni
Sdílej:
pull based přístupu scalaz-stream (následující články řetězu tahají data od předchozích)Ano, scalaz-stream je pull based. Push lze nasimulovat například pomocí fronty:
val p = (Process(1, 2, 3).toSource ++ eval_(Task.delay(println("print")))).repeat val processItem: Sink[Task, Int] = io.stdOutLines.contramap(_.toString) // Vytvorime frontu. val q = async.unboundedQueue[Int] // Prvky produkovane procesem p posleme do fronty. p.to(q.enqueue).run.runAsync(x => println("enqueue end")) // q.dequeue je proces prvku z fronty. // Prvky z fronty posilame do Sinku processItem, ktery prvky zpracovava. // Az jsou prvky zpracovany (nebo po vyhozeni vyjimky) frontu zavreme // -- zavreni fronty zajisti ukonceni procesu, jenz dava prvky do fronty // -- tj. ukonceni p.to(q.enqueue).run.runAsync(x => println("enqueue end")). (q.dequeue.to(processItem).take(2) onComplete eval_(q.close)).run.runKód obvykle několikrát vypíše řetězec print – počet vypsaných řetězců print závisí na tom, jak rychle se prvky produkované
p
posílají do fronty q
. Kromě neomezené fronty async.unboundedQueue
existuje i omezená fronta async.boundedQueue
.
A jaký je vlastně rozdíl mezi pull-based přístupem a obyčejnými streamy?Myslím, že obyčejné streamy jsou také pull based.