read_pulsar
스트리밍 테이블 반환 함수
적용 대상: Databricks SQL Databricks Runtime 14.1 이상
Important
이 기능은 공개 미리 보기 상태입니다.
Pulsar에서 읽은 레코드가 있는 테이블을 반환합니다.
테이블 반환 함수는 일괄 처리 쿼리가 아닌 스트리밍만 지원합니다.
구문
read_pulsar ( { option_key => option_value } [, ...] )
인수
이 함수에는 옵션 키에 대한 명명된 매개 변수 호출 이 필요합니다.
옵션 serviceUrl
이며 topic
필수입니다.
인수에 대한 설명은 여기에서 간략하게 설명합니다. 확장 설명은 구조적 스트리밍 펄서 설명서를 참조하세요.
옵션 | Type | 기본값 | 설명 |
---|---|---|---|
ServiceUrl | STRING | 필수 | 펄서 서비스의 URI입니다. |
토픽 | STRING | 필수 | 읽을 항목입니다. |
predefinedSubscription | STRING | None | Spark 애플리케이션 진행률을 추적하기 위해 커넥터에서 사용하는 미리 정의된 구독 이름입니다. |
subscriptionPrefix | STRING | None | Spark 애플리케이션 진행률을 추적하기 위해 임의 구독을 생성하기 위해 커넥터에서 사용하는 접두사입니다. |
pollTimeoutMs | LONG | 120000 | Pulsar에서 메시지를 읽기 위한 시간 제한(밀리초)입니다. |
failOnDataLoss | BOOLEAN | true | 데이터가 손실될 때 쿼리에 실패할지 여부를 제어합니다(예: 토픽이 삭제되거나 보존 정책으로 인해 메시지가 삭제됨). |
startingOffsets | STRING | latest | 특정 오프셋을 지정하는 가장 빠른, 최신 또는 JSON 문자열 중 하나를 쿼리를 시작할 때의 시작점입니다. 최신인 경우 판독기는 실행을 시작한 후 최신 레코드를 읽습니다. 가장 이른 경우 판독기는 가장 빠른 오프셋에서 읽습니다. 사용자는 특정 오프셋을 지정하는 JSON 문자열을 지정할 수도 있습니다. |
startingTime | STRING | None | 지정하면 Pulsar 원본은 지정된 startingTime의 위치에서 시작하는 메시지를 읽습니다. |
다음 인수는 펄서 클라이언트의 인증에 사용됩니다.
옵션 | Type | 기본값 | 설명 |
---|---|---|---|
pulsarClientAuthPluginClassName | STRING | None | 인증 플러그 인의 이름입니다. |
pulsarClientAuthParams | STRING | None | 인증 플러그 인에 대한 매개 변수입니다. |
pulsarClientUseKeyStoreTls | STRING | None | tls 인증에 KeyStore를 사용할지 여부입니다. |
pulsarClientTlsTrustStoreType | STRING | None | tls 인증을 위한 TrustStore 파일 형식입니다. |
pulsarClientTlsTrustStorePath | STRING | None | tls 인증을 위한 TrustStore 파일 경로입니다. |
pulsarClientTlsTrustStorePassword | STRING | None | tls 인증에 대한 TrustStore 암호입니다. |
이러한 인수는 펄서 허용 제어의 구성 및 인증에 사용되며, 허용 제어를 사용하도록 설정한 경우에만 펄서 관리자 구성이 필요합니다(maxBytesPerTrigger가 설정된 경우).
옵션 | Type | 기본값 | 설명 |
---|---|---|---|
maxBytesPerTrigger | BIGINT | None | 마이크로배치당 처리하려는 최대 바이트 수의 소프트 제한입니다. 지정한 경우 admin.url도 지정해야 합니다. |
adminUrl | STRING | None | Pulsar serviceHttpUrl 구성입니다. maxBytesPerTrigger가 지정된 경우에만 필요합니다. |
pulsarAdminAuthPlugin | STRING | None | 인증 플러그 인의 이름입니다. |
pulsarAdminAuthParams | STRING | None | 인증 플러그 인에 대한 매개 변수입니다. |
pulsarClientUseKeyStoreTls | STRING | None | tls 인증에 KeyStore를 사용할지 여부입니다. |
pulsarAdminTlsTrustStoreType | STRING | None | tls 인증을 위한 TrustStore 파일 형식입니다. |
pulsarAdminTlsTrustStorePath | STRING | None | tls 인증을 위한 TrustStore 파일 경로입니다. |
pulsarAdminTlsTrustStorePassword | STRING | None | tls 인증에 대한 TrustStore 암호입니다. |
반품
다음 스키마가 있는 펄서 레코드 테이블입니다.
__key STRING NOT NULL
: 펄서 메시지 키입니다.value BINARY NOT NULL
: 펄서 메시지 값입니다.참고: Avro 또는 JSON 스키마가 있는 토픽의 경우 콘텐츠를 이진 값 필드에 로드하는 대신 펄서 토픽의 필드 이름과 필드 형식을 유지하도록 콘텐츠가 확장됩니다.
__topic STRING NOT NULL
: 펄서 토픽 이름입니다.__messageId BINARY NOT NULL
: 펄서 메시지 ID입니다.__publishTime TIMESTAMP NOT NULL
: 펄서 메시지 게시 시간입니다.__eventTime TIMESTAMP NOT NULL
: 펄서 메시지 이벤트 시간입니다.__messageProperties MAP<STRING, STRING>
: 펄서 메시지 속성입니다.
예제
-- Streaming from Pulsar
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic');
-- Streaming Ingestion from Pulsar with authentication
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pulsar(
serviceUrl => 'pulsar://broker.example.com:6650',
startingOffsets => 'earliest',
topic => 'my-topic',
pulsarClientAuthPluginClassName => 'org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls',
pulsarClientAuthParams => 'keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw'
);
The data can now to be queried from the testing.streaming_table for further analysis.