다음을 통해 공유


프로토콜 버퍼 읽기 및 쓰기

Azure Databricks는 Apache Spark 구조체와 프로토콜 버퍼(protobuf) 간의 직렬화 및 역직렬화에 대한 기본 지원을 제공합니다. Protobuf 지원은 Apache Spark DataFrame 변환기로 구현되며 구조적 스트리밍 또는 일괄 처리 작업에 사용할 수 있습니다.

프로토콜 버퍼를 역직렬화 및 직렬화하는 방법

Databricks Runtime 12.2 LTS 이상에서는 데이터를 직렬화하고 from_protobuf 역직렬화하는 데 사용하고 함수를 사용할 to_protobuf 수 있습니다. Protobuf serialization은 스트리밍 워크로드에서 일반적으로 사용됩니다.

protobuf 함수의 기본 구문은 읽기 및 쓰기 함수와 비슷합니다. 사용하려면 먼저 이러한 함수를 가져와야 합니다.

from_protobuf 이진 데이터 column을 구조체로 변환하고 to_protobuf 구조체 column을 이진 데이터로 변환합니다. options 인수로 지정된 schema 레지스트리 또는 descFilePath 인수로 식별된 설명자 파일을 제공해야 합니다.

Python

from_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

to_protobuf(data: 'ColumnOrName', messageName: Optional[str] = None, descFilePath: Optional[str] = None, options: Optional[Dict[str, str]] = None)

Scala

// While using with Schema registry:
from_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
from_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

// While using with Schema registry:
to_protobuf(data: Column, options: Map[String, String])

// Or with Protobuf descriptor file:
to_protobuf(data: Column, messageName: String, descFilePath: String, options: Map[String, String])

다음 예제에서는 이진 protobuf 레코드를 from_protobuf() 처리하고 Spark SQL 구조체를 이진 protobuf로 to_protobuf()변환하는 방법을 보여 줍니다.

Confluent Schema 레지스트리에서 protobuf 사용

Azure Databricks는 Confluent Schema 레지스트리 사용하여 Protobuf를 정의하도록 지원합니다.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

schema_registry_options = {
  "schema.registry.subject" : "app-events-value",
  "schema.registry.address" : "https://schema-registry:8081/"
}

# Convert binary Protobuf to SQL struct with from_protobuf():
proto_events_df = (
  input_df
    .select(
      from_protobuf("proto_bytes", options = schema_registry_options)
        .alias("proto_event")
    )
)

