非同期ファイル I/O
同期 I/O では、メソッドは I/O 操作が完了するまでブロックされ、I/O 操作の完了後にデータを返します。 非同期 I/O については、BeginRead を呼び出すことができます。 メイン スレッドで他の処理を継続でき、後からデータを処理できます。 また、同時に複数の I/O 要求を保留できます。
このデータが利用可能になった時点を把握するには、発行した I/O 要求に対応する IAsyncResult を渡して EndRead または EndWrite を呼び出します。 また、読み取ったバイト数や書き込んだバイト数を算出するには、EndRead または EndWrite を呼び出すコールバック メソッドを用意します。 非同期 I/O は、同時に多数の I/O 要求を保留できるためパフォーマンスを向上させますが、一般に、アプリケーションを大幅に再構築しないと正しく機能しません。
Stream クラスでは、同じストリーム内での同期および非同期の読み書き操作の混合をサポートします。これは、オペレーティング システムがこれを許可していない場合でも同様です。 Stream は、同期の読み書き操作の実装に対しては既定の非同期操作の実装を提供し、非同期の読み書き操作の実装に対しては既定の同期操作の実装を提供します。
Stream の派生クラスを実装するときは、同期または非同期のどちらかの Read メソッドおよび Write メソッドを実装する必要があります。 Read と Write のオーバーライドは許可されており、非同期メソッド (BeginRead、EndRead、BeginWrite、および EndWrite) の既定の実装を同期メソッドの実装と併用できますが、この方法ではあまり効率的なパフォーマンスは得られません。 同様に、同期の Read メソッドと Write メソッドは非同期のメソッドを実装した場合でも正しく機能しますが、同期メソッドに特定して実装した場合の方が一般にパフォーマンスは優れています。 ReadByte および WriteByte の既定の実装は、単一の要素で構成されるバイト配列に対する同期の Read メソッドと Write メソッドを呼び出します。 Stream からクラスを派生させるとき、内部バイト バッファーがある場合は、パフォーマンス向上のため、これらのメソッドを内部バッファーにアクセスするようにオーバーライドすることをお勧めします。
バッキング ストアに関連付けられているストリームは、同期または非同期のいずれかの Read メソッドおよび Write メソッドをオーバーライドし、既定によりもう一方の機能を実装します。 ストリームが非同期操作または同期操作をサポートしていない場合は、該当するメソッドが例外をスローするようにする必要があります。
仮想のバルク イメージ プロセッサで非同期操作を実装する例を次に示します。その後で、同期操作の実装例を示します。 このコードは、ディレクトリのすべてのファイルに対して CPU 集中型の操作を実行するデザインになっています。 詳細については、「非同期プログラミングのデザイン パターン」を参照してください。
Imports System
Imports System.IO
Imports System.Threading
Imports System.Runtime.InteropServices
Imports System.Runtime.Remoting.Messaging
Imports System.Security.Permissions
Imports Microsoft.Win32.SafeHandles
' Bulk image processing sample that uses asynchronous file I/O.
' It writes numImages temporary files, starts one asynchronous read per file
' with FileStream.BeginRead, processes each buffer in the read callback,
' writes a ".done" file per image, and finally deletes every file it created.
Module BulkImageProcAsync
    Dim ImageBaseName As String = "tmpImage-"
    Dim numImages As Integer = 200
    Dim numPixels As Integer = 512 * 512

    ' ProcessImage has a simple O(N) loop, and you can vary the number
    ' of times you repeat that loop to make the application more CPU-
    ' bound or more IO-bound.
    Dim processImageRepeats As Integer = 20

    ' Threads must decrement NumImagesToFinish, and protect
    ' their access to it through a mutex.
    Dim NumImagesToFinish As Integer = numImages
    Dim NumImagesMutex(-1) As [Object]

    ' WaitObject is signalled when all image processing is done.
    Dim WaitObject(-1) As [Object]

    ' Per-request state handed to BeginRead and read back in the callback.
    Structure ImageStateObject
        Public pixels() As Byte
        Public imageNum As Integer
        Public fs As FileStream
    End Structure

    ' Creates numImages files named <ImageBaseName><index>.tmp, each holding
    ' exactly numPixels bytes with the value 255.
    <SecurityPermissionAttribute(SecurityAction.Demand, Flags:=SecurityPermissionFlag.UnmanagedCode)> _
    Sub MakeImageFiles()
        Dim sides As Integer = CInt(Fix(Math.Sqrt(numPixels)))
        Console.Write("Making {0} {1}x{1} images... ", numImages, sides)

        ' A VB array bound is the highest index, not the length, so
        ' numPixels - 1 yields exactly numPixels elements.
        Dim pixels(numPixels - 1) As Byte
        For i As Integer = 0 To numPixels - 1
            pixels(i) = 255
        Next i

        ' For...To is inclusive; stop at numImages - 1 so that exactly
        ' numImages files are created.
        For i As Integer = 0 To numImages - 1
            ' Using closes the stream even if Write or the flush throws.
            Using fs As New FileStream(ImageBaseName + i.ToString() + ".tmp", FileMode.Create, FileAccess.Write, FileShare.None, 8192, False)
                fs.Write(pixels, 0, pixels.Length)
                FlushFileBuffers(fs.SafeFileHandle)
            End Using
        Next i
        Console.WriteLine("Done.")
    End Sub

    ' Completion callback for the BeginRead issued in ProcessImagesInBulk.
    ' Finishes the read, processes the pixels, writes the ".done" file and
    ' records that one more image is finished.
    ' Throws IOException when the read did not deliver numPixels bytes.
    Sub ReadInImageCallback(ByVal asyncResult As IAsyncResult)
        Dim state As ImageStateObject = CType(asyncResult.AsyncState, ImageStateObject)

        ' Using closes the source stream on every path, including the throw.
        Using source As Stream = state.fs
            Dim bytesRead As Integer = source.EndRead(asyncResult)
            If bytesRead <> numPixels Then
                Throw New IOException(String.Format("In ReadInImageCallback, got the wrong number of bytes from the image: {0}.", bytesRead))
            End If
            ProcessImage(state.pixels, state.imageNum)
        End Using

        ' Now write out the image.
        ' Using asynchronous I/O here appears not to be best practice.
        ' It ends up swamping the threadpool, because the threadpool
        ' threads are blocked on I/O requests that were just queued to
        ' the threadpool.
        Using fs As New FileStream(ImageBaseName + state.imageNum.ToString() + ".done", FileMode.Create, FileAccess.Write, FileShare.None, 4096, False)
            fs.Write(state.pixels, 0, numPixels)
        End Using

        ' This application model uses too much memory.
        ' Releasing memory as soon as possible is a good idea,
        ' especially global state.
        ' Note: ImageStateObject is a Structure, so the CType above produced
        ' a copy and this only clears the copy's reference.
        state.pixels = Nothing

        ' Record that an image is finished now.
        Dim allDone As Boolean
        SyncLock NumImagesMutex
            NumImagesToFinish -= 1
            allDone = (NumImagesToFinish = 0)
        End SyncLock

        ' Pulse after releasing NumImagesMutex. The waiter holds WaitObject
        ' while it takes NumImagesMutex, so taking them here in the opposite
        ' order could deadlock.
        If allDone Then
            SyncLock WaitObject
                Monitor.PulseAll(WaitObject)
            End SyncLock
        End If
    End Sub

    ' Simulated CPU-bound work: rewrites the first numPixels bytes of the
    ' buffer processImageRepeats times.
    Sub ProcessImage(ByVal pixels() As Byte, ByVal imageNum As Integer)
        Console.WriteLine("ProcessImage {0}", imageNum)
        ' Perform some CPU-intensive operation on the image.
        ' Both bounds are inclusive, hence the "- 1".
        For x As Integer = 0 To processImageRepeats - 1
            For y As Integer = 0 To numPixels - 1
                pixels(y) = 1
            Next y
        Next x
        Console.WriteLine("ProcessImage {0} done.", imageNum)
    End Sub

    ' Starts one asynchronous read per image, then blocks until every
    ' callback has reported completion, and prints the elapsed time.
    Sub ProcessImagesInBulk()
        Console.WriteLine("Processing images... ")
        Dim t0 As Long = Environment.TickCount
        NumImagesToFinish = numImages
        Dim readImageCallback As New AsyncCallback(AddressOf ReadInImageCallback)

        ' Exactly numImages requests, matching NumImagesToFinish.
        For i As Integer = 0 To numImages - 1
            Dim state As New ImageStateObject()
            state.pixels = New Byte(numPixels - 1) {}
            state.imageNum = i
            ' Very large items are read only once, so you can make the
            ' buffer on the FileStream very small to save memory.
            Dim fs As New FileStream(ImageBaseName + i.ToString() + ".tmp", FileMode.Open, FileAccess.Read, FileShare.Read, 1, True)
            state.fs = fs
            fs.BeginRead(state.pixels, 0, numPixels, readImageCallback, state)
        Next i

        ' Determine whether all images are done being processed.
        ' If not, block until all are finished.
        ' The counter is re-checked while WaitObject is held, so the final
        ' pulse cannot slip in between the check and Monitor.Wait.
        Dim announced As Boolean = False
        SyncLock WaitObject
            Do
                Dim remaining As Integer
                SyncLock NumImagesMutex
                    remaining = NumImagesToFinish
                End SyncLock
                If remaining <= 0 Then
                    Exit Do
                End If
                If Not announced Then
                    Console.WriteLine("All worker threads are queued. " + " Blocking until they complete. numLeft: {0}", remaining)
                    announced = True
                End If
                Monitor.Wait(WaitObject)
            Loop
        End SyncLock

        Dim t1 As Long = Environment.TickCount
        Console.WriteLine("Total time processing images: {0}ms", t1 - t0)
    End Sub

    ' Deletes the ".tmp" and ".done" file of every image.
    Sub Cleanup()
        For i As Integer = 0 To numImages - 1
            File.Delete(ImageBaseName + i.ToString() + ".tmp")
            File.Delete(ImageBaseName + i.ToString() + ".done")
        Next i
    End Sub

    Sub TryToClearDiskCache()
        ' Try to force all pending writes to disk, and clear the
        ' disk cache of any data.
        Dim bytes(100 * (1 << 20) - 1) As Byte
        For i As Integer = 0 To bytes.Length - 1
            bytes(i) = 0
        Next i
        bytes = Nothing
        GC.Collect()
        Thread.Sleep(2000)
    End Sub

    ' Entry point. An optional single argument overrides processImageRepeats.
    Sub Main(ByVal args() As String)
        Console.WriteLine("Bulk image processing sample application," + " using asynchronous IO")
        Console.WriteLine("Simulates applying a simple " + "transformation to {0} ""images""", numImages)
        Console.WriteLine("(Async FileStream & Threadpool benchmark)")
        Console.WriteLine("Warning - this test requires {0} " + "bytes of temporary space", numPixels * numImages * 2)
        If args.Length = 1 Then
            processImageRepeats = Int32.Parse(args(0))
            Console.WriteLine("ProcessImage inner loop - {0}.", processImageRepeats)
        End If
        MakeImageFiles()
        TryToClearDiskCache()
        ProcessImagesInBulk()
        Cleanup()
    End Sub

    ' P/Invoke declaration for FlushFileBuffers in KERNEL32.
    <DllImport("KERNEL32", SetLastError:=True)> _
    Sub FlushFileBuffers(ByVal handle As SafeFileHandle)
    End Sub
