ISubject<TSource, TResult> 介面
表示同時為可觀察序列和觀察者的物件。
Namespace:System.Reactive.Subjects
組件:System.Reactive (在 System.Reactive.dll 中)
Syntax
'Declaration
Public Interface ISubject(Of In TSource, Out TResult) _
Inherits IObserver(Of TSource), IObservable(Of TResult)
'Usage
Dim instance As ISubject(Of In TSource, Out TResult)
public interface ISubject<in TSource, out TResult> : IObserver<TSource>,
IObservable<TResult>
generic<typename TSource, typename TResult>
public interface class ISubject : IObserver<TSource>,
IObservable<TResult>
type ISubject<'TSource, 'TResult> =
interface
interface IObserver<'TSource>
interface IObservable<'TResult>
end
JScript does not support generic types and methods.
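類型參數
TSource:此 Subject 所觀察之來源項目的類型 (逆變,對應 IObserver<TSource>)。
TResult:此 Subject 所發佈之結果項目的類型 (共變,對應 IObservable<TResult>)。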
ISubject<TSource, TResult> 類型會公開下列成員。
方法
名稱 | 描述 | |
---|---|---|
OnCompleted | (繼承自 IObserver<TSource>。) | |
OnError | (繼承自 IObserver<TSource>。) | |
OnNext | (繼承自 IObserver<TSource>。) | |
Subscribe | (繼承自 IObservable<TResult>。) | |
頂端
擴充方法
頂端
備註
這個 Subject 介面可讓您彈性地觀察某一種類型的可觀察序列,同時發佈另一種類型的可觀察序列。
範例
這個範例示範如何實作 ISubject<TSource, TResult>,使其觀察某一種類型的可觀察序列,同時發佈另一種類型的可觀察序列。本範例中的 AsciiConverterSubject 會觀察 char 類型的序列,並發佈 int 類型的可觀察序列,藉此示範實作方式。所發佈的 int 可觀察序列,即為所觀察到之每個 char 值的 ASCII 碼。
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Threading;
namespace Example
{
class Program
{
    /// <summary>
    /// Reads keys from the console, feeds them through an <see cref="AsciiConverterSubject"/>
    /// and prints the numeric code of every key until ENTER is pressed.
    /// </summary>
    static void Main()
    {
        // Observable sequence of char built from console input. Each subscription runs the
        // blocking read loop below and completes the sequence when ENTER is pressed.
        IObservable<char> keySequence = Observable.Create<char>(observer =>
        {
            while (true)
            {
                ConsoleKeyInfo keyInfo = Console.ReadKey(true);
                if (keyInfo.Key == ConsoleKey.Enter)
                {
                    observer.OnCompleted();
                    break;
                }

                observer.OnNext(keyInfo.KeyChar);
            }

            // Nothing to clean up when the subscription is disposed.
            return (() => { });
        });

        // The subject takes a source type of char and publishes an observable sequence of int
        // holding the numeric code of each observed char.
        AsciiConverterSubject myConverterSubject = new AsciiConverterSubject();

        // Subscribe to the keySequence on the .NET thread pool: the read loop blocks, and the
        // main thread still has to create its own subscription to the subject below.
        IDisposable subscription = keySequence.SubscribeOn(Scheduler.ThreadPool).Subscribe(myConverterSubject);
        Console.WriteLine("\nEnter a sequence of keys to have the AsciiConverterSubject\nconvert the keys to their ASCII code values.\n");
        Console.WriteLine("Press ENTER to terminate the observable sequence...\n");

        // The main thread waits on this handle until the key sequence terminates. The using
        // block guarantees the underlying OS handle is released.
        using (EventWaitHandle waitHandle = new EventWaitHandle(false, EventResetMode.ManualReset))
        {
            IDisposable printer = myConverterSubject.Subscribe(
                c => Console.WriteLine("Ascii Char code {0} entered.", c),
                error =>
                {
                    // Without this handler a faulted sequence would never signal the handle
                    // and the main thread would block forever.
                    Console.WriteLine("The key sequence terminated with an error: {0}", error.Message);
                    waitHandle.Set();
                },
                () => waitHandle.Set());

            waitHandle.WaitOne();

            // Explicitly release resources: observers first, then the subject itself.
            printer.Dispose();
            subscription.Dispose();
            myConverterSubject.Dispose();
        }
    }
}
/// <summary>
/// Demonstrates an implementation of ISubject&lt;TSource, TResult&gt;.
/// The subject observes a sequence of char and publishes a sequence of int, where each int
/// is the numeric code of the observed char (Convert.ToInt32(char), which equals the ASCII
/// code for ASCII characters).
/// </summary>
/// <remarks>
/// Every operation runs under a single lock so that notifications, subscription changes and
/// disposal cannot interleave in a way that would break the observer contract (for example
/// an item following a completion, or a completion following an error).
/// </remarks>
class AsciiConverterSubject : ISubject<char, int>, IDisposable
{
    private readonly List<IObserver<int>> observerList;
    private readonly object gate = new object();
    private bool isDisposed;
    private bool isStopped;

    // The error that terminated the sequence, or null if it completed normally / is still live.
    private Exception exception;

    /// <summary>Creates a subject with no subscribed observers.</summary>
    public AsciiConverterSubject()
    {
        observerList = new List<IObserver<int>>();
    }

    /// <summary>
    /// Notifies all current observers that the sequence has completed and stops the subject.
    /// Has no effect if the subject has already terminated.
    /// </summary>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public void OnCompleted()
    {
        lock (gate)
        {
            CheckDisposed();
            if (isStopped)
            {
                return;
            }

            // Enter the terminal state BEFORE notifying: an observer that subscribes again
            // from inside its OnCompleted callback then gets an immediate completion instead
            // of being added to a list that is about to be cleared.
            IObserver<int>[] targets = TakeObserversAndStop();
            foreach (IObserver<int> observer in targets)
            {
                observer.OnCompleted();
            }
        }
    }

    /// <summary>
    /// Notifies all current observers that the sequence has failed and stops the subject.
    /// Has no effect if the subject has already terminated.
    /// </summary>
    /// <param name="error">The error to propagate; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="error"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public void OnError(Exception error)
    {
        if (error == null)
        {
            // ArgumentNullException derives from ArgumentException, so callers catching the
            // previously thrown type keep working.
            throw new ArgumentNullException("error", "Exception error should not be null.");
        }

        lock (gate)
        {
            CheckDisposed();
            if (isStopped)
            {
                return;
            }

            // Remember the error so that late subscribers are told about it as well.
            exception = error;
            IObserver<int>[] targets = TakeObserversAndStop();
            foreach (IObserver<int> observer in targets)
            {
                observer.OnError(error);
            }
        }
    }

    /// <summary>
    /// Converts <paramref name="value"/> to its numeric code and publishes it to all current
    /// observers. Ignored once the subject has terminated.
    /// </summary>
    /// <param name="value">The observed character.</param>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public void OnNext(char value)
    {
        lock (gate)
        {
            CheckDisposed();
            if (isStopped)
            {
                return;
            }

            int code = Convert.ToInt32(value);

            // Iterate over a snapshot: the lock is re-entrant, so an observer may subscribe or
            // unsubscribe from inside its callback, which would otherwise modify the list
            // while it is being enumerated and throw InvalidOperationException.
            foreach (IObserver<int> observer in observerList.ToArray())
            {
                observer.OnNext(code);
            }
        }
    }

    /// <summary>
    /// Subscribes an observer to the published sequence of int.
    /// If the subject has already terminated, the observer is immediately notified of the
    /// error or completion and an empty disposable is returned.
    /// </summary>
    /// <param name="observer">The observer to subscribe; must not be null.</param>
    /// <returns>A disposable that cancels the subscription.</returns>
    /// <exception cref="ArgumentNullException"><paramref name="observer"/> is null.</exception>
    /// <exception cref="ObjectDisposedException">The subject has been disposed.</exception>
    public IDisposable Subscribe(IObserver<int> observer)
    {
        if (observer == null)
        {
            throw new ArgumentNullException("observer", "observer should not be null.");
        }

        lock (gate)
        {
            CheckDisposed();
            if (!isStopped)
            {
                observerList.Add(observer);
                return new Subscription(observer, this);
            }

            if (exception != null)
            {
                observer.OnError(exception);
            }
            else
            {
                observer.OnCompleted();
            }

            return Disposable.Empty;
        }
    }

    /// <summary>Removes one registration of <paramref name="observer"/>, if present.</summary>
    private void Unsubscribe(IObserver<int> observer)
    {
        lock (gate)
        {
            observerList.Remove(observer);
        }
    }

    /// <summary>
    /// Drops all observers without notifying them and marks the subject as disposed; any
    /// later OnNext/OnError/OnCompleted/Subscribe call throws ObjectDisposedException.
    /// </summary>
    public void Dispose()
    {
        lock (gate)
        {
            observerList.Clear();
            isStopped = true;
            isDisposed = true;
        }
    }

    /// <summary>
    /// Copies the current observers, clears the list and marks the subject as stopped.
    /// Must be called while holding the gate lock.
    /// </summary>
    private IObserver<int>[] TakeObserversAndStop()
    {
        IObserver<int>[] snapshot = observerList.ToArray();
        observerList.Clear();
        isStopped = true;
        return snapshot;
    }

    /// <summary>Throws if the subject has been disposed.</summary>
    private void CheckDisposed()
    {
        if (isDisposed)
        {
            // The single-string constructor treats its argument as the OBJECT NAME, not the
            // message, so pass both explicitly.
            throw new ObjectDisposedException(GetType().Name, "Subject has been disposed.");
        }
    }

    /// <summary>
    /// Wraps a subscribed observer and exposes the IDisposable through which the observer
    /// can cancel its subscription.
    /// </summary>
    class Subscription : IDisposable
    {
        private readonly AsciiConverterSubject subject;
        private IObserver<int> observer;

        public Subscription(IObserver<int> obs, AsciiConverterSubject sub)
        {
            subject = sub;
            observer = obs;
        }

        /// <summary>Cancels the subscription; calling it more than once is harmless.</summary>
        public void Dispose()
        {
            // Swap the reference out atomically so that only the first Dispose unsubscribes.
            // Otherwise a second call could remove a different registration of the same
            // observer instance.
            IObserver<int> current = Interlocked.Exchange(ref observer, null);
            if (current != null)
            {
                subject.Unsubscribe(current);
            }
        }
    }
}
}
下列輸出是由範例程式碼所產生。
Enter a sequence of keys to have the AsciiConverterSubject
convert the keys to their ASCII code values.
Press ENTER to terminate the observable sequence...
Ascii Char code 59 entered.
Ascii Char code 108 entered.
Ascii Char code 115 entered.
Ascii Char code 100 entered.
Ascii Char code 107 entered.
Ascii Char code 102 entered.
Ascii Char code 59 entered.
Ascii Char code 108 entered.
Ascii Char code 115 entered.
Ascii Char code 100 entered.
Ascii Char code 107 entered.
Ascii Char code 102 entered.
Ascii Char code 59 entered.
Ascii Char code 108 entered.
Ascii Char code 115 entered.
Ascii Char code 107 entered.
Ascii Char code 100 entered.
Ascii Char code 59 entered.
Ascii Char code 102 entered.
Ascii Char code 108 entered.
Ascii Char code 115 entered.
Ascii Char code 107 entered.
Ascii Char code 100 entered.