Elixir に入門したいので雑な 分散KVS を自作した

長い連休を利用して、Elixir入門のために分散kVSを自作しました。

前書き

動機は以下の2点です。
  • Elixirに入門したかった
  • 何か自分で手を動かしたかった
Erlangが書けるので同じErlangVM上の言語であるElixirが前から気になっていました。
丁度いい機会だし、入門してみるかな、くらいの気持ちで始めました。

初めてだったので、株式会社gumiが連載しているElixir入門の連載を開始(Elixir入門もくじ) - DEV Community 👩‍💻👨‍💻を参考にしました。単純な日本語翻訳ではなく、適宜加筆修正をしているようです。(後半に行くに連れて雑な文が増えているのが少しだけ残念でした。)

自分は集中力が無いので通読は苦手なのですが、23個の記事に全て目を通してから次を考えることにしました。Erlangと同じErlangVMで動くので、他の全く知らない言語よりは理解がしやすいだろうと思っていましたが、軽量プロセスやメッセージのやり取りといった基礎は共通していても、かなり機能が豊富、という印象を受けました。

なんとか全てを読み終わった後に、ErlangVMなので分散システムがいいな、と思い雑な分散KVSを作って見ることにしました。適当に検索したところBuilding a Distributed Fault-Tolerant Key-Value Store - The Blog of Fourthbitという、Apache Cassandraの概要を紹介しつつ、KVSの大事な部分の実装ぽいものを説明する、という記事を今回の目的にぴったりの記事を発見しました。この記事は実装するためにかなり参考になりました。

目標

  • 休みの間にそれっぽく動くところまで持っていく
  • CRUD (Create/Read/Update/Delete) のインターフェースを持つ
  • 1台は落ちても読み書きができる
    • 耐障害性

出来たもの

機能の概要

  • N台1組のElixirクラスタを構築する
    • 実行時のノード追加は不可
  • データはオンメモリで保持している
  • CRUD (Create/Read/Update/Delete) のインターフェースを持つ
    • Elixir上の公開関数
    • Key/ValueはElixirのterm
  • どのノードでもリクエストを処理可能
    • ノードAへの実行と、ノードBへの実行の結果は同じなので、単一点が無く負荷分散になる
    • ここは上記のCassandraを参考にしました
  • クラスタ上の他のノードにレプリケーションをする方式
  • 1台以上のノードが落ちてもKVSとして動作し続けることができる
    • 例えば、5台ノードのうち、2台までなら落ちても平気
  • ノードダウンを全てのノードが検知し、レプリケーションをし直す
    • これによってデータを損失しにくい
    • 安直過ぎる実装なので実行は高コスト

実装概要

要所だけ簡単に書きます。

まず、設定として、クラスタのメンバとレプリケーションする数(デフォルト3)は与えられます。
次にSupervision treeはこのようになっています。
役割を「データを保存するだけのStorage」と「分散システムを成立させるCoordinator」の2つに大別しました。

Storageは今回はオンメモリなのでハッシュマップをそのまま保持しているだけです。

Coordinatorの方は更に「リクエストを処理するWorker」と「ノードダウンを監視し、冗長性を維持するStabilizer」の2つに分かれます。

Workerは
  • リクエスト毎に起動
    • ノードへのリクエストで詰まらないように
  • レプリケーションノードを選択
    • ノードリストを保持しており、hash(key) % node_count から replication_count個のノードを選択します
    • 末尾に来た場合は先頭から残りを取ります
    • 例: [node1, node2, node3, node4, node5]からnode4, node5, node1を選択
  • レプリケーションノードへ処理を指示
    • RPCでそれぞれのノードで処理を実行します
  • レプリケーションノードからの結果のうち半分以上が成功ならば成功として扱う
    • ここでは50%が正常な応答を返せばその処理は成功したものと見なしています
    • ここを全てのノードが成功したときのみ成功、1個でも成功したなら成功、とするかによって、整合性のレベルが変わってくるそうです。
をします

Stabilizerは
  • 他の全ノードの監視
  • ローカルに保持している設定からダウンしたノードを削除
  • ダウンを検知して、そのノードが持っていた値を他のノードに分散
    • 非常に愚直な実装なので高コストで無駄もあります
    • 生きている全てのノードが、Storageに保存されている全てのキーのハッシュを再計算し、ダウンしたノードの持っていたKey-Valueがあれば再登録する、自分がレプリケーションから外れていれば削除する、という処理
をします

動作テスト

