Databricks SDK for Go

この記事では、 Databricks SDK for Go を使用して Databricks の運用を自動化し、開発を加速する方法について説明します。 この記事では、Databricks SDK for Go の README、API リファレンスおよび例を補足します。

この機能は ベータ版 であり、本番環境で使用しても問題ありません。

ベータ期間中、Databricks では、プロジェクトの go.mod ファイルなど、コードが依存する Databricks SDK for Go の特定のマイナー バージョンへの依存関係をピン留めすることをお勧めします。 依存関係の固定の詳細については、「 依存関係の管理」を参照してください。

始める前に

Databricks SDK for Go の使用を開始する前に、開発マシンに次のものが必要です。

Go 用 Databricks SDK の使用を開始する

  1. Go が既にインストールされ、既存の Go コード プロジェクトが既に 作成 され 、Databricks 認証 が構成されている開発コンピューターで、 コマンドを実行して go.mod Go コードの依存関係を追跡する ファイルを作成します。 go mod init

    go mod init sample
    
  2. go mod edit -require コマンドを実行して Databricks SDK for Go パッケージへの依存関係を取得し、 0.8.0CHANGELOG に記載されている最新バージョンの Databricks SDK for Go パッケージに置き換えます。

    go mod edit -require github.com/databricks/databricks-sdk-go@v0.8.0
    

    go.mod ファイルは次のようになります。

    module sample
    
    go 1.18
    
    require github.com/databricks/databricks-sdk-go v0.8.0
    
  3. プロジェクト内で、Databricks SDK for Go をインポートする Go コード ファイルを作成します。 次の例では、次の内容の main.go という名前のファイルで、Databricks ワークスペース内のすべてのクラスターを一覧表示します。

    package main
    
    import (
      "context"
    
      "github.com/databricks/databricks-sdk-go"
      "github.com/databricks/databricks-sdk-go/service/compute"
    )
    
    func main() {
      w := databricks.Must(databricks.NewWorkspaceClient())
      all, err := w.Clusters.ListAll(context.Background(), compute.ListClustersRequest{})
      if err != nil {
        panic(err)
      }
      for _, c := range all {
        println(c.ClusterName)
      }
    }
    
  4. 不足しているモジュールの依存関係を追加するには、 go mod tidy コマンドを実行します。

    go mod tidy
    

    エラー go: warning: "all" matched no packagesが表示された場合は、Databricks SDK for Go をインポートする Go コード ファイルを追加するのを忘れています。

  5. main モジュール内のパッケージのビルドとテストをサポートするために必要なすべてのパッケージのコピーを取得するには、 go mod vendor コマンドを実行します。

    go mod vendor
    
  6. Databricks 認証用に開発用コンピューターを設定します。

  7. go run コマンドを実行して、 main.goという名前のファイルを想定して、Go コード ファイルを実行します。

    go run main.go
    

    前の w := databricks.Must(databricks.NewWorkspaceClient())の呼び出しで引数として *databricks.Config を設定しないことで、Databricks SDK for Go は既定のプロセスを使用して Databricks 認証を実行しようとします。この既定の動作をオーバーライドするには、「 Databricks アカウントまたはワークスペースで Databricks SDK for Go を認証する」を参照してください。

Go 用に Databricks SDK を更新する

変更ログに記載されている Databricks SDK for Go パッケージの 1 つを使用するように Go プロジェクトを更新するには、次の手順を実行します。

  1. プロジェクトのルートから go get コマンドを実行し、更新を実行するための -u フラグを指定し、Databricks SDK for Go パッケージの名前とターゲット バージョン番号を指定します。 たとえば、バージョン 0.12.0に更新するには、次のコマンドを実行します。

    go get -u github.com/databricks/databricks-sdk-go@v0.12.0
    
  2. 不足しているモジュールや古いモジュールの依存関係を追加および更新するには、 go mod tidy コマンドを実行します。

    go mod tidy
    
  3. go mod vendor コマンドを実行して、 main モジュール内のパッケージのビルドとテストをサポートするために必要なすべての新規および更新されたパッケージのコピーを取得します。

    go mod vendor
    

