PySpark 사용자 지정 데이터 원본
Important
PySpark 사용자 지정 데이터 원본은 Databricks Runtime 15.2 이상에서 공개 미리 보기 로 제공됩니다. 스트리밍 지원은 Databricks Runtime 15.3 이상에서 사용할 수 있습니다.
PySpark DataSource는 Python(PySpark) DataSource API를 통해 만들어지며, 이를 통해 사용자 지정 데이터 원본에서 읽고 Python을 사용하여 Apache Spark의 사용자 지정 데이터 싱크에 쓸 수 있습니다. PySpark 사용자 지정 데이터 원본을 사용하여 데이터 시스템에 대한 사용자 지정 connections 정의하고 추가 기능을 구현하여 재사용 가능한 데이터 원본을 빌드할 수 있습니다.
DataSource 클래스
PySpark DataSource 는 데이터 판독기 및 작성기를 만드는 메서드를 제공하는 기본 클래스입니다.
데이터 원본 하위 클래스 구현
사용 사례에 따라 데이터 원본을 읽을 수 있거나 쓰기 가능하거나 둘 다 만들려면 하위 클래스에서 다음을 구현해야 합니다.
속성 또는 메서드 | 설명 |
---|---|
name |
필수입니다. 데이터 원본의 이름 |
schema |
필수입니다. 읽거나 쓸 데이터 원본의 schema |
reader() |
데이터 원본을 DataSourceReader 읽을 수 있도록 하려면 반환해야 합니다(일괄 처리). |
writer() |
데이터 싱크를 DataSourceWriter 쓰기 가능(일괄 처리)하려면 반환해야 합니다. |
streamReader() 또는 simpleStreamReader() |
데이터 스트림을 DataSourceStreamReader 읽을 수 있도록 하려면 반환해야 합니다(스트리밍). |
streamWriter() |
데이터 스트림을 DataSourceStreamWriter 쓰기 가능(스트리밍)하려면 반환해야 합니다. |
참고 항목
사용자 정의 DataSource
, , DataSourceReader
, DataSourceWriter
DataSourceStreamReader
DataSourceStreamWriter
및 해당 메서드를 serialize할 수 있어야 합니다. 즉, 기본 형식을 포함하는 사전 또는 중첩된 사전이어야 합니다.
데이터 원본 등록
인터페이스를 구현한 후 등록해야 하며, 다음 예제와 같이 로드하거나 사용할 수 있습니다.
# Register the data source
spark.dataSource.register(MyDataSourceClass)
# Read from a custom data source
spark.read.format("my_datasource_name").load().show()
예제 1: 일괄 처리 쿼리를 위한 PySpark DataSource 만들기
PySpark DataSource 판독기 기능을 보여 주려면 Python 패키지를 사용하여 예제 데이터를 생성하는 데이터 원본을 faker
만듭니다. 자세한 faker
내용은 Faker 설명서를 참조 하세요.
다음 명령을 사용하여 faker
패키지를 설치합니다.
%pip install faker
1단계: 예제 DataSource 정의
먼저 새 PySpark DataSource를 이름, schema및 판독기를 사용하여 DataSource
하위 클래스로 정의합니다.
reader()
일괄 처리 쿼리의 데이터 원본에서 읽도록 메서드를 정의해야 합니다.
from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType
class FakeDataSource(DataSource):
"""
An example data source for batch query using the `faker` library.
"""
@classmethod
def name(cls):
return "fake"
def schema(self):
return "name string, date string, zipcode string, state string"
def reader(self, schema: StructType):
return FakeDataSourceReader(schema, self.options)
2단계: 일괄 처리 쿼리에 대한 판독기 구현
다음으로, generate 예제 데이터에 대한 판독 논리를 구현합니다. 설치된 faker
라이브러리를 사용하여 schema각 필드를 채웁다.
class FakeDataSourceReader(DataSourceReader):
def __init__(self, schema, options):
self.schema: StructType = schema
self.options = options
def read(self, partition):
# Library imports must be within the method.
from faker import Faker
fake = Faker()
# Every value in this `self.options` dictionary is a string.
num_rows = int(self.options.get("numRows", 3))
for _ in range(num_rows):
row = []
for field in self.schema.fields:
value = getattr(fake, field.name)()
row.append(value)
yield tuple(row)
3단계: 예제 데이터 원본 등록 및 사용
데이터 원본을 사용하려면 데이터 원본을 등록합니다. 기본적으로 FakeDataSource
세 개의 행이 있으며 schemaname
, date
, zipcode
, state
등의 string
필드를 포함합니다. 다음 예제에서는 기본값을 사용하여 예제 데이터 원본을 등록, 로드 및 출력합니다.
spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
| name| date|zipcode| state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24| 79766| Colorado|
| Shelby Cox|2011-08-05| 24596| Florida|
| Amanda Robinson|2019-01-06| 57395|Washington|
+-----------------+----------+-------+----------+
string
필드만 지원되지만, 테스트 및 개발을 위한 임의 데이터를 generate하기 위해 faker
패키지 providers의 필드에 해당하는 어느 필드라도 schema로 지정할 수 있습니다. 다음 예제에서는 데이터 원본과 name
필드를 로드 company
합니다.
spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name |company |
+---------------------+--------------+
|Tanner Brennan |Adams Group |
|Leslie Maxwell |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc |
+---------------------+--------------+
사용자 지정 행 수를 사용하여 데이터 원본을 로드하려면 옵션을 지정 numRows
합니다. 다음 예제에서는 5개의 행을 지정합니다.
spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
| name| date|zipcode| state|
+--------------+----------+-------+------------+
| Pam Mitchell|1988-10-20| 23788| Tennessee|
|Melissa Turner|1996-06-14| 30851| Nevada|
| Brian Ramsey|2021-08-21| 55277| Washington|
| Caitlin Reed|1983-06-22| 89813|Pennsylvania|
| Douglas James|2007-01-18| 46226| Alabama|
+--------------+----------+-------+------------+
예제 2: 읽기 및 쓰기 스트리밍을 위한 PySpark DataSource 만들기
PySpark DataSource 스트림 판독기 및 기록기 기능을 보여 주려면 Python 패키지를 사용하여 모든 마이크로배치에서 두 개의 행을 생성하는 예제 데이터 원본을 faker
만듭니다. 자세한 faker
내용은 Faker 설명서를 참조 하세요.
다음 명령을 사용하여 faker
패키지를 설치합니다.
%pip install faker
1단계: 예제 DataSource 정의
먼저 이름, schema및 메서드 streamReader()
및 streamWriter()
사용하여 새 PySpark DataSource를 DataSource
서브클래스로 정의합니다.
from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType
class FakeStreamDataSource(DataSource):
"""
An example data source for streaming read and write using the `faker` library.
"""
@classmethod
def name(cls):
return "fakestream"
def schema(self):
return "name string, state string"
def streamReader(self, schema: StructType):
return FakeStreamReader(schema, self.options)
# If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
# def simpleStreamReader(self, schema: StructType):
# return SimpleStreamReader()
def streamWriter(self, schema: StructType, overwrite: bool):
return FakeStreamWriter(self.options)
2단계: 스트림 판독기 구현
다음으로, 모든 마이크로배치에서 두 개의 행을 생성하는 스트리밍 데이터 판독기 예제를 구현합니다. 구현 DataSourceStreamReader
하거나 데이터 원본의 처리량이 낮고 분할이 필요하지 않은 경우 대신 구현 SimpleDataSourceStreamReader
할 수 있습니다.
simpleStreamReader()
구현 streamReader()
되거나 구현되어야 하며 simpleStreamReader()
구현되지 않은 경우에만 호출 streamReader()
됩니다.
DataSourceStreamReader 구현
streamReader
인스턴스에는 DataSourceStreamReader
인터페이스로 구현된 정수 offset이 있으며, 이 정수는 각 마이크로배치마다 2씩 증가합니다.
class RangePartition(InputPartition):
def __init__(self, start, end):
self.start = start
self.end = end
class FakeStreamReader(DataSourceStreamReader):
def __init__(self, schema, options):
self.current = 0
def initialOffset(self) -> dict:
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def latestOffset(self) -> dict:
"""
Returns the current latest offset that the next microbatch will read to.
"""
self.current += 2
return {"offset": self.current}
def partitions(self, start: dict, end: dict):
"""
Plans the partitioning of the current microbatch defined by start and end offset. It
needs to return a sequence of :class:`InputPartition` objects.
"""
return [RangePartition(start["offset"], end["offset"])]
def commit(self, end: dict):
"""
This is invoked when the query has finished processing data before end offset. This
can be used to clean up the resource.
"""
pass
def read(self, partition) -> Iterator[Tuple]:
"""
Takes a partition as an input and reads an iterator of tuples from the data source.
"""
start, end = partition.start, partition.end
for i in range(start, end):
yield (i, str(i))
SimpleDataSourceStreamReader 구현
인스턴스는 SimpleStreamReader
모든 일괄 처리에서 두 개의 행을 생성하지만 분할 없이 인터페이스로 구현되는 인스턴스와 FakeStreamReader
동일합니다SimpleDataSourceStreamReader
.
class SimpleStreamReader(SimpleDataSourceStreamReader):
def initialOffset(self):
"""
Returns the initial start offset of the reader.
"""
return {"offset": 0}
def read(self, start: dict) -> (Iterator[Tuple], dict):
"""
Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
"""
start_idx = start["offset"]
it = iter([(i,) for i in range(start_idx, start_idx + 2)])
return (it, {"offset": start_idx + 2})
def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
"""
Takes start and end offset as inputs, then reads an iterator of data deterministically.
This is called when the query replays batches during restart or after a failure.
"""
start_idx = start["offset"]
end_idx = end["offset"]
return iter([(i,) for i in range(start_idx, end_idx)])
def commit(self, end):
"""
This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
"""
pass
3단계: 스트림 작성기 구현
이제 스트리밍 기록기를 구현합니다. 이 스트리밍 데이터 작성기는 각 마이크로배치의 메타데이터 정보를 로컬 경로에 씁니다.
class SimpleCommitMessage(WriterCommitMessage):
partition_id: int
count: int
class FakeStreamWriter(DataSourceStreamWriter):
def __init__(self, options):
self.options = options
self.path = self.options.get("path")
assert self.path is not None
def write(self, iterator):
"""
Writes the data, then returns the commit message of that partition. Library imports must be within the method.
"""
from pyspark import TaskContext
context = TaskContext.get()
partition_id = context.partitionId()
cnt = 0
for row in iterator:
cnt += 1
return SimpleCommitMessage(partition_id=partition_id, count=cnt)
def commit(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
"""
status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
file.write(json.dumps(status) + "\n")
def abort(self, messages, batchId) -> None:
"""
Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
In this FakeStreamWriter, a failure message is written into a text file inside abort().
"""
with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
file.write(f"failed in batch {batchId}")
4단계: 예제 데이터 원본 등록 및 사용
데이터 원본을 사용하려면 데이터 원본을 등록합니다. Regsitered 후에는 짧은 이름 또는 전체 이름을 format()
전달하여 스트리밍 쿼리에서 원본 또는 싱크로 사용할 수 있습니다. 다음 예제에서는 데이터 원본을 등록한 다음 예제 데이터 원본에서 읽고 콘솔로 출력하는 쿼리를 시작합니다.
spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()
또는 다음 예제에서는 예제 스트림을 싱크로 사용하고 출력 경로를 지정합니다.
query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")
문제 해결
출력이 다음 오류인 경우 컴퓨팅은 PySpark 사용자 지정 데이터 원본을 지원하지 않습니다. Databricks Runtime 15.2 이상을 사용해야 합니다.
Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000