Зарегистрируйтесь для доступа к 15+ бесплатным курсам по программированию с тренажером

Эрланг на практике. Работа с потоками на низком уровне. Эрланг на практике

Эффективная поддержка многопоточности -- одна из главных фишек эрланг. И она же является базой для других фишек: масштабируемости, распределенности, устойчивости к ошибкам.

Легковесные потоки

Эрланг имеет собственную реализацию многопоточности на уровне виртуальной машины. Конечно, это работает поверх процессов операционной системы. Но поверх одного такого процесса могут работать сотни и тысячи потоков эрланг. И виртуальная машина управляет ими независимо от операционной системы.

В разных операционных системах есть разные сущности: процессы, потоки, нити и т.д. Они отличаются реализацией и возможностями. Но в эрланг такая сущность только одна. Я буду называть ее "поток". Но если где-то упомяну "процесс", то знайте, что это одно и то же :)

Особенность потоков эрланг в том, что они легковесные. Это значит, что они:

  • быстро стартуют и завершаются;
  • быстро переключаются;
  • потребляют мало памяти.

Новый поток создается и стартует за 3-5 микросекунд. На старте он получает 2,5Кб памяти (стек, куча и служебная информация о потоке).

Виртуальная машина имеет лимит на число потоков, по умолчанию это 262,144 (218). Лимит можно увеличить до 134,217,727 (227). Но вряд ли кому-то захочется создать 134 млн потоков на одной ноде, тем более, что для этого понадобится 336Гб оперативной памяти :)

Легковесность потоков качественно меняет архитектуру. Потоки не являются ограниченным ресурсом, как в большинстве других языков. Можно легко выделить отдельный поток на обслуживание каждого клиента, и позволить клиенту занимать этот поток сколько угодно долго.

Поэтому типичная область применения эрланг -- это сервера, которые должны обслуживать большое количество клиентов. Особенно если соединения с клиентами являются долгоживущими.

spawn

Для создания нового потока используется функция spawn.

Она имеет несколько вариантов:

  • spawn(Fun) -> pid()
  • spawn(Node, Fun) -> pid()
  • spawn(Module, Function, Args) -> pid()
  • spawn(Node, Module, Function, Args) -> pid()

Так или иначе в аргументах spawn указывается точка входа для нового потока, с которой он начинает выполнятся. Далее поток либо выполнит весь код, и завершится; либо попадет в бесконечную рекурсию и будет выполнятся бесконечно; либо завершится аварийно из-за ошибки.

Функция spawn возвращает Pid -- идентификатор процесса (process identifier). Зная Pid, можно посылать процессу сообщения и получать информацию о нем.

Давайте попробуем запустить несколько потоков:

4> G = fun(X) -> timer:sleep(10), io:format("~p~n", [X]) end.
 #Fun<erl_eval.6.13229925>
5> [spawn(fun() -> G(X) end) || X <- lists:seq(1,10)].
[<0.273.0>,<0.274.0>,<0.275.0>,<0.276.0>,<0.277.0>,
<0.278.0>,<0.279.0>,<0.280.0>,<0.281.0>,<0.282.0>]
2
1
4
3
5
8
7
6
10
9

Создаем анонимную функцию, в которой после паузы в 10 миллисекунд выводим на консоль ее аргумент. С помощью генератора списков создаем 10 потоков. Каждый из них выполняет эту функцию независимо от остальных. Видим, что очередность вывода чисел на консоль не совпадает с очередностью создания потоков. Вывод будет случайным, в зависимости от того, как планировщик раздавал управление потокам.

Отправка сообщений

Потоки эрланг имеют каждый свою изолированную область памяти (стек и кучу), не читают и не пишут в чужую память, а обмениваются сообщениями.

Для этого используется оператор ! (bang):

Pid ! Message

Pid должен быть идентификатором процесса, а Message -- любая структура данных или атомарное значение.

Отправка сообщения выполняется асинхронно. То есть, поток не блокируется и не ждет ответа, а продолжает выполнение. При попытке отправить сообщение несуществующему процессу, сообщение просто игнорируется. Ошибки при этом не происходит.