# Convert SQL struct to binary Protobuf with to_protobuf():
protobuf_binary_df = (
  proto_events_df
    .selectExpr("struct(name, id, context) as event")
    .select(
      to_protobuf("event", options = schema_registry_options)
        .alias("proto_bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._
import scala.collection.JavaConverters._

val schemaRegistryOptions = Map(
    "schema.registry.subject" -> "app-events-value",
    "schema.registry.address" -> "https://schema-registry:8081/"
)

// Convert binary Protobuf to SQL struct with from_protobuf():
val protoEventsDF = inputDF
    .select(
        from_protobuf($"proto_bytes", options = schemaRegistryOptions.asJava)
            .as("proto_event")
    )

// Convert SQL struct to binary Protobuf with to_protobuf():
val protobufBinaryDF = protoEventsDF
    .selectExpr("struct(name, id, context) as event")
    .select(
        to_protobuf($"event", options = schemaRegistryOptions.asJava)
            .as("proto_bytes")
    )

외부 Confluent Schema 레지스트리에 인증하기

외부 Confluent Schema 레지스트리에 인증하기 위해, 인증 credentials 및 API 키를 포함하도록 schema 레지스트리 옵션을 update 설정하세요.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.basic.auth.credentials.source" : "USER_INFO",
    "confluent.schema.registry.basic.auth.user.info" : "confluentApiKey:confluentApiSecret"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.basic.auth.credentials.source" -> "USER_INFO",
      "confluent.schema.registry.basic.auth.user.info" -> "confluentApiKey:confluentApiSecret"
)

Unity Catalogvolumes 트러스트 저장소 및 키 저장소 파일 사용

Databricks Runtime 14.3 LTS 이상에서는 Unity Catalogvolumes 신뢰 저장소 및 키 저장소 파일을 사용하여 Confluent Schema 레지스트리에 인증할 수 있습니다. Update 레지스트리 옵션을 다음 예제에 따라 schema으로 설정하십시오.

Python

schema_registry_options = {
    "schema.registry.subject" : "app-events-value",
    "schema.registry.address" : "https://remote-schema-registry-endpoint",
    "confluent.schema.registry.ssl.truststore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
    "confluent.schema.registry.ssl.truststore.password" : "<password>",
    "confluent.schema.registry.ssl.keystore.location" : "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
    "confluent.schema.registry.ssl.keystore.password" : "<password>",
    "confluent.schema.registry.ssl.key.password" : "<password>"
  }

Scala

val schemaRegistryOptions = Map(
      "schema.registry.subject" -> "app-events-value",
      "schema.registry.address" -> "https://remote-schema-registry-endpoint",
      "confluent.schema.registry.ssl.truststore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.truststore.jks",
      "confluent.schema.registry.ssl.truststore.password" -> "<password>",
      "confluent.schema.registry.ssl.keystore.location" -> "/Volumes/<catalog_name>/<schema_name>/<volume_name>/kafka.client.keystore.jks",
      "confluent.schema.registry.ssl.keystore.password" -> "<password>",
      "confluent.schema.registry.ssl.key.password" -> "<password>"
)

설명자 파일과 함께 Protobuf 사용

컴퓨팅 클러스터에서 사용할 수 있는 protobuf 설명자 파일을 참조할 수도 있습니다. 해당 위치에 따라 파일을 읽을 수 있는 적절한 권한이 있는지 확인합니다.

Python

from pyspark.sql.protobuf.functions import to_protobuf, from_protobuf

descriptor_file = "/path/to/proto_descriptor.desc"

proto_events_df = (
    input_df.select(
      from_protobuf(input_df.value, "BasicMessage", descFilePath=descriptor_file).alias("proto")
    )
)

proto_binary_df = (
  proto_events_df
    .select(
      to_protobuf(proto_events_df.proto, "BasicMessage", descriptor_file).alias("bytes")
    )
)

Scala

import org.apache.spark.sql.protobuf.functions._

val descriptorFile = "/path/to/proto_descriptor.desc"

val protoEventsDF = inputDF
  .select(
    from_protobuf($"value", "BasicMessage", descFilePath=descriptorFile).as("proto")
  )

val protoBytesDF = protoEventsDF
  .select(
    to_protobuf($"proto", "BasicMessage", descriptorFile).as("bytes")
  )

Protobuf 함수에서 지원되는 옵션

Protobuf 함수에서 지원되는 옵션은 다음과 같습니다.

  • 모드: Protobuf 레코드를 역직렬화하는 동안 오류가 처리되는 방법을 결정합니다. 오류는 레코드의 실제 schema과 from_protobuf()에서 제공된 예상 schema 간의 불일치를 포함하여 다양한 형식의 잘못된 레코드로 인해 발생할 수 있습니다.
    • Values:
      • FAILFAST(기본값): 잘못된 형식의 레코드가 발생하고 작업이 실패하면 오류가 발생합니다.
      • PERMISSIVE: 형식이 잘못된 레코드에 대해 NULL이 반환됩니다. 많은 레코드가 삭제될 수 있으므로 이 옵션을 신중하게 사용합니다. 이는 원본에 있는 레코드의 작은 부분이 잘못된 경우에 유용합니다.
  • recursive.fields.max.depth: 재귀 필드에 대한 지원을 추가합니다. Spark SQL 스키마는 재귀 필드를 지원하지 않습니다. 이 옵션을 지정하지 않으면 재귀 필드가 허용되지 않습니다. Protobufs에서 재귀 필드를 지원하려면 지정된 깊이로 확장해야 합니다.
    • Values:

      • -1(기본값): 재귀 필드는 허용되지 않습니다.

      • 0: 재귀 필드가 삭제됩니다.

      • 1: 단일 수준의 재귀를 허용합니다.

      • [2-10]: 최대 10개 수준까지 여러 재귀에 대한 임계값을 지정합니다.

        값을 0보다 크게 설정하면 중첩된 필드를 구성된 깊이로 확장하여 재귀 필드를 허용합니다. 실수로 매우 큰 스키마를 만들지 않도록 10보다 큰 Values 허용되지 않습니다. Protobuf 메시지에 설정된 limit을 초과하는 깊이가 있으면, 재귀 limit후에 반환된 Spark 구조체가 잘립니다.

    • : 다음 재귀 필드가 있는 Protobuf를 고려합니다.

      message Person { string name = 1; Person friend = 2; }
      

      다음은 이 설정에 대한 다양한 values을 가진 끝 schema들을 나열합니다.

      • 옵션 set에서 STRUCT<name: STRING>까지 1
      • 옵션 set에서 #D2까지: STRUCT<name STRING, friend: STRUCT<name: STRING>>
      • 옵션 set에서 3까지: STRUCT<name STRING, friend: STRUCT<name STRING, friend: STRUCT<name: STRING>>>
  • convert.any.fields.to.json: 이 옵션을 사용하면 Protobuf Any 필드를 JSON으로 변환할 수 있습니다. 이 기능은 신중하게 사용하도록 설정해야 합니다. JSON 변환 및 처리는 비효율적입니다. 또한 JSON 문자열 필드는 Protobuf schema 안전성을 잃어버리며, 이로 인해 다운스트림 처리가 오류 발생에 취약해집니다.
    • Values:

      • False(기본값): 런타임에 이러한 와일드카드 필드에는 임의의 Protobuf 메시지를 이진 데이터로 포함할 수 있습니다. 기본적으로 이러한 필드는 일반 Protobuf 메시지처럼 처리됩니다. schema (STRUCT<type_url: STRING, value: BINARY>)있는 두 개의 필드가 있습니다. 기본적으로 이진 value 필드는 어떤 방식으로도 해석되지 않습니다. 그러나 이진 데이터는 실제로 일부 애플리케이션에서 작동하는 것이 편리하지 않을 수 있습니다.
      • True: 이 값을 True로 설정하면 런타임에 필드를 JSON 문자열로 변환 Any 할 수 있습니다. 이 옵션을 사용하면 이진 파일이 구문 분석되고 Protobuf 메시지가 JSON 문자열로 역직렬화됩니다.
    • : 다음과 같이 정의된 두 가지 Protobuf 형식을 고려합니다.

      message ProtoWithAny {
         string event_name = 1;
         google.protobuf.Any details = 2;
      }
      
      message Person {
         string name = 1;
         int32 id = 2;
      }
      

      이 옵션을 사용하면 schema에 대한 from_protobuf("col", messageName ="ProtoWithAny")STRUCT<event_name: STRING, details: STRING>가 됩니다.

      런타임에 필드에 Protobuf 메시지가 포함된 details 경우 Person 반환된 값은 다음과 ('click', '{"@type":"type.googleapis.com/...ProtoWithAny","name":"Mario","id":100}')같습니다.

    • 요구 사항:

      • 필드에 사용되는 가능한 모든 Protobuf 형식에 Any 대한 정의는 전달된 from_protobuf()Protobuf 설명자 파일에서 사용할 수 있어야 합니다.
      • Protobuf를 찾을 수 없으면 Any 해당 레코드에 대한 오류가 발생합니다.
      • 이 기능은 현재 schema-registry에서 지원되지 않습니다.
  • emit.default입니다.values: Protobuf를 Spark 구조체로 역직렬화할 때 values 없는 렌더링 필드를 사용하도록 설정합니다. 이 옵션은 아쉽게 사용해야 합니다. 일반적으로 의미 체계에서 이러한 미세한 차이에 의존 하는 것이 좋습니다.
    • Values

      • False(기본값): 직렬화된 Protobuf에서 필드가 비어 있으면 Spark 구조체의 결과 필드는 기본적으로 null입니다. 이 옵션을 사용하지 않고 기본값으로 처리하는 null 것이 더 간단합니다.
      • True: 이 옵션을 사용하도록 설정하면 해당 필드가 해당 기본값인 values로 채워집니다.
    • : 다음과 같이 Person(age=0, middle_name="")생성된 Protobuf를 사용하여 다음 Protobuf를 고려합니다.

      syntax = "proto3";
      
      message Person {
         string name = 1;
         int64 age = 2;
         optional string middle_name = 3;
         optional int64 salary = 4;
      }
      
      • set 옵션을 False로 설정하면, from_protobuf()을 호출한 후, Spark 구조체는 모두 null인 {"name": null, "age": null, "middle_name": "", "salary": null}가 됩니다. 두 필드(agemiddle_name)에 valuesset있음에도 불구하고, Protobuf는 그들이 기본값 values이기 때문에 wire-format에 포함하지 않습니다.
      • 옵션 set을(를) True로 설정하면 from_protobuf() 호출한 후 Spark 구조체는 {"name": "", "age": 0, "middle_name": "", "salary": null}가 됩니다. salary 필드는 명시적으로 optional 선언되고 입력 레코드에 set 않으므로 null로 유지됩니다.
  • enums.as.ints: 사용하도록 설정하면 Protobuf의 열거형 필드가 Spark에서 정수 필드로 렌더링됩니다.
    • Values

      • False(기본값)
      • True: 사용하도록 설정하면 Protobuf의 열거형 필드가 Spark에서 정수 필드로 렌더링됩니다.
    • 예제: 다음 Protobuf를 고려합니다.

      syntax = "proto3";
      
      message Person {
         enum Job {
           NONE = 0;
           ENGINEER = 1;
           DOCTOR = 2;
           NURSE = 3;
         }
         Job job = 1;
      }
      

      다음과 같은 Person(job = ENGINEER)Protobuf 메시지가 표시됩니다.

      • 이 옵션을 사용하지 않도록 설정하면 해당 Spark 구조체가 됩니다 {"job": "ENGINEER"}.
      • 이 옵션을 사용하도록 설정하면 해당 Spark 구조체가 됩니다 {"job": 1}.

      이러한 필드에 대한 schema 각 사례에서 다릅니다(기본 문자열이 아닌 정수). 이러한 변경은 다운스트림 tables의 schema에 영향을 미칠 수 있습니다.

Schema 레지스트리 옵션

다음 schema 레지스트리 옵션은 Protobuf 함수와 함께 schema 레지스트리를 사용하는 동안 관련이 있습니다.

  • schema.registry.subject
    • Required
    • Schema 레지스트리에서 schema의 주제를 지정합니다 (예: "client-event").
  • .registry.address
    • Required
    • schema 레지스트리의 URL, 예를 들어 https://schema-registry.example.com:8081처럼
  • schema.registry.protobuf.name
    • 선택 사항
    • 기본값: <NONE>.
    • 주체에 대한 schema-registry 항목에는 단일 proto 파일처럼 여러 Protobuf 정의가 포함될 수 있습니다. 이 옵션을 지정하지 않으면 schema에 첫 번째 Protobuf가 사용됩니다. 항목의 첫 번째 메시지가 아닌 경우 Protobuf 메시지의 이름을 지정합니다. 예를 들어 두 개의 Protobuf 정의인 "Person" 및 "Location"이 있는 항목을 순서대로 고려해 보세요. 스트림이 "Person"이 아닌 "Location"에 해당하는 경우, 이 옵션을 "위치"(또는 패키지 "com.example.protos.Location"이(가) 포함된 전체 이름)로 설정하세요. set.
  • .registry를 schema.schema.evolution.mode
    • 기본값: "restart".
    • 지원되는 모드:
      • "restart"
      • "none"
    • 이 옵션은 from_protobuf()에 대해 schema진화 모드를 설정합니다. 쿼리 시작 시 Spark는 지정된 주제에 대한 최신 schema-id를 기록합니다. 이것은 from_protobuf()에 대한 schema를 결정합니다. 쿼리가 시작된 후 새 schema가 schema 레지스트리에 게시될 수 있습니다. 새로운 schema-id가 들어오는 레코드에서 발견되면 이는 schema이 변경되었음을 나타냅니다. 이 옵션은 schema 대한 이러한 변경 사항을 처리하는 방법을 결정합니다.
      • 다시 시작(기본값): 최신 schema-id가 발견되면 UnknownFieldException 트리거합니다. 그러면 쿼리가 종료됩니다. Databricks는 작업을 구성하여 쿼리가 실패하면 자동으로 다시 시작하도록 함으로써 schema 변경 내용을 처리할 수 있도록 할 것을 권장합니다.
      • 없음: Schema-id 변경 내용은 무시됩니다. 쿼리 시작 시 관찰된 동일한 schema을 사용하여 최신 schema-id가 있는 레코드는 구문 분석됩니다. 최신 Protobuf 정의는 이전 버전과 호환될 것으로 예상되며 새 필드는 무시됩니다.
  • 결합합니다. .registry를schema.<schema-registy-client-option>
    • 선택 사항
    • Schema-registry는 Confluent schema-registry에 Confluent Schema Registry 클라이언트를 사용하여 연결합니다. 클라이언트에서 지원하는 모든 구성 옵션은 "confluent" 접두사를 사용하여 지정할 수 있습니다.schema.registry". 예를 들어, 다음의 두 가지 설정은 "USER_INFO" 인증 credentials을 제공합니다.
      • "confluent. .registry.basic.auth를schema.credentials.source": 'USER_INFO'
      • "confluent.schema.registry.basic.auth.user.info": "<KEY> : <SECRET>"