Aracılığıyla paylaş


Apache Cassandra için Azure Cosmos DB'de değişiklik akışı

ŞUNLAR IÇIN GEÇERLIDIR: Cassandra

Apache Cassandra için Azure Cosmos DB'de değişiklik akışı desteği, Cassandra Sorgu Dili'ndeki (CQL) sorgu önkoşulları aracılığıyla kullanılabilir. Bu koşul koşullarını kullanarak değişiklik akışı API'sini sorgulayabilirsiniz. Uygulamalar CQL'de gerekli olduğu gibi birincil anahtarı (bölüm anahtarı olarak da bilinir) kullanarak bir tabloda yapılan değişiklikleri alabilir. Ardından sonuçlara göre daha fazla eylem gerçekleştirebilirsiniz. Tablodaki satırlarda yapılan değişiklikler, değişiklik süreleri ve bölüm anahtarı başına sıralama düzeni sırasıyla yakalanır.

Aşağıdaki örnek, .NET kullanarak Cassandra Keyspace tablosu için BIR API'deki tüm satırlarda değişiklik akışının nasıl alıneceğini gösterir. COSMOS_CHANGEFEED_START_TIME() koşulu, belirtilen başlangıç zamanından (bu örnekte geçerli tarih saat) değişiklik akışındaki öğeleri sorgulamak için doğrudan CQL içinde kullanılır. C# için tam örneği buradan ve Java için buradan indirebilirsiniz.

Her yinelemede, disk belleği durumu kullanılarak sorgu son noktadaki değişiklikler okundu olarak sürdürülür. Keyspace'te tabloda yapılan yeni değişikliklerin sürekli akışını görebiliriz. Eklenen veya güncelleştirilen satırlarda yapılan değişiklikleri göreceğiz. Cassandra için API'de değişiklik akışını kullanarak silme işlemlerini izlemek şu anda desteklenmemektedir.

Not

Bir koleksiyonu bıraktıktan sonra bir belirtecin yeniden kullanılıp aynı adla yeniden oluşturma işlemi hatayla sonuçlanır. Yeni bir koleksiyon oluştururken ve koleksiyon adını yeniden kullanırken pageState değerini null olarak ayarlamanızı öneririz.

    Session cassandraSession = utils.getSession();

    try {
        DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");  
        LocalDateTime now = LocalDateTime.now().minusHours(6).minusMinutes(30);  
        String query="SELECT * FROM uprofile.user where COSMOS_CHANGEFEED_START_TIME()='" 
            + dtf.format(now)+ "'";
        
        byte[] token=null; 
        System.out.println(query); 
        while(true)
        {
            SimpleStatement st=new  SimpleStatement(query);
            st.setFetchSize(100);
            if(token!=null)
                st.setPagingStateUnsafe(token);
            
            ResultSet result=cassandraSession.execute(st) ;
            token=result.getExecutionInfo().getPagingState().toBytes();
            
            for(Row row:result)
            {
                System.out.println(row.getString("user_name"));
            }
        }
    } finally {
        utils.close();
        LOGGER.info("Please delete your table after verifying the presence of the data in portal or from CQL");
    }

Değişiklikleri birincil anahtara göre tek bir satıra almak için sorguya birincil anahtarı ekleyebilirsiniz. Aşağıdaki örnekte, "user_id = 1" olan satırdaki değişikliklerin nasıl izlendiği gösterilmektedir

    String query="SELECT * FROM uprofile.user where user_id=1 and COSMOS_CHANGEFEED_START_TIME()='" 
                       + dtf.format(now)+ "'";
    SimpleStatement st=new  SimpleStatement(query);

Geçerli sınırlamalar

Cassandra için API ile değişiklik akışı kullanılırken aşağıdaki sınırlamalar geçerlidir:

  • Eklemeler ve güncelleştirmeler şu anda desteklenmektedir. Silme işlemi henüz desteklenmiyor. Geçici bir çözüm olarak, silinen satırlara geçici bir işaretçi ekleyebilirsiniz. Örneğin, satıra "silinmiş" adlı bir alan ekleyin ve bunu "true" olarak ayarlayın.
  • NoSQL için çekirdek API'de olduğu gibi son güncelleştirme kalıcıdır ve varlıkta ara güncelleştirmeler kullanılamaz.

Hata işleme

Cassandra için API'de değişiklik akışı kullanılırken aşağıdaki hata kodları ve iletiler desteklenir:

  • HTTP hata kodu 429 - Değişiklik akışı hızı sınırlı olduğunda boş bir sayfa döndürür.

Sonraki adımlar