다음을 통해 공유


Apache Spark를 사용하여 Azure Cosmos DB에서 전용 SQL 풀로 데이터 복사

Azure Cosmos DB용 Azure Synapse Link를 사용하면 Azure Cosmos DB의 작동 데이터에 대해 거의 실시간으로 분석을 실행할 수 있습니다. 그러나 데이터 웨어하우스 사용자에게 서비스를 제공하기 위해 일부 데이터를 집계하고 보강해야 하는 경우도 있습니다. Azure Synapse Link 데이터를 큐레이팅하고 내보내는 작업은 Notebook에 있는 몇 개의 셀만으로 수행할 수 있습니다.

필수 조건

단계

이 자습서에서는 트랜잭션 저장소에 영향을 주지 않도록 분석 저장소에 연결합니다(요청 단위를 사용하지 않음). 다음 단계를 수행합니다.

  1. Azure Cosmos DB HTAP 컨테이너를 Spark 데이터 프레임으로 읽기
  2. 새 데이터 프레임에 결과 집계
  3. 전용 SQL 풀로 데이터 수집

Spark에서 SQL로의 1단계

데이터

이 예제에서는 RetailSales라는 HTAP 컨테이너를 사용합니다. ConnectedData라는 연결된 서비스의 일부이며, 다음과 같은 스키마가 있습니다.

  • _rid: string(nullable = true)
  • _ts: long(nullable = true)
  • logQuantity: double(nullable = true)
  • productCode: string(nullable = true)
  • quantity: long(nullable = true)
  • price: long(nullable = true)
  • id: string(nullable = true)
  • advertising: long(nullable = true)
  • storeId: long(nullable = true)
  • weekStarting: long(nullable = true)
  • _etag: string(nullable = true)

보고용으로 productCodeweekStarting별로 판매(수량, 수익(가격 x 수량))를 집계합니다. 마지막으로 해당 데이터를 dbo.productsales라는 전용 SQL 풀 테이블로 내보냅니다.

Spark Notebook 구성

Scala as Spark(Scala)를 주 언어로 사용하여 Spark Notebook을 만듭니다. 세션에 Notebook의 기본 설정을 사용합니다.

Spark에서 데이터 읽기

Spark가 있는 Azure Cosmos DB HTAP 컨테이너를 첫 번째 셀의 데이터 프레임으로 읽습니다.

val df_olap = spark.read.format("cosmos.olap").
    option("spark.synapse.linkedService", "ConnectedData").
    option("spark.cosmos.container", "RetailSales").
    load()

새 데이터 프레임에 결과 집계

두 번째 셀에서는 전용 SQL 풀 데이터베이스에 로드하기 전에 새 데이터 프레임에 필요한 변환과 집계를 실행합니다.

// Select relevant columns and create revenue
val df_olap_step1 = df_olap.select("productCode","weekStarting","quantity","price").withColumn("revenue",col("quantity")*col("price"))
//Aggregate revenue, quantity sold and avg. price by week and product ID
val df_olap_aggr = df_olap_step1.groupBy("productCode","weekStarting").agg(sum("quantity") as "Sum_quantity",sum("revenue") as "Sum_revenue").
    withColumn("AvgPrice",col("Sum_revenue")/col("Sum_quantity"))

결과를 전용 SQL 풀에 로드

세 번째 셀에서는 데이터를 전용 SQL 풀에 로드합니다. 작업이 완료되면 삭제될 임시 외부 테이블, 외부 테이블 소스 및 외부 파일 형식이 자동으로 생성됩니다.

df_olap_aggr.write.sqlanalytics("userpool.dbo.productsales", Constants.INTERNAL)

SQL로 결과 쿼리

다음 SQL 스크립트와 같은 간단한 SQL 쿼리를 사용하여 결과를 쿼리할 수 있습니다.

SELECT  [productCode]
,[weekStarting]
,[Sum_quantity]
,[Sum_revenue]
,[AvgPrice]
 FROM [dbo].[productsales]

쿼리는 차트 모드에서 다음과 같은 결과를 제공합니다. Spark에서 SQL 2단계로

다음 단계