Azure HDInsight 中的 Hive Warehouse Connector API
本文列出 Hive Warehouse Connector 支援的所有 API。 下列所有範例都是使用 Spark Shell 和 Hive Warehouse Connector 工作階段執行的。
如何建立 Hive Warehouse Connector 工作階段:
import com.hortonworks.hwc.HiveWarehouseSession
val hive = HiveWarehouseSession.session(spark).build()
必要條件
完成 Hive Warehouse Connector 設定步驟。
支援的 API
設定資料庫:
hive.setDatabase("<database-name>")
列出所有資料庫:
hive.showDatabases()
列出目前資料庫中的所有資料表
hive.showTables()
描述資料表
// Describes the table <table-name> in the current database hive.describeTable("<table-name>")
// Describes the table <table-name> in <database-name> hive.describeTable("<database-name>.<table-name>")
卸除資料庫
// ifExists and cascade are boolean variables hive.dropDatabase("<database-name>", ifExists, cascade)
卸除目前資料庫中的資料表
// ifExists and purge are boolean variables hive.dropTable("<table-name>", ifExists, purge)
建立資料庫
// ifNotExists is boolean variable hive.createDatabase("<database-name>", ifNotExists)
在目前的資料庫中建立資料表
// Returns a builder to create table val createTableBuilder = hive.createTable("<table-name>")
create-table 的產生器僅支援下列作業:
// Create only if table does not exists already createTableBuilder = createTableBuilder.ifNotExists()
// Add columns createTableBuilder = createTableBuilder.column("<column-name>", "<datatype>")
// Add partition column createTableBuilder = createTableBuilder.partition("<partition-column-name>", "<datatype>")
// Add table properties createTableBuilder = createTableBuilder.prop("<key>", "<value>")
// Creates a bucketed table, // Parameters are numOfBuckets (integer) followed by column names for bucketing createTableBuilder = createTableBuilder.clusterBy(numOfBuckets, "<column1>", .... , "<columnN>")
// Creates the table createTableBuilder.create()
注意
此 API 會在預設位置建立 ORC 格式化資料表。 若要使用其他功能/選項,或使用 Hive 查詢建立資料表,請使用
executeUpdate
API。讀取資料表
// Returns a Dataset<Row> that contains data of <table-name> in the current database hive.table("<table-name>")
在 HiveServer2 上執行 DDL 命令
// Executes the <hive-query> against HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>")
// Executes the <hive-query> against HiveServer2 // Throws exception, if propagateException is true and query threw exception in HiveServer2 // Returns true or false if the query succeeded or failed respectively hive.executeUpdate("<hive-query>", propagateException) // propagate exception is boolean value
執行 Hive 查詢並在資料集中載入結果
透過 LLAP 精靈執行查詢。 [建議]
// <hive-query> should be a hive query hive.executeQuery("<hive-query>")
使用 HiveServer2 透過 JDBC 執行查詢。
先在 Spark 設定中將
spark.datasource.hive.warehouse.smartExecution
設為false
,再啟動 Spark 工作階段以使用此 APIhive.execute("<hive-query>")
關閉 Hive Warehouse Connector 工作階段
// Closes all the open connections and // release resources/locks from HiveServer2 hive.close()
執行 Hive 合併查詢
此 API 會建立下列格式的 Hive 合併查詢
MERGE INTO <current-db>.<target-table> AS <targetAlias> USING <source expression/table> AS <sourceAlias> ON <onExpr> WHEN MATCHED [AND <updateExpr>] THEN UPDATE SET <nameValuePair1> ... <nameValuePairN> WHEN MATCHED [AND <deleteExpr>] THEN DELETE WHEN NOT MATCHED [AND <insertExpr>] THEN INSERT VALUES <value1> ... <valueN>
val mergeBuilder = hive.mergeBuilder() // Returns a builder for merge query
產生器支援下列作業:
mergeBuilder.mergeInto("<target-table>", "<targetAlias>")
mergeBuilder.using("<source-expression/table>", "<sourceAlias>")
mergeBuilder.on("<onExpr>")
mergeBuilder.whenMatchedThenUpdate("<updateExpr>", "<nameValuePair1>", ... , "<nameValuePairN>")
mergeBuilder.whenMatchedThenDelete("<deleteExpr>")
mergeBuilder.whenNotMatchedInsert("<insertExpr>", "<value1>", ... , "<valueN>");
// Executes the merge query mergeBuilder.merge()
以批次方式將資料集寫入至 Hive 資料表
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveWarehouseConnector") .option("table", tableName) .mode(SaveMode.Type) .save()
TableName 的格式應為
<db>.<table>
或<table>
。 若未提供資料庫名稱,則會在目前的資料庫中搜尋/建立資料表SaveMode 類型為:
附加:將資料集附加至指定的資料表
覆寫:使用資料集覆寫指定資料表中的資料
忽略:如果資料表已存在則略過寫入,不會擲回錯誤
ErrorIfExists:如果資料表已存在則擲回錯誤
使用 HiveStreaming 將資料集寫入至 Hive 資料表
df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource") .option("database", databaseName) .option("table", tableName) .option("metastoreUri", "<HMS_URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save() // To write to static partition df.write.format("com.hortonworks.spark.sql.hive.llap.HiveStreamingDataSource") .option("database", databaseName) .option("table", tableName) .option("partition", partition) .option("metastoreUri", "<HMS URI>") // .option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .save()
注意
資料流寫入一律會附加資料。
將 Spark 資料流寫入至 Hive 資料表
stream.writeStream .format("com.hortonworks.spark.sql.hive.llap.streaming.HiveStreamingDataSource") .option("metastoreUri", "<HMS_URI>") .option("database", databaseName) .option("table", tableName) //.option("partition", partition) , add if inserting data in partition //.option("metastoreKrbPrincipal", principal), add if executing in ESP cluster .start()