Condividi tramite


Dettagli di implementazione dei flussi Disass

In questa sezione viene fornita una panoramica generale dell'implementazione di Stream di Stream. Descrive i concetti e i dettagli che non sono visibili a livello di applicazione. Se si prevede di usare solo i flussi, non è necessario leggere questa sezione.

Terminologia:

La parola "coda" si riferisce a qualsiasi tecnologia di archiviazione durevole in grado di inserire eventi di flusso e consente di eseguire il pull degli eventi o fornisce un meccanismo basato su push per utilizzare gli eventi. In genere, per garantire la scalabilità, queste tecnologie forniscono code partizionate/partizionate. Ad esempio, Le code di Azure consentono di creare più code e Hub eventi ha più hub.

Flussi persistenti

Tutti i provider di flussi persistenti Dias condividono un'implementazione comune di PersistentStreamProvider. Questi provider di flussi generici devono essere configurati con un oggetto specifico della tecnologia IQueueAdapterFactory.

A scopo di test, ad esempio, sono stati generati adattatori di coda che generano i dati di test invece di leggerne i dati da una coda. Il codice seguente illustra come configurare un provider di flussi persistente per l'uso dell'adattatore di coda personalizzato (generatore). A tale scopo, configura il provider di flussi persistente con una funzione factory usata per creare l'adapter.

hostBuilder.AddPersistentStreams(
    StreamProviderName, GeneratorAdapterFactory.Create);

Quando un producer di stream.OnNext()flusso genera un nuovo elemento di flusso e chiama , il runtime IQueueAdapter di streaming Disaccodati richiama il metodo appropriato sull'oggetto del provider di flussi che accoda l'elemento direttamente nella coda appropriata.

Pull degli agenti

Alla base del provider di flussi persistenti ci sono gli agenti di pull. Il pull degli agenti estrae gli eventi da un set di code durevoli e li recapita al codice dell'applicazione in granularità che li utilizza. È possibile pensare agli agenti di pull come a un "microservizi" distribuito, un componente partizionato, a disponibilità elevata ed elastico distribuito. Gli agenti di pull vengono eseguiti all'interno degli stessi silo che ospitano i granulari dell'applicazione e sono completamente gestiti dal runtime di streaming diAnsanss.

StreamQueueMapper e StreamQueueBalancer

Gli agenti di pull sono parametrizzati con IStreamQueueMapper e IStreamQueueBalancer. fornisce IStreamQueueMapper un elenco di tutte le code ed è anche responsabile del mapping dei flussi alle code. In questo modo, il lato producer del provider di flussi persistenti sa in quale coda accodare il messaggio.

esprime IStreamQueueBalancer il modo in cui le code vengono bilanciate tra i silo e gli agenti di Unana. L'obiettivo è quello di assegnare le code agli agenti in modo bilanciato, per evitare colli di bottiglia e supportare l'elasticità. Quando viene aggiunto un nuovo silo al cluster Disassociati, le code vengono automaticamente ribilanciati tra i silo nuovi e vecchi. consente StreamQueueBalancer la personalizzazione di tale processo. Per Supportare diversi scenari di bilanciamento (numero elevato e ridotto di code) e ambienti diversi (Azure, locali, statici), Il servizio Disaccostie dispone di diversi ambienti predefiniti.

Usando l'esempio del generatore di test precedente, il codice seguente illustra come è possibile configurare il mapper di code e il servizio di bilanciamento della coda.

hostBuilder
    .AddPersistentStreams(StreamProviderName, GeneratorAdapterFactory.Create,
        providerConfigurator =>
        providerConfigurator.Configure<HashRingStreamQueueMapperOptions>(
            ob => ob.Configure(options => options.TotalQueueCount = 8))
      .UseDynamicClusterConfigDeploymentBalancer());

Il codice precedente configura per l'uso GeneratorAdapterFactory di un mapper di code con otto code e bilancia le code nel cluster usando DynamicClusterConfigDeploymentBalancer.

Protocollo di pull

Ogni silo esegue un set di agenti di pull, ogni agente esegue il pull da una coda. Gli agenti di pull stessi vengono implementati da un componente di runtime interno, denominato SystemTarget. Le destinazione di sistema sono essenzialmente granulari di runtime, sono soggette alla concorrenza a thread singolo, possono usare la messaggistica con granularità regolare e sono leggere quanto i granulari. A differenza dei granulari, le destinazione di sistema non sono virtuali: vengono create in modo esplicito (dal runtime) e non sono trasparenti per la posizione. Implementando gli agenti di pull come SystemTargets, il runtime di streaming di Streaming di Streaming Disarticolta può basarsi sulle funzionalità integrate e può essere ridimensionato fino a un numero molto elevato di code, poiché la creazione di un nuovo agente di pull è economica quanto la creazione di una nuova granularità.

