共用方式為


適用於巨量資料的 Azure OpenAI

Azure OpenAI 服務可用於透過提示完成 API,來處理大量的自然語言工作。 為了方便您將提示工作流程從數個範例調整為大型範例資料集,我們整合了 Azure OpenAI 服務與分散式機器學習程式庫 SynapseML。 透過這項整合,您將可輕鬆使用 Apache Spark 分散式運算架構來處理 OpenAI 服務的數百萬個提示。 本教學課程說明如何使用 Azure OpenAI 和 Azure Synapse Analytics,以分散式規模套用大型語言模型。

必要條件

本快速入門的主要必要條件包括運作中的 Azure OpenAI 資源,以及已安裝 SynapseML 的 Apache Spark 叢集。

匯入此指南作為筆記本

下一個步驟是將此程式碼新增至您的 Spark 叢集。 您可以在 Spark 平台中建立筆記本,然後將程式碼複製到此筆記本中,以執行示範。 或下載筆記本,並將其匯入 Synapse Analytics 中

  1. 下載此示範作為筆記本 (選取 [原始],然後儲存檔案)
  2. 將筆記本匯入 Synapse 工作區中,或者,如果使用 Fabric,請匯入 Fabric 工作區中
  3. 在您的叢集上安裝 SynapseML。 請參閱 SynapseML 網站底部的 Synapse 安裝指示。 如果您使用 Fabric,請參閱安裝指南。 這就需要在您匯入的筆記本頂端貼上額外的資料格。
  4. 將您的筆記本連線至叢集,然後繼續編輯和執行資料格。

填入服務資訊

接著,編輯筆記本中的儲存格以指向您的服務。 特別是設定 service_namedeployment_namelocationkey 變數,使其符合您的 OpenAI 服務:

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

if running_on_synapse():
    from notebookutils.visualization import display

# Fill in the following lines with your service information
# Learn more about selecting which embedding model to choose: https://openai.com/blog/new-and-improved-embedding-model
service_name = "synapseml-openai"
deployment_name = "gpt-35-turbo"
deployment_name_embeddings = "text-embedding-ada-002"

key = find_secret(
    "openai-api-key"
)  # please replace this line with your key as a string

assert key is not None and service_name is not None

建立提示的資料集

接下來,建立一個資料框架,內含一系列的資料列,每個資料列各有一個提示。

您也可以直接從 ADLS 或其他資料庫載入資料。 如需有關載入和準備 Spark DataFrame 的詳細資訊,請參閱 Apache Spark 資料載入指南

df = spark.createDataFrame(
    [
        ("Hello my name is",),
        ("The best code is code thats",),
        ("SynapseML is ",),
    ]
).toDF("prompt")

建立 OpenAICompletion Apache Spark 用戶端

若要將 OpenAI Completion 服務套用至您剛建立的 DataFrame,請建立作為分散式用戶端的 OpenAICompletion 物件。 服務的參數可用單一值來設定,或透過在 OpenAICompletion 物件上具有適當 setter 的資料框架資料行來設定。 在此,我們將 maxTokens 設定為 200。 權杖大約是四個字元,而此限制適用於提示和結果的總和。 我們也會使用資料框架中的提示資料行名稱來設定 promptCol 參數。

from synapse.ml.cognitive import OpenAICompletion

