Veri analizi işlem hattını kullanıma hazır hale getirme
Veri işlem hatlarının altında birçok veri analizi çözümü bulunur. Adından da anlaşılacağı gibi, bir veri işlem hattı ham verileri alır, gerektiğinde temizler ve yeniden şekillendirır ve ardından işlenen verileri depolamadan önce genellikle hesaplamalar veya toplamalar yapar. İşlenen veriler istemciler, raporlar veya API'ler tarafından kullanılır. Veri işlem hattı, zamanlamaya göre veya yeni veriler tarafından tetiklendiğinde yinelenebilir sonuçlar sağlamalıdır.
Bu makalede, HDInsight Hadoop kümelerinde çalışan Oozie'yi kullanarak veri işlem hatlarınızı tekrarlanabilirlik için nasıl kullanıma hazır hale getirmeniz açıklanır. Örnek senaryo, havayolu uçuş zaman serisi verilerini hazırlayan ve işleyen bir veri işlem hattında size yol gösterir.
Aşağıdaki senaryoda, giriş verileri bir ay boyunca toplu uçuş verilerini içeren düz bir dosyadır. Bu uçuş verileri, çıkış ve varış havaalanı, uçurulan mil sayısı, kalkış ve varış saatleri gibi bilgileri içerir. Bu işlem hattının amacı, her havayolunun her gün için dakika cinsinden ortalama kalkış ve varış gecikmeleri ile bir satıra sahip olduğu ve o gün akan toplam mil sayısı olan günlük havayolu performansını özetlemektir.
YEAR | MONTH | DAY_OF_MONTH | TAŞIYICI | AVG_DEP_DELAY | AVG_ARR_DELAY | TOTAL_DISTANCE |
---|---|---|---|---|---|---|
2017 | 1 | 3 | AA | 10.142229 | 7.862926 | 2644539 |
2017 | 1 | 3 | AS | 9.435449 | 5.482143 | 572289 |
2017 | 1 | 3 | DL | 6.935409 | -2.1893024 | 1909696 |
Örnek işlem hattı, yeni bir zaman diliminin uçuş verileri gelene kadar bekler ve ardından bu ayrıntılı uçuş bilgilerini uzun vadeli analizler için Apache Hive veri ambarınıza depolar. İşlem hattı ayrıca yalnızca günlük uçuş verilerini özetleyen çok daha küçük bir veri kümesi oluşturur. Bu günlük uçuş özeti verileri, web sitesi gibi raporlar sağlamak üzere bir SQL Veritabanı gönderilir.
Aşağıdaki diyagramda örnek işlem hattı gösterilmektedir.
Apache Oozie çözümüne genel bakış
Bu işlem hattı, HDInsight Hadoop kümesinde çalışan Apache Oozie kullanır.
Oozie işlem hatlarını eylemler, iş akışları ve koordinatörler açısından açıklar. Eylemler, Hive sorgusu çalıştırma gibi gerçekleştirilecek fiili çalışmayı belirler. İş akışları eylem dizisini tanımlar. Koordinatörler, iş akışının ne zaman çalıştırılacağını belirler. Koordinatörler, iş akışının bir örneğini başlatmadan önce yeni verilerin kullanılabilirliğini de bekleyebilir.
Aşağıdaki diyagramda bu örnek Oozie işlem hattının üst düzey tasarımı gösterilmektedir.
Azure kaynaklarını sağlama
Bu işlem hattı, aynı konumda bir Azure SQL Veritabanı ve hdInsight Hadoop kümesi gerektirir. Azure SQL Veritabanı hem işlem hattı tarafından üretilen özet verileri hem de Oozie Meta Veri Deposu'nı depolar.
Sağlama Azure SQL Veritabanı
bir Azure SQL Veritabanı oluşturun. Bkz. Azure portalında Azure SQL Veritabanı oluşturma.
HDInsight kümenizin bağlı Azure SQL Veritabanı erişebildiğinden emin olmak için Azure SQL Veritabanı güvenlik duvarı kurallarını Azure hizmetlerinin ve kaynaklarının sunucuya erişmesine izin verecek şekilde yapılandırın. Bu seçeneği Azure portalında Sunucu güvenlik duvarını ayarla'yı seçip Azure hizmetlerinin ve kaynaklarının Azure SQL Veritabanı için bu sunucuya erişmesine izin ver'in altında ON seçeneğini belirleyerek etkinleştirebilirsiniz. Daha fazla bilgi için bkz. IP güvenlik duvarı kurallarını oluşturma ve yönetme.
İşlem hattının her çalıştırmasından özetlenmiş verileri depolayacak tabloyu oluşturmak
dailyflights
üzere aşağıdaki SQL deyimlerini yürütmek için Sorgu düzenleyicisini kullanın.CREATE TABLE dailyflights ( YEAR INT, MONTH INT, DAY_OF_MONTH INT, CARRIER CHAR(2), AVG_DEP_DELAY FLOAT, AVG_ARR_DELAY FLOAT, TOTAL_DISTANCE FLOAT ) GO CREATE CLUSTERED INDEX dailyflights_clustered_index on dailyflights(YEAR,MONTH,DAY_OF_MONTH,CARRIER) GO
Azure SQL Veritabanı artık hazır.
Apache Hadoop Kümesi Sağlama
Özel bir meta veri deposuyla Apache Hadoop kümesi oluşturun. Portaldan küme oluşturma sırasında, Depolama sekmesinde Meta veri deposu ayarları altında SQL Veritabanı seçtiğinizden emin olun. Meta veri deposu seçme hakkında daha fazla bilgi için bkz . Küme oluşturma sırasında özel meta veri deposu seçme. Küme oluşturma hakkında daha fazla bilgi için bkz . Linux üzerinde HDInsight kullanmaya başlama.
SSH tünel kurulumunu doğrulama
Koordinatörünüzün ve iş akışı örneklerinizin durumunu görüntülemek için Oozie Web Konsolu'nu kullanmak için HDInsight kümenize bir SSH tüneli ayarlayın. Daha fazla bilgi için bkz . SSH Tüneli.
Not
Kümenizin web kaynaklarına SSH tüneli boyunca göz atmak için Chrome'ı Foxy Proxy uzantısıyla da kullanabilirsiniz. Bunu, tünelin 9876 numaralı bağlantı noktasındaki konak localhost
aracılığıyla tüm isteklere ara sunucu olarak yapılandırın. Bu yaklaşım, Windows 10'da Bash olarak da bilinen Linux için Windows Alt Sistemi ile uyumludur.
Kümenizin adı olan
CLUSTERNAME
kümenize bir SSH tüneli açmak için aşağıdaki komutu çalıştırın:ssh -C2qTnNf -D 9876 sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Şu konuma göz atarak baş düğümünüzün Ambari'sine giderek tünelin çalıştığını doğrulayın:
http://headnodehost:8080
Ambari'nin içinden Oozie Web Konsolu'na erişmek için Oozie>Hızlı Bağlantılar> [Etkin sunucu] >Oozie Web Kullanıcı Arabirimi'ne gidin.
Hive'ı yapılandırma
Verileri karşıya yükleme
Bir ay boyunca uçuş verilerini içeren örnek bir CSV dosyasını indirin. ZIP dosyasını
2017-01-FlightData.zip
HDInsight GitHub deposundan indirin ve CSV dosyasına2017-01-FlightData.csv
açın.Bu CSV dosyasını HDInsight kümenize bağlı Azure Depolama hesabına kopyalayın ve klasörüne
/example/data/flights
yerleştirin.Dosyaları yerel makinenizden HDInsight küme baş düğümünüzün yerel depolama alanına kopyalamak için SCP kullanın.
scp ./2017-01-FlightData.csv sshuser@CLUSTERNAME-ssh.azurehdinsight.net:2017-01-FlightData.csv
Kümenize bağlanmak için ssh komutunu kullanın. öğesini kümenizin adıyla değiştirerek
CLUSTERNAME
aşağıdaki komutu düzenleyin ve komutunu girin:ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Ssh oturumunuzda HDFS komutunu kullanarak dosyayı baş düğümünüzün yerel depolama alanından Azure Depolama'ya kopyalayın.
hadoop fs -mkdir /example/data/flights hdfs dfs -put ./2017-01-FlightData.csv /example/data/flights/2017-01-FlightData.csv
Tablo oluştur
Örnek veriler artık kullanılabilir. Ancak işlem hattı, işlem için biri gelen veriler () ve biri özetlenmiş verilerflights
(rawFlights
) için iki Hive tablosu gerektirir. Ambari'de bu tabloları aşağıdaki gibi oluşturun.
adresine giderek Ambari'de
http://headnodehost:8080
oturum açın.Hizmet listesinden Hive'ı seçin.
Hive Görünümü 2.0 etiketinin yanındaki Görünüme Git'i seçin.
Sorgu metni alanına aşağıdaki deyimleri yapıştırarak
rawFlights
tabloyu oluşturun. Tablo,rawFlights
Azure Depolama'daki klasör içindeki/example/data/flights
CSV dosyaları için okunan bir şema sağlar.CREATE EXTERNAL TABLE IF NOT EXISTS rawflights ( YEAR INT, MONTH INT, DAY_OF_MONTH INT, FL_DATE STRING, CARRIER STRING, FL_NUM STRING, ORIGIN STRING, DEST STRING, DEP_DELAY FLOAT, ARR_DELAY FLOAT, ACTUAL_ELAPSED_TIME FLOAT, DISTANCE FLOAT) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ( "separatorChar" = ",", "quoteChar" = "\"" ) LOCATION '/example/data/flights'
Tabloyu oluşturmak için Yürüt'e tıklayın.
Tabloyu oluşturmak
flights
için, sorgu metin alanındaki metni aşağıdaki deyimlerle değiştirin. Tabloflights
, içine yüklenen verileri yıla, aya ve ayın gününe göre bölümleyen Hive tarafından yönetilen bir tablodur. Bu tablo, her uçuş için bir satırın kaynak verilerinde en düşük ayrıntı düzeyine sahip olan tüm geçmiş uçuş verilerini içerir.SET hive.exec.dynamic.partition.mode=nonstrict; CREATE TABLE flights ( FL_DATE STRING, CARRIER STRING, FL_NUM STRING, ORIGIN STRING, DEST STRING, DEP_DELAY FLOAT, ARR_DELAY FLOAT, ACTUAL_ELAPSED_TIME FLOAT, DISTANCE FLOAT ) PARTITIONED BY (YEAR INT, MONTH INT, DAY_OF_MONTH INT) ROW FORMAT SERDE 'org.apache.hadoop.hive.serde2.OpenCSVSerde' WITH SERDEPROPERTIES ( "separatorChar" = ",", "quoteChar" = "\"" );
Tabloyu oluşturmak için Yürüt'e tıklayın.
Oozie iş akışını oluşturma
İşlem hatları genellikle verileri belirli bir zaman aralığına göre toplu olarak işler. Bu durumda işlem hattı, uçuş verilerini günlük olarak işler. Bu yaklaşım giriş CSV dosyalarının günlük, haftalık, aylık veya yıllık olarak ulaşmasını sağlar.
Örnek iş akışı, üç ana adımda günlük olarak uçuş verilerini işler:
- Tablo tarafından
rawFlights
temsil edilen kaynak CSV dosyasından o günün tarih aralığına ait verileri ayıklamak ve verileri tabloyaflights
eklemek için bir Hive sorgusu çalıştırın. - Hive'da gün ve taşıyıcıya göre özetlenen uçuş verilerinin bir kopyasını içeren bir hazırlama tablosu oluşturmak için bir Hive sorgusu çalıştırın.
- Hive'daki günlük hazırlama tablosundaki tüm verileri Azure SQL Veritabanı hedef
dailyflights
tabloya kopyalamak için Apache Sqoop kullanın. Sqoop, Azure Depolama'da bulunan Hive tablosunun arkasındaki verilerden kaynak satırları okur ve bunları bir JDBC bağlantısı kullanarak SQL Veritabanı yükler.
Bu üç adım bir Oozie iş akışı tarafından koordine edilir.
Yerel iş istasyonunuzda adlı
job.properties
bir dosya oluşturun. Dosyanın başlangıç içeriği olarak aşağıdaki metni kullanın. Ardından, belirli ortamınız için değerleri güncelleştirin. Metnin altındaki tablo özelliklerin her birini özetler ve kendi ortamınız için değerleri nerede bulabileceğinizi gösterir.nameNode=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net jobTracker=[ACTIVERESOURCEMANAGER]:8050 queueName=default oozie.use.system.libpath=true appBase=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie oozie.wf.application.path=${appBase}/load_flights_by_day hiveScriptLoadPartition=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-load-flights-partition.hql hiveScriptCreateDailyTable=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-create-daily-summary-table.hql hiveDailyTableName=dailyflights${year}${month}${day} hiveDataFolder=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/day/${year}/${month}/${day} sqlDatabaseConnectionString="jdbc:sqlserver://[SERVERNAME].database.windows.net;user=[USERNAME];password=[PASSWORD];database=[DATABASENAME]" sqlDatabaseTableName=dailyflights year=2017 month=01 day=03
Özellik Değer kaynağı nameNode HDInsight kümenize bağlı Azure Depolama Kapsayıcısı'nın tam yolu. jobTracker Etkin kümenizin YARN baş düğümü için iç konak adı. Ambari giriş sayfasında, hizmetler listesinden YARN'ı ve ardından Etkin Resource Manager'ı seçin. Ana bilgisayar adı URI'si sayfanın üst kısmında görüntülenir. 8050 numaralı bağlantı noktasını ekleme. queueName Hive eylemlerini zamanlarken kullanılan YARN kuyruğunun adı. Varsayılan değerde bırakın. oozie.use.system.libpath Doğru olarak bırak. appBase Azure Depolama'da Oozie iş akışını ve destekleyici dosyaları dağıttığınız alt klasörün yolu. oozie.wf.application.path Çalıştırılacak Oozie iş akışının workflow.xml
konumu.hiveScriptLoadPartition Azure Depolama'da Hive sorgu dosyasına hive-load-flights-partition.hql
giden yol.hiveScriptCreateDailyTable Azure Depolama'da Hive sorgu dosyasına hive-create-daily-summary-table.hql
giden yol.hiveDailyTableName Hazırlama tablosu için kullanılacak dinamik olarak oluşturulan ad. hiveDataFolder Azure Depolama'daki hazırlama tablosunun içerdiği verilere giden yol. sqlDatabaseConnectionString JDBC söz dizimi Azure SQL Veritabanı bağlantı dizesi. sqlDatabaseTableName özet satırların eklendiği Azure SQL Veritabanı içindeki tablonun adı. olarak dailyflights
bırakın.yıl Uçuş özetlerinin hesaplandığı günün yıl bileşeni. Olduğu gibi bırakın. aya Uçuş özetlerinin hesaplandığı günün ay bileşeni. Olduğu gibi bırakın. gün Uçuş özetlerinin hesaplandığı günün ayın günü bileşeni. Olduğu gibi bırakın. Yerel iş istasyonunuzda adlı
hive-load-flights-partition.hql
bir dosya oluşturun. Dosyanın içeriği olarak aşağıdaki kodu kullanın.SET hive.exec.dynamic.partition.mode=nonstrict; INSERT OVERWRITE TABLE flights PARTITION (YEAR, MONTH, DAY_OF_MONTH) SELECT FL_DATE, CARRIER, FL_NUM, ORIGIN, DEST, DEP_DELAY, ARR_DELAY, ACTUAL_ELAPSED_TIME, DISTANCE, YEAR, MONTH, DAY_OF_MONTH FROM rawflights WHERE year = ${year} AND month = ${month} AND day_of_month = ${day};
Oozie değişkenleri söz dizimini
${variableName}
kullanır. Bu değişkenler dosyasında ayarlanırjob.properties
. Oozie çalışma zamanında gerçek değerlerin yerini alır.Yerel iş istasyonunuzda adlı
hive-create-daily-summary-table.hql
bir dosya oluşturun. Dosyanın içeriği olarak aşağıdaki kodu kullanın.DROP TABLE ${hiveTableName}; CREATE EXTERNAL TABLE ${hiveTableName} ( YEAR INT, MONTH INT, DAY_OF_MONTH INT, CARRIER STRING, AVG_DEP_DELAY FLOAT, AVG_ARR_DELAY FLOAT, TOTAL_DISTANCE FLOAT ) ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' STORED AS TEXTFILE LOCATION '${hiveDataFolder}'; INSERT OVERWRITE TABLE ${hiveTableName} SELECT year, month, day_of_month, carrier, avg(dep_delay) avg_dep_delay, avg(arr_delay) avg_arr_delay, sum(distance) total_distance FROM flights GROUP BY year, month, day_of_month, carrier HAVING year = ${year} AND month = ${month} AND day_of_month = ${day};
Bu sorgu, yalnızca bir gün için özetlenmiş verileri depolayacak bir hazırlama tablosu oluşturur, operatör tarafından günlük ortalama gecikmeleri ve toplam mesafeyi hesaplayan SELECT deyimini not alın. Bu tabloya eklenen veriler, bir sonraki adımda Sqoop kaynağı olarak kullanılabilmesi için bilinen bir konumda (hiveDataFolder değişkeni tarafından belirtilen yol) depolanır.
Yerel iş istasyonunuzda adlı
workflow.xml
bir dosya oluşturun. Dosyanın içeriği olarak aşağıdaki kodu kullanın. Yukarıdaki adımlar Oozie iş akışı dosyasında ayrı eylemler olarak ifade edilir.<workflow-app name="loadflightstable" xmlns="uri:oozie:workflow:0.5"> <start to = "RunHiveLoadFlightsScript"/> <action name="RunHiveLoadFlightsScript"> <hive xmlns="uri:oozie:hive-action:0.2"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <script>${hiveScriptLoadPartition}</script> <param>year=${year}</param> <param>month=${month}</param> <param>day=${day}</param> </hive> <ok to="RunHiveCreateDailyFlightTableScript"/> <error to="fail"/> </action> <action name="RunHiveCreateDailyFlightTableScript"> <hive xmlns="uri:oozie:hive-action:0.2"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.job.queue.name</name> <value>${queueName}</value> </property> </configuration> <script>${hiveScriptCreateDailyTable}</script> <param>hiveTableName=${hiveDailyTableName}</param> <param>year=${year}</param> <param>month=${month}</param> <param>day=${day}</param> <param>hiveDataFolder=${hiveDataFolder}/${year}/${month}/${day}</param> </hive> <ok to="RunSqoopExport"/> <error to="fail"/> </action> <action name="RunSqoopExport"> <sqoop xmlns="uri:oozie:sqoop-action:0.2"> <job-tracker>${jobTracker}</job-tracker> <name-node>${nameNode}</name-node> <configuration> <property> <name>mapred.compress.map.output</name> <value>true</value> </property> </configuration> <arg>export</arg> <arg>--connect</arg> <arg>${sqlDatabaseConnectionString}</arg> <arg>--table</arg> <arg>${sqlDatabaseTableName}</arg> <arg>--export-dir</arg> <arg>${hiveDataFolder}/${year}/${month}/${day}</arg> <arg>-m</arg> <arg>1</arg> <arg>--input-fields-terminated-by</arg> <arg>"\t"</arg> <archive>mssql-jdbc-7.0.0.jre8.jar</archive> </sqoop> <ok to="end"/> <error to="fail"/> </action> <kill name="fail"> <message>Job failed, error message[${wf:errorMessage(wf:lastErrorNode())}] </message> </kill> <end name="end"/> </workflow-app>
İki Hive sorgusuna Azure Depolama'daki yolları tarafından erişilir ve kalan değişken değerleri dosya tarafından job.properties
sağlanır. Bu dosya, iş akışını 3 Ocak 2017 tarihinde çalışacak şekilde yapılandırır.
Oozie iş akışını dağıtma ve çalıştırma
Oozie iş akışınızı (workflow.xml
), Hive sorgularını (hive-load-flights-partition.hql
ve ) ve hive-create-daily-summary-table.hql
iş yapılandırmasını (job.properties
) dağıtmak için bash oturumunuzdan SCP kullanın. Oozie'de baş düğümün job.properties
yerel depolamasında yalnızca dosya bulunabilir. Diğer tüm dosyalar HDFS'de(bu örnekte Azure Depolama) depolanmalıdır. İş akışı tarafından kullanılan Sqoop eylemi, SQL Veritabanı ile iletişim kurmak için bir JDBC sürücüsüne bağlıdır ve bu sürücü baş düğümden HDFS'ye kopyalanmalıdır.
Baş düğümün
load_flights_by_day
yerel depolama alanında kullanıcının yolunun altında alt klasörü oluşturun. Açık ssh oturumunuzda aşağıdaki komutu yürütebilirsiniz:mkdir load_flights_by_day
Geçerli dizindeki tüm dosyaları (
workflow.xml
vejob.properties
dosyaları) alt klasöreload_flights_by_day
kopyalayın. Yerel iş istasyonunuzda aşağıdaki komutu yürütür:scp ./* sshuser@CLUSTERNAME-ssh.azurehdinsight.net:load_flights_by_day
İş akışı dosyalarını HDFS'ye kopyalayın. Açık ssh oturumunuzda aşağıdaki komutları yürütebilirsiniz:
cd load_flights_by_day hadoop fs -mkdir -p /oozie/load_flights_by_day hdfs dfs -put ./* /oozie/load_flights_by_day
Yerel baş düğümden HDFS'deki iş akışı klasörüne kopyalayın
mssql-jdbc-7.0.0.jre8.jar
. Kümeniz farklı bir jar dosyası içeriyorsa komutu gerektiği gibi düzeltin.workflow.xml
Farklı bir jar dosyasını yansıtmak için gerektiği şekilde düzeltin. Açık ssh oturumunuzda aşağıdaki komutu yürütebilirsiniz:hdfs dfs -put /usr/share/java/sqljdbc_7.0/enu/mssql-jdbc*.jar /oozie/load_flights_by_day
İş akışını çalıştırma Açık ssh oturumunuzda aşağıdaki komutu yürütebilirsiniz:
oozie job -config job.properties -run
Oozie Web Konsolu'nu kullanarak durumu gözlemleyin. Ambari'nin içinden Oozie, Hızlı Bağlantılar ve ardından Oozie Web Konsolu'nu seçin. İş Akışı İşleri sekmesinin altında Tüm İşler'i seçin.
Durum BAŞARILI olduğunda, eklenen satırları görüntülemek için SQL Veritabanı tablosunu sorgular. Azure portalını kullanarak SQL Veritabanı bölmeye gidin, Araçlar'ı seçin ve Sorgu Düzenleyicisi açın.
SELECT * FROM dailyflights
Artık iş akışı tek bir test günü için çalıştığına göre, iş akışını günlük çalışacak şekilde zamanlayan bir koordinatörle sarmalayabilirsiniz.
İş akışını bir koordinatörle çalıştırma
Bu iş akışını günlük (veya tarih aralığındaki tüm günler) çalışacak şekilde zamanlamak için bir koordinatör kullanabilirsiniz. Koordinatör bir XML dosyası tarafından tanımlanır, örneğin coordinator.xml
:
<coordinator-app name="daily_export" start="2017-01-01T00:00Z" end="2017-01-05T00:00Z" frequency="${coord:days(1)}" timezone="UTC" xmlns="uri:oozie:coordinator:0.4">
<datasets>
<dataset name="ds_input1" frequency="${coord:days(1)}" initial-instance="2016-12-31T00:00Z" timezone="UTC">
<uri-template>${sourceDataFolder}${YEAR}-${MONTH}-FlightData.csv</uri-template>
<done-flag></done-flag>
</dataset>
</datasets>
<input-events>
<data-in name="event_input1" dataset="ds_input1">
<instance>${coord:current(0)}</instance>
</data-in>
</input-events>
<action>
<workflow>
<app-path>${appBase}/load_flights_by_day</app-path>
<configuration>
<property>
<name>year</name>
<value>${coord:formatTime(coord:nominalTime(), 'yyyy')}</value>
</property>
<property>
<name>month</name>
<value>${coord:formatTime(coord:nominalTime(), 'MM')}</value>
</property>
<property>
<name>day</name>
<value>${coord:formatTime(coord:nominalTime(), 'dd')}</value>
</property>
<property>
<name>hiveScriptLoadPartition</name>
<value>${hiveScriptLoadPartition}</value>
</property>
<property>
<name>hiveScriptCreateDailyTable</name>
<value>${hiveScriptCreateDailyTable}</value>
</property>
<property>
<name>hiveDailyTableNamePrefix</name>
<value>${hiveDailyTableNamePrefix}</value>
</property>
<property>
<name>hiveDailyTableName</name>
<value>${hiveDailyTableNamePrefix}${coord:formatTime(coord:nominalTime(), 'yyyy')}${coord:formatTime(coord:nominalTime(), 'MM')}${coord:formatTime(coord:nominalTime(), 'dd')}</value>
</property>
<property>
<name>hiveDataFolderPrefix</name>
<value>${hiveDataFolderPrefix}</value>
</property>
<property>
<name>hiveDataFolder</name>
<value>${hiveDataFolderPrefix}${coord:formatTime(coord:nominalTime(), 'yyyy')}/${coord:formatTime(coord:nominalTime(), 'MM')}/${coord:formatTime(coord:nominalTime(), 'dd')}</value>
</property>
<property>
<name>sqlDatabaseConnectionString</name>
<value>${sqlDatabaseConnectionString}</value>
</property>
<property>
<name>sqlDatabaseTableName</name>
<value>${sqlDatabaseTableName}</value>
</property>
</configuration>
</workflow>
</action>
</coordinator-app>
Gördüğünüz gibi, koordinatörün çoğunluğu yalnızca yapılandırma bilgilerini iş akışı örneğine geçiriyor. Ancak, çağrılacak birkaç önemli öğe vardır.
1. Nokta:
start
Öğenin üzerindekicoordinator-app
veend
öznitelikleri, koordinatörün çalıştığı zaman aralığını denetler.<coordinator-app ... start="2017-01-01T00:00Z" end="2017-01-05T00:00Z" frequency="${coord:days(1)}" ...>
Koordinatör, özniteliği tarafından belirtilen aralığa göre ve
end
tarih aralığındaki eylemleristart
zamanlamaklafrequency
sorumludur. Zamanlanan her eylem de iş akışını yapılandırıldığı gibi çalıştırır. Yukarıdaki koordinatör tanımında, koordinatör 1 Ocak 2017 ile 5 Ocak 2017 dönemindeki eylemleri çalıştıracak şekilde yapılandırılmıştır. Sıklık, Oozie İfade Dili frekans ifadesi${coord:days(1)}
tarafından bir gün olarak ayarlanır. Bu, koordinatörün günde bir eylem (ve dolayısıyla iş akışı) zamanlamasını sağlar. Bu örnekte olduğu gibi geçmişteki tarih aralıkları için eylem gecikme olmadan çalışacak şekilde zamanlanır. Bir eylemin çalıştırılacak zamanlandığı tarihin başlangıcına nominal saat adı verilir. Örneğin, 1 Ocak 2017 verilerini işlemek için koordinatör, 2017-01-01T00:00:00 GMT nominal zamanına sahip bir eylem zamanlar.2. Nokta: İş akışının tarih aralığında,
dataset
öğesi belirli bir tarih aralığına ait verilerin HDFS'de nereye bakileceğini belirtir ve Oozie'nin verilerin henüz işlenmek üzere kullanılabilir olup olmadığını nasıl belirleyeceğini yapılandırır.<dataset name="ds_input1" frequency="${coord:days(1)}" initial-instance="2016-12-31T00:00Z" timezone="UTC"> <uri-template>${sourceDataFolder}${YEAR}-${MONTH}-FlightData.csv</uri-template> <done-flag></done-flag> </dataset>
HDFS'deki verilerin yolu, öğesinde
uri-template
sağlanan ifadeye göre dinamik olarak oluşturulur. Bu koordinatörde, veri kümesiyle bir gün sıklığı da kullanılır. Koordinatör öğesindeki başlangıç ve bitiş tarihleri, eylemlerin zamanlandığı zamanı denetlerken (ve bunların nominal saatlerini tanımlar),initial-instance
veri kümesindeki vefrequency
değeri, oluşturmada kullanılan tarihin hesaplanması işleminiuri-template
denetler. Bu durumda, ilk günün (1 Ocak 2017) değerindeki verileri aldığından emin olmak için ilk örneği koordinatörün başlangıcından bir gün öncesine ayarlayın. Veri kümesinin tarih hesaplaması, koordinatör tarafından ayarlanan nominal süreyi (ilk eylem için 2017-01-01T00:00:00 GMT) geçirmeyen en son tarihi bulana kadar veri kümesi sıklığı (bir gün) artışlarıyla ilerleyen (12/31/2016) değerindeninitial-instance
ileri doğru ilerler.Boş
done-flag
öğesi, Oozie belirtilen zamanda giriş verilerinin varlığını denetlediğinde, Oozie'nin bir dizin veya dosya bulunup bulunmadığını belirler. Bu durumda, bir csv dosyasının varlığıdır. Bir csv dosyası varsa, Oozie verilerin hazır olduğunu varsayar ve dosyayı işlemek için bir iş akışı örneği başlatır. Csv dosyası yoksa, Oozie verilerin henüz hazır olmadığını ve iş akışı çalıştırmasının bekleme durumuna geçtiğini varsayar.3. Nokta: öğesi,
data-in
ilişkili veri kümesi için içindeki değerleriuri-template
değiştirirken nominal süre olarak kullanılacak belirli zaman damgasını belirtir.<data-in name="event_input1" dataset="ds_input1"> <instance>${coord:current(0)}</instance> </data-in>
Bu durumda, örneğini ifadesi
${coord:current(0)}
olarak ayarlayın. Bu ifade, başlangıçta koordinatör tarafından zamanlandığı gibi eylemin nominal zamanını kullanmaya çevrilir. Başka bir deyişle, koordinatör eylemi 01/01/2017 nominal zamanıyla çalışacak şekilde zamanladığında, URI şablonundaki YIL (2017) ve AY (01) değişkenlerini değiştirmek için 01/01/2017 kullanılır. Bu örnek için URI şablonu hesaplandıktan sonra, Oozie beklenen dizinin veya dosyanın kullanılabilir olup olmadığını denetler ve iş akışının bir sonraki çalıştırmasını buna göre zamanlar.
Yukarıdaki üç nokta, koordinatörün kaynak verilerin günlük olarak işlenmesini zamanladığı bir durum elde etmek için birleştirilir.
1. Nokta: Koordinatör 2017-01-01 nominal tarihiyle başlar.
2. Nokta: Oozie, içinde
sourceDataFolder/2017-01-FlightData.csv
bulunan verileri arar.3. Nokta: Oozie bu dosyayı bulduğunda, 1 Ocak 2017'ye ait verileri işleyecek iş akışının bir örneğini zamanlar. Oozie daha sonra 2017-01-02 için işlemeye devam eder. Bu değerlendirme 2017-01-05'e kadar yinelenmez ancak yinelenmez.
İş akışlarında olduğu gibi, bir koordinatörün yapılandırması, iş akışı tarafından kullanılan ayarların üst kümesine sahip bir dosyada job.properties
tanımlanır.
nameNode=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net
jobTracker=[ACTIVERESOURCEMANAGER]:8050
queueName=default
oozie.use.system.libpath=true
appBase=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie
oozie.coord.application.path=${appBase}
sourceDataFolder=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/
hiveScriptLoadPartition=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-load-flights-partition.hql
hiveScriptCreateDailyTable=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/oozie/load_flights_by_day/hive-create-daily-summary-table.hql
hiveDailyTableNamePrefix=dailyflights
hiveDataFolderPrefix=wasbs://[CONTAINERNAME]@[ACCOUNTNAME].blob.core.windows.net/example/data/flights/day/
sqlDatabaseConnectionString="jdbc:sqlserver://[SERVERNAME].database.windows.net;user=[USERNAME];password=[PASSWORD];database=[DATABASENAME]"
sqlDatabaseTableName=dailyflights
Bu job.properties
dosyada sunulan tek yeni özellikler şunlardır:
Özellik | Değer kaynağı |
---|---|
oozie.coord.application.path | Çalıştırılacak Oozie koordinatörünün bulunduğu dosyanın konumunu coordinator.xml gösterir. |
hiveDailyTableNamePrefix | Hazırlama tablosunun tablo adını dinamik olarak oluştururken kullanılan ön ek. |
hiveDataFolderPrefix | Tüm hazırlama tablolarının depolanacağı yolun ön eki. |
Oozie Düzenleyicisi'ni dağıtma ve çalıştırma
İşlem hattını bir koordinatörle çalıştırmak için, iş akışınızın bulunduğu klasörün bir düzey üzerindeki bir klasörden çalışmanız dışında iş akışıyla benzer şekilde ilerleyin. Bu klasör kuralı, bir koordinatörü farklı alt iş akışlarıyla ilişkilendirebilmeniz için koordinatörleri disk üzerindeki iş akışlarından ayırır.
Koordinatör dosyalarını kümenizin baş düğümünün yerel depolama alanına kopyalamak için yerel makinenizden SCP kullanın.
scp ./* sshuser@CLUSTERNAME-ssh.azurehdinsight.net:~
Baş düğümünüzün içine SSH.
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
Düzenleyici dosyalarını HDFS'ye kopyalayın.
hdfs dfs -put ./* /oozie/
Koordinatörü çalıştırın.
oozie job -config job.properties -run
Oozie Web Konsolu'nu kullanarak durumu doğrulayın, bu kez Koordinatör İşleri sekmesini ve ardından Tüm işler'i seçin.
Zamanlanmış eylemlerin listesini görüntülemek için bir koordinatör örneği seçin. Bu durumda, 1 Ocak 2017 ile 4 Ocak 2017 arasında nominal süreleri olan dört eylem görmeniz gerekir.
Bu listedeki her eylem, iş akışının bir günlük verileri işleyen bir örneğine karşılık gelir ve burada o günün başlangıcı nominal saatle gösterilir.