Databricks アカウントまたはワークスペースを使用して Go 用の Databricks SDK を認証する

Databricks SDK for Go は、 Databricks クライアント統合認証 標準を実装しており、認証に対するアーキテクチャとプログラムによる統合アプローチと一貫性のあるアプローチです。 このアプローチにより、Databricks による認証の設定と自動化をより一元化し、予測可能にすることができます。 これにより、Databricks 認証を一度構成すると、認証構成をさらに変更することなく、その構成を複数の Databricks ツールや SDK で使用できます。 Go のより完全なコード例など、詳細については、「クライアント統合認証Databricks」を参照してください。

Databricks SDK for Go を使用して Databricks 認証を初期化するために使用できるコーディング パターンには、次のようなものがあります。

  • Databricks のデフォルト認証を使用するには、次のいずれかを実行します。

    • ターゲット Databricks 認証の種類に必要なフィールドを持つカスタム Databricks 構成プロファイル を作成または識別します。 次に、 DATABRICKS_CONFIG_PROFILE 環境変数をカスタム構成プロファイルの名前に設定します。

    • ターゲットの Databricks 認証の種類に必要な環境変数を設定します。

    次に、たとえば、次のように Databricks デフォルト認証を使用して WorkspaceClient オブジェクトをインスタンス化します。

    import (
      "github.com/databricks/databricks-sdk-go"
    )
    // ...
    w := databricks.Must(databricks.NewWorkspaceClient())
    
  • 必須フィールドのハードコーディングはサポートされていますが、Databricks personal アクセストークンなどの機密情報がコード内で公開されるリスクがあるため、お勧めしません。 次の例では、Databricks トークン認証のために Databricks ホストとアクセストークンの値をハードコーディングします。

    import (
      "github.com/databricks/databricks-sdk-go"
      "github.com/databricks/databricks-sdk-go/config"
    )
    // ...
    w := databricks.Must(databricks.NewWorkspaceClient(&databricks.Config{
      Host:  "https://...",
      Token: "...",
    }))
    

Databricks SDK for Go README の 「認証 」も参照してください。

次のコード例は、Databricks SDK for Go を使用して、クラスターの作成と削除、ジョブの実行、アカウント ユーザーの一覧表示を行う方法を示しています。 これらのコード例では、 Databricks SDK for Go の既定の Databricks 認証 プロセスを使用します。

その他のコード例については、GitHub の Databricks SDK for Go リポジトリにある サンプル フォルダーを参照してください。

クラスターを作成する

このコード例では、利用可能な最新の Databricks Runtime 長期サポート (LTS) バージョンと、ローカル ディスクを持つ使用可能な最小のクラスター ノード タイプを使用してクラスターを作成します。 このクラスターにはワーカーが 1 つあり、クラスターは 15 分のアイドル時間が経過すると自動的に終了します。 CreateAndWait メソッドを呼び出すと、新しいクラスターがワークスペースで実行されるまでコードが停止します。

package main

import (
  "context"
  "fmt"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/compute"
)

