メインコンテンツまでスキップ

DataFrameクラス

名前付き列にグループ化されたデータの分散コレクション。

DataFrame は Spark SQL のリレーショナル テーブルに相当し、SparkSession のさまざまな関数を使用して作成できます。

重要

DataFrame はコンストラクターを使用して直接作成しないでください。

Spark Connectをサポート

プロパティ

属性

説明

sparkSession

この DataFrame を作成したSparkSessionを返します。

rdd

コンテンツを行の RDD として返します (クラシック モードのみ)。

na

欠損値を処理するためのDataFrameNaFunctionsを返します。

stat

統計関数のDataFrameStatFunctionsを返します。

write

非ストリーミング DataFrame のコンテンツを外部ストレージに保存するためのインターフェイス。

writeStream

ストリーミング DataFrame のコンテンツを外部ストレージに保存するためのインターフェース。

schema

この DataFrame のスキーマを StructType として返します。

dtypes

すべての列名とそのデータ型をリストとして返します。

columns

DataFrame 内のすべての列の名前をリストとして取得します。

storageLevel

DataFrame の現在のストレージ レベルを取得します。

isStreaming

この DataFrame に、到着すると継続的にデータを返す 1 つ以上のソースが含まれている場合は True を返します。

executionInfo

クエリが実行された後、ExecutionInfo オブジェクトを返します。

plot

関数をプロットするためのPySparkPlotAccessorを返します。

方法

データの表示と検査

手法

説明

toJSON(use_unicode)

DataFrame を文字列または DataFrame の RDD に変換します。

printSchema(level)

スキーマをツリー形式で出力します。

explain(extended, mode)

デバッグの目的で、(論理および物理)プランをコンソールに出力します。

show(n, truncate, vertical)

DataFrame の最初の n 行をコンソールに出力します。

collect()

DataFrame 内のすべてのレコードを Row のリストとして返します。

toLocalIterator(prefetchPartitions)

この DataFrame 内のすべての行を含む反復子を返します。

take(num)

最初の num 行を Row のリストとして返します。

tail(num)

最後の num 行を Row のリストとして返します。

head(n)

最初の n 行を返します。

first()

最初の行を Row として返します。

count()

この DataFrame 内の行数を返します。

isEmpty()

DataFrame空かどうかを確認し、ブール値を返します。

describe(*cols)

数値列と文字列列の基本的な統計情報をコンピュートします。

summary(*statistics)

コンピュートは、数値列と文字列列の統計を指定します。

一時的なビュー

手法

説明

createTempView(name)

この DataFrame を使用してローカルの一時ビューを作成します。

createOrReplaceTempView(name)

この DataFrame を使用してローカルの一時ビューを作成または置き換えます。

createGlobalTempView(name)

この DataFrame を使用してグローバル一時ビューを作成します。

createOrReplaceGlobalTempView(name)

指定された名前を使用してグローバル一時ビューを作成または置き換えます。

選択と投影

手法

説明

select(*cols)

一連の式を投影し、新しい DataFrame を返します。

selectExpr(*expr)

SQL 式のセットを投影し、新しい DataFrame を返します。

filter(condition)

指定された条件を使用して行をフィルタリングします。

where(condition)

フィルターのエイリアス。

drop(*cols)

指定された列のない新しい DataFrame を返します。

toDF(*cols)

新しく指定された列名を持つ新しい DataFrame を返します。

withColumn(colName, col)

列を追加するか、同じ名前を持つ既存の列を置き換えることで、新しい DataFrame を返します。

withColumns(*colsMap)

複数の列を追加するか、同じ名前を持つ既存の列を置き換えて、新しい DataFrame を返します。

withColumnRenamed(existing, new)

既存の列の名前を変更して新しい DataFrame を返します。

withColumnsRenamed(colsMap)

複数の列の名前を変更して新しい DataFrame を返します。

withMetadata(columnName, metadata)

既存の列をメタデータで更新して新しい DataFrame を返します。

metadataColumn(colName)

論理列名に基づいてメタデータ列を選択し、それを列として返します。

colRegex(colName)

正規表現として指定された列名に基づいて列を選択し、それを列として返します。

並べ替えと順序付け

手法

説明

sort(*cols, **kwargs)

指定された列でソートされた新しい DataFrame を返します。

