트랜잭션 일괄 처리 작업을 만들 때 컨테이너 인스턴스로 시작하고 CreateTransactionalBatch를 호출합니다.
PartitionKey partitionKey = new PartitionKey("road-bikes");
TransactionalBatch batch = container.CreateTransactionalBatch(partitionKey);
다음으로 일괄 처리에 여러 작업을 추가합니다.
Product bike = new (
id: "68719520766",
category: "road-bikes",
name: "Chropen Road Bike"
);
batch.CreateItem<Product>(bike);
Part part = new (
id: "68719519885",
category: "road-bikes",
name: "Tronosuros Tire",
productId: bike.id
);
batch.CreateItem<Part>(part);
마지막으로 일괄 처리에서 ExecuteAsync를 호출합니다.
using TransactionalBatchResponse response = await batch.ExecuteAsync();
응답이 수신되면 응답이 성공적으로 수행되는지 검토합니다. 응답이 성공을 나타내는 경우 결과를 추출합니다.
if (response.IsSuccessStatusCode)
{
TransactionalBatchOperationResult<Product> productResponse;
productResponse = response.GetOperationResultAtIndex<Product>(0);
Product productResult = productResponse.Resource;
TransactionalBatchOperationResult<Part> partResponse;
partResponse = response.GetOperationResultAtIndex<Part>(1);
Part partResult = partResponse.Resource;
}
Important
오류가 발생하는 경우 실패한 작업에 해당 오류에 대한 상태 코드가 포함됩니다. 다른 모든 작업에는 424 상태 코드(실패한 종속성)가 포함됩니다. 이미 존재하는 항목을 만들려고 하여 작업이 실패하면 상태 코드 409(충돌)가 반환됩니다. 이 상태 코드를 사용하여 트랜잭션 오류의 원인을 식별할 수 있습니다.
트랜잭션 일괄 처리 작업을 만들 때 CosmosBatch.createCosmosBatch를 호출합니다.
PartitionKey partitionKey = new PartitionKey("road-bikes");
CosmosBatch batch = CosmosBatch.createCosmosBatch(partitionKey);
다음으로 일괄 처리에 여러 작업을 추가합니다.
Product bike = new Product();
bike.setId("68719520766");
bike.setCategory("road-bikes");
bike.setName("Chropen Road Bike");
batch.createItemOperation(bike);
Part part = new Part();
part.setId("68719519885");
part.setCategory("road-bikes");
part.setName("Tronosuros Tire");
part.setProductId(bike.getId());
batch.createItemOperation(part);
마지막으로 컨테이너 인스턴스를 사용하여 일괄 처리로 executeCosmosBatch를 호출합니다.
CosmosBatchResponse response = container.executeCosmosBatch(batch);
응답이 수신되면 응답이 성공적으로 수행되는지 검토합니다. 응답이 성공을 나타내는 경우 결과를 추출합니다.
if (response.isSuccessStatusCode())
{
List<CosmosBatchOperationResult> results = response.getResults();
}
Important
오류가 발생하는 경우 실패한 작업에 해당 오류에 대한 상태 코드가 포함됩니다. 다른 모든 작업에는 424 상태 코드(실패한 종속성)가 포함됩니다. 이미 존재하는 항목을 만들려고 하여 작업이 실패하면 상태 코드 409(충돌)가 반환됩니다. 이 상태 코드를 사용하여 트랜잭션 오류의 원인을 식별할 수 있습니다.
컨테이너 인스턴스를 가져오거나 만듭니다.
container = database.create_container_if_not_exists(id="batch_container",
partition_key=PartitionKey(path='/category'))
Python에서 트랜잭션 Batch 작업은 단일 작업 API와 매우 유사하며, (Operation_type_string, args_tuple, Batch_Operation_kwargs_dictionary)를 포함하는 튜플입니다. 다음은 일괄 처리 작업 기능을 시연하는 데 사용되는 샘플 항목입니다.
create_demo_item = {
"id": "68719520766",
"category": "road-bikes",
"name": "Chropen Road Bike"
}
# for demo, assume that this item already exists in the container.
# the item id will be used for read operation in the batch
read_demo_item1 = {
"id": "68719519884",
"category": "road-bikes",
"name": "Tronosuros Tire",
"productId": "68719520766"
}
# for demo, assume that this item already exists in the container.
# the item id will be used for read operation in the batch
read_demo_item2 = {
"id": "68719519886",
"category": "road-bikes",
"name": "Tronosuros Tire",
"productId": "68719520766"
}
# for demo, assume that this item already exists in the container.
# the item id will be used for read operation in the batch
read_demo_item3 = {
"id": "68719519887",
"category": "road-bikes",
"name": "Tronosuros Tire",
"productId": "68719520766"
}
# for demo, we'll upsert the item with id 68719519885
upsert_demo_item = {
"id": "68719519885",
"category": "road-bikes",
"name": "Tronosuros Tire Upserted",
"productId": "68719520768"
}
# for replace demo, we'll replace the read_demo_item2 with this item
replace_demo_item = {
"id": "68719519886",
"category": "road-bikes",
"name": "Tronosuros Tire replaced",
"productId": "68719520769"
}
# for replace with etag match demo, we'll replace the read_demo_item3 with this item
# The use of etags and if-match/if-none-match options allows users to run conditional replace operations
# based on the etag value passed. When using if-match, the request will only succeed if the item's latest etag
# matches the passed in value. For more on optimistic concurrency control, see the link below:
# https://zcusa.951200.xyz/azure/cosmos-db/nosql/database-transactions-optimistic-concurrency
replace_demo_item_if_match_operation = {
"id": "68719519887",
"category": "road-bikes",
"name": "Tronosuros Tireh",
"wasReplaced": "Replaced based on etag match"
"productId": "68719520769"
}
일괄 처리에 추가할 작업을 준비합니다.
create_item_operation = ("create", (create_demo_item,), {})
read_item_operation = ("read", ("68719519884",), {})
delete_item_operation = ("delete", ("68719519885",), {})
upsert_item_operation = ("upsert", (upsert_demo_item,), {})
replace_item_operation = ("replace", ("68719519886", replace_demo_item), {})
replace_item_if_match_operation = ("replace",
("68719519887", replace_demo_item_if_match_operation),
{"if_match_etag": container.client_connection.last_response_headers.get("etag")})
일괄 처리에 작업을 추가합니다.
batch_operations = [
create_item_operation,
read_item_operation,
delete_item_operation,
upsert_item_operation,
replace_item_operation,
replace_item_if_match_operation
]
마지막으로 일괄 처리를 실행합니다.
try:
# Run that list of operations
batch_results = container.execute_item_batch(batch_operations=batch_operations, partition_key="road_bikes")
# Batch results are returned as a list of item operation results - or raise a CosmosBatchOperationError if
# one of the operations failed within your batch request.
print("\nResults for the batch operations: {}\n".format(batch_results))
except exceptions.CosmosBatchOperationError as e:
error_operation_index = e.error_index
error_operation_response = e.operation_responses[error_operation_index]
error_operation = batch_operations[error_operation_index]
print("\nError operation: {}, error operation response: {}\n".format(error_operation, error_operation_response))
# [END handle_batch_error]
일괄 처리에서 패치 작업 및 바꾸기_if_match_etag 작업 사용에 대한 참고 사항
일괄 처리 작업 kwargs 사전은 제한되어 있으며 총 3개의 서로 다른 키 값만 사용합니다. 일괄 처리 내에서 조건부 패치를 사용하려는 경우 패치 작업에는 filter_predicate 키를 사용할 수 있으며, 작업에 etag를 사용하려는 경우 if_match_etag/if_none_match_etag 키도 사용할 수 있습니다.
batch_operations = [
("replace", (item_id, item_body), {"if_match_etag": etag}),
("patch", (item_id, operations), {"filter_predicate": filter_predicate, "if_none_match_etag": etag}),
]
오류가 발생하는 경우 실패한 작업에 해당 오류에 대한 상태 코드가 포함됩니다. 다른 모든 작업에는 424 상태 코드(실패한 종속성)가 포함됩니다. 이미 존재하는 항목을 만들려고 하여 작업이 실패하면 상태 코드 409(충돌)가 반환됩니다. 이 상태 코드를 사용하여 트랜잭션 오류의 원인을 식별할 수 있습니다.
트랜잭션 일괄 처리 작업을 실행하는 방법
트랜잭션 Batch가 실행되면 트랜잭션 Batch의 모든 작업이 그룹화되고 단일 페이로드로 직렬화되며 단일 요청으로 Azure Cosmos DB 서비스에 전송됩니다.
이 서비스는 요청을 수신하고 트랜잭션 범위 내에서 모든 작업을 실행하며 동일한 serialization 프로토콜을 사용하여 응답을 반환합니다. 이 응답은 성공 또는 실패이며 작업당 개별 작업 응답을 제공합니다.
SDK는 결과를 확인하고 필요에 따라 각 내부 작업 결과를 추출하기 위한 응답을 제공합니다.
제한 사항
현재는 다음과 같은 두 가지 알려진 제한 사항이 있습니다.
- Azure Cosmos DB 요청 크기 제한은 트랜잭션 Batch 페이로드의 크기를 2MB를 초과하지 않도록 제한하고 최대 실행 시간은 5초입니다.
- 성능이 예상대로 SLA 내에 있는지 확인하기 위해 현재 트랜잭션 Batch당 작업 100개로 제한되어 있습니다.
다음 단계