다음을 통해 공유


read_pubsub 스트리밍 table값 함수

적용 대상:예로 표시된 확인 Databricks SQL 예로 표시된 확인 Databricks Runtime 13.3 LTS 이상

토픽에서 Pub/Sub에 의해 읽은 레코드를 포함한 table를 반환합니다. 스트리밍 쿼리만 지원합니다.

구문

read_pubsub( { parameter => value } [, ...])

인수

read_pubsub 에는 명명된 매개 변수 호출이 필요합니다.

유일하게 필요한 인수는 subscriptionId, projectIdtopicId. 다른 모든 인수는 선택 사항입니다.

전체 인수 설명은 Pub/Sub 스트리밍 읽기에 대한 옵션 구성을 참조 하세요.

Databricks는 권한 부여 옵션을 제공할 때 비밀을 사용하는 것이 좋습니다. 비밀 함수를 참조하세요.

Pub/Sub에 대한 액세스 구성에 대한 자세한 내용은 Pub/Sub에 대한 액세스 구성을 참조 하세요.

매개 변수 형식 설명
subscriptionId STRING Pub/Sub 구독에 할당된 고유한 identifier 필수입니다.
projectId STRING 필수 요소로, Pub/Sub 토픽과 연결된 Google Cloud 프로젝트 ID입니다.
topicId STRING 구독할 Pub/Sub 토픽의 ID 또는 이름이 필요합니다.
clientEmail STRING 인증을 위해 서비스 계정과 연결된 이메일 주소입니다.
clientId STRING 인증을 위해 서비스 계정과 연결된 클라이언트 ID입니다.
privateKeyId STRING 서비스 계정과 연결된 프라이빗 키의 ID입니다.
privateKey STRING 인증을 위해 서비스 계정과 연결된 프라이빗 키입니다.

이러한 인수는 Pub/Sub에서 읽을 때 추가로 미세 조정하는 데 사용됩니다.

매개 변수 형식 설명
numFetchPartitions STRING 기본 실행기 수를 사용하는 선택 사항입니다. 구독에서 레코드를 가져오는 병렬 Spark 작업의 수입니다.
deleteSubscriptionOnStreamStop BOOLEAN 기본값인 선택 사항 false입니다. set이 true인 경우에, 스트리밍 작업이 종료되면 스트림에 전달된 구독이 삭제됩니다.
maxBytesPerTrigger STRING 트리거된 각 마이크로 배치에서 처리할 배치 크기에 대한 완화된 limit. 기본값은 'none'입니다.
maxRecordsPerFetch STRING 레코드를 처리하기 전에 태스크당 가져올 레코드 수입니다. 기본값은 '1000'입니다.
maxFetchPeriod STRING 각 태스크가 레코드를 처리하기 전에 가져올 기간입니다. 기본값은 '10s'입니다.

반품

다음과 같은 schema를 포함하는 table의 Pub/Sub 레코드. column 특성은 null일 수 있지만 다른 모든 columns는 null이 아닙니다.

속성 데이터 형식 Nullable Standard 설명
messageId STRING 아니요 Pub/Sub 메시지의 고유 식별자 identifier.
payload BINARY 아니요 Pub/Sub 메시지의 콘텐츠입니다.
attributes STRING Pub/Sub 메시지의 특성을 나타내는 키-값 쌍입니다. json으로 인코딩된 문자열입니다.
publishTimestampInMillis BIGINT 아니요 메시지가 게시된 타임스탬프(밀리초)입니다.
sequenceNumber BIGINT 아니요 분할된 데이터베이스 내 레코드의 고유한 identifier.

예제

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

이제 추가 분석을 위해 데이터를 쿼리 testing.streaming_table 해야 합니다.

잘못된 쿼리:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);