メインコンテンツまでスキップ

遅れ

ウィンドウ関数: 現在の行のoffset行前にある値を返します。現在の行の前の行数がoffset行未満の場合、 default返します。たとえば、 offsetが 1 の場合、ウィンドウ パーティション内の任意の時点での前の行が返されます。

これは SQL の LAG 関数と同等です。

構文

Python
from pyspark.sql import functions as sf

sf.lag(col, offset=1, default=None)

パラメーター

パラメーター

Type

説明

col

pyspark.sql.Column または列名

列または式の名前。

offset

int、オプション

拡張する行数。デフォルトは1です。

default

オプション

デフォルト値。

戻り値

pyspark.sql.Column: offsetに基づく現在の行の前の値。

例1 : ラグを使用して前の値を取得する

Python
from pyspark.sql import functions as sf
from pyspark.sql import Window
df = spark.createDataFrame(
[("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"])
df.show()
Output
+---+---+
| c1| c2|
+---+---+
| a| 1|
| a| 2|
| a| 3|
| b| 8|
| b| 2|
+---+---+
Python
w = Window.partitionBy("c1").orderBy("c2")
df.withColumn("previous_value", sf.lag("c2").over(w)).show()
Output
+---+---+--------------+
| c1| c2|previous_value|
+---+---+--------------+
| a| 1| NULL|
| a| 2| 1|
| a| 3| 2|
| b| 2| NULL|
| b| 8| 2|
+---+---+--------------+

例2 : デフォルト値でラグを使用する

Python
from pyspark.sql import functions as sf
from pyspark.sql import Window
df = spark.createDataFrame(
[("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"])
w = Window.partitionBy("c1").orderBy("c2")
df.withColumn("previous_value", sf.lag("c2", 1, 0).over(w)).show()
Output
+---+---+--------------+
| c1| c2|previous_value|
+---+---+--------------+
| a| 1| 0|
| a| 2| 1|
| a| 3| 2|
| b| 2| 0|
| b| 8| 2|
+---+---+--------------+

例3 : オフセット2のラグを使用する

Python
from pyspark.sql import functions as sf
from pyspark.sql import Window
df = spark.createDataFrame(
[("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"])
w = Window.partitionBy("c1").orderBy("c2")
df.withColumn("previous_value", sf.lag("c2", 2, -1).over(w)).show()
Output
+---+---+--------------+
| c1| c2|previous_value|
+---+---+--------------+
| a| 1| -1|
| a| 2| -1|
| a| 3| 1|
| b| 2| -1|
| b| 8| -1|
+---+---+--------------+