PySparkのWindow関数がわからなかったのでFunctions使って学習【備忘録メモ】

業務でAzure SynapseのNOTEBOOKを使うことがあり、せっかくなのでPandasよりもPySparkを使いたいなと思ったのでいろいろと試して学んでいるところです。
Pandasの書き方とPySparkの書き方が大きく異なるのですが、データの規模が大きくなる場合や複数のノードでの処理が必要な場合は、PySparkの方が適していることが多いです。
※PySparkはApache SparkのPython APIで、分散処理や大規模なデータの取り扱いに特化している。

PandasよりもPySparkが優れているところ

  1. 大規模データの処理能力: PySparkはApache SparkのPython APIであり、分散処理を活用して大規模なデータセットを効率的に処理できます。複数のノードにわたってデータを分散させ、並列に処理することで高速な計算が可能です。一方、Pandasは通常シングルスレッドでの処理であり、大量のメモリを消費することがあるため、非常に大きなデータセットを扱う際にはPySparkが有利です。
  2. スケーラビリティとパフォーマンス: PySparkはデータ処理タスクを並列化し、データを複数のノードに分散させることで、スケーラビリティとパフォーマンスを向上させます。これにより、データの規模が大きくなっても迅速な処理が可能です。一方、Pandasは大規模なデータセットに対するスケーラビリティが限られているため、メモリ制約に直面する可能性があります。
  3. 豊富な機能と統合: PySparkはデータ処理、機械学習、ストリーミングデータ処理など、多くの機能を提供しています。また、他のビッグデータツールやクラウドプラットフォームと簡単に統合できるため、柔軟性と利便性が高まります。Pandasも優れたデータ操作機能を持っていますが、ビッグデータ環境との統合はPySparkほどスムーズではありません。

PySparkを使ったコードはこんな感じになります。
※PySparkを扱う環境は別記事で説明します。

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count

# SparkSessionの作成
spark = SparkSession.builder \
    .appName("PySpark Example") \
    .getOrCreate()

# 簡単なデータをリストで定義
data = [("Alice", 25), ("Bob", 30), ("Charlie", 35), ("David", 40), ("Edward", 45)]

# 列名を指定してデータフレームを作成
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# データフレームの内容を表示
df.show()

# SparkSessionの停止
spark.stop()

実際にはCSVファイルとかDBからデータを読み込むので、データフレームを作成することはほぼないとは思います……。
PySparkデータフレームを扱う関数もいろいろと触っているうちに慣れてきたのですが、Windowの使い方がよくわからなかったので、備忘録として残しておこうと思います。

Window関数とは?

SQLをある程度扱える人なら割と馴染みがあるかもしれません。
実際PySparkのDataFrame APIの設計はSQLに似ており、特にWindow関数はSQLのウィンドウ関数と非常に類似しています。

しかし私は基本的なSQL文しか理解しておらず、Window関数をばりばりに使った経験がなく、すんなり入っては来ませんでした。

例えば性別でランク付を行う場合はこんな感じになります。
※同じ値は順位がスキップされます。

from pyspark.sql import SparkSession as ss
from pyspark.sql import Window
from pyspark.sql.functions import rank

