Apache Pulsar からのストリーム
重要
この機能はパブリック プレビュー段階にあります。
Databricks Runtime 14.1 以降では、構造化ストリーミングを使用して、Azure Databricks 上の Apache Pulsar からデータをストリーミングできます。
構造化ストリーミングは、Pulsar ソースから読み取られたデータに対して 1 回だけ処理のセマンティクスを提供します。
構文の例
次に、構造化ストリーミングを使用して Pulsar から読み取る基本的な例を示します。
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.load()
トピックを指定するには、常に service.url
と次のいずれかのオプションを指定する必要があります。
topic
topics
topicsPattern
オプションの完全な一覧については、「Pulsar ストリーミング読み取りのオプションを構成する」を参照してください。
Pulsar に対する認証
Azure Databricks では、Pulsar に対する認証として、トラストストアとキーストアがサポートされています。 Databricks では、構成の詳細を格納する際にシークレットを使用することを推奨しています。
ストリームの構成中に、次のオプションを設定できます。
pulsar.client.authPluginClassName
pulsar.client.authParams
pulsar.client.useKeyStoreTls
pulsar.client.tlsTrustStoreType
pulsar.client.tlsTrustStorePath
pulsar.client.tlsTrustStorePassword
ストリームで PulsarAdmin
を使用する場合は、次の設定も行います。
pulsar.admin.authPluginClassName
pulsar.admin.authParams
次の例では、認証オプションの構成を示します。
val clientAuthParams = dbutils.secrets.get(scope = "pulsar", key = "clientAuthParams")
val clientPw = dbutils.secrets.get(scope = "pulsar", key = "clientPw")
// clientAuthParams is a comma-separated list of key-value pairs, such as:
//"keyStoreType:JKS,keyStorePath:/var/private/tls/client.keystore.jks,keyStorePassword:clientpw"
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topics", "topic1,topic2")
.option("startingOffsets", startingOffsets)
.option("pulsar.client.authPluginClassName", "org.apache.pulsar.client.impl.auth.AuthenticationKeyStoreTls")
.option("pulsar.client.authParams", clientAuthParams)
.option("pulsar.client.useKeyStoreTls", "true")
.option("pulsar.client.tlsTrustStoreType", "JKS")
.option("pulsar.client.tlsTrustStorePath", trustStorePath)
.option("pulsar.client.tlsTrustStorePassword", clientPw)
.load()
Pulsar スキーマ
Pulsar から読み取られるレコードのスキーマは、トピックのスキーマがどのようにエンコードされているかによって異なります。
- Avro スキーマまたは JSON スキーマを含むトピックの場合、フィールド名とフィールド型は結果の Spark DataFrame に保持されます。
- スキーマのないトピック、または Pulsar の単純なデータ型を使用するトピックの場合、ペイロードは
value
列に読み込まれます。 - スキーマが異なる複数のトピックを読み取るリーダーが構成されている場合は、生のコンテンツを
value
列に読み込むallowDifferentTopicSchemas
を設定します。
Pulsar レコードには、次のメタデータ フィールドがあります。
列 | Type |
---|---|
__key |
binary |
__topic |
string |
__messageId |
binary |
__publishTime |
timestamp |
__eventTime |
timestamp |
__messageProperties |
map<String, String> |
Pulsar ストリーミング読み取りのオプションを構成する
すべてのオプションは、.option("<optionName>", "<optionValue>")
構文を使用して構造化ストリーミング読み取りの一部として構成されます。 オプションを使用して認証を構成することもできます。 「Pulsar に対する認証」を参照してください。
次の表で、Pulsar に必要な構成について説明します。 topic
、topics
、topicsPattern
のいずれかのオプションのみを指定する必要があります。
オプション | 既定値 | 説明 |
---|---|---|
service.url |
なし | Pulsar サービスの Pulsar serviceUrl 構成。 |
topic |
なし | 使用するトピックのトピック名文字列。 |
topics |
なし | 使用するトピックのコンマ区切りリスト。 |
topicsPattern |
なし | 使用するトピックに一致する Java 正規表現文字列。 |
次の表で、Pulsar でサポートされているその他のオプションについて説明します。
オプション | 既定値 | 説明 |
---|---|---|
predefinedSubscription |
なし | Spark アプリケーションの進行状況を追跡するためにコネクタによって使用される、定義済みのサブスクリプション名。 |
subscriptionPrefix |
なし | Spark アプリケーションの進行状況を追跡するランダムなサブスクリプションを生成するために、コネクタによって使用されるプレフィックス。 |
pollTimeoutMs |
120000 | Pulsar からメッセージを読み取る際のタイムアウト (ミリ秒単位)。 |
waitingForNonExistedTopic |
false |
コネクタは目的のトピックが作成されるまで待機する必要があるかどうか。 |
failOnDataLoss |
true |
データが失われたとき (たとえば、トピックが削除された場合や、アイテム保持ポリシーのためにメッセージが削除された場合) にクエリを失敗させるかどうかを制御します。 |
allowDifferentTopicSchemas |
false |
異なるスキーマを持つ複数のトピックを読み取る場合は、このパラメーターを使用して、スキーマベースのトピック値の自動逆シリアル化をオフにします。 これが true の場合は、生の値のみが返されます。 |
startingOffsets |
latest |
latest の場合、リーダーは実行を開始した後に最新のレコードを読み取ります。 earliest の場合、リーダーは最も早いオフセットから読み取ります。 ユーザーは、特定のオフセットを指定する JSON 文字列を指定することもできます。 |
maxBytesPerTrigger |
なし | マイクロバッチごとに処理する最大バイト数のソフト制限。 これを指定する場合は、admin.url も指定する必要があります。 |
admin.url |
なし | Pulsar serviceHttpUrl 構成。 maxBytesPerTrigger が指定されている場合にのみ必要です。 |
次のパターンを使用して、Pulsar クライアント、管理者、リーダーの構成を指定することもできます。
パターン | 構成オプションへのリンク |
---|---|
pulsar.client.* |
Pulsar クライアント構成 |
pulsar.admin.* |
Pulsar 管理者構成 |
pulsar.reader.* |
Pulsar リーダー構成 |
開始オフセット JSON を作成する
メッセージ ID を手動で作成して特定のオフセットを指定し、これを JSON として startingOffsets
オプションに渡すことができます。 次のコード例は、この構文を示しています。
import org.apache.spark.sql.pulsar.JsonUtils
import org.apache.pulsar.client.api.MessageId
import org.apache.pulsar.client.impl.MessageIdImpl
val topic = "my-topic"
val msgId: MessageId = new MessageIdImpl(ledgerId, entryId, partitionIndex)
val startOffsets = JsonUtils.topicOffsets(Map(topic -> msgId))
query = spark.readStream
.format("pulsar")
.option("service.url", "pulsar://broker.example.com:6650")
.option("topic", topic)
.option("startingOffsets", startOffsets)
.load()