メインコンテンツまでスキップ

チュートリアル: Streamlit を使用して Databricks アプリを開発する

このチュートリアルでは、PythonのDatabricks SQL コネクタStreamlit を使用して Databricks アプリを構築する方法を示します。次のことを行うアプリを開発する方法を学習します。

  • Unity Catalogテーブルを読み取り、Streamlitインターフェースに表示します。
  • データを編集して、テーブルに書き戻します。

手順 :

ステップ 1: 特権を構成する

これらの例では、アプリで アプリの承認を使用することを前提としています。アプリのサービスプリンシパルには、次のものが必要です。

  • SELECT Unity Catalog テーブルに対する権限
  • MODIFY Unity Catalog テーブルに対する権限
  • CAN USE SQLウェアハウスの特典

詳細については、Unity Catalog特権とセキュリティ保護可能なオブジェクトおよびSQLウェアハウス ACLを参照してください。

ステップ 2: 依存関係をインストールする

次のパッケージを requirements.txt ファイルに含めます。

databricks-sdk
databricks-sql-connector
streamlit
pandas
注記

pandas は、テーブル データを編集する場合にのみ必要です。

手順 3: Unity Catalog テーブルを読み取る

この例では、Unity Catalog テーブルからデータを読み取り、Streamlit を使用してデータを表示する方法を示します。アプリは次の手順を実行します。

  1. ユーザーに SQLウェアハウス HTTP パスと Unity Catalog テーブル名の入力を求めます。
  2. Databricks SQL Connector for Python を使用して、キャッシュされた SQL 接続を確立します。
  3. 指定したテーブルに対して SELECT * クエリを実行します。
  4. 結果を Streamlit st.dataframeで表示します。

app.py

Python
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config


cfg = Config() # Set the DATABRICKS_HOST environment variable when running locally


@st.cache_resource # connection is cached
def get_connection(http_path):
return sql.connect(
server_hostname=cfg.host,
http_path=http_path,
credentials_provider=lambda: cfg.authenticate,
)

def read_table(table_name, conn):
with conn.cursor() as cursor:
query = f"SELECT * FROM {table_name}"
cursor.execute(query)
return cursor.fetchall_arrow().to_pandas()

http_path_input = st.text_input(
"Enter your Databricks HTTP Path:", placeholder="/sql/1.0/warehouses/xxxxxx"
)

table_name = st.text_input(
"Specify a :re[UC] table name:", placeholder="catalog.schema.table"
)

if http_path_input and table_name:
conn = get_connection(http_path_input)
df = read_table(table_name, conn)
st.dataframe(df)
注記
  • この例では、 st.cache_resource を使用して、セッション間および再実行間でデータベース接続をキャッシュします。
  • Streamlit 入力フィールド (st.text_input) を使用して、ユーザー入力を受け入れます。

手順 4: Unity Catalog テーブルを編集する

この例では、ユーザーは Streamlit のデータ編集機能を使用して、Unity Catalog テーブルに対する変更の読み取り、編集、および書き戻しを行うことができます。アプリは次の手順を実行します。

  1. 元のテーブルを Pandas DataFrame に読み込みます。
  2. Streamlit エディター (st.data_editor) でテーブルを表示します。
  3. 元の DataFramesと編集されたとの間の変更を検出します。
  4. INSERT OVERWRITE を使用して、更新されたデータをテーブルに書き戻します。

app.py

Python
import pandas as pd
import streamlit as st
from databricks import sql
from databricks.sdk.core import Config


cfg = Config() # Set the DATABRICKS_HOST environment variable when running locally


@st.cache_resource
def get_connection(http_path):
return sql.connect(
server_hostname=cfg.host,
http_path=http_path,
credentials_provider=lambda: cfg.authenticate,
)


def read_table(table_name: str, conn) -> pd.DataFrame:
with conn.cursor() as cursor:
cursor.execute(f"SELECT * FROM {table_name}")
return cursor.fetchall_arrow().to_pandas()


def insert_overwrite_table(table_name: str, df: pd.DataFrame, conn):
progress = st.empty()
with conn.cursor() as cursor:
rows = list(df.itertuples(index=False))
values = ",".join([f"({','.join(map(repr, row))})" for row in rows])
with progress:
st.info("Calling Databricks SQL...")
cursor.execute(f"INSERT OVERWRITE {table_name} VALUES {values}")
progress.empty()
st.success("Changes saved")


http_path_input = st.text_input(
"Specify the HTTP Path to your Databricks SQL Warehouse:",
placeholder="/sql/1.0/warehouses/xxxxxx",
)

table_name = st.text_input(
"Specify a Catalog table name:", placeholder="catalog.schema.table"
)

if http_path_input and table_name:
conn = get_connection(http_path_input)
original_df = read_table(table_name, conn)
edited_df = st.data_editor(original_df, num_rows="dynamic", hide_index=True)

df_diff = pd.concat([original_df, edited_df]).drop_duplicates(keep=False)
if not df_diff.empty:
if st.button("Save changes"):
insert_overwrite_table(table_name, edited_df, conn)
else:
st.warning("Provide both the warehouse path and a table name to load data.")
注記
  • アプリは、元のテーブルと編集されたテーブルの違いを計算して、更新が必要かどうかを判断します。
  • プログレスバーは、 st.infost.successを使用して書き込み操作中にフィードバックを提供します。
  • この方法では、テーブル内のすべての行が置き換えられます。部分的な更新の場合は、別の書き込み戦略を使用します。

次のステップ