共用方式為


Databricks SDK for Go

在本文中,您將瞭解如何使用 Databricks SDK for Go來自動化 Azure Databricks 作業,並加速開發。 本文補充適用於 Go 自述檔API 參考範例的 Databricks SDK。

注意

這項功能提供搶鮮版 (Beta),而且可在生產環境中使用。

在 Beta 期間,Databricks 建議您將相依性釘選到程式代碼相依於程式代碼所相依之 Databricks SDK for Go 的特定次要版本,例如,在專案的 go.mod 檔案中。 如需釘選相依性的詳細資訊,請參閱 管理相依性

開始之前

開始使用 Databricks SDK for Go 之前,您的開發電腦必須具有:

Get 開始使用 Databricks SDK for Go

  1. 在已安裝 Go go mod init例如:

    go mod init sample
    
  2. 執行 go mod edit -require 命令以相依於 Databricks SDK for Go 套件,並將 取代0.8.0為最新版的 Databricks SDK for Go 套件,如 CHANGELOG所列:

    go mod edit -require github.com/databricks/databricks-sdk-go@v0.8.0
    

    您的 go.mod 檔案看起來應該如下所示:

    module sample
    
    go 1.18
    
    require github.com/databricks/databricks-sdk-go v0.8.0
    
  3. 在您的專案中,建立 Go 程式代碼檔案,以匯入 Databricks SDK for Go。 下列範例在名為 main.go 且具有下列內容的檔案中,列出 Azure Databricks 工作區中的所有叢集:

    package main
    
    import (
      "context"
    
      "github.com/databricks/databricks-sdk-go"
      "github.com/databricks/databricks-sdk-go/service/compute"
    )
    
    func main() {
      w := databricks.Must(databricks.NewWorkspaceClient())
      all, err := w.Clusters.ListAll(context.Background(), compute.ListClustersRequest{})
      if err != nil {
        panic(err)
      }
      for _, c := range all {
        println(c.ClusterName)
      }
    }
    
  4. 執行 go mod tidy 命令以新增任何遺漏的模組相依性:

    go mod tidy
    

    注意

    如果您 get 錯誤 go: warning: "all" matched no packages,那就是您忘記新增一個匯入 Databricks SDK for Go 的 Go 程式碼腳本文件。

  5. 執行 main 命令,以擷取支援模組中go mod vendor封裝組建和測試所需的所有套件複本:

    go mod vendor
    
  6. Set 為 Azure Databricks 驗證配置開發機器

  7. 透過執行 main.go 命令,來執行 Go 程式碼檔案,假定檔案名為 go run

    go run main.go
    

    注意

    若未在上述呼叫*databricks.Config中將 w := databricks.Must(databricks.NewWorkspaceClient()) 設定為自變數,Databricks SDK for Go 會使用其預設程式嘗試執行 Azure Databricks 驗證。 若要覆寫此預設行為,請參閱 使用 Azure Databricks 帳戶或工作區驗證 Databricks SDK for Go。

Update Databricks 開發套件(SDK)適用於 Go 編程語言

若要 update Go 專案使用其中一個 Databricks SDK for Go 套件,如 CHANGELOG所列,請執行下列動作:

  1. 從專案的根目錄執行 go get 命令,指定 -u 旗標來執行 update,並提供 Databricks SDK for Go 套件的名稱和目標版本號碼。 例如,若要將 update 升級至版本 0.12.0,請執行下列命令:

    go get -u github.com/databricks/databricks-sdk-go@v0.12.0
    
  2. 執行 go mod tidy 命令,以新增和 update 任何遺失和過時的模組相依性:

    go mod tidy
    
  3. 執行 main 命令,以擷取支援模組中go mod vendor套件組建和測試所需的所有新和更新套件複本:

    go mod vendor
    

使用 Azure Databricks 帳戶或工作區驗證 Databricks SDK for Go

Databricks SDK for Go 會 實作 Databricks 用戶端統一驗證 標準,這是一種整合且一致的架構和驗證程序設計方法。 這種方法有助於使 Azure Databricks 的設定和自動化驗證更加集中且可預測。 可讓您只需設定 Databricks 驗證一次,然後即可在多個 Databricks 工具和 SDK 中使用該組態,而無需進一步變更驗證組態。 如需詳細資訊,包括 Go 中更完整的程式代碼範例,請參閱 Databricks 用戶端整合驗證

使用 Databricks SDK for Go 初始化 Databricks 驗證的一些可用編碼模式包括:

  • 透過執行下列其中一項動作,使用 Databricks 預設驗證:

    • 建立或識別具有目標 Databricks 驗證類型所需欄位的自訂 Databricks 組態設定檔。 然後將 DATABRICKS_CONFIG_PROFILE 環境變數 set 自定義組態配置檔的名稱。
    • Set 針對目標的 Databricks 驗證類型所需的環境變數。

    然後使用 Databricks 預設驗證具現化一個 WorkspaceClient 物件,如下所示:

    import (
      "github.com/databricks/databricks-sdk-go"
    )
    // ...
    w := databricks.Must(databricks.NewWorkspaceClient())
    
  • 支援硬式編碼必要欄位,但不建議這麼做,因為它可能會暴露程式碼中的敏感性資訊,例如 Azure Databricks 個人存取權杖。 下列範例會硬編碼 Azure Databricks 主機和存取令牌 values 用於 Databricks 令牌驗證:

    import (
      "github.com/databricks/databricks-sdk-go"
      "github.com/databricks/databricks-sdk-go/config"
    )
    // ...
    w := databricks.Must(databricks.NewWorkspaceClient(&databricks.Config{
      Host:  "https://...",
      Token: "...",
    }))
    

請參閱 Databricks SDK for Go README 中的驗證

範例

下列程式代碼範例示範如何使用 Databricks SDK for Go 來建立和刪除叢集、執行作業,以及 list 帳戶使用者。 這些程式代碼範例會使用 Databricks SDK for Go 的預設 Azure Databricks 驗證 程式。

如需其他程式代碼範例,請參閱 GitHub 中 Databricks SDK for Go 存放庫中的 examples 資料夾。

建立叢集

此程式代碼範例會建立具有最新可用 Databricks Runtime 長期支援 (LTS) 版本的叢集,以及具有本機磁碟的最小可用叢集節點類型。 此叢集有一個背景工作角色,且叢集會在閒置時間 15 分鐘後將自動終止。 方法 CreateAndWait 呼叫會導致程式代碼暫停,直到新叢集在工作區中執行為止。

package main

import (
  "context"
  "fmt"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/compute"
)

func main() {
  const clusterName            = "my-cluster"
  const autoTerminationMinutes = 15
  const numWorkers             = 1

  w   := databricks.Must(databricks.NewWorkspaceClient())
  ctx := context.Background()

  // Get the full list of available Spark versions to choose from.
  sparkVersions, err := w.Clusters.SparkVersions(ctx)

  if err != nil {
    panic(err)
  }

  // Choose the latest Long Term Support (LTS) version.
  latestLTS, err := sparkVersions.Select(compute.SparkVersionRequest{
    Latest:          true,
    LongTermSupport: true,
  })

  if err != nil {
    panic(err)
  }

  // Get the list of available cluster node types to choose from.
  nodeTypes, err := w.Clusters.ListNodeTypes(ctx)

  if err != nil {
    panic(err)
  }

  // Choose the smallest available cluster node type.
  smallestWithLocalDisk, err := nodeTypes.Smallest(clusters.NodeTypeRequest{
    LocalDisk: true,
  })

  if err != nil {
    panic(err)
  }

  fmt.Println("Now attempting to create the cluster, please wait...")

  runningCluster, err := w.Clusters.CreateAndWait(ctx, compute.CreateCluster{
    ClusterName:            clusterName,
    SparkVersion:           latestLTS,
    NodeTypeId:             smallestWithLocalDisk,
    AutoterminationMinutes: autoTerminationMinutes,
    NumWorkers:             numWorkers,
  })

  if err != nil {
    panic(err)
  }

  switch runningCluster.State {
  case compute.StateRunning:
    fmt.Printf("The cluster is now ready at %s#setting/clusters/%s/configuration\n",
      w.Config.Host,
      runningCluster.ClusterId,
    )
  default:
    fmt.Printf("Cluster is not running or failed to create. %s", runningCluster.StateMessage)
  }

  // Output:
  //
  // Now attempting to create the cluster, please wait...
  // The cluster is now ready at <workspace-host>#setting/clusters/<cluster-id>/configuration
}

永久刪除叢集

此程式碼範例會從工作區中永久刪除具有指定叢集 ID 的叢集。

package main

import (
  "context"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/clusters"
)

func main() {
  // Replace with your cluster's ID.
  const clusterId = "1234-567890-ab123cd4"

  w   := databricks.Must(databricks.NewWorkspaceClient())
  ctx := context.Background()

  err := w.Clusters.PermanentDelete(ctx, compute.PermanentDeleteCluster{
    ClusterId: clusterId,
  })

  if err != nil {
    panic(err)
  }
}

執行作業

此程式代碼範例會建立 Azure Databricks 作業,以在指定的叢集上執行指定的筆記本。 當此程式碼執行時,它會從終端機的使用者取得現有的筆記本路徑、現有的叢集標識碼和相關作業設定。 方法 RunNowAndWait 呼叫會導致程式代碼暫停,直到新作業在工作區中完成執行為止。

package main

import (
  "bufio"
  "context"
  "fmt"
  "os"
  "strings"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/jobs"
)

func main() {
  w   := databricks.Must(databricks.NewWorkspaceClient())
  ctx := context.Background()

  nt := jobs.NotebookTask{
    NotebookPath: askFor("Workspace path of the notebook to run:"),
  }

  jobToRun, err := w.Jobs.Create(ctx, jobs.CreateJob{
    Name: askFor("Some short name for the job:"),
    Tasks: []jobs.JobTaskSettings{
      {
        Description:       askFor("Some short description for the job:"),
        TaskKey:           askFor("Some key to apply to the job's tasks:"),
        ExistingClusterId: askFor("ID of the existing cluster in the workspace to run the job on:"),
        NotebookTask:      &nt,
      },
    },
  })

  if err != nil {
    panic(err)
  }

  fmt.Printf("Now attempting to run the job at %s/#job/%d, please wait...\n",
    w.Config.Host,
    jobToRun.JobId,
  )

  runningJob, err := w.Jobs.RunNow(ctx, jobs.RunNow{
    JobId: jobToRun.JobId,
  })

  if err != nil {
    panic(err)
  }

  jobRun, err := runningJob.Get()

  if err != nil {
    panic(err)
  }

  fmt.Printf("View the job run results at %s/#job/%d/run/%d\n",
    w.Config.Host,
    jobRun.JobId,
    jobRun.RunId,
  )

  // Output:
  //
  // Now attempting to run the job at <workspace-host>/#job/<job-id>, please wait...
  // View the job run results at <workspace-host>/#job/<job-id>/run/<run-id>
}

// Get job settings from the user.
func askFor(prompt string) string {
  var s string
  r := bufio.NewReader(os.Stdin)
  for {
    fmt.Fprint(os.Stdout, prompt+" ")
    s, _ = r.ReadString('\n')
    if s != "" {
      break
    }
  }
  return strings.TrimSpace(s)
}

管理 Unity Catalogvolumes 中的檔案

此程式代碼範例示範 WorkspaceClient 內各種 files 功能的呼叫,以存取 Unity Catalog磁碟區

package main

import (
  "context"
  "io"
  "os"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/files"
)

func main() {
  w := databricks.Must(databricks.NewWorkspaceClient())

  catalog          := "main"
  schema           := "default"
  volume           := "my-volume"
  volumePath       := "/Volumes/" + catalog + "/" + schema + "/" + volume // /Volumes/main/default/my-volume
  volumeFolder     := "my-folder"
  volumeFolderPath := volumePath + "/" + volumeFolder // /Volumes/main/default/my-volume/my-folder
  volumeFile       := "data.csv"
  volumeFilePath   := volumeFolderPath + "/" + volumeFile // /Volumes/main/default/my-volume/my-folder/data.csv
  uploadFilePath   := "./data.csv"

  // Create an empty folder in a volume.
  err := w.Files.CreateDirectory(
    context.Background(),
    files.CreateDirectoryRequest{DirectoryPath: volumeFolderPath},
  )
  if err != nil {
    panic(err)
  }

  // Upload a file to a volume.
  fileUpload, err := os.Open(uploadFilePath)
  if err != nil {
    panic(err)
  }
  defer fileUpload.Close()

  w.Files.Upload(
    context.Background(),
    files.UploadRequest{
      Contents:  fileUpload,
      FilePath:  volumeFilePath,
      Overwrite: true,
    },
  )

  // List the contents of a volume.
  items := w.Files.ListDirectoryContents(
    context.Background(),
    files.ListDirectoryContentsRequest{DirectoryPath: volumePath},
  )

  for {
    if items.HasNext(context.Background()) {
      item, err := items.Next(context.Background())
      if err != nil {
        break
      }
      println(item.Path)

    } else {
      break
    }
  }

  // List the contents of a folder in a volume.
  itemsFolder := w.Files.ListDirectoryContents(
    context.Background(),
    files.ListDirectoryContentsRequest{DirectoryPath: volumeFolderPath},
  )

  for {
    if itemsFolder.HasNext(context.Background()) {
      item, err := itemsFolder.Next(context.Background())
      if err != nil {
        break
      }
      println(item.Path)
    } else {
      break
    }
  }

  // Print the contents of a file in a volume.
  file, err := w.Files.DownloadByFilePath(
    context.Background(),
    volumeFilePath,
  )
  if err != nil {
    panic(err)
  }

  bufDownload := make([]byte, file.ContentLength)

  for {
    file, err := file.Contents.Read(bufDownload)
    if err != nil && err != io.EOF {
      panic(err)
    }
    if file == 0 {
      break
    }

    println(string(bufDownload[:file]))
  }

  // Delete a file from a volume.
  w.Files.DeleteByFilePath(
    context.Background(),
    volumeFilePath,
  )

  // Delete a folder from a volume.
  w.Files.DeleteDirectory(
    context.Background(),
    files.DeleteDirectoryRequest{
      DirectoryPath: volumeFolderPath,
    },
  )
}

List 帳戶使用者

此程式代碼範例會列出 Azure Databricks 帳戶內可用的使用者。

package main

import (
  "context"

  "github.com/databricks/databricks-sdk-go"
  "github.com/databricks/databricks-sdk-go/service/iam"
)

func main() {
  a := databricks.Must(databricks.NewAccountClient())
  all, err := a.Users.ListAll(context.Background(), iam.ListAccountUsersRequest{})
  if err != nil {
    panic(err)
  }
  for _, u := range all {
    println(u.UserName)
  }
}

其他資源

如需詳細資訊,請參閱