Operações DDL no Azure Cosmos DB para Apache Cassandra do Spark
APLICA-SE A: Cassandra
Este artigo detalha as operações DDL de espaço de chave e tabela no Azure Cosmos DB para Apache Cassandra do Spark.
Contexto da faísca
O conector da API para Cassandra requer que os detalhes da conexão Cassandra sejam inicializados como parte do contexto de faísca. Quando você inicia um bloco de anotações, o contexto de faísca já está inicializado e não é aconselhável pará-lo e reinicializá-lo. Uma solução é adicionar a API para configuração de instância Cassandra em um nível de cluster, na configuração de faísca de cluster. É uma atividade única por cluster. Adicione o seguinte código à configuração do Spark como um par de valores de chave separados por espaço:
spark.cassandra.connection.host YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_COSMOSDB_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_COSMOSDB_KEY
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
API para configuração relacionada a Cassandra
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//if using Spark 2.x, CosmosDB library for multiple retry
//import com.microsoft.azure.cosmosdb.cassandra
//spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory")
Nota
Se você estiver usando o Spark 3.x, não precisará instalar o auxiliar e a fábrica de conexões do Azure Cosmos DB. Você também deve usar remoteConnectionsPerExecutor
em vez do connections_per_executor_max
conector Spark 3 (veja acima).
Aviso
Os exemplos do Spark 3 mostrados neste artigo foram testados com o Spark versão 3.2.1 e o correspondente Cassandra Spark Connector com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.1. Versões posteriores do Spark e/ou do conector Cassandra podem não funcionar como esperado.
Operações DDL Keyspace
Criar um espaço de chave
//Cassandra connector instance
val cdbConnector = CassandraConnector(sc)
// Create keyspace
cdbConnector.withSessionDo(session => session.execute("CREATE KEYSPACE IF NOT EXISTS books_ks WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1 } "))
Validar em cqlsh
Execute o seguinte comando no cqlsh e você verá o keyspace criado anteriormente.
DESCRIBE keyspaces;
Solte um espaço de teclas
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP KEYSPACE books_ks"))
Validar em cqlsh
DESCRIBE keyspaces;
Tabela de operações DDL
Considerações:
- A taxa de transferência pode ser atribuída no nível da tabela usando a instrução create table.
- Uma chave de partição pode armazenar 20 GB de dados.
- Um registo pode armazenar um máximo de 2 MB de dados.
- Um intervalo de chaves de partição pode armazenar várias chaves de partição.
Criar uma tabela
cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT, PRIMARY KEY(book_id,book_pub_year)) WITH cosmosdb_provisioned_throughput=4000 , WITH default_time_to_live=630720000;"))
Validar em cqlsh
Execute o seguinte comando no cqlsh e você verá a tabela chamada "books:
USE books_ks;
DESCRIBE books;
A taxa de transferência provisionada e os valores TTL padrão não são mostrados na saída do comando anterior, você pode obter esses valores do portal.
Alterar tabela
Você pode alterar os seguintes valores usando o comando alter table:
- débito aprovisionado
- valor de tempo de vida
No momento, não há suporte para alterações de coluna.
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("ALTER TABLE books_ks.books WITH cosmosdb_provisioned_throughput=8000, WITH default_time_to_live=0;"))
Mesa suspensa
val cdbConnector = CassandraConnector(sc)
cdbConnector.withSessionDo(session => session.execute("DROP TABLE IF EXISTS books_ks.books;"))
Validar em cqlsh
Execute o seguinte comando no cqlsh e você verá que a tabela "books" não está mais disponível:
USE books_ks;
DESCRIBE tables;
Próximos passos
Depois de criar o keyspace e a tabela, prossiga para os seguintes artigos para operações CRUD e muito mais: