Spark では、処理が分散されて、複数のノードやスレッドで実行されますが、 分散先でのデータの共有方法と、分散先で実行される処理単位のクロージャについて説明します。
共有変数
Spark には、データの共有とか、集約するための仕組みが備わっています。
それが、ブロードキャスト(Broadcast)と集約器(Accumulator) です。
http://mogile.web.fc2.com/spark/spark220/rdd-programming-guide.html#shared-variables
ブロードキャスト
参照専用の共有メモリのようなもので、ドライバーで共有したい変数をブロードキャスト変数として登録すると、その変数のラッパーオブジェクトが作成されて、ラッパーオブジェクトが、各エグゼキューターに配信されます。
エグゼキューターからは、ラッパーを通じて、共有変数の実態にアクセスすることができます。 このあたりは、通信のコストを下げるために効果的なブロードキャストアルゴリズムで実装されていると、マニュアルに記載されています。
集約器(Accumulator)
accumulatorは、ドライバーで初期化されて、エグゼキューターからは、加算処理だけが可能で読み取りは出来きません。 読み取りはドライバーからのみ行うことができます。
何かの件数をカウントしたり合計値を求めるような処理に使われます。
クロージャ
PySpark では、ドライバがRDDのメソッドをクロージャとしてシリアライズして、エグゼキューターに配信して、エグゼキューターでデシリアライズされて、処理が実行されます。
Python は "全てがオブジェクト" ということで、関数もオブジェクトなので、関数自体が様々な属性を持っています。
例えば、func()
という関数は、func.__code__
でコード自体を参照可能だし、func.__globals__
で関数内で参照している global な変数がリストされます。他にも、func.__defaults__
、func.__dict__
などもあります。 https://docs.python.org/ja/3/reference/datamodel.html
クロージャとは、関数と関数内で使用される変数などがセットになったものを指します。 ときどき、クロージャとlambda関数が同じものとして説明されている記事を見ることがありますが、違うものです。lambda関数は、無名関数がクロージャを構成するときの要素の1つというのが、正しいです。
繰り返しになりますが、PySparkは、クロージャをシリアライズして、各エグゼキューターに配信します。
Python のシリアライズモジュールには pickle, marshal, dill, cloudpickle などがあります。(※この記事もあわせて読んでください。https://qiita.com/kojisuganuma/items/e9b29e8e5ef5f5f289b2)
PySpark のシリアライズモジュールは、Python標準の pickle を元にした、独自のものでした。 実際のソースコードは、こちらです。 https://github.com/apache/spark/blob/d48935400ca47275f677b527c636976af09332c8/python/pyspark/cloudpickle.py#L222A func comprises: code, globals, defaults, closure, and dict.
とコメントがあるとおり、 PySparkがエグゼキューターに配信するクロージャは、以下を要素としています。
- コード
- 関数内で使っているグローバル変数
- 引数のデフォルト値
- 引数以外の変数に対してのセル (cell) 群
- 関数属性の名前空間
シリアライズされる変数(グローバル変数など)が持つ値は、”宣言時のスコープ"によって設定された値です。 間違いやすいのは、global変数が宣言されていて、その変数の設定を init() などで、別の関数で初期化してたりすると、たとえ、それが RDD を実行する前だとしても、配信先では、復元されないということです。あくまで、変数宣言時のスコープで行われる処理だけがシリアライズされます。
参考記事
https://qiita.com/Hiroki11x/items/4f5129094da4c91955bc#%E2%85%B3-rdd%E3%81%AE%E6%93%8D%E4%BD%9C
https://www.lifewithpython.com/2014/09/python-use-closures.html
stackoverflow にも興味深い記事がありました。
「PySparkがブロードキャストされなかった変数を参照できるのはなぜですか?」
https://stackoverflow.com/questions/33337446/why-is-pyspark-picking-up-a-variable-that-was-not-broadcast
以下、回答の訳です。
基本的には、クロージャはシリアライズされて各executorとtaskに送られ、 task実行中にアクセスできるようになるので、 RDDのスコープ外のすべての変数をブロードキャストする必要はありません。
ブロードキャスト変数は、ブロードキャストされるデータセットが大きい場合に便利です。
まとめ
ブロードキャストとクロージャの使い分け
グローバル変数の初期値としてセットして、それを RDD や UDF の関数内から、参照するように実装しておけば、クロージャとして、データも一緒にエグゼキューターに配信されるので、小さいデータは、クロージャとして実装して、大きいデータはブロードキャストの機能を使うという使い分けができます。
分散実行されるコードの境界
Javaと違って、Pythonのコードだと、分散先で実行されるコードの境界がわかりづらいのだけど、その答えは、クロージャが実行されるということでした。
ご参考までに。
関連記事
- 2024/01/25 TypeScriptで名前付き引数っぽい実装をする TypeScriptでPythonのように関数呼び出し時に引数名を使って「名前=値」の形式で引数を指定するOptions Objectパターンという技を紹介します。
- 2023/10/17 コードの品質を測定する方法 コードの品質を測定する方法が紹介されていました。計測の自動化に向けて、少しまとめてみました。
- 2023/01/26 デメテルの法則 「直接の友達とだけ話すこと」というプログラミングのお約束です
- 2022/11/21 Python の linter python 用のプロジェクトで使っている linter など
- 2020/09/07 AWSのエラーログ監視の設定 AWSにサーバーレスなシステムを構築したときのログ監視のやり方を説明します。簡単に再利用できるようにできるだけCLIで設定します。