orderBy(*cols, **kwargs)

ソートの別名。

sortWithinPartitions(*cols, **kwargs)

指定された列で各パーティションをソートした新しい DataFrame を返します。

集約とグループ化

手法

説明

groupBy(*cols)

指定された列で DataFrame をグループ化し、集計を実行できるようにします。

rollup(*cols)

指定された列を使用して、現在の DataFrame の多次元ロールアップを作成します。

cube(*cols)

指定された列を使用して、現在の DataFrame の多次元キューブを作成します。

groupingSets(groupingSets, *cols)

指定されたグループ化セットを使用して、現在の DataFrame の多次元集計を作成します。

agg(*exprs)

グループなしで DataFrame 全体を集計します (df.groupBy().agg() の省略形)。

observe(observation, *exprs)

DataFrameで観察する (名前を付けた) メトリクスを定義します。

結合

手法

説明

join(other, on, how)

指定された結合式を使用して、別の DataFrame と結合します。

crossJoin(other)

別のDataFrameを使用して、直交座標の ... を返します。

lateralJoin(other, on, how)

指定された結合式を使用して、別の DataFrame と横方向結合します。

集合演算

手法

説明

union(other)

この DataFrame と別の DataFrame の行の結合を含む新しい DataFrame を返します。

unionByName(other, allowMissingColumns)

この DataFrame と別の DataFrame の行の結合を含む新しい DataFrame を返します。

intersect(other)

この DataFrame と別の DataFrame の両方の行のみを含む新しい DataFrame を返します。

intersectAll(other)

重複を保持しながら、この DataFrame と別の DataFrame の両方の行を含む新しい DataFrame を返します。

subtract(other)

この DataFrame 内の行を含み、別の DataFrame 内の行を含まない新しい DataFrame を返します。

exceptAll(other)

重複を保持しながら、この DataFrame 内の行を含み、別の DataFrame 内の行を含まない新しい DataFrame を返します。

重複排除

手法

説明

distinct()

この DataFrame 内の個別の行を含む新しい DataFrame を返します。

dropDuplicates(subset)

重複行を削除した新しい DataFrame を返します。オプションで特定の列のみを考慮します。

dropDuplicatesWithinWatermark(subset)

ウォーターマーク内の重複行が削除された新しいDataFrameを返します。オプションで特定の列のみを考慮します。

サンプリングと分割

手法

説明

sample(withReplacement, fraction, seed)

この DataFrame のサンプリングされたサブセットを返します。

sampleBy(col, fractions, seed)

各層に与えられた割合に基づいて、層別サンプルを非置換で返します。

randomSplit(weights, seed)

指定された重みでこの DataFrame をランダムに分割します。

パーティショニング

手法

説明

coalesce(numPartitions)

正確に numPartitions 個のパーティションを持つ新しい DataFrame を返します。

repartition(numPartitions, *cols)

指定されたパーティション式でパーティション化された新しい DataFrame を返します。

repartitionByRange(numPartitions, *cols)

指定されたパーティション式でパーティション化された新しい DataFrame を返します。

repartitionById(numPartitions, partitionIdCol)

指定されたパーティション ID 式でパーティション化された新しい DataFrame を返します。

再形成

手法

説明

unpivot(ids, values, variableColumnName, valueColumnName)

DataFrame をワイド形式からロング形式にピボット解除します。

melt(ids, values, variableColumnName, valueColumnName)

unpivot の別名。

transpose(indexColumn)

指定されたインデックス列の値が新しい列になるように DataFrame を転置します。

欠損データの処理

手法

説明

dropna(how, thresh, subset)

null 値または NaN 値の行を省略した新しい DataFrame を返します。

fillna(value, subset)

null 値が新しい値で埋められた新しい DataFrame を返します。

replace(to_replace, value, subset)

値を別の値に置き換えた新しい DataFrame を返します。

統計関数

手法

説明

approxQuantile(col, probabilities, relativeError)

DataFrame の数値列のおおよその分位数を計算します。

corr(col1, col2, method)

DataFrame の 2 つの列の相関を double 値として計算します。

cov(col1, col2)

名前で指定された列の標本共分散を計算します。

crosstab(col1, col2)

指定された列のペアごとの度数テーブルを計算します。

freqItems(cols, support)

