Compartilhar via


Integrar o Azure Functions ao Azure Data Explorer usando associações de entrada e saída (versão prévia)

Importante

Este conector pode ser usado em Real-Time Intelligence no Microsoft Fabric. Use as instruções incluídas neste artigo com as seguintes exceções:

O Azure Functions permite que você execute código sem servidor na nuvem segundo uma agenda ou em resposta a um evento. Com as associações de entrada e saída do Azure Data Explorer para o Azure Functions, você pode integrar o Azure Data Explorer aos fluxos de trabalho para ingerir dados e executar consultas em seu cluster.

Pré-requisitos

Experimente a integração com nosso projeto de exemplo

Como usar associações do Azure Data Explorer para o Azure Functions

Para obter informações sobre como usar associações do Azure Data Explorer para o Azure Functions, consulte os seguintes tópicos:

Cenários para usar associações do Azure Data Explorer para o Azure Functions

As seções a seguir descrevem alguns cenários comuns para usar associações do Azure Data Explorer para o Azure Functions.

Associações de entrada

As associações de entrada executam uma consulta KQL (Linguagem de Consulta Kusto) ou uma função KQL, opcionalmente com parâmetros, e retornam a saída para a função.

As seções a seguir descrevem como usar associações de entrada em alguns cenários comuns.

Cenário 1: um ponto de extremidade HTTP para consultar dados de um cluster

O uso de associações de entrada é aplicável em situações em que você precisa expor dados do Azure Data Explorer por meio de uma API REST. Nesse cenário, você usa um gatilho HTTP do Azure Functions para consultar dados em seu cluster. O cenário é particularmente útil em situações em que você precisa fornecer acesso programático aos dados do Azure Data Explorer para aplicativos ou serviços externos. Ao expor seus dados por meio de uma API REST, os aplicativos podem consumir prontamente os dados sem exigir que eles se conectem diretamente ao cluster.

O código define uma função com um gatilho HTTP e uma associação de entrada do Azure Data Explorer. A associação de entrada especifica a consulta a ser executada na tabela Produtos no banco de dados productsdb. A função usa a coluna productId como o predicado passado como um parâmetro.

{
    [FunctionName("GetProduct")]
    public static async Task<IActionResult> RunAsync(
        [HttpTrigger(AuthorizationLevel.User, "get", Route = "getproducts/{productId}")]
        HttpRequest req,
        [Kusto(Database:"productsdb" ,
        KqlCommand = "declare query_parameters (productId:long);Products | where ProductID == productId" ,
        KqlParameters = "@productId={productId}",
        Connection = "KustoConnectionString")]
        IAsyncEnumerable<Product> products)
    {
        IAsyncEnumerator<Product> enumerator = products.GetAsyncEnumerator();
        var productList = new List<Product>();
        while (await enumerator.MoveNextAsync())
        {
            productList.Add(enumerator.Current);
        }
        await enumerator.DisposeAsync();
        return new OkObjectResult(productList);
    }
}

Em seguida, a função pode ser invocada, da seguinte maneira:

curl https://myfunctionapp.azurewebsites.net/api/getproducts/1

Cenário 2: um gatilho agendado para exportar dados de um cluster

O cenário a seguir é aplicável em situações em que os dados precisam ser exportados em um agendamento baseado em tempo.

O código define uma função com um gatilho de temporizador que exporta uma agregação de dados de vendas do banco de dados productsdb para um arquivo CSV no Armazenamento de Blobs do Azure.

public static async Task Run([TimerTrigger("0 0 1 * * *")] TimerInfo myTimer,
    [Kusto(ConnectionStringSetting = "KustoConnectionString",
            DatabaseName = "productsdb",
            Query = "ProductSales | where OrderDate >= ago(1d) | summarize Sales = sum(ProductSales) by ProductName | top 10 by Sales desc")] IEnumerable<dynamic> queryResults,
[Blob("salescontainer/productsblob.csv", FileAccess.Write, Connection = "BlobStorageConnection")] CloudBlockBlob outputBlob,
ILogger log)
{
    // Write the query results to a CSV file
    using (var stream = new MemoryStream())
    using (var writer = new StreamWriter(stream))
    using (var csv = new CsvWriter(writer, CultureInfo.InvariantCulture))
    {
        csv.WriteRecords(queryResults);
        writer.Flush();
        stream.Position = 0;
        await outputBlob.UploadFromStreamAsync(stream);
    }
}

Associações de saída

As associações de saída pegam uma ou mais linhas e as inserem em uma tabela do Azure Data Explorer.

As seções a seguir descrevem como usar associações de saída em alguns cenários comuns.

Cenário 1: ponto de extremidade HTTP para ingerir dados em um cluster

O cenário a seguir é aplicável em situações em que as solicitações HTTP de entrada precisam ser processadas e ingeridas em seu cluster. Usando uma associação de saída, os dados de entrada da solicitação podem ser gravados em tabelas do Azure Data Explorer.

O código define uma função com um gatilho HTTP e uma associação de saída do Azure Data Explorer. Essa função usa uma carga JSON no corpo da solicitação HTTP e a grava na tabela de produtos no banco de dados productsdb.

public static IActionResult Run(
    [HttpTrigger(AuthorizationLevel.Anonymous, "post", Route = "addproductuni")]
    HttpRequest req, ILogger log,
    [Kusto(Database:"productsdb" ,
    TableName ="products" ,
    Connection = "KustoConnectionString")] out Product product)
{
    log.LogInformation($"AddProduct function started");
    string body = new StreamReader(req.Body).ReadToEnd();
    product = JsonConvert.DeserializeObject<Product>(body);
    string productString = string.Format(CultureInfo.InvariantCulture, "(Name:{0} ID:{1} Cost:{2})",
                product.Name, product.ProductID, product.Cost);
    log.LogInformation("Ingested product {}", productString);
    return new CreatedResult($"/api/addproductuni", product);
}

Em seguida, a função pode ser invocada, da seguinte maneira:

curl -X POST https://myfunctionapp.azurewebsites.net/api/addproductuni -d '{"Name":"Product1","ProductID":1,"Cost":100,"ActivatedOn":"2023-01-02T00:00:00"}'

Cenário 2: Ingerir dados do RabbitMQ ou de outros sistemas de mensagens com suporte no Azure

O cenário a seguir é aplicável em situações em que os dados de um sistema de mensagens precisam ser ingeridos em seu cluster. Usando uma associação de saída, os dados de entrada do sistema de mensagens podem ser ingeridos nas tabelas do Azure Data Explorer.

O código define uma função com mensagens, dados no formato JSON, entrada por meio de um gatilho RabbitMQ que são ingeridos na tabela produtos no banco de dados productsdb.

public class QueueTrigger
{
    [FunctionName("QueueTriggerBinding")]
    [return: Kusto(Database: "productsdb",
                TableName = "products",
                Connection = "KustoConnectionString")]
    public static Product Run(
        [RabbitMQTrigger(queueName: "bindings.products.queue", ConnectionStringSetting = "rabbitMQConnectionAppSetting")] Product product,
        ILogger log)
    {
        log.LogInformation($"Dequeued product {product.ProductID}");
        return product;
    }
}

Para obter mais informações sobre as funções, consulte a Documentação do Azure Functions. A extensão do Azure Data Explorer está disponível em: