Aggregierungsvorgänge an Azure Cosmos DB for Apache Cassandra-Tabellen in Spark
GILT FÜR: Cassandra
In diesem Artikel werden grundlegende Aggregierungsvorgänge an Azure Cosmos DB for Apache Cassandra-Tabellen in Spark beschrieben.
Hinweis
Serverseitige Filterung und serverseitige Aggregierung werden derzeit von Azure Cosmos DB for Apache Cassandra nicht unterstützt.
API für Cassandra-Konfiguration
Legen Sie in Ihrem Notebookcluster die folgende Spark-Konfiguration fest. Dieser Schritt muss nur einmal ausgeführt werden.
//Connection-related
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Hinweis
Wenn Sie Spark 3 verwenden, müssen Sie die Hilfs- und Verbindungsfactory von Azure Cosmos DB nicht installieren. Sie sollten auch remoteConnectionsPerExecutor
anstelle von connections_per_executor_max
für den Spark 3-Connector verwenden (siehe oben).
Warnung
Die in diesem Artikel gezeigten Spark 3-Beispiele wurden mit Spark Version 3.2.1 und dem entsprechenden Cassandra Spark-Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0 getestet. Höhere Versionen von Spark und/oder dem Cassandra-Connector funktionieren möglicherweise nicht wie erwartet.
Beispieldatengenerator
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.functions._
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
// Generate a simple dataset containing five values
val booksDF = Seq(
("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33),
("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45),
("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83),
("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22),
("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25)
).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price")
booksDF.write
.mode("append")
.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000"))
.save()
Zählvorgang
RDD-API
sc.cassandraTable("books_ks", "books").count
Ausgabe:
count: Long = 5
Datenrahmen-API
Die Zählung von Datenrahmen wird derzeit nicht unterstützt. Das folgende Beispiel zeigt die Ausführung einer Zählung von Datenrahmen nach Beibehalten des Datenrahmens im Arbeitsspeicher als Problemumgehung.
Wählen Sie eine Speicheroption aus den folgenden verfügbaren Optionen, um das Problem „Nicht genügend Arbeitsspeicher“ zu vermeiden:
MEMORY_ONLY: Das ist die Standardspeicheroption. Speichert das RDD als deserialisierte Java-Objekte in der JVM. Wenn das RDD nicht in den Arbeitsspeicher passt, werden einige Partitionen nicht zwischengespeichert und werden im laufenden Betrieb jedes Mal bei Bedarf neu berechnet.
MEMORY_AND_DISK: Speichert das RDD als deserialisierte Java-Objekte in der JVM. Wenn das RDD nicht in den Arbeitsspeicher passt, speichern Sie die Partitionen, die nicht auf den Datenträger passen, und lesen Sie sie bei Bedarf an ihrem Speicherort.
MEMORY_ONLY_SER (Java/Scala): Speichert das RDD als serialisierte Java-Objekte – 1-Byte-Array pro Partition. Diese Option ist platzsparend im Vergleich zu deserialisierten Objekten, insbesondere bei Verwendung eines schnellen Serialisierungsprogramms, aber das Lesen erfordert mehr CPU-Leistung.
MEMORY_AND_DISK_SER (Java/Scala): Diese Speicheroption entspricht MEMORY_ONLY_SER – der einzige Unterschied besteht darin, dass Partitionen überlaufen, die nicht in den Datenträgerspeicher passen, anstatt dass sie bei Bedarf neu berechnet werden.
DISK_ONLY: Speichert die RDD-Partitionen nur auf dem Datenträger.
MEMORY_ONLY_2, MEMORY_AND_DISK_2...: Identisch mit den oben genannten Ebenen, aber jede Partition wird auf beiden Clusterknoten repliziert.
OFF_HEAP (experimentell): Ähnelt MEMORY_ONLY_SER, speichert aber die Daten im Off-Heap-Arbeitsspeicher und benötigt Off-Heap-Arbeitsspeicher, um im voraus aktiviert zu werden.
//Workaround
import org.apache.spark.storage.StorageLevel
//Read from source
val readBooksDF = spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
//Explain plan
readBooksDF.explain
//Materialize the dataframe
readBooksDF.persist(StorageLevel.MEMORY_ONLY)
//Subsequent execution against this DF hits the cache
readBooksDF.count
//Persist as temporary view
readBooksDF.createOrReplaceTempView("books_vw")
SQL
%sql
select * from books_vw;
select count(*) from books_vw where book_pub_year > 1900;
select count(book_id) from books_vw;
select book_author, count(*) as count from books_vw group by book_author;
select count(*) from books_vw;
Durchschnittsvorgang
RDD-API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Double) => c).mean
Ausgabe:
res24: Double = 16.016000175476073
Datenrahmen-API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(avg("book_price"))
.show
Ausgabe:
+------------------+
| avg(book_price) |
| +------------------+ |
| 16.016000175476073 |
| +------------------+ |
SQL
select avg(book_price) from books_vw;
Ausgabe:
16.016000175476073
Minimalvorgang
RDD-API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).min
Ausgabe:
res31: Float = 11.33
Datenrahmen-API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_id","book_price")
.agg(min("book_price"))
.show
Ausgabe:
+---------------+
| min(book_price) |
| +---------------+ |
| 11.33 |
| +---------------+ |
SQL
%sql
select avg(book_price) from books_vw;
Ausgabe:
11.33
Maximalvorgang
RDD-API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).max
Datenrahmen-API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(max("book_price"))
.show
Ausgabe:
+---------------+
| max(book_price) |
| +---------------+ |
| 22.45 |
| +---------------+ |
SQL
%sql
select max(book_price) from books_vw;
Ausgabe:
22.45
Summierungsvorgang
RDD-API
sc.cassandraTable("books_ks", "books").select("book_price").as((c: Float) => c).sum
Ausgabe:
res46: Double = 80.08000087738037
Datenrahmen-API
spark
.read
.cassandraFormat("books", "books_ks", "")
.load()
.select("book_price")
.agg(sum("book_price"))
.show
Ausgabe:
+-----------------+
| sum(book_price) |
| +-----------------+ |
| 80.08000087738037 |
| +-----------------+ |
SQL
select sum(book_price) from books_vw;
Ausgabe:
80.08000087738037
Spitzen- oder vergleichbarer Vorgang
RDD-API
val readCalcTopRDD = sc.cassandraTable("books_ks", "books").select("book_name","book_price").sortBy(_.getFloat(1), false)
readCalcTopRDD.zipWithIndex.filter(_._2 < 3).collect.foreach(println)
//delivers the first top n items without collecting the rdd to the driver.
Ausgabe:
(CassandraRow{book_name: A sign of four, book_price: 22.45},0)
(CassandraRow{book_name: The adventures of Sherlock Holmes, book_price: 19.83},1)
(CassandraRow{book_name: The memoirs of Sherlock Holmes, book_price: 14.22},2)
readCalcTopRDD: org.apache.spark.rdd.RDD[com.datastax.spark.connector.CassandraRow] = MapPartitionsRDD[430] at sortBy at command-2371828989676374:1
Datenrahmen-API
import org.apache.spark.sql.functions._
val readBooksDF = spark.read.format("org.apache.spark.sql.cassandra")
.options(Map( "table" -> "books", "keyspace" -> "books_ks"))
.load
.select("book_name","book_price")
.orderBy(desc("book_price"))
.limit(3)
//Explain plan
readBooksDF.explain
//Top
readBooksDF.show
Ausgabe:
== Physical Plan ==
TakeOrderedAndProject(limit=3, orderBy=[book_price#1840 DESC NULLS LAST], output=[book_name#1839,book_price#1840])
+- *(1) Scan org.apache.spark.sql.cassandra.CassandraSourceRelation@29cd5f58 [book_name#1839,book_price#1840] PushedFilters: [], ReadSchema: struct<book_name:string,book_price:float>
+--------------------+----------+
| book_name | book_price |
| +--------------------+----------+ |
| A sign of four | 22.45 |
| The adventures of... | 19.83 |
| The memoirs of Sh... | 14.22 |
| +--------------------+----------+ |
import org.apache.spark.sql.functions._
readBooksDF: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [book_name: string, book_price: float]
SQL
select book_name,book_price from books_vw order by book_price desc limit 3;