Byla vydána nová verze 25.10.31 svobodného multiplatformního video editoru Shotcut (Wikipedie) postaveného nad multimediálním frameworkem MLT. Shotcut je vedle zdrojových kódů k dispozici také ve formátech AppImage, Flatpak a Snap.
O víkendu probíhá konference OpenAlt 2025 (Stream). Na programu je spousta zajímavých přednášek. Pokud jste v Brně, stavte se. Vstup zdarma.
Josef Průša představil novou velkoformátovou uzavřenou CoreXY 3D tiskárnu Prusa CORE One L a nový open source standard chytrých cívek OpenPrintTag i s novou přepracovanou špulkou.
Na GOG.com běží Autumn Sale. Při té příležitosti je zdarma hororová počítačová hra STASIS (ProtonDB: Platinum).
Ubuntu 25.10 má nově balíčky sestavené také pro úroveň mikroarchitektury x86-64-v3 (amd64v3).
Byla vydána verze 1.91.0 programovacího jazyka Rust (Wikipedie). Podrobnosti v poznámkách k vydání. Vyzkoušet Rust lze například na stránce Rust by Example.
Ministerstvo průmyslu a obchodu vyhlásilo druhou veřejnou soutěž v programu TWIST, který podporuje výzkum, vývoj a využití umělé inteligence v podnikání. Firmy mohou získat až 30 milionů korun na jeden projekt zaměřený na nové produkty či inovaci podnikových procesů. Návrhy projektů lze podávat od 31. října do 17. prosince 2025. Celková alokace výzvy činí 800 milionů korun.
Google v srpnu oznámil, že na „certifikovaných“ zařízeních s Androidem omezí instalaci aplikací (včetně „sideloadingu“) tak, že bude vyžadovat, aby aplikace byly podepsány centrálně registrovanými vývojáři s ověřenou identitou. Iniciativa Keep Android Open se to snaží zvrátit. Podepsat lze otevřený dopis adresovaný Googlu nebo petici na Change.org.
Byla vydána nová verze 18 integrovaného vývojového prostředí (IDE) Qt Creator. S podporou Development Containers. Podrobný přehled novinek v changelogu.
Cursor (Wikipedie) od společnosti Anysphere byl vydán ve verzi 2.0. Jedná se o multiplatformní proprietární editor kódů s podporou AI (vibe coding).
Task a ukážeme si,
jak konstruovat procesy s vedlejšími efekty, jenž dělají požadavky typu
Task. Poté se budeme věnovat Sinkům a
Channelům. Na závěr článku si řekneme něco o plánovaných změnách
v knihovně scalaz-stream.Používáme knihovnu scalaz-stream verze 0.7a a předpokládáme následující importy:
import scalaz.concurrent.Task import scalaz.stream._ import Process._
Při programování s knihovnou scalaz-stream je zvykem,
že vedlejší efekty dělají pouze procesy s požadavky typu
Task. Pomocí funkce Task.delay můžeme do
Tasku zabalit každý výraz, například:
val t = Task.delay { println("Hi") }
Jak již víme, Task lze spustit pomocí funkce run:
t.run
Při každém spuštění se zabalený výraz vyhodnotí
a získaná hodnota je vrácena funkcí run.
Při každém spuštění t se tedy vypíše řetězec Hi
a run vrátí hodnotu () typu Unit.
Další funkcí, která vytváří Task, je
Task.now:
val t = Task.now(5 + 3)
Rozdíl mezi funkcemi Task.now a Task.delay spočívá
v tom, že funkci Task.now se argument předává hodnotou (argument
je vyhodnocen před zavoláním funkce) a funkci Task.delay
se předává jménem. V našem případě je tedy výraz 5 + 3
vyhodnocen a teprve poté je zkonstruován Task.
Například následující Task vrací pokaždé stejné náhodné číslo:
import scala.util.Random
val badRandNumGen = Task.now { Random.nextInt }
Náhodné číslo je totiž vygenerováno ještě před zkonstruováním
Tasku, při jeho spouštění se žádné náhodné
číslo negeneruje. Použijeme-li Task.delay
místo Task.now
val randNumGen = Task.delay { Random.nextInt }
dostaneme Task, který při každém spuštění generuje nové
náhodné číslo. Pokud výraz uvnitř Tasku vyhodí výjimku,
je tato výjimka následně vyhozena i funkcí run.
Alternativně můžeme použít funkci attemptRun,
která výjimku nevyhazuje, ale vrací. Výsledek funkce
attemptRun má typ Throwable \/ A,
kde \/ je jako Either.
Například
Task.delay { 5 / 0 }.attemptRun
vrátí hodnotu tvaru -\/(_: ArithmeticException).
Chceme-li do Tasku zabalit asynchronní
výpočet, použijeme funkci Task.async:
import scalaz.\/ def startAsyncComputation[A](cb: (Throwable \/ A) => Unit): Unit = ??? val t = Task.async(startAsyncComputation)
Funkce startAsyncComputation spustí asynchronní výpočet a skončí.
Výsledek asynchronního výpočtu se do t předá pomocí callbacku
cb, který funkce startAsyncComputation
dostala jako argument. Funkce Task.async je obzvláště užitečná
pro propojení s okolním světem. Například díky Task.async snadno
napíšeme funkci, která převede Future na Task:
import scala.concurrent.Future import scala.concurrent.ExecutionContext.Implicits. global import scala.util.{Failure, Success} import scalaz.{-\/, \/-} def futureToTask[A](fut: => Future[A]): Task[A] = Task.async[A](cb => fut onComplete { case Success(a) => cb(\/-(a)) case Failure(t) => cb(-\/(t)) })
Future se funkci futureToTask předává jménem,
aby se zamezilo jejímu spuštění před spuštěním
Tasku. Zde je příklad použití:
val t = futureToTask(Future(println("Hi")))
t.run
Na závěr našeho stručného představení Tasku poznamenejme,
že Task je monáda, k dispozici jsou tudíž funkce
map, flatMap i for notace.
Tasky
Máme-li Task, můžeme pomocí eval vytvořit proces,
který Task spustí a vyprodukuje hodnotu z Tasku.
Například proces, který vyprodukuje jedno náhodné číslo:
eval(randNumGen)
Název eval je poněkud zavádějící, neboť eval
Tasky nespouští a ani nijak nevyhodnocuje, eval
pouze zabalí Task do procesu. Například
val randomNumbers = eval(randNumGen).repeat
je proces s mnoha náhodnými čísly, nikoliv s jedním
náhodným číslem, které se opakuje. eval do procesu zabalí celý
Task, tj. nedojde k tomu, že by eval napřed spustil
Task randNumGen
a poté do procesu zabalil pouze výsledek Tasku.
eval a repeat se často používají společně,
proto vznikla funkce repeatEval, která nám výše uvedený
proces náhodných čísel dovoluje přepsat na jedno volání:
val randomNumbers = repeatEval(randNumGen)
Pokud nás výsledek Tasku nezajímá, můžeme použít
eval_, který Task spustí, ale jeho výsledek zahodí:
val p = eval_(Task.delay { println("Starting") }) ++ randomNumbers
p je proces produkující čísla, kdybychom však místo
eval_ použili eval, měli bychom proces produkující
AnyVal – první vyprodukovaný prvek
by byl () vrácený println("Starting"),
ten by byl následován náhodnými čísly.
Obecně, chceme-li, aby proces žádné prvky neprodukoval, můžeme použít
funkci drain, ta z procesu, který produkuje prvky typu
A, udělá proces, který neprodukuje žádné prvky
(typ produkovaných prvků je Nothing).
eval_(t) je zkratka za eval(t).drain.
Nyní se pokusíme napsat proces, který přečte a vyprodukuje řádky
ze souboru – něco jako io.linesR. Náš první pokus je:
import scala.io.Source
def badLinesR(file: String): Process[Task, String] =
eval(Task.delay { Source.fromFile(file) }).flatMap { src =>
lazy val lines = src.getLines
repeatEval(Task.delay { lines.next }) ++ eval_(Task.delay { src.close })
}
První problém funkce badLinesR je, že volá
lines.next aniž by testovala lines.hasNext.
Pokud je lines.haxNext false, není chování
lines.next definováno. Otázkou je, co dělat, když je
lines.hasNext false?
Pro tyto případy existuje speciální výjimka
Cause.Terminated(Cause.End),
kterou můžeme vyhodit. Tato výjimka zajistí, že repeatEval skončí
a pokračuje se další částí procesu – v našem případě
eval_(Task.delay { src.close }).
Další problém funkce badLinesR je, že nezaručuje uzavření souboru.
Tento problém nastane, pokud proces předčasně přerušíme, například pomocí
take:
// Simuluje cteni souboru se dvema radky.
val p =
eval(Task.delay { println("Line 1") }) ++
eval(Task.delay { println("Line 2") }) ++
eval_(Task.delay { println("Closing") })
p.take(1).run.run // Closing se nevypise.
p.take(2).run.run // Closing se nevypise.
p.take(3).run.run
Problém s neuzavřením souboru rovněž nastane, když Task vyhodí
jinou výjimku než Cause.Terminated(Cause.End):
val p =
eval(Task.delay { 5 / 0 }) ++
eval_(Task.delay { println("Closing") })
p.run.run // Closing se nevypise, run vyhodi vyjimku.
S výjimkou Cause.Terminated(Cause.End) tento problém nenastane
a soubor se uzavře:
val p =
eval(Task.delay { throw Cause.Terminated(Cause.End) }) ++
eval_(Task.delay { println("Closing") })
p.run.run // Closing se vypise, run zadnou vyjimku nevyhodi.
Obecně, máme-li proces tvaru p ++ q a skončí-li p
výjimkou jinou než Cause.Terminated(Cause.End), proces
q se vůbec nevykoná.
Pokud se q má vykonat v každém případě bez ohledu na to,
jak skončil proces p, je třeba použít onComplete
místo ++:
val p =
eval(Task.delay { 5 / 0 }) onComplete
eval_(Task.delay { println("Closing") })
p.run.run // Closing se vypise, run vyhodi vyjimku.
Nyní již známe vše, abychom napsali funkci linesR:
def linesR(file: String): Process[Task, String] =
eval(Task.delay { Source.fromFile(file) }).flatMap { src =>
lazy val lines = src.getLines
repeatEval(Task.delay {
if (lines.hasNext) lines.next
else throw Cause.Terminated(Cause.End)
}) onComplete eval_(Task.delay { src.close })
}
V knihovně scalaz-stream je funkce io.resource, která zobecňuje
naši funkci linesR pro jiné zdroje. Ostatní funkce v modulu
io (včetně funkce io.linesR) využívají tuto obecnou
funkci io.resource.
Jak již víme, p onComplete q zajistí, že se q
spustí bez ohledu na to, jak skončilo p. Kromě toho
onComplete zajistí, že proces q není možné přerušit:
val p = emit(1)
val q =
eval_(Task.delay { println("One") }) ++
emit(2) ++
eval_(Task.delay { println("Two") })
val r = p onComplete q
r.take(2).run.run // Vypise One i Two.
Ačkoliv z procesu r načítáme pouze dva prvky, proces
q doběhne celý – vypíše se i řetězec Two.
Pokud by q byl proces, který nikdy neskončí (například
emit(0).repeat), tak by ani proces r.take(2) nikdy
neskončil. p onComplete q se nejčastěji používá v situacích,
kdy proces q má uklidit po procesu p –
q nejde přerušit, aby bylo zaručeno, že se úklid provede celý.
Sink a Channel
V této části se budeme zabývat tím, jak
prvky produkované procesem někam uložit nebo někam zapsat.
Pokud proces produkuje málo prvků a stačí, když se zápis provede
až celý proces skončí, můžeme použít runLog. Proces spustíme
pomocí runLog.run a poté, co skončí, vyprodukované prvky zapíšeme.
Situace se ovšem zkomplikuje, když proces produkuje (neomezeně) mnoho
prvků a my je chceme zapisovat průběžně. V některých případech (například
logování) lze zápisy umístit do map nebo do nějakého jiného
procesu typu Process1. Pokud však zápisy prvků používají
Task (například proto, že jsou asynchronní, nebo proto,
že je dobrým zvykem vedlejší efekty umístit do Tasků),
je třeba použít i proces
s typem požadavků Task, což Process1 není.
Naštěstí má výše nastíněný problém velmi jednoduché řešení. Tím řešením je proces funkcí. Pojďme se podívat na příklad. Mějme proces řádků ze souboru
val p: Process[Task, String] = io.linesR("file.txt")
a předpokládejme, že chceme tyto řádky zapsat na standardní výstup. K zápisu na standardní výstup použijeme proces funkcí, které zapisují na standardní výstup:
val stdOut: Process[Task, String => Unit] = constant(println)
constant(a) je zkratka za emit(a).repeat.
Pro každý řádek z p zavoláme funkci
ze stdOut – k tomu lze použít zip
a map
p.zip(stdOut).map { case (line, f) => f(line) }.run.run
nebo zipWith:
p.zipWith(stdOut) { (line, f) => f(line) }.run.run
To funguje, ale vedlejší efekt f(line)
se stále provádí v procesu typu Process1.
Navíc opět nepoužíváme Task – funkce, které provádí
vedlejší efekty, mají typ String => Unit.
Změníme tedy typ funkcí ze String => Unit
na String => Task[Unit]:
val stdOut: Process[Task, String => Task[Unit]] =
constant(s => Task.delay { println(s) })
Když spustíme náš kód
p.zipWith(stdOut) { (line, f) => f(line) }.run.run
zjistíme, že se nic nevypisuje. Problém je v tom, že proces
p.zipWith(stdOut) { (line, f) => f(line) }
produkuje Tasky, které nespouští – typ procesu je
Process[Task, Task[Unit]].
Aby se něco vypsalo, potřebujeme produkované Tasky spustit, což
provede funkce eval:
p.zipWith(stdOut) { (line, f) => f(line) }.eval.run.run
eval tedy z procesu typu Process[Task, Task[A]]
udělá proces typu Process[Task, A]. stdOut
je Sink. Pro typ Process[Task, A => Task[Unit]]
existuje typový alias Sink[Task, A].
Data do Sinku posíláme funkcí to:
p.to(stdOut).run.run
Funkce to je implementována pomocí zipWith
a eval – přesně jako v našem příkladu.
S pomocí funkce io.resource
snadno naimplementujeme Sink, který zapisuje řádky do souboru:
import java.io.FileWriter
def linesW(file: String): Sink[Task, String] =
io.resource(
Task.delay { new FileWriter(file) }
)(
fw => Task.delay { fw.close() }
)(
fw => Task.now(line => Task.delay { fw.write(line); fw.write("\n") })
)
to nahrazuje produkované prvky hodnotou ().
Pokud chceme prvky zachovat, musíme použít funkci observe –
ta se chová jako to, ale nenahrazuje prvky hodnotou
(). S pomocí observe můžeme poslat prvky do více
Sinků:
p.observe(stdOut).to(linesW("file2.txt")).run.run
Do Sinku stdOut můžeme posílat pouze řetězce,
pokud bychom tam chtěli poslat několik náhodných
čísel, museli bychom čísla převést na řetězce:
randomNumbers.map(_.toString).observe(stdOut).take(10). runLog.run
Stinnou stránkou tohoto řešení je, že proces už neprodukuje
čísla, ale řetězce. V podobných případech se hodí funkce
contramap, která změní změní typ vstupních prvků
Sinku:
randomNumbers.observe(stdOut.contramap(_.toString)).take( 10).runLog.run
contramap lze snadno implementovat pomocí
map – v našem případě by map
transformoval funkce typu String => Task[Unit] na funkce typu
Int => Task[Unit].
Sink[Task, A] je proces, který produkuje funkce
A => Task[Unit].
Nahradíme-li Unit typovým parametrem B, dostaneme
proces, kterému se říká Channel. Channel[Task, A, B]
je tedy alias pro Process[Task, A => Task[B]]
a Sink[Task, A] můžeme chápat jako alias
pro Channel[Task, A, Unit]. Channel[Task, A, B]
používáme pro transformaci prvků typu A na prvky typu
B. Do Channelu se prvky posílají pomocí funkce
through, typ vstupních prvků lze změnit pomocí funkce
contramap.
Připomeňme si Process1 nub pro odstranění duplikátů
z minulého dílu
def nub[A](history: Set[A] = Set.empty[A]): Process1[A, A] =
receive1 { x =>
if (history.contains(x)) nub(history)
else emit(x) ++ nub(history + x)
}
a zkusme napsat Channel, který dělá totéž. První nepříjemnost je,
že Channel transformuje jeden prvek vstupu
na jeden prvek výstupu, jenže nub v případě duplikátů
žádné prvky neprodukuje. Toto omezení Channelů obejdeme tak,
že použijeme Channel typu
Channel[Task, A, Option[A]]
(místo Channel[Task, A, A]) –
pro duplikáty bude Channel vracet None.
Další nepříjemností je, že nemůžeme použít rekurzi pro předávání
aktualizované množiny history. Problém je v tom,
že Channel napřed vyprodukuje funkce
A => Task[Option[A]]
a teprve poté jsou tyto funkce aplikovány a Tasky spuštěny
pomocí through. Samotný Channel tedy žádné
filtrování neprovádí – ten pouze produkuje funkce
A => Task[Option[A]]. Filtrování
se provádí až pomocí through – je to tedy funkce
through, od níž bychom potřebovali, aby předávala množinu
history mezi funkcemi z Channelu. Jelikož funkce
through nic takového neumí, musíme si pomoci sami –
po spuštění Channelu vytvoříme měnitelnou množinu, kterou
dáme do všech produkovaných funkcí:
def nubChannel[A]: Channel[Task, A, Option[A]] =
eval(Task.delay {
collection.mutable.Set.empty[A]
}).flatMap { history =>
constant((x: A) => Task.delay {
if (history.contains(x)) None
else {
history += x
Some(x)
}
}).toSource
}
Použití nub:
Process(1, 2, 1, 0, 1, 2).toSource |> nub()
Použití nubChannel:
(Process(1, 2, 1, 0, 1, 2).toSource through nubChannel) |> process1.stripNone
Na rozdíl od Channelu může Process1 vrátit
libovolný počet prvků a snáze se tam pracuje se stavem. Na rozdíl
od Process1 může Channel
provádět vedlejší efekty. Pokud nepotřebujeme vedlejší efekty,
dáváme přednost Process1 před Channelem –
důvodem je právě flexibilita Process1.
Hezké je, že řadu funkcí, které jsme si zde představili, může
napsat i uživatel knihovny. Například p.drain
můžeme nahradit p.flatMap { x => empty } nebo p.run
lze nahradit funkcemi p.drain.runLog. Už jsme také viděli,
že repeatEval(t) je zkratka za eval(t).repeat,
nebo ne? Skutečnost bohužel není tak růžová, jak by se mohlo zdát:
val x = eval(Task.delay { throw Cause.Terminated(Cause.End) }).repeat
val y = repeatEval(Task.delay { throw Cause.Terminated(Cause.End) })
Na rozdíl od procesu y proces x nikdy neskončí.
V případě výjimky Cause.Terminated(Cause.End) se tedy
repeatEval nechová jako eval a repeat.
V případě jiných výjimek se x i y chovají stejně.
Příkladem stejného charakteru je proces p.eval, který můžeme
zapsat jako p.flatMap(eval), ale opět zjistíme, že
se v případě výjimky Cause.Terminated(Cause.End)
obě konstrukce liší:
val x = constant(Task.delay { throw Cause.Terminated(Cause.End) }).flatMap(eval)
val y = constant(Task.delay { throw Cause.Terminated(Cause.End) }).eval
V uvedeném příkladu x opět neskončí a y ano.
Pravděpodobně z tohoto důvodu bude
výjimka Cause.Terminated(Cause.End) odstraněna
z další verze knihovny scalaz-stream.
Další plánovaná změna v knihovně scalaz-stream se týká funkcí
++ a onComplete. Kdybychom místo ++
a onComplete použili
syntaxi jazyka Scala, pak by se proces p ++ q zapisoval jako
{ p; q }
a p onComplete q jako try { p } finally { q }.
Následující dva procesy se tedy chovají jinak:
val x = (p ++ q) onComplete r val y = p ++ (q onComplete r)
Použijeme-li syntax Scaly, je x ekvivalentní
try { p; q } finally { r }
a y ekvivalentní
p
try { q } finally { r }
U x se r vykoná vždy, zatímco u y
se r nevykoná, pokud p skončí výjimkou.
Některým uživatelům knihovny se toto chování nelíbí a chtěli by, aby
se proces r v obou případech vykonal vždy.
Příště se naučíme, jak napojit více procesů na jeden proces.
Řeč bude o procesech typu Tee a Wye
a o funkcích tee a wye.
Například nám již známé funkce zip a zipWith
lze implementovat pomocí typu Tee a funkce tee.
Nástroje: Tisk bez diskuse
Tiskni
Sdílej:
pull based přístupu scalaz-stream (následující články řetězu tahají data od předchozích)Ano, scalaz-stream je pull based. Push lze nasimulovat například pomocí fronty:
val p = (Process(1, 2, 3).toSource ++ eval_(Task.delay(println("print")))).repeat
val processItem: Sink[Task, Int] = io.stdOutLines.contramap(_.toString)
// Vytvorime frontu.
val q = async.unboundedQueue[Int]
// Prvky produkovane procesem p posleme do fronty.
p.to(q.enqueue).run.runAsync(x => println("enqueue end"))
// q.dequeue je proces prvku z fronty.
// Prvky z fronty posilame do Sinku processItem, ktery prvky zpracovava.
// Az jsou prvky zpracovany (nebo po vyhozeni vyjimky) frontu zavreme
// -- zavreni fronty zajisti ukonceni procesu, jenz dava prvky do fronty
// -- tj. ukonceni p.to(q.enqueue).run.runAsync(x => println("enqueue end")).
(q.dequeue.to(processItem).take(2) onComplete eval_(q.close)).run.run
Kód obvykle několikrát vypíše řetězec print – počet vypsaných řetězců print závisí na tom, jak rychle se prvky produkované p posílají do fronty q. Kromě neomezené fronty async.unboundedQueue existuje i omezená fronta async.boundedQueue.
A jaký je vlastně rozdíl mezi pull-based přístupem a obyčejnými streamy?Myslím, že obyčejné streamy jsou také pull based.