Portál AbcLinuxu, 15. prosinec 2017 03:54

Funkcionální programování ve Scale 2.

20. 4. 2015 | Redakce
Články - Funkcionální programování ve Scale 2.  

Proudy dat jsou v knihovně scalaz-stream reprezentovány jako procesy. V tomto článku si řekneme, co je to proces, ukážeme si, jak se procesy konstruují a jak je lze propojit pomocí rour.

scalaz-stream 2: Transformace proudů

V článku budeme pracovat s knihovnou scalaz-stream verze 0.7a, začneme jejím stažením:

git clone https://github.com/scalaz/scalaz-stream
cd scalaz-stream
git checkout series/0.7a

Knihovnu budeme používat z REPLu:

./sbt console

Příklady v článku předpokládají následující importy:

import scalaz.concurrent.Task
import scalaz.stream

Konstrukce procesů

Knihovna scalaz-stream reprezentuje proudy dat jako procesy, které provádí určité požadavky a na základě jejich výsledků produkují prvky. Tyto procesy mají typ Process[F[_], A], kde F je typ požadavků a A je typ produkovaných prvků. Například proud řádků ze souboru je reprezentován procesem

io.linesR("file.txt")

typu Process[Task, String]. Tento proces dělá požadavky typu Task a produkuje řádky typu String. Task je typ požadavků, jenž zahrnují vedlejší efekty, zejména (asynchronní) vstup a výstup. Typ Process[Task, String] nám tedy říká, že proces bude dělat vstup a výstup a produkovat řetězce.

Jak jsme viděli minule, s procesy lze v určitých situacích pracovat jako se sekvencemi. Například prázdnou sekvenci resp. sekvenci s prvky 1, 2, 4, 8 zkonstruujeme takto

Seq.empty // Alternativne: Seq()
Seq(1, 2, 4, 8)

a analogicky zkonstruujeme proces, jenž neprodukuje žádné prvky resp. produkuje prvky 1, 2, 4, 8

Process.empty // Alternativne: Process()
Process(1, 2, 4, 8)

Tyto dva procesy mají typ Process[Nothing, Nothing] resp. Process[Nothing, Int]. Z typů je patrné, že první proces neprodukuje žádné prvky a druhý proces produkuje celá čísla. Dále je vidět, že oba procesy nedělají žádné požadavky.

Typ procesů je kovariantní v obou parametrech, tudíž máme-li

val p: Process[Nothing, Int] = Process(1)

lze provést následující přiřazení

val q: Process[Task, Int] = p
val r: Process[Nothing, Any] = p
val s: Process[Task, Any] = p

Chceme-li změnit typ požadavku z Nothing na Task, lze použít funkci toSource.

V modulu Process

import Process._

se nachází celá řada funkcí pro konstrukci procesů. Oblíbené jsou například funkce emit a emitAll. První vytváří proces, jenž produkuje právě jeden prvek, druhá vytváří proces, jenž produkuje prvky z dané sekvence. Tedy

emit(1)

se chová jako Process(1) a

emitAll(Seq(1, 2, 4))

se chová jako Process(Seq(1, 2, 4): _*). Předaná sekvence může být i nekonečná:

emitAll(Stream.continually(1))

Procesy, jenž produkují nekonečně mnoho prvků, můžeme vytvářet i bez nekonečných sekvencí. Například

Process(1, 2, 3).repeat

se chová jako Process(1, 2, 3) ++ Process(1, 2, 3) ++ Process(1, 2, 3) ++ …, což můžeme zapsat také pomocí rekurze

def repeated: Process[Nothing, Int] =
    Process(1, 2, 3) ++ repeated

Vyhodnocení výrazu repeated skončí díky tomu, že se druhý argument ++ předává jménem – tedy nevyhodnocuje se před zavoláním funkce ++. Při vytváření složitějších procesů využijeme funkce s parametry:

def fib(a: Int = 0, b: Int = 1): Process[Nothing, Int] =
    emit(a) ++ fib(b, a + b)

Volání fib() vytvoří proces Fibonacciho čísel, stav procesu se předává pomocí parametrů funkce.

Spouštění procesů

Chceme-li, aby program něco dělal, musíme ho spustit. Stejně to funguje i s procesy – aby proces něco dělal, musíme ho spustit. V minulém díle jsme procesy spouštěli dvojím voláním funkce run:

io.linesR("file.txt").run.run

Přirozenou otázkou je, proč se run používá dvakrát. io.linesR("file.txt") má typ Process[Task, String] a první volání run z tohoto procesu vytvoří hodnotu typu Task[Unit]. První volání vlastně nahradí proces, který dělal požadavky typu Task[_], jedním velkým požadavkem typu Task[Unit]. A jak už asi tušíte, aby se něco stalo, i tento velký požadavek typu Task[Unit] je třeba spustit, což se provede druhým voláním run. Výraz

io.linesR("file.txt").run.run

přečte řádky souboru file.txt a vrátí Unit. Pokud bychom chtěli vrátit přečtené řádky, musíme místo prvního run použít runLog:

