Initiating an Oozie workflow with PowerShell
From: Developing big data solutions on Microsoft Azure HDInsight
The Azure PowerShell module does not provide a cmdlet specifically for initiating Oozie jobs. However, you can use the Invoke-RestMethod cmdlet to submit the job as an HTTP request to the Oozie application on the HDInsight cluster. With this technique your PowerShell code must construct the XML job configuration that forms the body of the request, and it must read the JSON responses returned by the cluster to determine the job status and results.
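To make the status handling concrete, the following is a minimal sketch of reading a job information response. It assumes the response is a JSON object with id and status fields, as used by the script later in this topic. The sample job ID and the Get-OozieJobState function name are hypothetical. Invoke-RestMethod normally converts a JSON response into an object for you; the sketch starts from the raw JSON text only so that it can run without a cluster.

```powershell
# Hypothetical, trimmed sample of the JSON returned for a job information request.
$sampleJson = '{ "id": "0000001-000000000000000-oozie-hdp-W", "status": "RUNNING" }'

# Hypothetical helper: extracts the job ID and status, and reports whether
# the workflow job has reached a state in which it will not run any further.
function Get-OozieJobState {
    param(
        [Parameter(Mandatory = $true)]
        [string] $Json
    )

    $job = ConvertFrom-Json -InputObject $Json

    # Oozie workflow jobs finish in one of these three states.
    $finishedStates = @('SUCCEEDED', 'KILLED', 'FAILED')

    [pscustomobject]@{
        Id         = $job.id
        Status     = $job.status
        IsFinished = ($finishedStates -contains $job.status)
    }
}

Get-OozieJobState -Json $sampleJson
```

Checking for all three finished states matters when you poll in a loop, because a job that ends in the FAILED state would otherwise never satisfy the exit condition.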
The following code example shows a PowerShell script that uploads the files used by an Oozie workflow, constructs an Oozie job request to start the workflow, submits the request to a cluster, and displays status information as the workflow progresses. The example is deliberately kept simple by including the credentials in the script so that you can copy and paste the code while you are experimenting with HDInsight. In a production system you must protect credentials, as described in “Securing credentials in scripts and applications” in the Security section of this guide.
$clusterName = "cluster-name"
$storageAccountName = "storage-account-name"
$containerName = "container-name"
$clusterUser = "user-name"
$clusterPwd = "password"
$passwd = ConvertTo-SecureString $clusterPwd -AsPlainText -Force
$creds = New-Object System.Management.Automation.PSCredential($clusterUser, $passwd)
$storageUri = "wasb://$containerName@$storageAccountName.blob.core.windows.net"
$ooziePath = "/mydata/oozieworkflow"
$tableName = "MyTable"
$tableFolder = "/mydata/MyTable"
# Find the local folder containing the workflow files.
$thisfolder = Split-Path -parent $MyInvocation.MyCommand.Definition
$localfolder = "$thisfolder\oozieworkflow"
# Upload workflow files.
$destfolder = "mydata/oozieworkflow"
$storageAccountKey = (Get-AzureStorageKey -StorageAccountName $storageAccountName).Primary
$blobContext = New-AzureStorageContext -StorageAccountName $storageAccountName -StorageAccountKey $storageAccountKey
$files = Get-ChildItem $localFolder
foreach($file in $files)
{
# Use explicit properties; the way a file object converts to a string can differ between PowerShell versions.
$fileName = $file.FullName
$blobName = "$destfolder/$($file.Name)"
write-host "copying $fileName to $blobName"
Set-AzureStorageBlobContent -File $fileName -Container $containerName -Blob $blobName -Context $blobContext -Force
}
write-host "All files in $localFolder uploaded to $containerName!"
# Oozie job properties.
$OoziePayload = @"
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<property>
<name>nameNode</name>
<value>$storageUri</value>
</property>
<property>
<name>jobTracker</name>
<value>jobtrackerhost:9010</value>
</property>
<property>
<name>queueName</name>
<value>default</value>
</property>
<property>
<name>oozie.use.system.libpath</name>
<value>true</value>
</property>
<property>
<name>TableName</name>
<value>$tableName</value>
</property>
<property>
<name>TableFolder</name>
<value>$tableFolder</value>
</property>
<property>
<name>user.name</name>
<value>$clusterUser</value>
</property>
<property>
<name>oozie.wf.application.path</name>
<value>$ooziePath</value>
</property>
</configuration>
"@
# Create Oozie job.
$clusterUriCreateJob = "https://$clusterName.azurehdinsight.net:443/oozie/v2/jobs"
$response = Invoke-RestMethod -Method Post `
-Uri $clusterUriCreateJob `
-Credential $creds `
-Body $OoziePayload `
-ContentType "application/xml"
# Invoke-RestMethod has already converted the JSON response into an object.
$oozieJobId = $response.id
Write-Host "Oozie job id is $oozieJobId..."
# Start Oozie job.
Write-Host "Starting the Oozie job $oozieJobId..."
$clusterUriStartJob = "https://$clusterName.azurehdinsight.net:443/oozie/v2/job/" + $oozieJobId + "?action=start"
Invoke-RestMethod -Method Put -Uri $clusterUriStartJob -Credential $creds | Out-Null
# Get job status.
Write-Host "Waiting until the job metadata is populated in the Oozie metastore..."
Start-Sleep -Seconds 10
Write-Host "Getting job status and waiting for the job to complete..."
$clusterUriGetJobStatus = "https://$clusterName.azurehdinsight.net:443/oozie/v2/job/" + $oozieJobId + "?show=info"
$response = Invoke-RestMethod -Method Get -Uri $clusterUriGetJobStatus -Credential $creds
# Invoke-RestMethod has already converted the JSON response into an object.
$JobStatus = $response.status
# SUCCEEDED, KILLED, and FAILED are the states in which a workflow job has finished.
while($JobStatus -notmatch "SUCCEEDED|KILLED|FAILED")
{
Write-Host "$(Get-Date -format 'G'): $oozieJobId is $JobStatus ...waiting for the job to complete..."
Start-Sleep -Seconds 10
$response = Invoke-RestMethod -Method Get -Uri $clusterUriGetJobStatus -Credential $creds
$JobStatus = $response.status
}
Write-Host "$(Get-Date -format 'G'): $oozieJobId $JobStatus!"
Note
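Because of page width limitations, some of the commands in the code above are split across several lines. PowerShell treats a line break as the end of a command, so in your own code either keep each command on a single, unbroken line or end every continued line with a backtick (`) line-continuation character.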
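The script above builds the job configuration with a here-string, which inserts variable values into the XML without escaping them. A value that contains a character such as & or < would therefore produce a malformed request body. The following is a minimal sketch of one alternative, assuming you prefer to hold the properties in a dictionary. New-OozieJobConfiguration is a hypothetical helper name, not a cmdlet in the Azure PowerShell module, and the property values are the same placeholders used in the script.

```powershell
# Hypothetical helper (not part of the Azure PowerShell module): builds the
# Oozie job configuration XML from a dictionary of property names and values.
function New-OozieJobConfiguration {
    param(
        [Parameter(Mandatory = $true)]
        [System.Collections.IDictionary] $Properties
    )

    $lines = @('<?xml version="1.0" encoding="UTF-8"?>', '<configuration>')
    foreach ($name in $Properties.Keys) {
        # Escape XML special characters (&, <, >, ", ') in names and values.
        $safeName  = [System.Security.SecurityElement]::Escape([string]$name)
        $safeValue = [System.Security.SecurityElement]::Escape([string]$Properties[$name])
        $lines += "<property><name>$safeName</name><value>$safeValue</value></property>"
    }
    $lines += '</configuration>'

    # Return the document as a single string.
    $lines -join "`n"
}

# Placeholder values that mirror the variables used in the script above.
$jobProperties = [ordered]@{
    'nameNode'                  = 'wasb://container-name@storage-account-name.blob.core.windows.net'
    'jobTracker'                = 'jobtrackerhost:9010'
    'queueName'                 = 'default'
    'oozie.use.system.libpath'  = 'true'
    'TableName'                 = 'MyTable'
    'TableFolder'               = '/mydata/MyTable'
    'user.name'                 = 'user-name'
    'oozie.wf.application.path' = '/mydata/oozieworkflow'
}

$OoziePayload = New-OozieJobConfiguration -Properties $jobProperties
```

The resulting string can be passed to the -Body parameter of Invoke-RestMethod in the same way as the here-string. Keeping the properties in a dictionary also makes it easier to add or remove workflow parameters without editing XML by hand.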
The request to submit the Oozie job consists of an XML configuration document that contains the properties to be used by the workflow. These properties include configuration settings for Oozie, and any parameters that are required by actions defined in the Oozie job. In this example the properties include parameters named TableName and TableFolder that are used by the following action in the workflow.xml file uploaded in the oozieworkflow folder.
<action name='CreateTable'>
<hive xmlns="uri:oozie:hive-action:0.2">
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
<configuration>
<property>
<name>mapred.job.queue.name</name>
<value>default</value>
</property>
<property>
<name>oozie.hive.defaults</name>
<value>hive-default.xml</value>
</property>
</configuration>
<script>CreateTable.q</script>
<param>TABLE_NAME=${TableName}</param>
<param>LOCATION=${TableFolder}</param>
</hive>
<ok to="end"/>
<error to="fail"/>
</action>
This action passes the parameter values to the CreateTable.q file, also in the oozieworkflow folder, which is shown in the following code example.
DROP TABLE IF EXISTS ${TABLE_NAME};
CREATE EXTERNAL TABLE ${TABLE_NAME} (id INT, val STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
STORED AS TEXTFILE LOCATION '${LOCATION}';
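To see how the parameter values flow from the job configuration into the HiveQL, the following sketch previews the substitution locally. It is an illustration only: Hive performs the substitution itself when the action runs, and Expand-HiveParameter is a hypothetical helper name that handles only simple ${NAME} placeholders like the ones in CreateTable.q.

```powershell
# Illustration only: replaces ${NAME} placeholders in a script with values
# from a dictionary, to preview the statements Hive would end up running.
function Expand-HiveParameter {
    param(
        [Parameter(Mandatory = $true)]
        [string] $Text,

        [Parameter(Mandatory = $true)]
        [System.Collections.IDictionary] $Parameters
    )

    foreach ($name in $Parameters.Keys) {
        # Single-quoted strings stop PowerShell from expanding ${...} itself.
        $placeholder = '${' + $name + '}'
        $Text = $Text.Replace($placeholder, [string]$Parameters[$name])
    }
    $Text
}

# The contents of CreateTable.q, held in a literal (single-quoted) here-string.
$createTable = @'
DROP TABLE IF EXISTS ${TABLE_NAME};
CREATE EXTERNAL TABLE ${TABLE_NAME} (id INT, val STRING)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ' '
STORED AS TEXTFILE LOCATION '${LOCATION}';
'@

# The values the example job configuration supplies through TableName and TableFolder.
Expand-HiveParameter -Text $createTable -Parameters @{
    TABLE_NAME = 'MyTable'
    LOCATION   = '/mydata/MyTable'
}
```

A preview like this can help you spot a missing or misspelled parameter name before you submit the job, because any placeholder without a matching value is left unchanged in the output.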