Azure Blob Storage change feed client library for Java - version 12.0.0-beta.27
The purpose of the change feed is to provide transaction logs of all the changes that occur to the blobs and the blob metadata in your storage account. The change feed provides an ordered, guaranteed, durable, immutable, read-only log of these changes. Client applications can read these logs at any time. The change feed enables you to build efficient and scalable solutions that process change events that occur in your Blob Storage account at a low cost.
Getting started
Prerequisites
- Java Development Kit (JDK) with version 8 or above
- Here are details about Java 8 client compatibility with Azure Certificate Authority.
- Azure Subscription
- Create Storage Account
Include the package
<dependency>
<groupId>com.azure</groupId>
<artifactId>azure-storage-blob-changefeed</artifactId>
<version>12.0.0-beta.27</version>
</dependency>
Create a Storage Account
To create a Storage Account you can use the Azure Portal or Azure CLI.
az storage account create \
--resource-group <resource-group-name> \
--name <storage-account-name> \
--location <location>
Your storage account URL, subsequently identified as <your-storage-account-url>, is formatted as follows:
http(s)://<storage-account-name>.blob.core.windows.net
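As a small illustration of this format, the sketch below assembles the blob endpoint from an account name using only the JDK. The names `StorageEndpoints` and `blobEndpoint` are hypothetical and not part of the client library, and the sketch assumes HTTPS and the public-cloud suffix shown above.

```java
import java.net.URI;

/** Hypothetical helper for illustration; not part of the Azure SDK. */
final class StorageEndpoints {
    private StorageEndpoints() { }

    /** Builds https://<accountName>.blob.core.windows.net for the given account. */
    static URI blobEndpoint(String accountName) {
        if (accountName == null || accountName.isEmpty()) {
            throw new IllegalArgumentException("accountName must not be empty");
        }
        // Assumption: public Azure cloud; other clouds use a different suffix.
        return URI.create("https://" + accountName + ".blob.core.windows.net");
    }
}
```

The resulting value is what the rest of this document refers to as <your-storage-account-url>.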
Authenticate the client
To interact with the Storage Service (Blob, Queue, Message, MessageId, File) you'll need to create an instance of the Service Client class. To make this possible you'll need the Account SAS (shared access signature) string of the Storage Account. Learn more at SAS Token.
Get credentials
SAS Token
a. Use the Azure CLI snippet below to get the SAS token from the Storage Account.
az storage blob generate-sas \
--account-name {Storage Account name} \
--container-name {container name} \
--name {blob name} \
--permissions {permissions to grant} \
--expiry {datetime to expire the SAS token}
Example:
CONNECTION_STRING=<connection-string>
az storage blob generate-sas \
--account-name MyStorageAccount \
--container-name MyContainer \
--name MyBlob \
--permissions racdw \
--expiry 2020-06-15
b. Alternatively, get the Account SAS Token from the Azure Portal.
- Go to your Storage Account
- Select Shared access signature from the menu on the left
- Click on Generate SAS and connection string (after setup)
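A SAS token is used by appending it to the resource URL as a query string. The sketch below shows only that step with plain string handling. `SasUrls` and `withSas` are hypothetical names, and the sketch assumes the copied token may or may not begin with `?`, depending on where it was obtained.

```java
/** Hypothetical helper for illustration; not part of the Azure SDK. */
final class SasUrls {
    private SasUrls() { }

    /** Appends a SAS token to an endpoint URL, tolerating a leading '?' on the token. */
    static String withSas(String endpoint, String sasToken) {
        // Normalize the token so exactly one '?' separates URL and query string.
        String token = sasToken.startsWith("?") ? sasToken.substring(1) : sasToken;
        return endpoint + "?" + token;
    }
}
```

Treat the token like a password: avoid logging the combined URL or committing it to source control.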
Shared Key Credential
a. Use Account name and Account key. Account name is your Storage Account name.
- Go to your Storage Account
- Select Access keys from the menu on the left
- Under key1/key2 copy the contents of the Key field
or
b. Use the connection string.
- Go to your Storage Account
- Select Access keys from the menu on the left
- Under key1/key2 copy the contents of the Connection string field
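A storage connection string is a semicolon-separated list of `key=value` settings, typically including `AccountName` and `AccountKey`. The sketch below, using hypothetical names (`ConnectionStrings`, `parse`), shows one way to read those settings with the JDK alone. It splits each setting on the first `=` only, because a base64 account key can itself end in `=` padding.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper for illustration; not part of the Azure SDK. */
final class ConnectionStrings {
    private ConnectionStrings() { }

    /** Parses "Key1=Value1;Key2=Value2" into an ordered map of settings. */
    static Map<String, String> parse(String connectionString) {
        Map<String, String> settings = new LinkedHashMap<>();
        for (String part : connectionString.split(";")) {
            if (part.trim().isEmpty()) {
                continue; // Skip empty segments such as a trailing ';'.
            }
            // Split on the first '=' only; values may contain '=' themselves.
            int eq = part.indexOf('=');
            if (eq <= 0) {
                // Do not echo the segment: it may contain a secret.
                throw new IllegalArgumentException("Malformed connection string segment");
            }
            settings.put(part.substring(0, eq).trim(), part.substring(eq + 1).trim());
        }
        return settings;
    }
}
```

In practice the whole connection string can usually be handed to the client builder unchanged; parsing it is mainly useful when only the account name or key is needed.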
Key concepts
The change feed is stored as blobs in a special container in your storage account at standard blob pricing. You can control the retention period of these files based on your requirements (see the conditions of the current release). Change events are appended to the change feed as records in the Apache Avro format: a compact, fast, binary format that provides rich data structures with inline schema. This format is widely used in the Hadoop ecosystem, Stream Analytics, and Azure Data Factory.
You can process these logs incrementally or in full. Any number of client applications can independently read the change feed, in parallel, and at their own pace. Analytics applications such as Apache Drill or Apache Spark can consume logs directly as Avro files, which lets you process them at low cost, with high bandwidth, and without having to write a custom application.
Examples
The following sections provide several code snippets covering some of the most common Azure Storage Blob Changefeed tasks, including:
- Create a BlobChangefeedClient
- Get events
- Get events between a start and end time
- Resume with a cursor
- Poll for events with a cursor
Create a BlobChangefeedClient
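/* blobServiceClient is an existing BlobServiceClient for your storage account. */
BlobChangefeedClient client = new BlobChangefeedClientBuilder(blobServiceClient).buildClient();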
Get events
client.getEvents().forEach(event ->
System.out.printf("Topic: %s, Subject: %s%n", event.getTopic(), event.getSubject()));
Get events between a start and end time
OffsetDateTime startTime = OffsetDateTime.MIN;
OffsetDateTime endTime = OffsetDateTime.now();
client.getEvents(startTime, endTime).forEach(event ->
System.out.printf("Topic: %s, Subject: %s%n", event.getTopic(), event.getSubject()));
Resume with a cursor
BlobChangefeedPagedIterable iterable = client.getEvents();
Iterable<BlobChangefeedPagedResponse> pages = iterable.iterableByPage();
String cursor = null;
for (BlobChangefeedPagedResponse page : pages) {
page.getValue().forEach(event ->
System.out.printf("Topic: %s, Subject: %s%n", event.getTopic(), event.getSubject()));
/*
* Get the change feed cursor. The cursor is not required to get each page of events,
* it is intended to be saved and used to resume iterating at a later date.
*/
cursor = page.getContinuationToken();
}
/* Resume iterating from the previous position with the cursor. */
client.getEvents(cursor).forEach(event ->
System.out.printf("Topic: %s, Subject: %s%n", event.getTopic(), event.getSubject()));
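Because the cursor is meant to be saved and reused later, it has to outlive the process that produced it. The sketch below stores the cursor string in a local file using only the JDK. `CursorStore` is a hypothetical class, the file location is an assumption, and the cursor is treated as an opaque string.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

/** Hypothetical helper for illustration; not part of the Azure SDK. */
final class CursorStore {
    private final Path file;

    CursorStore(Path file) {
        this.file = file;
    }

    /** Overwrites the stored cursor with the latest value. */
    void save(String cursor) {
        try {
            Files.write(file, cursor.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Returns the stored cursor, or empty if nothing has been saved yet. */
    Optional<String> load() {
        if (!Files.exists(file)) {
            return Optional.empty();
        }
        try {
            return Optional.of(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A typical use would be to call `save` with `page.getContinuationToken()` after each page is processed, and at startup to pass the loaded value to `client.getEvents(cursor)` when one is present.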
Poll for events with a cursor
List<BlobChangefeedEvent> changefeedEvents = new ArrayList<>();
/* Get the start time. The change feed client will round start time down to the nearest hour if you provide
an OffsetDateTime with minutes and seconds. */
OffsetDateTime startTime = OffsetDateTime.now();
/* Get your polling interval. */
long pollingInterval = 1000 * 60 * 5; /* 5 minutes. */
/* Get initial set of events. */
Iterable<BlobChangefeedPagedResponse> pages = client.getEvents(startTime, null).iterableByPage();
String continuationToken = null;
while (true) {
for (BlobChangefeedPagedResponse page : pages) {
changefeedEvents.addAll(page.getValue());
/*
* Get the change feed cursor. The cursor is not required to get each page of events,
* it is intended to be saved and used to resume iterating at a later date.
*/
continuationToken = page.getContinuationToken();
}
/* Wait before processing next batch of events. */
try {
Thread.sleep(pollingInterval);
} catch (InterruptedException e) {
/* Restore the interrupt flag and stop polling. */
Thread.currentThread().interrupt();
break;
}
/* Resume from last continuation token and fetch latest set of events. */
pages = client.getEvents(continuationToken).iterableByPage();
}
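The comment in the snippet above notes that the client rounds the start time down to the nearest hour. The sketch below computes that hour boundary locally with `java.time`, which can be useful for logging the effective start of a query. `HourBoundaries` and `floorToHour` are hypothetical names, and this illustrates the rounding as described rather than the library's internal implementation.

```java
import java.time.OffsetDateTime;
import java.time.temporal.ChronoUnit;

/** Hypothetical helper for illustration; not part of the Azure SDK. */
final class HourBoundaries {
    private HourBoundaries() { }

    /** Drops minutes, seconds, and nanoseconds, keeping the same offset. */
    static OffsetDateTime floorToHour(OffsetDateTime time) {
        return time.truncatedTo(ChronoUnit.HOURS);
    }
}
```

For example, a start time of 2020-06-15T10:42:31Z corresponds to the boundary 2020-06-15T10:00Z.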
Troubleshooting
When interacting with blobs using this Java client library, errors returned by the service correspond to the same HTTP status codes returned for REST API requests. For example, if you try to retrieve a container or blob that doesn't exist in your Storage Account, a 404 error is returned, indicating Not Found.
Default HTTP Client
All client libraries by default use the Netty HTTP client. Adding the above dependency will automatically configure the client library to use the Netty HTTP client. Configuring or changing the HTTP client is detailed in the HTTP clients wiki.
Default SSL library
All client libraries, by default, use the Tomcat-native Boring SSL library to enable native-level performance for SSL operations. The Boring SSL library is an uber jar containing native libraries for Linux / macOS / Windows, and provides better performance compared to the default SSL implementation within the JDK. For more information, including how to reduce the dependency size, refer to the performance tuning section of the wiki.
Next steps
Several Storage blob changefeed Java SDK samples are available to you in the SDK's GitHub repository.
Next steps Samples
Samples are explained in detail here.
Contributing
This project welcomes contributions and suggestions. Most contributions require you to agree to a Contributor License Agreement (CLA) declaring that you have the right to, and actually do, grant us the rights to use your contribution. For details, visit https://cla.microsoft.com.
When you submit a pull request, a CLA-bot will automatically determine whether you need to provide a CLA and decorate the PR appropriately (e.g., label, comment). Simply follow the instructions provided by the bot. You will only need to do this once across all repos using our CLA.
This project has adopted the Microsoft Open Source Code of Conduct. For more information see the Code of Conduct FAQ or contact opencode@microsoft.com with any additional questions or comments.