io.linesR("file.txt").runLog.run

runLog uloží prvky, jenž proces vyprodukuje. V našem případě runLog vrátí hodnotu typu Task[IndexedSeq[String]], jejímž spuštěním dostaneme sekvenci řádků souboru. Stejně jako run ani runLog neprovádí žádné vedlejší efekty, jedná se pouze o transformaci procesu s požadavky typu F[_] na hodnotu typu F[Unit] v případě run resp. F[A] v případě runLog (A je typ produkovaných prvků). Je to až druhý run na hodnotě typu Task[Unit] resp. Task[A], jenž provede vedlejší efekty.

run a runLog jde použít pouze na procesy, kde typ požadavku splňuje určité vlastnosti (přesněji: typ požadavků musí mít instanci Monad a Catchable; Process pak můžeme chápat jako monad transformer). Například proces, jenž neprovádí žádné požadavky, má typ požadavku Nothing, který tyto vlastnosti nesplňuje, proto nelze psát

emit(1).runLog

Pro procesy, jenž neprovádí žádné požadavky, máme typový alias Process0

emit(1): Process0[Int]

a produkované prvky lze získat pomocí funkce toList

emit(1).toList

Alternativně můžeme využít kovarianci procesů a proces přetypovat, aby měl požadavky typu Task:

emit(1).toSource.runLog.run

Transformace proudů

Síla knihovny scalaz-stream spočívá v tom, že složitou funkcionalitu lze poskládat z malých a znovupoužitelných dílků. Například máme-li proces, jenž produkuje čísla 0, 1, 2, -3

Process(0, 1, 2, -3)

a proces, jenž filtruje kladná čísla

process1.filter[Int](_ > 0)

pak tyto procesy můžeme propojit pomocí funkce |>

Process(0, 1, 2, -3) |> process1.filter(_ > 0)

