Condividi tramite


Associazione di output Apache Kafka per Funzioni di Azure

L'associazione di output consente a un'app Funzioni di Azure di scrivere messaggi in un argomento Kafka.

Importante

Le associazioni Kafka sono disponibili solo per le funzioni nel piano Elastic Premium e dedicato (servizio app). Sono supportate solo nella versione 3.x e successiva del runtime di Funzioni.

Esempio

L'utilizzo dell'associazione dipende dalla modalità C# usata nell'app per le funzioni, che può essere una delle seguenti:

Una libreria di classi di processo di lavoro isolata compilata C# viene eseguita in un processo isolato dal runtime.

Gli attributi usati dipendono dal provider di eventi specifico.

Nell'esempio seguente è presente un tipo MultipleOutputTyperestituito personalizzato , costituito da una risposta HTTP e da un output Kafka.

[Function("KafkaOutput")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();

    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = message,
        HttpResponse = response
    };
}

Nella classe MultipleOutputTypeè Kevent la variabile di associazione di output per l'associazione Kafka.

public class MultipleOutputType
{
    [KafkaOutput("BrokerList",
                "topic",
                Username = "ConfluentCloudUserName",
                Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string Kevent { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

Per inviare un batch di eventi, passare una matrice di stringhe al tipo di output, come illustrato nell'esempio seguente:

[Function("KafkaOutputMany")]

public static MultipleOutputTypeForBatch Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");
    var response = req.CreateResponse(HttpStatusCode.OK);

    string[] messages = new string[2];
    messages[0] = "one";
    messages[1] = "two";

    return new MultipleOutputTypeForBatch()
    {
        Kevents = messages,
        HttpResponse = response
    };
}

La matrice di stringhe è definita come Kevents proprietà nella classe in cui è definita l'associazione di output:

public class MultipleOutputTypeForBatch
{
    [KafkaOutput("BrokerList",
                 "topic",
                 Username = "ConfluentCloudUserName",
                 Password = "ConfluentCloudPassword",
        Protocol = BrokerProtocol.SaslSsl,
        AuthenticationMode = BrokerAuthenticationMode.Plain
    )]        
    public string[] Kevents { get; set; }

    public HttpResponseData HttpResponse { get; set; }
}

La funzione seguente aggiunge intestazioni ai dati di output Kafka:

[Function("KafkaOutputWithHeaders")]

public static MultipleOutputType Output(
    [HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
    FunctionContext executionContext)
{
    var log = executionContext.GetLogger("HttpFunction");
    log.LogInformation("C# HTTP trigger function processed a request.");

    string message = req.FunctionContext
                        .BindingContext
                        .BindingData["message"]
                        .ToString();
    string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
    var response = req.CreateResponse(HttpStatusCode.OK);
    return new MultipleOutputType()
    {
        Kevent = kevent,
        HttpResponse = response
    };
}

Per un set completo di esempi .NET funzionanti, vedere il repository di estensioni Kafka.

Nota

Per un set equivalente di esempi typeScript, vedere il repository di estensioni Kafka

Le proprietà specifiche del file function.json dipendono dal provider di eventi, che in questi esempi sono Confluent o Hub eventi di Azure. Gli esempi seguenti illustrano un'associazione di output Kafka per una funzione attivata da una richiesta HTTP e invia i dati dalla richiesta all'argomento Kafka.

L'function.json seguente definisce il trigger per il provider specifico in questi esempi:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputKafkaMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "ConfluentCloudUsername",
      "password": "ConfluentCloudPassword",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "res"
    }
  ]
}

Il codice seguente invia quindi un messaggio all'argomento:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message);
    context.bindings.outputKafkaMessage = message;
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: 'Ok'
    };
}

Il codice seguente invia più messaggi come matrice allo stesso argomento:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');
    
    context.bindings.outputKafkaMessages = ["one", "two"];
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Nell'esempio seguente viene illustrato come inviare un messaggio di evento con intestazioni allo stesso argomento Kafka:

// This sample will create topic "topic" and send message to it. 
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
    context.log('JavaScript HTTP trigger function processed a request.');

    const message = (req.query.message || (req.body && req.body.message));
    const responseMessage = message
        ? "Message received: " + message + ". The message transfered to the kafka broker."
        : "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
    context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
    context.res = {
        // status: 200, /* Defaults to 200 */
        body: responseMessage
    };
}

