ISubject < T > 介面
表示同時為可觀察序列和觀察者的物件。
Namespace:System.Reactive.Subjects
裝配: System.Reactive.dll) 中的 System.Reactive (
Syntax
'Declaration
Public Interface ISubject(Of T) _
Inherits ISubject(Of T, T), IObserver(Of T), _
IObservable(Of T)
'Usage
Dim instance As ISubject(Of T)
public interface ISubject<T> : ISubject<T, T>,
IObserver<T>, IObservable<T>
generic<typename T>
public interface class ISubject : ISubject<T, T>,
IObserver<T>, IObservable<T>
type ISubject<'T> =
interface
interface ISubject<'T, 'T>
interface IObserver<'T>
interface IObservable<'T>
end
JScript does not support generic types and methods.
類型參數
- T
主旨的類型。
ISubject < T > 類型會公開下列成員。
方法
名稱 | 描述 | |
---|---|---|
OnCompleted | (繼承自IObserver< T > .) | |
OnError | (繼承自IObserver< T > .) | |
OnNext | (繼承自IObserver< T > .) | |
訂閱 | (繼承自IObservable< T > .) |
頂端
擴充方法
頂端
備註
實作 ISubject 介面可讓您使用自訂行為建立自己的主旨。
範例
下列範例程式碼會實作範例主旨,其只會回應主控台視窗執行的方法。
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
namespace Example
{
class Program
{
static void Main()
{
//************************************************************************************//
//*** The ConsoleEchoSubject will just echo it's method calls out to the console ***//
//*** window. ***//
//*** ***//
//*** Create an instance of the ConsoleEchoSubject and subscribe to a sequence of ***//
//*** the first 5 integers returned by the Interval operator. ***//
//*** ***//
//************************************************************************************//
ConsoleEchoSubject<long> mySubject = new ConsoleEchoSubject<long>();
var obs = Observable.Interval(TimeSpan.FromSeconds(1), Scheduler.ThreadPool).Take(5);
Console.WriteLine("Subscribing ConsoleEchoSubject to observable sequence.\n");
obs.Subscribe(mySubject);
//************************************************************************************//
//*** Subscribe to the subject's observable interface. Each value will be written ***//
//*** to the console window. ***//
//************************************************************************************//
IDisposable subscription = mySubject.Subscribe(x => Console.WriteLine(x), () =>
{
Console.WriteLine("\nSequence Completed.\n");
Console.WriteLine("Press ENTER to exit...");
});
Console.ReadLine();
//***************************************//
//*** Explicitly releasing resources. ***//
//***************************************//
subscription.Dispose();
mySubject.Dispose();
}
}
//************************************************************************************//
//*** ***//
//*** The ConsoleEchoSubject will just echo it's method calls out to the console ***//
//*** window. ***//
//*** ***//
//************************************************************************************//
class ConsoleEchoSubject<T> : ISubject<T>, IDisposable
{
private List<IObserver<T>> observerList;
private bool isDisposed;
private bool isStopped;
object gate = new object();
Exception exception;
public ConsoleEchoSubject()
{
observerList = new List<IObserver<T>>();
}
public void OnCompleted()
{
//****************************************************************************************//
//*** Make sure the OnCompleted operation is not preempted by another operation ***//
//*** which would break the expected behavior. For example, don't allow an error from ***//
//*** OnError preempt OnCompleted from anotther thread. Then OnCompleted would follow ***//
//*** an error. That would be an incorrect behavior. ***//
//****************************************************************************************//
lock (gate)
{
CheckDisposed();
if (!isStopped)
{
Console.WriteLine("\nConsoleEchoSubject : OnCompleted()");
foreach (IObserver<T> observer in observerList)
{
observer.OnCompleted();
}
observerList.Clear();
isStopped = true;
}
}
}
public void OnError(Exception error)
{
if (error == null)
throw new ArgumentException("Exception error should not be null.");
//****************************************************************************************//
//*** Make sure the OnError operation is not preempted by another operation which ***//
//*** would break the expected behavior. For example, don't allow unsubscribe or an ***//
//*** OnCompleted operation to preempt OnError from another thread. This would result ***//
//*** in an error following completion. That would be an incorrect behavior. ***//
//****************************************************************************************//
lock (gate)
{
CheckDisposed();
if (!isStopped)
{
Console.WriteLine("\nConsoleEchoSubject : OnError({0})", error.Message);
exception = error;
foreach (IObserver<T> observer in observerList)
{
observer.OnError(error);
}
observerList.Clear();
isStopped = true;
}
}
}
public void OnNext(T value)
{
//****************************************************************************************//
//*** Make sure the OnNext operation is not preempted by another operation which ***//
//*** would break the expected behavior. For example, don't allow unsubscribe, errors ***//
//*** or an OnCompleted operation to preempt OnNext from another thread. This would ***//
//*** have the result of items in a sequence following completion, errors, or ***//
//*** unsubscribe. That would be an incorrect behavior. ***//
//****************************************************************************************//
lock (gate)
{
CheckDisposed();
if (!isStopped)
{
Console.WriteLine("\nConsoleEchoSubject : OnNext({0})", value.ToString());
foreach (IObserver<T> observer in observerList)
{
observer.OnNext(value);
}
}
}
}
public IDisposable Subscribe(IObserver<T> observer)
{
if (observer == null)
throw new ArgumentException("observer should not BehaviorSubject null.");
//****************************************************************************************//
//*** Make sure Subscribe occurs in sync with the other operations so we keep the ***//
//*** correct behavior depending on whether an error has occurred or the observable ***//
//*** sequence has completed. ***//
//****************************************************************************************//
lock (gate)
{
CheckDisposed();
if (!isStopped)
{
Console.WriteLine("\nConsoleEchoSubject : Creating new subscription.");
observerList.Add(observer);
return new Subscription(observer, this);
}
else if(exception != null)
{
observer.OnError(exception);
return Disposable.Empty;
}
else
{
observer.OnCompleted();
return Disposable.Empty;
}
}
}
private void Unsubscribe(IObserver<T> observer)
{
//****************************************************************************************//
//*** Make sure Unsubscribe occurs in sync with the other operations so we keep the ***//
//*** correct behavior. ***//
//****************************************************************************************//
lock (gate)
{
Console.WriteLine("\nConsoleEchoSubject : Unsubscribing subscription.");
observerList.Remove(observer);
}
}
public void Dispose()
{
//****************************************************************************************//
//*** Make sure Dispose occurs in sync with the other operations so we keep the ***//
//*** correct behavior. For example, Dispose shouldn't preempt the other operations ***//
//*** changing state variables after they have been checked. ***//
//****************************************************************************************//
lock (gate)
{
Console.WriteLine("\nConsoleEchoSubject : Disposing resources.");
observerList.Clear();
isStopped = true;
isDisposed = true;
}
}
private void CheckDisposed()
{
if (isDisposed)
throw new ObjectDisposedException("Subject has been disposed.");
}
//************************************************************************************//
//*** ***//
//*** The Subscription class wraps each observer that creates a subscription. This ***//
//*** is needed to expose an IDisposable interface through which a observer can ***//
//*** cancel the subscription. ***//
//*** ***//
//************************************************************************************//
class Subscription : IDisposable
{
private ConsoleEchoSubject<T> subject;
private IObserver<T> observer;
public Subscription(IObserver<T> obs, ConsoleEchoSubject<T> sub)
{
subject = sub;
observer = obs;
}
public void Dispose()
{
subject.Unsubscribe(observer);
}
}
}
}
下列輸出是由範例程式碼所產生。
Subscribing ConsoleEchoSubject to observable sequence.
ConsoleEchoSubject : Creating new subscription.
ConsoleEchoSubject : OnNext(0)
0
ConsoleEchoSubject : OnNext(1)
1
ConsoleEchoSubject : OnNext(2)
2
ConsoleEchoSubject : OnNext(3)
3
ConsoleEchoSubject : OnNext(4)
4
ConsoleEchoSubject : OnCompleted()
Sequence Completed.
Press ENTER to exit...
ConsoleEchoSubject : Unsubscribing subscription.
ConsoleEchoSubject : Disposing resources.
C:\