PySparkの分散される処理単位であるクロージャと共有変数の仕組み

Spark では、処理が分散されて、複数のノードやスレッドで実行されますが、 分散先でのデータの共有方法と、分散先で実行される処理単位のクロージャについて説明します。

共有変数

Spark には、データの共有とか、集約するための仕組みが備わっています。
それが、ブロードキャスト(Broadcast)と集約器(Accumulator) です。
http://mogile.web.fc2.com/spark/spark220/rdd-programming-guide.html#shared-variables

ブロードキャスト
参照専用の共有メモリのようなもので、ドライバーで共有したい変数をブロードキャスト変数として登録すると、その変数のラッパーオブジェクトが作成されて、ラッパーオブジェクトが、各エグゼキューターに配信されます。
エグゼキューターからは、ラッパーを通じて、共有変数の実態にアクセスすることができます。 このあたりは、通信のコストを下げるために効果的なブロードキャストアルゴリズムで実装されていると、マニュアルに記載されています。

集約器(Accumulator)
accumulatorは、ドライバーで初期化されて、エグゼキューターからは、加算処理だけが可能で読み取りは出来きません。 読み取りはドライバーからのみ行うことができます。
何かの件数をカウントしたり合計値を求めるような処理に使われます。

クロージャ

PySpark では、ドライバがRDDのメソッドをクロージャとしてシリアライズして、エグゼキューターに配信して、エグゼキューターでデシリアライズされて、処理が実行されます。

Python は "全てがオブジェクト" ということで、関数もオブジェクトなので、関数自体が様々な属性を持っています。
例えば、func()という関数は、func.__code__でコード自体を参照可能だし、func.__globals__で関数内で参照している global な変数がリストされます。他にも、func.__defaults__func.__dict__などもあります。 https://docs.python.org/ja/3/reference/datamodel.html

クロージャとは、関数と関数内で使用される変数などがセットになったものを指します。 ときどき、クロージャとlambda関数が同じものとして説明されている記事を見ることがありますが、違うものです。lambda関数は、無名関数がクロージャを構成するときの要素の1つというのが、正しいです。

繰り返しになりますが、PySparkは、クロージャをシリアライズして、各エグゼキューターに配信します。

Python のシリアライズモジュールには pickle, marshal, dill, cloudpickle などがあります。(※この記事もあわせて読んでください。https://qiita.com/kojisuganuma/items/e9b29e8e5ef5f5f289b2
PySpark のシリアライズモジュールは、Python標準の pickle を元にした、独自のものでした。 実際のソースコードは、こちらです。 https://github.com/apache/spark/blob/d48935400ca47275f677b527c636976af09332c8/python/pyspark/cloudpickle.py#L222
A func comprises: code, globals, defaults, closure, and dict.とコメントがあるとおり、 PySparkがエグゼキューターに配信するクロージャは、以下を要素としています。

  • コード
  • 関数内で使っているグローバル変数
  • 引数のデフォルト値
  • 引数以外の変数に対してのセル (cell) 群
  • 関数属性の名前空間

シリアライズされる変数(グローバル変数など)が持つ値は、”宣言時のスコープ"によって設定された値です。 間違いやすいのは、global変数が宣言されていて、その変数の設定を init() などで、別の関数で初期化してたりすると、たとえ、それが RDD を実行する前だとしても、配信先では、復元されないということです。あくまで、変数宣言時のスコープで行われる処理だけがシリアライズされます。

参考記事

https://qiita.com/Hiroki11x/items/4f5129094da4c91955bc#%E2%85%B3-rdd%E3%81%AE%E6%93%8D%E4%BD%9C
https://www.lifewithpython.com/2014/09/python-use-closures.html

stackoverflow にも興味深い記事がありました。
「PySparkがブロードキャストされなかった変数を参照できるのはなぜですか?」
https://stackoverflow.com/questions/33337446/why-is-pyspark-picking-up-a-variable-that-was-not-broadcast
以下、回答の訳です。

基本的には、クロージャはシリアライズされて各executorとtaskに送られ、 task実行中にアクセスできるようになるので、 RDDのスコープ外のすべての変数をブロードキャストする必要はありません。
ブロードキャスト変数は、ブロードキャストされるデータセットが大きい場合に便利です。

まとめ

ブロードキャストとクロージャの使い分け

グローバル変数の初期値としてセットして、それを RDD や UDF の関数内から、参照するように実装しておけば、クロージャとして、データも一緒にエグゼキューターに配信されるので、小さいデータは、クロージャとして実装して、大きいデータはブロードキャストの機能を使うという使い分けができます。

分散実行されるコードの境界

Javaと違って、Pythonのコードだと、分散先で実行されるコードの境界がわかりづらいのだけど、その答えは、クロージャが実行されるということでした。

ご参考までに。

最近の記事タグ

\(^▽^*) 私たちと一緒に働いてみませんか? (*^▽^)/

少しでも興味をお持ちいただけたら、お気軽に、お問い合わせください。

採用応募受付へ

(採用応募じゃなく、ただ、会ってみたいという方も、大歓迎です。)