共用方式為


PySpark 自訂資料來源

重要

PySpark 自定義數據源在 Databricks Runtime 15.2 和更新版本中處於 公開預覽 狀態。 Databricks Runtime 15.3 和更新版本提供串流支援。

PySpark DataSource 是由 Python (PySpark) DataSource API 所建立,可讓您從自定義數據源讀取,並使用 Python 寫入 Apache Spark 中的自定義數據接收器。 您可以使用 PySpark 自訂數據源來定義與數據系統的自定義連線,並實作其他功能,以建置可重複使用的數據源。

資料來源類別

PySpark DataSource 是基類,提供建立數據讀取器和寫入器的方法。

實作數據源子類別

根據您的使用案例,下列項目必須由任何子類別實作,讓數據源成為可讀取、可寫入或兩者:

屬性或方法 描述
name 必要。 數據源的名稱
schema 必要。 要讀取或寫入之數據源的架構
reader() 必須傳回 DataSourceReader ,讓資料源成為可讀取的 (批次)
writer() 必須傳回 DataSourceWriter ,才能讓資料接收可寫入 (批次)
streamReader()simpleStreamReader() 必須傳回 DataSourceStreamReader ,才能讓資料流成為可讀取的 (串流)
streamWriter() 必須傳回 DataSourceStreamWriter ,才能讓資料流寫入(串流)

注意

用戶定義的 DataSourceDataSourceReader、、DataSourceWriterDataSourceStreamReaderDataSourceStreamWriter、 及其方法必須能夠串行化。 換句話說,它們必須是包含基本類型的字典或巢狀字典。

註冊資料來源

實作 介面之後,您必須註冊它,然後您可以載入或使用它,如下列範例所示:

# Register the data source
spark.dataSource.register(MyDataSourceClass)

# Read from a custom data source
spark.read.format("my_datasource_name").load().show()

範例 1:建立 Batch 查詢的 PySpark DataSource

若要示範 PySpark DataSource 讀取器功能,請建立數據源,以使用 faker Python 套件產生範例數據。 如需 的詳細資訊 faker,請參閱 Faker 檔

使用下列命令安裝 faker 套件:

%pip install faker

步驟 1:定義範例 DataSource

首先,使用名稱、架構和讀取器,將新的 PySpark DataSource 定義為 的 DataSource 子類別。 reader()必須定義 方法,才能從批次查詢中的數據源讀取。

from pyspark.sql.datasource import DataSource, DataSourceReader
from pyspark.sql.types import StructType

