Condividi tramite


Leggere l'input in qualsiasi formato usando deserializzatori personalizzati .NET (anteprima)

I deserializzatori personalizzati .NET consentono ai processi di Analisi di flusso di Azure di leggere dati da formati diversi dai tre formati dati predefiniti. Questo articolo illustra il formato di serializzazione e le interfacce che definiscono i deserializzatori personalizzati .NET per i processi cloud ed Edge di Analisi di flusso di Azure. Sono disponibili anche deserializzatori di esempio per il buffer del protocollo e il formato CSV.

Deserializzatore personalizzato .NET

Gli esempi di codice seguenti sono interfacce che definiscono il deserializzatore personalizzato e implementano StreamDeserializer<T>.

UserDefinedOperator è la classe di base per tutti gli operatori di streaming personalizzati. Inizializza StreamingContext, che fornisce il contesto, che include il meccanismo per la pubblicazione della diagnostica per cui è necessario eseguire il debug di eventuali problemi con il deserializzatore.

    public abstract class UserDefinedOperator
    {
        public abstract void Initialize(StreamingContext streamingContext);
    }

Il frammento di codice seguente rappresenta la deserializzazione per lo streaming di dati.

Gli errori ignorabili devono essere generati passando IStreamingDiagnostics al metodo Initialize di UserDefinedOperator. Tutte le eccezioni verranno considerate come errori e il deserializzatore verrà ricreato. Dopo alcuni errori, il processo passerà a uno stato di errore.

StreamDeserializer<T> deserializza un flusso in un oggetto di tipo T. Le condizioni seguenti devono essere soddisfatte:

  1. T è una classe o uno struct.
  2. Tutti i campi pubblici in T hanno una delle seguenti caratteristiche:
    1. Sono di tipo [sbyte, byte, short, ushort, int, uint, long, DateTime, string, float, double] o rispettivi equivalenti che ammettono i valori Null.
    2. Corrispondono a un altro struct o a un'altra classe che segue le stesse regole.
    3. Sono una matrice di tipo T2 che segue le stesse regole.
    4. IListT2 dove T2 segue le stesse regole.
    5. Non ha tipi ricorsivi.

Il parametro stream è il flusso contenente l'oggetto serializzato. Deserialize restituisce una raccolta di istanze di T.

    public abstract class StreamDeserializer<T> : UserDefinedOperator
    {
        public abstract IEnumerable<T> Deserialize(Stream stream);
    }

StreamingContext fornisce il contesto, che include il meccanismo per la pubblicazione della diagnostica per l'operatore utente.

    public abstract class StreamingContext
    {
        public abstract StreamingDiagnostics Diagnostics { get; }
    }

StreamingDiagnostics rappresenta la diagnostica per gli operatori definiti dall'utente, che include serializzatore, deserializzatore e funzioni definite dall'utente.

WriteError scrive un messaggio di errore nei log delle risorse e invia l'errore alla diagnostica.

briefMessage è un messaggio di errore breve. Questo messaggio viene visualizzato nella diagnostica e viene usato dal team di prodotto a scopo di debug. Non includere informazioni riservate e mantenere il messaggio meno di 200 caratteri

detailedMessage è un messaggio di errore dettagliato che viene aggiunto solo ai log delle risorse nella risorsa di archiviazione in uso. Il messaggio deve contenere meno di 2000 caratteri.

    public abstract class StreamingDiagnostics
    {
        public abstract void WriteError(string briefMessage, string detailedMessage);
    }

Esempi di deserializzatori

Questa sezione illustra come scrivere deserializzatori personalizzati per Protobuf e CSV. Per altri esempi, ad esempio il formato AVRO per l'acquisizione di Hub eventi, vedere Analisi di flusso di Azure su GitHub.

Formato buffer del protocollo (Protobuf)

Questo esempio usa il formato del buffer del protocollo.

Si presupponga la definizione del buffer del protocollo seguente.

syntax = "proto3";
// protoc.exe from nuget "Google.Protobuf.Tools" is used to generate .cs file from this schema definition.
// Run below command to generate the csharp class
// protoc.exe --csharp_out=. MessageBodyProto.proto

package SimulatedTemperatureSensor;
message MessageBodyProto {
    message Ambient {
      double temperature = 1;
      int64 humidity = 2;
    }

    message Machine {
      double temperature = 1;
      double pressure = 2;
    }

    Machine machine = 1;
    Ambient ambient = 2;
    string timeCreated = 3;
}

Eseguendo protoc.exe da Google.Protobuf.Tools NuGet genera un file con estensione cs con la definizione. Il file generato non viene visualizzato qui. È necessario assicurarsi che la versione di Protobuf NuGet usata nel progetto di Analisi di flusso corrisponda alla versione protobuf usata per generare l'input.

