次の方法で共有


Azure Functions 用 RedisStreamTrigger

RedisStreamTrigger はストリームから新規エントリを読み取り、それらの要素を関数に提示します。

関数トリガーの可用性のスコープ

トリガーの種類 Azure Managed Redis Azure Cache for Redis
ストリーム Yes

重要

Azure Managed Redis または Azure Cache for Redis の Enterprise レベルを使用する場合は、ポート 6380 または 6379 ではなくポート 10000 を使用します。

重要

現在1、Redis トリガーは、従量課金プランで実行されている関数ではサポートされていません。

重要

Functions の Node.js v4 モデルは、Azure Cache for Redis 拡張機能ではまだサポートされていません。 v4 モデルの動作の詳細については、Azure Functions Node.js 開発者ガイドを参照してください。 v3 と v4 の違いの詳細については、移行ガイドを参照してください。

重要

Functions 用 Python v2 モデルは、Azure Cache for Redis 拡張機能ではまだサポートされていません。 v2 モデルの動作の詳細については、Azure Functions Node.js 開発者ガイドを参照してください。

重要

.NET 関数の場合は、"インプロセス" モデルより、"分離ワーカー モデル" を使うことをお勧めします。 in-processisolated worker モデルの比較については、azure Functions の .NET の isolated worker モデルと in-process モデルの違いを参照してください。

実行モデル 説明
分離ワーカー モデル 関数コードは、別の .NET ワーカー プロセスで実行されます。 .NET と .NET Framework のサポートされているバージョンで使います。 詳細については、.NET 分離ワーカー プロセス関数の開発に関する記事を参照してください。
インプロセス モデル 関数コードは、Functions ホスト プロセスと同じプロセスで実行されます。 .NET の長期サポート (LTS) バージョンのみをサポートします。 詳細については、.NET クラス ライブラリ関数の開発に関する記事を参照してください。
using Microsoft.Extensions.Logging;

namespace Microsoft.Azure.Functions.Worker.Extensions.Redis.Samples.RedisStreamTrigger
{
    internal class SimpleStreamTrigger
    {
        private readonly ILogger<SimpleStreamTrigger> logger;

        public SimpleStreamTrigger(ILogger<SimpleStreamTrigger> logger)
        {
            this.logger = logger;
        }

        [Function(nameof(SimpleStreamTrigger))]
        public void Run(
            [RedisStreamTrigger(Common.connectionStringSetting, "streamKey")] string entry)
        {
            logger.LogInformation(entry);
        }
    }
}

package com.function.RedisStreamTrigger;

import com.microsoft.azure.functions.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.redis.annotation.*;

public class SimpleStreamTrigger {
    @FunctionName("SimpleStreamTrigger")
    public void run(
            @RedisStreamTrigger(
                name = "req",
                connection = "redisConnectionString",
                key = "streamTest",
                pollingIntervalInMs = 1000,
                maxBatchSize = 1)
                String message,
            final ExecutionContext context) {
            context.getLogger().info(message);
    }
}

このサンプルでは、同じ index.js ファイルと、function.json ファイル内のバインディング データを使用します。

index.js ファイルは次のとおりです。

module.exports = async function (context, entry) {
    context.log(entry);
}

function.json のバインディング データは次のとおりです。

{
  "bindings": [
    {
      "type": "redisStreamTrigger",
      "connection": "redisConnectionString",
      "key": "streamTest",
      "pollingIntervalInMs": 1000,
      "maxBatchSize": 16,
      "name": "entry",
      "direction": "in"
    }
  ],
  "scriptFile": "index.js"
}

このサンプルでは、同じ run.ps1 ファイルと、function.json ファイル内のバインディング データを使用します。

run.ps1 ファイルは次のとおりです。

param($entry, $TriggerMetadata)
Write-Host ($entry | ConvertTo-Json)

function.json のバインディング データは次のとおりです。

{
  "bindings": [
    {
      "type": "redisStreamTrigger",
      "connection": "redisConnectionString",
      "key": "streamTest",
      "pollingIntervalInMs": 1000,
      "maxBatchSize": 16,
      "name": "entry",
      "direction": "in"
    }
  ],
  "scriptFile": "run.ps1"
}

