[アーティクル] 10/14/2024
4 人の共同作成者
フィードバック
この記事の内容
重要
Azure Arc によって実現されている Azure IoT Operations プレビューは、現在プレビュー 段階です。 運用環境ではこのプレビュー ソフトウェアを使わないでください。
Azure IoT Operations の一般公開リリースが提供されたときには、新規インストールをデプロイすることが必要になります。 プレビュー インストールからのアップグレードはできません。
ベータ版、プレビュー版、または一般提供としてまだリリースされていない Azure の機能に適用される法律条項については、「Microsoft Azure プレビューの追加使用条件 」を参照してください。
データフローとは、データがソースから宛先までたどるパスであり、必要に応じて変換が行われます。 データフローを構成するには、Dataflow カスタム リソースを作成するか、Azure IoT Operations Studio ポータルを使用します。 データフローは次の 3 つの部分で構成されます: ソース 、変換 、宛先
ソースと宛先を定義するには、データフロー エンドポイントを構成する必要があります。 変換は省略可能であり、データのエンリッチメント、データのフィルター処理、データの別のフィールドへのマッピングなどの操作を含めることができます。
この記事では、ソース、変換、宛先を含め、例を使用してデータフローを作成する方法について説明します。
前提条件
データフローを作成する
データフロー エンドポイントを作成したら、それらを使用してデータフローを作成できます。 データフローは、ソース、変換、宛先の 3 つの部分で構成されていることを思い出してください。
操作エクスペリエンス ポータルでデータフローを作成するには、[データフロー] >[データフローを作成する] を選択します。
データフロー構成の全体的な構造は、次のとおりです。
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
name: my-dataflow
namespace: azure-iot-operations
spec:
profileRef: default
mode: Enabled
operations:
- operationType: Source
sourceSettings:
# See source configuration section
- operationType: BuiltInTransformation
builtInTransformationSettings:
# See transformation configuration section
- operationType: Destination
destinationSettings:
# See destination configuration section
データフローの操作の種類を構成する方法については、以下のセクションを参照してください。
データフローのソースを構成するには、エンドポイント参照とデータ ソースを指定します。 エンドポイントのデータ ソースの一覧を指定できます。
ソースとして資産を使用する
資産 をデータフローのソースとして使用できます。 これは、操作エクスペリエンス ポータルでのみ利用可能です。
[ソースの詳細] で、[資産] を選択します。
ソース エンドポイントとして使用する資産を選択します。
[続行] を選択します。
選択した資産のデータポイントの一覧が表示されます。
[適用] を選択して、資産をソース エンドポイントとして使用します。
資産のソースとしての構成は、操作エクスペリエンス ポータルでのみ可能です。
ソースとして MQTT を使用する
[ソースの詳細] で、[MQTT] を選択します。
受信メッセージをリッスンする MQTT トピック を入力します。
ドロップダウン リストからメッセージ スキーマ を選択するか、新しいスキーマをアップロードします。 ソース データに省略可能なフィールドや、さまざまな型を持つフィールドがある場合は、一貫性を保つために逆シリアル化スキーマを指定します。 たとえば、データに、すべてのメッセージに存在するわけではないフィールドがあるとします。 スキーマがないと、これらのフィールドは空の値を持つため、変換ではこれらを処理できません。 スキーマがあれば、既定値を指定することも、フィールドを無視することもできます。
適用 を選択します。
たとえば、1 つの MQTT エンドポイントと 2 つの MQTT トピック フィルターを使ってソースを構成するには、次の構成を使用します。
sourceSettings:
endpointRef: mq
dataSources:
- thermostats/+/telemetry/temperature/#
- humidifiers/+/telemetry/humidity/#
dataSources
ではエンドポイント構成を変更せずに MQTT または Kafka トピックを指定できるため、トピックが異なる場合でも複数のデータフローに対してエンドポイントを再利用できます。 詳細については、データフロー エンドポイントの再利用 に関する記事を参照してください。
データを逆シリアル化するスキーマを指定する
ソース データに省略可能なフィールドや、さまざまな型を持つフィールドがある場合は、一貫性を保つために逆シリアル化スキーマを指定します。 たとえば、データに、すべてのメッセージに存在するわけではないフィールドがあるとします。 スキーマがないと、これらのフィールドは空の値を持つため、変換ではこれらを処理できません。 スキーマがあれば、既定値を指定することも、フィールドを無視することもできます。
spec:
operations:
- operationType: Source
sourceSettings:
serializationFormat: Json
schemaRef: aio-sr://exampleNamespace/exampleAvroSchema:1.0.0
スキーマを指定するには、ファイルを作成し、スキーマ レジストリに格納します。
{
"type": "record",
"name": "Temperature",
"fields": [
{"name": "deviceId", "type": "string"},
{"name": "temperature", "type": "float"}
]
}
Note
サポートされているシリアル化形式は JSON のみです。 スキーマは省略可能です。
スキーマ レジストリの詳細については、「メッセージ スキーマについて 」を参照してください。
共有サブスクリプション
MQTT ソースで共有サブスクリプションを使用するには、$shared/<subscription-group>/<topic>
の形式で共有サブスクリプション トピックを指定します。
sourceSettings:
dataSources:
- $shared/myGroup/thermostats/+/telemetry/temperature/#
Note
データフロー プロファイル のインスタンス数が 1 を超える場合は、共有サブスクリプション トピックを使用する必要があります。
変換操作では、宛先に送信する前にソースからのデータを変換できます。 変換は省略可能です。 データを変更する必要がない場合は、データフロー構成に変換操作を含めないでください。 複数の変換は、構成で指定した順序に関係なく、段階的に連結されます。 ステージの順序は常に次のようになります。
エンリッチ : 一致するデータセットと条件を指定して、ソース データにデータを追加します。
フィルター : 条件に基づいてデータをフィルター処理します。
マップ : 省略可能な変換を使用して、あるフィールドから別のフィールドにデータを移動します。
操作エクスペリエンス ポータルで、[データフロー] >[変換の追加 (省略可能)] を選びます。
builtInTransformationSettings:
datasets:
# ...
filter:
# ...
map:
# ...
エンリッチ: 参照データを追加する
データをエンリッチするには、Azure IoT Operations の分散状態ストア (DSS) の参照データセットを使用できます。 データセットは、条件に基づいてソース データにさらにデータを追加するために使用されます。 条件は、データセット内のフィールドと一致するソース データ内のフィールドとして指定されます。
分散状態ストアのキー名は、データフロー構成のデータセットに対応します。
現在、エンリッチ操作は、操作エクスペリエンス ポータルでは使用できません。
たとえば、ソース データの deviceId
フィールドを使用して、データセットの asset
フィールドと一致させることができます。
builtInTransformationSettings:
datasets:
- key: assetDataset
inputs:
- $source.deviceId # ------------- $1
- $context(assetDataset).asset # - $2
expression: $1 == $2
データセットに asset
フィールドを持つレコードがある場合は、次のようになります。
{
"asset": "thermostat1",
"location": "room1",
"manufacturer": "Contoso"
}
thermostat1
と一致する deviceId
フィールドを持つソースのデータには、filter
と map
のステージで使用できる location
と manufacturer
のフィールドがあります。
DSS セット ツール サンプル を使用して、DSS にサンプル データを読み込むことができます。
条件構文の詳細については、「データフローを使用してデータをエンリッチする 」と「データフローを使用したデータの変換 」に関する記事を参照してください。
フィルター: 条件に基づいてデータをフィルター処理する
条件に基づいてデータをフィルター処理するには、filter
ステージを使用できます。 条件は、値と一致するソース データ内のフィールドとして指定されます。
[変換 (省略可能)] で、[フィルター] >[追加] を選びます。
データセットに含めるデータポイントを選択します。
フィルター条件と説明を追加します。
適用 を選択します。
たとえば、ソース データの temperature
フィールドを使用してデータをフィルター処理できます。
builtInTransformationSettings:
filter:
- inputs:
- temperature ? $last # - $1
expression: "$1 > 20"
temperature
フィールドが 20 より大きい場合、データは次のステージに渡されます。 temperature
フィールドが 20 以下の場合、データはフィルター処理されます。
マップ: あるフィールドから別のフィールドにデータを移動する
省略可能な変換を使用してデータを別のフィールドにマップするには、map
操作を使用できます。 変換は、ソース データのフィールドを使用する数式として指定されます。
操作エクスペリエンス ポータルでは、現在、コンピューティング 変換を使用したマッピングがサポートされています。
[変換 (省略可能)] で、[コンピューティング] >[追加] を選びます。
必須フィールドと式を入力します。
適用 を選択します。
たとえば、ソース データの temperature
フィールドを使用して温度を摂氏に変換し、それを temperatureCelsius
フィールドに格納できます。 コンテキスト化データセットの location
フィールドを使用してソース データをエンリッチすることもできます。
builtInTransformationSettings:
map:
- inputs:
- temperature # - $1
output: temperatureCelsius
expression: "($1 - 32) * 5/9"
- inputs:
- $context(assetDataset).location
output: location
詳細については、「データフローを使用してデータをマッピングする 」と「データフローを使用したデータの変換 」に関する記事を参照してください。
スキーマに従ってデータをシリアル化する
データを宛先に送信する前にシリアル化する場合は、スキーマとシリアル化形式を指定する必要があります。 それ以外の場合は、データは推論された型を使用して JSON でシリアル化されます。 Microsoft Fabric や Azure Data Lake などのストレージ エンドポイントでは、データの一貫性を確保するためにスキーマが必要です。
宛先データフロー エンドポイントを追加するときに、出力 スキーマを指定します。
builtInTransformationSettings:
serializationFormat: Parquet
schemaRef: aio-sr://<NAMESPACE>/<SCHEMA>:<VERSION>
スキーマを指定するには、スキーマ定義を使用して Schema カスタム リソースを作成します。
スキーマ レジストリの詳細については、「メッセージ スキーマについて 」を参照してください。
{
"type": "record",
"name": "Temperature",
"fields": [
{"name": "deviceId", "type": "string"},
{"name": "temperatureCelsius", "type": "float"}
{"name": "location", "type": "string"}
]
}
サポートされているシリアル化形式は、JSON、Parquet、Delta です。
データフローの宛先を構成するには、エンドポイント参照とデータ宛先を指定します。 MQTT トピックまたは Kafka トピックである、エンドポイントのデータ宛先の一覧を指定できます。
宛先として使用するデータフロー エンドポイントを選択します。
[続行] を選択して、宛先を構成します。
宛先の種類に基づいてマッピングの詳細を追加します。
たとえば、静的 MQTT トピックと先ほど作成した MQTT エンドポイントを使用して宛先を構成するには、次の構成を使用します。
destinationSettings:
endpointRef: mq
dataDestination: factory
Microsoft Fabric などのストレージ エンドポイントを作成済みの場合は、データ宛先フィールドを使用してテーブルまたはコンテナー名を指定します。
destinationSettings:
endpointRef: adls
dataDestination: telemetryTable
例
次の例は、ソースと宛先に MQTT エンドポイントを使用したデータフロー構成です。 ソースは、MQTT トピック thermostats/+/telemetry/temperature/#
および humidifiers/+/telemetry/humidity/#
からのデータをフィルターします。 変換によって温度が華氏に変換され、温度が 100000 未満のデータにフィルターされます。 宛先が MQTT トピック factory
にデータを送信します。
apiVersion: connectivity.iotoperations.azure.com/v1beta1
kind: Dataflow
metadata:
name: my-dataflow
namespace: azure-iot-operations
spec:
profileRef: default
mode: Enabled
operations:
- operationType: Source
sourceSettings:
endpointRef: mq
dataSources:
- thermostats/+/telemetry/temperature/#
- humidifiers/+/telemetry/humidity/#
- operationType: builtInTransformation
builtInTransformationSettings:
filter:
- inputs:
- 'temperature.Value'
- '"Tag 10".Value'
expression: "$1*$2<100000"
map:
- inputs:
- '*'
output: '*'
- inputs:
- temperature.Value
output: TemperatureF
expression: cToF($1)
- inputs:
- '"Tag 10".Value'
output: 'Tag 10'
- operationType: Destination
destinationSettings:
endpointRef: mq
dataDestination: factory
データフローが機能していることを確認する
「チュートリアル: Azure Event Grid への双方向 MQTT ブリッジ 」に従って、データフローが機能していることを確認します。
データフロー構成のエクスポート
データフロー構成をエクスポートするには、操作エクスペリエンス ポータルを使用するか、Dataflow カスタム リソースをエクスポートします。
エクスポートするデータフローを選択し、ツール バーから [エクスポート] を選びます。
kubectl get dataflow my-dataflow -o yaml > my-dataflow.yaml