End Module
using System;
using System.IO;
using System.Threading;
using System.Runtime.InteropServices;
using System.Runtime.Remoting.Messaging;
using System.Security.Permissions;
using Microsoft.Win32.SafeHandles;
/// <summary>
/// Bulk image processing sample that uses asynchronous file I/O.
/// It writes <see cref="numImages"/> temporary files, starts one asynchronous
/// read per file with FileStream.BeginRead, processes each buffer in the read
/// callback, writes a ".done" file per image, and finally deletes every file
/// it created.
/// </summary>
public class BulkImageProcAsync
{
    public const String ImageBaseName = "tmpImage-";
    public const int numImages = 200;
    public const int numPixels = 512 * 512;

    // ProcessImage has a simple O(N) loop, and you can vary the number
    // of times you repeat that loop to make the application more CPU-
    // bound or more IO-bound.
    public static int processImageRepeats = 20;

    // Threads must decrement NumImagesToFinish, and protect
    // their access to it through a mutex.
    public static int NumImagesToFinish = numImages;
    public static Object[] NumImagesMutex = new Object[0];

    // WaitObject is signalled when all image processing is done.
    public static Object[] WaitObject = new Object[0];

    /// <summary>Per-request state handed to BeginRead and read back in the callback.</summary>
    public class ImageStateObject
    {
        public byte[] pixels;
        public int imageNum;
        public FileStream fs;
    }