Ogni agente di pull esegue un timer periodico che esegue il pull dalla coda richiamando il IQueueAdapterReceiver.GetQueueMessagesAsync metodo . I messaggi restituiti vengono inseriti nella struttura dei dati interna per agente denominata IQueueCache. Ogni messaggio viene controllato per individuare il flusso di destinazione. L'agente usa il Pub-Sub per trovare l'elenco dei consumer di flusso che hanno sottoscritto questo flusso. Dopo aver recuperato l'elenco di consumer, l'agente lo archivia in locale (nella cache pub-sub) in modo che non sia necessario consultare Pub-Sub su ogni messaggio. L'agente sottoscrive anche la sottoscrizione pub-sub per ricevere la notifica di eventuali nuovi consumer che sottoscriveranno tale flusso. Questo handshake tra l'agente e pub-sub garantisce una semantica di sottoscrizione di streaming solida: dopo che il consumer ha sottoscritto il flusso, verranno visualizzati tutti gli eventi generati dopo la sottoscrizione. Inoltre, l'uso StreamSequenceToken di consente di sottoscrivere in passato.

Cache delle code

IQueueCache è una struttura di dati interna per agente che consente di separare i nuovi eventi dalla coda e di recapitarli ai consumer. Consente anche di separare il recapito a flussi diversi e a consumer diversi.

Imagine situazione in cui un flusso ha 3 consumer di flusso e uno di essi è lento. Se non si fa attenzione, questo consumer lento può influire sullo stato di avanzamento dell'agente, rallentando l'utilizzo di altri consumer di tale flusso e anche rallentando la rimozione della coda e la distribuzione di eventi per altri flussi. Per evitare che e consentano il massimo parallelismo nell'agente, si usa IQueueCache.

IQueueCache il buffer consente di trasmettere eventi al flusso e consente all'agente di recapitare gli eventi a ogni consumer al proprio ritmo. La distribuzione per consumer viene implementata dal componente interno denominato IQueueCacheCursor, che tiene traccia dello stato di avanzamento per consumer. In questo modo, ogni consumer riceve gli eventi al proprio ritmo: i consumer veloci ricevono gli eventi con la stessa rapidità con cui vengono accodati dalla coda, mentre i consumer lenti li ricevono in un secondo momento. Dopo che il messaggio è stato recapitato a tutti i consumer, può essere eliminato dalla cache.

Contropressione

Il backpressure nel runtime di streaming di Stream si applica in due posizioni: il trasferimento degli eventi di flusso dalla coda all'agente e il recapito degli eventi dall'agente ai consumer di flussi.

Quest'ultimo viene fornito dal meccanismo di recapito dei messaggi predefinito Disas. Ogni evento di flusso viene recapitato dall'agente ai consumer tramite la messaggistica con granularità Standard, uno alla volta. Ovvero, gli agenti inviano un evento (o un batch di eventi di dimensioni limitate) a ogni consumer di flusso e attendono questa chiamata. L'evento successivo non verrà recapitato fino a quando l'attività per l'evento precedente non è stata risolta o interrotta. In questo modo si limita naturalmente la frequenza di recapito per consumer a un messaggio alla volta.

Quando si portano gli eventi di flusso dalla coda all'agente, Streaming Disas offre un nuovo meccanismo di backpressure speciale. Poiché l'agente disaccoglie la coda degli eventi dalla coda e li recapita ai consumer, un singolo consumer IQueueCache lento può essere in ritardo a tal punto da riempirsi. Per evitare IQueueCache una crescita illimitata, è necessario limitarne le dimensioni (il limite di dimensioni è configurabile). Tuttavia, l'agente non genera mai eventi non recapitati.

Al contrario, quando la cache inizia a riempirsi, gli agenti rallentano la frequenza di rimozione degli eventi dalla coda. In questo modo, è possibile "ride out" i periodi di recapito lenti regolando la frequenza con cui si utilizza la coda ("backpressure") e tornare ai tassi di consumo rapidi in un secondo momento. Per rilevare il "recapito lento" IQueueCache , usa una struttura di dati interna dei bucket della cache che tiene traccia dello stato di avanzamento del recapito degli eventi ai singoli consumer di flussi. Il risultato è un sistema molto reattivo e a regolazione autonoma.