Co nás ph naučil 3: actor model

druhém zápisku jsem nastínil technické věci, jako coding conventions a generování projektů. To jsou poměrně nudné a nezajímavé veci. Na řadě je tedy actor model.

Actor model je způsob práce s "konkurentním" (concurrent) během kódu. Mimochodem první paper byl publikovaný v roce 1973, takže se nejedná o nijak novou věc.

Klasické postupy programování končí u procesů, nebo vláken, zamykání, semaforů, mutexů, sdílené paměti a podobně. A jako takové vede ke spoustě záludných chyb a deadlocků a velice obtížně odhalitelných problémů při běhu vašeho kódu. A až všechno odladíte pro 4 jádra, někdo váš kód spustí na 8.

Tohle všechno je rozepsáno v Scalable C: Problem: is my code thread-safe?

Actor model znamená, že je program modelován jako jeden, či více nezávislých aktorů. Ty mezi sebou mohou komunikovat pouze pomocí posílání zpráv. Jeden actor pak může spustit a kontrolovat vícero actorů.

Třída zactor

Tento model, mimochodem, je model jazyka Erlang/Elixir, který se používání na psaní vysoce škálovatelného software.

#include <czmq.h> static void s_actor1 (zsock_t *pipe, void *args) { char *name = strdup (args); zsock_signal (pipe, 0); for (int i = 0; i != 10; i++) zsys_info ("%s:\tcount %d

", name, i); zstr_free (&name); } int main () { zactor_t *actor = zactor_new (s_actor1, "actor1"); while (true) { char *str = zstr_recv (actor); if (str) { puts (str); zstr_free (&str); } else break; } zactor_destroy (&actor); }

pipe

inproc

zsys_signal

Jak ukončit program

#include <czmq.h> static void s_actor1 (zsock_t *pipe, void *args) { char *name = strdup (args); zsock_signal (pipe, 0); for (int i = 0; i != 10; i++) zsys_info ("%s:\tcount %d

", name, i); zstr_send (pipe, "$DONE"); zstr_free (&name); } čte a int main () { .... char *str = zstr_recv (actor); if (str && streq (str, "$DONE")) break;

Jak poslat data do actoru

static void s_actor1 (zsock_t *pipe, void *args) { zsock_signal (pipe, 0); zpoller_t *poller = zpoller_new (pipe, NULL); while (!zsys_interrupted) { void *which = zpoller_wait (poller, -1); if (!which) break; zmsg_t *msg = zmsg_recv (pipe); char *cmd = zmsg_popstr (msg); if (!cmd || streq (cmd, "$TERM")) { zmsg_destroy (&msg); zstr_free (&cmd); break; } else if (streq (cmd, "COUNT")) { char *smax = zmsg_popstr (msg); int max = atoi (smax); for (int i = 0; i != max; i++) zsys_info ("count %d", i); zstr_send (pipe, "$END"); } zmsg_destroy (&msg); zstr_free (&cmd); } zpoller_destroy (&poller); } ... int main () { zactor_t *actor = zactor_new (s_actor1, NULL); zstr_sendx (actor, "COUNT", "42", NULL);

zstr_sendx

zpoller

Ale k čemu je to vlastně dobré?

libmlm.so

malamute

mlm_server

LD_PRELOAD

cwrap

socket_wrapper

#include <malamute.h> ... char *endpoint = "inproc://@/malamute"; zactor_t *server = zactor_new (mlm_server, "Malamute"); if (verbose) zstr_sendx (server, "VERBOSE", NULL); zstr_sendx (server, "BIND", endpoint, NULL); while (true) { char *str = zstr_recv (server); if (str) { puts (str); zstr_free (&str); } else { zsys_info ("Interrupted"); break; } } zactor_destroy (&server);

