Walkthrough: Using BatchBlock and BatchedJoinBlock to Improve Efficiency
The TPL Dataflow Library provides the System.Threading.Tasks.Dataflow.BatchBlock<T> and System.Threading.Tasks.Dataflow.BatchedJoinBlock<T1,T2> classes so that you can receive and buffer data from one or more sources and then propagate that buffered data as one collection. This batching mechanism is useful when you collect data from one or more sources and then process multiple data elements as a batch. For example, consider an application that uses dataflow to insert records into a database. This operation can be more efficient if multiple items are inserted at the same time instead of one at a time sequentially. This document describes how to use the BatchBlock<T> class to improve the efficiency of such database insert operations. It also describes how to use the BatchedJoinBlock<T1,T2> class to capture both the results and any exceptions that occur when the program reads from a database.
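Before involving a database, it can help to see the batching behavior in isolation. The following is a minimal sketch, not part of the walkthrough project: the class name BatchBlockSketch and the method CollectBatches are hypothetical names chosen for illustration. It posts integers to a BatchBlock<int> and collects the arrays that the block produces.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Illustrative only: BatchBlockSketch and CollectBatches are hypothetical names.
public static class BatchBlockSketch
{
    // Posts the integers 0..itemCount-1 to a BatchBlock<int> and returns
    // the batches that the block produces, in order.
    public static List<int[]> CollectBatches(int itemCount, int batchSize)
    {
        var batchBlock = new BatchBlock<int>(batchSize);

        for (int i = 0; i < itemCount; i++)
        {
            batchBlock.Post(i);
        }

        // Completing the block releases any leftover items as a final,
        // smaller batch.
        batchBlock.Complete();

        var batches = new List<int[]>();

        // OutputAvailableAsync yields false once the block has completed
        // and all of its output has been consumed.
        while (batchBlock.OutputAvailableAsync().Result)
        {
            while (batchBlock.TryReceive(out int[] batch))
            {
                batches.Add(batch);
            }
        }

        return batches;
    }

    public static void Main()
    {
        foreach (int[] batch in CollectBatches(10, 4))
        {
            Console.WriteLine("Batch of {0}: {1}",
                batch.Length, string.Join(", ", batch));
        }
    }
}
```

With 10 items and a batch size of 4, the sketch should yield two full batches followed by a final batch that holds the two remaining items, because completing the block flushes whatever is still buffered.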
Note
The TPL Dataflow Library (the System.Threading.Tasks.Dataflow namespace) is not distributed with .NET. To install the System.Threading.Tasks.Dataflow namespace in Visual Studio, open your project, choose Manage NuGet Packages from the Project menu, and search online for the System.Threading.Tasks.Dataflow package. Alternatively, to install it by using the .NET Core CLI, run dotnet add package System.Threading.Tasks.Dataflow.
Prerequisites
Read the Join Blocks section in the Dataflow document before you start this walkthrough.
Ensure that you have a copy of the Northwind database, Northwind.sdf, available on your computer. This file is typically located in the folder %Program Files%\Microsoft SQL Server Compact Edition\v3.5\Samples\.
Important
In some versions of Windows, you cannot connect to Northwind.sdf if Visual Studio is running in a non-administrator mode. To connect to Northwind.sdf, start Visual Studio or a Developer Command Prompt for Visual Studio in the Run as administrator mode.
This walkthrough contains the following sections:
Adding Employee Data to the Database Without Using Buffering
Using Buffering to Add Employee Data to the Database
Using Buffered Join to Read Employee Data from the Database
Creating the Console Application
In Visual Studio, create a Visual C# or Visual Basic Console Application project. In this document, the project is named DataflowBatchDatabase.
In your project, add a reference to System.Data.SqlServerCe.dll and a reference to System.Threading.Tasks.Dataflow.dll.
Ensure that Program.cs (Program.vb for Visual Basic) contains the following using statements (Imports in Visual Basic).
using System;
using System.Collections.Generic;
using System.Data.SqlServerCe;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks.Dataflow;
Imports System.Collections.Generic
Imports System.Data.SqlServerCe
Imports System.Diagnostics
Imports System.IO
Imports System.Threading.Tasks.Dataflow
Add the following data members to the Program class.
// The number of employees to add to the database.
// TODO: Change this value to experiment with different numbers of
// employees to insert into the database.
static readonly int insertCount = 256;
// The size of a single batch of employees to add to the database.
// TODO: Change this value to experiment with different batch sizes.
static readonly int insertBatchSize = 96;
// The source database file.
// TODO: Change this value if Northwind.sdf is at a different location
// on your computer.
static readonly string sourceDatabase =
@"C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf";
// TODO: Change this value if you require a different temporary location.
static readonly string scratchDatabase =
@"C:\Temp\Northwind.sdf";
' The number of employees to add to the database.
' TODO: Change this value to experiment with different numbers of
' employees to insert into the database.
Private Shared ReadOnly insertCount As Integer = 256
' The size of a single batch of employees to add to the database.
' TODO: Change this value to experiment with different batch sizes.
Private Shared ReadOnly insertBatchSize As Integer = 96
' The source database file.
' TODO: Change this value if Northwind.sdf is at a different location
' on your computer.
Private Shared ReadOnly sourceDatabase As String = "C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf"
' TODO: Change this value if you require a different temporary location.
Private Shared ReadOnly scratchDatabase As String = "C:\Temp\Northwind.sdf"
Defining the Employee Class
Add the Employee class to the Program class.
// Describes an employee. Each property maps to a
// column in the Employees table in the Northwind database.
// For brevity, the Employee class does not contain
// all columns from the Employees table.
class Employee
{
public int EmployeeID { get; set; }
public string LastName { get; set; }
public string FirstName { get; set; }
// A random number generator that helps to generate
// Employee property values.
static Random rand = new Random(42);
// Possible random first names.
static readonly string[] firstNames = { "Tom", "Mike", "Ruth", "Bob", "John" };
// Possible random last names.
static readonly string[] lastNames = { "Jones", "Smith", "Johnson", "Walker" };
// Creates an Employee object that contains random
// property values.
public static Employee Random()
{
return new Employee
{
EmployeeID = -1,
LastName = lastNames[rand.Next() % lastNames.Length],
FirstName = firstNames[rand.Next() % firstNames.Length]
};
}
}
' Describes an employee. Each property maps to a
' column in the Employees table in the Northwind database.
' For brevity, the Employee class does not contain
' all columns from the Employees table.
Private Class Employee
Public Property EmployeeID() As Integer
Public Property LastName() As String
Public Property FirstName() As String
' A random number generator that helps to generate
' Employee property values.
Private Shared rand As New Random(42)
' Possible random first names.
Private Shared ReadOnly firstNames() As String = {"Tom", "Mike", "Ruth", "Bob", "John"}
' Possible random last names.
Private Shared ReadOnly lastNames() As String = {"Jones", "Smith", "Johnson", "Walker"}
' Creates an Employee object that contains random
' property values.
Public Shared Function Random() As Employee
Return New Employee With {.EmployeeID = -1, .LastName = lastNames(rand.Next() Mod lastNames.Length), .FirstName = firstNames(rand.Next() Mod firstNames.Length)}
End Function
End Class
The Employee class contains three properties: EmployeeID, LastName, and FirstName. These properties correspond to the Employee ID, Last Name, and First Name columns in the Employees table in the Northwind database. For this demonstration, the Employee class also defines the Random method, which creates an Employee object that has random values for its properties.
Defining Employee Database Operations
Add the InsertEmployees, GetEmployeeCount, and GetEmployeeID methods to the Program class.
// Adds new employee records to the database.
static void InsertEmployees(Employee[] employees, string connectionString)
{
using (SqlCeConnection connection =
new SqlCeConnection(connectionString))
{
try
{
// Create the SQL command.
SqlCeCommand command = new SqlCeCommand(
"INSERT INTO Employees ([Last Name], [First Name])" +
"VALUES (@lastName, @firstName)",
connection);
connection.Open();
for (int i = 0; i < employees.Length; i++)
{
// Set parameters.
command.Parameters.Clear();
command.Parameters.Add("@lastName", employees[i].LastName);
command.Parameters.Add("@firstName", employees[i].FirstName);
// Execute the command.
command.ExecuteNonQuery();
}
}
finally
{
connection.Close();
}
}
}
// Retrieves the number of entries in the Employees table in
// the Northwind database.
static int GetEmployeeCount(string connectionString)
{
int result = 0;
using (SqlCeConnection sqlConnection =
new SqlCeConnection(connectionString))
{
SqlCeCommand sqlCommand = new SqlCeCommand(
"SELECT COUNT(*) FROM Employees", sqlConnection);
sqlConnection.Open();
try
{
result = (int)sqlCommand.ExecuteScalar();
}
finally
{
sqlConnection.Close();
}
}
return result;
}
// Retrieves the ID of the first employee that has the provided name.
static int GetEmployeeID(string lastName, string firstName,
string connectionString)
{
using (SqlCeConnection connection =
new SqlCeConnection(connectionString))
{
SqlCeCommand command = new SqlCeCommand(
string.Format(
"SELECT [Employee ID] FROM Employees " +
"WHERE [Last Name] = '{0}' AND [First Name] = '{1}'",
lastName, firstName),
connection);
connection.Open();
try
{
return (int)command.ExecuteScalar();
}
finally
{
connection.Close();
}
}
}
' Adds new employee records to the database.
Private Shared Sub InsertEmployees(ByVal employees() As Employee, ByVal connectionString As String)
Using connection As New SqlCeConnection(connectionString)
Try
' Create the SQL command.
Dim command As New SqlCeCommand("INSERT INTO Employees ([Last Name], [First Name])" & "VALUES (@lastName, @firstName)", connection)
connection.Open()
For i As Integer = 0 To employees.Length - 1
' Set parameters.
command.Parameters.Clear()
command.Parameters.Add("@lastName", employees(i).LastName)
command.Parameters.Add("@firstName", employees(i).FirstName)
' Execute the command.
command.ExecuteNonQuery()
Next i
Finally
connection.Close()
End Try
End Using
End Sub
' Retrieves the number of entries in the Employees table in
' the Northwind database.
Private Shared Function GetEmployeeCount(ByVal connectionString As String) As Integer
Dim result As Integer = 0
Using sqlConnection As New SqlCeConnection(connectionString)
Dim sqlCommand As New SqlCeCommand("SELECT COUNT(*) FROM Employees", sqlConnection)
sqlConnection.Open()
Try
result = CInt(Fix(sqlCommand.ExecuteScalar()))
Finally
sqlConnection.Close()
End Try
End Using
Return result
End Function
' Retrieves the ID of the first employee that has the provided name.
Private Shared Function GetEmployeeID(ByVal lastName As String, ByVal firstName As String, ByVal connectionString As String) As Integer
Using connection As New SqlCeConnection(connectionString)
Dim command As New SqlCeCommand(String.Format("SELECT [Employee ID] FROM Employees " & "WHERE [Last Name] = '{0}' AND [First Name] = '{1}'", lastName, firstName), connection)
connection.Open()
Try
Return CInt(Fix(command.ExecuteScalar()))
Finally
connection.Close()
End Try
End Using
End Function
The InsertEmployees method adds new employee records to the database. The GetEmployeeCount method retrieves the number of entries in the Employees table. The GetEmployeeID method retrieves the identifier of the first employee that has the provided name. Each of these methods takes a connection string to the Northwind database and uses functionality in the System.Data.SqlServerCe namespace to communicate with the database.
Adding Employee Data to the Database Without Using Buffering
Add the AddEmployees and PostRandomEmployees methods to the Program class.
// Posts random Employee data to the provided target block.
static void PostRandomEmployees(ITargetBlock<Employee> target, int count)
{
Console.WriteLine("Adding {0} entries to Employee table...", count);
for (int i = 0; i < count; i++)
{
target.Post(Employee.Random());
}
}
// Adds random employee data to the database by using dataflow.
static void AddEmployees(string connectionString, int count)
{
// Create an ActionBlock<Employee> object that adds a single
// employee entry to the database.
var insertEmployee = new ActionBlock<Employee>(e =>
InsertEmployees(new Employee[] { e }, connectionString));
// Post several random Employee objects to the dataflow block.
PostRandomEmployees(insertEmployee, count);
// Set the dataflow block to the completed state and wait for
// all insert operations to complete.
insertEmployee.Complete();
insertEmployee.Completion.Wait();
}
' Posts random Employee data to the provided target block.
Private Shared Sub PostRandomEmployees(ByVal target As ITargetBlock(Of Employee), ByVal count As Integer)
Console.WriteLine("Adding {0} entries to Employee table...", count)
For i As Integer = 0 To count - 1
target.Post(Employee.Random())
Next i
End Sub
' Adds random employee data to the database by using dataflow.
Private Shared Sub AddEmployees(ByVal connectionString As String, ByVal count As Integer)
' Create an ActionBlock<Employee> object that adds a single
' employee entry to the database.
Dim insertEmployee = New ActionBlock(Of Employee)(Sub(e) InsertEmployees(New Employee() {e}, connectionString))
' Post several random Employee objects to the dataflow block.
PostRandomEmployees(insertEmployee, count)
' Set the dataflow block to the completed state and wait for
' all insert operations to complete.
insertEmployee.Complete()
insertEmployee.Completion.Wait()
End Sub
The AddEmployees method adds random employee data to the database by using dataflow. It creates an ActionBlock<TInput> object that calls the InsertEmployees method to add a single employee entry to the database. The AddEmployees method then calls the PostRandomEmployees method to post multiple Employee objects to the ActionBlock<TInput> object. Finally, the AddEmployees method waits for all insert operations to finish.
Using Buffering to Add Employee Data to the Database
Add the AddEmployeesBatched method to the Program class.
// Adds random employee data to the database by using dataflow.
// This method is similar to AddEmployees except that it uses batching
// to add multiple employees to the database at a time.
static void AddEmployeesBatched(string connectionString, int batchSize,
int count)
{
// Create a BatchBlock<Employee> that holds several Employee objects and
// then propagates them out as an array.
var batchEmployees = new BatchBlock<Employee>(batchSize);
// Create an ActionBlock<Employee[]> object that adds multiple
// employee entries to the database.
var insertEmployees = new ActionBlock<Employee[]>(a =>
InsertEmployees(a, connectionString));
// Link the batch block to the action block.
batchEmployees.LinkTo(insertEmployees);
// When the batch block completes, set the action block also to complete.
batchEmployees.Completion.ContinueWith(delegate { insertEmployees.Complete(); });
// Post several random Employee objects to the batch block.
PostRandomEmployees(batchEmployees, count);
// Set the batch block to the completed state and wait for
// all insert operations to complete.
batchEmployees.Complete();
insertEmployees.Completion.Wait();
}
' Adds random employee data to the database by using dataflow.
' This method is similar to AddEmployees except that it uses batching
' to add multiple employees to the database at a time.
Private Shared Sub AddEmployeesBatched(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
' Create a BatchBlock<Employee> that holds several Employee objects and
' then propagates them out as an array.
Dim batchEmployees = New BatchBlock(Of Employee)(batchSize)
' Create an ActionBlock<Employee[]> object that adds multiple
' employee entries to the database.
Dim insertEmployees = New ActionBlock(Of Employee())(Sub(a) Program.InsertEmployees(a, connectionString))
' Link the batch block to the action block.
batchEmployees.LinkTo(insertEmployees)
' When the batch block completes, set the action block also to complete.
batchEmployees.Completion.ContinueWith(Sub() insertEmployees.Complete())
' Post several random Employee objects to the batch block.
PostRandomEmployees(batchEmployees, count)
' Set the batch block to the completed state and wait for
' all insert operations to complete.
batchEmployees.Complete()
insertEmployees.Completion.Wait()
End Sub
This method resembles AddEmployees, except that it also uses the BatchBlock<T> class to buffer multiple Employee objects before it sends those objects to the ActionBlock<TInput> object. Because the BatchBlock<T> class propagates multiple elements as a collection, the ActionBlock<TInput> object is modified to act on an array of Employee objects. As in the AddEmployees method, AddEmployeesBatched calls the PostRandomEmployees method to post multiple Employee objects; however, AddEmployeesBatched posts these objects to the BatchBlock<T> object. The AddEmployeesBatched method also waits for all insert operations to finish.
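The AddEmployeesBatched method forwards completion from the batch block to the action block with a ContinueWith continuation. As an alternative, the link itself can carry completion through the DataflowLinkOptions.PropagateCompletion property. The following sketch shows that variant under simplifying assumptions: the class BatchedPipelineSketch and the method RunPipeline are hypothetical names, and an in-memory list stands in for the database so that the batch sizes are easy to inspect.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Illustrative only: BatchedPipelineSketch and RunPipeline are hypothetical names.
public static class BatchedPipelineSketch
{
    // Posts 'count' items through a BatchBlock and returns the size of
    // each batch that reached the action block.
    public static List<int> RunPipeline(int count, int batchSize)
    {
        var batchSizes = new List<int>();

        var batchItems = new BatchBlock<string>(batchSize);

        // Stand-in for InsertEmployees: records how many items arrive per call.
        // An ActionBlock handles one message at a time by default, so the
        // list is not modified concurrently.
        var handleBatch = new ActionBlock<string[]>(
            batch => batchSizes.Add(batch.Length));

        // Let the link forward completion instead of using ContinueWith.
        batchItems.LinkTo(handleBatch,
            new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++)
        {
            batchItems.Post("item " + i);
        }

        // Complete the head of the pipeline and wait for the tail.
        batchItems.Complete();
        handleBatch.Completion.Wait();

        return batchSizes;
    }

    public static void Main()
    {
        // The walkthrough's values: 256 items in batches of 96.
        Console.WriteLine(string.Join(", ", RunPipeline(256, 96)));
    }
}
```

With the walkthrough's values of 256 items and a batch size of 96, the action block runs three times (96, 96, and 64 items) rather than 256 times, which is where the reduction in per-call overhead comes from.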
Using Buffered Join to Read Employee Data from the Database
Add the GetRandomEmployees method to the Program class.
// Displays information about several random employees to the console.
static void GetRandomEmployees(string connectionString, int batchSize,
int count)
{
// Create a BatchedJoinBlock<Employee, Exception> object that holds
// both employee and exception data.
var selectEmployees = new BatchedJoinBlock<Employee, Exception>(batchSize);
// Holds the total number of exceptions that occurred.
int totalErrors = 0;
// Create an action block that prints employee and error information
// to the console.
var printEmployees =
new ActionBlock<Tuple<IList<Employee>, IList<Exception>>>(data =>
{
// Print information about the employees in this batch.
Console.WriteLine("Received a batch...");
foreach (Employee e in data.Item1)
{
Console.WriteLine("Last={0} First={1} ID={2}",
e.LastName, e.FirstName, e.EmployeeID);
}
// Print the error count for this batch.
Console.WriteLine("There were {0} errors in this batch...",
data.Item2.Count);
// Update total error count.
totalErrors += data.Item2.Count;
});
// Link the batched join block to the action block.
selectEmployees.LinkTo(printEmployees);
// When the batched join block completes, set the action block also to complete.
selectEmployees.Completion.ContinueWith(delegate { printEmployees.Complete(); });
// Try to retrieve the ID for several random employees.
Console.WriteLine("Selecting random entries from Employees table...");
for (int i = 0; i < count; i++)
{
try
{
// Create a random employee.
Employee e = Employee.Random();
// Try to retrieve the ID for the employee from the database.
e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString);
// Post the Employee object to the Employee target of
// the batched join block.
selectEmployees.Target1.Post(e);
}
catch (NullReferenceException e)
{
// GetEmployeeID throws NullReferenceException when there is
// no such employee with the given name. When this happens,
// post the Exception object to the Exception target of
// the batched join block.
selectEmployees.Target2.Post(e);
}
}
// Set the batched join block to the completed state and wait for
// all retrieval operations to complete.
selectEmployees.Complete();
printEmployees.Completion.Wait();
// Print the total error count.
Console.WriteLine("Finished. There were {0} total errors.", totalErrors);
}
' Displays information about several random employees to the console.
Private Shared Sub GetRandomEmployees(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
' Create a BatchedJoinBlock<Employee, Exception> object that holds
' both employee and exception data.
Dim selectEmployees = New BatchedJoinBlock(Of Employee, Exception)(batchSize)
' Holds the total number of exceptions that occurred.
Dim totalErrors As Integer = 0
' Create an action block that prints employee and error information
' to the console.
Dim printEmployees = New ActionBlock(Of Tuple(Of IList(Of Employee), IList(Of Exception)))(Sub(data)
' Print information about the employees in this batch.
Console.WriteLine("Received a batch...")
For Each e As Employee In data.Item1
Console.WriteLine("Last={0} First={1} ID={2}", e.LastName, e.FirstName, e.EmployeeID)
Next e
' Print the error count for this batch.
Console.WriteLine("There were {0} errors in this batch...", data.Item2.Count)
' Update total error count.
totalErrors += data.Item2.Count
End Sub)
' Link the batched join block to the action block.
selectEmployees.LinkTo(printEmployees)
' When the batched join block completes, set the action block also to complete.
selectEmployees.Completion.ContinueWith(Sub() printEmployees.Complete())
' Try to retrieve the ID for several random employees.
Console.WriteLine("Selecting random entries from Employees table...")
For i As Integer = 0 To count - 1
Try
' Create a random employee.
Dim e As Employee = Employee.Random()
' Try to retrieve the ID for the employee from the database.
e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString)
' Post the Employee object to the Employee target of
' the batched join block.
selectEmployees.Target1.Post(e)
Catch e As NullReferenceException
' GetEmployeeID throws NullReferenceException when there is
' no such employee with the given name. When this happens,
' post the Exception object to the Exception target of
' the batched join block.
selectEmployees.Target2.Post(e)
End Try
Next i
' Set the batched join block to the completed state and wait for
' all retrieval operations to complete.
selectEmployees.Complete()
printEmployees.Completion.Wait()
' Print the total error count.
Console.WriteLine("Finished. There were {0} total errors.", totalErrors)
End Sub
This method prints information about random employees to the console. It creates several random Employee objects and calls the GetEmployeeID method to retrieve the unique identifier for each object. Because the GetEmployeeID method throws an exception if there is no matching employee with the given first and last names, the GetRandomEmployees method uses the BatchedJoinBlock<T1,T2> class to store Employee objects for calls to GetEmployeeID that succeed and System.Exception objects for calls that fail. The ActionBlock<TInput> object in this example acts on a Tuple<T1,T2> object that holds a list of Employee objects and a list of Exception objects. The BatchedJoinBlock<T1,T2> object propagates this data when the sum of the received Employee and Exception object counts equals the batch size.
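The rule that a batch is released when the combined count across both targets reaches the batch size can be seen without a database. The following sketch is an illustration under assumptions: BatchedJoinSketch and Run are hypothetical names, and a negative input value stands in for a failed lookup.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Illustrative only: BatchedJoinSketch and Run are hypothetical names.
public static class BatchedJoinSketch
{
    // Routes non-negative values to Target1 and failures to Target2.
    // Returns (successCount, failureCount) for each batch produced.
    public static List<Tuple<int, int>> Run(int[] inputs, int batchSize)
    {
        var results = new List<Tuple<int, int>>();

        var join = new BatchedJoinBlock<int, Exception>(batchSize);

        var report = new ActionBlock<Tuple<IList<int>, IList<Exception>>>(
            data => results.Add(
                Tuple.Create(data.Item1.Count, data.Item2.Count)));

        join.LinkTo(report,
            new DataflowLinkOptions { PropagateCompletion = true });

        foreach (int value in inputs)
        {
            try
            {
                // A negative value plays the role of a failed lookup.
                if (value < 0)
                {
                    throw new ArgumentOutOfRangeException(nameof(value));
                }
                join.Target1.Post(value);
            }
            catch (ArgumentOutOfRangeException ex)
            {
                join.Target2.Post(ex);
            }
        }

        // Completing the block releases any partially filled batch.
        join.Complete();
        report.Completion.Wait();

        return results;
    }

    public static void Main()
    {
        int[] inputs = { 1, -1, 2, 3, -2, 4, 5 };
        foreach (Tuple<int, int> batch in Run(inputs, 3))
        {
            Console.WriteLine("{0} results, {1} errors",
                batch.Item1, batch.Item2);
        }
    }
}
```

With a batch size of 3, each of the first two batches should hold two results and one error, because the three items that fill a batch may arrive on either target. The single remaining value is released as a smaller batch when the block completes.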
The Complete Example
The following example shows the complete code. The Main method compares the time that is required to perform batched database insertions with the time that is required to perform non-batched database insertions. It also demonstrates the use of buffered join to read employee data from the database and to report errors.
using System;
using System.Collections.Generic;
using System.Data.SqlServerCe;
using System.Diagnostics;
using System.IO;
using System.Threading.Tasks.Dataflow;
// Demonstrates how to use batched dataflow blocks to improve
// the performance of database operations.
namespace DataflowBatchDatabase
{
class Program
{
// The number of employees to add to the database.
// TODO: Change this value to experiment with different numbers of
// employees to insert into the database.
static readonly int insertCount = 256;
// The size of a single batch of employees to add to the database.
// TODO: Change this value to experiment with different batch sizes.
static readonly int insertBatchSize = 96;
// The source database file.
// TODO: Change this value if Northwind.sdf is at a different location
// on your computer.
static readonly string sourceDatabase =
@"C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf";
// TODO: Change this value if you require a different temporary location.
static readonly string scratchDatabase =
@"C:\Temp\Northwind.sdf";
// Describes an employee. Each property maps to a
// column in the Employees table in the Northwind database.
// For brevity, the Employee class does not contain
// all columns from the Employees table.
class Employee
{
public int EmployeeID { get; set; }
public string LastName { get; set; }
public string FirstName { get; set; }
// A random number generator that helps to generate
// Employee property values.
static Random rand = new Random(42);
// Possible random first names.
static readonly string[] firstNames = { "Tom", "Mike", "Ruth", "Bob", "John" };
// Possible random last names.
static readonly string[] lastNames = { "Jones", "Smith", "Johnson", "Walker" };
// Creates an Employee object that contains random
// property values.
public static Employee Random()
{
return new Employee
{
EmployeeID = -1,
LastName = lastNames[rand.Next() % lastNames.Length],
FirstName = firstNames[rand.Next() % firstNames.Length]
};
}
}
// Adds new employee records to the database.
static void InsertEmployees(Employee[] employees, string connectionString)
{
using (SqlCeConnection connection =
new SqlCeConnection(connectionString))
{
try
{
// Create the SQL command.
SqlCeCommand command = new SqlCeCommand(
"INSERT INTO Employees ([Last Name], [First Name])" +
"VALUES (@lastName, @firstName)",
connection);
connection.Open();
for (int i = 0; i < employees.Length; i++)
{
// Set parameters.
command.Parameters.Clear();
command.Parameters.Add("@lastName", employees[i].LastName);
command.Parameters.Add("@firstName", employees[i].FirstName);
// Execute the command.
command.ExecuteNonQuery();
}
}
finally
{
connection.Close();
}
}
}
// Retrieves the number of entries in the Employees table in
// the Northwind database.
static int GetEmployeeCount(string connectionString)
{
int result = 0;
using (SqlCeConnection sqlConnection =
new SqlCeConnection(connectionString))
{
SqlCeCommand sqlCommand = new SqlCeCommand(
"SELECT COUNT(*) FROM Employees", sqlConnection);
sqlConnection.Open();
try
{
result = (int)sqlCommand.ExecuteScalar();
}
finally
{
sqlConnection.Close();
}
}
return result;
}
// Retrieves the ID of the first employee that has the provided name.
static int GetEmployeeID(string lastName, string firstName,
string connectionString)
{
using (SqlCeConnection connection =
new SqlCeConnection(connectionString))
{
SqlCeCommand command = new SqlCeCommand(
string.Format(
"SELECT [Employee ID] FROM Employees " +
"WHERE [Last Name] = '{0}' AND [First Name] = '{1}'",
lastName, firstName),
connection);
connection.Open();
try
{
return (int)command.ExecuteScalar();
}
finally
{
connection.Close();
}
}
}
// Posts random Employee data to the provided target block.
static void PostRandomEmployees(ITargetBlock<Employee> target, int count)
{
Console.WriteLine("Adding {0} entries to Employee table...", count);
for (int i = 0; i < count; i++)
{
target.Post(Employee.Random());
}
}
// Adds random employee data to the database by using dataflow.
static void AddEmployees(string connectionString, int count)
{
// Create an ActionBlock<Employee> object that adds a single
// employee entry to the database.
var insertEmployee = new ActionBlock<Employee>(e =>
InsertEmployees(new Employee[] { e }, connectionString));
// Post several random Employee objects to the dataflow block.
PostRandomEmployees(insertEmployee, count);
// Set the dataflow block to the completed state and wait for
// all insert operations to complete.
insertEmployee.Complete();
insertEmployee.Completion.Wait();
}
// Adds random employee data to the database by using dataflow.
// This method is similar to AddEmployees except that it uses batching
// to add multiple employees to the database at a time.
static void AddEmployeesBatched(string connectionString, int batchSize,
int count)
{
// Create a BatchBlock<Employee> that holds several Employee objects and
// then propagates them out as an array.
var batchEmployees = new BatchBlock<Employee>(batchSize);
// Create an ActionBlock<Employee[]> object that adds multiple
// employee entries to the database.
var insertEmployees = new ActionBlock<Employee[]>(a =>
InsertEmployees(a, connectionString));
// Link the batch block to the action block.
batchEmployees.LinkTo(insertEmployees);
// When the batch block completes, set the action block also to complete.
batchEmployees.Completion.ContinueWith(delegate { insertEmployees.Complete(); });
// Post several random Employee objects to the batch block.
PostRandomEmployees(batchEmployees, count);
// Set the batch block to the completed state and wait for
// all insert operations to complete.
batchEmployees.Complete();
insertEmployees.Completion.Wait();
}
// Displays information about several random employees to the console.
static void GetRandomEmployees(string connectionString, int batchSize,
int count)
{
// Create a BatchedJoinBlock<Employee, Exception> object that holds
// both employee and exception data.
var selectEmployees = new BatchedJoinBlock<Employee, Exception>(batchSize);
// Holds the total number of exceptions that occurred.
int totalErrors = 0;
// Create an action block that prints employee and error information
// to the console.
var printEmployees =
new ActionBlock<Tuple<IList<Employee>, IList<Exception>>>(data =>
{
// Print information about the employees in this batch.
Console.WriteLine("Received a batch...");
foreach (Employee e in data.Item1)
{
Console.WriteLine("Last={0} First={1} ID={2}",
e.LastName, e.FirstName, e.EmployeeID);
}
// Print the error count for this batch.
Console.WriteLine("There were {0} errors in this batch...",
data.Item2.Count);
// Update total error count.
totalErrors += data.Item2.Count;
});
// Link the batched join block to the action block.
selectEmployees.LinkTo(printEmployees);
// When the batched join block completes, set the action block also to complete.
selectEmployees.Completion.ContinueWith(delegate { printEmployees.Complete(); });
// Try to retrieve the ID for several random employees.
Console.WriteLine("Selecting random entries from Employees table...");
for (int i = 0; i < count; i++)
{
try
{
// Create a random employee.
Employee e = Employee.Random();
// Try to retrieve the ID for the employee from the database.
e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString);
// Post the Employee object to the Employee target of
// the batched join block.
selectEmployees.Target1.Post(e);
}
catch (NullReferenceException e)
{
// GetEmployeeID throws NullReferenceException when there is
// no such employee with the given name. When this happens,
// post the Exception object to the Exception target of
// the batched join block.
selectEmployees.Target2.Post(e);
}
}
// Set the batched join block to the completed state and wait for
// all retrieval operations to complete.
selectEmployees.Complete();
printEmployees.Completion.Wait();
// Print the total error count.
Console.WriteLine("Finished. There were {0} total errors.", totalErrors);
}
static void Main(string[] args)
{
// Create a connection string for accessing the database.
// The connection string refers to the temporary database location.
string connectionString = string.Format(@"Data Source={0}",
scratchDatabase);
// Create a Stopwatch object to time database insert operations.
Stopwatch stopwatch = new Stopwatch();
// Start with a clean database file by copying the source database to
// the temporary location.
File.Copy(sourceDatabase, scratchDatabase, true);
// Demonstrate multiple insert operations without batching.
Console.WriteLine("Demonstrating non-batched database insert operations...");
Console.WriteLine("Original size of Employee table: {0}.",
GetEmployeeCount(connectionString));
stopwatch.Start();
AddEmployees(connectionString, insertCount);
stopwatch.Stop();
Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.",
GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds);
Console.WriteLine();
// Start again with a clean database file.
File.Copy(sourceDatabase, scratchDatabase, true);
// Demonstrate multiple insert operations, this time with batching.
Console.WriteLine("Demonstrating batched database insert operations...");
Console.WriteLine("Original size of Employee table: {0}.",
GetEmployeeCount(connectionString));
stopwatch.Restart();
AddEmployeesBatched(connectionString, insertBatchSize, insertCount);
stopwatch.Stop();
Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.",
GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds);
Console.WriteLine();
// Start again with a clean database file.
File.Copy(sourceDatabase, scratchDatabase, true);
// Demonstrate multiple retrieval operations with error reporting.
Console.WriteLine("Demonstrating batched join database select operations...");
// Add a small number of employees to the database.
AddEmployeesBatched(connectionString, insertBatchSize, 16);
// Query for random employees.
GetRandomEmployees(connectionString, insertBatchSize, 10);
}
}
}
/* Sample output:
Demonstrating non-batched database insert operations...
Original size of Employee table: 15.
Adding 256 entries to Employee table...
New size of Employee table: 271; elapsed insert time: 11035 ms.
Demonstrating batched database insert operations...
Original size of Employee table: 15.
Adding 256 entries to Employee table...
New size of Employee table: 271; elapsed insert time: 197 ms.
Demonstrating batched join database select operations...
Adding 16 entries to Employee table...
Selecting random entries from Employees table...
Received a batch...
Last=Jones First=Tom ID=21
Last=Jones First=John ID=24
Last=Smith First=Tom ID=26
Last=Jones First=Tom ID=21
There were 4 errors in this batch...
Received a batch...
Last=Smith First=Tom ID=26
Last=Jones First=Mike ID=28
There were 0 errors in this batch...
Finished. There were 4 total errors.
*/
Imports System.Collections.Generic
Imports System.Data.SqlServerCe
Imports System.Diagnostics
Imports System.IO
Imports System.Threading.Tasks.Dataflow
' Demonstrates how to use batched dataflow blocks to improve
' the performance of database operations.
Namespace DataflowBatchDatabase
Friend Class Program
' The number of employees to add to the database.
' TODO: Change this value to experiment with different numbers of
' employees to insert into the database.
Private Shared ReadOnly insertCount As Integer = 256
' The size of a single batch of employees to add to the database.
' TODO: Change this value to experiment with different batch sizes.
Private Shared ReadOnly insertBatchSize As Integer = 96
' The source database file.
' TODO: Change this value if Northwind.sdf is at a different location
' on your computer.
Private Shared ReadOnly sourceDatabase As String = "C:\Program Files\Microsoft SQL Server Compact Edition\v3.5\Samples\Northwind.sdf"
' TODO: Change this value if you require a different temporary location.
Private Shared ReadOnly scratchDatabase As String = "C:\Temp\Northwind.sdf"
' Describes an employee. Each property maps to a
' column in the Employees table in the Northwind database.
' For brevity, the Employee class does not contain
' all columns from the Employees table.
Private Class Employee
Public Property EmployeeID() As Integer
Public Property LastName() As String
Public Property FirstName() As String
' A random number generator that helps to generate
' Employee property values.
Private Shared rand As New Random(42)
' Possible random first names.
Private Shared ReadOnly firstNames() As String = {"Tom", "Mike", "Ruth", "Bob", "John"}
' Possible random last names.
Private Shared ReadOnly lastNames() As String = {"Jones", "Smith", "Johnson", "Walker"}
' Creates an Employee object that contains random
' property values.
Public Shared Function Random() As Employee
Return New Employee With {.EmployeeID = -1, .LastName = lastNames(rand.Next() Mod lastNames.Length), .FirstName = firstNames(rand.Next() Mod firstNames.Length)}
End Function
End Class
' Adds new employee records to the database.
Private Shared Sub InsertEmployees(ByVal employees() As Employee, ByVal connectionString As String)
Using connection As New SqlCeConnection(connectionString)
Try
' Create the SQL command.
Dim command As New SqlCeCommand("INSERT INTO Employees ([Last Name], [First Name]) VALUES (@lastName, @firstName)", connection)
connection.Open()
For i As Integer = 0 To employees.Length - 1
' Set parameters.
command.Parameters.Clear()
command.Parameters.Add("@lastName", employees(i).LastName)
command.Parameters.Add("@firstName", employees(i).FirstName)
' Execute the command.
command.ExecuteNonQuery()
Next i
Finally
connection.Close()
End Try
End Using
End Sub
' Retrieves the number of entries in the Employees table in
' the Northwind database.
Private Shared Function GetEmployeeCount(ByVal connectionString As String) As Integer
Dim result As Integer = 0
Using sqlConnection As New SqlCeConnection(connectionString)
Dim sqlCommand As New SqlCeCommand("SELECT COUNT(*) FROM Employees", sqlConnection)
sqlConnection.Open()
Try
result = CInt(sqlCommand.ExecuteScalar())
Finally
sqlConnection.Close()
End Try
End Using
Return result
End Function
' Retrieves the ID of the first employee that has the provided name.
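Private Shared Function GetEmployeeID(ByVal lastName As String, ByVal firstName As String, ByVal connectionString As String) As Integer
Using connection As New SqlCeConnection(connectionString)
' Pass the names as parameters, as InsertEmployees does, rather than
' formatting them into the SQL text.
Dim command As New SqlCeCommand("SELECT [Employee ID] FROM Employees WHERE [Last Name] = @lastName AND [First Name] = @firstName", connection)
command.Parameters.Add("@lastName", lastName)
command.Parameters.Add("@firstName", firstName)
connection.Open()
Try
' ExecuteScalar returns Nothing when no row matches. Fix(Nothing)
' would throw ArgumentNullException, so throw the
' NullReferenceException that GetRandomEmployees catches.
Dim result As Object = command.ExecuteScalar()
If result Is Nothing Then
Throw New NullReferenceException("No employee with the given name was found.")
End If
Return CInt(result)
Finally
connection.Close()
End Try
End Using
End Function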
' Posts random Employee data to the provided target block.
Private Shared Sub PostRandomEmployees(ByVal target As ITargetBlock(Of Employee), ByVal count As Integer)
Console.WriteLine("Adding {0} entries to Employee table...", count)
For i As Integer = 0 To count - 1
target.Post(Employee.Random())
Next i
End Sub
' Adds random employee data to the database by using dataflow.
Private Shared Sub AddEmployees(ByVal connectionString As String, ByVal count As Integer)
' Create an ActionBlock(Of Employee) object that adds a single
' employee entry to the database.
Dim insertEmployee = New ActionBlock(Of Employee)(Sub(e) InsertEmployees(New Employee() {e}, connectionString))
' Post several random Employee objects to the dataflow block.
PostRandomEmployees(insertEmployee, count)
' Set the dataflow block to the completed state and wait for
' all insert operations to complete.
insertEmployee.Complete()
insertEmployee.Completion.Wait()
End Sub
' Adds random employee data to the database by using dataflow.
' This method is similar to AddEmployees except that it uses batching
' to add multiple employees to the database at a time.
Private Shared Sub AddEmployeesBatched(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
' Create a BatchBlock(Of Employee) that holds several Employee objects and
' then propagates them out as an array.
Dim batchEmployees = New BatchBlock(Of Employee)(batchSize)
' Create an ActionBlock(Of Employee()) object that adds multiple
' employee entries to the database.
Dim insertEmployees = New ActionBlock(Of Employee())(Sub(a) Program.InsertEmployees(a, connectionString))
' Link the batch block to the action block.
batchEmployees.LinkTo(insertEmployees)
' When the batch block completes, set the action block also to complete.
batchEmployees.Completion.ContinueWith(Sub() insertEmployees.Complete())
' Post several random Employee objects to the batch block.
PostRandomEmployees(batchEmployees, count)
' Set the batch block to the completed state and wait for
' all insert operations to complete.
batchEmployees.Complete()
insertEmployees.Completion.Wait()
End Sub
' Displays information about several random employees to the console.
Private Shared Sub GetRandomEmployees(ByVal connectionString As String, ByVal batchSize As Integer, ByVal count As Integer)
' Create a BatchedJoinBlock(Of Employee, Exception) object that holds
' both employee and exception data.
Dim selectEmployees = New BatchedJoinBlock(Of Employee, Exception)(batchSize)
' Holds the total number of exceptions that occurred.
Dim totalErrors As Integer = 0
' Create an action block that prints employee and error information
' to the console.
Dim printEmployees = New ActionBlock(Of Tuple(Of IList(Of Employee), IList(Of Exception)))(Sub(data)
Console.WriteLine("Received a batch...")
' Print information about the employees in this batch.
For Each e As Employee In data.Item1
Console.WriteLine("Last={0} First={1} ID={2}", e.LastName, e.FirstName, e.EmployeeID)
Next e
' Print the error count for this batch.
Console.WriteLine("There were {0} errors in this batch...", data.Item2.Count)
' Update total error count.
totalErrors += data.Item2.Count
End Sub)
' Link the batched join block to the action block.
selectEmployees.LinkTo(printEmployees)
' When the batched join block completes, set the action block also to complete.
selectEmployees.Completion.ContinueWith(Sub() printEmployees.Complete())
' Try to retrieve the ID for several random employees.
Console.WriteLine("Selecting random entries from Employees table...")
For i As Integer = 0 To count - 1
Try
' Create a random employee.
Dim e As Employee = Employee.Random()
' Try to retrieve the ID for the employee from the database.
e.EmployeeID = GetEmployeeID(e.LastName, e.FirstName, connectionString)
' Post the Employee object to the Employee target of
' the batched join block.
selectEmployees.Target1.Post(e)
Catch e As NullReferenceException
' GetEmployeeID throws NullReferenceException when there is
' no such employee with the given name. When this happens,
' post the Exception object to the Exception target of
' the batched join block.
selectEmployees.Target2.Post(e)
End Try
Next i
' Set the batched join block to the completed state and wait for
' all retrieval operations to complete.
selectEmployees.Complete()
printEmployees.Completion.Wait()
' Print the total error count.
Console.WriteLine("Finished. There were {0} total errors.", totalErrors)
End Sub
Shared Sub Main(ByVal args() As String)
' Create a connection string for accessing the database.
' The connection string refers to the temporary database location.
Dim connectionString As String = String.Format("Data Source={0}", scratchDatabase)
' Create a Stopwatch object to time database insert operations.
Dim stopwatch As New Stopwatch()
' Start with a clean database file by copying the source database to
' the temporary location.
File.Copy(sourceDatabase, scratchDatabase, True)
' Demonstrate multiple insert operations without batching.
Console.WriteLine("Demonstrating non-batched database insert operations...")
Console.WriteLine("Original size of Employee table: {0}.", GetEmployeeCount(connectionString))
stopwatch.Start()
AddEmployees(connectionString, insertCount)
stopwatch.Stop()
Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.", GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds)
Console.WriteLine()
' Start again with a clean database file.
File.Copy(sourceDatabase, scratchDatabase, True)
' Demonstrate multiple insert operations, this time with batching.
Console.WriteLine("Demonstrating batched database insert operations...")
Console.WriteLine("Original size of Employee table: {0}.", GetEmployeeCount(connectionString))
stopwatch.Restart()
AddEmployeesBatched(connectionString, insertBatchSize, insertCount)
stopwatch.Stop()
Console.WriteLine("New size of Employee table: {0}; elapsed insert time: {1} ms.", GetEmployeeCount(connectionString), stopwatch.ElapsedMilliseconds)
Console.WriteLine()
' Start again with a clean database file.
File.Copy(sourceDatabase, scratchDatabase, True)
' Demonstrate multiple retrieval operations with error reporting.
Console.WriteLine("Demonstrating batched join database select operations...")
' Add a small number of employees to the database.
AddEmployeesBatched(connectionString, insertBatchSize, 16)
' Query for random employees.
GetRandomEmployees(connectionString, insertBatchSize, 10)
End Sub
End Class
End Namespace
' Sample output:
'Demonstrating non-batched database insert operations...
'Original size of Employee table: 15.
'Adding 256 entries to Employee table...
'New size of Employee table: 271; elapsed insert time: 11035 ms.
'
'Demonstrating batched database insert operations...
'Original size of Employee table: 15.
'Adding 256 entries to Employee table...
'New size of Employee table: 271; elapsed insert time: 197 ms.
'
'Demonstrating batched join database select operations...
'Adding 16 entries to Employee table...
'Selecting random entries from Employees table...
'Received a batch...
'Last=Jones First=Tom ID=21
'Last=Jones First=John ID=24
'Last=Smith First=Tom ID=26
'Last=Jones First=Tom ID=21
'There were 4 errors in this batch...
'Received a batch...
'Last=Smith First=Tom ID=26
'Last=Jones First=Mike ID=28
'There were 0 errors in this batch...
'Finished. There were 4 total errors.
'
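The samples above post 256 employees to a batch block whose batch size is 96, so the item count is not a multiple of the batch size. The following is a minimal sketch, not part of the original sample, of how a BatchBlock<T> handles that case: it emits full batches as they fill up and, once Complete is called, emits the remaining items as a final, smaller batch. The class name BatchSizeSketch and the method GetBatchSizes are hypothetical names chosen for illustration. The sketch uses integers instead of database records, and it links the blocks with DataflowLinkOptions.PropagateCompletion rather than the ContinueWith call used in the samples.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper for illustration; it is not part of the original sample.
static class BatchSizeSketch
{
    // Posts `count` integers to a BatchBlock<int> and returns the size
    // of each batch that the block emits, in order.
    public static List<int> GetBatchSizes(int batchSize, int count)
    {
        var sizes = new List<int>();
        var batcher = new BatchBlock<int>(batchSize);

        // By default an ActionBlock processes one message at a time,
        // so the list is never written to concurrently.
        var recorder = new ActionBlock<int[]>(batch => sizes.Add(batch.Length));

        // Assumption: PropagateCompletion stands in for the
        // Completion.ContinueWith call used in the samples above.
        batcher.LinkTo(recorder, new DataflowLinkOptions { PropagateCompletion = true });

        for (int i = 0; i < count; i++)
        {
            batcher.Post(i);
        }

        // Completing the batch block flushes any partial batch.
        batcher.Complete();
        recorder.Completion.Wait();
        return sizes;
    }

    static void Main()
    {
        // Prints "96, 96, 64".
        Console.WriteLine(string.Join(", ", GetBatchSizes(96, 256)));
    }
}
```

With the values used in the samples, the insert action therefore runs three times: twice with 96 employees and once with the remaining 64.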