メインコンテンツまでスキップ

チュートリアル: Streamlit を使用して Databricks アプリを開発する

このチュートリアルでは、PythonのDatabricks SQL コネクタStreamlit を使用して Databricks アプリを構築する方法を示します。次のことを行うアプリを開発する方法を学習します。

  • Unity Catalogテーブルを読み取り、Streamlit インターフェースに表示します。
  • データを編集して、テーブルに書き戻します。

ステップ 1: 権限を構成する

これらの例では、アプリで アプリの承認を使用することを前提としています。アプリのサービスプリンシパルには、次のものが必要です。

  • SELECT Unity Catalog テーブルに対する権限
  • MODIFY Unity Catalog テーブルに対する権限
  • CAN USE SQLウェアハウスに対する権限

詳細については、Unity Catalog特権とセキュリティ保護可能なオブジェクトおよびSQLウェアハウス ACLを参照してください。

ステップ 2: 依存関係をインストールする

requirements.txtファイルを作成し、以下のパッケージを含めてください。

Text
databricks-sdk
databricks-sql-connector
streamlit
pandas

ステップ 3: アプリの実行を構成する

Databricks Appsでアプリがどのように起動するかを定義するapp.yamlファイルを作成します。

YAML
command: ['streamlit', 'run', 'app.py']

ステップ 4: Unity Catalogテーブルを読む

このサンプルコードはUnity Catalogテーブルからデータを読み込み、Streamlitを使用して表示する方法を示しています。 以下の目的を満たすapp.pyファイルを作成してください。

  • アプリサービスプリンシパル認証を使用します。
  • ユーザーに SQLウェアハウス HTTP パスと Unity Catalog テーブル名の入力を求めます。
  • 指定したテーブルに対して SELECT * クエリを実行します。
  • 結果を Streamlit st.dataframeで表示します。

アプリ.py

Python
import pandas as pd
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config
import os

cfg = Config()

# Use app service principal authentication
def get_connection(http_path):
server_hostname = cfg.host
if server_hostname.startswith('https://'):
server_hostname = server_hostname.replace('https://', '')
elif server_hostname.startswith('http://'):
server_hostname = server_hostname.replace('http://', '')
return sql.connect(
server_hostname=server_hostname,
http_path=http_path,
credentials_provider=lambda: cfg.authenticate,
_use_arrow_native_complex_types=False,
)

# Read data from a Unity Catalog table and return it as a pandas DataFrame
def read_table(table_name: str, conn) -> pd.DataFrame:
with conn.cursor() as cursor:
cursor.execute(f"SELECT * FROM {table_name}")
return cursor.fetchall_arrow().to_pandas()

# Use Streamlit input fields to accept user input
http_path_input = st.text_input(
"Enter your Databricks HTTP Path:", placeholder="/sql/1.0/warehouses/xxxxxx"
)
table_name = st.text_input(
"Specify a Unity Catalog table name:", placeholder="catalog.schema.table"
)

# Display the result in a Streamlit DataFrame
if http_path_input and table_name:
conn = get_connection(http_path_input)
df = read_table(table_name, conn)
st.dataframe(df)
else:
st.warning("Provide both the warehouse path and a table name to load data.")

ステップ 5: Unity Catalogテーブルを編集する

このサンプルコードを使用すると、Streamlitのデータ編集機能を使って、 Unity Catalogテーブルの読み取り、編集、および変更の書き込みを行うことができます。 app.pyファイルに以下の機能を追加してください。

  • 更新されたデータをテーブルに書き戻すには、 INSERT OVERWRITEを使用してください。

アプリ.py

Python
import pandas as pd
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config
import math

cfg = Config()

# Use app service principal authentication
def get_connection(http_path):
server_hostname = cfg.host
if server_hostname.startswith('https://'):
server_hostname = server_hostname.replace('https://', '')
elif server_hostname.startswith('http://'):
server_hostname = server_hostname.replace('http://', '')
return sql.connect(
server_hostname=server_hostname,
http_path=http_path,
credentials_provider=lambda: cfg.authenticate,
_use_arrow_native_complex_types=False,
)

# Read data from a Unity Catalog table and return it as a pandas DataFrame
def read_table(table_name: str, conn) -> pd.DataFrame:
with conn.cursor() as cursor:
cursor.execute(f"SELECT * FROM {table_name}")
return cursor.fetchall_arrow().to_pandas()

# Format values for SQL, handling NaN/None as NULL
def format_value(val):
if val is None or (isinstance(val, float) and math.isnan(val)):
return 'NULL'
else:
return repr(val)

# Use `INSERT OVERWRITE` to update existing rows and insert new ones
def insert_overwrite_table(table_name: str, df: pd.DataFrame, conn):
progress = st.empty()
with conn.cursor() as cursor:
rows = list(df.itertuples(index=False))
values = ",".join([f"({','.join(map(format_value, row))})" for row in rows])
with progress:
st.info("Calling Databricks SQL...")
cursor.execute(f"INSERT OVERWRITE {table_name} VALUES {values}")
progress.empty()
st.success("Changes saved")

# Use Streamlit input fields to accept user input
http_path_input = st.text_input(
"Enter your Databricks HTTP Path:", placeholder="/sql/1.0/warehouses/xxxxxx"
)
table_name = st.text_input(
"Specify a Unity Catalog table name:", placeholder="catalog.schema.table"
)

# Display the result in a Streamlit DataFrame
if http_path_input and table_name:
conn = get_connection(http_path_input)
if conn:
st.success("✅ Connected successfully!")
original_df = read_table(table_name, conn)
edited_df = st.data_editor(original_df, num_rows="dynamic", hide_index=True)
df_diff = pd.concat([original_df, edited_df]).drop_duplicates(keep=False)
if not df_diff.empty:
st.warning(f"⚠️ You have {len(df_diff) // 2} unsaved changes")
if st.button("Save changes"):
insert_overwrite_table(table_name, edited_df, conn)
st.rerun()
else:
st.warning("Provide both the warehouse path and a table name to load data.")

次のステップ