Поток может отправить сообщение самому себе.

10> self() ! hello.
hello
11> flush().
Shell got hello
ok

Почтовый ящик

У каждого потока есть специальная область памяти -- почтовый ящик (mailbox), куда копируются адресованные ему сообщения. Там сообщения накапливаются в очереди, в порядке их появления.

Чтобы прочитать сообщения в почтовом ящике, нужно использовать конструкцию receive.

receive
    Pattern1 [when Guard1] ->
        Expressions1;
    Pattern2 [when Guard2] ->
        Expressions2;
    ...
end

Синтаксис аналогичен конструкции case.

В этом примере поток отправляет сообщение самому себе, и получает его с помощью receive:

12> self() ! "hello again".
"hello again"
13> receive
13> Msg -> io:format("got message:~p~n", [Msg])
13> end.
got message:"hello again"
ok

При вызове receive поток берет сообщение из очереди и сопоставляет его с имеющимися шаблонами. Если находится подходящий шаблон, то выполняется соответствующий блок кода, и затем код после receive. А сообщение удаляется из почтового ящика. Другие сообщения в почтовом ящике остаются до следующего вызова receive. Если сообщение не совпало ни с одним шаблоном, то оно остается в очереди, и для проверки берется следующее.

Чтобы четко разобраться, как работает receive в разных ситуациях, сделаем тест. Будем отправлять разные сообщения, и с помощью receive будем выбирать только те, которые соответствуют шаблону {msg, Any}.

-module(mb).
-export([test/0]).

test() ->
    test_messages("test1, ящик пустой", []),

    test_messages("test2, одно сообщение, матчится",
                  [{msg, 1}]),

    test_messages("test3, одно сообщение, не матчится",
                  [msg1]),

    test_messages("test4, 3 сообщения, все матчатся",
                  [{msg, 1}, {msg, 2}, {msg, 3}]),

    test_messages("test5, 3 сообщения, все не матчатся",
                  [msg1, msg2, msg3]),

    test_messages("test6, 4 сообщения, часть матчится, часть не матчится",
                  [{msg, 1}, msg2, {msg, 3}, msg4]),

    test_messages("test7, 4 сообщения, часть матчится, часть не матчится",
                  [msg1, {msg, 2}, msg3, {msg, 4}]),

    ok.

test_messages(TestName, Messages) ->
    io:format("~n### ~ts~ntest_messages: ~p~n", [TestName, Messages]),
    flush(),
    [self() ! Msg || Msg <- Messages],

    io:format("call receive~n"),
    Res = receive
              {msg, M} -> {msg, M}
          after 100 -> timeout
          end,
    io:format("after receive got: ~p~n", [Res]),
    [{messages, Left}] = process_info(self(), [messages]),
    io:format("left in mailbox: ~p~n", [Left]),
    ok.

flush() ->
    receive
        _ -> flush()
    after 100 -> ok
    end.

Функция flush/0 очищает почтовый ящик перед каждым тестом, для чистоты эксперимента.

По результатам теста мы видим, что receive выбирает одно сообщение, совпадающее с шаблоном, если такое есть. Если нет, то поток блокируется.

В тесте этого не видно, но поток блокируется либо на указанное время, либо до получения подходящего сообщения.

Почтовый ящик -- самая частая причина утечки памяти в эрланг. Если receive не вызывается, или вызывается, но обрабатывает не все сообщения, то утечка памяти неизбежна. Кроме того, по мере роста очереди, каждый проход по ней становится все медленнее и медленнее.

Все эрланг программисты об этом знают. И если обнаруживается утечка памяти, то диагностика проблемы начинается с очередей в почтовых ящиках.

Хорошая практика -- делать в receive последний шаблон такой, чтобы он совпадал с любым сообщением. И в этом случае писать в лог о том, что поток получил сообщение, которое он не умеет обрабатывать.

receive
    {do_something, Data} -> do_something(Data);
    Any -> lager:warning("Got unknown message ~p", [Any])
end,

Это уменьшит вероятность проблем с почтовым ящиком. Но не гарантирует полностью их отсутствие. Возможно ситуация, когда сообщения поступают быстрее, чем поток успевает их обрабатывать. Увы, но универсального рецепта для таких ситуаций нет :)