Per un set completo di esempi JavaScript funzionanti, vedere il repository di estensioni Kafka.

Le proprietà specifiche del file function.json dipendono dal provider di eventi, che in questi esempi sono Confluent o Hub eventi di Azure. Gli esempi seguenti illustrano un'associazione di output Kafka per una funzione attivata da una richiesta HTTP e invia i dati dalla richiesta all'argomento Kafka.

L'function.json seguente definisce il trigger per il provider specifico in questi esempi:

{
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "Request",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username" : "%ConfluentCloudUserName%",
      "password" : "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN",
      "direction": "out"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "Response"
    }
  ]
}

Il codice seguente invia quindi un messaggio all'argomento:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message

$message

Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Il codice seguente invia più messaggi come matrice allo stesso argomento:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
})

Nell'esempio seguente viene illustrato come inviare un messaggio di evento con intestazioni allo stesso argomento Kafka:

using namespace System.Net

# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)

# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."

# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
    $message = $Request.Body.Message
}

$kevent = @{
    Offset = 364
    Partition = 0
    Topic = "kafkaeventhubtest1"
    Timestamp = "2022-04-09T03:20:06.591Z"
    Value = $message
    Headers= @(@{
        Key= "test"
        Value= "powershell"
    }
    )
}

Push-OutputBinding -Name Message -Value $kevent

# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
    StatusCode = [HttpStatusCode]::OK
    Body = 'ok'
})

Per un set completo di esempi di PowerShell funzionanti, vedere il repository di estensioni Kafka.

Le proprietà specifiche del file function.json dipendono dal provider di eventi, che in questi esempi sono Confluent o Hub eventi di Azure. Gli esempi seguenti illustrano un'associazione di output Kafka per una funzione attivata da una richiesta HTTP e invia i dati dalla richiesta all'argomento Kafka.

L'function.json seguente definisce il trigger per il provider specifico in questi esempi:

{
  "scriptFile": "main.py",
  "bindings": [
    {
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get"
      ]
    },
    {
      "type": "kafka",
      "direction": "out",
      "name": "outputMessage",
      "brokerList": "BrokerList",
      "topic": "topic",
      "username": "%ConfluentCloudUserName%",
      "password": "%ConfluentCloudPassword%",
      "protocol": "SASLSSL",
      "authenticationMode": "PLAIN"
    },
    {
      "type": "http",
      "direction": "out",
      "name": "$return"
    }
  ]
}

Il codice seguente invia quindi un messaggio all'argomento:

import logging

import azure.functions as func


def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
    input_msg = req.params.get('message')
    outputMessage.set(input_msg)
    return 'OK'

Il codice seguente invia più messaggi come matrice allo stesso argomento:

import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json

def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
    outputMessage.set(['one', 'two'])
    return 'OK'

Nell'esempio seguente viene illustrato come inviare un messaggio di evento con intestazioni allo stesso argomento Kafka:

import logging

import azure.functions as func
import json

def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
    message = req.params.get('message')
    kevent =  { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
    out.set(json.dumps(kevent))
    return 'OK'

Per un set completo di esempi di Python funzionanti, vedere il repository di estensioni Kafka.

Le annotazioni usate per configurare l'associazione di output dipendono dal provider di eventi specifico.

La funzione seguente invia un messaggio all'argomento Kafka.

@FunctionName("KafkaOutput")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");

    // Parse query parameter
    String query = request.getQueryParameters().get("message");
    String message = request.getBody().orElse(query);
    context.getLogger().info("Message:" + message);
    output.setValue(message);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();

Nell'esempio seguente viene illustrato come inviare più messaggi a un argomento Kafka.

@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<String[]> output,
        final ExecutionContext context) {
    context.getLogger().info("Java HTTP trigger processed a request.");
    String[] messages = new String[2];
    messages[0] = "one";
    messages[1] = "two";
    output.setValue(messages);
    return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}

In questo esempio il parametro di associazione di output viene modificato in matrice di stringhe.

L'ultimo esempio usa per queste KafkaEntity classi e KafkaHeader :

public class KafkaEntity {
    public int Offset;
    public int Partition;
    public String Timestamp;
    public String Topic;
    public String Value;
    public KafkaHeaders Headers[];

