프로토콜 버퍼 읽기 및 쓰기
Azure Databricks는 Apache Spark 구조체와 프로토콜 버퍼(protobuf) 간의 직렬화 및 역직렬화에 대한 기본 지원을 제공합니다. Protobuf 지원은 Apache Spark DataFrame 변환기로 구현되며 구조적 스트리밍 또는 일괄 처리 작업에 사용할 수 있습니다.
프로토콜 버퍼를 역직렬화 및 직렬화하는 방법
Databricks Runtime 12.2 LTS 이상에서는 데이터를 직렬화하고 from_protobuf
역직렬화하는 데 사용하고 함수를 사용할 to_protobuf
수 있습니다. Protobuf serialization은 스트리밍 워크로드에서 일반적으로 사용됩니다.
protobuf 함수의 기본 구문은 읽기 및 쓰기 함수와 비슷합니다. 사용하려면 먼저 이러한 함수를 가져와야 합니다.
from_protobuf
이진 데이터 column을 구조체로 변환하고 to_protobuf
구조체 column을 이진 데이터로 변환합니다.
options
인수로 지정된 schema 레지스트리 또는 descFilePath
인수로 식별된 설명자 파일을 제공해야 합니다.
Python
from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)
Scala
// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])
// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])
다음 예제에서는 이진 protobuf 레코드를 from_protobuf()
처리하고 Spark SQL 구조체를 이진 protobuf로 to_protobuf()
변환하는 방법을 보여 줍니다.
Confluent Schema 레지스트리에서 protobuf 사용
Azure Databricks는 Confluent Schema 레지스트리 사용하여 Protobuf를 정의하도록 지원합니다.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://schema-registry:8081/"
}
# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
input_df
.select(
from_protobuf("proto_bytes", options = schema_registry_options)
.alias("proto_event")
)
)
# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
proto_events_df
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf("event", options = schema_registry_options)
.alias("proto_bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://schema-registry:8081/"
)
// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
.select(
from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
.as("proto_event")
)
// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
.selectExpr("struct(name, id, context) as event")
.select(
to_protobuf($"event", options = schemaRegistryOptions.asJava)
.as("proto_bytes")
)
외부 Confluent Schema 레지스트리에 인증하기
외부 Confluent Schema 레지스트리에 인증하기 위해, 인증 credentials 및 API 키를 포함하도록 schema 레지스트리 옵션을 update 설정하세요.
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
"confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)
Unity Catalogvolumes 트러스트 저장소 및 키 저장소 파일 사용
Databricks Runtime 14.3 LTS 이상에서는 Unity Catalogvolumes 신뢰 저장소 및 키 저장소 파일을 사용하여 Confluent Schema 레지스트리에 인증할 수 있습니다. Update 레지스트리 옵션을 다음 예제에 따라 schema으로 설정하십시오.
Python
schema_registry_options = {
"schema.registry.subject" : "app-events-value",
"schema.registry.address" : "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" : "<password>",
"confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" : "<password>",
"confluent.schema.registry.ssl.key.password" : "<password>"
}
Scala
val schemaRegistryOptions = Map(
"schema.registry.subject" -> "app-events-value",
"schema.registry.address" -> "https://remote-schema-registry-endpoint",
"confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
"confluent.schema.registry.ssl.truststore.password" -> "<password>",
"confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
"confluent.schema.registry.ssl.keystore.password" -> "<password>",
"confluent.schema.registry.ssl.key.password" -> "<password>"
)
설명자 파일과 함께 Protobuf 사용
컴퓨팅 클러스터에서 사용할 수 있는 protobuf 설명자 파일을 참조할 수도 있습니다. 해당 위치에 따라 파일을 읽을 수 있는 적절한 권한이 있는지 확인합니다.
Python
from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf
descriptor_file = "/path/to/proto_descriptor.desc"
proto_events_df = (
input_df.select(
from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
)
)
proto_binary_df = (
proto_events_df
.select(
to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
)
)
Scala
import org.apache.spark.sql.protobuf.functions._
val descriptorFile = "/path/to/proto_descriptor.desc"
val protoEventsDF = inputDF
.select(
from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
)
val protoBytesDF = protoEventsDF
.select(
to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
)
Protobuf 함수에서 지원되는 옵션
Protobuf 함수에서 지원되는 옵션은 다음과 같습니다.
-
모드: Protobuf 레코드를 역직렬화하는 동안 오류가 처리되는 방법을 결정합니다. 오류는 레코드의 실제 schema과
from_protobuf()
에서 제공된 예상 schema 간의 불일치를 포함하여 다양한 형식의 잘못된 레코드로 인해 발생할 수 있습니다.-
Values:
-
FAILFAST
(기본값): 잘못된 형식의 레코드가 발생하고 작업이 실패하면 오류가 발생합니다. -
PERMISSIVE
: 형식이 잘못된 레코드에 대해 NULL이 반환됩니다. 많은 레코드가 삭제될 수 있으므로 이 옵션을 신중하게 사용합니다. 이는 원본에 있는 레코드의 작은 부분이 잘못된 경우에 유용합니다.
-
-
Values:
-
recursive.fields.max.depth: 재귀 필드에 대한 지원을 추가합니다. Spark SQL 스키마는 재귀 필드를 지원하지 않습니다. 이 옵션을 지정하지 않으면 재귀 필드가 허용되지 않습니다. Protobufs에서 재귀 필드를 지원하려면 지정된 깊이로 확장해야 합니다.
Values:
-1(기본값): 재귀 필드는 허용되지 않습니다.
0: 재귀 필드가 삭제됩니다.
1: 단일 수준의 재귀를 허용합니다.
[2-10]: 최대 10개 수준까지 여러 재귀에 대한 임계값을 지정합니다.
값을 0보다 크게 설정하면 중첩된 필드를 구성된 깊이로 확장하여 재귀 필드를 허용합니다. 실수로 매우 큰 스키마를 만들지 않도록 10보다 큰 Values 허용되지 않습니다. Protobuf 메시지에 설정된 limit을 초과하는 깊이가 있으면, 재귀 limit후에 반환된 Spark 구조체가 잘립니다.
예: 다음 재귀 필드가 있는 Protobuf를 고려합니다.
message Person { string name = 1; Person friend = 2; }
다음은 이 설정에 대한 다양한 values을 가진 끝 schema들을 나열합니다.
- 옵션 set에서
STRUCT<name: STRING>
까지 1 - 옵션 set에서 #D2까지:
STRUCT<name STRING, friend: STRUCT<name: STRING>>
- 옵션 set에서 3까지:
STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
- 옵션 set에서
-
convert.any.fields.to.json: 이 옵션을 사용하면 Protobuf Any 필드를 JSON으로 변환할 수 있습니다. 이 기능은 신중하게 사용하도록 설정해야 합니다. JSON 변환 및 처리는 비효율적입니다. 또한 JSON 문자열 필드는 Protobuf schema 안전성을 잃어버리며, 이로 인해 다운스트림 처리가 오류 발생에 취약해집니다.
Values:
- False(기본값): 런타임에 이러한 와일드카드 필드에는 임의의 Protobuf 메시지를 이진 데이터로 포함할 수 있습니다. 기본적으로 이러한 필드는 일반 Protobuf 메시지처럼 처리됩니다.
schema
(STRUCT<type_url: STRING, value: BINARY>)
있는 두 개의 필드가 있습니다. 기본적으로 이진value
필드는 어떤 방식으로도 해석되지 않습니다. 그러나 이진 데이터는 실제로 일부 애플리케이션에서 작동하는 것이 편리하지 않을 수 있습니다. - True: 이 값을 True로 설정하면 런타임에 필드를 JSON 문자열로 변환
Any
할 수 있습니다. 이 옵션을 사용하면 이진 파일이 구문 분석되고 Protobuf 메시지가 JSON 문자열로 역직렬화됩니다.
- False(기본값): 런타임에 이러한 와일드카드 필드에는 임의의 Protobuf 메시지를 이진 데이터로 포함할 수 있습니다. 기본적으로 이러한 필드는 일반 Protobuf 메시지처럼 처리됩니다.
schema
예: 다음과 같이 정의된 두 가지 Protobuf 형식을 고려합니다.
message ProtoWithAny { string event_name = 1; google.protobuf.Any details = 2; } message Person { string name = 1; int32 id = 2; }
이 옵션을 사용하면 schema에 대한
from_protobuf("col", messageName ="ProtoWithAny")
은STRUCT<event_name: STRING, details: STRING>
가 됩니다.런타임에 필드에 Protobuf 메시지가 포함된
details
경우Person
반환된 값은 다음과('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')
같습니다.요구 사항:
- 필드에 사용되는 가능한 모든 Protobuf 형식에
Any
대한 정의는 전달된from_protobuf()
Protobuf 설명자 파일에서 사용할 수 있어야 합니다. - Protobuf를 찾을 수 없으면
Any
해당 레코드에 대한 오류가 발생합니다. - 이 기능은 현재 schema-registry에서 지원되지 않습니다.
- 필드에 사용되는 가능한 모든 Protobuf 형식에
-
emit.default입니다.values: Protobuf를 Spark 구조체로 역직렬화할 때 values 없는 렌더링 필드를 사용하도록 설정합니다. 이 옵션은 아쉽게 사용해야 합니다. 일반적으로 의미 체계에서 이러한 미세한 차이에 의존 하는 것이 좋습니다.
Values
- False(기본값): 직렬화된 Protobuf에서 필드가 비어 있으면 Spark 구조체의 결과 필드는 기본적으로 null입니다. 이 옵션을 사용하지 않고 기본값으로 처리하는
null
것이 더 간단합니다. - True: 이 옵션을 사용하도록 설정하면 해당 필드가 해당 기본값인 values로 채워집니다.
- False(기본값): 직렬화된 Protobuf에서 필드가 비어 있으면 Spark 구조체의 결과 필드는 기본적으로 null입니다. 이 옵션을 사용하지 않고 기본값으로 처리하는
예: 다음과 같이
Person(age=0, middle_name="")
생성된 Protobuf를 사용하여 다음 Protobuf를 고려합니다.syntax = "proto3"; message Person { string name = 1; int64 age = 2; optional string middle_name = 3; optional int64 salary = 4; }
-
set 옵션을 False로 설정하면,
from_protobuf()
을 호출한 후, Spark 구조체는 모두 null인{"name": null, "age": null, "middle_name": "", "salary": null}
가 됩니다. 두 필드(age
및middle_name
)에 valuesset있음에도 불구하고, Protobuf는 그들이 기본값 values이기 때문에 wire-format에 포함하지 않습니다. - 옵션 set을(를) True로 설정하면
from_protobuf()
호출한 후 Spark 구조체는{"name": "", "age": 0, "middle_name": "", "salary": null}
가 됩니다.salary
필드는 명시적으로optional
선언되고 입력 레코드에 set 않으므로 null로 유지됩니다.
-
set 옵션을 False로 설정하면,
-
enums.as.ints: 사용하도록 설정하면 Protobuf의 열거형 필드가 Spark에서 정수 필드로 렌더링됩니다.
Values
- False(기본값)
- True: 사용하도록 설정하면 Protobuf의 열거형 필드가 Spark에서 정수 필드로 렌더링됩니다.
예제: 다음 Protobuf를 고려합니다.
syntax = "proto3"; message Person { enum Job { NONE = 0; ENGINEER = 1; DOCTOR = 2; NURSE = 3; } Job job = 1; }
다음과 같은
Person(job = ENGINEER)
Protobuf 메시지가 표시됩니다.- 이 옵션을 사용하지 않도록 설정하면 해당 Spark 구조체가 됩니다
{"job": "ENGINEER"}
. - 이 옵션을 사용하도록 설정하면 해당 Spark 구조체가 됩니다
{"job": 1}
.
이러한 필드에 대한 schema 각 사례에서 다릅니다(기본 문자열이 아닌 정수). 이러한 변경은 다운스트림 tables의 schema에 영향을 미칠 수 있습니다.
- 이 옵션을 사용하지 않도록 설정하면 해당 Spark 구조체가 됩니다
Schema 레지스트리 옵션
다음 schema 레지스트리 옵션은 Protobuf 함수와 함께 schema 레지스트리를 사용하는 동안 관련이 있습니다.
-
schema.registry.subject
- Required
- Schema 레지스트리에서 schema의 주제를 지정합니다 (예: "client-event").
- .registry.address
- Required
-
schema 레지스트리의 URL, 예를 들어
https://schema-registry.example.com:8081
처럼
-
schema.registry.protobuf.name
- 선택 사항
- 기본값:
<NONE>
. - 주체에 대한 schema-registry 항목에는 단일
proto
파일처럼 여러 Protobuf 정의가 포함될 수 있습니다. 이 옵션을 지정하지 않으면 schema에 첫 번째 Protobuf가 사용됩니다. 항목의 첫 번째 메시지가 아닌 경우 Protobuf 메시지의 이름을 지정합니다. 예를 들어 두 개의 Protobuf 정의인 "Person" 및 "Location"이 있는 항목을 순서대로 고려해 보세요. 스트림이 "Person"이 아닌 "Location"에 해당하는 경우, 이 옵션을 "위치"(또는 패키지 "com.example.protos.Location"이(가) 포함된 전체 이름)로 설정하세요. set.
- .registry를 schema.schema.evolution.mode
- 기본값: "restart".
- 지원되는 모드:
- "restart"
- "none"
- 이 옵션은
from_protobuf()
에 대해 schema진화 모드를 설정합니다. 쿼리 시작 시 Spark는 지정된 주제에 대한 최신 schema-id를 기록합니다. 이것은from_protobuf()
에 대한 schema를 결정합니다. 쿼리가 시작된 후 새 schema가 schema 레지스트리에 게시될 수 있습니다. 새로운 schema-id가 들어오는 레코드에서 발견되면 이는 schema이 변경되었음을 나타냅니다. 이 옵션은 schema 대한 이러한 변경 사항을 처리하는 방법을 결정합니다.-
다시 시작(기본값): 최신 schema-id가 발견되면
UnknownFieldException
트리거합니다. 그러면 쿼리가 종료됩니다. Databricks는 작업을 구성하여 쿼리가 실패하면 자동으로 다시 시작하도록 함으로써 schema 변경 내용을 처리할 수 있도록 할 것을 권장합니다. - 없음: Schema-id 변경 내용은 무시됩니다. 쿼리 시작 시 관찰된 동일한 schema을 사용하여 최신 schema-id가 있는 레코드는 구문 분석됩니다. 최신 Protobuf 정의는 이전 버전과 호환될 것으로 예상되며 새 필드는 무시됩니다.
-
다시 시작(기본값): 최신 schema-id가 발견되면
-
결합합니다. .registry를schema.
<schema-registy-client-option>
- 선택 사항
-
Schema-registry는 Confluent schema-registry에 Confluent Schema Registry 클라이언트를 사용하여 연결합니다. 클라이언트에서 지원하는 모든 구성 옵션은 "confluent" 접두사를 사용하여 지정할 수 있습니다.schema.registry". 예를 들어, 다음의 두 가지 설정은 "USER_INFO" 인증 credentials을 제공합니다.
- "confluent. .registry.basic.auth를schema.credentials.source": 'USER_INFO'
- "confluent.schema.registry.basic.auth.user.info": "
<KEY>
:<SECRET>
"