Asynchrone Datei-E/A
Synchrone E/A bedeutet, dass die Methode solange blockiert ist, bis der E/A-Vorgang vollständig durchgeführt worden ist. Danach kehrt die Methode zu den entsprechenden Daten zurück. Bei asynchroner E/A kann der Benutzer BeginRead aufrufen. Der Hauptthread kann mit der Bewältigung anderer Aufgaben fortfahren. Anschließend kann der Benutzer die Daten weiterverarbeiten. Verschiedene E/A-Anforderungen können auch gleichzeitig bearbeitet werden.
Damit Sie wissen, wann diese Daten verfügbar sind, können Sie EndRead oder EndWrite aufrufen. Dabei wird IAsyncResult entsprechend der von Ihnen herausgegebenen E/A-Anforderung übergeben. Sie können auch eine Rückrufmethode bereitstellen, die EndRead oder EndWrite aufruft, um die Anzahl der gelesenen oder geschriebenen Bytes zu ermitteln. Wenn mehrere E/A-Anforderungen gleichzeitig bearbeitet werden, kann mit asynchroner E/A eine bessere Leistung erzielt werden. In der Regel muss jedoch die Anwendung in einem gewissen Umfang neu strukturiert werden, um eine korrekte Ausführung zu gewährleisten.
Die Stream-Klasse unterstützt die gemeinsame Ausführung von synchronen und asynchronen Lese- und Schreibvorgängen auf demselben Stream, und zwar unabhängig davon, ob das Betriebssystem dies zulässt. Stream stellt Standardimplementierungen asynchroner Lese- und Schreibvorgänge anhand deren synchroner Implementierungen sowie Standardimplementierungen synchroner Lese- und Schreibvorgänge anhand deren asynchroner Implementierungen bereit.
Wenn eine von Stream abgeleitete Klasse implementiert wird, muss entweder eine Implementierung für die synchrone Read-Methode und die synchrone Write-Methode oder für die entsprechenden asynchronen Methoden bereitgestellt werden. Zwar ist es zulässig, Read und Write zu überschreiben, wobei die Standardimplementierungen der asynchronen Methoden (BeginRead, EndRead, BeginWrite und EndWrite) mit den Implementierungen der synchronen Methoden eingesetzt werden können, doch wird dadurch keine optimale Leistung erzielt. Entsprechend werden auch die synchrone Read-Methode und die synchroneWrite-Methode korrekt ausgeführt, wenn eine Implementierung der asynchronen Methoden bereitgestellt wurde. Die Leistung ist jedoch im Allgemeinen besser, wenn speziell die synchronen Methoden implementiert werden. Die Standardimplementierungen von ReadByte und WriteByte rufen die synchrone Read-Methode und die synchroneWrite-Methode mit einem Bytearray auf, das aus einem Element besteht. Wenn Sie Klassen von Stream ableiten und über einen internen Bytepuffer verfügen, sollten Sie diese Methoden unbedingt überschreiben, damit durch Zugriff auf den internen Puffer eine bessere Leistung erzielt werden kann.
Ein Stream, der eine Verbindung zu einem Sicherungsspeicher herstellt, überschreibt entweder die synchronen oder die asynchronen Read-Methoden und Write-Methoden und erhält dadurch standardmäßig die Funktionalität der jeweils gültigen Methoden. Wenn ein Stream keine asynchronen oder synchronen Operationen unterstützt, muss das Implementierungsprogramm die entsprechenden Methoden lediglich dazu veranlassen, Ausnahmen auszulösen.
Im folgenden Beispiel wird eine asynchrone Implementierung eines hypothetischen Prozessors zur Verarbeitung umfangreicher Grafikdaten gezeigt. Danach folgt ein entsprechendes Beispiel für eine synchrone Implementierung. Dieser Code eignet sich für rechenintensive Operationen, die für jede Datei eines Verzeichnisses durchgeführt werden. Weitere Informationen finden Sie unter dem Thema Entwurfsmuster für die asynchrone Programmierung.
Imports System
Imports System.IO
Imports System.Threading
Imports System.Runtime.InteropServices
Imports System.Runtime.Remoting.Messaging
Imports System.Security.Permissions
Module BulkImageProcAsync
Dim ImageBaseName As String = "tmpImage-"
Dim numImages As Integer = 200
Dim numPixels As Integer = 512 * 512
' ProcessImage has a simple O(N) loop, and you can vary the number
' of times you repeat that loop to make the application more CPU-
' bound or more IO-bound.
Dim processImageRepeats As Integer = 20
' Threads must decrement NumImagesToFinish, and protect
' their access to it through a mutex.
Dim NumImagesToFinish As Integer = numImages
Dim NumImagesMutex(-1) As [Object]
' WaitObject is signalled when all image processing is done.
Dim WaitObject(-1) As [Object]
Structure ImageStateObject
Public pixels() As Byte
Public imageNum As Integer
Public fs As FileStream
End Structure
<SecurityPermissionAttribute(SecurityAction.Demand, Flags:=SecurityPermissionFlag.UnmanagedCode)> _
Sub MakeImageFiles()
Dim sides As Integer = Fix(Math.Sqrt(numPixels))
Console.Write("Making {0} {1}x{1} images... ", numImages, sides)
Dim pixels(numPixels) As Byte
Dim i As Integer
For i = 0 To numPixels
pixels(i) = 255
Next i
Dim fs As FileStream
For i = 0 To numImages
fs = New FileStream(ImageBaseName + i.ToString() + ".tmp", FileMode.Create, FileAccess.Write, FileShare.None, 8192, False)
fs.Write(pixels, 0, pixels.Length)
FlushFileBuffers(fs.SafeFileHandle.DangerousGetHandle())
fs.Close()
Next i
fs = Nothing
Console.WriteLine("Done.")
End Sub
Sub ReadInImageCallback(ByVal asyncResult As IAsyncResult)
Dim state As ImageStateObject = CType(asyncResult.AsyncState, ImageStateObject)
Dim stream As Stream = state.fs
Dim bytesRead As Integer = stream.EndRead(asyncResult)
If bytesRead <> numPixels Then
Throw New Exception(String.Format("In ReadInImageCallback, got the wrong number of " + "bytes from the image: {0}.", bytesRead))
End If
ProcessImage(state.pixels, state.imageNum)
stream.Close()
' Now write out the image.
' Using asynchronous I/O here appears not to be best practice.
' It ends up swamping the threadpool, because the threadpool
' threads are blocked on I/O requests that were just queued to
' the threadpool.
Dim fs As New FileStream(ImageBaseName + state.imageNum.ToString() + ".done", FileMode.Create, FileAccess.Write, FileShare.None, 4096, False)
fs.Write(state.pixels, 0, numPixels)
fs.Close()
' This application model uses too much memory.
' Releasing memory as soon as possible is a good idea,
' especially global state.
state.pixels = Nothing
fs = Nothing
' Record that an image is finished now.
SyncLock NumImagesMutex
NumImagesToFinish -= 1
If NumImagesToFinish = 0 Then
Monitor.Enter(WaitObject)
Monitor.Pulse(WaitObject)
Monitor.Exit(WaitObject)
End If
End SyncLock
End Sub
Sub ProcessImage(ByVal pixels() As Byte, ByVal imageNum As Integer)
Console.WriteLine("ProcessImage {0}", imageNum)
Dim y As Integer
' Perform some CPU-intensive operation on the image.
Dim x As Integer
For x = 0 To processImageRepeats
For y = 0 To numPixels
pixels(y) = 1
Next y
Next x
Console.WriteLine("ProcessImage {0} done.", imageNum)
End Sub
Sub ProcessImagesInBulk()
Console.WriteLine("Processing images... ")
Dim t0 As Long = Environment.TickCount
NumImagesToFinish = numImages
Dim readImageCallback As New AsyncCallback(AddressOf ReadInImageCallback)
Dim i As Integer
For i = 0 To numImages
Dim state As New ImageStateObject()
state.pixels = New Byte(numPixels) {}
state.imageNum = i
' Very large items are read only once, so you can make the
' buffer on the FileStream very small to save memory.
Dim fs As New FileStream(ImageBaseName + i.ToString() + ".tmp", FileMode.Open, FileAccess.Read, FileShare.Read, 1, True)
state.fs = fs
fs.BeginRead(state.pixels, 0, numPixels, readImageCallback, state)
Next i
' Determine whether all images are done being processed.
' If not, block until all are finished.
Dim mustBlock As Boolean = False
SyncLock NumImagesMutex
If NumImagesToFinish > 0 Then
mustBlock = True
End If
End SyncLock
If mustBlock Then
Console.WriteLine("All worker threads are queued. " + " Blocking until they complete. numLeft: {0}", NumImagesToFinish)
Monitor.Enter(WaitObject)
Monitor.Wait(WaitObject)
Monitor.Exit(WaitObject)
End If
Dim t1 As Long = Environment.TickCount
Console.WriteLine("Total time processing images: {0}ms", t1 - t0)
End Sub
Sub Cleanup()
Dim i As Integer
For i = 0 To numImages
File.Delete(ImageBaseName + i.ToString + ".tmp")
File.Delete(ImageBaseName + i.ToString + ".done")
Next i
End Sub
Sub TryToClearDiskCache()
' Try to force all pending writes to disk, and clear the
' disk cache of any data.
Dim bytes(100 * (1 << 20)) As Byte
Dim i As Integer
For i = 0 To bytes.Length - 1
bytes(i) = 0
Next i
bytes = Nothing
GC.Collect()
Thread.Sleep(2000)
End Sub
Sub Main(ByVal args() As String)
Console.WriteLine("Bulk image processing sample application," + " using asynchronous IO")
Console.WriteLine("Simulates applying a simple " + "transformation to {0} ""images""", numImages)
Console.WriteLine("(Async FileStream & Threadpool benchmark)")
Console.WriteLine("Warning - this test requires {0} " + "bytes of temporary space", numPixels * numImages * 2)
If args.Length = 1 Then
processImageRepeats = Int32.Parse(args(0))
Console.WriteLine("ProcessImage inner loop - {0}.", processImageRepeats)
End If
MakeImageFiles()
TryToClearDiskCache()
ProcessImagesInBulk()
Cleanup()
End Sub
<DllImport("KERNEL32", SetLastError:=True)> _
Sub FlushFileBuffers(ByVal handle As IntPtr)
End Sub
End Module
using System;
using System.IO;
using System.Threading;
using System.Runtime.InteropServices;
using System.Runtime.Remoting.Messaging;
using System.Security.Permissions;
public class BulkImageProcAsync
{
public const String ImageBaseName = "tmpImage-";
public const int numImages = 200;
public const int numPixels = 512 * 512;
// ProcessImage has a simple O(N) loop, and you can vary the number
// of times you repeat that loop to make the application more CPU-
// bound or more IO-bound.
public static int processImageRepeats = 20;
// Threads must decrement NumImagesToFinish, and protect
// their access to it through a mutex.
public static int NumImagesToFinish = numImages;
public static Object[] NumImagesMutex = new Object[0];
// WaitObject is signalled when all image processing is done.
public static Object[] WaitObject = new Object[0];
public class ImageStateObject
{
public byte[] pixels;
public int imageNum;
public FileStream fs;
}
[SecurityPermissionAttribute(SecurityAction.Demand, Flags=SecurityPermissionFlag.UnmanagedCode)]
public static void MakeImageFiles()
{
int sides = (int)Math.Sqrt(numPixels);
Console.Write("Making {0} {1}x{1} images... ", numImages,
sides);
byte[] pixels = new byte[numPixels];
int i;
for (i = 0; i < numPixels; i++)
pixels[i] = (byte)i;
FileStream fs;
for (i = 0; i < numImages; i++)
{
fs = new FileStream(ImageBaseName + i + ".tmp",
FileMode.Create, FileAccess.Write, FileShare.None,
8192, false);
fs.Write(pixels, 0, pixels.Length);
FlushFileBuffers(fs.SafeFileHandle.DangerousGetHandle());
fs.Close();
}
fs = null;
Console.WriteLine("Done.");
}
public static void ReadInImageCallback(IAsyncResult asyncResult)
{
ImageStateObject state = (ImageStateObject)asyncResult.AsyncState;
Stream stream = state.fs;
int bytesRead = stream.EndRead(asyncResult);
if (bytesRead != numPixels)
throw new Exception(String.Format
("In ReadInImageCallback, got the wrong number of " +
"bytes from the image: {0}.", bytesRead));
ProcessImage(state.pixels, state.imageNum);
stream.Close();
// Now write out the image.
// Using asynchronous I/O here appears not to be best practice.
// It ends up swamping the threadpool, because the threadpool
// threads are blocked on I/O requests that were just queued to
// the threadpool.
FileStream fs = new FileStream(ImageBaseName + state.imageNum +
".done", FileMode.Create, FileAccess.Write, FileShare.None,
4096, false);
fs.Write(state.pixels, 0, numPixels);
fs.Close();
// This application model uses too much memory.
// Releasing memory as soon as possible is a good idea,
// especially global state.
state.pixels = null;
fs = null;
// Record that an image is finished now.
lock (NumImagesMutex)
{
NumImagesToFinish--;
if (NumImagesToFinish == 0)
{
Monitor.Enter(WaitObject);
Monitor.Pulse(WaitObject);
Monitor.Exit(WaitObject);
}
}
}
public static void ProcessImage(byte[] pixels, int imageNum)
{
Console.WriteLine("ProcessImage {0}", imageNum);
int y;
// Perform some CPU-intensive operation on the image.
for (int x = 0; x < processImageRepeats; x += 1)
for (y = 0; y < numPixels; y += 1)
pixels[y] += 1;
Console.WriteLine("ProcessImage {0} done.", imageNum);
}
public static void ProcessImagesInBulk()
{
Console.WriteLine("Processing images... ");
long t0 = Environment.TickCount;
NumImagesToFinish = numImages;
AsyncCallback readImageCallback = new
AsyncCallback(ReadInImageCallback);
for (int i = 0; i < numImages; i++)
{
ImageStateObject state = new ImageStateObject();
state.pixels = new byte[numPixels];
state.imageNum = i;
// Very large items are read only once, so you can make the
// buffer on the FileStream very small to save memory.
FileStream fs = new FileStream(ImageBaseName + i + ".tmp",
FileMode.Open, FileAccess.Read, FileShare.Read, 1, true);
state.fs = fs;
fs.BeginRead(state.pixels, 0, numPixels, readImageCallback,
state);
}
// Determine whether all images are done being processed.
// If not, block until all are finished.
bool mustBlock = false;
lock (NumImagesMutex)
{
if (NumImagesToFinish > 0)
mustBlock = true;
}
if (mustBlock)
{
Console.WriteLine("All worker threads are queued. " +
" Blocking until they complete. numLeft: {0}",
NumImagesToFinish);
Monitor.Enter(WaitObject);
Monitor.Wait(WaitObject);
Monitor.Exit(WaitObject);
}
long t1 = Environment.TickCount;
Console.WriteLine("Total time processing images: {0}ms",
(t1 - t0));
}
public static void Cleanup()
{
for (int i = 0; i < numImages; i++)
{
File.Delete(ImageBaseName + i + ".tmp");
File.Delete(ImageBaseName + i + ".done");
}
}
public static void TryToClearDiskCache()
{
// Try to force all pending writes to disk, and clear the
// disk cache of any data.
byte[] bytes = new byte[100 * (1 << 20)];
for (int i = 0; i < bytes.Length; i++)
bytes[i] = 0;
bytes = null;
GC.Collect();
Thread.Sleep(2000);
}
public static void Main(String[] args)
{
Console.WriteLine("Bulk image processing sample application," +
" using asynchronous IO");
Console.WriteLine("Simulates applying a simple " +
"transformation to {0} \"images\"", numImages);
Console.WriteLine("(Async FileStream & Threadpool benchmark)");
Console.WriteLine("Warning - this test requires {0} " +
"bytes of temporary space", (numPixels * numImages * 2));
if (args.Length == 1)
{
processImageRepeats = Int32.Parse(args[0]);
Console.WriteLine("ProcessImage inner loop - {0}.",
processImageRepeats);
}
MakeImageFiles();
TryToClearDiskCache();
ProcessImagesInBulk();
Cleanup();
}
[DllImport("KERNEL32", SetLastError = true)]
private static extern void FlushFileBuffers(IntPtr handle);
}
Im Folgenden wird ein ähnliches Beispiel für die synchrone Implementierung gegeben.
Imports System
Imports System.IO
Imports System.Threading
Imports System.Runtime.InteropServices
Imports System.Runtime.Remoting.Messaging
Imports System.Security.Permissions
Module BulkImageProcSync
Dim ImageBaseName As String = "tmpImage-"
Dim numImages As Integer = 200
Dim numPixels As Integer = 512 * 512
' ProcessImage has a simple O(N) loop, and you can vary the number
' of times you repeat that loop to make the application more CPU-
' bound or more IO-bound.
Dim processImageRepeats As Integer = 20
<SecurityPermissionAttribute(SecurityAction.Demand, Flags:=SecurityPermissionFlag.UnmanagedCode)> _
Sub MakeImageFiles()
Dim sides As Integer = Fix(Math.Sqrt(numPixels))
Console.Write("Making {0} {1}x{1} images... ", numImages, sides)
Dim pixels(numPixels) As Byte
Dim i As Integer
For i = 0 To numPixels
pixels(i) = 255
Next i
Dim fs As FileStream
For i = 0 To numImages
fs = New FileStream(ImageBaseName + i.ToString + ".tmp", FileMode.Create, FileAccess.Write, FileShare.None, 8192, False)
fs.Write(pixels, 0, pixels.Length)
FlushFileBuffers(fs.SafeFileHandle.DangerousGetHandle())
fs.Close()
Next i
fs = Nothing
Console.WriteLine("Done.")
End Sub
Sub ProcessImage(ByVal pixels() As Byte, ByVal imageNum As Integer)
Console.WriteLine("ProcessImage {0}", imageNum)
Dim y As Integer
' Perform some CPU-intensive operation on the image.
Dim x As Integer
For x = 0 To processImageRepeats
For y = 0 To numPixels
pixels(y) = 1
Next y
Next x
Console.WriteLine("ProcessImage {0} done.", imageNum)
End Sub
Sub ProcessImagesInBulk()
Console.WriteLine("Processing images... ")
Dim t0 As Long = Environment.TickCount
Dim pixels(numPixels) As Byte
Dim input As FileStream
Dim output As FileStream
Dim i As Integer
For i = 0 To numImages
input = New FileStream(ImageBaseName + i.ToString + ".tmp", FileMode.Open, FileAccess.Read, FileShare.Read, 4196, False)
input.Read(pixels, 0, numPixels)
input.Close()
ProcessImage(pixels, i)
output = New FileStream(ImageBaseName + i.ToString + ".done", FileMode.Create, FileAccess.Write, FileShare.None, 4196, False)
output.Write(pixels, 0, numPixels)
output.Close()
Next i
input = Nothing
output = Nothing
Dim t1 As Long = Environment.TickCount
Console.WriteLine("Total time processing images: {0}ms", t1 - t0)
End Sub
Sub Cleanup()
Dim i As Integer
For i = 0 To numImages
File.Delete(ImageBaseName + i.ToString + ".tmp")
File.Delete(ImageBaseName + i.ToString + ".done")
Next i
End Sub
Sub TryToClearDiskCache()
Dim bytes(100 * (1 << 20)) As Byte
Dim i As Integer
For i = 0 To bytes.Length - 1
bytes(i) = 0
Next i
bytes = Nothing
GC.Collect()
Thread.Sleep(2000)
End Sub
Sub Main(ByVal args() As String)
Console.WriteLine("Bulk image processing sample application," + " using synchronous I/O.")
Console.WriteLine("Simulates applying a simple " + "transformation to {0} ""images.""", numImages)
Console.WriteLine("(ie, Sync FileStream benchmark).")
Console.WriteLine("Warning - this test requires {0} " + "bytes of temporary space", numPixels * numImages * 2)
If args.Length = 1 Then
processImageRepeats = Int32.Parse(args(0))
Console.WriteLine("ProcessImage inner loop {0}", processImageRepeats)
End If
MakeImageFiles()
TryToClearDiskCache()
ProcessImagesInBulk()
Cleanup()
End Sub
<DllImport("KERNEL32", SetLastError:=True)> _
Sub FlushFileBuffers(ByVal handle As IntPtr)
End Sub
End Module
using System;
using System.IO;
using System.Threading;
using System.Runtime.InteropServices;
using System.Runtime.Remoting.Messaging;
using System.Security.Permissions;
public class BulkImageProcSync
{
public const String ImageBaseName = "tmpImage-";
public const int numImages = 200;
public const int numPixels = 512 * 512;
// ProcessImage has a simple O(N) loop, and you can vary the number
// of times you repeat that loop to make the application more CPU-
// bound or more IO-bound.
public static int processImageRepeats = 20;
[SecurityPermissionAttribute(SecurityAction.Demand, Flags=SecurityPermissionFlag.UnmanagedCode)]
public static void MakeImageFiles()
{
int sides = (int)Math.Sqrt(numPixels);
Console.Write("Making {0} {1}x{1} images... ", numImages,
sides);
byte[] pixels = new byte[numPixels];
int i;
for (i = 0; i < numPixels; i++)
pixels[i] = (byte)i;
FileStream fs;
for (i = 0; i < numImages; i++)
{
fs = new FileStream(ImageBaseName + i + ".tmp",
FileMode.Create, FileAccess.Write, FileShare.None,
8192, false);
fs.Write(pixels, 0, pixels.Length);
FlushFileBuffers(fs.SafeFileHandle.DangerousGetHandle());
fs.Close();
}
fs = null;
Console.WriteLine("Done.");
}
public static void ProcessImage(byte[] pixels, int imageNum)
{
Console.WriteLine("ProcessImage {0}", imageNum);
int y;
// Perform some CPU-intensive operation on the image.
for (int x = 0; x < processImageRepeats; x += 1)
for (y = 0; y < numPixels; y += 1)
pixels[y] += 1;
Console.WriteLine("ProcessImage {0} done.", imageNum);
}
public static void ProcessImagesInBulk()
{
Console.WriteLine("Processing images... ");
long t0 = Environment.TickCount;
byte[] pixels = new byte[numPixels];
FileStream input;
FileStream output;
for (int i = 0; i < numImages; i++)
{
input = new FileStream(ImageBaseName + i + ".tmp",
FileMode.Open, FileAccess.Read, FileShare.Read,
4196, false);
input.Read(pixels, 0, numPixels);
input.Close();
ProcessImage(pixels, i);
output = new FileStream(ImageBaseName + i + ".done",
FileMode.Create, FileAccess.Write, FileShare.None,
4196, false);
output.Write(pixels, 0, numPixels);
output.Close();
}
input = null;
output = null;
long t1 = Environment.TickCount;
Console.WriteLine("Total time processing images: {0}ms",
(t1 - t0));
}
public static void Cleanup()
{
for (int i = 0; i < numImages; i++)
{
File.Delete(ImageBaseName + i + ".tmp");
File.Delete(ImageBaseName + i + ".done");
}
}
public static void TryToClearDiskCache()
{
byte[] bytes = new byte[100 * (1 << 20)];
for (int i = 0; i < bytes.Length; i++)
bytes[i] = 0;
bytes = null;
GC.Collect();
Thread.Sleep(2000);
}
public static void Main(String[] args)
{
Console.WriteLine("Bulk image processing sample application," +
" using synchronous I/O.");
Console.WriteLine("Simulates applying a simple " +
"transformation to {0} \"images.\"", numImages);
Console.WriteLine("(ie, Sync FileStream benchmark).");
Console.WriteLine("Warning - this test requires {0} " +
"bytes of temporary space", (numPixels * numImages * 2));
if (args.Length == 1)
{
processImageRepeats = Int32.Parse(args[0]);
Console.WriteLine("ProcessImage inner loop {0}",
processImageRepeats);
}
MakeImageFiles();
TryToClearDiskCache();
ProcessImagesInBulk();
Cleanup();
}
[DllImport("KERNEL32", SetLastError = true)]
private static extern void FlushFileBuffers(IntPtr handle);
}
Siehe auch
Referenz
Stream
Stream.Read
Stream.Write
Stream.BeginRead
Stream.BeginWrite
Stream.EndRead
Stream.EndWrite
IAsyncResult
Mutex