čímž získáme proces, jenž produkuje čísla 1, 2. |> funguje jako roura z shellu, zápis p |> q napojí výstup procesu p na vstup procesu q. process1.filter[Int](_ > 0) má typ Process1[Int, Int]. Typ Process1[A, B] značí procesy, které dělají požadavky na prvky typu A a produkují prvky typu B. Process1[A, B] je typový alias pro Process[Env[A,Any]#Is, B] – typ požadavků tedy není A, ale Env[A,Any]#Is. Alternativně lze místo |> použít funkci pipe, tj. p.pipe(q) je jiný zápis p |> q.

Modul process1 je plný procesů typu Process1, které jde použít pro transformaci proudů. Některé z těchto transformací jako například filter nebo collect se používají tak často, že si vysloužili zkratku, například místo

Process(0, 1, 2, -3) |> process1.filter(_ > 0)

můžeme psát

Process(0, 1, 2, -3).filter(_ > 0)

Jelikož Process0[B] je podtyp Process1[A, B], lze proces typu Process0[B] použít všude, kde je očekáván proces typu Process1[A, B]:

io.linesR("file.txt") |> emit(1)

Roura funguje na principu pull. Máme-li p |> q, roura interpretuje q a v okamžiku, kdy q požaduje prvek z p, roura začne interpretovat p, dokud p nevyprodukuje prvek nebo neskončí, pak pokračuje v interpretaci q. Jelikož proces emit(1) žádný prvek nepožaduje, není důvod začít interpretovat proces io.linesR("file.txt"), tedy k otevření souboru file.txt nedojde.

Hodnoty typu Process1 jde kombinovat standardním způsobem. Například vezmeme-li

val p: Process1[Int, Int] = await1
val q: Process1[Int, Int] = process1.skip
pak
(p ++ q).repeat

je proces, jenž vypustí každý druhý prvek. process1.skip načte jeden prvek a skončí, žádný prvek není vyprodukován. await1 načte jeden prvek, vyprodukuje ho a skončí. Zobecněním await1 je proces process1.take(n), který načte n prvků a skončí, přičemž každý prvek je ihned po načtení vyprodukován. await1 je tedy totéž, co process1.take(1). Například chceme-li aplikovat filtr pouze na prvních 5 prvků, můžeme psát

process1.take[Int](5).filter(_ > 0) ++ process1.id

kde process1.id je identita – proces, který načítá prvky a beze změn je produkuje.

Psaní vlastních transformací

Důležitými stavebními kameny při psaní procesů typu Process1 jsou funkce await1 a flatMap. flatMap, známý ze sekvencí, zamění každý prvek procesu za proces. Process(1, 2, 3).map(f) se chová jako

emit(f(1)) ++ emit(f(2)) ++ emit(f(3))

a Process(1, 2, 3).flatMap(f) se chová jako

f(1) ++ f(2) ++ f(3)

Mějme predikát positive na celých číslech:

def positive(i: Int) = i > 0

Chceme-li definovat process1.filter(positive), začneme procesem, který přefiltruje jeden prvek a pak skončí:

val filterOne = await1[Int].flatMap { x =>
  if (positive(x)) emit(x)
  else empty
}

Je-li x kladné, je vyprodukováno (emit(x)), v opačném případě není vyprodukováno nic (empty). S pomocí filterOne definujeme process1.filter(positive) jako

filterOne.repeat

Ekvivalentem zápisu await1.flatMap(f) je zápis receive1(f). Proces receive1(f) načte prvek x a pokračuje procesem f(x). Pokud se žádný prvek načíst nepodaří, pokračuje procesem empty.

Procesy typu Process1 mohou mít stav, můžeme tak napsat například proces, který odstraní duplikáty:

def nub[A](history: Set[A] = Set.empty[A]): Process1[A, A] =
  receive1 { x =>
    if (history.contains(x)) nub(history)
    else emit(x) ++ nub(history + x)
  }

Prvky, které se objevily, ukládáme do množiny history. Pokud se prvek neobjevil, tak ho vyprodukujeme pomocí emit a pokračujeme s množinou rozšířenou o x (rekurzivní volání nub(history + x)).

V některých situacích receive1 nestačí. Například chceme-li vytvořit Process1, jenž sečte všechna čísla a poté součet vyprodukuje:

def badSum(s: Int = 0): Process1[Int, Int] = receive1 { x =>
  badSum(s + x)
}

badSum součet nevyprodukuje. Součet bychom chtěli totiž vyprodukovat až poté, co jsou všechna čísla sečtena, jenže jak to poznat? Jak poznat, které číslo je poslední? Pro tyto případy je k dispozici funkce receive1Or:

def sum(s: Int = 0): Process1[Int, Int] = receive1Or[Int, Int](emit(s)) { x =>
  sum(s + x)
}

První parametr funkce receive1Or je proces, který se použije v případě, že už byly načteny všechny prvky – v našem případě emit(s). Funkce receive1(f) je tedy zkratkou za receive1Or(empty)(f).

Závěr

Ukázali jsme si, jak lze dva procesy propojit rourou. Roura (pipe) napojí jeden proces na proces typu Process1. Příště se budeme zabývat procesy s vedlejšími efekty a ukážeme si propojení tvaru T (tee), které nám dovolí napojit dva procesy na proces typu Tee.

Spinoco

Související články

Funkcionální programování ve Scale 1.

Odkazy a zdroje

Za článek děkujeme společnosti Spinoco, autorem je Radek Miček.

Další články z této rubriky

Syncthing
Twibright Registrator: Instalace, odinstalace, test, základní použití
Twibright Registrator: fotografie v šeru bez stativu 2
Twibright Registrator: fotografie v šeru bez stativu 1
Filtrujeme čtivé texty z Projektu Gutenberg 9

Diskuse k tomuto článku

20.4.2015 19:09 ava
Rozbalit Rozbalit vše Re: Funkcionální programování ve Scale 2.
Odpovědět | Sbalit | Link | Blokovat | Admin
Tak už se těším až budu ve středu u rychlého internetu a článek si pořádně projedu..
22.4.2015 14:34 ava
Rozbalit Rozbalit vše Re: Funkcionální programování ve Scale 2.
Odpovědět | Sbalit | Link | Blokovat | Admin
Uff, pěkný článek, dal mi zabrat. Naštěstí jsem si včera přečetl ty doporučované stackless scala with free monads (což pro mě byla taky pořádná fuška), díky tomu jsem aspoň trochu pochopil, co vlastně znamená ten typový parametr Task v Process, a jak je poskládané vykonávání procesu. Není mi moc jasný ten typ Env, kterým se Process parametrizuje jinde, ale předpokládám, že k tomu se asi blíže dostaneme příště.

Zatím se mi z článků scalaz-stream líbí, přijde mi, že se v ní lépe skládají vlastní operátory (procesy) než v RxJava (RxScala). Taky je asi lépe odděleno sestavení procesu a jeho vykonání, líbí se mi, že si můžu vybrat třeba mezi run a runLog, a určitě by bylo možné s procesem provést i jiné věci (např. serializovat?)

Je to tedy série článků na hranici (a z půlky za hranicí :) toho, co má hlava dokáže pobrat, ale je to zajímavé. Ještě jednou díky. Doufám, že další bude nejdřív příští týden, ať taky v práci udělám něco pro šéfa :)
23.4.2015 09:51 Radek Miček | skóre: 23 | blog: radekm_blog
Rozbalit Rozbalit vše Re: Funkcionální programování ve Scale 2.
určitě by bylo možné s procesem provést i jiné věci (např. serializovat?)
Serializace asi nebude úplně jednoduchá, neboť procesy obsahují funkce (nebo dokonce uzávěry). Například serializovat následující proces
process1.filter(f(criteria))
znamená mj. serializovat i funkci f a hodnotu criteria.
Není mi moc jasný ten typ Env, kterým se Process parametrizuje jinde, ale předpokládám, že k tomu se asi blíže dostaneme příště.
Env si můžeme vysvětlit, ale uděláme to spíše až přespříště. Příště se podíváme, jak se používá Task, tee, Sink a Channel.

ISSN 1214-1267, (c) 1999-2007 Stickfish s.r.o.