Поделиться через


Добавочная загрузка данных из управляемого экземпляра SQL Azure в хранилище Azure с использованием технологии "Отслеживание измененных данных" (CDC)

ОБЛАСТЬ ПРИМЕНЕНИЯ: Фабрика данных Azure Azure Synapse Analytics

Совет

Попробуйте использовать фабрику данных в Microsoft Fabric, решение для аналитики с одним интерфейсом для предприятий. Microsoft Fabric охватывает все, от перемещения данных до обработки и анализа данных в режиме реального времени, бизнес-аналитики и отчетности. Узнайте, как бесплатно запустить новую пробную версию !

Из этого руководства вы узнаете, как создать фабрику данных Azure с конвейером, который копирует разностные данные на основе сведений об отслеживании измененных данных (CDC) в базе данных SQL Azure управляемого экземпляра в хранилище BLOB-объектов Azure.

В этом руководстве вы выполните следующие шаги:

  • подготовите исходное хранилище данных;
  • Создали фабрику данных.
  • Создали связанные службы.
  • Создали наборы данных приемника и источника.
  • Создание, отладка и запуск конвейера для проверки измененных данных
  • Изменение данных в таблице-источнике
  • Создание, запуску и мониторинг конвейера добавочного копирования

Обзор

Для определения измененных данных используется технология "Отслеживание измененных данных", поддерживаемая такими хранилищами данных, как Управляемый экземпляр (MI) SQL Azure и SQL Server. В этом руководстве описано, как использовать службу "Фабрика данных Azure" с технологией "Отслеживание измененных данных" (SQL) для добавочной загрузки разностных данных из Управляемого экземпляра SQL Azure в хранилище BLOB-объектов Azure. Дополнительные сведения о технологии "Отслеживание измененных данных" (SQL) см. в статье Об отслеживании измененных данных (SQL Server).

Комплексный рабочий процесс

Ниже описан стандартный рабочий процесс по добавочной загрузке данных с помощью технологии "Отслеживание измененных данных".

Примечание.

И Управляемый экземпляр SQL Azure, и SQL Server поддерживают технологию "Отслеживание измененных данных". В этом руководстве в качестве исходного хранилища данных используется управляемый экземпляр SQL Azure. Вы также можете использовать локальную среду SQL Server.

Общее решение

В этом руководстве описано, как создать конвейер, который выполняет следующую операцию:

  1. Создайте действие уточнения, чтобы подсчитать количество измененных записей в таблице CDC базы данных SQL и передать их в действие условия ЕСЛИ.
  2. Создайте условие ЕСЛИ, чтобы проверить наличие измененных записей и, если это так, вызвать действие копирования.
  3. Создайте действие копирования, чтобы скопировать вставленные/обновленные/удаленные данные между таблицей CDC и хранилищем BLOB-объектов Azure.

Если у вас нет подписки Azure, создайте бесплатную учетную запись, прежде чем приступить к работе.

Необходимые компоненты

Создание таблицы источника данных в Базе данных SQL Azure

  1. Запустите SQL Server Management Studio и подключитесь к серверу управляемого экземпляра SQL Azure.

  2. В обозревателе сервера щелкните правой кнопкой мыши базу данных и выберите Создать запрос.

  3. Выполните указанную ниже команду SQL для базы данных управляемых экземпляров SQL Azure, чтобы создать таблицу с именем customers в качестве хранилища источника данных.

    create table customers 
    (
    customer_id int, 
    first_name varchar(50), 
    last_name varchar(50), 
    email varchar(100), 
    city varchar(50), CONSTRAINT "PK_Customers" PRIMARY KEY CLUSTERED ("customer_id") 
     );
    
  4. Включите функцию Отслеживание измененных данных для базы данных и исходной таблицы (customers), выполнив следующий SQL-запрос:

    Примечание.

    • Замените <имя вашей исходной схемы> схемой управляемого экземпляра SQL Azure с таблицей Customers.
    • Система отслеживания измененных данных не выполняет никаких действий в рамках транзакций, изменяющих отслеживаемую таблицу. Вместо этого операции вставки, обновления и удаления записываются в журнал транзакций. Данные, хранящиеся в таблицах изменений, будут неуправляемо расти, если периодически и систематически не усекать эти данные. Дополнительные сведения см. в разделе Включение отслеживания измененных данных для базы данных
    EXEC sys.sp_cdc_enable_db 
    
    EXEC sys.sp_cdc_enable_table
    @source_schema = 'dbo',
    @source_name = 'customers', 
    @role_name = NULL,
    @supports_net_changes = 1
    
  5. Вставьте данные в таблицу Customers, выполнив следующую команду:

     insert into customers 
         (customer_id, first_name, last_name, email, city) 
     values 
         (1, 'Chevy', 'Leward', 'cleward0@mapy.cz', 'Reading'),
         (2, 'Sayre', 'Ateggart', 'sateggart1@nih.gov', 'Portsmouth'),
        (3, 'Nathalia', 'Seckom', 'nseckom2@blogger.com', 'Portsmouth');
    

    Примечание.

    Перед включением системы отслеживания измененных данных никакие исторические изменения в таблице не фиксируются.