Python v1 プログラミング モデルでは、関数フォルダー内の個別の function.json ファイルでバインドを定義する必要があります。 詳細については、「Python 開発者ガイド」を参照してください。

このサンプルでは、同じ __init__.py ファイルと、function.json ファイル内のバインディング データを使用します。

__init__.py ファイルは次のとおりです。

import logging

def main(entry: str):
    logging.info(entry)

function.json のバインディング データは次のとおりです。

{
  "bindings": [
    {
      "type": "redisStreamTrigger",
      "connection": "redisConnectionString",
      "key": "streamTest",
      "pollingIntervalInMs": 1000,
      "maxBatchSize": 16,
      "name": "entry",
      "direction": "in"
    }
  ],
  "scriptFile": "__init__.py"
}

属性

パラメーター 説明 必要 Default
Connection キャッシュ 接続文字列を含むアプリケーション設定の名前。次に例を示します。<cacheName>.redis.cache.windows.net:6380,password... はい
Key 読み取り元のキー。 はい
PollingIntervalInMs Redis サーバーをポーリングする頻度 (ミリ秒単位)。 省略可能 1000
MessagesPerWorker 各関数のワーカーが処理する必要のあるメッセージの数。 関数がスケーリングする必要のあるワーカーの数を決めるために使われます。 省略可能 100
Count Redis から一度にプルする要素の数。 省略可能 10
DeleteAfterProcess 関数が処理後にストリーム エントリを削除するかどうかを示します。 省略可能 false

注釈

パラメーター Description 必要 Default
name entry はい
connection キャッシュ 接続文字列を含むアプリケーション設定の名前。次に例を示します。<cacheName>.redis.cache.windows.net:6380,password... はい
key 読み取り元のキー。 はい
pollingIntervalInMs Redis をポーリングする頻度 (ミリ秒単位)。 省略可能 1000
messagesPerWorker 各関数のワーカーが処理する必要のあるメッセージの数。 関数をスケーリングするワーカーの数を決定するために使用されます。 省略可能 100
count Redis から一度に読み取るエントリの数。 エントリは並列で処理されます。 省略可能 10
deleteAfterProcess 関数の実行後にストリーム エントリを削除するかどうか。 省略可能 false

構成

次の表は、function.json ファイルで設定したバインド構成のプロパティを説明しています。

function.json のプロパティ 説明 必要 Default
type はい
deleteAfterProcess 省略可能 false
connection キャッシュ 接続文字列を含むアプリケーション設定の名前。次に例を示します。<cacheName>.redis.cache.windows.net:6380,password... はい
key 読み取り元のキー。 はい
pollingIntervalInMs Redis をポーリングする頻度 (ミリ秒単位)。 省略可能 1000
messagesPerWorker (省略可能) 各関数のワーカーが処理する必要のあるメッセージの数。 関数がスケーリングする必要のあるワーカーの数を決めるために使用されます。 省略可能 100
count Redis から一度に読み取るエントリの数。 これらは並列で処理されます。 省略可能 10
name はい
direction はい

完全な例については、セクションの例を参照してください。

使用方法

RedisStreamTrigger Azure 関数は、ストリームから新規エントリを読み取り、それらのエントリを関数に提示します。

トリガーは、構成可能な固定間隔で Redis をポーリングし、XREADGROUP を使ってストリームから要素を読み取ります。

関数のすべてのインスタンスのコンシューマー グループは、関数の名前、つまり、StreamTrigger サンプルSimpleStreamTriggerです。

各関数インスタンスは、 WEBSITE_INSTANCE_ID を使用するか、グループ内のコンシューマー名として使用するランダム GUID を生成して、関数のスケールアウトされたインスタンスがストリームから同じメッセージを読み取らないようにします。

説明
byte[] チャネルからのメッセージ。
string チャネルからのメッセージ。
Custom トリガーは、Json.NET シリアル化を使用して、チャネルからのメッセージを string からカスタム型にマップします。