Agregar operações no Azure Cosmos DB para tabelas Apache Cassandra do Spark
APLICA-SE A: Cassandra
Este artigo descreve operações básicas de agregação no Azure Cosmos DB para tabelas Apache Cassandra do Spark.
Nota
Atualmente, não há suporte para filtragem do lado do servidor e agregação do lado do servidor no Azure Cosmos DB para Apache Cassandra.
Configuração da API para Cassandra
Defina abaixo a configuração de faísca no cluster de blocos de anotações. É uma atividade única.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Nota
Se você estiver usando o Spark 3.x, não precisará instalar o auxiliar e a fábrica de conexões do Azure Cosmos DB. Você também deve usar remoteConnectionsPerExecutor
em vez do connections_per_executor_max
conector Spark 3 (veja acima).
Aviso
Os exemplos do Spark 3 mostrados neste artigo foram testados com o Spark versão 3.2.1 e o correspondente Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0. Versões posteriores do Spark e/ou do conector Cassandra podem não funcionar como esperado.
Gerador de dados de amostra
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a simple dataset containing five values
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Operação de contagem
RDD API
sc.cassandraTable("books_ks", "books").count
Saída:
count: Long = 5
API DataFrame
Atualmente, não há suporte para contagem em dataframes. O exemplo abaixo mostra como executar uma contagem de dataframe depois de persistir o dataframe na memória como uma solução alternativa.
Escolha uma opção de armazenamento entre as seguintes opções disponíveis, para evitar problemas de "falta de memória":
MEMORY_ONLY: É a opção de armazenamento padrão. Armazena RDD como objetos Java desserializados na JVM. Se o RDD não couber na memória, algumas partições não serão armazenadas em cache e serão recalculadas instantaneamente sempre que forem necessárias.
MEMORY_AND_DISK: Armazena RDD como objetos Java desserializados na JVM. Se o RDD não couber na memória, armazene as partições que não cabem no disco e, sempre que necessário, leia-as a partir do local onde estão armazenadas.
MEMORY_ONLY_SER (Java/Scala): Armazena RDD como objetos Java serializados - matriz de 1 byte por partição. Essa opção é eficiente em termos de espaço quando comparada a objetos desserializados, especialmente ao usar um serializador rápido, mas com maior consumo de CPU para leitura.
MEMORY_AND_DISK_SER (Java/Scala): Esta opção de armazenamento é como MEMORY_ONLY_SER, a única diferença é que ela derrama partições que não cabem na memória do disco em vez de recalculá-las quando são necessárias.
DISK_ONLY: Armazena as partições RDD apenas no disco.
MEMORY_ONLY_2, MEMORY_AND_DISK_2...: O mesmo que os níveis acima, mas replica cada partição em dois nós de cluster.
OFF_HEAP (experimental): Semelhante ao MEMORY_ONLY_SER, mas armazena os dados na memória off-heap e requer memória off-heap para ser ativada com antecedência.
//Workaround
import org.apache.spark.storage.StorageLevel
//Read from source
val readBooksDF = spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
//Explain plan
readBooksDF.explain
//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)
//Subsequent execution against this DF hits the cache
readBooksDF.count
//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")
SQL
%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
Operação média
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean
Saída:
res24: Double = 16.016000175476073
API DataFrame
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(avg("book_price"))
.show
Saída:
+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |
SQL
select avg(book_price) from books_vw;
Saída:
16.016000175476073
Operação mínima
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min
Saída:
res31: Float = 11.33
API DataFrame
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_id","book_price")
.agg(min("book_price"))
.show
Saída:
+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |
SQL
%sql
select avg(book_price) from books_vw;
Saída:
11.33
Operação máxima
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max
API DataFrame
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(max("book_price"))
.show
Saída:
+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |
SQL
%sql
select max(book_price) from books_vw;
Saída:
22.45
Operação de soma
RDD API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum
Saída:
res46: Double = 80.08000087738037
API DataFrame
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(sum("book_price"))
.show
Saída:
+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |
SQL
select sum(book_price) from books_vw;
Saída:
80.08000087738037
Operação superior ou comparável
RDD API
val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.
Saída:
(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1
API DataFrame
import org.apache.spark.sql.functions._
val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_price")
.orderBy(desc("book_price"))
.limit(3)
//Explain plan
readBooksDF.explain
//Top
readBooksDF.show
Saída:
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |
import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]
SQL
select book_name,book_price from books_vw order by book_price desc limit 3;