class FakeDataSource(DataSource):
    """
    An example data source for batch query using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fake"

    def schema(self):
        return "name string, date string, zipcode string, state string"

    def reader(self, schema: StructType):
        return FakeDataSourceReader(schema, self.options)

步驟 2:實作批次查詢的讀取器

接下來,實作讀取器邏輯以產生範例數據。 使用已安裝 faker 的連結庫填入架構中的每個欄位。

class FakeDataSourceReader(DataSourceReader):

    def __init__(self, schema, options):
        self.schema: StructType = schema
        self.options = options

    def read(self, partition):
        # Library imports must be within the method.
        from faker import Faker
        fake = Faker()

        # Every value in this `self.options` dictionary is a string.
        num_rows = int(self.options.get("numRows", 3))
        for _ in range(num_rows):
            row = []
            for field in self.schema.fields:
                value = getattr(fake, field.name)()
                row.append(value)
            yield tuple(row)

步驟 3:註冊並使用範例數據源

若要使用數據源,請加以註冊。 根據預設,有FakeDataSource三個數據列,而且架構包含下列string欄位:name、、datestatezipcode。 下列範例會使用預設值來註冊、載入及輸出範例數據來源:

spark.dataSource.register(FakeDataSource)
spark.read.format("fake").load().show()
+-----------------+----------+-------+----------+
|             name|      date|zipcode|     state|
+-----------------+----------+-------+----------+
|Christine Sampson|1979-04-24|  79766|  Colorado|
|       Shelby Cox|2011-08-05|  24596|   Florida|
|  Amanda Robinson|2019-01-06|  57395|Washington|
+-----------------+----------+-------+----------+

string 支援欄位,但您可以使用任何對應至 faker 封裝提供者欄位的欄位來指定架構,以產生隨機數據以進行測試和開發。 下列範例會使用 和 company 欄位載入資料來源name

spark.read.format("fake").schema("name string, company string").load().show()
+---------------------+--------------+
|name                 |company       |
+---------------------+--------------+
|Tanner Brennan       |Adams Group   |
|Leslie Maxwell       |Santiago Group|
|Mrs. Jacqueline Brown|Maynard Inc   |
+---------------------+--------------+

若要使用自定義的數據列數目載入資料源,請指定 numRows 選項。 下列範例會指定 5 個資料列:

spark.read.format("fake").option("numRows", 5).load().show()
+--------------+----------+-------+------------+
|          name|      date|zipcode|       state|
+--------------+----------+-------+------------+
|  Pam Mitchell|1988-10-20|  23788|   Tennessee|
|Melissa Turner|1996-06-14|  30851|      Nevada|
|  Brian Ramsey|2021-08-21|  55277|  Washington|
|  Caitlin Reed|1983-06-22|  89813|Pennsylvania|
| Douglas James|2007-01-18|  46226|     Alabama|
+--------------+----------+-------+------------+

範例 2:建立 PySpark DataSource 以串流讀取和寫入

若要示範 PySpark DataSource 數據流讀取器和寫入器功能,請建立範例數據源,以使用 faker Python 套件在每個微批次中產生兩個數據列。 如需 的詳細資訊 faker,請參閱 Faker 檔

使用下列命令安裝 faker 套件:

%pip install faker

步驟 1:定義範例 DataSource

首先,將新的 PySpark DataSource 定義為具有名稱、架構和方法和 streamReader() 的子類別DataSourcestreamWriter()

from pyspark.sql.datasource import DataSource, DataSourceStreamReader, SimpleDataSourceStreamReader, DataSourceStreamWriter
from pyspark.sql.types import StructType

class FakeStreamDataSource(DataSource):
    """
    An example data source for streaming read and write using the `faker` library.
    """

    @classmethod
    def name(cls):
        return "fakestream"

    def schema(self):
        return "name string, state string"

    def streamReader(self, schema: StructType):
        return FakeStreamReader(schema, self.options)

    # If you don't need partitioning, you can implement the simpleStreamReader method instead of streamReader.
    # def simpleStreamReader(self, schema: StructType):
    #    return SimpleStreamReader()

    def streamWriter(self, schema: StructType, overwrite: bool):
        return FakeStreamWriter(self.options)

步驟 2:實作數據流讀取器

接下來,實作範例串流數據讀取器,以在每個 microbatch 中產生兩個數據列。 您可以實 DataSourceStreamReader作 ,或如果數據源的輸送量較低,而且不需要數據分割,您可以改為實 SimpleDataSourceStreamReader 作。 或 simpleStreamReader() streamReader() 必須實作,而且 simpleStreamReader() 只有在未實作時 streamReader() 才會叫用。

DataSourceStreamReader 實作

實例 streamReader 有一個整數位移,會在每個 microbatch 中增加 2,以 介面實作 DataSourceStreamReader

class RangePartition(InputPartition):
    def __init__(self, start, end):
        self.start = start
        self.end = end

class FakeStreamReader(DataSourceStreamReader):
    def __init__(self, schema, options):
        self.current = 0

    def initialOffset(self) -> dict:
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def latestOffset(self) -> dict:
        """
        Returns the current latest offset that the next microbatch will read to.
        """
        self.current += 2
        return {"offset": self.current}

    def partitions(self, start: dict, end: dict):
        """
        Plans the partitioning of the current microbatch defined by start and end offset. It
        needs to return a sequence of :class:`InputPartition` objects.
        """
        return [RangePartition(start["offset"], end["offset"])]

    def commit(self, end: dict):
        """
        This is invoked when the query has finished processing data before end offset. This
        can be used to clean up the resource.
        """
        pass

    def read(self, partition) -> Iterator[Tuple]:
        """
        Takes a partition as an input and reads an iterator of tuples from the data source.
        """
        start, end = partition.start, partition.end
        for i in range(start, end):
            yield (i, str(i))

SimpleDataSourceStreamReader 實作

實例 SimpleStreamReaderFakeStreamReader 每個批次中產生兩個數據列的實例相同,但使用 SimpleDataSourceStreamReader 介面實作而不進行分割。

class SimpleStreamReader(SimpleDataSourceStreamReader):
    def initialOffset(self):
        """
        Returns the initial start offset of the reader.
        """
        return {"offset": 0}

    def read(self, start: dict) -> (Iterator[Tuple], dict):
        """
        Takes start offset as an input, then returns an iterator of tuples and the start offset of the next read.
        """
        start_idx = start["offset"]
        it = iter([(i,) for i in range(start_idx, start_idx + 2)])
        return (it, {"offset": start_idx + 2})

    def readBetweenOffsets(self, start: dict, end: dict) -> Iterator[Tuple]:
        """
        Takes start and end offset as inputs, then reads an iterator of data deterministically.
        This is called when the query replays batches during restart or after a failure.
        """
        start_idx = start["offset"]
        end_idx = end["offset"]
        return iter([(i,) for i in range(start_idx, end_idx)])

    def commit(self, end):
        """
        This is invoked when the query has finished processing data before end offset. This can be used to clean up resources.
        """
        pass

步驟 3:實作數據流寫入器

現在實作串流寫入器。 這個串流數據寫入器會將每個 microbatch 的元數據資訊寫入本機路徑。

class SimpleCommitMessage(WriterCommitMessage):
   partition_id: int
   count: int

class FakeStreamWriter(DataSourceStreamWriter):
   def __init__(self, options):
       self.options = options
       self.path = self.options.get("path")
       assert self.path is not None

   def write(self, iterator):
       """
       Writes the data, then returns the commit message of that partition. Library imports must be within the method.
       """
       from pyspark import TaskContext
       context = TaskContext.get()
       partition_id = context.partitionId()
       cnt = 0
       for row in iterator:
           cnt += 1
       return SimpleCommitMessage(partition_id=partition_id, count=cnt)

   def commit(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` when all write tasks have succeeded, then decides what to do with it.
       In this FakeStreamWriter, the metadata of the microbatch(number of rows and partitions) is written into a JSON file inside commit().
       """
       status = dict(num_partitions=len(messages), rows=sum(m.count for m in messages))
       with open(os.path.join(self.path, f"{batchId}.json"), "a") as file:
           file.write(json.dumps(status) + "\n")

   def abort(self, messages, batchId) -> None:
       """
       Receives a sequence of :class:`WriterCommitMessage` from successful tasks when some other tasks have failed, then decides what to do with it.
       In this FakeStreamWriter, a failure message is written into a text file inside abort().
       """
       with open(os.path.join(self.path, f"{batchId}.txt"), "w") as file:
           file.write(f"failed in batch {batchId}")

步驟 4:註冊並使用範例數據源

若要使用數據源,請加以註冊。 在註冊之後,您可以將簡短名稱或完整名稱傳遞至 format(),將其用於串流查詢作為來源或接收。 下列範例會註冊數據源,然後啟動從範例數據源讀取並輸出至主控台的查詢:

spark.dataSource.register(FakeStreamDataSource)
query = spark.readStream.format("fakestream").load().writeStream.format("console").start()

或者,下列範例會使用範例數據流作為接收,並指定輸出路徑:

query = spark.readStream.format("fakestream").load().writeStream.format("fake").start("/output_path")

疑難排解

如果輸出是下列錯誤,您的計算不支援 PySpark 自定義數據源。 您必須使用 Databricks Runtime 15.2 或更新版本。

Error: [UNSUPPORTED_FEATURE.PYTHON_DATA_SOURCE] The feature is not supported: Python data sources. SQLSTATE: 0A000