func main() {
  const clusterName            = "my-cluster"
  const autoTerminationMinutes = 15
  const numWorkers             = 1

  w   := databricks.Must(databricks.NewWorkspaceClient())
  ctx := context.Background()

  // Get the full list of available Spark versions to choose from.
  sparkVersions, err := w.Clusters.SparkVersions(ctx)

  if err != nil {
    panic(err)
  }

  // Choose the latest Long Term Support (LTS) version.
  latestLTS, err := sparkVersions.Select(compute.SparkVersionRequest{
    Latest:          true,
    LongTermSupport: true,
  })

  if err != nil {
    panic(err)
  }

  // Get the list of available cluster node types to choose from.
  nodeTypes, err := w.Clusters.ListNodeTypes(ctx)

  if err != nil {
    panic(err)
  }

  // Choose the smallest available cluster node type.
  smallestWithLocalDisk, err := nodeTypes.Smallest(clusters.NodeTypeRequest{
    LocalDisk: true,
  })

  if err != nil {
    panic(err)
  }

  fmt.Println("Now attempting to create the cluster, please wait...")

  runningCluster, err := w.Clusters.CreateAndWait(ctx, compute.CreateCluster{
    ClusterName:            clusterName,
    SparkVersion:           latestLTS,
    NodeTypeId:             smallestWithLocalDisk,
    AutoterminationMinutes: autoTerminationMinutes,
    NumWorkers:             numWorkers,
  })

  if err != nil {
    panic(err)
  }

  switch runningCluster.State {
  case compute.StateRunning:
    fmt.Printf("The cluster is now ready at %s#setting/clusters/%s/configuration\n",
      w.Config.Host,
      runningCluster.ClusterId,
    )
  default:
    fmt.Printf("Cluster is not running or failed to create. %s", runningCluster.StateMessage)
  }

  // Output:
  //
  // Now attempting to create the cluster, please wait...
  // The cluster is now ready at <workspace-host>#setting/clusters/<cluster-id>/configuration
}

クラスターを完全に削除する

このコード例では、指定したクラスター ID を持つクラスターをワークスペースから完全に削除します。

package main

import (
  "context"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/clusters"
)

func main() {
  // Replace with your cluster's ID.
  const clusterId = "1234-567890-ab123cd4"

  w   := databricks.Must(databricks.NewWorkspaceClient())
  ctx := context.Background()

  err := w.Clusters.PermanentDelete(ctx, compute.PermanentDeleteCluster{
    ClusterId: clusterId,
  })

  if err != nil {
    panic(err)
  }
}

ジョブの実行

このコード例では、指定したクラスターで指定したノートブックを実行する Databricks ジョブを作成します。 コードを実行すると、既存のノートブックのパス、既存のクラスター ID、および関連するジョブ設定がターミナルのユーザーから取得されます。 RunNowAndWait メソッドを呼び出すと、ワークスペースで新しいジョブの実行が完了するまでコードが停止します。

package main

import (
  "bufio"
  "context"
  "fmt"
  "os"
  "strings"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/jobs"
)

func main() {
  w   := databricks.Must(databricks.NewWorkspaceClient())
  ctx := context.Background()

  nt := jobs.NotebookTask{
    NotebookPath: askFor("Workspace path of the notebook to run:"),
  }

  jobToRun, err := w.Jobs.Create(ctx, jobs.CreateJob{
    Name: askFor("Some short name for the job:"),
    Tasks: []jobs.JobTaskSettings{
      {
        Description:       askFor("Some short description for the job:"),
        TaskKey:           askFor("Some key to apply to the job's tasks:"),
        ExistingClusterId: askFor("ID of the existing cluster in the workspace to run the job on:"),
        NotebookTask:      &nt,
      },
    },
  })

  if err != nil {
    panic(err)
  }

  fmt.Printf("Now attempting to run the job at %s/#job/%d, please wait...\n",
    w.Config.Host,
    jobToRun.JobId,
  )

  runningJob, err := w.Jobs.RunNow(ctx, jobs.RunNow{
    JobId: jobToRun.JobId,
  })

  if err != nil {
    panic(err)
  }

  jobRun, err := runningJob.Get()

  if err != nil {
    panic(err)
  }

  fmt.Printf("View the job run results at %s/#job/%d/run/%d\n",
    w.Config.Host,
    jobRun.JobId,
    jobRun.RunId,
  )

  // Output:
  //
  // Now attempting to run the job at <workspace-host>/#job/<job-id>, please wait...
  // View the job run results at <workspace-host>/#job/<job-id>/run/<run-id>
}

