Observable.Create<, méthode TSource> (Func<IObserver<TSource>, IDisposable>)
Crée une séquence observable à partir d’une implémentation de méthode d’abonnement.
Espace de noms :System.Reactive.Linq
Assemblée: System.Reactive (en System.Reactive.dll)
Syntaxe
'Declaration
Public Shared Function Create(Of TSource) ( _
subscribe As Func(Of IObserver(Of TSource), IDisposable) _
) As IObservable(Of TSource)
'Usage
Dim subscribe As Func(Of IObserver(Of TSource), IDisposable)
Dim returnValue As IObservable(Of TSource)
returnValue = Observable.Create(subscribe)
public static IObservable<TSource> Create<TSource>(
Func<IObserver<TSource>, IDisposable> subscribe
)
public:
generic<typename TSource>
static IObservable<TSource>^ Create(
Func<IObserver<TSource>^, IDisposable^>^ subscribe
)
static member Create :
subscribe:Func<IObserver<'TSource>, IDisposable> -> IObservable<'TSource>
JScript does not support generic types and methods.
Paramètres de type
- TSource
Type de la source.
Paramètres
- subscribe
Type : System.Func<IObserver<TSource>, IDisposable>
Implémentation de la méthode d’abonnement de la séquence observable résultante.
Valeur de retour
Type : System.IObservable<TSource>
Séquence observable avec l’implémentation spécifiée pour la méthode d’abonnement.
Notes
L’opérateur Create vous permet de créer vos propres séquences personnalisées sans implémenter entièrement l’interface T> IObservable<pour votre séquence. À l’aide de cet opérateur, vous implémentez simplement une fonction d’abonnement qui prend un IObserver<T> où T est votre type et retourne l’interface IDisposable utilisée pour annuler l’abonnement en appelant IDisposable::D ispose(). L’utilisation de cet opérateur est préférable à l’implémentation manuelle de l’interface T> IObservable<.
Exemples
Cet exemple simule un système de tickets hypothétique où une séquence observable de tickets est fournie sans implémenter entièrement IObservable<Ticket>. La classe TicketFactory implémente sa propre méthode d’abonnement appelée TicketSubscribe. Cette méthode est passée en tant que paramètre d’abonnement à l’opérateur Create. TicketSubscribe crée une séquence continue de tickets sur un autre thread jusqu’à ce que l’abonnement soit annulé en appelant la méthode Dispose sur l’interface IDisposable retournée par TicketSubscribe. Un ticket> IObserver<est passé à TicketSubscribe. Chaque ticket de la séquence est remis en appelant IObserver<Ticket>. OnNext(). L’action d’observateur exécutée pour l’abonnement affiche chaque ticket dans la fenêtre de console.
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Reactive.Linq;
namespace Example
{
class Program
{
static void Main()
{
IObservable<Ticket> ticketObservable = Observable.Create((Func<IObserver<Ticket>, IDisposable>)TicketFactory.TicketSubscribe);
//*********************************************************************************//
//*** This is a sequence of tickets. Display each ticket in the console window. ***//
//*********************************************************************************//
using(IDisposable handle = ticketObservable.Subscribe(ticket => Console.WriteLine(ticket.ToString())))
{
Console.WriteLine("\nPress ENTER to unsubscribe...\n");
Console.ReadLine();
}
}
}
//***************************************************************************************************//
//*** ***//
//*** The Ticket class just represents a hypothetical ticket composed of an ID and a timestamp. ***//
//*** ***//
//***************************************************************************************************//
class Ticket
{
private readonly string ticketID;
private readonly DateTime timeStamp;
public Ticket(string tid)
{
ticketID = tid;
timeStamp = DateTime.Now;
}
public override string ToString()
{
return String.Format("Ticket ID : {0}\nTimestamp : {1}\n", ticketID, timeStamp.ToString());
}
}
//***************************************************************************************************//
//*** ***//
//*** The TicketFactory class generates a new sequence of tickets on a separate thread. The ***//
//*** generation of the sequence is initiated by the TicketSubscribe method which will be passed ***//
//*** to Observable.Create(). ***//
//*** ***//
//*** TicketSubscribe() provides the IDisposable interface used to dispose of the subscription ***//
//*** stopping ticket generation resources. ***//
//*** ***//
//***************************************************************************************************//
public class TicketFactory : IDisposable
{
private bool bGenerate = true;
internal TicketFactory(object ticketObserver)
{
//************************************************************************//
//*** The sequence generator for tickets will be run on another thread ***//
//************************************************************************//
Task.Factory.StartNew(new Action<object>(TicketGenerator), ticketObserver);
}
//**************************************************************************************************//
//*** Dispose frees the ticket generating resources by allowing the TicketGenerator to complete. ***//
//**************************************************************************************************//
public void Dispose()
{
bGenerate = false;
}
//*****************************************************************************************************************//
//*** TicketGenerator generates a new ticket every 3 sec and calls the observer's OnNext handler to deliver it. ***//
//*****************************************************************************************************************//
private void TicketGenerator(object observer)
{
IObserver<Ticket> ticketObserver = (IObserver<Ticket>)observer;
//***********************************************************************************************//
//*** Generate a new ticket every 3 sec and call the observer's OnNext handler to deliver it. ***//
//***********************************************************************************************//
Ticket t;
while (bGenerate)
{
t = new Ticket(Guid.NewGuid().ToString());
ticketObserver.OnNext(t);
Thread.Sleep(3000);
}
}
//********************************************************************************************************************************//
//*** TicketSubscribe starts the flow of tickets for the ticket sequence when a subscription is created. It is passed to ***//
//*** Observable.Create() as the subscribe parameter. Observable.Create() returns the IObservable<Ticket> that is used to ***//
//*** create subscriptions by calling the IObservable<Ticket>.Subscribe() method. ***//
//*** ***//
//*** The IDisposable interface returned by TicketSubscribe is returned from the IObservable<Ticket>.Subscribe() call. Calling ***//
//*** Dispose cancels the subscription freeing ticket generating resources. ***//
//********************************************************************************************************************************//
public static IDisposable TicketSubscribe(object ticketObserver)
{
TicketFactory tf = new TicketFactory(ticketObserver);
return tf;
}
}
}
Voici un exemple de sortie à partir de l’exemple de code. Appuyez sur Entrée pour annuler l’inscription.
Press ENTER to unsubscribe...
Ticket ID : a5715731-b9ba-4992-af00-d5030956cfc4
Timestamp : 5/18/2011 6:48:50 AM
Ticket ID : d9797b2b-a356-4928-bfce-797a1637b11d
Timestamp : 5/18/2011 6:48:53 AM
Ticket ID : bb01dc7d-1ed5-4ba0-9ce0-6029187792be
Timestamp : 5/18/2011 6:48:56 AM
Ticket ID : 0d3c95de-392f-4ed3-bbda-fed2c6bc7287
Timestamp : 5/18/2011 6:48:59 AM
Ticket ID : 4d19f79e-6d4f-4fec-83a8-9644a1d4e759
Timestamp : 5/18/2011 6:49:05 AM