次の方法で共有


read_pubsub ストリーミングテーブル値関数

適用対象:check marked yes Databricks SQL Databricks Runtime 13.3 LTS 以降

トピックの Pub/Sub から読み取られたレコードを含むテーブルを返します。 ストリーミング クエリのみをサポートします。

構文

read_pubsub( { parameter => value } [, ...])

引数

read_pubsub には、名前付きパラメーター呼び出しが必要です。

必須の引数は subscriptionIdprojectId、および topicId のみです。 他の引数はすべて省略可能です。

詳細な引数の説明については、「Pub/Sub ストリーミング読み取りのオプションを構成する」を参照してください。

Databricks では、承認オプションを提供するときにシークレットを使用することをお勧めします。 「secret 関数」を参照してください。

Pub/Sub へのアクセスの構成の詳細については、「Pub/Sub へのアクセスを構成する」を参照してください。

パラメーター 説明
subscriptionId STRING 必須。Pub/Sub サブスクリプションに割り当てられた一意識別子。
projectId STRING 必須。Pub/Sub トピックに関連付けられている Google Cloud プロジェクト ID。
topicId STRING 必須。サブスクライブする Pub/Sub トピックの ID または名前。
clientEmail STRING 認証用のサービス アカウントに関連付けられているメール アドレス。
clientId STRING 認証用のサービス アカウントに関連付けられているクライアント ID。
privateKeyId STRING サービス アカウントに関連付けられている秘密キーの ID。
privateKey STRING 認証用のサービス アカウントに関連付けられている秘密キー。

これらの引数は、Pub/Sub から読み取るときにさらに微調整するために使用されます。

パラメーター 説明
numFetchPartitions STRING 省略可能。Executor の既定の数。 サブスクリプションからレコードをフェッチする並列 Spark タスクの数。
deleteSubscriptionOnStreamStop BOOLEAN 省略可能。既定値は false。 true に設定すると、ストリームに渡されたサブスクリプションが、ストリーミング ジョブの終了時に削除されます。
maxBytesPerTrigger STRING トリガーされる各マイクロバッチの間に処理されるバッチ サイズのソフト制限。 既定値は ‘none’ です。
maxRecordsPerFetch STRING レコードを処理する前にタスクごとにフェッチするレコードの数。 既定値は ‘1000’ です。
maxFetchPeriod STRING レコードを処理する前に取得する各タスクの時間。 既定値は '10s' です。

返品

次のスキーマを持つ Pub/Sub レコードのテーブル。 属性列は null の可能性がありますが、他のすべての列は null ではありません。

名前 データ型 Nullable Standard 説明
messageId STRING いいえ Pub/Sub メッセージの一意識別子。
payload BINARY いいえ Pub/Sub メッセージのコンテンツ。
attributes STRING はい Pub/Sub メッセージの属性を表すキーと値のペア。 これは json でエンコードされた文字列です。
publishTimestampInMillis BIGINT いいえ メッセージが発行されたときのタイムスタンプ (ミリ秒単位)。
sequenceNumber BIGINT いいえ シャード内のレコードの一意識別子。

-- Streaming Ingestion from Pubsub
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                clientEmail => secret(‘app-events’, ‘clientEmail’),
                clientId => secret(‘app-events’, ‘clientId’),
        privateKeyId => secret(‘app-events’, ‘privateKeyId’),
                privateKey => secret(‘app-events’, ‘privateKey’)
);

-- A streaming query when a service account is associated with the cluster
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’
);

さらに分析するには、testing.streaming_table に対してクエリを実行してデータを入手する必要があります。

誤ったクエリ:

-- Missing topicId option
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’
);

-- Limit is too high for an option, MAX_RECORDS_PER_FETCH_LIMIT
> CREATE STREAMING TABLE testing.streaming_table AS
  SELECT * FROM STREAM read_pubsub (
                subscriptionId => ‘app-events-1234’,
                projectId => ‘app-events-project’,
                topicId => ‘app-events-topic’,
                maxRecordsPerFetchLimit => ‘1000001’
);