Pular para o conteúdo principal

atraso

Função de janela: retorna o valor que está offset linhas antes da linha atual e default se houver menos de offset linhas antes da linha atual. Por exemplo, um offset de um retornará a linha anterior em qualquer ponto dado na partição da janela.

Isso é equivalente à função LAG em SQL.

Sintaxe

Python
from pyspark.sql import functions as sf

sf.lag(col, offset=1, default=None)

Parâmetros

Parâmetro

Tipo

Descrição

col

pyspark.sql.Column ou nome da coluna

Nome da coluna ou expressão.

offset

int, opcional

Número de linhas a serem estendidas. O valor padrão é 1.

default

opcional

valor padrão.

Devoluções

pyspark.sql.Column: valor antes da linha atual com base em offset.

Exemplos

Exemplo 1 : Usando o atraso para obter o valor anterior

Python
from pyspark.sql import functions as sf
from pyspark.sql import Window
df = spark.createDataFrame(
[("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"])
df.show()
Output
+---+---+
| c1| c2|
+---+---+
| a| 1|
| a| 2|
| a| 3|
| b| 8|
| b| 2|
+---+---+
Python
w = Window.partitionBy("c1").orderBy("c2")
df.withColumn("previous_value", sf.lag("c2").over(w)).show()
Output
+---+---+--------------+
| c1| c2|previous_value|
+---+---+--------------+
| a| 1| NULL|
| a| 2| 1|
| a| 3| 2|
| b| 2| NULL|
| b| 8| 2|
+---+---+--------------+

Exemplo 2 : Usando atraso com um valor default

Python
from pyspark.sql import functions as sf
from pyspark.sql import Window
df = spark.createDataFrame(
[("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"])
w = Window.partitionBy("c1").orderBy("c2")
df.withColumn("previous_value", sf.lag("c2", 1, 0).over(w)).show()
Output
+---+---+--------------+
| c1| c2|previous_value|
+---+---+--------------+
| a| 1| 0|
| a| 2| 1|
| a| 3| 2|
| b| 2| 0|
| b| 8| 2|
+---+---+--------------+

Exemplo 3 : Usando atraso com um deslocamento de 2

Python
from pyspark.sql import functions as sf
from pyspark.sql import Window
df = spark.createDataFrame(
[("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 2)], ["c1", "c2"])
w = Window.partitionBy("c1").orderBy("c2")
df.withColumn("previous_value", sf.lag("c2", 2, -1).over(w)).show()
Output
+---+---+--------------+
| c1| c2|previous_value|
+---+---+--------------+
| a| 1| -1|
| a| 2| -1|
| a| 3| 1|
| b| 2| -1|
| b| 8| -1|
+---+---+--------------+