    public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
        this.Offset = Offset;
        this.Partition = Partition;
        this.Topic = Topic;
        this.Timestamp = Timestamp;
        this.Value = Value;
        this.Headers = headers;
    }
public class KafkaHeaders{
    public String Key;
    public String Value;

    public KafkaHeaders(String key, String value) {
        this.Key = key;
        this.Value = value;
    }

La funzione di esempio seguente invia un messaggio con intestazioni a un argomento Kafka.

@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
        @HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
        @KafkaOutput(
            name = "kafkaOutput",
            topic = "topic",  
            brokerList="%BrokerList%",
            username = "%ConfluentCloudUsername%", 
            password = "ConfluentCloudPassword",
            authenticationMode = BrokerAuthenticationMode.PLAIN,
            // sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.  
            protocol = BrokerProtocol.SASLSSL
        )  OutputBinding<KafkaEntity> output,
        final ExecutionContext context) {
            context.getLogger().info("Java HTTP trigger processed a request.");
    
            // Parse query parameter
            String query = request.getQueryParameters().get("message");
            String message = request.getBody().orElse(query);
            KafkaHeaders[] headers = new KafkaHeaders[1];
            headers[0] = new KafkaHeaders("test", "java");
            KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
            output.setValue(kevent);
            return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
        }

Per un set completo di esempi Java funzionanti per Confluent, vedere il repository di estensioni Kafka.

Attributi

Sia le librerie C# in-process che il processo di lavoro isolato usano l'attributo Kafka per definire il trigger di funzione.

La tabella seguente illustra le proprietà che è possibile impostare usando questo attributo:

Parametro Descrizione
BrokerList (Obbligatorio) Elenco di broker Kafka a cui viene inviato l'output. Per altre informazioni, vedere Connessioni .
Argomento (Obbligatorio) Argomento a cui viene inviato l'output.
AvroSchema (Facoltativo) Schema di un record generico quando si usa il protocollo Avro.
MaxMessageBytes (Facoltativo) Dimensione massima del messaggio di output inviato (in MB), con il valore predefinito .1
BatchSize (Facoltativo) Numero massimo di messaggi in batch in un singolo set di messaggi, con un valore predefinito .10000
EnableIdempotence (Facoltativo) Se impostato su true, garantisce che i messaggi vengano prodotti correttamente una sola volta e nell'ordine di produzione originale, con un valore predefinito di false
MessageTimeoutMs (Facoltativo) Timeout del messaggio locale, espresso in millisecondi. Questo valore viene applicato solo localmente e limita il tempo in cui un messaggio generato attende il recapito corretto, con un valore predefinito 300000. Un tempo di 0 è infinito. Questo valore è il tempo massimo usato per recapitare un messaggio (inclusi i tentativi). L'errore di recapito si verifica quando viene superato il numero di tentativi o il timeout del messaggio.
RequestTimeoutMs (Facoltativo) Timeout di riconoscimento della richiesta di output, in millisecondi, con un valore predefinito .5000
MaxRetries (Facoltativo) Numero di tentativi di invio di un messaggio non riuscito, con un valore predefinito .2 La ripetizione dei tentativi può causare il riordinamento, a meno che non EnableIdempotence sia impostata su true.
AuthenticationMode (Facoltativo) Modalità di autenticazione quando si usa l'autenticazione SASL (Simple Authentication and Security Layer). I valori supportati sono Gssapi, Plain (impostazione predefinita), ScramSha256, ScramSha512.
Nome utente (Facoltativo) Nome utente per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi. Per altre informazioni, vedere Connessioni .
Password (Facoltativo) Password per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi. Per altre informazioni, vedere Connessioni .
Protocollo (Facoltativo) Protocollo di sicurezza usato per la comunicazione con i broker. I valori supportati sono plaintext (impostazione predefinita), ssl, , sasl_plaintextsasl_ssl.
SslCaLocation (Facoltativo) Percorso del file di certificato DELLA CA per la verifica del certificato del broker.
SslCertificateLocation (Facoltativo) Percorso del certificato del client.
SslKeyLocation (Facoltativo) Percorso della chiave privata del client (PEM) usato per l'autenticazione.
SslKeyPassword (Facoltativo) Password per il certificato del client.

Annotazioni

