Агрегированные операции в Azure Cosmos DB для таблиц Apache Cassandra из Spark
Область применения: Кассандра
В этой статье описаны основные операции агрегирования в таблицах Apache Cassandra для Azure Cosmos DB из Spark.
Примечание.
Фильтрация на стороне сервера и агрегирование на стороне сервера в настоящее время не поддерживается в Azure Cosmos DB для Apache Cassandra.
API для конфигурации Cassandra
Задайте следующую конфигурацию Spark в кластере записных книжек. Это разовое действие.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Примечание.
Если вы используете Spark 3.x, вам не нужно устанавливать вспомогательный сервер Azure Cosmos DB и фабрику подключений. Также необходимо использовать remoteConnectionsPerExecutor
вместо connections_per_executor_max
для соединителя Spark 3 (см. выше).
Предупреждение
Примеры для Spark 3 в этой статье протестированы с использованием Spark версии 3.2.1 и соответствующего соединителя Cassandra Spark com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Более поздние версии Spark и(или) соединителя Cassandra могут не функционировать должным образом.
Пример генератора данных
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a simple dataset containing five values
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Операция подсчета
API RRD
sc.cassandraTable("books_ks", "books").count
Выходные данные:
count: Long = 5
API Dataframe
Подсчет кадров данных сейчас не поддерживается. В следующем примере показано, как выполнить подсчет кадров данных после сохранения кадра данных в память в качестве обходного решения.
Выберите вариант хранения из доступных вариантов, чтобы не столкнуться с проблемами нехватки памяти:
MEMORY_ONLY: это вариант хранилища по умолчанию. Хранит RDD как десериализованные объекты Java на виртуальной машине Java. Если RDD не помещается в памяти, некоторые секции не будут кэшироваться, и они будут вычисляться повторно в режиме реального времени каждый раз, когда требуется.
MEMORY_AND_DISK: хранит RDD как десериализованные объекты Java на виртуальной машине Java. Если RDD не помещается в памяти, секции, которые не помещаются в памяти, будут сохранены на диске, и, когда они потребуются, они будут считаны из расположения, в котором хранятся.
MEMORY_ONLY_SER (Java/Scala): хранит RDD как сериализованные объекты Java — в однобайтовом массиве для каждой секции. Этот вариант позволяет сэкономить пространство по сравнению с десериализованными объектами, особенно при использовании быстрого сериализатора, однако для чтения таких объектов более интенсивно используется ЦП.
MEMORY_AND_DISK_SER (Java/Scala): этот вариант похож на MEMORY_ONLY_SER, единственное различие состоит в том, что секции, которые не помещаются на диске, сбрасываются, вместо того чтобы вычислять их повторно, когда они необходимы.
DISK_ONLY: хранит секции RDD только на диске.
MEMORY_ONLY_2, MEMORY_AND_DISK_2...: то же, что и уровни выше, но реплицирует каждую секцию на двух узлах кластера.
OFF_HEAP (экспериментальная функция): аналогично MEMORY_ONLY_SER, однако данные сохраняются в памяти вне кучи, и память вне кучи требуется предварительно включить.
//Workaround
import org.apache.spark.storage.StorageLevel
//Read from source
val readBooksDF = spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
//Explain plan
readBooksDF.explain
//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)
//Subsequent execution against this DF hits the cache
readBooksDF.count
//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")
SQL
%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
Операция усреднения
API RRD
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean
Выходные данные:
res24: Double = 16.016000175476073
API Dataframe
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(avg("book_price"))
.show
Выходные данные:
+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |
SQL
select avg(book_price) from books_vw;
Выходные данные:
16.016000175476073
Операция определения минимума
API RRD
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min
Выходные данные:
res31: Float = 11.33
API Dataframe
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_id","book_price")
.agg(min("book_price"))
.show
Выходные данные:
+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |
SQL
%sql
select avg(book_price) from books_vw;
Выходные данные:
11.33
Операция определения максимума
API RRD
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max
API Dataframe
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(max("book_price"))
.show
Выходные данные:
+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |
SQL
%sql
select max(book_price) from books_vw;
Выходные данные:
22.45
Операция суммирования
API RRD
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum
Выходные данные:
res46: Double = 80.08000087738037
API Dataframe
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(sum("book_price"))
.show
Выходные данные:
+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |
SQL
select sum(book_price) from books_vw;
Выходные данные:
80.08000087738037
Операция определения максимального значения или сравнения
API RRD
val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.
Выходные данные:
(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1
API Dataframe
import org.apache.spark.sql.functions._
val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_price")
.orderBy(desc("book_price"))
.limit(3)
//Explain plan
readBooksDF.explain
//Top
readBooksDF.show
Выходные данные:
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |
import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]
SQL
select book_name,book_price from books_vw order by book_price desc limit 3;