Compartir a través de


Azure OpenAI para macrodatos

El servicio Azure OpenAI se puede usar para resolver un gran número de tareas de lenguaje natural mediante la solicitud de la API de finalización. Para facilitar el escalado de los flujos de trabajo de solicitud de algunos ejemplos a grandes conjuntos de datos de ejemplos, hemos integrado el servicio Azure OpenAI con la biblioteca de aprendizaje automático distribuida SynapseML. Esta integración facilita el uso del marco de computación distribuida de Apache Spark para procesar millones de mensajes con el servicio OpenAI. Este tutorial muestra cómo aplicar grandes modelos de lenguaje a escala distribuida a través de Azure OpenAI y Azure Synapse Analytics.

Requisitos previos

Los requisitos previos clave para este inicio rápido incluyen un recurso de Azure OpenAI en funcionamiento y un clúster de Apache Spark con SynapseML instalado.

Importación de esta guía como cuaderno

El siguiente paso consiste en agregar este código al clúster de Spark. Puede crear un cuaderno en la plataforma Spark y copiar el código en este cuaderno para ejecutar la demostración. O bien, descargue el cuaderno e impórtelo en Synapse Analytics

  1. Descargar esta demostración como un cuaderno (seleccione Rawy guarde el archivo)
  2. Importe el cuaderno en el área de trabajo de Synapse o, si usa Fabric, impórtelo en el área de trabajo de Fabric
  3. Instale SynapseML en el clúster. Consulte las instrucciones de instalación de Synapse en la parte inferior del sitio web de SynapseML. Si usa Fabric, consulte la guía de instalación. Esto requiere pegar una celda adicional en la parte superior del cuaderno que importó.
  4. Conecte el cuaderno a un clúster y siga los pasos, editando y ejecutando las celdas.

Rellene la información del servicio

A continuación, edite la celda del cuaderno para que apunte al servicio. En particular, configure las variables service_name, deployment_name, location y key para que coincidan con su servicio OpenAI:

import os
from pyspark.sql import SparkSession
from synapse.ml.core.platform import running_on_synapse, find_secret

# Bootstrap Spark Session
spark = SparkSession.builder.getOrCreate()

if running_on_synapse():
    from notebookutils.visualization import display

# Fill in the following lines with your service information
# Learn more about selecting which embedding model to choose: https://openai.com/blog/new-and-improved-embedding-model
service_name = "synapseml-openai"
deployment_name = "gpt-35-turbo"
deployment_name_embeddings = "text-embedding-ada-002"

key = find_secret(
    "openai-api-key"
)  # please replace this line with your key as a string

assert key is not None and service_name is not None

Creación de un conjunto de datos de mensajes

A continuación, cree un dataframe que consta de una serie de filas, con un mensaje por fila.

También puede cargar datos directamente desde ADLS u otras bases de datos. Para más información sobre cómo cargar y preparar dataframes de Spark, consulte la guía de carga de datos de Apache Spark.

df = spark.createDataFrame(
    [
        ("Hello my name is",),
        ("The best code is code thats",),
        ("SynapseML is ",),
    ]
).toDF("prompt")

Creación del cliente de Apache Spark de OpenAICompletion

Para aplicar el servicio de finalización de OpenAI al dataframe que ha creado, cree un objeto OpenAICompletion, que actúe como cliente distribuido. Los parámetros del servicio se pueden establecer con un valor único o mediante una columna del dataframe con los establecedores adecuados en el objeto OpenAICompletion. Aquí estamos estableciendo maxTokens en 200. Un token tiene alrededor de cuatro caracteres y este límite se aplica a la suma del mensaje y el resultado. También estamos estableciendo el parámetro promptCol con el nombre de la columna Prompt en el dataframe.

from synapse.ml.cognitive import OpenAICompletion

