メインコンテンツまでスキップ

ウィンドウクラス

DataFrames でウィンドウを定義するためのユーティリティ関数。

Spark Connectをサポート

クラス属性

属性

説明

unboundedPreceding

境界のないウィンドウ フレームの開始を表す境界値。

unboundedFollowing

境界のないウィンドウ フレームの終了を表す境界値。

currentRow

ウィンドウ フレーム内の現在の行を表す境界値。

方法

手法

説明

orderBy(*cols)

順序が定義された WindowSpec を作成します。

partitionBy(*cols)

パーティションが定義された WindowSpec を作成します。

rangeBetween(start, end)

現在の行の ORDER BY 値からの範囲ベースのオフセットを使用して、 start (含む) からend (含む) までのフレーム境界が定義された WindowSpec を作成します。

rowsBetween(start, end)

現在の行からの行ベースのオフセットを使用して、 start (含む) からend (含む) までのフレーム境界が定義された WindowSpec を作成します。

注意

順序が定義されていない場合は、デフォルトで無制限のウィンドウ フレーム (rowFrame、unboundedPreceding、unboundedFollowing) が使用されます。順序付けが定義されている場合、デフォルトでは拡大ウィンドウ フレーム (rangeFrame、unboundedPreceding、currentRow) が使用されます。

順序付けと行フレームを備えた基本ウィンドウ

Python
from pyspark.sql import Window

# ORDER BY date ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW
window = Window.orderBy("date").rowsBetween(Window.unboundedPreceding, Window.currentRow)

レンジフレーム付きパーティションウィンドウ

Python
from pyspark.sql import Window

# PARTITION BY country ORDER BY date RANGE BETWEEN 3 PRECEDING AND 3 FOLLOWING
window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)

パーティション内の行番号

Python
from pyspark.sql import Window, functions as sf

df = spark.createDataFrame(
[(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)

# Show row number ordered by id within each category partition
window = Window.partitionBy("category").orderBy("id")
df.withColumn("row_number", sf.row_number().over(window)).show()

行ベースのフレームによる累計

Python
from pyspark.sql import Window, functions as sf

df = spark.createDataFrame(
[(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)

# Sum id values from the current row to the next row within each partition
window = Window.partitionBy("category").orderBy("id").rowsBetween(Window.currentRow, 1)
df.withColumn("sum", sf.sum("id").over(window)).sort("id", "category", "sum").show()

範囲ベースのフレームによる累計

Python
from pyspark.sql import Window, functions as sf

df = spark.createDataFrame(
[(1, "a"), (1, "a"), (2, "a"), (1, "b"), (2, "b"), (3, "b")], ["id", "category"]
)

# Sum id values from the current id value to id + 1 within each partition
window = Window.partitionBy("category").orderBy("id").rangeBetween(Window.currentRow, 1)
df.withColumn("sum", sf.sum("id").over(window)).sort("id", "category").show()