Microsoft Fabric REST API を使用して Spark ジョブ定義を作成および更新する方法
Microsoft Fabric REST API は、Fabric 項目の CRUD 操作用のサービス エンドポイントを提供します。 このチュートリアルでは、Spark ジョブ定義成果物を作成および更新する方法のエンド ツー エンドのシナリオの手順について説明します。 3 つの大まかなステップが含まれます。
- 何らかの初期状態で Spark ジョブ定義項目を作成します
- メインの定義ファイルとその他の lib ファイルをアップロードします
- メインの定義ファイルとその他の lib ファイルの OneLake URL を使って Spark ジョブ定義項目を更新します
前提条件
- Fabric REST API にアクセスするには、Microsoft Entra トークンが必要です。 トークンを取得するには、MSAL ライブラリをお勧めします。 詳細については「MSAL での認証フローのサポート」を参照してください。
- OneLake API にアクセスするには、ストレージ トークンが必要です。 詳細については、Python 用 MSAL に関するページを参照してください。
初期状態で Spark ジョブ定義項目を作成する
Microsoft Fabric REST API で、Fabric 項目の CRUD 操作用の統合エンドポイントを定義します。 エンドポイントが https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items
です。
項目の詳細は、要求本文内で指定します。 Spark ジョブ定義項目を作成するための要求本文の例を次に示します。
{
"displayName": "SJDHelloWorld",
"type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV1",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload":"eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9",
"payloadType": "InlineBase64"
}
]
}
}
この例の Spark ジョブ定義項目の名前は SJDHelloWorld
です。 payload
フィールドは、詳細な設定の base64 でエンコードされた内容です。デコード後の内容は次のとおりです。
{
"executableFile":null,
"defaultLakehouseArtifactId":"",
"mainClass":"",
"additionalLakehouseIds":[],
"retryPolicy":null,
"commandLineArguments":"",
"additionalLibraryUris":[],
"language":"",
"environmentArtifactId":null
}
詳細設定のエンコードとデコードのために、次の 2 つのヘルパー関数があります。
import base64
def json_to_base64(json_data):
# Serialize the JSON data to a string
json_string = json.dumps(json_data)
# Encode the JSON string as bytes
json_bytes = json_string.encode('utf-8')
# Encode the bytes as Base64
base64_encoded = base64.b64encode(json_bytes).decode('utf-8')
return base64_encoded
def base64_to_json(base64_data):
# Decode the Base64-encoded string to bytes
base64_bytes = base64_data.encode('utf-8')
# Decode the bytes to a JSON string
json_string = base64.b64decode(base64_bytes).decode('utf-8')
# Deserialize the JSON string to a Python dictionary
json_data = json.loads(json_string)
return json_data
Spark ジョブ定義項目を作成するコード スニペットを次に示します。
import requests
bearerToken = "breadcrumb"; # replace this token with the real AAD token
headers = {
"Authorization": f"Bearer {bearerToken}",
"Content-Type": "application/json" # Set the content type based on your request
}
payload = "eyJleGVjdXRhYmxlRmlsZSI6bnVsbCwiZGVmYXVsdExha2Vob3VzZUFydGlmYWN0SWQiOiIiLCJtYWluQ2xhc3MiOiIiLCJhZGRpdGlvbmFsTGFrZWhvdXNlSWRzIjpbXSwicmV0cnlQb2xpY3kiOm51bGwsImNvbW1hbmRMaW5lQXJndW1lbnRzIjoiIiwiYWRkaXRpb25hbExpYnJhcnlVcmlzIjpbXSwibGFuZ3VhZ2UiOiIiLCJlbnZpcm9ubWVudEFydGlmYWN0SWQiOm51bGx9"
# Define the payload data for the POST request
payload_data = {
"displayName": "SJDHelloWorld",
"Type": "SparkJobDefinition",
"definition": {
"format": "SparkJobDefinitionV1",
"parts": [
{
"path": "SparkJobDefinitionV1.json",
"payload": payload,
"payloadType": "InlineBase64"
}
]
}
}
# Make the POST request with Bearer authentication
sjdCreateUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items"
response = requests.post(sjdCreateUrl, json=payload_data, headers=headers)
メイン定義ファイルと他の lib ファイルをアップロードする
OneLake にファイルをアップロードするには、ストレージ トークンが必要です。 ストレージ トークンを取得するヘルパー関数を次に示します。
import msal
def getOnelakeStorageToken():
app = msal.PublicClientApplication(
"{client id}", # this filed should be the client id
authority="https://login.microsoftonline.com/microsoft.com")
result = app.acquire_token_interactive(scopes=["https://storage.azure.com/.default"])
print(f"Successfully acquired AAD token with storage audience:{result['access_token']}")
return result['access_token']
これで Spark ジョブ定義項目が作成されました。これを実行できるようにするには、メイン定義ファイルと必要なプロパティを設定する必要があります。 この SJD 項目のファイルをアップロードするためのエンドポイントは https://onelake.dfs.fabric.microsoft.com/{workspaceId}/{sjdartifactid}
です。 前のステップと同じ "workspaceId" を使う必要があります。"sjdartifactid" の値は、前のステップの応答本文の中に見つけることができます。 メイン定義ファイルを設定するコード スニペットを次に示します。
import requests
# three steps are required: create file, append file, flush file
onelakeEndPoint = "https://onelake.dfs.fabric.microsoft.com/workspaceId/sjdartifactid"; # replace the id of workspace and artifact with the right one
mainExecutableFile = "main.py"; # the name of the main executable file
mainSubFolder = "Main"; # the sub folder name of the main executable file. Don't change this value
onelakeRequestMainFileCreateUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?resource=file" # the url for creating the main executable file via the 'file' resource type
onelakePutRequestHeaders = {
"Authorization": f"Bearer {onelakeStorageToken}", # the storage token can be achieved from the helper function above
}
onelakeCreateMainFileResponse = requests.put(onelakeRequestMainFileCreateUrl, headers=onelakePutRequestHeaders)
if onelakeCreateMainFileResponse.status_code == 201:
# Request was successful
print(f"Main File '{mainExecutableFile}' was successfully created in onelake.")
# with previous step, the main executable file is created in OneLake, now we need to append the content of the main executable file
appendPosition = 0;
appendAction = "append";
### Main File Append.
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes
onelakeRequestMainFileAppendUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={appendPosition}&action={appendAction}";
mainFileContents = "filename = 'Files/' + Constant.filename; tablename = 'Tables/' + Constant.tablename"; # the content of the main executable file, please replace this with the real content of the main executable file
mainExecutableFileSizeInBytes = 83; # the size of the main executable file in bytes, this value should match the size of the mainFileContents
onelakePatchRequestHeaders = {
"Authorization": f"Bearer {onelakeStorageToken}",
"Content-Type" : "text/plain"
}
onelakeAppendMainFileResponse = requests.patch(onelakeRequestMainFileAppendUrl, data = mainFileContents, headers=onelakePatchRequestHeaders)
if onelakeAppendMainFileResponse.status_code == 202:
# Request was successful
print(f"Successfully Accepted Main File '{mainExecutableFile}' append data.")
# with previous step, the content of the main executable file is appended to the file in OneLake, now we need to flush the file
flushAction = "flush";
### Main File flush
onelakeRequestMainFileFlushUrl = f"{onelakeEndPoint}/{mainSubFolder}/{mainExecutableFile}?position={mainExecutableFileSizeInBytes}&action={flushAction}"
print(onelakeRequestMainFileFlushUrl)
onelakeFlushMainFileResponse = requests.patch(onelakeRequestMainFileFlushUrl, headers=onelakePatchRequestHeaders)
if onelakeFlushMainFileResponse.status_code == 200:
print(f"Successfully Flushed Main File '{mainExecutableFile}' contents.")
else:
print(onelakeFlushMainFileResponse.json())
必要な場合は、同じプロセスで他の lib ファイルをアップロードします。
メイン定義ファイルと他の lib ファイルの OneLake URL を使用して Spark ジョブ定義項目を更新する
ここまでで、何らかの初期状態で Spark ジョブ定義項目を作成し、メイン定義ファイルと他の lib ファイルをアップロードしました。最後のステップでは、Spark ジョブ定義項目を更新して、メイン定義ファイルと他の lib ファイルの URL プロパティを設定します。 Spark ジョブ定義項目を更新するためのエンドポイントは https://api.fabric.microsoft.com/v1/workspaces/{workspaceId}/items/{sjdartifactid}
です。 前のステップと同じ "workspaceId" と "sjdartifactid" を使う必要があります。 Spark ジョブ定義項目を更新するコード スニペットを次に示します。
mainAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Main/{mainExecutableFile}" # the workspaceId and sjdartifactid are the same as previous steps, the mainExecutableFile is the name of the main executable file
libsAbfssPath = f"abfss://{workspaceId}@onelake.dfs.fabric.microsoft.com/{sjdartifactid}/Libs/{libsFile}" # the workspaceId and sjdartifactid are the same as previous steps, the libsFile is the name of the libs file
defaultLakehouseId = 'defaultLakehouseid'; # replace this with the real default lakehouse id
updateRequestBodyJson = {
"executableFile":mainAbfssPath,
"defaultLakehouseArtifactId":defaultLakehouseId,
"mainClass":"",
"additionalLakehouseIds":[],
"retryPolicy":None,
"commandLineArguments":"",
"additionalLibraryUris":[libsAbfssPath],
"language":"Python",
"environmentArtifactId":None}
# Encode the bytes as a Base64-encoded string
base64EncodedUpdateSJDPayload = json_to_base64(updateRequestBodyJson)
# Print the Base64-encoded string
print("Base64-encoded JSON payload for SJD Update:")
print(base64EncodedUpdateSJDPayload)
# Define the API URL
updateSjdUrl = f"https://api.fabric.microsoft.com//v1/workspaces/{workspaceId}/items/{sjdartifactid}/updateDefinition"
updatePayload = base64EncodedUpdateSJDPayload
payloadType = "InlineBase64"
path = "SparkJobDefinitionV1.json"
format = "SparkJobDefinitionV1"
Type = "SparkJobDefinition"
# Define the headers with Bearer authentication
bearerToken = "breadcrumb"; # replace this token with the real AAD token
headers = {
"Authorization": f"Bearer {bearerToken}",
"Content-Type": "application/json" # Set the content type based on your request
}
# Define the payload data for the POST request
payload_data = {
"displayName": "sjdCreateTest11",
"Type": Type,
"definition": {
"format": format,
"parts": [
{
"path": path,
"payload": updatePayload,
"payloadType": payloadType
}
]
}
}
# Make the POST request with Bearer authentication
response = requests.post(updateSjdUrl, json=payload_data, headers=headers)
if response.status_code == 200:
print("Successfully updated SJD.")
else:
print(response.json())
print(response.status_code)
プロセス全体をまとめると、Spark ジョブ定義項目を作成および更新するには、Fabric REST API と OneLake API の両方が必要です。 Fabric REST API は、Spark ジョブ定義項目の作成と更新に使います。OneLake API は、メイン定義ファイルと他の lib ファイルのアップロードに使います。 メイン定義ファイルと他の lib ファイルを最初に OneLake にアップロードします。 次に、メイン定義ファイルと他の lib ファイルの URL プロパティを Spark ジョブ定義項目に設定します。