completion = (
    OpenAICompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setPromptCol("prompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

Transformación del dataframe con el Cliente de OpenAICompletion

Después de finalizar el dataframe y el cliente de finalización, puede transformar el conjunto de datos de entrada y agregar una columna denominada completions con toda la información que agrega el servicio. Seleccione solo el texto para simplificar.

from pyspark.sql.functions import col

completed_df = completion.transform(df).cache()
display(
    completed_df.select(
        col("prompt"),
        col("error"),
        col("completions.choices.text").getItem(0).alias("text"),
    )
)

La salida debe tener un aspecto similar al siguiente. El texto de finalización será diferente de la muestra.

prompt error text
Hola mi nombre es null Makaveli, tengo dieciocho años y quiero ser rapero cuando sea mayor, me encanta escribir y hacer música, soy de Los Angeles, CA
El mejor código es el código que es null comprensible Esto es una declaración subjetiva, y no hay respuestas correctas.
SynapseML es null un algoritmo de aprendizaje automático que puede aprender a predecir el resultado futuro de los eventos.

Más ejemplos de utilización.

Generar inserciones de texto

Además de completar el texto, también podemos insertar texto para usarlo en algoritmos de bajada o arquitecturas de recuperación de vectores. La creación de inserciones permite buscar y recuperar documentos de grandes colecciones y puede utilizarse cuando la ingeniería de mensajes no es suficiente para la tarea. Para obtener más información sobre el uso de OpenAIEmbedding, consulte nuestra guía de inserción.

from synapse.ml.cognitive import OpenAIEmbedding

embedding = (
    OpenAIEmbedding()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name_embeddings)
    .setCustomServiceName(service_name)
    .setTextCol("prompt")
    .setErrorCol("error")
    .setOutputCol("embeddings")
)

display(embedding.transform(df))

Finalización del chat

Los modelos como ChatGPT y GPT-4 son capaces de entender chats en lugar de mensajes sueltos. El transformador OpenAIChatCompletion expone esta funcionalidad a gran escala.

from synapse.ml.cognitive import OpenAIChatCompletion
from pyspark.sql import Row
from pyspark.sql.types import *


def make_message(role, content):
    return Row(role=role, content=content, name=role)


chat_df = spark.createDataFrame(
    [
        (
            [
                make_message(
                    "system", "You are an AI chatbot with red as your favorite color"
                ),
                make_message("user", "Whats your favorite color"),
            ],
        ),
        (
            [
                make_message("system", "You are very excited"),
                make_message("user", "How are you today"),
            ],
        ),
    ]
).toDF("messages")


chat_completion = (
    OpenAIChatCompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMessagesCol("messages")
    .setErrorCol("error")
    .setOutputCol("chat_completions")
)

display(
    chat_completion.transform(chat_df).select(
        "messages", "chat_completions.choices.message.content"
    )
)

Mejora del rendimiento con procesamiento por lotes de solicitudes

El ejemplo realiza varias solicitudes al servicio, una para cada mensaje. Para completar varios mensajes en una sola solicitud, use el modo por lotes. En primer lugar, en el objeto OpenAICompletion, en lugar de establecer la columna Prompt en "Prompt", especifique "batchPrompt" para la columna BatchPrompt. Para ello, cree un dataframe con una lista de mensajes por fila.

En el momento de redactar este documento, hay un límite de 20 mensajes en una sola solicitud y un límite estricto de 2048 "tokens", o aproximadamente 1500 palabras.

batch_df = spark.createDataFrame(
    [
        (["The time has come", "Pleased to", "Today stocks", "Here's to"],),
        (["The only thing", "Ask not what", "Every litter", "I am"],),
    ]
).toDF("batchPrompt")

A continuación, creamos el objeto OpenAICompletion. En lugar de establecer la columna Prompt, establezca la columna batchPrompt si la columna es de tipo Array[String].

batch_completion = (
    OpenAICompletion()
    .setSubscriptionKey(key)
    .setDeploymentName(deployment_name)
    .setCustomServiceName(service_name)
    .setMaxTokens(200)
    .setBatchPromptCol("batchPrompt")
    .setErrorCol("error")
    .setOutputCol("completions")
)

En la llamada a la transformación, se realizará una solicitud por fila. Dado que hay varios mensajes en una sola fila, cada solicitud se envía con todos los mensajes de esa fila. Los resultados contienen una fila para cada fila de la solicitud.

completed_batch_df = batch_completion.transform(batch_df).cache()
display(completed_batch_df)

Uso de un miniprocesador automático

Si los datos están en formato de columna, puede transponerlos al formato de fila mediante FixedMiniBatcherTransformer de SynapseML.

from pyspark.sql.types import StringType
from synapse.ml.stages import FixedMiniBatchTransformer
from synapse.ml.core.spark import FluentAPI

completed_autobatch_df = (
    df.coalesce(
        1
    )  # Force a single partition so that our little 4-row dataframe makes a batch of size 4, you can remove this step for large datasets
    .mlTransform(FixedMiniBatchTransformer(batchSize=4))
    .withColumnRenamed("prompt", "batchPrompt")
    .mlTransform(batch_completion)
)

display(completed_autobatch_df)

Ingeniería de solicitud de traducción

El servicio Azure OpenAI puede resolver muchas tareas de lenguaje natural diferentes a través de la ingeniería de solicitud. Aquí se muestra un ejemplo de solicitud de traducción de idioma:

translate_df = spark.createDataFrame(
    [
        ("Japanese: Ookina hako \nEnglish: Big box \nJapanese: Midori tako\nEnglish:",),
        (
            "French: Quel heure et il au Montreal? \nEnglish: What time is it in Montreal? \nFrench: Ou est le poulet? \nEnglish:",
        ),
    ]
).toDF("prompt")

display(completion.transform(translate_df))

Solicitud de respuesta a preguntas

En este caso, se solicita a GPT-3 respuesta a preguntas de conocimientos generales:

qa_df = spark.createDataFrame(
    [
        (
            "Q: Where is the Grand Canyon?\nA: The Grand Canyon is in Arizona.\n\nQ: What is the weight of the Burj Khalifa in kilograms?\nA:",
        )
    ]
).toDF("prompt")

display(completion.transform(qa_df))