列に頻繁に出現する項目を検索します (誤検出の可能性あり)。

スキーマ操作

手法

説明

to(schema)

指定されたスキーマと一致するように各行が調整された新しい DataFrame を返します。

alias(alias)

エイリアスが設定された新しい DataFrame を返します。

反復

手法

説明

foreach(f)

この DataFrame のすべての行に f 関数を適用します。

foreachPartition(f)

この DataFrame の各パーティションに f 関数を適用します。

キャッシュと永続性

手法

説明

cache()

DataFrame をデフォルトのストレージ レベル (MEMORY_AND_DISK_DESER) で保存します。

persist(storageLevel)

操作間で DataFrame の内容を保持するためのストレージ レベルを設定します。

unpersist(blocking)

DataFrame を非永続としてマークし、メモリとディスクからそのすべてのブロックを削除します。

チェックポイント

手法

説明

checkpoint(eager)

この DataFrame のチェックポイント バージョンを返します。

localCheckpoint(eager, storageLevel)

この DataFrame のローカルにチェックポイントされたバージョンを返します。

ストリーミング操作

手法

説明

withWatermark(eventTime, delayThreshold)

この DataFrame のイベント時間のウォーターマークを定義します。

最適化のヒント

手法

説明

hint(name, *parameters)

現在の DataFrame に関するヒントを指定します。

制限とオフセット

手法

説明

limit(num)

結果の数を指定された数に制限します。

offset(num)

最初の n 行をスキップして新しい DataFrame を返します。

高度な変換

手法

説明

transform(func, *args, **kwargs)

新しい DataFrame を返します。カスタム変換を連鎖するための簡潔な構文。

変換方法

手法

説明

toPandas()

この DataFrame の内容を Pandas pandas.DataFrame として返します。

toArrow()

この DataFrame の内容を PyArrow pyarrow.Table として返します。

pandas_api(index_col)

既存の DataFrame を pandas-on-Spark DataFrame に変換します。

mapInPandas(func, schema, barrier, profile)

Python ネイティブ関数を使用して、現在の DataFrame 内のバッチの反復子をマップします。

mapInArrow(func, schema, barrier, profile)

pyarrow.RecordBatch で実行される Python ネイティブ関数を使用して、現在の DataFrame 内のバッチの反復子をマップします。

データの書き込み

手法

説明

writeTo(table)

v2 ソース用の書き込み構成ビルダーを作成します。

mergeInto(table, condition)

ソース テーブルに基づく更新、挿入、および削除のセットをターゲット テーブルにマージします。

DataFrameの比較

手法

説明

sameSemantics(other)

両方のDataFrames内の論理クエリ プランが等しい場合は True を返します。

semanticHash()

この DataFrame に対する論理クエリ プランのハッシュ コードを返します。

メタデータとファイル情報

手法

説明

inputFiles()

この DataFrame を構成するファイルのベストエフォート スナップショットを返します。

高度なSQL機能

手法

説明

isLocal()

collect メソッドと take メソッドをローカルで実行できる場合は True を返します。

asTable()

DataFrame を TVF のテーブル引数として使用できる TableArg オブジェクトに変換します。

scalar()

正確に 1 行と 1 列を含む SCALAR サブクエリの列オブジェクトを返します。

exists()

EXISTS サブクエリの列オブジェクトを返します。

基本的なDataFrame操作

Python
# Create a DataFrame
people = spark.createDataFrame([
{"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
{"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
{"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
{"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])

# Select columns
people.select("name", "age").show()

# Filter rows
people.filter(people.age > 30).show()

# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()

集約とグループ化

Python
# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()

# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
F.avg("salary").alias("avg_salary"),
F.max("age").alias("max_age")
).show()

結合

Python
# Create another DataFrame
department = spark.createDataFrame([
{"id": 1, "name": "PySpark"},
{"id": 2, "name": "ML"},
{"id": 3, "name": "Spark SQL"}
])

# Join DataFrames
people.join(department, people.deptId == department.id).show()

複雑な変換

Python
# Chained operations
result = people.filter(people.age > 30) \\
.join(department, people.deptId == department.id) \\
.groupBy(department.name, "gender") \\
.agg({"salary": "avg", "age": "max"}) \\
.sort("max(age)")
result.show()