Создание фабрики данных

Выполните действия, описанные в кратком руководстве по созданию фабрики данных с помощью портал Azure для создания фабрики данных, если у вас еще нет этой фабрики данных.

Создание связанных служб

Связанная служба в фабрике данных связывает хранилища данных и службы вычислений с фабрикой данных. В этом разделе вы создадите связанные службы для учетной записи хранения Azure и управляемого экземпляра SQL Azure.

Создание связанной службы хранилища Azure

На этом шаге вы свяжете учетную запись хранения Azure с фабрикой данных.

  1. Нажмите кнопку Подключения, а затем + Создать.

    Кнопка создания подключения

  2. В окне New Linked Service (Новая связанная служба) выберите хранилище BLOB-объектов Azure и щелкните Continue (Продолжить).

    Выбор хранилища BLOB-объектов Azure

  3. В окне New Linked Service (Новая связанная служба) выполните следующие действия:

    1. Введите AzureStorageLinkedService в поле имени.
    2. Выберите учетную запись хранения в списке Storage account name (Имя учетной записи хранения).
    3. Нажмите кнопку Сохранить.

    Параметры учетной записи хранения Azure

Создание связанной службы базы данных управляемого экземпляра SQL Azure.

На этом шаге вы свяжете базу данных управляемого экземпляра SQL Azure с фабрикой данных.

Примечание.

Для тех, кто использует SQL MI, см. здесь сведения о доступе через публичную и частную конечную точку. При использовании частной конечной точки необходимо запустить этот конвейер с помощью локальной среды выполнения интеграции. Это же относится к тем, у кого запущен SQL Server локально, в сценариях виртуальной машины или виртуальной сети.

  1. Нажмите кнопку Подключения, а затем + Создать.

  2. В окне New Linked Service (Новая связанная служба) выберите Azure SQL Database Managed Instance (Управляемый экземпляр базы данных SQL Microsoft Azure) и щелкните Continue (Продолжить).

  3. В окне New Linked Service (Новая связанная служба) выполните следующие действия:

    1. Введите AzureSqlMI1 в поле Имя.
    2. Выберите нужный SQL-сервер в поле Имя сервера.
    3. Выберите базу данных SQL в поле Имя базы данных.
    4. Введите имя пользователя в поле User name (Имя пользователя).
    5. Введите пароль для этого пользователя в поле Password (Пароль).
    6. Нажмите кнопку Test connection (Проверить подключение), чтобы проверить подключение.
    7. Нажмите кнопку Save (Сохранить), чтобы сохранить связанную службу.

    Настройки связанной службы базы данных SQL Azure MI

Создайте наборы данных.

На этом шаге вы создадите наборы данных, которые представляют данные источника и приемника.

Создание набора данных для представления исходных данных

На этом шаге вы создадите набор данных для представления исходных данных.

  1. В представлении в виде дерева щелкните значок + (плюс) и выберите вариант Набор данных.

    Меню

  2. Выберите Azure SQL Database Managed Instance (Управляемый экземпляр базы данных SQL Microsoft Azure) и щелкните Continue (Продолжить).

    Тип исходного набора данных — база данных SQL Azure

  3. На вкладке Свойства набора задайте имя набора данных и сведения о соединении:

    1. Выберите AzureSqlMI1 для связанной службы.
    2. Выберите [dbo]. [dbo_customers_CT] в качестве имени таблицы. Примечание. Эта таблица была автоматически создана при включении CDC в таблице Customers. Измененные данные никогда не запрашиваются из этой таблицы напрямую, а извлекаются с помощью функций CDC.

    Подключение к источнику