    /// <summary>
    /// Creates <see cref="numImages"/> files named ImageBaseName + index + ".tmp",
    /// each holding <see cref="numPixels"/> bytes.
    /// </summary>
    [SecurityPermissionAttribute(SecurityAction.Demand, Flags=SecurityPermissionFlag.UnmanagedCode)]
    public static void MakeImageFiles()
    {
        int sides = (int)Math.Sqrt(numPixels);
        Console.Write("Making {0} {1}x{1} images... ", numImages,
            sides);

        byte[] pixels = new byte[numPixels];
        for (int i = 0; i < numPixels; i++)
        {
            pixels[i] = (byte)i;
        }

        for (int i = 0; i < numImages; i++)
        {
            // using disposes the stream even if Write or the flush throws.
            using (FileStream fs = new FileStream(ImageBaseName + i + ".tmp",
                FileMode.Create, FileAccess.Write, FileShare.None,
                8192, false))
            {
                fs.Write(pixels, 0, pixels.Length);
                FlushFileBuffers(fs.SafeFileHandle);
            }
        }
        Console.WriteLine("Done.");
    }

    /// <summary>
    /// Completion callback for the BeginRead issued in
    /// <see cref="ProcessImagesInBulk"/>. Finishes the read, processes the
    /// pixels, writes the ".done" file and records that one more image is done.
    /// </summary>
    /// <param name="asyncResult">Result whose AsyncState is the request's <see cref="ImageStateObject"/>.</param>
    /// <exception cref="IOException">The read did not deliver <see cref="numPixels"/> bytes.</exception>
    public static void ReadInImageCallback(IAsyncResult asyncResult)
    {
        ImageStateObject state = (ImageStateObject)asyncResult.AsyncState;

        // using closes the source stream on every path, including the throw.
        using (Stream stream = state.fs)
        {
            int bytesRead = stream.EndRead(asyncResult);
            if (bytesRead != numPixels)
            {
                throw new IOException(String.Format(
                    "In ReadInImageCallback, got the wrong number of " +
                    "bytes from the image: {0}.", bytesRead));
            }
            ProcessImage(state.pixels, state.imageNum);
        }

        // Now write out the image.
        // Using asynchronous I/O here appears not to be best practice.
        // It ends up swamping the threadpool, because the threadpool
        // threads are blocked on I/O requests that were just queued to
        // the threadpool.
        using (FileStream fs = new FileStream(ImageBaseName + state.imageNum +
            ".done", FileMode.Create, FileAccess.Write, FileShare.None,
            4096, false))
        {
            fs.Write(state.pixels, 0, numPixels);
        }

        // This application model uses too much memory.
        // Releasing memory as soon as possible is a good idea,
        // especially global state.
        state.pixels = null;

        // Record that an image is finished now.
        bool allDone;
        lock (NumImagesMutex)
        {
            NumImagesToFinish--;
            allDone = NumImagesToFinish == 0;
        }

        // Pulse after releasing NumImagesMutex. The waiter holds WaitObject
        // while it takes NumImagesMutex, so taking them here in the opposite
        // order could deadlock.
        if (allDone)
        {
            lock (WaitObject)
            {
                Monitor.PulseAll(WaitObject);
            }
        }
    }

