Condividi tramite


read_kinesis funzione con valori di tabella di streaming

Si applica a: segno di spunta sì Databricks SQL segno di spunta sì Databricks Runtime 13.3 LTS e versioni successive

Restituisce una tabella con i record letti da Kinesis da uno o più flussi.

Sintassi

read_kinesis ( { parameter => value } [, ...] )

Argomenti

read_kinesis richiede la chiamata al parametro denominato.

L'unico argomento obbligatorio è streamName. Tutti gli altri argomenti sono facoltativi.

Le descrizioni degli argomenti sono brevi qui. Per altre informazioni, vedere la documentazione di Amazon Consulta .

Sono disponibili varie opzioni di connessione per connettersi ed eseguire l'autenticazione con AWS. awsAccessKeye awsSecretKey possono essere specificati negli argomenti della funzione usando la funzione privata, impostata manualmente negli argomenti o configurata come variabili di ambiente come indicato di seguito. roleArn, roleExternalID, roleSessionName può essere usato anche per eseguire l'autenticazione con AWS usando i profili di istanza. Se nessuno di questi elementi viene specificato, userà la catena di provider AWS predefinita.

Parametro Tipo Descrizione
streamName STRING Elenco obbligatorio e delimitato da virgole di uno o più flussi di classi.
awsAccessKey STRING Chiave di accesso AWS, se presente. È anche possibile specificare tramite le varie opzioni supportate tramite la catena di provider di credenziali predefinita di AWS, incluse le variabili di ambiente (AWS_ACCESS_KEY_ID) e un file di profili credenziali.
awsSecretKey STRING Chiave privata che corrisponde alla chiave di accesso. Può essere specificato negli argomenti o tramite le varie opzioni supportate tramite la catena di provider di credenziali predefinita di AWS, incluse le variabili di ambiente (AWS_SECRET_KEY o AWS_SECRET_ACCESS_KEY) e un file dei profili di credenziali.
roleArn STRING Nome della risorsa Amazon del ruolo da presupporre quando si accedeao.
roleExternalId STRING Usato per delegare l'accesso all'account AWS.
roleSessionName STRING Nome della sessione del ruolo AWS.
stsEndpoint STRING Endpoint per la richiesta di credenziali di accesso temporanee.
region STRING Area per i flussi da specificare. Il valore predefinito è l'area risolta localmente.
endpoint STRING endpoint regionale per i flussi di dati di Languages. Il valore predefinito è l'area risolta localmente.
initialPosition STRING Posizione iniziale per la lettura da nel flusso. Uno dei seguenti: 'latest' (impostazione predefinita), 'trim_horizon', 'earliest', 'at_timestamp'.
consumerMode STRING Uno dei seguenti: "polling" (impostazione predefinita) o "EFO" (enhanced-fan-out).
consumerName STRING Nome del consumer. Tutti i consumer hanno il prefisso "databricks_". Il valore predefinito è una stringa vuota.
registerConsumerTimeoutInterval STRING il timeout massimo per attendere la registrazione del consumer EFO Di Eseguire la registrazione del consumer con il flusso DiOs Prima di generare un errore. Il valore predefinito è '300s'.
requireConsumerDeregistration BOOLEAN true per annullare la registrazione del consumer EFO alla terminazione della query. Il valore predefinito è false.
deregisterConsumerTimeoutInterval STRING Timeout massimo in attesa che il consumer EFO di Venga annullata la registrazione del consumer EFO con ilflussoo prima di generare un errore. Il valore predefinito è '300s'.
consumerRefreshInterval STRING Intervallo in cui il consumer viene controllato e aggiornato. Il valore predefinito è '300s'.

Gli argomenti seguenti vengono usati per controllare la velocità effettiva di lettura e la latenza per l'uso di Integers:

Parametro Tipo Descrizione
maxRecordsPerFetch INTEGER (>0) Facoltativo, con un valore predefinito di 10.000 record da leggere per ogni richiesta API a Esegues.
maxFetchRate STRING Velocità di prelettura dei dati per partizione. Valore compreso tra '1,0' e '2,0' misurato in MB/s. Il valore predefinito è '1.0'.
minFetchPeriod STRING Tempo di attesa massimo tra tentativi di prelettura consecutivi. Il valore predefinito è '400ms'.
maxFetchDuration STRING Durata massima del buffer dei nuovi dati prelettura. Il valore predefinito è '10s'.
fetchBufferSize STRING Quantità di dati per il trigger successivo. Il valore predefinito è "20 gb".
shardsPerTask INTEGER (>0) Numero di partizioni da cui eseguire il prelettura in parallelo per ogni attività Spark. L'impostazione predefinita è 5.
shardFetchinterval STRING Frequenza con cui eseguire il polling per il partizionamento orizzontale. Il valore predefinito è "1s".
coalesceThresholdBlockSize INTEGER (>0) Soglia in corrispondenza della quale si verifica l'unione automatica. Il valore predefinito è 10.000.000.
coalesce BOOLEAN true per unire le richieste prelettura. Il valore predefinito è true.
coalesceBinSize INTEGER (>0) Dimensioni approssimative del blocco dopo l'unione. Il valore predefinito è 128.000.000.
reuseKinesisClient BOOLEAN true per riutilizzare il client Di Classe archiviato nella cache. Il valore predefinito è true tranne in un cluster PE.
clientRetries INTEGER (>0) Numero di tentativi nello scenario di ripetizione dei tentativi. L'impostazione predefinita è 5.

Valori restituiti

Una tabella di record Dios con lo schema seguente:

Nome Tipo di dati Nullable Standard Descrizione
partitionKey STRING No Chiave utilizzata per distribuire i dati tra le partizioni di un flusso. Tutti i record di dati con la stessa chiave di partizione verranno letti dalla stessa partizione.
data BINARY No Payload dei dati della classe della classe del formato di base 64 codificato.
stream STRING No Nome del flusso da cui sono stati letti i dati.
shardId STRING No Identificatore univoco per la partizione da cui sono stati letti i dati.
sequenceNumber BIGINT No Identificatore univoco del record all'interno della partizione.
approximateArrivalTimestamp TIMESTAMP No Ora approssimativa in cui il record è stato inserito nel flusso.

Le colonne (stream, shardId, sequenceNumber) costituiscono una chiave primaria.

Esempi

-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
        awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
        initialPosition => 'earliest');

-- The data would now need to be queried from the testing.streaming_table

-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest');

-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
    SELECT * FROM STREAM read_kinesis (
        streamName => 'test_databricks',
        initialPosition => 'earliest',
        roleArn => 'arn:aws:iam::123456789012:role/MyRole',
        roleSessionName => 'testing@databricks.com');