// Get job settings from the user.
func askFor(prompt string) string {
  var s string
  r := bufio.NewReader(os.Stdin)
  for {
    fmt.Fprint(os.Stdout, prompt+" ")
    s, _ = r.ReadString('\n')
    if s != "" {
      break
    }
  }
  return strings.TrimSpace(s)
}

Unity Catalogボリューム内のファイルを管理する

このコード例は、Unity Catalogボリュームにアクセスするために、 WorkspaceClient内のfiles機能へのさまざまな呼び出しを示しています。

package main

import (
  "context"
  "io"
  "os"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/files"
)

func main() {
  w := databricks.Must(databricks.NewWorkspaceClient())

  catalog          := "main"
  schema           := "default"
  volume           := "my-volume"
  volumePath       := "/Volumes/" + catalog + "/" + schema + "/" + volume // /Volumes/main/default/my-volume
  volumeFolder     := "my-folder"
  volumeFolderPath := volumePath + "/" + volumeFolder // /Volumes/main/default/my-volume/my-folder
  volumeFile       := "data.csv"
  volumeFilePath   := volumeFolderPath + "/" + volumeFile // /Volumes/main/default/my-volume/my-folder/data.csv
  uploadFilePath   := "./data.csv"

  // Create an empty folder in a volume.
  err := w.Files.CreateDirectory(
    context.Background(),
    files.CreateDirectoryRequest{DirectoryPath: volumeFolderPath},
  )
  if err != nil {
    panic(err)
  }

  // Upload a file to a volume.
  fileUpload, err := os.Open(uploadFilePath)
  if err != nil {
    panic(err)
  }
  defer fileUpload.Close()

  w.Files.Upload(
    context.Background(),
    files.UploadRequest{
      Contents:  fileUpload,
      FilePath:  volumeFilePath,
      Overwrite: true,
    },
  )

  // List the contents of a volume.
  items := w.Files.ListDirectoryContents(
    context.Background(),
    files.ListDirectoryContentsRequest{DirectoryPath: volumePath},
  )

  for {
    if items.HasNext(context.Background()) {
      item, err := items.Next(context.Background())
      if err != nil {
        break
      }
      println(item.Path)

    } else {
      break
    }
  }

  // List the contents of a folder in a volume.
  itemsFolder := w.Files.ListDirectoryContents(
    context.Background(),
    files.ListDirectoryContentsRequest{DirectoryPath: volumeFolderPath},
  )

  for {
    if itemsFolder.HasNext(context.Background()) {
      item, err := itemsFolder.Next(context.Background())
      if err != nil {
        break
      }
      println(item.Path)
    } else {
      break
    }
  }

  // Print the contents of a file in a volume.
  file, err := w.Files.DownloadByFilePath(
    context.Background(),
    volumeFilePath,
  )
  if err != nil {
    panic(err)
  }

  bufDownload := make([]byte, file.ContentLength)

  for {
    file, err := file.Contents.Read(bufDownload)
    if err != nil && err != io.EOF {
      panic(err)
    }
    if file == 0 {
      break
    }

    println(string(bufDownload[:file]))
  }

  // Delete a file from a volume.
  w.Files.DeleteByFilePath(
    context.Background(),
    volumeFilePath,
  )

  // Delete a folder from a volume.
  w.Files.DeleteDirectory(
    context.Background(),
    files.DeleteDirectoryRequest{
      DirectoryPath: volumeFolderPath,
    },
  )
}

アカウントユーザーを一覧表示する

このコード例では、Databricks アカウント内で使用可能なユーザーを一覧表示します。

package main

import (
  "context"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/iam"
)

func main() {
  a := databricks.Must(databricks.NewAccountClient())
  all, err := a.Users.ListAll(context.Background(), iam.ListAccountUsersRequest{})
  if err != nil {
    panic(err)
  }
  for _, u := range all {
    println(u.UserName)
  }
}

関連リソース

詳細については、以下を参照してください。