spark = ss.builder.appName('job01').getOrCreate()
# CSVファイルの読み込み
df = spark.read.csv('files/users.csv', header=True)
# 先頭20行を抽出
df = df.limit(20)
# age列をintに変換
df = df.withColumn('age', col('age').cast('int'))
wf = Window.partitionBy('GENDER').orderBy('age')
df.withColumn('rank', rank().over(wf)).show()
+-----------+---------+---------------+------+---+----------+---------+--------+-------------------+------------+----+
|undefinedID|     NAME|           KANA|GENDER|age|     BIRTH|POST_CODE|      PW|         CREATED_AT|RESINGED_FLG|rank|
+-----------+---------+---------------+------+---+----------+---------+--------+-------------------+------------+----+
|         19|高坂 章子|コウサカ アキコ|    女| 16| 2007/7/10| 309-2988|IY00yObb|   2020/3/4 0:21:17|        True|   1|
|         10|  丹羽 遥|  タンバ ハルカ|    女| 30|1993/11/24| 172-0053|h8g4Z1Zz|  2006/5/3 10:36:05|        True|   2|
|          6|竹島 花梨|タケシマ カリン|    女| 31| 1993/3/11| 941-5533|eCDDOfQO|1978/10/13 18:33:03|        True|   3|
|         15|狩野 綾香|  カリノ アヤカ|    女| 31| 1993/3/19| 472-4790|C66NJZJA|  1974/2/18 4:34:45|        True|   3|
|          1|江本 春江|  エモト ハルエ|    女| 33|1990/11/25| 045-8460|nqlN7JTE|2020/12/20 23:57:45|       False|   5|
|          5|郡司 克美|  グンジ カツミ|    女| 36|  1987/7/1| 460-7192|YVkybPmB| 1986/4/14 22:04:16|        True|   6|
|         18|江上 音葉|  エガミ オトハ|    女| 38| 1985/5/24| 072-9452|ns11VMve| 1970/11/24 5:07:59|        True|   7|
|         20|河端 邦子|カワバタ クニコ|    女| 41| 1982/6/10| 704-5994|Da5OHd3d| 1974/11/20 9:45:58|       False|   8|
|         12|  森 杏菜|    モリ アンナ|    女| 42|1981/10/23| 353-0945|BoC4ooVd|   1995/2/6 6:03:39|       False|   9|
|         11|  富樫 桜|トミカシ サクラ|    女| 45| 1979/1/15| 772-5717|CpKzT3eo| 1988/6/20 14:55:54|        True|  10|
|          4|斉藤 真琴|サイトウ マコト|    女| 54| 1969/9/18| 613-1299|etq7Bc1U|  1986/1/3 14:11:58|        True|  11|
|          9|八田 真結|    ハッタ マユ|    女| 56|  1968/4/1| 022-3795|QM8Fpgln|  2003/7/21 1:23:03|       False|  12|
|         13|森野 友和|モリノ トモカズ|    男|  7| 2016/8/21| 398-1765|a45y5gTB|  2001/6/9 14:03:11|        True|   1|
|          3|木内 芳明|キウチ ヨシアキ|    男| 22|2001/10/18| 323-2640|jcZySr5c|  1993/8/25 9:25:22|        True|   2|
|          2|三角 信男|  ミスミ ノブオ|    男| 37| 1987/3/10| 344-5757|leQMQglu| 2012/10/16 0:43:59|        True|   3|
|         16|茂木 和徳|モテキ カズノリ|    男| 37|1986/12/21| 001-4543|XHpYtaq0|  2011/3/3 18:28:59|       False|   3|
|          8|  大澤 理|オオサワ サトル|    男| 41| 1982/11/2| 119-0620|OC_CIlUf| 2007/9/26 17:23:22|        True|   5|
|          7|高城 大和|  タカギ ヤマト|    男| 45|  1978/9/2| 558-4101|PjYmbpBV| 1987/11/3 23:59:05|       False|   6|
|         14|坂東 幹男|バンドウ ミキオ|    男| 47|  1976/8/4| 410-5583|bSD5n7Ig|  2002/6/6 23:33:21|       False|   7|
|         17|宮前 浩志|ミヤマエ コウジ|    男| 52| 1972/3/12| 568-5151|x_Zto2aM|  1975/9/20 2:53:32|        True|   8|
+-----------+---------+---------------+------+---+----------+---------+--------+-------------------+------------+----+

他にもWindow関数を使うパターンとしては前後の値を取得したり、連番を振るといった使い方があるわけですが、単純にPysparkのデータフレームにてGroup Byをする場合もpartititonByでパーティションに分けて、Window関数を適用するという使い方になります。
※集約するだけならGroupByでいいんですが、グループ化してから特定の行を抜き出すというパターンです。

よくある使い方としてはある項目列でGroup化して、先頭の行を取得するというパターンでしょうか。

from pyspark.sql import SparkSession as ss
from pyspark.sql import Window
from pyspark.sql.functions import row_number, col, to_timestamp

spark = ss.builder.appName('job01').getOrCreate()
df = spark.read.csv('files/users.csv', header=True)
df = df.limit(20)

# CREATED_AT列をタイムスタンプ型に変換
# ※値が0埋めされていなかったのでそれに合わせたformatを指定しています。
df = df.withColumn("CREATED_AT", to_timestamp(col("CREATED_AT"), "yyyy/M/d H:m:s"))
# ウィンドウの定義
windowSpec = Window.partitionBy("RESINGED_FLG").orderBy(col("CREATED_AT").desc())

# row_number関数を使って、各パーティション内での順位を付ける
df_with_rank = df.withColumn("row_number", row_number().over(windowSpec))

# 最新のレコードを抽出
latest_records = df_with_rank.filter(col("row_number") == 1).drop("row_number")

# 結果を表示
latest_records.show(truncate=False)
+-----------+---------+---------------+------+---+----------+---------+--------+-------------------+------------+
|undefinedID|NAME     |KANA           |GENDER|AGE|BIRTH     |POST_CODE|PW      |CREATED_AT         |RESINGED_FLG|
+-----------+---------+---------------+------+---+----------+---------+--------+-------------------+------------+
|1          |江本 春江|エモト ハルエ  |女    |33 |1990/11/25|045-8460 |nqlN7JTE|2020-12-20 23:57:45|False       |
|19         |高坂 章子|コウサカ アキコ|女    |16 |2007/7/10 |309-2988 |IY00yObb|2020-03-04 00:21:17|True        |
+-----------+---------+---------------+------+---+----------+---------+--------+-------------------+------------+

これはRESINGED_FLG列でGroup化を行い、CREATED_AT列の最新(最大)のレコードを取得する処理です。
例えばユーザーの都道府県や年齢毎の代表レコードを選出する際に利用します。

Pandasだとgroupby, sort_values, headなどを使って代表レコード抽出するんですが、Pysparkの場合はソート後にrow_number関数で連番を振ってからfilterをかける必要があります。

Pandasに比べると記述するコードは若干増えていますが、データ量が膨大だとPysparkの方が圧倒的に早いので慣れるしかないですね。