Acesse o Azure Cosmos DB para Apache Cassandra do Spark no YARN com o HDInsight
APLICA-SE A: Cassandra
Este artigo aborda como acessar o Azure Cosmos DB para Apache Cassandra do Spark no YARN com o HDInsight-Spark do spark-shell
. O HDInsight é o Hortonworks Hadoop PaaS da Microsoft no Azure. Ele usa armazenamento de objetos para HDFS e vem em vários sabores, incluindo Spark. Embora este artigo se refira ao HDInsight-Spark, ele se aplica a todas as distribuições Hadoop.
Pré-requisitos
Antes de começar, revise as noções básicas de conexão com o Azure Cosmos DB para Apache Cassandra.
Você precisa dos seguintes pré-requisitos:
Provisione o Azure Cosmos DB para Apache Cassandra. Consulte Criar uma conta de banco de dados.
Provisione um cluster HDInsight-Spark. Consulte Criar cluster Apache Spark no Azure HDInsight usando o modelo ARM.
API para configuração Cassandra no Spark2. O conector Spark para Cassandra requer que os detalhes da conexão Cassandra sejam inicializados como parte do contexto Spark. Quando você inicia um bloco de anotações Jupyter, a sessão de faísca e o contexto já são inicializados. Não pare e reinicialize o contexto do Spark, a menos que ele esteja completo com todas as configurações definidas como parte da inicialização padrão do notebook Jupyter do HDInsight. Uma solução alternativa é adicionar os detalhes da instância Cassandra à configuração do serviço Ambari, Spark2, diretamente. Essa abordagem é uma atividade única por cluster que requer uma reinicialização do serviço Spark2.
Vá para Ambari, serviço Spark2 e selecione configurações.
Vá para spark2-defaults personalizados, adicione uma nova propriedade com o seguinte e reinicie o serviço Spark2:
spark.cassandra.connection.host=YOUR_COSMOSDB_ACCOUNT_NAME.cassandra.cosmosdb.azure.com<br> spark.cassandra.connection.port=10350<br> spark.cassandra.connection.ssl.enabled=true<br> spark.cassandra.auth.username=YOUR_COSMOSDB_ACCOUNT_NAME<br> spark.cassandra.auth.password=YOUR_COSMOSDB_KEY<br>
Você pode usar cqlsh
para validação. Para obter mais informações, consulte Conectando-se ao Azure Cosmos DB para Apache Cassandra a partir do Spark.
Acessar o Azure Cosmos DB para Apache Cassandra a partir do shell do Spark
A concha de faísca é usada para testes e exploração.
Inicie
spark-shell
com as dependências maven necessárias compatíveis com a versão Spark do cluster.spark-shell --packages "com.datastax.spark:spark-cassandra-connector_2.11:2.3.0,com.microsoft.azure.cosmosdb:azure-cosmos-cassandra-spark-helper:1.0.0"
Executar algumas operações DDL e DML
import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} import spark.implicits._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.Column import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType,LongType,FloatType,DoubleType, TimestampType} import org.apache.spark.sql.cassandra._ //Spark connector import com.datastax.spark.connector._ import com.datastax.spark.connector.cql.CassandraConnector //CosmosDB library for multiple retry import com.microsoft.azure.cosmosdb.cassandra // Specify connection factory for Cassandra spark.conf.set("spark.cassandra.connection.factory", "com.microsoft.azure.cosmosdb.cassandra.CosmosDbConnectionFactory") // Parallelism and throughput configs spark.conf.set("spark.cassandra.output.batch.size.rows", "1") spark.conf.set("spark.cassandra.connection.connections_per_executor_max", "10") spark.conf.set("spark.cassandra.output.concurrent.writes", "100") spark.conf.set("spark.cassandra.concurrent.reads", "512") spark.conf.set("spark.cassandra.output.batch.grouping.buffer.size", "1000") spark.conf.set("spark.cassandra.connection.keep_alive_ms", "60000000") //Increase this number as needed
Executar operações CRUD
//1) Create table if it does not exist val cdbConnector = CassandraConnector(sc) cdbConnector.withSessionDo(session => session.execute("CREATE TABLE IF NOT EXISTS books_ks.books(book_id TEXT PRIMARY KEY,book_author TEXT, book_name TEXT,book_pub_year INT,book_price FLOAT) WITH cosmosdb_provisioned_throughput=4000;")) //2) Delete data from potential prior runs cdbConnector.withSessionDo(session => session.execute("DELETE FROM books_ks.books WHERE book_id IN ('b00300','b00001','b00023','b00501','b09999','b01001','b00999','b03999','b02999','b000009');")) //3) Generate a few rows val booksDF = Seq( ("b00001", "Arthur Conan Doyle", "A study in scarlet", 1887,11.33), ("b00023", "Arthur Conan Doyle", "A sign of four", 1890,22.45), ("b01001", "Arthur Conan Doyle", "The adventures of Sherlock Holmes", 1892,19.83), ("b00501", "Arthur Conan Doyle", "The memoirs of Sherlock Holmes", 1893,14.22), ("b00300", "Arthur Conan Doyle", "The hounds of Baskerville", 1901,12.25) ).toDF("book_id", "book_author", "book_name", "book_pub_year","book_price") //4) Persist booksDF.write.mode("append").format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks", "output.consistency.level" -> "ALL", "ttl" -> "10000000")).save() //5) Read the data in the table spark.read.format("org.apache.spark.sql.cassandra").options(Map( "table" -> "books", "keyspace" -> "books_ks")).load.show
Acessar o Azure Cosmos DB para Apache Cassandra a partir de notebooks Jupyter
O HDInsight-Spark vem com os serviços de notebook Zeppelin e Jupyter. Ambos são ambientes de notebook baseados na Web que suportam Scala e Python. Os notebooks são ótimos para análise exploratória interativa e colaboração, mas não se destinam a processos operacionais ou de produção.
Os seguintes blocos de anotações Jupyter podem ser carregados em seu cluster HDInsight Spark e fornecer exemplos prontos para trabalhar com o Azure Cosmos DB para Apache Cassandra. Certifique-se de revisar o primeiro bloco de anotações 1.0-ReadMe.ipynb
para revisar a configuração do serviço Spark para se conectar ao Azure Cosmos DB para Apache Cassandra.
Transfira os blocos de notas em azure-cosmos-db-cassandra-api-spark-notebooks-jupyter para a sua máquina.
Como carregar
Ao iniciar o Jupyter, navegue até Scala. Crie um diretório e, em seguida, carregue os blocos de anotações para o diretório. O botão Carregar está na parte superior, do lado direito.
Como executar
Percorra os blocos de notas e cada célula do bloco de notas sequencialmente. Selecione o botão Executar na parte superior de cada bloco de notas para executar todas as células ou Shift+Enter para cada célula.
Acesso com o Azure Cosmos DB para Apache Cassandra a partir do seu programa Spark Scala
Para processos automatizados em produção, os programas Spark são enviados ao cluster usando spark-submit.