    /// <summary>
    /// Simulated CPU-bound work: increments the first <see cref="numPixels"/>
    /// bytes of the buffer <see cref="processImageRepeats"/> times.
    /// </summary>
    /// <param name="pixels">Buffer to modify in place.</param>
    /// <param name="imageNum">Image index, used only in the console output.</param>
    public static void ProcessImage(byte[] pixels, int imageNum)
    {
        Console.WriteLine("ProcessImage {0}", imageNum);
        // Perform some CPU-intensive operation on the image.
        for (int x = 0; x < processImageRepeats; x += 1)
        {
            for (int y = 0; y < numPixels; y += 1)
            {
                pixels[y] += 1;
            }
        }
        Console.WriteLine("ProcessImage {0} done.", imageNum);
    }

    /// <summary>
    /// Starts one asynchronous read per image, then blocks until every
    /// callback has reported completion, and prints the elapsed time.
    /// </summary>
    public static void ProcessImagesInBulk()
    {
        Console.WriteLine("Processing images... ");
        long t0 = Environment.TickCount;
        NumImagesToFinish = numImages;
        AsyncCallback readImageCallback = new
            AsyncCallback(ReadInImageCallback);

        for (int i = 0; i < numImages; i++)
        {
            ImageStateObject state = new ImageStateObject();
            state.pixels = new byte[numPixels];
            state.imageNum = i;
            // Very large items are read only once, so you can make the
            // buffer on the FileStream very small to save memory.
            FileStream fs = new FileStream(ImageBaseName + i + ".tmp",
                FileMode.Open, FileAccess.Read, FileShare.Read, 1, true);
            state.fs = fs;
            fs.BeginRead(state.pixels, 0, numPixels, readImageCallback,
                state);
        }

        // Determine whether all images are done being processed.
        // If not, block until all are finished.
        // The counter is re-checked while WaitObject is held, so the final
        // pulse cannot slip in between the check and Monitor.Wait.
        bool announced = false;
        lock (WaitObject)
        {
            while (true)
            {
                int remaining;
                lock (NumImagesMutex)
                {
                    remaining = NumImagesToFinish;
                }
                if (remaining <= 0)
                {
                    break;
                }
                if (!announced)
                {
                    Console.WriteLine("All worker threads are queued. " +
                        " Blocking until they complete. numLeft: {0}",
                        remaining);
                    announced = true;
                }
                Monitor.Wait(WaitObject);
            }
        }

        long t1 = Environment.TickCount;
        Console.WriteLine("Total time processing images: {0}ms",
            (t1 - t0));
    }

    /// <summary>Deletes the ".tmp" and ".done" file of every image.</summary>
    public static void Cleanup()
    {
        for (int i = 0; i < numImages; i++)
        {
            File.Delete(ImageBaseName + i + ".tmp");
            File.Delete(ImageBaseName + i + ".done");
        }
    }

    public static void TryToClearDiskCache()
    {
        // Try to force all pending writes to disk, and clear the
        // disk cache of any data.
        byte[] bytes = new byte[100 * (1 << 20)];
        for (int i = 0; i < bytes.Length; i++)
        {
            bytes[i] = 0;
        }
        bytes = null;
        GC.Collect();
        Thread.Sleep(2000);
    }

    /// <summary>Entry point. An optional single argument overrides <see cref="processImageRepeats"/>.</summary>
    public static void Main(String[] args)
    {
        Console.WriteLine("Bulk image processing sample application," +
            " using asynchronous IO");
        Console.WriteLine("Simulates applying a simple " +
            "transformation to {0} \"images\"", numImages);
        Console.WriteLine("(Async FileStream & Threadpool benchmark)");
        Console.WriteLine("Warning - this test requires {0} " +
            "bytes of temporary space", (numPixels * numImages * 2));
        if (args.Length == 1)
        {
            processImageRepeats = Int32.Parse(args[0]);
            Console.WriteLine("ProcessImage inner loop - {0}.",
                processImageRepeats);
        }
        MakeImageFiles();
        TryToClearDiskCache();
        ProcessImagesInBulk();
        Cleanup();
    }

    // P/Invoke declaration for FlushFileBuffers in KERNEL32.
    [DllImport("KERNEL32", SetLastError = true)]
    private static extern void FlushFileBuffers(SafeFileHandle handle);
}
同様に、同期操作を実装する例を次に示します。
Imports System
Imports System.IO
Imports System.Threading
Imports System.Runtime.InteropServices
Imports System.Runtime.Remoting.Messaging
Imports System.Security.Permissions
Imports Microsoft.Win32.SafeHandles
' Bulk image processing sample that uses synchronous file I/O.
' It writes numImages temporary files, then reads, processes and rewrites
' them one after another on the calling thread, and finally deletes every
' file it created.
Module BulkImageProcSync
    Dim ImageBaseName As String = "tmpImage-"
    Dim numImages As Integer = 200
    Dim numPixels As Integer = 512 * 512

    ' ProcessImage has a simple O(N) loop, and you can vary the number
    ' of times you repeat that loop to make the application more CPU-
    ' bound or more IO-bound.
    Dim processImageRepeats As Integer = 20

    ' Creates numImages files named <ImageBaseName><index>.tmp, each holding
    ' exactly numPixels bytes with the value 255.
    <SecurityPermissionAttribute(SecurityAction.Demand, Flags:=SecurityPermissionFlag.UnmanagedCode)> _
    Sub MakeImageFiles()
        Dim sides As Integer = CInt(Fix(Math.Sqrt(numPixels)))
        Console.Write("Making {0} {1}x{1} images... ", numImages, sides)

        ' A VB array bound is the highest index, not the length, so
        ' numPixels - 1 yields exactly numPixels elements.
        Dim pixels(numPixels - 1) As Byte
        For i As Integer = 0 To numPixels - 1
            pixels(i) = 255
        Next i

        ' For...To is inclusive; stop at numImages - 1 so that exactly
        ' numImages files are created.
        For i As Integer = 0 To numImages - 1
            ' Using closes the stream even if Write or the flush throws.
            Using fs As New FileStream(ImageBaseName + i.ToString() + ".tmp", FileMode.Create, FileAccess.Write, FileShare.None, 8192, False)
                fs.Write(pixels, 0, pixels.Length)
                FlushFileBuffers(fs.SafeFileHandle)
            End Using
        Next i
        Console.WriteLine("Done.")
    End Sub

    ' Simulated CPU-bound work: rewrites the first numPixels bytes of the
    ' buffer processImageRepeats times.
    Sub ProcessImage(ByVal pixels() As Byte, ByVal imageNum As Integer)
        Console.WriteLine("ProcessImage {0}", imageNum)
        ' Perform some CPU-intensive operation on the image.
        ' Both bounds are inclusive, hence the "- 1".
        For x As Integer = 0 To processImageRepeats - 1
            For y As Integer = 0 To numPixels - 1
                pixels(y) = 1
            Next y
        Next x
        Console.WriteLine("ProcessImage {0} done.", imageNum)
    End Sub

    ' Reads exactly count bytes from source into buffer, starting at index 0.
    ' Stream.Read may return fewer bytes than requested, so it is called
    ' repeatedly. Throws EndOfStreamException if the stream ends early.
    Private Sub ReadFully(ByVal source As Stream, ByVal buffer() As Byte, ByVal count As Integer)
        Dim offset As Integer = 0
        While offset < count
            Dim got As Integer = source.Read(buffer, offset, count - offset)
            If got = 0 Then
                Throw New EndOfStreamException(String.Format("Expected {0} bytes but the stream ended after {1}.", count, offset))
            End If
            offset += got
        End While
    End Sub

    ' Reads, processes and rewrites every image in sequence, reusing a single
    ' buffer, and prints the elapsed time.
    Sub ProcessImagesInBulk()
        Console.WriteLine("Processing images... ")
        Dim t0 As Long = Environment.TickCount
        Dim pixels(numPixels - 1) As Byte

        For i As Integer = 0 To numImages - 1
            Using source As New FileStream(ImageBaseName + i.ToString() + ".tmp", FileMode.Open, FileAccess.Read, FileShare.Read, 4196, False)
                ReadFully(source, pixels, numPixels)
            End Using

            ProcessImage(pixels, i)

            Using target As New FileStream(ImageBaseName + i.ToString() + ".done", FileMode.Create, FileAccess.Write, FileShare.None, 4196, False)
                target.Write(pixels, 0, numPixels)
            End Using
        Next i

        Dim t1 As Long = Environment.TickCount
        Console.WriteLine("Total time processing images: {0}ms", t1 - t0)
    End Sub

    ' Deletes the ".tmp" and ".done" file of every image.
    Sub Cleanup()
        For i As Integer = 0 To numImages - 1
            File.Delete(ImageBaseName + i.ToString() + ".tmp")
            File.Delete(ImageBaseName + i.ToString() + ".done")
        Next i
    End Sub

    ' Allocates and touches a large array, forces a collection and sleeps
    ' before the timed run.
    Sub TryToClearDiskCache()
        Dim bytes(100 * (1 << 20) - 1) As Byte
        For i As Integer = 0 To bytes.Length - 1
            bytes(i) = 0
        Next i
        bytes = Nothing
        GC.Collect()
        Thread.Sleep(2000)
    End Sub

    ' Entry point. An optional single argument overrides processImageRepeats.
    Sub Main(ByVal args() As String)
        Console.WriteLine("Bulk image processing sample application," + " using synchronous I/O.")
        Console.WriteLine("Simulates applying a simple " + "transformation to {0} ""images.""", numImages)
        Console.WriteLine("(ie, Sync FileStream benchmark).")
        Console.WriteLine("Warning - this test requires {0} " + "bytes of temporary space", numPixels * numImages * 2)
        If args.Length = 1 Then
            processImageRepeats = Int32.Parse(args(0))
            Console.WriteLine("ProcessImage inner loop {0}", processImageRepeats)
        End If
        MakeImageFiles()
        TryToClearDiskCache()
        ProcessImagesInBulk()
        Cleanup()
    End Sub

    ' P/Invoke declaration for FlushFileBuffers in KERNEL32.
    <DllImport("KERNEL32", SetLastError:=True)> _
    Sub FlushFileBuffers(ByVal handle As SafeFileHandle)
    End Sub
