Condividi tramite


Caricare i dati utilizzando Mosaic Streaming

Questo articolo descrive come utilizzare Mosaic Streaming per convertire i dati da Apache Spark in un formato compatibile con PyTorch.

Mosaic Streaming è una libreria di caricamento dati open source. Consente il training e la valutazione a nodo singolo o distribuito di modelli di Deep Learning da set di dati già caricati come dataframe Apache Spark. Mosaic Streaming supporta principalmente Mosaic Composer, ma si integra anche con PyTorch nativo, PyTorch Lightning e TorchDistributor. Mosaic Streaming offre una serie di vantaggi rispetto ai dataloader PyTorch tradizionali, come ad esempio:

  • Compatibilità con qualsiasi tipo di dati, comprese immagini, testi, video e dati multimodali.
  • Supporto per le principali providers di archiviazione cloud (AWS, OCI, GCS, Azure, Databricks UC Volume e qualsiasi archivio oggetti compatibile con S3, ad esempio Cloudflare R2, Coreweave, Backblaze b2 e così via)
  • Ottimizzazione delle garanzie di correttezza, prestazioni, flessibilità e facilità d'uso. Per altre informazioni, visualizzare la pagina delle funzionalità principali.

Per informazioni generali su Mosaic Streaming, si veda la documentazione dell'API Streaming.

Nota

Mosaic Streaming è stato preinstallato in tutte le versioni di Databricks Runtime 15.2 ML e nelle versioni successive.

Caricare dati da dataframe Spark utilizzando Mosaic Streaming

Mosaic Streaming offre un flusso di lavoro semplice per la conversione da Apache Spark al formato Mosaic Data Shard (MDS), che può essere caricato per l'uso in un ambiente distribuito.

Il flusso di lavoro consigliato è:

  1. Usare Apache Spark per caricare e facoltativamente pre-elaborare i dati.
  2. Usare streaming.base.converters.dataframe_to_mds per salvare il dataframe su disco per l'archiviazione temporanea e/o in un volume Catalog Unity per l'archiviazione permanente. Questi dati verranno archiviati nel formato MDS e possono essere ulteriormente ottimizzati con il supporto per la compressione e l'hashing. I casi d'uso avanzati possono comprendere anche la pre-elaborazione dei dati tramite funzioni definite dall'utente. Per altre informazioni, vedere l'esercitazione sul dataframe Spark in MDS.
  3. Usare streaming.StreamingDataset per caricare in memoria i dati necessari. StreamingDataset è una versione di IterableDataset di PyTorch che offre una sequenza casuale deterministica in modo elastico, che consente una ripresa rapida a metà ciclo. Per maggiori informazioni, consultare la documentazione StreamingDataset.
  4. Usare streaming.StreamingDataLoader per caricare i dati necessari per training/valutazione/test. StreamingDataLoader è una versione di DataLoader di PyTorch che fornisce un'interfaccia di checkpoint/ripresa aggiuntiva, per cui tiene traccia del numero di campioni visti dal modello in questa classificazione.

Per un esempio end-to-end, si veda il seguente notebook:

Semplificare il caricamento dei dati da Spark a PyTorch utilizzando il notebook di Mosaic Streaming

Get portatile

Risoluzione dei problemi: errore di autenticazione

Se viene visualizzato l'errore seguente durante il caricamento dei dati da un volume di Unity Catalog usando StreamingDataset, set le variabili di ambiente come illustrato di seguito.

ValueError: default auth: cannot configure default credentials, please check https://docs.databricks.com/en/dev-tools/auth.html#databricks-client-unified-authentication to configure credentials for your preferred authentication method.

Nota

Se questo errore viene visualizzato quando si esegue il training distribuito usando TorchDistributor, è necessario anche set le variabili di ambiente sui nodi di lavoro.

db_host = "https://your-databricks-host.databricks.com"
db_token = "YOUR API TOKEN" # Create a token with either method from https://docs.databricks.com/en/dev-tools/auth/index.html#databricks-authentication-methods

def your_training_function():
  import os
  os.environ['DATABRICKS_HOST'] = db_host
  os.environ['DATABRICKS_TOKEN'] = db_token

# The above function can be distributed with TorchDistributor:
# from pyspark.ml.torch.distributor import TorchDistributor
# distributor = TorchDistributor(...)
# distributor.run(your_training_function)