Consultar bases de dados utilizando JDBC
O Azure Databricks dá suporte à conexão com bancos de dados externos usando JDBC. Este artigo fornece a sintaxe básica para configurar e usar essas conexões com exemplos em Python, SQL e Scala.
Importante
As configurações descritas neste artigo são experimentais. Os recursos experimentais são fornecidos no estado em que se encontram e não são suportados pelo Databricks por meio do suporte técnico ao cliente. Para obter suporte completo à federação de consultas, você deve usar a Lakehouse Federation, que permite que os usuários do Azure Databricks aproveitem a sintaxe do Catálogo Unity e as ferramentas de governança de dados.
O Partner Connect fornece integrações otimizadas para sincronizar dados com muitas fontes de dados externas externas. Consulte O que é o Databricks Partner Connect?.
Importante
Os exemplos neste artigo não incluem nomes de usuário e senhas em URLs JDBC. O Databricks recomenda o uso de segredos para armazenar suas credenciais de banco de dados. Por exemplo:
Python
username = dbutils.secrets.get(scope = "jdbc", key = "username")
password = dbutils.secrets.get(scope = "jdbc", key = "password")
Scala
val username = dbutils.secrets.get(scope = "jdbc", key = "username")
val password = dbutils.secrets.get(scope = "jdbc", key = "password")
Para fazer referência a segredos do Databricks com SQL, você deve configurar uma propriedade de configuração do Spark durante a iniciação do cluster.
Para obter um exemplo completo de gerenciamento de segredos, consulte Tutorial: Criar e usar um segredo do Databricks.
Ler dados com JDBC
Você deve definir várias configurações para ler dados usando JDBC. Observe que cada banco de dados usa um formato diferente para o <jdbc-url>
.
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.load()
O Spark lê automaticamente o esquema da tabela do banco de dados e mapeia seus tipos de volta para os tipos SQL do Spark.
Python
employees_table.printSchema
SQL
DESCRIBE employees_table_vw
Scala
employees_table.printSchema
Você pode executar consultas nesta tabela JDBC:
Python
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
SQL
SELECT age, avg(salary) as salary
FROM employees_table_vw
GROUP BY age
Scala
display(employees_table.select("age", "salary").groupBy("age").avg("salary"))
Gravar dados com JDBC
Salvar dados em tabelas com JDBC usa configurações semelhantes à leitura. Veja o seguinte exemplo:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
O comportamento padrão tenta criar uma nova tabela e lança um erro se uma tabela com esse nome já existir.
Você pode acrescentar dados a uma tabela existente usando a seguinte sintaxe:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
)
SQL
CREATE TABLE IF NOT EXISTS new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
);
INSERT INTO new_employees_table
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("append")
.save()
Você pode substituir uma tabela existente usando a seguinte sintaxe:
Python
(employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
)
SQL
CREATE OR REPLACE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT * FROM employees_table_vw;
Scala
employees_table.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.mode("overwrite")
.save()
Controlar paralelismo para consultas JDBC
Por padrão, o driver JDBC consulta o banco de dados de origem com apenas um único thread. Para melhorar o desempenho de leituras, você precisa especificar várias opções para controlar quantas consultas simultâneas o Azure Databricks faz ao seu banco de dados. Para clusters pequenos, definir a numPartitions
opção igual ao número de núcleos executores no cluster garante que todos os nós consultem dados em paralelo.
Aviso
A configuração numPartitions
de um valor alto em um cluster grande pode resultar em desempenho negativo para o banco de dados remoto, pois muitas consultas simultâneas podem sobrecarregar o serviço. Isso é especialmente problemático para bancos de dados de aplicativos. Tenha cuidado ao definir este valor acima de 50.
Nota
Acelere as consultas selecionando uma coluna com um índice calculado no banco de dados de origem para o partitionColumn
.
O exemplo de código a seguir demonstra a configuração do paralelismo para um cluster com oito núcleos:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
# a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
# lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
# max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
# number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>',
partitionColumn "<partition-key>",
lowerBound "<min-value>",
upperBound "<max-value>",
numPartitions 8
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
// a column that can be used that has a uniformly distributed range of values that can be used for parallelization
.option("partitionColumn", "<partition-key>")
// lowest value to pull data for with the partitionColumn
.option("lowerBound", "<min-value>")
// max value to pull data for with the partitionColumn
.option("upperBound", "<max-value>")
// number of partitions to distribute the data into. Do not set this very large (~hundreds)
.option("numPartitions", 8)
.load()
Nota
O Azure Databricks suporta todas as opções do Apache Spark para configurar o JDBC.
Ao gravar em bancos de dados usando JDBC, o Apache Spark usa o número de partições na memória para controlar o paralelismo. Você pode reparticionar dados antes de gravar para controlar o paralelismo. Evite um grande número de partições em clusters grandes para evitar sobrecarregar seu banco de dados remoto. O exemplo a seguir demonstra o reparticionamento para oito partições antes de escrever:
Python
(employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
)
SQL
CREATE TABLE new_employees_table
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'
) AS
SELECT /*+ REPARTITION(8) */ * FROM employees_table_vw
Scala
employees_table.repartition(8)
.write
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<new-table-name>")
.option("user", "<username>")
.option("password", "<password>")
.save()
Empurrar uma consulta para o mecanismo de banco de dados
Você pode enviar uma consulta inteira para o banco de dados e retornar apenas o resultado. O table
parâmetro identifica a tabela JDBC a ser lida. Você pode usar qualquer coisa que seja válida em uma cláusula de consulta FROM
SQL.
Python
pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "(select * from employees where emp_no < 10008) as emp_alias",
user '<username>',
password '<password>'
)
Scala
val pushdown_query = "(select * from employees where emp_no < 10008) as emp_alias"
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", pushdown_query)
.option("user", "<username>")
.option("password", "<password>")
.load()
Controlar o número de linhas obtidas por consulta
Os drivers JDBC têm um fetchSize
parâmetro que controla o número de linhas buscadas por vez no banco de dados remoto.
Definição | Result |
---|---|
Muito baixo | Alta latência devido a muitas viagens de ida e volta (poucas linhas retornadas por consulta) |
Muito alto | Erro de falta de memória (muitos dados retornados em uma consulta) |
O valor ideal depende da carga de trabalho. As considerações incluem:
- Quantas colunas são retornadas pela consulta?
- Que tipos de dados são retornados?
- Por quanto tempo as cadeias de caracteres em cada coluna são retornadas?
Os sistemas podem ter um padrão muito pequeno e se beneficiar do ajuste. Por exemplo: o padrão fetchSize
da Oracle é 10. Aumentar para 100 reduz o número total de consultas que precisam ser executadas por um fator de 10. Os resultados JDBC são tráfego de rede, portanto, evite números muito grandes, mas os valores ideais podem estar na casa dos milhares para muitos conjuntos de dados.
Use a fetchSize
opção, como no exemplo a seguir:
Python
employees_table = (spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()
)
SQL
CREATE TEMPORARY VIEW employees_table_vw
USING JDBC
OPTIONS (
url "<jdbc-url>",
dbtable "<table-name>",
user '<username>',
password '<password>'.
fetchSize 100
)
Scala
val employees_table = spark.read
.format("jdbc")
.option("url", "<jdbc-url>")
.option("dbtable", "<table-name>")
.option("user", "<username>")
.option("password", "<password>")
.option("fetchSize", "100")
.load()