End Module
using System;
using System.IO;
using System.Threading;
using System.Runtime.InteropServices;
using System.Runtime.Remoting.Messaging;
using System.Security.Permissions;
using Microsoft.Win32.SafeHandles;
/// <summary>
/// Bulk image processing sample that uses synchronous file I/O.
/// It writes <see cref="numImages"/> temporary files, then reads, processes
/// and rewrites them one after another on the calling thread, and finally
/// deletes every file it created.
/// </summary>
public class BulkImageProcSync
{
    public const String ImageBaseName = "tmpImage-";
    public const int numImages = 200;
    public const int numPixels = 512 * 512;

    // ProcessImage has a simple O(N) loop, and you can vary the number
    // of times you repeat that loop to make the application more CPU-
    // bound or more IO-bound.
    public static int processImageRepeats = 20;

    /// <summary>
    /// Creates <see cref="numImages"/> files named ImageBaseName + index + ".tmp",
    /// each holding <see cref="numPixels"/> bytes.
    /// </summary>
    [SecurityPermissionAttribute(SecurityAction.Demand, Flags=SecurityPermissionFlag.UnmanagedCode)]
    public static void MakeImageFiles()
    {
        int sides = (int)Math.Sqrt(numPixels);
        Console.Write("Making {0} {1}x{1} images... ", numImages,
            sides);

        byte[] pixels = new byte[numPixels];
        for (int i = 0; i < numPixels; i++)
        {
            pixels[i] = (byte)i;
        }

        for (int i = 0; i < numImages; i++)
        {
            // using disposes the stream even if Write or the flush throws.
            using (FileStream fs = new FileStream(ImageBaseName + i + ".tmp",
                FileMode.Create, FileAccess.Write, FileShare.None,
                8192, false))
            {
                fs.Write(pixels, 0, pixels.Length);
                FlushFileBuffers(fs.SafeFileHandle);
            }
        }
        Console.WriteLine("Done.");
    }

