Använda Python User Defined Functions (UDF) med Apache Hive och Apache Pig i HDInsight
Lär dig hur du använder användardefinierade Python-funktioner (UDF) med Apache Hive och Apache Pig i Apache Hadoop i Azure HDInsight.
Python på HDInsight
Python2.7
installeras som standard på HDInsight 3.0 och senare. Apache Hive kan användas med den här versionen av Python för dataströmbearbetning. Dataströmbearbetning använder STDOUT och STDIN för att skicka data mellan Hive och UDF.
HDInsight innehåller även Jython, som är en Python-implementering skriven i Java. Jython körs direkt på den virtuella Java-datorn och använder inte strömning. Jython är den rekommenderade Python-tolken när du använder Python med Pig.
Förutsättningar
- Ett Hadoop-kluster i HDInsight. Se Kom igång med HDInsight i Linux.
- En SSH-klient. Mer information finns i Ansluta till HDInsight (Apache Hadoop) med hjälp av SSH.
- URI-schemat för dina klusters primära lagring. Detta skulle vara
wasb://
för Azure Storage,abfs://
för Azure Data Lake Storage Gen2 eller adl:// för Azure Data Lake Storage Gen1. Om säker överföring är aktiverad för Azure Storage skulle URI:n vara wasbs://. Se även säker överföring. - Möjlig ändring av lagringskonfigurationen. Se Lagringskonfiguration om du använder lagringskontotyp
BlobStorage
. - Valfritt. Om du planerar att använda PowerShell behöver du AZ-modulen installerad.
Kommentar
Lagringskontot som användes i den här artikeln var Azure Storage med säker överföring aktiverat och används därför wasbs
i hela artikeln.
Storage-konfiguration
Ingen åtgärd krävs om lagringskontot som används är av typen Storage (general purpose v1)
eller StorageV2 (general purpose v2)
. Processen i den här artikeln ger utdata till minst /tezstaging
. En standard hadoop-konfiguration innehåller i fs.azure.page.blob.dir
konfigurationsvariabeln /tezstaging
i core-site.xml
för tjänsten HDFS
. Den här konfigurationen gör att utdata till katalogen blir sidblobar, som inte stöds för lagringskontotyp BlobStorage
. Om du vill använda BlobStorage
för den här artikeln tar du bort /tezstaging
från konfigurationsvariabeln fs.azure.page.blob.dir
. Konfigurationen kan nås från Ambari-användargränssnittet. Annars får du felmeddelandet: Page blob is not supported for this account type.
Varning
Stegen i det här dokumentet gör följande antaganden:
- Du skapar Python-skripten i din lokala utvecklingsmiljö.
- Du laddar upp skripten till HDInsight med antingen
scp
kommandot eller det angivna PowerShell-skriptet.
Om du vill använda Azure Cloud Shell (bash) för att arbeta med HDInsight måste du:
- Skapa skripten i Cloud Shell-miljön.
- Använd
scp
för att ladda upp filerna från Cloud Shell till HDInsight. - Använd
ssh
från Cloud Shell för att ansluta till HDInsight och köra exemplen.
Apache Hive UDF
Python kan användas som en UDF från Hive via HiveQL-instruktionen TRANSFORM
. Följande HiveQL anropar hiveudf.py
till exempel filen som lagras i azure storage-standardkontot för klustret.
add file wasbs:///hiveudf.py;
SELECT TRANSFORM (clientid, devicemake, devicemodel)
USING 'python hiveudf.py' AS
(clientid string, phoneLabel string, phoneHash string)
FROM hivesampletable
ORDER BY clientid LIMIT 50;
Här är vad det här exemplet gör:
- -
add file
instruktionen i början av filen läggerhiveudf.py
till filen i den distribuerade cachen, så att den är tillgänglig för alla noder i klustret. - Instruktionen
SELECT TRANSFORM ... USING
väljer data frånhivesampletable
. Den skickar även värdena clientid, devicemake och devicemodel till skriptethiveudf.py
. - Satsen
AS
beskriver fälten som returneras frånhiveudf.py
.
Skapa fil
Skapa en textfil med namnet hiveudf.py
i utvecklingsmiljön. Använd följande kod som innehållet i filen:
#!/usr/bin/env python
import sys
import string
import hashlib
while True:
line = sys.stdin.readline()
if not line:
break
line = string.strip(line, "\n ")
clientid, devicemake, devicemodel = string.split(line, "\t")
phone_label = devicemake + ' ' + devicemodel
print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])
Det här skriptet utför följande åtgärder:
- Läser en datarad från STDIN.
- Det avslutande newline-tecknet tas bort med .
string.strip(line, "\n ")
- När dataströmbearbetning utförs innehåller en enda rad alla värden med ett fliktecken mellan varje värde. Så
string.split(line, "\t")
kan användas för att dela indata på varje flik och returnera bara fälten. - När bearbetningen är klar måste utdata skrivas till STDOUT som en enda rad, med en flik mellan varje fält. Exempel:
print "\t".join([clientid, phone_label, hashlib.md5(phone_label).hexdigest()])
- Loopen
while
upprepas tills ingetline
läsfel har lästs.
Skriptutdata är en sammanlänkning av indatavärdena för devicemake
och devicemodel
, och en hash av det sammanfogade värdet.
Ladda upp fil (gränssnitt)
Följande kommando ersätter sshuser
med det faktiska användarnamnet om det skiljer sig. Ersätt mycluster
med det faktiska klusternamnet. Kontrollera att arbetskatalogen är där filen finns.
Använd
scp
för att kopiera filerna till ditt HDInsight-kluster. Redigera och ange kommandot:scp hiveudf.py sshuser@mycluster-ssh.azurehdinsight.net:
Använd SSH för att ansluta till klustret. Redigera och ange kommandot:
ssh sshuser@mycluster-ssh.azurehdinsight.net
Från SSH-sessionen lägger du till Python-filerna som laddades upp tidigare till lagringen för klustret.
hdfs dfs -put hiveudf.py /hiveudf.py
Använda Hive UDF (shell)
Om du vill ansluta till Hive använder du följande kommando från den öppna SSH-sessionen:
beeline -u 'jdbc:hive2://headnodehost:10001/;transportMode=http'
Det här kommandot startar Beeline-klienten.
Ange följande fråga i kommandotolken
0: jdbc:hive2://headnodehost:10001/>
:add file wasbs:///hiveudf.py; SELECT TRANSFORM (clientid, devicemake, devicemodel) USING 'python hiveudf.py' AS (clientid string, phoneLabel string, phoneHash string) FROM hivesampletable ORDER BY clientid LIMIT 50;
När den sista raden har angetts ska jobbet starta. När jobbet är klart returneras utdata som liknar följande exempel:
100041 RIM 9650 d476f3687700442549a83fac4560c51c 100041 RIM 9650 d476f3687700442549a83fac4560c51c 100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9 100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9 100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
Om du vill avsluta Beeline anger du följande kommando:
!q
Ladda upp fil (PowerShell)
PowerShell kan också användas för att fjärrköra Hive-frågor. Kontrollera att arbetskatalogen finns där hiveudf.py
den finns. Använd följande PowerShell-skript för att köra en Hive-fråga som använder skriptet hiveudf.py
:
# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
Connect-AzAccount
}
# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"
# Revise file path as needed
$pathToStreamingFile = ".\hiveudf.py"
# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
-ResourceGroupName $resourceGroup `
-Name $storageAccountName)[0].Value
# Create an Azure Storage context
$context = New-AzStorageContext `
-StorageAccountName $storageAccountName `
-StorageAccountKey $storageAccountKey
# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
-File $pathToStreamingFile `
-Blob "hiveudf.py" `
-Container $container `
-Context $context
Kommentar
Mer information om hur du laddar upp filer finns i dokumentet Ladda upp data för Apache Hadoop-jobb i HDInsight .
Använda Hive UDF
# Script should stop on failures
$ErrorActionPreference = "Stop"
# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
Connect-AzAccount
}
# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"
# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"
$HiveQuery = "add file wasbs:///hiveudf.py;" +
"SELECT TRANSFORM (clientid, devicemake, devicemodel) " +
"USING 'python hiveudf.py' AS " +
"(clientid string, phoneLabel string, phoneHash string) " +
"FROM hivesampletable " +
"ORDER BY clientid LIMIT 50;"
# Create Hive job object
$jobDefinition = New-AzHDInsightHiveJobDefinition `
-Query $HiveQuery
# For status bar updates
$activity="Hive query"
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting query..."
# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
-ClusterName $clusterName `
-JobDefinition $jobDefinition `
-HttpCredential $creds
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting on query to complete..."
# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
-JobId $job.JobId `
-ClusterName $clusterName `
-HttpCredential $creds
# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
-Clustername $clusterName `
-JobId $job.JobId `
-HttpCredential $creds `
-DisplayOutputType StandardError
#>
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Retrieving output..."
# Gets the log output
Get-AzHDInsightJobOutput `
-Clustername $clusterName `
-JobId $job.JobId `
-HttpCredential $creds
Utdata för Hive-jobbet bör se ut ungefär som i följande exempel:
100041 RIM 9650 d476f3687700442549a83fac4560c51c
100041 RIM 9650 d476f3687700442549a83fac4560c51c
100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
100042 Apple iPhone 4.2.x 375ad9a0ddc4351536804f1d5d0ea9b9
Apache Pig UDF
Ett Python-skript kan användas som en UDF från Pig via -instruktionen GENERATE
. Du kan köra skriptet med Jython eller C Python.
- Jython körs på JVM och kan anropas internt från Pig.
- C Python är en extern process, så data från Pig på JVM skickas ut till skriptet som körs i en Python-process. Utdata från Python-skriptet skickas tillbaka till Pig.
Om du vill ange Python-tolken använder du register
när du refererar till Python-skriptet. I följande exempel registreras skript med Pig som myfuncs
:
- Så här använder du Jython:
register '/path/to/pigudf.py' using jython as myfuncs;
- Så här använder du C Python:
register '/path/to/pigudf.py' using streaming_python as myfuncs;
Viktigt!
När du använder Jython kan sökvägen till pig_jython-filen antingen vara en lokal sökväg eller en WASBS:// sökväg. Men när du använder C Python måste du referera till en fil i det lokala filsystemet för noden som du använder för att skicka Pig-jobbet.
När registreringen har passerat är Pig Latin för det här exemplet detsamma för båda:
LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);
LOG = FILTER LOGS by LINE is not null;
DETAILS = FOREACH LOG GENERATE myfuncs.create_structure(LINE);
DUMP DETAILS;
Här är vad det här exemplet gör:
- Den första raden läser in exempeldatafilen
sample.log
tillLOGS
. Den definierar även varje post som enchararray
. - Nästa rad filtrerar bort alla null-värden och lagrar resultatet av åtgärden i
LOG
. - Därefter itererar den över posterna i
LOG
och använderGENERATE
för att anropacreate_structure
metoden som finns i Python/Jython-skriptet som läses in sommyfuncs
.LINE
används för att skicka den aktuella posten till funktionen. - Slutligen dumpas utdata till STDOUT med kommandot
DUMP
. Det här kommandot visar resultatet när åtgärden har slutförts.
Skapa fil
Skapa en textfil med namnet pigudf.py
i utvecklingsmiljön. Använd följande kod som innehållet i filen:
# Uncomment the following if using C Python
#from pig_util import outputSchema
@outputSchema("log: {(date:chararray, time:chararray, classname:chararray, level:chararray, detail:chararray)}")
def create_structure(input):
if (input.startswith('java.lang.Exception')):
input = input[21:len(input)] + ' - java.lang.Exception'
date, time, classname, level, detail = input.split(' ', 4)
return date, time, classname, level, detail
I exemplet LINE
Pig Latin definieras indata som en chararray eftersom det inte finns något konsekvent schema för indata. Python-skriptet omvandlar data till ett konsekvent schema för utdata.
-
@outputSchema
instruktionen definierar formatet för de data som returneras till Pig. I det här fallet är det en datapåse, som är en Pig-datatyp. Påsen innehåller följande fält, som alla är chararray (strängar):- date – det datum då loggposten skapades
- time – den tid då loggposten skapades
- classname – klassnamnet som posten skapades för
- level – loggnivån
- detail – utförlig information för loggposten
def create_structure(input)
Därefter definierar funktionen som Pig skickar radobjekt till.Exempeldata,
sample.log
, överensstämmer mestadels med datum, tid, klassnamn, nivå och detaljschema. Den innehåller dock några rader som börjar med*java.lang.Exception*
. Dessa rader måste ändras för att matcha schemat. Instruktionenif
söker efter dessa och masserar sedan indata för att flytta strängen*java.lang.Exception*
till slutet, vilket gör att data är i linje med det förväntade utdataschemat.split
Därefter används kommandot för att dela upp data med de fyra första blankstegsteckenen. Utdata tilldelas tilldate
,time
,classname
,level
ochdetail
.Slutligen returneras värdena till Pig.
När data returneras till Pig har de ett konsekvent schema enligt definitionen i -instruktionen @outputSchema
.
Ladda upp fil (gränssnitt)
I kommandona nedan ersätter du sshuser
med det faktiska användarnamnet om det skiljer sig. Ersätt mycluster
med det faktiska klusternamnet. Kontrollera att arbetskatalogen är där filen finns.
Använd
scp
för att kopiera filerna till ditt HDInsight-kluster. Redigera och ange kommandot:scp pigudf.py sshuser@mycluster-ssh.azurehdinsight.net:
Använd SSH för att ansluta till klustret. Redigera och ange kommandot:
ssh sshuser@mycluster-ssh.azurehdinsight.net
Från SSH-sessionen lägger du till Python-filerna som laddades upp tidigare till lagringen för klustret.
hdfs dfs -put pigudf.py /pigudf.py
Använda Pig UDF (shell)
Om du vill ansluta till pig använder du följande kommando från den öppna SSH-sessionen:
pig
Ange följande instruktioner i prompten
grunt>
:Register wasbs:///pigudf.py using jython as myfuncs; LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray); LOG = FILTER LOGS by LINE is not null; DETAILS = foreach LOG generate myfuncs.create_structure(LINE); DUMP DETAILS;
När du har angett följande rad ska jobbet starta. När jobbet är klart returneras utdata som liknar följande data:
((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084)) ((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914)) ((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507)) ((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806)) ((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))
Använd
quit
för att avsluta Grunt-gränssnittet och använd sedan följande för att redigera filen pigudf.py i det lokala filsystemet:nano pigudf.py
När du är i redigeringsprogrammet avkommentarer du följande rad genom att ta bort
#
tecknet från början av raden:#from pig_util import outputSchema
Den här raden ändrar Python-skriptet så att det fungerar med C Python i stället för Jython. När ändringen har gjorts använder du Ctrl+X för att avsluta redigeraren. Välj Y och sedan Retur för att spara ändringarna.
pig
Använd kommandot för att starta gränssnittet igen. När du är i promptengrunt>
använder du följande för att köra Python-skriptet med C Python-tolken.Register 'pigudf.py' using streaming_python as myfuncs; LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray); LOG = FILTER LOGS by LINE is not null; DETAILS = foreach LOG generate myfuncs.create_structure(LINE); DUMP DETAILS;
När det här jobbet har slutförts bör du se samma utdata som när du tidigare körde skriptet med Jython.
Ladda upp fil (PowerShell)
PowerShell kan också användas för att fjärrköra Hive-frågor. Kontrollera att arbetskatalogen finns där pigudf.py
den finns. Använd följande PowerShell-skript för att köra en Hive-fråga som använder skriptet pigudf.py
:
# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
Connect-AzAccount
}
# If you have multiple subscriptions, set the one to use
# Select-AzSubscription -SubscriptionId "<SUBSCRIPTIONID>"
# Revise file path as needed
$pathToJythonFile = ".\pigudf.py"
# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$clusterInfo = Get-AzHDInsightCluster -ClusterName $clusterName
$resourceGroup = $clusterInfo.ResourceGroup
$storageAccountName=$clusterInfo.DefaultStorageAccount.split('.')[0]
$container=$clusterInfo.DefaultStorageContainer
$storageAccountKey=(Get-AzStorageAccountKey `
-ResourceGroupName $resourceGroup `
-Name $storageAccountName)[0].Value
# Create an Azure Storage context
$context = New-AzStorageContext `
-StorageAccountName $storageAccountName `
-StorageAccountKey $storageAccountKey
# Upload local files to an Azure Storage blob
Set-AzStorageBlobContent `
-File $pathToJythonFile `
-Blob "pigudf.py" `
-Container $container `
-Context $context
Använda Pig UDF (PowerShell)
Kommentar
När du skickar ett jobb via en fjärranslutning med PowerShell går det inte att använda C Python som tolk.
PowerShell kan också användas för att köra pig latinska jobb. Om du vill köra ett pig-latinskt jobb som använder skriptet pigudf.py
använder du följande PowerShell-skript:
# Script should stop on failures
$ErrorActionPreference = "Stop"
# Login to your Azure subscription
# Is there an active Azure subscription?
$sub = Get-AzSubscription -ErrorAction SilentlyContinue
if(-not($sub))
{
Connect-AzAccount
}
# Get cluster info
$clusterName = Read-Host -Prompt "Enter the HDInsight cluster name"
$creds=Get-Credential -UserName "admin" -Message "Enter the login for the cluster"
$PigQuery = "Register wasbs:///pigudf.py using jython as myfuncs;" +
"LOGS = LOAD 'wasbs:///example/data/sample.log' as (LINE:chararray);" +
"LOG = FILTER LOGS by LINE is not null;" +
"DETAILS = foreach LOG generate myfuncs.create_structure(LINE);" +
"DUMP DETAILS;"
# Create Pig job object
$jobDefinition = New-AzHDInsightPigJobDefinition -Query $PigQuery
# For status bar updates
$activity="Pig job"
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."
# Start defined Azure HDInsight job on specified cluster.
$job = Start-AzHDInsightJob `
-ClusterName $clusterName `
-JobDefinition $jobDefinition `
-HttpCredential $creds
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."
# Wait for completion or failure of specified job
Wait-AzHDInsightJob `
-Job $job.JobId `
-ClusterName $clusterName `
-HttpCredential $creds
# Uncomment the following to see stderr output
<#
Get-AzHDInsightJobOutput `
-Clustername $clusterName `
-JobId $job.JobId `
-HttpCredential $creds `
-DisplayOutputType StandardError
#>
# Progress bar (optional)
Write-Progress -Activity $activity "Retrieving output..."
# Gets the log output
Get-AzHDInsightJobOutput `
-Clustername $clusterName `
-JobId $job.JobId `
-HttpCredential $creds
Utdata för Pig-jobbet bör se ut ungefär som följande data:
((2012-02-03,20:11:56,SampleClass5,[TRACE],verbose detail for id 990982084))
((2012-02-03,20:11:56,SampleClass7,[TRACE],verbose detail for id 1560323914))
((2012-02-03,20:11:56,SampleClass8,[DEBUG],detail for id 2083681507))
((2012-02-03,20:11:56,SampleClass3,[TRACE],verbose detail for id 1718828806))
((2012-02-03,20:11:56,SampleClass3,[INFO],everything normal for id 530537821))
Felsökning
Fel vid körning av jobb
När du kör hive-jobbet kan det uppstå ett fel som liknar följande text:
Caused by: org.apache.hadoop.hive.ql.metadata.HiveException: [Error 20001]: An error occurred while reading or writing to your custom script. It may have crashed with an error.
Det här problemet kan orsakas av radsluten i Python-filen. Många Windows-redigerare använder som standard CRLF som radslut, men Linux-program förväntar sig vanligtvis LF.
Du kan använda följande PowerShell-instruktioner för att ta bort CR-tecknen innan du laddar upp filen till HDInsight:
Write-Progress -Activity $activity -Status "Waiting for the Pig job to complete..."
# Wait for completion or failure of specified job
PowerShell-skript
Båda PowerShell-exempelskripten som används för att köra exemplen innehåller en kommenterad rad som visar felutdata för jobbet. Om du inte ser förväntade utdata för jobbet avkommentarer du följande rad och ser om felinformationen indikerar ett problem.
$activity="Pig job"
# Progress bar (optional)
Write-Progress -Activity $activity -Status "Starting job..."
Felinformationen (STDERR) och resultatet av jobbet (STDOUT) loggas också till HDInsight-lagringen.
För det här jobbet... | Titta på de här filerna i blobcontainern |
---|---|
Hive | /HivePython/stderr /HivePython/stdout |
Pig | /PigPython/stderr /PigPython/stdout |
Nästa steg
Om du behöver läsa in Python-moduler som inte tillhandahålls som standard läser du Så här distribuerar du en modul till Azure HDInsight.
Andra sätt att använda Pig, Hive och lära dig mer om att använda MapReduce finns i följande dokument: