read_kinesis
스트리밍 테이블 반환 함수
적용 대상: Databricks SQL Databricks Runtime 13.3 LTS 이상
하나 이상의 스트림에서 Kinesis에서 읽은 레코드가 있는 테이블을 반환합니다.
구문
read_kinesis ( { parameter => value } [, ...] )
인수
read_kinesis
에는 명명된 매개 변수 호출이 필요합니다.
유일한 필수 인수는 .입니다 streamName
. 다른 모든 인수는 선택 사항입니다.
인수에 대한 설명은 여기에서 간략하게 설명합니다. 자세한 내용은 Amazon Kinesis 설명서를 참조하세요.
AWS에 연결하고 인증하는 다양한 연결 옵션이 있습니다.
awsAccessKey
awsSecretKey
- 비밀 함수를 사용하여 함수 인수에 지정하거나, 인수에 수동으로 설정하거나, 아래에 표시된 대로 환경 변수로 구성할 수 있습니다.
roleArn
, roleExternalID
roleSessionName
인스턴스 프로필을 사용하여 AWS를 인증하는 데 사용할 수도 있습니다.
이러한 공급자를 지정하지 않으면 기본 AWS 공급자 체인을 사용합니다.
매개 변수 | 형식 | 설명 |
---|---|---|
streamName |
STRING |
하나 이상의 키네시스 스트림의 필수 쉼표로 구분된 목록입니다. |
awsAccessKey |
STRING |
AWS 액세스 키(있는 경우)입니다. 환경 변수(AWS_ACCESS_KEY_ID ) 및 자격 증명 프로필 파일을 포함하여 AWS 기본 자격 증명 공급자 체인을 통해 지원되는 다양한 옵션을 통해 지정할 수도 있습니다. |
awsSecretKey |
STRING |
액세스 키에 해당하는 비밀 키입니다. 인수 또는 환경 변수(AWS_SECRET_KEY 또는 AWS_SECRET_ACCESS_KEY ) 및 자격 증명 프로필 파일을 포함하여 AWS 기본 자격 증명 공급자 체인을 통해 지원되는 다양한 옵션을 통해 지정할 수 있습니다. |
roleArn |
STRING |
Kinesis에 액세스할 때 가정할 역할의 Amazon 리소스 이름입니다. |
roleExternalId |
STRING |
AWS 계정에 대한 액세스를 위임할 때 사용됩니다. |
roleSessionName |
STRING |
AWS 역할 세션 이름입니다. |
stsEndpoint |
STRING |
임시 액세스 자격 증명을 요청하기 위한 엔드포인트입니다. |
region |
STRING |
지정할 스트림의 지역입니다. 기본값은 로컬로 확인된 지역입니다. |
endpoint |
STRING |
Kinesis 데이터 스트림에 대한 지역별 엔드포인트입니다. 기본값은 로컬로 확인된 지역입니다. |
initialPosition |
STRING |
스트림에서 읽기 위한 시작 위치입니다. 'latest'(기본값), 'trim_horizon', 'earliest', 'at_timestamp' 중 하나입니다. |
consumerMode |
STRING |
'폴링'(기본값) 또는 'EFO'(고급 팬아웃) 중 하나입니다. |
consumerName |
STRING |
소비자의 이름입니다. 모든 소비자는 접두사로 'databricks_'을 갖습니다. 기본값은 빈 문자열입니다. |
registerConsumerTimeoutInterval |
STRING |
Kinesis EFO 소비자가 Kinesis 스트림에 등록될 때까지 대기하는 최대 시간 제한입니다. 기본값은 '300s'입니다. |
requireConsumerDeregistration |
BOOLEAN |
true 쿼리 종료에 EFO 소비자를 등록 취소합니다. 기본값은 false 입니다. |
deregisterConsumerTimeoutInterval |
STRING |
Kinesis EFO 소비자가 오류를 throw하기 전에 Kinesis 스트림으로 등록을 취소할 때까지 대기하는 최대 시간 제한입니다. 기본값은 '300s'입니다. |
consumerRefreshInterval |
STRING |
소비자가 검사되고 새로 고쳐지는 간격입니다. 기본값은 '300s'입니다. |
Kinesis의 읽기 처리량 및 대기 시간을 제어하는 데 사용되는 인수는 다음과 같습니다.
매개 변수 | 형식 | 설명 |
---|---|---|
maxRecordsPerFetch |
INTEGER (>0) |
선택 사항으로, Kinesis에 대한 API 요청당 읽을 레코드는 기본값인 10,000개입니다. |
maxFetchRate |
STRING |
분할당 데이터를 프리페치하는 속도입니다. MB/s로 측정된 '1.0'과 '2.0' 사이의 값입니다. 기본값은 '1.0'입니다. |
minFetchPeriod |
STRING |
연속 프리페치 시도 사이의 최대 대기 시간입니다. 기본값은 '400ms'입니다. |
maxFetchDuration |
STRING |
프리페치된 새 데이터를 버퍼링할 최대 기간입니다. 기본값은 '10s'입니다. |
fetchBufferSize |
STRING |
다음 트리거에 대한 데이터 양입니다. 기본값은 '20gb'입니다. |
shardsPerTask |
INTEGER (>0) |
Spark 작업당 병렬로 프리페치할 Kinesis 분할된 데이터베이스의 수입니다. 기본값은 5입니다. |
shardFetchinterval |
STRING |
분할을 폴링하는 빈도입니다. 기본값은 '1s'입니다. |
coalesceThresholdBlockSize |
INTEGER (>0) |
자동 병합이 발생하는 임계값입니다. 기본값은 10,000,000입니다. |
coalesce |
BOOLEAN |
true 를 사용하여 프리페치된 요청을 병합합니다. 기본값은 true 입니다. |
coalesceBinSize |
INTEGER (>0) |
병합 후 대략적인 블록 크기입니다. 기본값은 128,000,000입니다. |
reuseKinesisClient |
BOOLEAN |
true 캐시에 저장된 Kinesis 클라이언트를 다시 사용합니다. 기본값은 true PE 클러스터를 제외한 것입니다. |
clientRetries |
INTEGER (>0) |
재시도 시나리오의 재시도 횟수입니다. 기본값은 5입니다. |
반품
Kinesis 테이블은 다음 스키마를 사용하여 레코드를 기록합니다.
속성 | 데이터 형식 | Nullable | Standard | 설명 |
---|---|---|---|---|
partitionKey |
STRING |
아니요 | 스트림의 분할된 데이터베이스 간에 데이터를 배포하는 데 사용되는 키입니다. 동일한 파티션 키를 가진 모든 데이터 레코드는 동일한 분할된 데이터베이스에서 읽습니다. | |
data |
BINARY |
아니요 | base-64로 인코딩된 kinesis 데이터 페이로드입니다. | |
stream |
STRING |
아니요 | 데이터를 읽은 스트림의 이름입니다. | |
shardId |
STRING |
아니요 | 데이터를 읽은 분할된 데이터베이스의 고유 식별자입니다. | |
sequenceNumber |
BIGINT |
아니요 | 분할된 데이터베이스 내 레코드의 고유 식별자입니다. | |
approximateArrivalTimestamp |
TIMESTAMP |
아니요 | 레코드가 스트림에 삽입된 대략적인 시간입니다. |
열은 기본 키를 구성합니다 (stream, shardId, sequenceNumber)
.
예제
-- Streaming Ingestion from Kinesis
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
awsAccessKey => secret(‘test-databricks’, ‘awsAccessKey’),
awsSecretKey => secret(‘test-databricks’, ‘awsSecretKey’),
initialPosition => 'earliest');
-- The data would now need to be queried from the testing.streaming_table
-- A streaming query when the environment variables already contain AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest');
-- A streaming query when the roleArn, roleSessionName, and roleExternalID are configured
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_kinesis (
streamName => 'test_databricks',
initialPosition => 'earliest',
roleArn => 'arn:aws:iam::123456789012:role/MyRole',
roleSessionName => 'testing@databricks.com');