    /// <summary>
    /// Simulated CPU-bound work: increments the first <see cref="numPixels"/>
    /// bytes of the buffer <see cref="processImageRepeats"/> times.
    /// </summary>
    /// <param name="pixels">Buffer to modify in place.</param>
    /// <param name="imageNum">Image index, used only in the console output.</param>
    public static void ProcessImage(byte[] pixels, int imageNum)
    {
        Console.WriteLine("ProcessImage {0}", imageNum);
        // Perform some CPU-intensive operation on the image.
        for (int x = 0; x < processImageRepeats; x += 1)
        {
            for (int y = 0; y < numPixels; y += 1)
            {
                pixels[y] += 1;
            }
        }
        Console.WriteLine("ProcessImage {0} done.", imageNum);
    }

    /// <summary>
    /// Reads exactly <paramref name="count"/> bytes from
    /// <paramref name="source"/> into <paramref name="buffer"/>, starting at
    /// index 0. Stream.Read may return fewer bytes than requested, so it is
    /// called repeatedly.
    /// </summary>
    /// <exception cref="EndOfStreamException">The stream ended before <paramref name="count"/> bytes were read.</exception>
    private static void ReadFully(Stream source, byte[] buffer, int count)
    {
        int offset = 0;
        while (offset < count)
        {
            int got = source.Read(buffer, offset, count - offset);
            if (got == 0)
            {
                throw new EndOfStreamException(String.Format(
                    "Expected {0} bytes but the stream ended after {1}.",
                    count, offset));
            }
            offset += got;
        }
    }

    /// <summary>
    /// Reads, processes and rewrites every image in sequence, reusing a
    /// single buffer, and prints the elapsed time.
    /// </summary>
    public static void ProcessImagesInBulk()
    {
        Console.WriteLine("Processing images... ");
        long t0 = Environment.TickCount;
        byte[] pixels = new byte[numPixels];

        for (int i = 0; i < numImages; i++)
        {
            using (FileStream input = new FileStream(ImageBaseName + i + ".tmp",
                FileMode.Open, FileAccess.Read, FileShare.Read,
                4196, false))
            {
                ReadFully(input, pixels, numPixels);
            }

            ProcessImage(pixels, i);

            using (FileStream output = new FileStream(ImageBaseName + i + ".done",
                FileMode.Create, FileAccess.Write, FileShare.None,
                4196, false))
            {
                output.Write(pixels, 0, numPixels);
            }
        }

        long t1 = Environment.TickCount;
        Console.WriteLine("Total time processing images: {0}ms",
            (t1 - t0));
    }

    /// <summary>Deletes the ".tmp" and ".done" file of every image.</summary>
    public static void Cleanup()
    {
        for (int i = 0; i < numImages; i++)
        {
            File.Delete(ImageBaseName + i + ".tmp");
            File.Delete(ImageBaseName + i + ".done");
        }
    }

    /// <summary>
    /// Allocates and touches a large array, forces a collection and sleeps
    /// before the timed run.
    /// </summary>
    public static void TryToClearDiskCache()
    {
        byte[] bytes = new byte[100 * (1 << 20)];
        for (int i = 0; i < bytes.Length; i++)
        {
            bytes[i] = 0;
        }
        bytes = null;
        GC.Collect();
        Thread.Sleep(2000);
    }

    /// <summary>Entry point. An optional single argument overrides <see cref="processImageRepeats"/>.</summary>
    public static void Main(String[] args)
    {
        Console.WriteLine("Bulk image processing sample application," +
            " using synchronous I/O.");
        Console.WriteLine("Simulates applying a simple " +
            "transformation to {0} \"images.\"", numImages);
        Console.WriteLine("(ie, Sync FileStream benchmark).");
        Console.WriteLine("Warning - this test requires {0} " +
            "bytes of temporary space", (numPixels * numImages * 2));
        if (args.Length == 1)
        {
            processImageRepeats = Int32.Parse(args[0]);
            // The separator was a mis-encoded character in the original
            // message; a plain ASCII hyphen is used instead.
            Console.WriteLine("ProcessImage inner loop - {0}",
                processImageRepeats);
        }
        MakeImageFiles();
        TryToClearDiskCache();
        ProcessImagesInBulk();
        Cleanup();
    }

    // P/Invoke declaration for FlushFileBuffers in KERNEL32.
    [DllImport("KERNEL32", SetLastError = true)]
    private static extern void FlushFileBuffers(SafeFileHandle handle);
}