Conectar-se ao Azure Cosmos DB para Apache Cassandra a partir do Spark
APLICA-SE A: Cassandra
Este artigo é um entre uma série de artigos sobre a integração do Azure Cosmos DB para Apache Cassandra do Spark. Os artigos abordam conectividade, operações DDL (Data Definition Language), operações DML (Data Manipulation Language) básicas e integração avançada do Azure Cosmos DB para Apache Cassandra do Spark.
Pré-requisitos
Provisione uma conta do Azure Cosmos DB para Apache Cassandra.
Provisione sua escolha de ambiente do Spark [Azure Databricks | Azure HDInsight-Spark | Outros].
Dependências para conectividade
Conector Spark para Cassandra: O conector Spark é usado para se conectar ao Azure Cosmos DB para Apache Cassandra. Identifique e use a versão do conector localizada no Maven central que é compatível com as versões Spark e Scala do seu ambiente Spark. Recomendamos um ambiente que suporte o Spark 3.2.1 ou superior e o conector de faísca disponível nas coordenadas
com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.2.0
maven. Se estiver usando o Spark 2.x, recomendamos um ambiente com o Spark versão 2.4.5, usando o conector de faísca nas coordenadascom.datastax.spark:spark-cassandra-connector_2.11:2.4.3
maven.Biblioteca auxiliar do Azure Cosmos DB para API para Cassandra: Se você estiver usando uma versão do Spark 2.x, além do conector do Spark, precisará de outra biblioteca chamada azure-cosmos-cassandra-spark-helper com coordenadas
com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.2.0
maven do Azure Cosmos DB para lidar com a limitação de taxa. Esta biblioteca contém classes de política de fábrica de conexões personalizadas e de novas tentativas.A política de repetição no Azure Cosmos DB está configurada para lidar com exceções do código de status HTTP 429 ("Taxa de solicitação grande"). O Azure Cosmos DB para Apache Cassandra traduz essas exceções em erros sobrecarregados no protocolo nativo Cassandra e você pode tentar novamente com back-offs. Como o Azure Cosmos DB usa o modelo de taxa de transferência provisionada, as exceções de limitação de taxa de solicitação ocorrem quando as taxas de entrada/saída aumentam. A política de repetição protege seus trabalhos de faísca contra picos de dados que excedem momentaneamente a taxa de transferência alocada para seu contêiner. Se estiver usando o conector Spark 3.x, a implementação dessa biblioteca não será necessária.
Nota
A política de repetição pode proteger seus trabalhos de faísca apenas contra picos momentâneos. Se você não configurou RUs suficientes necessárias para executar sua carga de trabalho, a política de repetição não é aplicável e a classe de política de repetição relança a exceção.
Detalhes de conexão da conta do Azure Cosmos DB: seu nome de conta, ponto de extremidade de conta e chave da API do Azure para Cassandra.
Otimizando a configuração da taxa de transferência do conector Spark
Listados na próxima seção estão todos os parâmetros relevantes para controlar a taxa de transferência usando o Spark Connector for Cassandra. Para otimizar os parâmetros para maximizar a taxa de transferência para trabalhos de faísca, o spark.cassandra.output.concurrent.writes
, spark.cassandra.concurrent.reads
e spark.cassandra.input.reads_per_sec
as configurações precisam ser configuradas corretamente, a fim de evitar muita limitação e back-off (o que, por sua vez, pode levar a uma taxa de transferência mais baixa).
O valor ideal dessas configurações depende de quatro fatores:
- A quantidade de taxa de transferência (Unidades de Solicitação) configurada para a tabela na qual os dados estão sendo ingeridos.
- O número de trabalhadores no cluster do Spark.
- O número de executores configurados para o seu trabalho do Spark (que pode ser controlado usando
spark.cassandra.connection.connections_per_executor_max
ouspark.cassandra.connection.remoteConnectionsPerExecutor
dependendo da versão do Spark) - A latência média de cada solicitação ao Azure Cosmos DB, se você estiver colocado no mesmo Data Center. Suponha que esse valor seja 10 ms para gravações e 3 ms para leituras.
Por exemplo, se tivermos cinco trabalhadores e um valor de spark.cassandra.output.concurrent.writes
= 1 e um valor de spark.cassandra.connection.remoteConnectionsPerExecutor
= 1, então teremos cinco trabalhadores que estão escrevendo simultaneamente na tabela, cada um com um thread. Se demorar 10 ms para executar uma única gravação, podemos enviar 100 solicitações (1000 milissegundos divididos por 10) por segundo, por thread. Com cinco trabalhadores, seriam 500 gravações por segundo. A um custo médio de cinco unidades de solicitação (RUs) por gravação, a tabela de destino precisaria de um mínimo de 2500 unidades de solicitação provisionadas (5 RUs x 500 gravações por segundo).
Aumentar o número de executores pode aumentar o número de threads em um determinado trabalho, o que, por sua vez, pode aumentar a taxa de transferência. No entanto, o impacto exato disso pode ser variável dependendo do trabalho, enquanto o controle da taxa de transferência com o número de trabalhadores é mais determinista. Você também pode determinar o custo exato de uma determinada solicitação criando o perfil para obter a cobrança da Unidade de Solicitação (RU). Isso ajudará você a ser mais preciso ao provisionar a taxa de transferência para sua tabela ou espaço de chave. Dê uma olhada em nosso artigo aqui para entender como obter taxas unitárias de solicitação em um nível por solicitação.
Dimensionamento da taxa de transferência no banco de dados
O conector Cassandra Spark saturará a taxa de transferência no Azure Cosmos DB de forma eficiente. Como resultado, mesmo com tentativas eficazes, você precisará garantir que tenha uma taxa de transferência (RUs) suficiente provisionada no nível da tabela ou do espaço de chave para evitar erros relacionados à limitação de taxa. A configuração mínima de 400 RUs em uma determinada tabela ou espaço de teclas não será suficiente. Mesmo com definições de configuração de taxa de transferência mínima, o conector Spark pode gravar a uma taxa correspondente a cerca de 6000 unidades de solicitação ou mais.
Se a configuração de RU necessária para a movimentação de dados usando o Spark for maior do que a necessária para sua carga de trabalho em estado estacionário, você poderá facilmente dimensionar a taxa de transferência para cima e para baixo sistematicamente no Azure Cosmos DB para atender às necessidades de sua carga de trabalho por um determinado período de tempo. Leia nosso artigo sobre escala elástica na API para Cassandra para entender as diferentes opções de dimensionamento de forma programática e dinâmica.
Nota
As orientações acima pressupõem uma distribuição razoavelmente uniforme dos dados. Se você tiver uma distorção significativa nos dados (ou seja, um número excessivamente grande de leituras/gravações no mesmo valor de chave de partição), ainda poderá enfrentar gargalos, mesmo que tenha um grande número de unidades de solicitação provisionadas em sua tabela. As unidades de solicitação são divididas igualmente entre partições físicas, e a distorção de dados pesada pode causar um afunilamento de solicitações para uma única partição.
Parâmetros de configuração da taxa de transferência do conector Spark
A tabela a seguir lista os parâmetros de configuração de taxa de transferência específicos do Azure Cosmos DB para Apache Cassandra fornecidos pelo conector. Para obter uma lista detalhada de todos os parâmetros de configuração, consulte a página de referência de configuração do repositório GitHub do Spark Cassandra Connector.
Nome da propriedade | Valor predefinido | Descrição |
---|---|---|
spark.cassandra.output.batch.size.rows | 1 | Número de linhas por lote único. Defina este parâmetro como 1. Esse parâmetro é usado para obter uma taxa de transferência mais alta para cargas de trabalho pesadas. |
spark.cassandra.connection.connections_per_executor_max (Spark 2.x) spark.cassandra.connection.remoteConnectionsPerExecutor (Spark 3.x) | Nenhuma | Número máximo de conexões por nó por executor. 10*n é equivalente a 10 conexões por nó em um cluster Cassandra de n-nós. Portanto, se você precisar de cinco conexões por nó por executor para um cluster Cassandra de cinco nós, deverá definir essa configuração como 25. Modifique esse valor com base no grau de paralelismo ou no número de executores para os quais seus trabalhos de faísca estão configurados. |
spark.cassandra.output.concurrent.writes | 100 | Define o número de gravações paralelas que podem ocorrer por executor. Como você define "batch.size.rows" como 1, certifique-se de aumentar esse valor de acordo. Modifique esse valor com base no grau de paralelismo ou na taxa de transferência que você deseja alcançar para sua carga de trabalho. |
faísca.cassandra.concurrent.reads | 512 | Define o número de leituras paralelas que podem ocorrer por executor. Modifique esse valor com base no grau de paralelismo ou na taxa de transferência que você deseja alcançar para sua carga de trabalho |
spark.cassandra.output.throughput_mb_per_sec | Nenhuma | Define a taxa de transferência de gravação total por executor. Esse parâmetro pode ser usado como um limite superior para a taxa de transferência do trabalho de faísca e baseá-lo na taxa de transferência provisionada do contêiner do Azure Cosmos DB. |
spark.cassandra.input.reads_per_sec | Nenhuma | Define a taxa de transferência de leitura total por executor. Esse parâmetro pode ser usado como um limite superior para a taxa de transferência do trabalho de faísca e baseá-lo na taxa de transferência provisionada do contêiner do Azure Cosmos DB. |
spark.cassandra.output.batch.grouping.buffer.size | 1000 | Define o número de lotes por tarefa de faísca única que podem ser armazenados na memória antes de enviar para a API para Cassandra |
spark.cassandra.connection.keep_alive_ms | 60000 | Define o período de tempo até o qual as conexões não utilizadas estão disponíveis. |
Ajuste a taxa de transferência e o grau de paralelismo desses parâmetros com base na carga de trabalho que você espera para seus trabalhos de faísca e na taxa de transferência provisionada para sua conta do Azure Cosmos DB.
Conectando-se ao Azure Cosmos DB para Apache Cassandra a partir do Spark
CQLSH
Os comandos a seguir detalham como se conectar ao Azure Cosmos DB para Apache Cassandra a partir do cqlsh. Isso é útil para validação à medida que você executa os exemplos no Spark.
Do Linux/Unix/Mac:
export SSL_VERSION=TLSv1_2
export SSL_VALIDATE=false
cqlsh.py YOUR-COSMOSDB-ACCOUNT-NAME.cassandra.cosmosdb.azure.com 10350 -u YOUR-COSMOSDB-ACCOUNT-NAME -p YOUR-COSMOSDB-ACCOUNT-KEY --ssl
1. Azure Databricks
O artigo abaixo aborda o provisionamento de cluster do Azure Databricks, a configuração de cluster para conexão com o Azure Cosmos DB para Apache Cassandra e vários blocos de anotações de exemplo que abrangem operações DDL, operações DML e muito mais.
Trabalhar com o Azure Cosmos DB para Apache Cassandra a partir do Azure Databricks
2. Azure HDInsight-Spark
O artigo abaixo aborda o serviço HDinsight-Spark, provisionamento, configuração de cluster para conexão com o Azure Cosmos DB para Apache Cassandra e vários blocos de anotações de exemplo que abrangem operações DDL, operações DML e muito mais.
Trabalhar com o Azure Cosmos DB para Apache Cassandra a partir do Azure HDInsight-Spark
3. Ambiente de faísca em geral
Embora as seções acima fossem específicas para serviços PaaS baseados no Azure Spark, esta seção abrange qualquer ambiente geral do Spark. As dependências do conector, as importações e a configuração da sessão do Spark são detalhadas abaixo. A seção "Próximas etapas" abrange exemplos de código para operações DDL, operações DML e muito mais.
Dependências do conector:
- Adicione as coordenadas maven para obter o conector Cassandra para o Spark
- Adicionar as coordenadas maven para a biblioteca auxiliar do Azure Cosmos DB para API for Cassandra
Importações:
import org.apache.spark.sql.cassandra._
//Spark connector
import com.datastax.spark.connector._
import com.datastax.spark.connector.cql.CassandraConnector
//CosmosDB library for multiple retry
import com.microsoft.azure.cosmosdb.cassandra
Configuração da sessão Spark:
spark.cassandra.connection.host YOUR_ACCOUNT_NAME.cassandra.cosmosdb.azure.com
spark.cassandra.connection.port 10350
spark.cassandra.connection.ssl.enabled true
spark.cassandra.auth.username YOUR_ACCOUNT_NAME
spark.cassandra.auth.password YOUR_ACCOUNT_KEY
// if using Spark 2.x
// spark.cassandra.connection.factory com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory
//Throughput-related...adjust as needed
spark.cassandra.output.batch.size.rows 1
// spark.cassandra.connection.connections_per_executor_max 10 // Spark 2.x
spark.cassandra.connection.remoteConnectionsPerExecutor 10 // Spark 3.x
spark.cassandra.output.concurrent.writes 1000
spark.cassandra.concurrent.reads 512
spark.cassandra.output.batch.grouping.buffer.size 1000
spark.cassandra.connection.keep_alive_ms 600000000
Próximos passos
Os artigos a seguir demonstram a integração do Spark com o Azure Cosmos DB para Apache Cassandra.