DataFrameクラス
名前付き列にグループ化されたデータの分散コレクション。
DataFrame は Spark SQL のリレーショナル テーブルに相当し、SparkSession のさまざまな関数を使用して作成できます。
DataFrame はコンストラクターを使用して直接作成しないでください。
Spark Connectをサポート
プロパティ
属性 | 説明 |
|---|---|
この DataFrame を作成したSparkSessionを返します。 | |
コンテンツを行の RDD として返します (クラシック モードのみ)。 | |
欠損値を処理するためのDataFrameNaFunctionsを返します。 | |
統計関数のDataFrameStatFunctionsを返します。 | |
非ストリーミング DataFrame のコンテンツを外部ストレージに保存するためのインターフェイス。 | |
ストリーミング DataFrame のコンテンツを外部ストレージに保存するためのインターフェース。 | |
この DataFrame のスキーマを StructType として返します。 | |
すべての列名とそのデータ型をリストとして返します。 | |
DataFrame 内のすべての列の名前をリストとして取得します。 | |
DataFrame の現在のストレージ レベルを取得します。 | |
この DataFrame に、到着すると継続的にデータを返す 1 つ以上のソースが含まれている場合は True を返します。 | |
クエリが実行された後、ExecutionInfo オブジェクトを返します。 | |
関数をプロットするためのPySparkPlotAccessorを返します。 |
方法
データの表示と検査
手法 | 説明 |
|---|---|
DataFrame を文字列または DataFrame の RDD に変換します。 | |
スキーマをツリー形式で出力します。 | |
デバッグの目的で、(論理および物理)プランをコンソールに出力します。 | |
DataFrame の最初の n 行をコンソールに出力します。 | |
DataFrame 内のすべてのレコードを Row のリストとして返します。 | |
この DataFrame 内のすべての行を含む反復子を返します。 | |
最初の num 行を Row のリストとして返します。 | |
最後の num 行を Row のリストとして返します。 | |
最初の n 行を返します。 | |
最初の行を Row として返します。 | |
この DataFrame 内の行数を返します。 | |
DataFrame空かどうかを確認し、ブール値を返します。 | |
数値列と文字列列の基本的な統計情報をコンピュートします。 | |
コンピュートは、数値列と文字列列の統計を指定します。 |
一時的なビュー
手法 | 説明 |
|---|---|
この DataFrame を使用してローカルの一時ビューを作成します。 | |
この DataFrame を使用してローカルの一時ビューを作成または置き換えます。 | |
この DataFrame を使用してグローバル一時ビューを作成します。 | |
指定された名前を使用してグローバル一時ビューを作成または置き換えます。 |
選択と投影
手法 | 説明 |
|---|---|
一連の式を投影し、新しい DataFrame を返します。 | |
SQL 式のセットを投影し、新しい DataFrame を返します。 | |
指定された条件を使用して行をフィルタリングします。 | |
フィルターのエイリアス。 | |
指定された列のない新しい DataFrame を返します。 | |
新しく指定された列名を持つ新しい DataFrame を返します。 | |
列を追加するか、同じ名前を持つ既存の列を置き換えることで、新しい DataFrame を返します。 | |
複数の列を追加するか、同じ名前を持つ既存の列を置き換えて、新しい DataFrame を返します。 | |
既存の列の名前を変更して新しい DataFrame を返します。 | |
複数の列の名前を変更して新しい DataFrame を返します。 | |
既存の列をメタデータで更新して新しい DataFrame を返します。 | |
論理列名に基づいてメタデータ列を選択し、それを列として返します。 | |
正規表現として指定された列名に基づいて列を選択し、それを列として返します。 |
並べ替えと順序付け
手法 | 説明 |
|---|---|
指定された列でソートされた新しい DataFrame を返します。 | |
ソートの別名。 | |
指定された列で各パーティションをソートした新しい DataFrame を返します。 |
集約とグループ化
手法 | 説明 |
|---|---|
指定された列で DataFrame をグループ化し、集計を実行できるようにします。 | |
指定された列を使用して、現在の DataFrame の多次元ロールアップを作成します。 | |
指定された列を使用して、現在の DataFrame の多次元キューブを作成します。 | |
指定されたグループ化セットを使用して、現在の DataFrame の多次元集計を作成します。 | |
グループなしで DataFrame 全体を集計します (df.groupBy().agg() の省略形)。 | |
DataFrameで観察する (名前を付けた) メトリクスを定義します。 |
結合
手法 | 説明 |
|---|---|
指定された結合式を使用して、別の DataFrame と結合します。 | |
別のDataFrameを使用して、直交座標の ... を返します。 | |
指定された結合式を使用して、別の DataFrame と横方向結合します。 |
集合演算
手法 | 説明 |
|---|---|
この DataFrame と別の DataFrame の行の結合を含む新しい DataFrame を返します。 | |
この DataFrame と別の DataFrame の行の結合を含む新しい DataFrame を返します。 | |
この DataFrame と別の DataFrame の両方の行のみを含む新しい DataFrame を返します。 | |
重複を保持しながら、この DataFrame と別の DataFrame の両方の行を含む新しい DataFrame を返します。 | |
この DataFrame 内の行を含み、別の DataFrame 内の行を含まない新しい DataFrame を返します。 | |
重複を保持しながら、この DataFrame 内の行を含み、別の DataFrame 内の行を含まない新しい DataFrame を返します。 |
重複排除
手法 | 説明 |
|---|---|
この DataFrame 内の個別の行を含む新しい DataFrame を返します。 | |
重複行を削除した新しい DataFrame を返します。オプションで特定の列のみを考慮します。 | |
ウォーターマーク内の重複行が削除された新しいDataFrameを返します。オプションで特定の列のみを考慮します。 |
サンプリングと分割
手法 | 説明 |
|---|---|
この DataFrame のサンプリングされたサブセットを返します。 | |
各層に与えられた割合に基づいて、層別サンプルを非置換で返します。 | |
指定された重みでこの DataFrame をランダムに分割します。 |
パーティショニング
手法 | 説明 |
|---|---|
正確に numPartitions 個のパーティションを持つ新しい DataFrame を返します。 | |
| 指定されたパーティション式でパーティション化された新しい DataFrame を返します。 |
指定されたパーティション式でパーティション化された新しい DataFrame を返します。 | |
| 指定されたパーティション ID 式でパーティション化された新しい DataFrame を返します。 |
再形成
手法 | 説明 |
|---|---|
DataFrame をワイド形式からロング形式にピボット解除します。 | |
unpivot の別名。 | |
指定されたインデックス列の値が新しい列になるように DataFrame を転置します。 |
欠損データの処理
手法 | 説明 |
|---|---|
null 値または NaN 値の行を省略した新しい DataFrame を返します。 | |
null 値が新しい値で埋められた新しい DataFrame を返します。 | |
値を別の値に置き換えた新しい DataFrame を返します。 |
統計関数
手法 | 説明 |
|---|---|
DataFrame の数値列のおおよその分位数を計算します。 | |
DataFrame の 2 つの列の相関を double 値として計算します。 | |
名前で指定された列の標本共分散を計算します。 | |
指定された列のペアごとの度数テーブルを計算します。 | |
列に頻繁に出現する項目を検索します (誤検出の可能性あり)。 |
スキーマ操作
手法 | 説明 |
|---|---|
指定されたスキーマと一致するように各行が調整された新しい DataFrame を返します。 | |
エイリアスが設定された新しい DataFrame を返します。 |
反復
手法 | 説明 |
|---|---|
この DataFrame のすべての行に f 関数を適用します。 | |
この DataFrame の各パーティションに f 関数を適用します。 |
キャッシュと永続性
手法 | 説明 |
|---|---|
DataFrame をデフォルトのストレージ レベル (MEMORY_AND_DISK_DESER) で保存します。 | |
操作間で DataFrame の内容を保持するためのストレージ レベルを設定します。 | |
DataFrame を非永続としてマークし、メモリとディスクからそのすべてのブロックを削除します。 |
チェックポイント
手法 | 説明 |
|---|---|
この DataFrame のチェックポイント バージョンを返します。 | |
この DataFrame のローカルにチェックポイントされたバージョンを返します。 |
ストリーミング操作
手法 | 説明 |
|---|---|
この DataFrame のイベント時間のウォーターマークを定義します。 |
最適化のヒント
手法 | 説明 |
|---|---|
現在の DataFrame に関するヒントを指定します。 |
制限とオフセット
手法 | 説明 |
|---|---|
結果の数を指定された数に制限します。 | |
最初の n 行をスキップして新しい DataFrame を返します。 |
高度な変換
手法 | 説明 |
|---|---|
新しい DataFrame を返します。カスタム変換を連鎖するための簡潔な構文。 |
変換方法
手法 | 説明 |
|---|---|
この DataFrame の内容を Pandas pandas.DataFrame として返します。 | |
この DataFrame の内容を PyArrow pyarrow.Table として返します。 | |
既存の DataFrame を pandas-on-Spark DataFrame に変換します。 | |
Python ネイティブ関数を使用して、現在の DataFrame 内のバッチの反復子をマップします。 | |
pyarrow.RecordBatch で実行される Python ネイティブ関数を使用して、現在の DataFrame 内のバッチの反復子をマップします。 |
データの書き込み
手法 | 説明 |
|---|---|
v2 ソース用の書き込み構成ビルダーを作成します。 | |
ソース テーブルに基づく更新、挿入、および削除のセットをターゲット テーブルにマージします。 |
DataFrameの比較
手法 | 説明 |
|---|---|
両方のDataFrames内の論理クエリ プランが等しい場合は True を返します。 | |
この DataFrame に対する論理クエリ プランのハッシュ コードを返します。 |
メタデータとファイル情報
手法 | 説明 |
|---|---|
この DataFrame を構成するファイルのベストエフォート スナップショットを返します。 |
高度なSQL機能
手法 | 説明 |
|---|---|
collect メソッドと take メソッドをローカルで実行できる場合は True を返します。 | |
DataFrame を TVF のテーブル引数として使用できる TableArg オブジェクトに変換します。 | |
正確に 1 行と 1 列を含む SCALAR サブクエリの列オブジェクトを返します。 | |
EXISTS サブクエリの列オブジェクトを返します。 |
例
基本的なDataFrame操作
# Create a DataFrame
people = spark.createDataFrame([
{"deptId": 1, "age": 40, "name": "Hyukjin Kwon", "gender": "M", "salary": 50},
{"deptId": 1, "age": 50, "name": "Takuya Ueshin", "gender": "M", "salary": 100},
{"deptId": 2, "age": 60, "name": "Xinrong Meng", "gender": "F", "salary": 150},
{"deptId": 3, "age": 20, "name": "Haejoon Lee", "gender": "M", "salary": 200}
])
# Select columns
people.select("name", "age").show()
# Filter rows
people.filter(people.age > 30).show()
# Add a new column
people.withColumn("age_plus_10", people.age + 10).show()
集約とグループ化
# Group by and aggregate
people.groupBy("gender").agg({"salary": "avg", "age": "max"}).show()
# Multiple aggregations
from pyspark.sql import functions as F
people.groupBy("deptId").agg(
F.avg("salary").alias("avg_salary"),
F.max("age").alias("max_age")
).show()
結合
# Create another DataFrame
department = spark.createDataFrame([
{"id": 1, "name": "PySpark"},
{"id": 2, "name": "ML"},
{"id": 3, "name": "Spark SQL"}
])
# Join DataFrames
people.join(department, people.deptId == department.id).show()
複雑な変換
# Chained operations
result = people.filter(people.age > 30) \\
.join(department, people.deptId == department.id) \\
.groupBy(department.name, "gender") \\
.agg({"salary": "avg", "age": "max"}) \\
.sort("max(age)")
result.show()