Partager via


StreamInsight手札(四)——使用IEnumerable接口创建StreamInsight程序

本文将介绍如何利用IEnumerable接口创建StreamInsight应用程序。

IEnumerable与IObservable最大的区别在于IEnumerable是采用Pull模式获取数据,而IObservable则是用Push模式推送数据。

用户首先通过IEnumerable.GetEnumerator来获得数据源的Enumerator,然后通过调用IEnumerator.MoveNext和IEnumerator.Current来获取数据。StreamInsight的输入输出通过一个数据队列连接。输入适配器获得数据,并调用Enqueue将数据放入队列,输出适配器循环检测队列是否为空,如果不是空队列,则调用Dequeue将数据取出。

在HelloInsight程序基础上,可以通过如下修改来实现IEnumerable接口。

1、获取输入数据源的Enumerator(HelloPointInput.cs):

public HelloPointInput(HelloInputConfig config)

{

_config = config;

var streamReader = new StreamReader(config.fileName);

strings = new List<string>();

while (!streamReader.EndOfStream)

{

strings.Add(streamReader.ReadLine());

}

stringEnumerator = strings.GetEnumerator();

streamReader.Close();

}

2、读取Enumerator中的数据,并放入队列。注意判断队满的情况(HelloPointInput.cs):

private void ProduceEvents()

{

    while (AdapterState != AdapterState.Stopping)

    {

        if (stringEnumerator.MoveNext())

        {

            try

            {

                var line = stringEnumerator.Current;

                var pendingEvent = CreateInsertEvent();

                pendingEvent.StartTime = DateTime.Now;

                pendingEvent.Payload = new HelloPayload

                {

                    str = line

                };

                EnqueueOperationResult result;

                result = Enqueue(ref pendingEvent);

                if (result == EnqueueOperationResult.Full)

                {

                    Thread.Sleep(1000);

                    //Ready();

                    return;

                }

                EnqueueCtiEvent(DateTime.Now);

                Thread.Sleep(1000);

            }

            catch

            {

                //error handling should go here

            }

        }

        else

        {

            break;

        }

    }

    Stopped();

}

注意:在IEnumerator模式下,CTI事件是必须的,否则不会产生数据输出。

3、循环读取队列中的数据,并进行输出处理(HelloPointOutput.cs):

private void ConsumeEvents()

{

    PointEvent<HelloPayload> currEvent;

    DequeueOperationResult result;

    while (AdapterState != AdapterState.Stopping)

    {

        result = Dequeue(out currEvent);

        if (result == DequeueOperationResult.Empty)

        {

            Ready();

            return;

                                   

        }

        else

        {

            if (currEvent.EventKind == EventKind.Insert)

            {

                Console.WriteLine("Output: " +

                currEvent.Payload.str

                );

            }

            ReleaseEvent(ref currEvent);

        }

    }

    Stopped();

}

4、运行程序

建立测试文件:

将输入数据源指向测试文件的存储地址。运行程序,得到如下输出:

 

软件测试工程师 金晶

HelloInsight.zip