Manipular consultas grandes em fluxos de trabalho interativos
Um dos desafios com fluxos de trabalho de dados interativos é lidar com consultas grandes. Isso inclui consultas que geram muitas linhas de saída, buscam muitas partições externas ou computação em conjuntos de dados muito grandes. Essas consultas podem ser extremamente lentas, saturar recursos de computação e dificultar o compartilhamento da mesma computação por outras pessoas.
O Watchdog de Consulta é um processo que impede que consultas monopolizem recursos de computação examinando as causas mais comuns de consultas grandes e encerrando consultas que passam por um limite. Este artigo descreve como habilitar e configurar o Watchdog de Consulta.
Importante
O Watchdog de Consulta está habilitado para todos os cálculos de todos os fins criados usando a interface do usuário.
Exemplo de uma consulta disruptiva
Um analista está executando algumas consultas ad hoc em um data warehouse just-in-time. O analista usa uma computação de dimensionamento automático compartilhado que facilita o uso de uma única computação por vários usuários ao mesmo tempo. Suponha que haja duas tabelas com um milhão de linhas cada.
import org.apache.spark.sql.functions._
spark.conf.set("spark.sql.shuffle.partitions", 10)
spark.range(1000000)
.withColumn("join_key", lit(" "))
.createOrReplaceTempView("table_x")
spark.range(1000000)
.withColumn("join_key", lit(" "))
.createOrReplaceTempView("table_y")
Esses tamanhos de tabela são gerenciáveis no Apache Spark. No entanto, cada uma delas inclui uma coluna join_key
com uma cadeia de caracteres vazia em cada linha. Isso pode acontecer se os dados não forem perfeitamente limpos ou se houver distorção de dados significativa onde algumas chaves sejam mais predominantes do que outras. Essas chaves de junção vazias são muito mais predominantes do que qualquer outro valor.
No código a seguir, o analista está unindo essas duas tabelas às suas chaves, o que produz a saída de um trilhão de resultados e todos eles são produzidos em um único executor (o executor que obtém a chave " "
):
SELECT
id, count(id)
FROM
(SELECT
x.id
FROM
table_x x
JOIN
table_y y
on x.join_key = y.join_key)
GROUP BY id
Essa consulta parece estar em execução. Mas sem saber mais sobre os dados, o analista vê que há "apenas" uma única tarefa ao longo da execução do trabalho. A consulta nunca termina, deixando o analista frustrado e confuso sobre o motivo pelo qual ela não funcionou.
Nesse caso, há apenas uma chave de junção problemática. Outras vezes, pode haver muito mais.
Habilitar e configurar o Watchdog de Consulta
Para habilitar e configurar o Watchdog de Consulta, as etapas a seguir são necessárias.
- Habilite o Watchdog com
spark.databricks.queryWatchdog.enabled
. - Configure o runtime da tarefa com
spark.databricks.queryWatchdog.minTimeSecs
. - Exiba a saída
spark.databricks.queryWatchdog.minOutputRows
. - Configure a taxa de saída com
spark.databricks.queryWatchdog.outputRatioThreshold
.
Para impedir que uma consulta crie muitas linhas de saída para o número de linhas de entrada, você pode habilitar o Watchdog de Consulta e configurar o número máximo de linhas de saída como um múltiplo do número de linhas de entrada. Neste exemplo, usamos uma taxa de 1.000 (o padrão).
spark.conf.set("spark.databricks.queryWatchdog.enabled", true)
spark.conf.set("spark.databricks.queryWatchdog.outputRatioThreshold", 1000L)
A última configuração declara que qualquer tarefa específica nunca deve produzir mais de 1.000 vezes o número de linhas de entrada.
Dica
A taxa de saída é totalmente personalizável. É recomendável começar mais baixo e ver qual limite funciona bem para você e sua equipe. Um intervalo de 1.000 a 10.000 é um bom ponto de partida.
Além de impedir que os usuários monopolizem recursos de computação para trabalhos que nunca serão concluídos, ele também economiza tempo ao falhar rapidamente em uma consulta que nunca teria sido concluída. Por exemplo, a consulta a seguir falhará após vários minutos porque excede a taxa.
SELECT
z.id
join_key,
sum(z.id),
count(z.id)
FROM
(SELECT
x.id,
y.join_key
FROM
table_x x
JOIN
table_y y
on x.join_key = y.join_key) z
GROUP BY join_key, z.id
Isso é o que você verá:
Normalmente, é suficiente habilitar o Watchdog de Consulta e definir a taxa de limite de saída/entrada, mas você também tem a opção de definir duas propriedades adicionais: spark.databricks.queryWatchdog.minTimeSecs
e spark.databricks.queryWatchdog.minOutputRows
. Essas propriedades especificam o tempo mínimo que uma determinada tarefa em uma consulta deve executar antes de cancelá-la e o número mínimo de linhas de saída para uma tarefa nessa consulta.
Por exemplo, você pode definir minTimeSecs
como um valor mais alto se quiser dar a ele a oportunidade de produzir um grande número de linhas por tarefa. Da mesma forma, você poderá definir spark.databricks.queryWatchdog.minOutputRows
como dez milhões se quiser interromper uma consulta somente após uma tarefa nessa consulta produzir dez milhões de linhas. Qualquer coisa menor do que isso fará com que a consulta seja bem-sucedida, mesmo que a taxa de saída/entrada tenha sido excedida.
spark.conf.set("spark.databricks.queryWatchdog.minTimeSecs", 10L)
spark.conf.set("spark.databricks.queryWatchdog.minOutputRows", 100000L)
Dica
Se você configurar o Watchdog de Consulta em um notebook, a configuração não persistirá entre as reinicializações de computação. Se você quiser configurar o Cão de Guarda de Consultas para todos os usuários de uma computação, recomendamos que você use uma configuração de computação.
Detectar consulta em um conjuntos de dados extremamente grande
Outra consulta grande típica pode examinar uma grande quantidade de dados de grandes tabelas/conjuntos de dados. A operação de verificação pode durar muito tempo e saturar recursos de computação (até mesmo ler metadados de uma tabela do Hive grande pode levar muito tempo). Você pode definir maxHivePartitions
para impedir a busca de muitas partições de uma tabela grande do Hive. Da mesma forma, você também pode definir maxQueryTasks
para limitar consultas em um conjunto de dados extremamente grande.
spark.conf.set("spark.databricks.queryWatchdog.maxHivePartitions", 20000)
spark.conf.set("spark.databricks.queryWatchdog.maxQueryTasks", 20000)
Quando você deve habilitar o Watchdog de Consulta?
O Watchdog de Consulta deve ser habilitado para computação de análise ad hoc, em que analistas de SQL e cientistas de dados estão compartilhando uma determinada computação e um administrador precisa garantir que as consultas sejam bem” executadas “entre si.
Quando você deve desabilitar o Watchdog de Consulta?
Em geral, não recomendamos cancelar com impaciência as consultas usadas em um cenário de ETL porque normalmente não há um humano no loop para corrigir o erro. Recomendamos que você desabilite o Cão de Guarda de Consulta para todos, exceto a computação de análise ad hoc.