Associazione di output Apache Kafka per Funzioni di Azure
L'associazione di output consente a un'app Funzioni di Azure di scrivere messaggi in un argomento Kafka.
Importante
Le associazioni Kafka sono disponibili solo per le funzioni nel piano Elastic Premium e dedicato (servizio app). Sono supportate solo nella versione 3.x e successiva del runtime di Funzioni.
Esempio
L'utilizzo dell'associazione dipende dalla modalità C# usata nell'app per le funzioni, che può essere una delle seguenti:
Una libreria di classi di processo di lavoro isolata compilata C# viene eseguita in un processo isolato dal runtime.
Gli attributi usati dipendono dal provider di eventi specifico.
Nell'esempio seguente è presente un tipo MultipleOutputType
restituito personalizzato , costituito da una risposta HTTP e da un output Kafka.
[Function("KafkaOutput")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = message,
HttpResponse = response
};
}
Nella classe MultipleOutputType
è Kevent
la variabile di associazione di output per l'associazione Kafka.
public class MultipleOutputType
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string Kevent { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
Per inviare un batch di eventi, passare una matrice di stringhe al tipo di output, come illustrato nell'esempio seguente:
[Function("KafkaOutputMany")]
public static MultipleOutputTypeForBatch Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
var response = req.CreateResponse(HttpStatusCode.OK);
string[] messages = new string[2];
messages[0] = "one";
messages[1] = "two";
return new MultipleOutputTypeForBatch()
{
Kevents = messages,
HttpResponse = response
};
}
La matrice di stringhe è definita come Kevents
proprietà nella classe in cui è definita l'associazione di output:
public class MultipleOutputTypeForBatch
{
[KafkaOutput("BrokerList",
"topic",
Username = "ConfluentCloudUserName",
Password = "ConfluentCloudPassword",
Protocol = BrokerProtocol.SaslSsl,
AuthenticationMode = BrokerAuthenticationMode.Plain
)]
public string[] Kevents { get; set; }
public HttpResponseData HttpResponse { get; set; }
}
La funzione seguente aggiunge intestazioni ai dati di output Kafka:
[Function("KafkaOutputWithHeaders")]
public static MultipleOutputType Output(
[HttpTrigger(AuthorizationLevel.Anonymous, "get", Route = null)] HttpRequestData req,
FunctionContext executionContext)
{
var log = executionContext.GetLogger("HttpFunction");
log.LogInformation("C# HTTP trigger function processed a request.");
string message = req.FunctionContext
.BindingContext
.BindingData["message"]
.ToString();
string kevent = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"dotnet-isolated\" }] }";
var response = req.CreateResponse(HttpStatusCode.OK);
return new MultipleOutputType()
{
Kevent = kevent,
HttpResponse = response
};
}
Per un set completo di esempi .NET funzionanti, vedere il repository di estensioni Kafka.
Nota
Per un set equivalente di esempi typeScript, vedere il repository di estensioni Kafka
Le proprietà specifiche del file function.json dipendono dal provider di eventi, che in questi esempi sono Confluent o Hub eventi di Azure. Gli esempi seguenti illustrano un'associazione di output Kafka per una funzione attivata da una richiesta HTTP e invia i dati dalla richiesta all'argomento Kafka.
L'function.json seguente definisce il trigger per il provider specifico in questi esempi:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputKafkaMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "ConfluentCloudUsername",
"password": "ConfluentCloudPassword",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "res"
}
]
}
Il codice seguente invia quindi un messaggio all'argomento:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message);
context.bindings.outputKafkaMessage = message;
context.res = {
// status: 200, /* Defaults to 200 */
body: 'Ok'
};
}
Il codice seguente invia più messaggi come matrice allo stesso argomento:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
context.bindings.outputKafkaMessages = ["one", "two"];
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
Nell'esempio seguente viene illustrato come inviare un messaggio di evento con intestazioni allo stesso argomento Kafka:
// This sample will create topic "topic" and send message to it.
// KafkaTrigger will be trigged.
module.exports = async function (context, req) {
context.log('JavaScript HTTP trigger function processed a request.');
const message = (req.query.message || (req.body && req.body.message));
const responseMessage = message
? "Message received: " + message + ". The message transfered to the kafka broker."
: "This HTTP triggered function executed successfully. Pass a message in the query string or in the request body for a personalized response.";
context.bindings.outputKafkaMessage = "{ \"Offset\":364,\"Partition\":0,\"Topic\":\"kafkaeventhubtest1\",\"Timestamp\":\"2022-04-09T03:20:06.591Z\", \"Value\": \"" + message + "\", \"Headers\": [{ \"Key\": \"test\", \"Value\": \"javascript\" }] }"
context.res = {
// status: 200, /* Defaults to 200 */
body: responseMessage
};
}
Per un set completo di esempi JavaScript funzionanti, vedere il repository di estensioni Kafka.
Le proprietà specifiche del file function.json dipendono dal provider di eventi, che in questi esempi sono Confluent o Hub eventi di Azure. Gli esempi seguenti illustrano un'associazione di output Kafka per una funzione attivata da una richiesta HTTP e invia i dati dalla richiesta all'argomento Kafka.
L'function.json seguente definisce il trigger per il provider specifico in questi esempi:
{
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "Request",
"methods": [
"get"
]
},
{
"type": "kafka",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username" : "%ConfluentCloudUserName%",
"password" : "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN",
"direction": "out"
},
{
"type": "http",
"direction": "out",
"name": "Response"
}
]
}
Il codice seguente invia quindi un messaggio all'argomento:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
$message
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
Il codice seguente invia più messaggi come matrice allo stesso argomento:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
$message = @("one", "two")
Push-OutputBinding -Name outputMessage -Value ($message)
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
})
Nell'esempio seguente viene illustrato come inviare un messaggio di evento con intestazioni allo stesso argomento Kafka:
using namespace System.Net
# Input bindings are passed in via param block.
param($Request, $TriggerMetadata)
# Write to the Azure Functions log stream.
Write-Host "PowerShell HTTP trigger function processed a request."
# Interact with query parameters or the body of the request.
$message = $Request.Query.Message
if (-not $message) {
$message = $Request.Body.Message
}
$kevent = @{
Offset = 364
Partition = 0
Topic = "kafkaeventhubtest1"
Timestamp = "2022-04-09T03:20:06.591Z"
Value = $message
Headers= @(@{
Key= "test"
Value= "powershell"
}
)
}
Push-OutputBinding -Name Message -Value $kevent
# Associate values to output bindings by calling 'Push-OutputBinding'.
Push-OutputBinding -Name Response -Value ([HttpResponseContext]@{
StatusCode = [HttpStatusCode]::OK
Body = 'ok'
})
Per un set completo di esempi di PowerShell funzionanti, vedere il repository di estensioni Kafka.
Le proprietà specifiche del file function.json dipendono dal provider di eventi, che in questi esempi sono Confluent o Hub eventi di Azure. Gli esempi seguenti illustrano un'associazione di output Kafka per una funzione attivata da una richiesta HTTP e invia i dati dalla richiesta all'argomento Kafka.
L'function.json seguente definisce il trigger per il provider specifico in questi esempi:
{
"scriptFile": "main.py",
"bindings": [
{
"authLevel": "function",
"type": "httpTrigger",
"direction": "in",
"name": "req",
"methods": [
"get"
]
},
{
"type": "kafka",
"direction": "out",
"name": "outputMessage",
"brokerList": "BrokerList",
"topic": "topic",
"username": "%ConfluentCloudUserName%",
"password": "%ConfluentCloudPassword%",
"protocol": "SASLSSL",
"authenticationMode": "PLAIN"
},
{
"type": "http",
"direction": "out",
"name": "$return"
}
]
}
Il codice seguente invia quindi un messaggio all'argomento:
import logging
import azure.functions as func
def main(req: func.HttpRequest, outputMessage: func.Out[str]) -> func.HttpResponse:
input_msg = req.params.get('message')
outputMessage.set(input_msg)
return 'OK'
Il codice seguente invia più messaggi come matrice allo stesso argomento:
import logging
import typing
from azure.functions import Out, HttpRequest, HttpResponse
import json
def main(req: HttpRequest, outputMessage: Out[str] ) -> HttpResponse:
outputMessage.set(['one', 'two'])
return 'OK'
Nell'esempio seguente viene illustrato come inviare un messaggio di evento con intestazioni allo stesso argomento Kafka:
import logging
import azure.functions as func
import json
def main(req: func.HttpRequest, out: func.Out[str]) -> func.HttpResponse:
message = req.params.get('message')
kevent = { "Offset":364,"Partition":0,"Topic":"kafkaeventhubtest1","Timestamp":"2022-04-09T03:20:06.591Z", "Value": message, "Headers": [{ "Key": "test", "Value": "python" }] }
out.set(json.dumps(kevent))
return 'OK'
Per un set completo di esempi di Python funzionanti, vedere il repository di estensioni Kafka.
Le annotazioni usate per configurare l'associazione di output dipendono dal provider di eventi specifico.
La funzione seguente invia un messaggio all'argomento Kafka.
@FunctionName("KafkaOutput")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
context.getLogger().info("Message:" + message);
output.setValue(message);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
Nell'esempio seguente viene illustrato come inviare più messaggi a un argomento Kafka.
@FunctionName("KafkaOutputMany")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<String[]> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
String[] messages = new String[2];
messages[0] = "one";
messages[1] = "two";
output.setValue(messages);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
In questo esempio il parametro di associazione di output viene modificato in matrice di stringhe.
L'ultimo esempio usa per queste KafkaEntity
classi e KafkaHeader
:
public class KafkaEntity {
public int Offset;
public int Partition;
public String Timestamp;
public String Topic;
public String Value;
public KafkaHeaders Headers[];
public KafkaEntity(int Offset, int Partition, String Topic, String Timestamp, String Value,KafkaHeaders[] headers) {
this.Offset = Offset;
this.Partition = Partition;
this.Topic = Topic;
this.Timestamp = Timestamp;
this.Value = Value;
this.Headers = headers;
}
public class KafkaHeaders{
public String Key;
public String Value;
public KafkaHeaders(String key, String value) {
this.Key = key;
this.Value = value;
}
La funzione di esempio seguente invia un messaggio con intestazioni a un argomento Kafka.
@FunctionName("KafkaOutputWithHeaders")
public HttpResponseMessage run(
@HttpTrigger(name = "req", methods = {HttpMethod.GET, HttpMethod.POST}, authLevel = AuthorizationLevel.ANONYMOUS) HttpRequestMessage<Optional<String>> request,
@KafkaOutput(
name = "kafkaOutput",
topic = "topic",
brokerList="%BrokerList%",
username = "%ConfluentCloudUsername%",
password = "ConfluentCloudPassword",
authenticationMode = BrokerAuthenticationMode.PLAIN,
// sslCaLocation = "confluent_cloud_cacert.pem", // Enable this line for windows.
protocol = BrokerProtocol.SASLSSL
) OutputBinding<KafkaEntity> output,
final ExecutionContext context) {
context.getLogger().info("Java HTTP trigger processed a request.");
// Parse query parameter
String query = request.getQueryParameters().get("message");
String message = request.getBody().orElse(query);
KafkaHeaders[] headers = new KafkaHeaders[1];
headers[0] = new KafkaHeaders("test", "java");
KafkaEntity kevent = new KafkaEntity(364, 0, "topic", "2022-04-09T03:20:06.591Z", message, headers);
output.setValue(kevent);
return request.createResponseBuilder(HttpStatus.OK).body("Ok").build();
}
Per un set completo di esempi Java funzionanti per Confluent, vedere il repository di estensioni Kafka.
Attributi
Sia le librerie C# in-process che il processo di lavoro isolato usano l'attributo Kafka
per definire il trigger di funzione.
La tabella seguente illustra le proprietà che è possibile impostare usando questo attributo:
Parametro | Descrizione |
---|---|
BrokerList | (Obbligatorio) Elenco di broker Kafka a cui viene inviato l'output. Per altre informazioni, vedere Connessioni . |
Argomento | (Obbligatorio) Argomento a cui viene inviato l'output. |
AvroSchema | (Facoltativo) Schema di un record generico quando si usa il protocollo Avro. |
MaxMessageBytes | (Facoltativo) Dimensione massima del messaggio di output inviato (in MB), con il valore predefinito .1 |
BatchSize | (Facoltativo) Numero massimo di messaggi in batch in un singolo set di messaggi, con un valore predefinito .10000 |
EnableIdempotence | (Facoltativo) Se impostato su true , garantisce che i messaggi vengano prodotti correttamente una sola volta e nell'ordine di produzione originale, con un valore predefinito di false |
MessageTimeoutMs | (Facoltativo) Timeout del messaggio locale, espresso in millisecondi. Questo valore viene applicato solo localmente e limita il tempo in cui un messaggio generato attende il recapito corretto, con un valore predefinito 300000 . Un tempo di 0 è infinito. Questo valore è il tempo massimo usato per recapitare un messaggio (inclusi i tentativi). L'errore di recapito si verifica quando viene superato il numero di tentativi o il timeout del messaggio. |
RequestTimeoutMs | (Facoltativo) Timeout di riconoscimento della richiesta di output, in millisecondi, con un valore predefinito .5000 |
MaxRetries | (Facoltativo) Numero di tentativi di invio di un messaggio non riuscito, con un valore predefinito .2 La ripetizione dei tentativi può causare il riordinamento, a meno che non EnableIdempotence sia impostata su true . |
AuthenticationMode | (Facoltativo) Modalità di autenticazione quando si usa l'autenticazione SASL (Simple Authentication and Security Layer). I valori supportati sono Gssapi , Plain (impostazione predefinita), ScramSha256 , ScramSha512 . |
Nome utente | (Facoltativo) Nome utente per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi . Per altre informazioni, vedere Connessioni . |
Password | (Facoltativo) Password per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi . Per altre informazioni, vedere Connessioni . |
Protocollo | (Facoltativo) Protocollo di sicurezza usato per la comunicazione con i broker. I valori supportati sono plaintext (impostazione predefinita), ssl , , sasl_plaintext sasl_ssl . |
SslCaLocation | (Facoltativo) Percorso del file di certificato DELLA CA per la verifica del certificato del broker. |
SslCertificateLocation | (Facoltativo) Percorso del certificato del client. |
SslKeyLocation | (Facoltativo) Percorso della chiave privata del client (PEM) usato per l'autenticazione. |
SslKeyPassword | (Facoltativo) Password per il certificato del client. |
Annotazioni
L'annotazione KafkaOutput
consente di creare una funzione che scrive in un argomento specifico. Le opzioni supportate includono gli elementi seguenti:
Elemento | Descrizione |
---|---|
name | Nome della variabile che rappresenta i dati negoziati nel codice della funzione. |
brokerList | (Obbligatorio) Elenco di broker Kafka a cui viene inviato l'output. Per altre informazioni, vedere Connessioni . |
topic | (Obbligatorio) Argomento a cui viene inviato l'output. |
dataType | Definisce il modo in cui Funzioni gestisce il valore del parametro. Per impostazione predefinita, il valore viene ottenuto come stringa e Functions tenta di deserializzare la stringa in un oggetto Java normale (POJO). Quando string , l'input viene considerato come una stringa. Quando binary , il messaggio viene ricevuto come dati binari e Funzioni tenta di deserializzarlo in un byte di tipo di parametro effettivo[]. |
avroSchema | (Facoltativo) Schema di un record generico quando si usa il protocollo Avro. Attualmente non è supportato per Java. |
maxMessageBytes | (Facoltativo) Dimensione massima del messaggio di output inviato (in MB), con il valore predefinito .1 |
batchSize | (Facoltativo) Numero massimo di messaggi in batch in un singolo set di messaggi, con un valore predefinito .10000 |
enableIdempotence | (Facoltativo) Se impostato su true , garantisce che i messaggi vengano prodotti correttamente una sola volta e nell'ordine di produzione originale, con un valore predefinito di false |
messageTimeoutMs | (Facoltativo) Timeout del messaggio locale, espresso in millisecondi. Questo valore viene applicato solo localmente e limita il tempo in cui un messaggio generato attende il recapito corretto, con un valore predefinito 300000 . Un tempo di 0 è infinito. Questo è il tempo massimo usato per recapitare un messaggio (inclusi i tentativi). L'errore di recapito si verifica quando viene superato il numero di tentativi o il timeout del messaggio. |
requestTimeoutMs | (Facoltativo) Timeout di riconoscimento della richiesta di output, in millisecondi, con un valore predefinito .5000 |
maxRetries | (Facoltativo) Numero di tentativi di invio di un messaggio non riuscito, con un valore predefinito .2 La ripetizione dei tentativi può causare il riordinamento, a meno che non EnableIdempotence sia impostata su true . |
authenticationMode | (Facoltativo) Modalità di autenticazione quando si usa l'autenticazione SASL (Simple Authentication and Security Layer). I valori supportati sono Gssapi , Plain (impostazione predefinita), ScramSha256 , ScramSha512 . |
username | (Facoltativo) Nome utente per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi . Per altre informazioni, vedere Connessioni . |
password | (Facoltativo) Password per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi . Per altre informazioni, vedere Connessioni . |
protocol | (Facoltativo) Protocollo di sicurezza usato per la comunicazione con i broker. I valori supportati sono plaintext (impostazione predefinita), ssl , , sasl_plaintext sasl_ssl . |
sslCaLocation | (Facoltativo) Percorso del file di certificato DELLA CA per la verifica del certificato del broker. |
sslCertificateLocation | (Facoltativo) Percorso del certificato del client. |
sslKeyLocation | (Facoltativo) Percorso della chiave privata del client (PEM) usato per l'autenticazione. |
sslKeyPassword | (Facoltativo) Password per il certificato del client. |
Impostazione
Nella tabella seguente sono illustrate le proprietà di configurazione dell'associazione impostate nel file function.json.
Proprietà di function.json | Descrizione |
---|---|
type | Deve essere impostato su kafka . |
direction | Deve essere impostato su out . |
name | Nome della variabile che rappresenta i dati negoziati nel codice della funzione. |
brokerList | (Obbligatorio) Elenco di broker Kafka a cui viene inviato l'output. Per altre informazioni, vedere Connessioni . |
topic | (Obbligatorio) Argomento a cui viene inviato l'output. |
avroSchema | (Facoltativo) Schema di un record generico quando si usa il protocollo Avro. |
maxMessageBytes | (Facoltativo) Dimensione massima del messaggio di output inviato (in MB), con il valore predefinito .1 |
batchSize | (Facoltativo) Numero massimo di messaggi in batch in un singolo set di messaggi, con un valore predefinito .10000 |
enableIdempotence | (Facoltativo) Se impostato su true , garantisce che i messaggi vengano prodotti correttamente una sola volta e nell'ordine di produzione originale, con un valore predefinito di false |
messageTimeoutMs | (Facoltativo) Timeout del messaggio locale, espresso in millisecondi. Questo valore viene applicato solo localmente e limita il tempo in cui un messaggio generato attende il recapito corretto, con un valore predefinito 300000 . Un tempo di 0 è infinito. Questo è il tempo massimo usato per recapitare un messaggio (inclusi i tentativi). L'errore di recapito si verifica quando viene superato il numero di tentativi o il timeout del messaggio. |
requestTimeoutMs | (Facoltativo) Timeout di riconoscimento della richiesta di output, in millisecondi, con un valore predefinito .5000 |
maxRetries | (Facoltativo) Numero di tentativi di invio di un messaggio non riuscito, con un valore predefinito .2 La ripetizione dei tentativi può causare il riordinamento, a meno che non EnableIdempotence sia impostata su true . |
authenticationMode | (Facoltativo) Modalità di autenticazione quando si usa l'autenticazione SASL (Simple Authentication and Security Layer). I valori supportati sono Gssapi , Plain (impostazione predefinita), ScramSha256 , ScramSha512 . |
username | (Facoltativo) Nome utente per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi . Per altre informazioni, vedere Connessioni . |
password | (Facoltativo) Password per l'autenticazione SASL. Non supportato quando AuthenticationMode è Gssapi . Per altre informazioni, vedere Connessioni . |
protocol | (Facoltativo) Protocollo di sicurezza usato per la comunicazione con i broker. I valori supportati sono plaintext (impostazione predefinita), ssl , , sasl_plaintext sasl_ssl . |
sslCaLocation | (Facoltativo) Percorso del file di certificato DELLA CA per la verifica del certificato del broker. |
sslCertificateLocation | (Facoltativo) Percorso del certificato del client. |
sslKeyLocation | (Facoltativo) Percorso della chiave privata del client (PEM) usato per l'autenticazione. |
sslKeyPassword | (Facoltativo) Password per il certificato del client. |
Utilizzo
Sia le chiavi che i tipi di valori sono supportati con la serializzazione predefinita di Avro e Protobuf .
L'offset, la partizione e il timestamp per l'evento vengono generati in fase di esecuzione. Solo i valori e le intestazioni possono essere impostati all'interno della funzione. L'argomento viene impostato nel function.json.
Assicurarsi di avere accesso all'argomento Kafka a cui si sta tentando di scrivere. Configurare l'associazione con le credenziali di accesso e connessione all'argomento Kafka.
In un piano Premium è necessario abilitare il monitoraggio della scalabilità di runtime per consentire l'aumento del numero di istanze dell'output Kafka. Per altre informazioni, vedere Abilitare il ridimensionamento in fase di esecuzione.
Per un set completo di impostazioni di host.json supportate per il trigger Kafka, vedere host.json impostazioni.
Connessioni
Tutte le informazioni di connessione richieste dai trigger e dalle associazioni devono essere mantenute nelle impostazioni dell'applicazione e non nelle definizioni di associazione nel codice. Questo vale per le credenziali, che non devono mai essere archiviate nel codice.
Importante
Le impostazioni delle credenziali devono fare riferimento a un'impostazione dell'applicazione. Non impostare le credenziali hardcoded nei file di codice o di configurazione. Quando si esegue localmente, usare il file local.settings.json per le credenziali e non pubblicare il file local.settings.json.
Quando ci si connette a un cluster Kafka gestito fornito da Confluent in Azure, assicurarsi che nel trigger o nell'associazione siano impostate le credenziali di autenticazione seguenti per l'ambiente Confluent Cloud:
Impostazione | Valore consigliato | Descrizione |
---|---|---|
BrokerList | BootstrapServer |
L'impostazione dell'app denominata BootstrapServer contiene il valore del server bootstrap disponibile nella pagina delle impostazioni di Confluent Cloud. Il valore è simile a xyz-xyzxzy.westeurope.azure.confluent.cloud:9092 . |
Nome utente | ConfluentCloudUsername |
L'impostazione dell'app denominata ConfluentCloudUsername contiene la chiave di accesso api dal sito Web Confluent Cloud. |
Password | ConfluentCloudPassword |
L'impostazione dell'app denominata ConfluentCloudPassword contiene il segreto API ottenuto dal sito Web Confluent Cloud. |
I valori stringa usati per queste impostazioni devono essere presenti come impostazioni dell'applicazione in Azure o nella Values
raccolta nel file local.settings.json durante lo sviluppo locale.
È anche necessario impostare , Protocol
AuthenticationMode
e SslCaLocation
nelle definizioni di associazione.