completion = (
    OpenAICompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setPromptCol("prompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

使用 OpenAICompletion 用戶端轉換 DataFrame

完成 DataFrame 和完成用戶端後,您可以轉換輸入資料集,並新增名為 completions 的資料行 (內含服務新增的所有資訊)。 為求簡單,僅選取文字。

from pyspark.sql.functions import col

completed_df = completion.transform(df).cache()
display(
    completed_df.select(
        col("prompt"),
        col("error"),
        col("completions.choices.text").getItem(0).alias("text"),
    )
)

您的輸出看起來應該像這樣。 完成文字與範例不同。

prompt error text
您好,我的名字是 null Makaveli。我十八歲了,長大後想成為饒舌歌手。我喜歡寫作和製作音樂。我來自加州洛杉磯
最理想的程式碼是 null 可理解的程式碼。這是一個主觀陳述,沒有明確的答案。
SynapseML 是 null 一種機器學習演算法,能夠學習如何預測事件未來的結果。

更多使用方式範例

產生文字內嵌

除了完成文字之外,我們也可以內嵌文字以用於下游演算法或向量擷取結構。 建立內嵌可讓您從大型集合搜尋並擷取文件,而且可以在提示工程不足以處理工作時使用。 如需有關使用 OpenAIEmbedding 的詳細資訊,請參閱我們的內嵌指南

from synapse.ml.cognitive import OpenAIEmbedding

embedding = (
    OpenAIEmbedding()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name_embeddings)
    .setCustomServiceName(service_name)
    .setTextCol("prompt")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

display(embedding.transform(df))

聊天完成

ChatGPT 和 GPT-4 等模型能夠了解聊天,而不是單一提示。 OpenAIChatCompletion 轉換器會大規模公開此功能。

from synapse.ml.cognitive import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *


def make_message(role, content):
    return Row(role=role, content=content, name=role)


chat_df = spark.createDataFrame(
    [
        (
            [
                make_message(
                    "system", "You are an AI chatbot with red as your favorite color"
                ),
                make_message("user", "Whats your favorite color"),
            ],
        ),
        (
            [
                make_message("system", "You are very excited"),
                make_message("user", "How are you today"),
            ],
        ),
    ]
).toDF("messages")


chat_completion = (
    OpenAIChatCompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMessagesCol("messages")
    .setErrorCol("error")
    .setOutputCol("chat_completions")
)

display(
    chat_completion.transform(chat_df).select(
        "messages", "chat_completions.choices.message.content"
    )
)

透過要求批次來改善輸送量

該範例會對服務提出數個要求,每個提示各一個。 若要在單一要求中完成多個提示,請使用批次模式。 首先,在 OpenAICompletion 物件中,不要將 [提示] 資料行設定為 "Prompt",而應為 BatchPrompt 資料行指定 "batchPrompt"。 為此,請建立一個資料框架,內含每個資料列的提示清單。

截至目前為止,單一要求有 20 個提示的限制,且限制 2048 個「權杖」(或約 1500 個字)。

batch_df = spark.createDataFrame(
    [
        (["The time has come", "Pleased to", "Today stocks", "Here's to"],),
        (["The only thing", "Ask not what", "Every litter", "I am"],),
    ]
).toDF("batchPrompt")

接著,我們會建立 OpenAICompletion 物件。 如果您的資料行屬於 Array[String] 類型,請不要設定提示資料行,而應設定 batchPrompt 資料行。

batch_completion = (
    OpenAICompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setBatchPromptCol("batchPrompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

在呼叫轉換時,會對每個資料列提出要求。 由於單一資料列中有多個提示,因此每個要求在傳送時都會包含該資料列中的所有提示。 在結果中,要求中的每個資料列都有一個資料列。

completed_batch_df = batch_completion.transform(batch_df).cache()
display(completed_batch_df)

使用自動的小型批次處理器

如果您的資料採用資料行格式,您可以使用 SynapseML 的 FixedMiniBatcherTransformer 將其轉置為資料列格式。

from pyspark.sql.types import StringType
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI

completed_autobatch_df = (
    df.coalesce(
        1
    )  # Force a single partition so that our little 4-row dataframe makes a batch of size 4, you can remove this step for large datasets
    .mlTransform(FixedMiniBatchTransformer(batchSize=4))
    .withColumnRenamed("prompt", "batchPrompt")
    .mlTransform(batch_completion)
)

display(completed_autobatch_df)

翻譯的提示工程

Azure OpenAI 服務 可透過提示工程來處理許多不同的自然語言工作。 以下顯示語言翻譯提示的範例:

translate_df = spark.createDataFrame(
    [
        ("Japanese: Ookina hako \nEnglish: Big box \nJapanese: Midori tako\nEnglish:",),
        (
            "French: Quel heure et il au Montreal? \nEnglish: What time is it in Montreal? \nFrench: Ou est le poulet? \nEnglish:",
        ),
    ]
).toDF("prompt")

display(completion.transform(translate_df))

問題解答的提示

在此,我們會提示 GPT-3 回答一般知識的問題:

qa_df = spark.createDataFrame(
    [
        (
            "Q: Where is the Grand Canyon?\nA: The Grand Canyon is in Arizona.\n\nQ: What is the weight of the Burj Khalifa in kilograms?\nA:",
        )
    ]
).toDF("prompt")

display(completion.transform(qa_df))