read_kinesis
funzione con valori di tabella di streaming
Si applica a: Databricks SQL Databricks Runtime 13.3 LTS e versioni successive
Restituisce una tabella con i record letti da Kinesis da uno o più flussi.
Sintassi
read_kinesis ( { parameter => value } [, ...] )
Argomenti
read_kinesis
richiede la chiamata al parametro denominato.
L'unico argomento obbligatorio è streamName
. Tutti gli altri argomenti sono facoltativi.
Le descrizioni degli argomenti sono brevi qui. Per altre informazioni, vedere la documentazione di Amazon Consulta .
Sono disponibili varie opzioni di connessione per connettersi ed eseguire l'autenticazione con AWS.
awsAccessKey
e awsSecretKey
possono essere specificati negli argomenti della funzione usando la funzione privata, impostata manualmente negli argomenti o configurata come variabili di ambiente come indicato di seguito.
roleArn
, roleExternalID
, roleSessionName
può essere usato anche per eseguire l'autenticazione con AWS usando i profili di istanza.
Se nessuno di questi elementi viene specificato, userà la catena di provider AWS predefinita.
Parametro | Tipo | Descrizione |
---|---|---|
streamName |
STRING |
Elenco obbligatorio e delimitato da virgole di uno o più flussi di classi. |
awsAccessKey |
STRING |
Chiave di accesso AWS, se presente. È anche possibile specificare tramite le varie opzioni supportate tramite la catena di provider di credenziali predefinita di AWS, incluse le variabili di ambiente (AWS_ACCESS_KEY_ID ) e un file di profili credenziali. |
awsSecretKey |
STRING |
Chiave privata che corrisponde alla chiave di accesso. Può essere specificato negli argomenti o tramite le varie opzioni supportate tramite la catena di provider di credenziali predefinita di AWS, incluse le variabili di ambiente (AWS_SECRET_KEY o AWS_SECRET_ACCESS_KEY ) e un file dei profili di credenziali. |
roleArn |
STRING |
Nome della risorsa Amazon del ruolo da presupporre quando si accedeao. |
roleExternalId |
STRING |
Usato per delegare l'accesso all'account AWS. |
roleSessionName |
STRING |
Nome della sessione del ruolo AWS. |
stsEndpoint |
STRING |
Endpoint per la richiesta di credenziali di accesso temporanee. |
region |
STRING |
Area per i flussi da specificare. Il valore predefinito è l'area risolta localmente. |
endpoint |
STRING |
endpoint regionale per i flussi di dati di Languages. Il valore predefinito è l'area risolta localmente. |
initialPosition |
STRING |
Posizione iniziale per la lettura da nel flusso. Uno dei seguenti: 'latest' (impostazione predefinita), 'trim_horizon', 'earliest', 'at_timestamp'. |
consumerMode |
STRING |
Uno dei seguenti: "polling" (impostazione predefinita) o "EFO" (enhanced-fan-out). |
consumerName |
STRING |
Nome del consumer. Tutti i consumer hanno il prefisso "databricks_". Il valore predefinito è una stringa vuota. |
registerConsumerTimeoutInterval |
STRING |
il timeout massimo per attendere la registrazione del consumer EFO Di Eseguire la registrazione del consumer con il flusso DiOs Prima di generare un errore. Il valore predefinito è '300s'. |
requireConsumerDeregistration |
BOOLEAN |
true per annullare la registrazione del consumer EFO alla terminazione della query. Il valore predefinito è false . |
deregisterConsumerTimeoutInterval |
STRING |
Timeout massimo in attesa che il consumer EFO di Venga annullata la registrazione del consumer EFO con ilflussoo prima di generare un errore. Il valore predefinito è '300s'. |
consumerRefreshInterval |
STRING |
Intervallo in cui il consumer viene controllato e aggiornato. Il valore predefinito è '300s'. |
Gli argomenti seguenti vengono usati per controllare la velocità effettiva di lettura e la latenza per l'uso di Integers:
Parametro | Tipo | Descrizione |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
Facoltativo, con un valore predefinito di 10.000 record da leggere per ogni richiesta API a Esegues. |
maxFetchRate |
STRING |
Velocità di prelettura dei dati per partizione. Valore compreso tra '1,0' e '2,0' misurato in MB/s. Il valore predefinito è '1.0'. |
minFetchPeriod |
STRING |
Tempo di attesa massimo tra tentativi di prelettura consecutivi. Il valore predefinito è '400ms'. |
maxFetchDuration |
STRING |
Durata massima del buffer dei nuovi dati prelettura. Il valore predefinito è '10s'. |
fetchBufferSize |
STRING |
Quantità di dati per il trigger successivo. Il valore predefinito è "20 gb". |
shardsPerTask |
INTEGER (>0) |
Numero di partizioni da cui eseguire il prelettura in parallelo per ogni attività Spark. L'impostazione predefinita è 5. |
shardFetchinterval |
STRING |
Frequenza con cui eseguire il polling per il partizionamento orizzontale. Il valore predefinito è "1s". |
coalesceThresholdBlockSize |
INTEGER (>0) |
Soglia in corrispondenza della quale si verifica l'unione automatica. Il valore predefinito è 10.000.000. |
coalesce |
BOOLEAN |
true per unire le richieste prelettura. Il valore predefinito è true . |
coalesceBinSize |
INTEGER (>0) |
Dimensioni approssimative del blocco dopo l'unione. Il valore predefinito è 128.000.000. |
reuseKinesisClient |
BOOLEAN |
true per riutilizzare il client Di Classe archiviato nella cache. Il valore predefinito è true tranne in un cluster PE. |
clientRetries |
INTEGER (>0) |
Numero di tentativi nello scenario di ripetizione dei tentativi. L'impostazione predefinita è 5. |
Valori restituiti
Una tabella di record Dios con lo schema seguente:
Nome | Tipo di dati | Nullable | Standard | Descrizione |
---|---|---|---|---|
partitionKey |
STRING |
No | Chiave utilizzata per distribuire i dati tra le partizioni di un flusso. Tutti i record di dati con la stessa chiave di partizione verranno letti dalla stessa partizione. | |
data |
BINARY |
No | Payload dei dati della classe della classe del formato di base 64 codificato. | |
stream |
STRING |
No | Nome del flusso da cui sono stati letti i dati. | |
shardId |
STRING |
No | Identificatore univoco per la partizione da cui sono stati letti i dati. | |
sequenceNumber |
BIGINT |
No | Identificatore univoco del record all'interno della partizione. | |
approximateArrivalTimestamp |
TIMESTAMP |
No | Ora approssimativa in cui il record è stato inserito nel flusso. |
Le colonne (stream, shardId, sequenceNumber)
costituiscono una chiave primaria.
Esempi
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');