演習 - イベントを処理して Azure Cosmos DB にデータを格納する
2 番目の関数では、Azure イベント ハブで特定の名前空間のイベントをリッスンし、処理して、Azure Cosmos DB で作成したデータベースに格納することができます。
Azure Cosmos DB を使用してデータベースを作成する
データベースを作成するには、az cosmosdb create
コマンドを使用します。 このコマンドでは、Azure Cosmos DB アカウント、データベース、SQL コンテナーを使います。
az cosmosdb create \
--resource-group $RESOURCE_GROUP \
--name $COSMOS_DB_ACCOUNT
az cosmosdb sql database create \
--resource-group $RESOURCE_GROUP \
--account-name $COSMOS_DB_ACCOUNT \
--name TelemetryDb
az cosmosdb sql container create \
--resource-group $RESOURCE_GROUP \
--account-name $COSMOS_DB_ACCOUNT \
--database-name TelemetryDb \
--name TelemetryInfo \
--partition-key-path '/temperatureStatus'
このシナリオでは、温度に関心があります。 パーティション キーとして temperatureStatus
を定義します。
別の Azure 関数を構築、構成、デプロイする
イベント ハブでは、メガバイト単位のデータ ストリームで開始し、ギガバイトまたはテラバイトまで拡張できます。 自動インフレ機能は、使用状況のニーズに合わせてスループット ユニットの数をスケーリングするために利用できる多くのオプションの 1 つです。
各関数のコンシューマー アプリケーションには、イベント ストリームの個別のビューがあります。 それらは、独自のペースとオフセットで個別にストリームを読み取ります。
このシナリオでは、例として 1 つのコンシューマー Azure 関数を作成します。 関数を作成する場合は、疎結合とスケーラビリティのための、独立したストレージ アカウントとバインドを使用して、独立させることをお勧めします。
az storage account create \
--resource-group $RESOURCE_GROUP \
--name $STORAGE_ACCOUNT"c" \
--sku Standard_LRS
az functionapp create \
--resource-group $RESOURCE_GROUP \
--name $FUNCTION_APP"-c"\
--storage-account $STORAGE_ACCOUNT"c" \
--consumption-plan-location $LOCATION \
--runtime java \
--functions-version 4
接続文字列を取得する
コンシューマー関数は、そのストレージ アカウントとイベント ハブを認識している必要があります。 また、処理されたイベントの書き込み先のデータベースも把握する必要があります。
AZURE_WEB_JOBS_STORAGE=$( \
az storage account show-connection-string \
--resource-group $RESOURCE_GROUP \
--name $STORAGE_ACCOUNT"c" \
--query connectionString \
--output tsv)
echo $AZURE_WEB_JOBS_STORAGE
COSMOS_DB_CONNECTION_STRING=$( \
az cosmosdb keys list \
--resource-group $RESOURCE_GROUP \
--name $COSMOS_DB_ACCOUNT \
--type connection-strings \
--query 'connectionStrings[0].connectionString' \
--output tsv)
echo $COSMOS_DB_CONNECTION_STRING
コマンド echo $EVENT_HUB_CONNECTION_STRING
を使うと、変数がまだ正しく設定されているかどうかを確認できます。 そうでない場合は、次のコマンドを再実行します。
EVENT_HUB_CONNECTION_STRING=$( \
az eventhubs eventhub authorization-rule keys list \
--resource-group $RESOURCE_GROUP \
--name $EVENT_HUB_AUTHORIZATION_RULE \
--eventhub-name $EVENT_HUB_NAME \
--namespace-name $EVENT_HUB_NAMESPACE \
--query primaryConnectionString \
--output tsv)
echo $EVENT_HUB_CONNECTION_STRING
これらの接続文字列は、Azure Functions アカウントのアプリケーション設定に保存する必要があります。
az functionapp config appsettings set \
--resource-group $RESOURCE_GROUP \
--name $FUNCTION_APP"-c" \
--settings \
AzureWebJobsStorage=$AZURE_WEB_JOBS_STORAGE \
EventHubConnectionString=$EVENT_HUB_CONNECTION_STRING \
CosmosDBConnectionString=$COSMOS_DB_CONNECTION_STRING
Note
運用環境では、Azure Key Vault のインスタンスを使用して接続文字列を格納および管理できます。
関数のアプリケーションを作成する
次の関数を作成する前に、正しいフォルダーに移動してください。
cd ..
mvn archetype:generate --batch-mode \
-DarchetypeGroupId=com.microsoft.azure \
-DarchetypeArtifactId=azure-functions-archetype \
-DappName=$FUNCTION_APP"-c" \
-DresourceGroup=$RESOURCE_GROUP \
-DappRegion=$LOCATION \
-DappServicePlanName=$LOCATION"plan" \
-DgroupId=com.learn \
-DartifactId=telemetry-functions-consumer
このコマンドを使うと、前回の演習と同様にアプリケーションが作成されます。 テスト ファイルを削除し、fetch-app-settings
コマンドを使って local.settings.file
を更新し、既存の Function.java
ファイルを置き換えます。
cd telemetry-functions-consumer
rm -r src/test
ローカルでの実行とデバッグ用にローカル設定を更新します。
func azure functionapp fetch-app-settings $FUNCTION_APP"-c"
次に、Function.java
ファイルを開き、その内容を次のコードに置き換えます。
package com.learn;
import com.learn.TelemetryItem.status;
import com.microsoft.azure.functions.annotation.FunctionName;
import com.microsoft.azure.functions.ExecutionContext;
import com.microsoft.azure.functions.OutputBinding;
import com.microsoft.azure.functions.annotation.Cardinality;
import com.microsoft.azure.functions.annotation.CosmosDBOutput;
import com.microsoft.azure.functions.annotation.EventHubTrigger;
public class Function {
@FunctionName("processSensorData")
public void processSensorData(
@EventHubTrigger(
name = "msg",
eventHubName = "", // blank because the value is included in the connection string
cardinality = Cardinality.ONE,
connection = "EventHubConnectionString")
TelemetryItem item,
@CosmosDBOutput(
name = "databaseOutput",
databaseName = "TelemetryDb",
collectionName = "TelemetryInfo",
connectionStringSetting = "CosmosDBConnectionString")
OutputBinding<TelemetryItem> document,
final ExecutionContext context) {
context.getLogger().info("Event hub message received: " + item.toString());
if (item.getPressure() > 30) {
item.setNormalPressure(false);
} else {
item.setNormalPressure(true);
}
if (item.getTemperature() < 40) {
item.setTemperatureStatus(status.COOL);
} else if (item.getTemperature() > 90) {
item.setTemperatureStatus(status.HOT);
} else {
item.setTemperatureStatus(status.WARM);
}
document.setValue(item);
}
}
TelemetryItem.java という名前のもう 1 つの新しいファイルを Function.java と同じ場所に作成し、次のコードを追加します。
package com.learn;
public class TelemetryItem {
private String id;
private double temperature;
private double pressure;
private boolean isNormalPressure;
private status temperatureStatus;
static enum status {
COOL,
WARM,
HOT
}
public TelemetryItem(double temperature, double pressure) {
this.temperature = temperature;
this.pressure = pressure;
}
public String getId() {
return id;
}
public double getTemperature() {
return temperature;
}
public double getPressure() {
return pressure;
}
@Override
public String toString() {
return "TelemetryItem={id=" + id + ",temperature="
+ temperature + ",pressure=" + pressure + "}";
}
public boolean isNormalPressure() {
return isNormalPressure;
}
public void setNormalPressure(boolean isNormal) {
this.isNormalPressure = isNormal;
}
public status getTemperatureStatus() {
return temperatureStatus;
}
public void setTemperatureStatus(status temperatureStatus) {
this.temperatureStatus = temperatureStatus;
}
}
イベント ハブでは、メッセージを受け取ると、イベントが生成されます。 イベントを受け取ると、processSensorData
関数が実行されます。 次に、イベント データを処理し、Azure Cosmos DB の出力バインディングを使って結果をデータベースに送信します。 ここでも TelemetryItem.java
クラスを使います。 TelemetryItem
オブジェクトは、このイベント ドリブン システムの参加者間の、コンシューマー ドリブン契約と見なすことができます。
ローカルで実行する
Azure Functions を使用すると、世界中からイベントを受け取ることができます。 もちろん、開発用コンピューターでイベントをローカルに受け取ることもできます。
mvn clean package
mvn azure-functions:run
ビルドと起動メッセージの後、関数が実行されると受信イベントが表示されます。
[2021-01-19T16:45:24.709Z] Executing 'Functions.processSensorData' (Reason='(null)', Id=87354afa-abf4-4963-bd44-0c1421048240)
[2021-01-19T16:45:24.712Z] Event hub message received: TelemetryItem={id=null,temperature=21.653044570769897,pressure=36.061288095436126}
[2021-01-19T16:45:24.712Z] Function "processSensorData" (Id: 87354afa-abf4-4963-bd44-0c1421048240) invoked by Java Worker
Azure portal で Azure Cosmos DB アカウントに移動します。 データを受け取ったら、[データ エクスプローラー] を選択し、[TelemetryInfo] を選択してから、[項目] を選択して表示します。
Azure でのデプロイ
次に、ワークロード全体をクラウドにシフトしてみましょう。 関数を Azure Functions にデプロイするには、Maven コマンド mvn azure-functions:deploy
を使います。 引き続き正しいリポジトリ telemetry-functions にいることをご確認ください。
mvn azure-functions:deploy
おめでとうございます。 データがイベント ハブに送信され、異なる独立関数でデータが使用されたことで、利用統計情報のシナリオ全体がデプロイされました。 関数は、データを処理した後、Azure Cosmos DB で作成されたデータベースに結果を格納します。 アプリケーションが定義済みの要件を満たしているかどうかを確認するには、どうすればよいでしょうか。 監視を使います。