def test(is_wait \\ true) do
    kvs = [
      {"hoge", "fuga"},
      {"piyo", "poyo"},
      {"foo", "bar"},
      {"erlang", "ok"},
      {"elixir", ":ok"}
    ]

    members = Brownie.Application.get_cluster_members()

    # テスト用の値を作成
    # do_async/1で、クラスタのメンバからランダムにノードを選択して、並列にリクエストを送信する
    results =
      kvs
      |> Enum.map(fn {key, value} -> {:create, key, value} end)
      |> do_async(members)

    # 成功しているか確認
    if not Enum.all?(results, &(:ok == &1)) do
      raise("Create values failed! Results: #{inspect(results)}")
    end

    # ノードをわざと落とす
    Enum.at(Brownie.Application.get_cluster_members(), 1)
    |> :rpc.call(:erlang, :halt, [])

    if is_wait do
      # 少しずるいがStabilizationを待つ
      # これを待たないとしても、1台までのノードダウンなら読み込みは正常にできる
      # 書き込みは、ダウンしたノードがメンバから外されるまでのタイミングでよってはエラーになってしまう
      Process.sleep(3 * 1000)
    end

    # Stalibilizerがメンバを更新したはずなので、再取得
    members = Brownie.Application.get_cluster_members()

    # 作成した値を同じように並列で読み込んで正しい値が取れるかを見る.
    results =
      kvs
      |> Enum.map(fn {key, _} -> {:read, key} end)
      |> do_async(members)
      |> Enum.sort()

    expecteds =
      kvs
      |> Enum.map(fn {_, v} -> {:ok, v} end)
      |> Enum.sort()

    if not (results == expecteds) do
      raise("Read values failed after node down! Results: #{inspect(results)}")
    end

    :ok
  end

上記のテストコードがBrownie.test/1として実行可能です。
make runでdocker-composeでコンテナ5つを繋げてクラスタを構築します。
別のシェルを立ち上げて、make attachでノードの一つにattachして実行します。
17:24:45.099 [debug] Execute: {:create, "elixir", ":ok"} -> [:"five@brownie5.com", :"one@brownie1.com", :"two@brownie2.com"]
17:24:45.099 [debug] Execute: {:create, "erlang", "ok"} -> [:"five@brownie5.com", :"one@brownie1.com", :"two@brownie2.com"]
17:24:45.099 [debug] Execute: {:create, "piyo", "poyo"} -> [:"four@brownie4.com", :"five@brownie5.com", :"one@brownie1.com"]
17:24:45.099 [debug] Execute: {:create, "foo", "bar"} -> [:"three@brownie3.com", :"four@brownie4.com", :"five@brownie5.com"]
17:24:45.100 [debug] Execute: {:create, "hoge", "fuga"} -> [:"four@brownie4.com", :"five@brownie5.com", :"one@brownie1.com"]
17:24:45.102 [debug] Results: ["three@brownie3.com": :ok, "four@brownie4.com": :ok, "five@brownie5.com": :ok]
17:24:45.102 [debug] Results: ["four@brownie4.com": :ok, "five@brownie5.com": :ok, "one@brownie1.com": :ok]
17:24:45.102 [debug] Results: ["four@brownie4.com": :ok, "five@brownie5.com": :ok, "one@brownie1.com": :ok]
17:24:45.103 [debug] Results: ["five@brownie5.com": :ok, "one@brownie1.com": :ok, "two@brownie2.com": :ok]
17:24:45.103 [debug] Results: ["five@brownie5.com": :ok, "one@brownie1.com": :ok, "two@brownie2.com": :ok]
17:24:45.104 [error] Detect nodedown :"two@brownie2.com"
17:24:45.104 [debug] Stabilize target keys: ["foo"]
17:24:45.104 [debug] Replicate again key: "foo", nodes: [:"five@brownie5.com", :"one@brownie1.com"]
17:24:45.105 [error] Detect nodedown :"two@brownie2.com"
17:24:45.105 [debug] Stabilize target keys: ["elixir", "erlang"]
17:24:45.105 [debug] Replicate again key: "elixir", nodes: [:"four@brownie4.com", :"one@brownie1.com"]
17:24:45.105 [debug] Replicate again key: "erlang", nodes: [:"one@brownie1.com", :"three@brownie3.com"]
17:24:45.106 [info]  Stabilization result: :ok
17:24:45.107 [error] Detect nodedown :"two@brownie2.com"
17:24:45.104 [error] Detect nodedown :"two@brownie2.com"
17:24:45.104 [debug] Stabilize target keys: []
17:24:45.104 [info]  Stabilization result: :ok
17:24:45.112 [info]  Stabilization result: :ok
17:24:45.114 [debug] Stabilize target keys: ["hoge", "piyo"]
17:24:45.114 [debug] Replicate again key: "hoge", nodes: [:"three@brownie3.com", :"five@brownie5.com"]
17:24:45.114 [debug] Replicate again key: "piyo", nodes: [:"five@brownie5.com", :"one@brownie1.com", :"three@brownie3.com"]
17:24:45.115 [info]  Stabilization result: :ok
iex(two@brownie2.com)1> brownie2.com exited with code 0
17:24:48.105 [debug] Execute: {:read, "erlang"} -> [:"five@brownie5.com", :"one@brownie1.com", :"three@brownie3.com"]
17:24:48.106 [debug] Execute: {:read, "foo"} -> [:"five@brownie5.com", :"one@brownie1.com", :"three@brownie3.com"]
17:24:48.106 [debug] Execute: {:read, "elixir"} -> [:"four@brownie4.com", :"five@brownie5.com", :"one@brownie1.com"]
17:24:48.108 [debug] Results: ["five@brownie5.com": {:ok, "ok"}, "one@brownie1.com": {:ok, "ok"}, "three@brownie3.com": {:ok, "ok"}]
17:24:48.109 [debug] Results: ["five@brownie5.com": {:ok, "bar"}, "one@brownie1.com": {:ok, "bar"}, "three@brownie3.com": {:ok, "bar"}]
17:24:48.110 [debug] Results: ["four@brownie4.com": {:ok, ":ok"}, "five@brownie5.com": {:ok, ":ok"}, "one@brownie1.com": {:ok, ":ok"}]
17:24:48.112 [debug] Execute: {:read, "piyo"} -> [:"five@brownie5.com", :"one@brownie1.com", :"three@brownie3.com"]
17:24:48.112 [debug] Execute: {:read, "hoge"} -> [:"three@brownie3.com", :"four@brownie4.com", :"five@brownie5.com"]
17:24:48.112 [debug] Results: ["three@brownie3.com": {:ok, "fuga"}, "four@brownie4.com": {:ok, "fuga"}, "five@brownie5.com": {:ok, "fuga"}]
17:24:48.112 [debug] Results: ["five@brownie5.com": {:ok, "poyo"}, "one@brownie1.com": {:ok, "poyo"}, "three@brownie3.com": {:ok, "poyo"}]
かなり見づらいですが、実行時のログの一例です。
Replicate againというログが出ている箇所でダウンしたノードから別のノードに再度レプリケーションを行っています。
また、並列動作するため毎回注意してください。