timeout

В тесте выше уже использовался таймаут, но нужно подробнее его объяснить.

receive
    {do_something, Data} -> do_something(Data);
end,

Здесь, если нет подходящего по шаблону сообщения, поток заблокируется и будет ждать, пока сообщение появится. А если оно никогда не появится, то поток так и останется заблокированным.

Поэтому разумно указать максимальное время, на которое можно заблокировать поток.

receive
    {do_something, Data} -> do_something(Data);
after
    5000 -> exp1
end,

Это либо целое число -- время в милисекундах, либо атом infinity. Впрочем, infinity аналогично отсутствию after.

register

Pid -- штука хорошая, но не всегда удобная. Если мы хотим посылать сообщения в поток из нескольких других потоков, нам придется как-то передать всем им Pid получателя.

Но есть альтернатива -- потоку можно дать некое имя, глобальное на уровне всей ноды, и потом обращаться к нему по этому имени.

register(Name, Pid)

Вызов register сгенерирует исключение, если имя уже связано с другим потоком.

Регистрацию потока можно отменить:

unregister(Name)

Можно узнать все имена зарегистрированных потоков, какие есть в ноде:

registered()

И можно узнать Pid потока по имени:

whereis(Name)

Пробуем:

1> registered().
[erl_prim_loader,error_logger,kernel_safe_sup,init,user,rex,
 inet_db,kernel_sup,code_server,standard_error_sup,
 global_name_server,application_controller,file_server_2,
 user_drv,standard_error,global_group]
2> register(erl_console, self()).
true
3> registered().
[erl_prim_loader,error_logger,kernel_safe_sup,init,user,rex,
 inet_db,kernel_sup,code_server,standard_error_sup,
 global_name_server,erl_console,application_controller,
 file_server_2,user_drv,standard_error,global_group]
4> whereis(erl_console).
<0.33.0>
5> self().
<0.33.0>
6> unregister(erl_console).
true

Выводы

Легкие потоки, обмен сообщениями, отсутствие разделяемой памяти дают хорошую базу для:

  • масштабируемости;
  • распределенности;
  • устойчивости к ошибкам.

Поскольку потоки -- дешевый ресурс, их можно создавать в количестве, адекватном нагрузке на систему, и менять вместе с изменением нагрузки.

Если мы умеем передавать сообщения от одного потока другому в рамках одной ноды, то не сложно это делать и между двумя нодами. Нужен только транспорт поверх TCP, и в виртуальной машине эрланг этот транспорт есть.

Поскольку потоки изолированы друг от друга, то падение одного потока не влияет на работу других потоков. Впрочем, устойчивость к ошибкам не появляется сама по себе, программисту еще нужно постараться, чтобы этого добиться :)


Аватары экспертов Хекслета

Остались вопросы? Задайте их в разделе «Обсуждение»

Вам ответят команда поддержки Хекслета или другие студенты

Об обучении на Хекслете

Открыть доступ

Курсы программирования для новичков и опытных разработчиков. Начните обучение бесплатно

  • 130 курсов, 2000+ часов теории
  • 1000 практических заданий в браузере
  • 360 000 студентов
Отправляя форму, вы принимаете «Соглашение об обработке персональных данных» и условия «Оферты», а также соглашаетесь с «Условиями использования»

Наши выпускники работают в компаниях:

Логотип компании Альфа Банк
Логотип компании Aviasales
Логотип компании Yandex
Логотип компании Tinkoff

Используйте Хекслет по-максимуму!

  • Задавайте вопросы по уроку
  • Проверяйте знания в квизах
  • Проходите практику прямо в браузере
  • Отслеживайте свой прогресс

Зарегистрируйтесь или войдите в свой аккаунт

Отправляя форму, вы принимаете «Соглашение об обработке персональных данных» и условия «Оферты», а также соглашаетесь с «Условиями использования»
Изображение Тото

Задавайте вопросы, если хотите обсудить теорию или упражнения. Команда поддержки Хекслета и опытные участники сообщества помогут найти ответы и решить задачу