Azure Functions 用 RedisStreamTrigger
RedisStreamTrigger
はストリームから新規エントリを読み取り、それらの要素を関数に提示します。
関数トリガーの可用性のスコープ
トリガーの種類 | Azure Managed Redis | Azure Cache for Redis |
---|---|---|
ストリーム | Yes | 可 |
重要
Azure Managed Redis または Azure Cache for Redis の Enterprise レベルを使用する場合は、ポート 6380 または 6379 ではなくポート 10000 を使用します。
重要
現在1、Redis トリガーは、従量課金プランで実行されている関数ではサポートされていません。
重要
Functions の Node.js v4 モデルは、Azure Cache for Redis 拡張機能ではまだサポートされていません。 v4 モデルの動作の詳細については、Azure Functions Node.js 開発者ガイドを参照してください。 v3 と v4 の違いの詳細については、移行ガイドを参照してください。
重要
Functions 用 Python v2 モデルは、Azure Cache for Redis 拡張機能ではまだサポートされていません。 v2 モデルの動作の詳細については、Azure Functions Node.js 開発者ガイドを参照してください。
例
重要
.NET 関数の場合は、"インプロセス" モデルより、"分離ワーカー モデル" を使うことをお勧めします。 in-process と isolated worker モデルの比較については、azure Functions の .NET の isolated worker モデルと in-process モデルの違いを参照してください。
実行モデル | 説明 |
---|---|
分離ワーカー モデル | 関数コードは、別の .NET ワーカー プロセスで実行されます。 .NET と .NET Framework のサポートされているバージョンで使います。 詳細については、.NET 分離ワーカー プロセス関数の開発に関する記事を参照してください。 |
インプロセス モデル | 関数コードは、Functions ホスト プロセスと同じプロセスで実行されます。 .NET の長期サポート (LTS) バージョンのみをサポートします。 詳細については、.NET クラス ライブラリ関数の開発に関する記事を参照してください。 |
using Microsoft.Extensions.Logging;
namespace Microsoft.Azure.Functions.Worker.Extensions.Redis.Samples.RedisStreamTrigger
{
internal class SimpleStreamTrigger
{
private readonly ILogger<SimpleStreamTrigger> logger;
public SimpleStreamTrigger(ILogger<SimpleStreamTrigger> logger)
{
this.logger = logger;
}
[Function(nameof(SimpleStreamTrigger))]
public void Run(
[RedisStreamTrigger(Common.connectionStringSetting, "streamKey")] string entry)
{
logger.LogInformation(entry);
}
}
}
package com.function.RedisStreamTrigger;
import com.microsoft.azure.functions.*;
import com.microsoft.azure.functions.annotation.*;
import com.microsoft.azure.functions.redis.annotation.*;
public class SimpleStreamTrigger {
@FunctionName("SimpleStreamTrigger")
public void run(
@RedisStreamTrigger(
name = "req",
connection = "redisConnectionString",
key = "streamTest",
pollingIntervalInMs = 1000,
maxBatchSize = 1)
String message,
final ExecutionContext context) {
context.getLogger().info(message);
}
}
このサンプルでは、同じ index.js
ファイルと、function.json
ファイル内のバインディング データを使用します。
index.js
ファイルは次のとおりです。
module.exports = async function (context, entry) {
context.log(entry);
}
function.json
のバインディング データは次のとおりです。
{
"bindings": [
{
"type": "redisStreamTrigger",
"connection": "redisConnectionString",
"key": "streamTest",
"pollingIntervalInMs": 1000,
"maxBatchSize": 16,
"name": "entry",
"direction": "in"
}
],
"scriptFile": "index.js"
}
このサンプルでは、同じ run.ps1
ファイルと、function.json
ファイル内のバインディング データを使用します。
run.ps1
ファイルは次のとおりです。
param($entry, $TriggerMetadata)
Write-Host ($entry | ConvertTo-Json)
function.json
のバインディング データは次のとおりです。
{
"bindings": [
{
"type": "redisStreamTrigger",
"connection": "redisConnectionString",
"key": "streamTest",
"pollingIntervalInMs": 1000,
"maxBatchSize": 16,
"name": "entry",
"direction": "in"
}
],
"scriptFile": "run.ps1"
}
Python v1 プログラミング モデルでは、関数フォルダー内の個別の function.json ファイルでバインドを定義する必要があります。 詳細については、「Python 開発者ガイド」を参照してください。
このサンプルでは、同じ __init__.py
ファイルと、function.json
ファイル内のバインディング データを使用します。
__init__.py
ファイルは次のとおりです。
import logging
def main(entry: str):
logging.info(entry)
function.json
のバインディング データは次のとおりです。
{
"bindings": [
{
"type": "redisStreamTrigger",
"connection": "redisConnectionString",
"key": "streamTest",
"pollingIntervalInMs": 1000,
"maxBatchSize": 16,
"name": "entry",
"direction": "in"
}
],
"scriptFile": "__init__.py"
}
属性
パラメーター | 説明 | 必要 | Default |
---|---|---|---|
Connection |
キャッシュ 接続文字列を含むアプリケーション設定の名前。次に例を示します。<cacheName>.redis.cache.windows.net:6380,password... |
はい | |
Key |
読み取り元のキー。 | はい | |
PollingIntervalInMs |
Redis サーバーをポーリングする頻度 (ミリ秒単位)。 | 省略可能 | 1000 |
MessagesPerWorker |
各関数のワーカーが処理する必要のあるメッセージの数。 関数がスケーリングする必要のあるワーカーの数を決めるために使われます。 | 省略可能 | 100 |
Count |
Redis から一度にプルする要素の数。 | 省略可能 | 10 |
DeleteAfterProcess |
関数が処理後にストリーム エントリを削除するかどうかを示します。 | 省略可能 | false |
注釈
パラメーター | Description | 必要 | Default |
---|---|---|---|
name |
entry |
はい | |
connection |
キャッシュ 接続文字列を含むアプリケーション設定の名前。次に例を示します。<cacheName>.redis.cache.windows.net:6380,password... |
はい | |
key |
読み取り元のキー。 | はい | |
pollingIntervalInMs |
Redis をポーリングする頻度 (ミリ秒単位)。 | 省略可能 | 1000 |
messagesPerWorker |
各関数のワーカーが処理する必要のあるメッセージの数。 関数をスケーリングするワーカーの数を決定するために使用されます。 | 省略可能 | 100 |
count |
Redis から一度に読み取るエントリの数。 エントリは並列で処理されます。 | 省略可能 | 10 |
deleteAfterProcess |
関数の実行後にストリーム エントリを削除するかどうか。 | 省略可能 | false |
構成
次の表は、function.json ファイルで設定したバインド構成のプロパティを説明しています。
function.json のプロパティ | 説明 | 必要 | Default |
---|---|---|---|
type |
はい | ||
deleteAfterProcess |
省略可能 | false |
|
connection |
キャッシュ 接続文字列を含むアプリケーション設定の名前。次に例を示します。<cacheName>.redis.cache.windows.net:6380,password... |
はい | |
key |
読み取り元のキー。 | はい | |
pollingIntervalInMs |
Redis をポーリングする頻度 (ミリ秒単位)。 | 省略可能 | 1000 |
messagesPerWorker |
(省略可能) 各関数のワーカーが処理する必要のあるメッセージの数。 関数がスケーリングする必要のあるワーカーの数を決めるために使用されます。 | 省略可能 | 100 |
count |
Redis から一度に読み取るエントリの数。 これらは並列で処理されます。 | 省略可能 | 10 |
name |
はい | ||
direction |
はい |
完全な例については、セクションの例を参照してください。
使用方法
RedisStreamTrigger
Azure 関数は、ストリームから新規エントリを読み取り、それらのエントリを関数に提示します。
トリガーは、構成可能な固定間隔で Redis をポーリングし、XREADGROUP
を使ってストリームから要素を読み取ります。
関数のすべてのインスタンスのコンシューマー グループは、関数の名前、つまり、StreamTrigger サンプルのSimpleStreamTrigger
です。
各関数インスタンスは、 WEBSITE_INSTANCE_ID
を使用するか、グループ内のコンシューマー名として使用するランダム GUID を生成して、関数のスケールアウトされたインスタンスがストリームから同じメッセージを読み取らないようにします。
型 | 説明 |
---|---|
byte[] |
チャネルからのメッセージ。 |
string |
チャネルからのメッセージ。 |
Custom |
トリガーは、Json.NET シリアル化を使用して、チャネルからのメッセージを string からカスタム型にマップします。 |