feature work

  • HTTP APIを実装する
    • 時間が無くて出来なかったのでこのくらいはしたい
  • 整合性についてもう少し学びたい
  • 処理が失敗したレプリケーションノードへの復旧処理
    • readした値がおかしい、など
  • Storageへの処理の部分で直列化されてしまう
    • GenServer.call/2で実装したので詰まってしまいます
    • この中で更にシャーディングなどすれば並行に読み書きしやすくなったりするのかなと考えています
  • make runの改善
    • 各コンテナでiex -S mixしていてお行儀が悪い(デバッグ用途なら問題ないと思うが)のでプロダクションなどではどうやって動かしているのかくらいは調べたい
  • ExUnitをもう少し使う
    • Erlangのmeckのようにすればいいだろうと思っていたが、Elixirではmockは非推奨らしいので、テストしやすいコードを心がける必要がある
  • この記事を書いていて気づいたが、うまいことやれば最後の1台になっても、大丈夫なように実装は出来そうと思いました。が、実際のケースだとノード数が少なくなって負荷に耐えきれずダウン、という流れになると思うので、あまり実用性は無いと思いました

所感

やはり、ErlangVMなので適当にクラスタを組んで別ノードで実行、というのが非常に簡単にできるために、他の実装に集中できてよかったです。

しかし、ちょっとしたものに使うのであれば、自作せずにErlang/OTP付属のmnesiaという分散DBMSがよさそうです。(10台のクラスタくらいが実用限界という記述はすごいE本に書いてありましたけど…)
調査を十分にしていないですが、ここから真面目に作り込んでいくとmnesiaとの差別化をどうするか、が一番の悩みどころかと思っています。

また、この記事を書いている途中に、友人から聞いた「雑なコードを書く習慣を付けてはいけない」という言葉を思い出しました。
その観点から見ると今回のこれは雑なコードそのものなので、もう少し実装を進めてみたいと考えています。

なんにせよ、当初の目的のElixirに入門するはできました。
分散KVSという観点から分散システムについて学びがあって面白くてよかった、という所感です。

参考文献

コメント

このブログの人気の投稿

カーソルキーさん@つかわない インサートモード編

Android で MIME Type 判別

Vimでカーソル位置などの設定保存 と .vimrc