Создайте набор данных, который будет представлять данные для копирования в целевое хранилище данных.

На этом шаге вы создадите набор данных для представления данных, которые копируются из исходного хранилища данных. Для работы с этим руководством нужен контейнер data lake, созданный в хранилище BLOB-объектов Azure. Создайте контейнер (если его еще нет) или присвойте ему имя имеющегося контейнера. В этом руководстве имя файла выходных данных создается динамически с помощью времени триггера, который будет настроен позже.

  1. В представлении в виде дерева щелкните значок + (плюс) и выберите вариант Набор данных.

    Меню

  2. Выберите Хранилище BLOB-объектов Azure и щелкните Продолжить.

    Тип целевого набора данных — хранилище BLOB-объектов Azure

  3. Выберите DelimitedText в качестве формата и щелкните Продолжить.

    Формат набора данных приемника — DelimitedText

  4. На вкладке Свойства набора задайте имя набора данных и сведения о соединении:

    1. Выберите AzureStorageLinkedService в списке Связанная служба.
    2. Введите raw для части контейнера filePath.
    3. Включите Использовать первую строку в качестве заголовка
    4. Нажмите кнопку ОК.

    Целевой набор данных — подключение

Создание конвейера для копирования измененных данных

На этом шаге вы создадите конвейер, который сначала проверяет число измененных записей, имеющихся в таблице изменений, с помощью действия уточнения. Условие ЕСЛИ проверяет, что число измененных записей больше нуля, и запускает действие копирования, чтобы скопировать вставленные/обновленные/удаленные данные из базы данных SQL Azure в хранилище BLOB-объектов Azure. Наконец, триггер смены окна настроен на время начала и окончания, которые будут переданы действиям в качестве параметров начального и конечного окон.

  1. В пользовательском интерфейсе фабрики данных перейдите на вкладку "Изменить ". Щелкните +(плюс) в левой области и нажмите кнопку "Конвейер".

    Меню создания конвейера

  2. Вы увидите новую вкладку для настройки конвейера. Также этот конвейер появится в отображении дерева. В окне Свойства укажите имя IncrementalCopyPipeline для нового конвейера.

    Имя конвейера

  3. Разверните элемент Общие на панели Действия и перетащите действие Поиск в область конструктора конвейера. Для имени действия задайте значение GetChangeCount. Это действие получает количество записей в таблице изменений для заданного временного окна.

    Действие поиска — имя

  4. Перейдите на вкладку Настройки в окне Свойства:

    1. Укажите имя набора данных SQL MI для поля Исходный набор данных .

    2. Выберите параметр запроса и введите в поле запроса следующее:

    DECLARE  @from_lsn binary(10), @to_lsn binary(10);  
    SET @from_lsn =sys.fn_cdc_get_min_lsn('dbo_customers');  
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal',  GETDATE());
    SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
    
    1. Включите Только первая строка

    Действие поиска — настройки

  5. Нажмите кнопку Предварительный просмотр данных, чтобы обеспечить получение достоверных выходных данных действием поиска

    Действие поиска — предварительный просмотр

  6. На панели Действия разверните элемент Итерация и условия, а затем перетащите действие If Condition в область конструктора конвейера. Для имени действия задайте значение HasChangedRows.

    Действие условия If — имя

  7. Перейдите на вкладку Действия в окне Свойства:

    1. Введите следующее Выражение
    @greater(int(activity('GetChangeCount').output.firstRow.changecount),0)
    
    1. Щелкните значок с изображением карандаша, чтобы изменить истинное условие.

    Действие условия If — настройки

    1. Разверните Общие в панели элементов Действия и перетащите действие Ждать в область конструктора конвейера. Это временное действие для отладки условия If; оно будет изменено далее в этом руководстве.

    Условие ЕСЛИ верно — ждать

    1. Щелкните элемент в пути IncrementalCopyPipeline, чтобы вернуться к основному конвейеру.
  8. Запустите конвейер в режиме Отладка, чтобы убедиться, что конвейер успешно выполнен.

    Конвейер — отладка

  9. Затем вернитесь к верному шагу условия и удалите действие Ждать. На панели элементов Действия разверните узел Переместить и преобразовать и перетащите действие Копирование в область конструктора конвейера. Присвойте этому действию имя IncrementalCopyActivity.

    Действие копирования — имя

  10. Откройте вкладку Источник в окне Свойства и выполните здесь следующие действия.

  11. Укажите имя набора данных SQL MI для поля Исходный набор данных .

  12. Выберите Запрос в списке Use Query (Пользовательский запрос).

  13. Введите следующий Запрос.

    DECLARE @from_lsn binary(10), @to_lsn binary(10); 
    SET @from_lsn =sys.fn_cdc_get_min_lsn('dbo_customers'); 
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn('largest less than or equal', GETDATE());
    SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, 'all')
    