L'annotazione KafkaOutput consente di creare una funzione che scrive in un argomento specifico. Le opzioni supportate includono gli elementi seguenti:

Elemento Descrizione
name Nome della variabile che rappresenta i dati negoziati nel codice della funzione.
brokerList (Obbligatorio) Elenco di broker Kafka a cui viene inviato l'output. Per altre informazioni, vedere Connessioni .
topic (Obbligatorio) Argomento a cui viene inviato l'output.
dataType Definisce il modo in cui Funzioni gestisce il valore del parametro. Per impostazione predefinita, il valore viene ottenuto come stringa e Functions tenta di deserializzare la stringa in un oggetto Java normale (POJO). Quando string, l'input viene considerato come una stringa. Quando binary, il messaggio viene ricevuto come dati binari e Funzioni tenta di deserializzarlo in un byte di tipo di parametro effettivo[].
avroSchema (Facoltativo) Schema di un record generico quando si usa il protocollo Avro. Attualmente non è supportato per Java.
maxMessageBytes (Facoltativo) Dimensione massima del messaggio di output inviato (in MB), con il valore predefinito .1
batchSize (Facoltativo) Numero massimo di messaggi in batch in un singolo set di messaggi, con un valore predefinito .10000
enableIdempotence (Facoltativo) Se impostato su true, garantisce che i messaggi vengano prodotti correttamente una sola volta e nell'ordine di produzione originale, con un valore predefinito di false
messageTimeoutMs (Facoltativo) Timeout del messaggio locale, espresso in millisecondi. Questo valore viene applicato solo localmente e limita il tempo in cui un messaggio generato attende il recapito corretto, con un valore predefinito 300000. Un tempo di 0 è infinito. Questo è il tempo massimo usato per recapitare un messaggio (inclusi i tentativi). L'errore di recapito si verifica quando viene superato il numero di tentativi o il timeout del messaggio.
requestTimeoutMs (Facoltativo) Timeout di riconoscimento della richiesta di output, in millisecondi, con un valore predefinito .5000
maxRetries (Facoltativo) Numero di tentativi di invio di un messaggio non riuscito, con un valore predefinito .2 La ripetizione dei tentativi può causare il riordinamento, a meno che non EnableIdempotence sia impostata su true.
authenticationMode (Facoltativo) Modalità di autenticazione quando si usa l'autenticazione SASL (Simple Authentication and Security Layer). I valori supportati sono Gssapi, Plain (impostazione predefinita), ScramSha256, ScramSha512.
username (Facoltativo) Nome utente per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi. Per altre informazioni, vedere Connessioni .
password (Facoltativo) Password per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi. Per altre informazioni, vedere Connessioni .
protocol (Facoltativo) Protocollo di sicurezza usato per la comunicazione con i broker. I valori supportati sono plaintext (impostazione predefinita), ssl, , sasl_plaintextsasl_ssl.
sslCaLocation (Facoltativo) Percorso del file di certificato DELLA CA per la verifica del certificato del broker.
sslCertificateLocation (Facoltativo) Percorso del certificato del client.
sslKeyLocation (Facoltativo) Percorso della chiave privata del client (PEM) usato per l'autenticazione.
sslKeyPassword (Facoltativo) Password per il certificato del client.

Impostazione

Nella tabella seguente sono illustrate le proprietà di configurazione dell'associazione impostate nel file function.json.

Proprietà di function.json Descrizione
type Deve essere impostato su kafka.
direction Deve essere impostato su out.
name Nome della variabile che rappresenta i dati negoziati nel codice della funzione.
brokerList (Obbligatorio) Elenco di broker Kafka a cui viene inviato l'output. Per altre informazioni, vedere Connessioni .
topic (Obbligatorio) Argomento a cui viene inviato l'output.
avroSchema (Facoltativo) Schema di un record generico quando si usa il protocollo Avro.
maxMessageBytes (Facoltativo) Dimensione massima del messaggio di output inviato (in MB), con il valore predefinito .1
batchSize (Facoltativo) Numero massimo di messaggi in batch in un singolo set di messaggi, con un valore predefinito .10000
enableIdempotence (Facoltativo) Se impostato su true, garantisce che i messaggi vengano prodotti correttamente una sola volta e nell'ordine di produzione originale, con un valore predefinito di false
messageTimeoutMs (Facoltativo) Timeout del messaggio locale, espresso in millisecondi. Questo valore viene applicato solo localmente e limita il tempo in cui un messaggio generato attende il recapito corretto, con un valore predefinito 300000. Un tempo di 0 è infinito. Questo è il tempo massimo usato per recapitare un messaggio (inclusi i tentativi). L'errore di recapito si verifica quando viene superato il numero di tentativi o il timeout del messaggio.
requestTimeoutMs (Facoltativo) Timeout di riconoscimento della richiesta di output, in millisecondi, con un valore predefinito .5000
maxRetries (Facoltativo) Numero di tentativi di invio di un messaggio non riuscito, con un valore predefinito .2 La ripetizione dei tentativi può causare il riordinamento, a meno che non EnableIdempotence sia impostata su true.
authenticationMode (Facoltativo) Modalità di autenticazione quando si usa l'autenticazione SASL (Simple Authentication and Security Layer). I valori supportati sono Gssapi, Plain (impostazione predefinita), ScramSha256, ScramSha512.
username (Facoltativo) Nome utente per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi. Per altre informazioni, vedere Connessioni .
password (Facoltativo) Password per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi. Per altre informazioni, vedere Connessioni .
protocol (Facoltativo) Protocollo di sicurezza usato per la comunicazione con i broker. I valori supportati sono plaintext (impostazione predefinita), ssl, , sasl_plaintextsasl_ssl.
sslCaLocation (Facoltativo) Percorso del file di certificato DELLA CA per la verifica del certificato del broker.
sslCertificateLocation (Facoltativo) Percorso del certificato del client.
sslKeyLocation (Facoltativo) Percorso della chiave privata del client (PEM) usato per l'autenticazione.
sslKeyPassword (Facoltativo) Password per il certificato del client.

Utilizzo

Sia le chiavi che i tipi di valori sono supportati con la serializzazione predefinita di Avro e Protobuf .

L'offset, la partizione e il timestamp per l'evento vengono generati in fase di esecuzione. Solo i valori e le intestazioni possono essere impostati all'interno della funzione. L'argomento viene impostato nel function.json.

Assicurarsi di avere accesso all'argomento Kafka a cui si sta tentando di scrivere. Configurare l'associazione con le credenziali di accesso e connessione all'argomento Kafka.

In un piano Premium è necessario abilitare il monitoraggio della scalabilità di runtime per consentire l'aumento del numero di istanze dell'output Kafka. Per altre informazioni, vedere Abilitare il ridimensionamento in fase di esecuzione.

Per un set completo di impostazioni di host.json supportate per il trigger Kafka, vedere host.json impostazioni.

Connessioni

Tutte le informazioni di connessione richieste dai trigger e dalle associazioni devono essere mantenute nelle impostazioni dell'applicazione e non nelle definizioni di associazione nel codice. Questo vale per le credenziali, che non devono mai essere archiviate nel codice.

Importante

Le impostazioni delle credenziali devono fare riferimento a un'impostazione dell'applicazione. Non impostare le credenziali hardcoded nei file di codice o di configurazione. Quando si esegue localmente, usare il file local.settings.json per le credenziali e non pubblicare il file local.settings.json.

Quando ci si connette a un cluster Kafka gestito fornito da Confluent in Azure, assicurarsi che nel trigger o nell'associazione siano impostate le credenziali di autenticazione seguenti per l'ambiente Confluent Cloud:

Impostazione Valore consigliato Descrizione
BrokerList BootstrapServer L'impostazione dell'app denominata BootstrapServer contiene il valore del server bootstrap disponibile nella pagina delle impostazioni di Confluent Cloud. Il valore è simile a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092.
Nome utente ConfluentCloudUsername L'impostazione dell'app denominata ConfluentCloudUsername contiene la chiave di accesso api dal sito Web Confluent Cloud.
Password ConfluentCloudPassword L'impostazione dell'app denominata ConfluentCloudPassword contiene il segreto API ottenuto dal sito Web Confluent Cloud.

I valori stringa usati per queste impostazioni devono essere presenti come impostazioni dell'applicazione in Azure o nella Values raccolta nel file local.settings.json durante lo sviluppo locale.

È anche necessario impostare , ProtocolAuthenticationModee SslCaLocation nelle definizioni di associazione.

Passaggi successivi