read_pubsub
스트리밍 table값 함수
적용 대상: Databricks SQL Databricks Runtime 13.3 LTS 이상
토픽에서 Pub/Sub에 의해 읽은 레코드를 포함한 table를 반환합니다. 스트리밍 쿼리만 지원합니다.
구문
read_pubsub( { parameter => value } [, ...])
인수
read_pubsub
에는 명명된 매개 변수 호출이 필요합니다.
유일하게 필요한 인수는 subscriptionId
, projectId
및 topicId
. 다른 모든 인수는 선택 사항입니다.
전체 인수 설명은 Pub/Sub 스트리밍 읽기에 대한 옵션 구성을 참조 하세요.
Databricks는 권한 부여 옵션을 제공할 때 비밀을 사용하는 것이 좋습니다. 비밀 함수를 참조하세요.
Pub/Sub에 대한 액세스 구성에 대한 자세한 내용은 Pub/Sub에 대한 액세스 구성을 참조 하세요.
매개 변수 | 형식 | 설명 |
---|---|---|
subscriptionId |
STRING |
Pub/Sub 구독에 할당된 고유한 identifier 필수입니다. |
projectId |
STRING |
필수 요소로, Pub/Sub 토픽과 연결된 Google Cloud 프로젝트 ID입니다. |
topicId |
STRING |
구독할 Pub/Sub 토픽의 ID 또는 이름이 필요합니다. |
clientEmail |
STRING |
인증을 위해 서비스 계정과 연결된 이메일 주소입니다. |
clientId |
STRING |
인증을 위해 서비스 계정과 연결된 클라이언트 ID입니다. |
privateKeyId |
STRING |
서비스 계정과 연결된 프라이빗 키의 ID입니다. |
privateKey |
STRING |
인증을 위해 서비스 계정과 연결된 프라이빗 키입니다. |
이러한 인수는 Pub/Sub에서 읽을 때 추가로 미세 조정하는 데 사용됩니다.
매개 변수 | 형식 | 설명 |
---|---|---|
numFetchPartitions |
STRING |
기본 실행기 수를 사용하는 선택 사항입니다. 구독에서 레코드를 가져오는 병렬 Spark 작업의 수입니다. |
deleteSubscriptionOnStreamStop |
BOOLEAN |
기본값인 선택 사항 false 입니다.
set이 true인 경우에, 스트리밍 작업이 종료되면 스트림에 전달된 구독이 삭제됩니다. |
maxBytesPerTrigger |
STRING |
트리거된 각 마이크로 배치에서 처리할 배치 크기에 대한 완화된 limit. 기본값은 'none'입니다. |
maxRecordsPerFetch |
STRING |
레코드를 처리하기 전에 태스크당 가져올 레코드 수입니다. 기본값은 '1000'입니다. |
maxFetchPeriod |
STRING |
각 태스크가 레코드를 처리하기 전에 가져올 기간입니다. 기본값은 '10s'입니다. |
반품
다음과 같은 schema를 포함하는 table의 Pub/Sub 레코드. column 특성은 null일 수 있지만 다른 모든 columns는 null이 아닙니다.
속성 | 데이터 형식 | Nullable | Standard | 설명 |
---|---|---|---|---|
messageId |
STRING |
아니요 | Pub/Sub 메시지의 고유 식별자 identifier. | |
payload |
BINARY |
아니요 | Pub/Sub 메시지의 콘텐츠입니다. | |
attributes |
STRING |
예 | Pub/Sub 메시지의 특성을 나타내는 키-값 쌍입니다. json으로 인코딩된 문자열입니다. | |
publishTimestampInMillis |
BIGINT |
아니요 | 메시지가 게시된 타임스탬프(밀리초)입니다. | |
sequenceNumber |
BIGINT |
아니요 | 분할된 데이터베이스 내 레코드의 고유한 identifier. |
예제
-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
clientEmail => secret(‘app-events’, ‘clientEmail’),
clientId => secret(‘app-events’, ‘clientId’),
privateKeyId => secret(‘app-events’, ‘privateKeyId’),
privateKey => secret(‘app-events’, ‘privateKey’)
);
-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’
);
이제 추가 분석을 위해 데이터를 쿼리 testing.streaming_table
해야 합니다.
잘못된 쿼리:
-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’
);
-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
SELECT * FROM STREAM read_pubsub (
subscriptionId => ‘app-events-1234’,
projectId => ‘app-events-project’,
topicId => ‘app-events-topic’,
maxRecordsPerFetchLimit => ‘1000001’
);