Действие копирования — настройки источника

  1. Нажмите кнопку предварительного просмотра, чтобы убедиться, что запрос правильно возвращает измененные строки.

    Снимок экрана: предварительный просмотр для проверки запроса.

  2. Перейдите на вкладку Приемник и укажите набор данных Хранилища Azure в поле Sink Dataset (Целевой набор данных).

    Снимок экрана: вкладка

  3. Нажмите кнопку назад на главном холсте конвейера и подключите действие Поиск к действию If Condition по одному. Перетащите зеленую кнопку, присоединенную к действию Поиск, на блок действия If Condition.

    Соединение действий поиска и копирования

  4. Нажмите кнопку Проверить на панели инструментов. Убедитесь, что проверка завершается без ошибок. Закройте окно отчета о проверке конвейера, щелкнув >>.

    Кнопка Проверка

  5. Щелкните кнопку "Отладка", чтобы проверить конвейер и убедиться, что файл создается в месте хранения.

    Добавочная отладка конвейера — 2

  6. Опубликуйте сущности (связанные службы, наборы данных и конвейеры) в службе "Фабрика данных", нажав кнопку Опубликовать все. Дождитесь сообщения Публикация успешно выполнена.

    Кнопка Publish (Опубликовать)

Настройка триггера переворачивающегося окна и параметра окна CDC