Il frammento di codice seguente è l'implementazione del deserializzatore. Si presume che il file generato sia incluso nel progetto. Questa implementazione è solo un thin wrapper del file generato.

    public class MessageBodyDeserializer : StreamDeserializer<SimulatedTemperatureSensor.MessageBodyProto>
    {
        public override IEnumerable<SimulatedTemperatureSensor.MessageBodyProto> Deserialize(Stream stream)
        {
            while (stream.Position < stream.Length)
            {
                yield return SimulatedTemperatureSensor.MessageBodyProto.Parser.ParseDelimitedFrom(stream);
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
        }
    }

CSV

Il frammento di codice seguente è un deserializzatore CSV semplice che illustra anche la propagazione degli errori.

using System.Collections.Generic;
using System.IO;

using Microsoft.Azure.StreamAnalytics;
using Microsoft.Azure.StreamAnalytics.Serialization;

namespace ExampleCustomCode.Serialization
{
    public class CustomCsvDeserializer : StreamDeserializer<CustomEvent>
    {
        private StreamingDiagnostics streamingDiagnostics;

        public override IEnumerable<CustomEvent> Deserialize(Stream stream)
        {
            using (var sr = new StreamReader(stream))
            {
                string line = sr.ReadLine();
                while (line != null)
                {
                    if (line.Length > 0 && !string.IsNullOrWhiteSpace(line))
                    {
                        string[] parts = line.Split(',');
                        if (parts.Length != 3)
                        {
                            streamingDiagnostics.WriteError("Did not get expected number of columns", $"Invalid line: {line}");
                        }
                        else
                        {
                            yield return new CustomEvent()
                            {
                                Column1 = parts[0],
                                Column2 = parts[1],
                                Column3 = parts[2]
                            };
                        }
                    }

                    line = sr.ReadLine();
                }
            }
        }

        public override void Initialize(StreamingContext streamingContext)
        {
            this.streamingDiagnostics = streamingContext.Diagnostics;
        }
    }

    public class CustomEvent
    {
        public string Column1 { get; set; }

        public string Column2 { get; set; }

        public string Column3 { get; set; }
    }
}

Formato di serializzazione per le API REST

Ogni input di Analisi di flusso ha un formato di serializzazione. Per altre informazioni sulle opzioni di input, vedere la documentazione relativa all'API REST di input.

Il codice JavaScript seguente è un esempio del formato di serializzazione deserializzatore .NET quando si usa l'API REST:

{    
   "properties":{    
      "type":"stream",  
      "serialization":{    
         "type":"CustomCLR",  
         "properties":{    
            "serializationDllPath":"<path to the dll inside UserCustomCode\CLR\ folder>", 
            "serializationClassName":"<Full name of the deserializer class name>" 
         }  
      }
   }  
}  

serializationClassName deve essere una classe che implementa StreamDeserializer<T>. Questo concetto viene descritto nella sezione seguente.

Supporto di area

Questa funzionalità è disponibile nelle aree seguenti quando si usa lo SKU Standard:

  • Stati Uniti centro-occidentali
  • Europa settentrionale
  • Stati Uniti orientali
  • Stati Uniti occidentali
  • Stati Uniti orientali 2
  • Europa occidentale

È possibile richiedere supporto per altre aree. Tuttavia, non esiste alcuna restrizione di tale area quando si usano cluster di Analisi di flusso.

Domande frequenti

Quando sarà disponibile questa funzionalità in tutte le aree di Azure?

Questa funzionalità è disponibile in 6 aree. Se si è interessati a usare questa funzionalità in un'altra area, è possibile inviare una richiesta. Il supporto per tutte le aree di Azure è indicato nella roadmap.

È possibile accedere a MetadataPropertyValue da input simili alla funzione GetMetadataPropertyValue?

Questa funzionalità non è supportata. Se questa capacità è necessaria, è possibile votare per questa richiesta in UserVoice.

È possibile condividere l'implementazione del deserializzatore con la community in modo che altri utenti possano trarne vantaggio?

Dopo aver implementato il deserializzatore, è possibile aiutarlo condividendolo con la community. Inviare il codice al repository GitHub di Analisi di flusso di Azure.

Quali sono le altre limitazioni dell'uso di deserializzatori personalizzati in Analisi di flusso?

Se l'input è di formato Protobuf con uno schema contenente MapField un tipo, non sarà possibile implementare un deserializzatore personalizzato. Inoltre, i deserializzatori personalizzati non supportano dati di esempio o dati di anteprima.

Passaggi successivi