入力と出力のバインドを使用して Azure Functions と Azure Data Explorer を統合する (プレビュー)
重要
このコネクタは、Microsoft Fabric のリアルタイム インテリジェンスで使用できます。 次の例外を除き、この記事の手順を使用してください。
- 必要に応じて、「KQL データベースを作成する」の手順に従ってデータベースを作成する。
- 必要に応じて、「空のテーブルを作成する」の手順に従ってテーブルを作成する。
- 「URI をコピーする」の手順に従って、クエリまたはインジェスト URI を取得する。
- KQL クエリセットでクエリを実行する。
Azure Functions を使用すると、スケジュールに従って、またはイベントに応答して、クラウドでサーバーレス コードを実行できます。 Azure Functions に対して Azure Data Explorer の入力と出力のバインドを使うと、Azure Data Explorer をワークフローに統合して、データを取り込み、クラスターに対してクエリを実行できます。
前提条件
- Azure サブスクリプション。 無料の Azure アカウントを作成します。
- Azure Data Explorer クラスターとデータベースおよびサンプル データ。 クラスターとデータベースを作成します。
- ストレージ アカウント。
サンプル プロジェクトと統合してみます
Azure Functions に Azure Data Explorer のバインドを使用する方法
Azure Functions に Azure Data Explorer のバインドを使う方法については、次のトピックをご覧ください。
- Azure Functions における Azure Data Explorer バインドの概要
- Azure Functions 用の Azure Data Explorer 入力バインド
- Azure Functions 用の Azure Data Explorer 出力バインド
Azure Functions に Azure Data Explorer のバインドを使用するシナリオ
以下のセクションでは、Azure Functions に Azure Data Explorer のバインドを使用するいくつかの一般的なシナリオについて説明します。
入力バインディング
入力バインドは、必要に応じてパラメーターを使って Kusto 照会言語 (KQL) クエリまたは KQL 関数を実行し、関数に出力を返します。
以下のセクションでは、いくつかの一般的なシナリオで入力バインドを使う方法について説明します。
シナリオ 1: クラスターでデータのクエリを実行する HTTP エンドポイント
REST API を使って Azure Data Explorer のデータを公開する必要がある状況では、入力バインドの使用を適用できます。 このシナリオでは、Azure Functions HTTP トリガーを使って、クラスター内のデータのクエリを実行します。 このシナリオは、外部のアプリケーションまたはサービスに対して Azure Data Explorer のデータへのプログラムによるアクセスを提供する必要がある場合に特に便利です。 REST API を介してデータを公開すると、アプリケーションはクラスターに直接接続する必要がなく、データをすぐに使用できます。
次のコードでは、HTTP トリガーと Azure Data Explorer の入力バインドを使って関数を定義しています。 入力バインドでは、productsdb データベースの Products テーブルに対して実行するクエリが指定されています。 この関数では、パラメーターとして渡される述語として、productId 列を使います。
{
[FunctionName("GetProduct")]
public static async Task<IActionResult> RunAsync(
[HttpTrigger(AuthorizationLevel.User, "get", Route = "getproducts/{productId}")]
HttpRequest req,
[Kusto(Database:"productsdb" ,
KqlCommand = "declare query_parameters (productId:long);Products | where ProductID == productId" ,
KqlParameters = "@productId={productId}",
Connection = "KustoConnectionString")]
IAsyncEnumerable<Product> products)
{
IAsyncEnumerator<Product> enumerator = products.GetAsyncEnumerator();
var productList = new List<Product>();
while (await enumerator.MoveNextAsync())
{
productList.Add(enumerator.Current);
}
await enumerator.DisposeAsync();
return new OkObjectResult(productList);
}
}
この関数は、次のようにして呼び出すことができます。
curl https://myfunctionapp.azurewebsites.net/api/getproducts/1
シナリオ 2: クラスターからデータをエクスポートするスケジュールされたトリガー
次のシナリオは、時間ベースのスケジュールでデータをエクスポートする必要がある状況で適用できます。
このコードでは、productsdb データベースから Azure Blob Storage の CSV ファイルに売上データの集計をエクスポートするタイマー トリガーを含む関数が定義されています。
public static async Task Run([TimerTrigger("0 0 1 * * *")] TimerInfo myTimer,
[Kusto(ConnectionStringSetting = "KustoConnectionString",
DatabaseName = "productsdb",
Query = "ProductSales | where OrderDate >= ago(1d) | summarize Sales = sum(ProductSales) by ProductName | top 10 by Sales desc")] IEnumerable<dynamic> queryResults,
[Blob("salescontainer/productsblob.csv", FileAccess.Write, Connection = "BlobStorageConnection")] CloudBlockBlob outputBlob,
ILogger log)
{
// Write the query results to a CSV file
using (var stream = new MemoryStream())
using (var writer = new StreamWriter(stream))
using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
{
csv.WriteRecords(queryResults);
writer.Flush();
stream.Position = 0;
await outputBlob.UploadFromStreamAsync(stream);
}
}
出力バインディング
出力バインドは 1 つ以上の行を受け取り、Azure Data Explorer のテーブルにそれを挿入します。
以下のセクションでは、いくつかの一般的なシナリオで出力バインドを使う方法について説明します。
シナリオ 1: クラスターにデータを取り込む HTTP エンドポイント
次のシナリオは、着信した HTTP 要求を処理してクラスターに取り込む必要がある場合に適用できます。 出力バインドを使うと、要求からの着信データを Azure Data Explorer のテーブルに書き込むことができます。
次のコードでは、HTTP トリガーと Azure Data Explorer の出力バインドを使って関数を定義しています。 この関数は、HTTP 要求本文の JSON ペイロードを取得して、productsdb データベースの products テーブルにそれを書き込みます。
public static IActionResult Run(
[HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductuni")]
HttpRequest req, ILogger log,
[Kusto(Database:"productsdb" ,
TableName ="products" ,
Connection = "KustoConnectionString")] out Product product)
{
log.LogInformation($"AddProduct function started");
string body = new StreamReader(req.Body).ReadToEnd();
product = JsonConvert.DeserializeObject<Product>(body);
string productString = string.Format(CultureInfo.InvariantCulture, "(Name:{0} ID:{1} Cost:{2})",
product.Name, product.ProductID, product.Cost);
log.LogInformation("Ingested product {}", productString);
return new CreatedResult($"/api/addproductuni", product);
}
この関数は、次のようにして呼び出すことができます。
curl -X POST https://myfunctionapp.azurewebsites.net/api/addproductuni -d '{"Name":"Product1","ProductID":1,"Cost":100,"ActivatedOn":"2023-01-02T00:00:00"}'
シナリオ 2: Azure でサポートされている RabbitMQ または他のメッセージング システムからデータを取り込む
次のシナリオは、メッセージング システムからのデータをクラスターに取り込む必要がある場合に適用できます。 出力バインドを使うと、メッセージング システムからの着信データを Azure Data Explorer のテーブルに取り込むことができます。
このコードで定義されている関数は、RabbitMQ トリガーを介して着信したメッセージ (JSON 形式のデータ) を productsdb データベースのテーブル products テーブルに取り込みます。
public class QueueTrigger
{
[FunctionName("QueueTriggerBinding")]
[return: Kusto(Database: "productsdb",
TableName = "products",
Connection = "KustoConnectionString")]
public static Product Run(
[RabbitMQTrigger(queueName: "bindings.products.queue", ConnectionStringSetting = "rabbitMQConnectionAppSetting")] Product product,
ILogger log)
{
log.LogInformation($"Dequeued product {product.ProductID}");
return product;
}
}
関数について詳しくは、Azure Functions のドキュメントをご覧ください。 Azure Data Explorer の拡張機能は次の場所から入手できます。