На этом шаге вы создадите триггер переворачивающегося окна для запуска задания по частому расписанию. Вы будете использовать системные переменные WindowStart и WindowEnd для триггера переворачивающегося окна и передавать их в качестве параметров конвейера для использования в запросе CDC.

  1. Перейдите на вкладку Параметры в конвейере IncrementalCopyPipeline и с помощью кнопки + Новый добавьте два параметра (triggerStartTime и triggerEndTime) в конвейер, который будет представлять время начала и окончания переворачивающегося окна. В целях отладки добавьте значения по умолчанию в формате ГГГГ-ММ-ДД ЧЧ24:ММ:СС.ДОЛИСЕКУНД, но убедитесь, что triggerStartTime не включен в таблицу CDC, в противном случае это приведет к ошибке.

    Меню Trigger Now (Активировать сейчас)

  2. Перейдите на вкладку "Параметры действия" Поиск и настройте запрос на использование параметров начала и окончания. Скопируйте следующий код в редактор запросов:

    @concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); 
    SET @begin_time = ''',pipeline().parameters.triggerStartTime,''';
    SET @end_time = ''',pipeline().parameters.triggerEndTime,''';
    SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time);
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time);
    SELECT count(1) changecount FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
    
  3. Перейдите к действию копирования в случае действия "True" действия "Если условие " и перейдите на вкладку "Источник ". Скопируйте следующее в запрос:

    @concat('DECLARE @begin_time datetime, @end_time datetime, @from_lsn binary(10), @to_lsn binary(10); 
    SET @begin_time = ''',pipeline().parameters.triggerStartTime,''';
    SET @end_time = ''',pipeline().parameters.triggerEndTime,''';
    SET @from_lsn = sys.fn_cdc_map_time_to_lsn(''smallest greater than or equal'', @begin_time);
    SET @to_lsn = sys.fn_cdc_map_time_to_lsn(''largest less than'', @end_time);
    SELECT * FROM cdc.fn_cdc_get_net_changes_dbo_customers(@from_lsn, @to_lsn, ''all'')')
    
  4. Перейдите на вкладку Приемник действия Копирование и щелкните кнопку Открыть, чтобы изменить свойства набора данных. Перейдите на вкладку Параметры и добавьте новый параметр с именем triggerStart

    Снимок экрана: добавление нового параметра на вкладке

  5. Затем настройте свойства набора данных для хранения данных в подкаталоге клиенты/добавочный с разделением по данным.

    1. Перейдите на вкладку Подключение в свойствах набора данных и добавьте динамическое содержимое для разделов Каталог и Файл.

    2. Введите следующее выражение в разделе Каталог, щелкнув на динамическое содержимое в текстовом поле:

      @concat('customers/incremental/',formatDateTime(dataset().triggerStart,'yyyy/MM/dd'))
      
    3. Введите следующее выражение в разделе Файл. Это создаст имена файлов на основе даты и времени начала триггера с расширением CSV:

      @concat(formatDateTime(dataset().triggerStart,'yyyyMMddHHmmssfff'),'.csv')
      

      Конфигурация набора данных приемника — 3

    4. Вернитесь к параметрам Приемника в действии Копирование, щелкнув вкладку IncrementalCopyPipeline.

    5. Разверните свойства набора данных и введите динамическое содержимое в значение параметра triggerStart с помощью следующего выражения:

      @pipeline().parameters.triggerStartTime
      

    Конфигурация набора данных приемника — 4

  6. Щелкните кнопку "Отладка", чтобы проверить конвейер и убедиться, что структура папок и выходной файл созданы должным образом. Скачайте и откройте файл, чтобы проверить его содержимое.

    Отладка инкрементной копии — 3

  7. Убедитесь, что параметры добавлены в запрос, просмотрев входные параметры выполнения конвейера.

    Отладка инкрементной копии — 4

  8. Опубликуйте сущности (связанные службы, наборы данных и конвейеры) в службе "Фабрика данных", нажав кнопку Опубликовать все. Дождитесь сообщения Публикация успешно выполнена.

  9. Наконец, настройте триггер переворачивающегося окна для запуска конвейера через обычный интервал и задайте параметры времени начала и окончания.

    1. Нажмите кнопку Добавить триггер и выберите Создать/изменить

    Добавить новый триггер

    1. Введите имя триггера и укажите время начала, равное времени окончания окна отладки выше.

    Триггер

    1. На следующем экране укажите следующие значения параметров начала и окончания соответственно.

      @formatDateTime(trigger().outputs.windowStartTime,'yyyy-MM-dd HH:mm:ss.fff')
      @formatDateTime(trigger().outputs.windowEndTime,'yyyy-MM-dd HH:mm:ss.fff')
      

      Триггер

Примечание.

Триггер будет выполняться только после его публикации. Кроме того, ожидаемым поведением переворачивающегося окна является выполнение всех интервалов с даты начала до настоящего момента. Дополнительные сведения о триггерах переворачивающегося окна можно найти здесь.

  1. С помощью SQL Server Management Studio внесите некоторые дополнительные изменения в таблицу Customer, выполнив следующий SQL:

    insert into customers (customer_id, first_name, last_name, email, city) values (4, 'Farlie', 'Hadigate', 'fhadigate3@zdnet.com', 'Reading');
    insert into customers (customer_id, first_name, last_name, email, city) values (5, 'Anet', 'MacColm', 'amaccolm4@yellowbook.com', 'Portsmouth');
    insert into customers (customer_id, first_name, last_name, email, city) values (6, 'Elonore', 'Bearham', 'ebearham5@ebay.co.uk', 'Portsmouth');
    update customers set first_name='Elon' where customer_id=6;
    delete from customers where customer_id=5;
    
  2. Нажмите кнопку Опубликовать все. Дождитесь сообщения Публикация успешно выполнена.

  3. Через несколько минут сработает триггер конвейера, а новый файл будет загружен в службу хранилища Azure

Мониторинг конвейера добавочного копирования

  1. Щелкните вкладку Мониторинг слева. В открывшемся списке вы увидите запуск конвейера и его текущее состояние. Щелкните Refresh (Обновить), чтобы обновить этот список. Наведите курсор мыши на имя конвейера, чтобы получить доступ к действию и отчету о потреблении.

    Запуски конвейера

  2. Чтобы просмотреть запуски действий, связанные с этим запуском конвейера, щелкните имя Конвейера. Если обнаружены измененные данные, в списке будут три действия, включая действие копирования, в противном случае в списке останутся только две записи. Чтобы вернуться к просмотру запусков конвейера, щелкните ссылку Все конвейеры в верхней части окна.

    Выполнение действия

Проверьте результаты.

Вы увидите второй файл в папке customers/incremental/YYYY/MM/DD контейнера raw.

Выходной файл, полученный в результате добавочного копирования

В этом руководстве рассказывается о копировании новых и измененных файлов на основе параметра LastModifiedDate:

Copy new files by lastmodifieddate (Копирование новых файлов с использованием параметра LastModifiedDate)