c# 使用Task实现非阻塞式的I/O操作
在前面的《基于任务的异步编程模式(TAP)》文章中讲述了.net4.5框架下的异步操作自我实现方式,实际上,在.net4.5中部分类已实现了异步封装。如在.net4.5中,Stream类加入了Async方法,所以基于流的通信方式都可以实现异步操作。
1、异步读取文件数据
publicstaticvoidTaskFromIOStreamAsync(stringfileName) { intchunkSize=4096; byte[]buffer=newbyte[chunkSize]; FileStreamfileStream=newFileStream(fileName,FileMode.Open,FileAccess.Read,FileShare.Read,chunkSize,true); Tasktask=fileStream.ReadAsync(buffer,0,buffer.Length); task.ContinueWith((readTask)=> { intamountRead=readTask.Result; //必须在ContinueWith中释放文件流 fileStream.Dispose(); Console.WriteLine($"Async(Simple)Read{amountRead}bytes"); }); }
上述代码中,异步读取数据只读取了一次,完成读取后就将执行权交还主线程了。但在真实场景中,需要从流中读取多次才能获得全部的数据(如文件数据大于给定缓冲区大小,或处理来自网络流的数据(数据还没全部到达机器))。因此,为了完成异步读取操作,需要连续从流中读取数据,直到获取所需全部数据。
上述问题导致需要两级Task来处理。外层的Task用于全部的读取工作,供调用程序使用。内层的Task用于每次的读取操作。
第一次异步读取会返回一个Task。如果直接返回调用Wait或者ContinueWith的地方,会在第一次读取结束后继续向下执行。实际上是希望调用者在完成全部读取操作后才执行。因此,不能把第一个Task发布会给调用者,需要一个“伪Task”在完成全部读取操作后再返回。
上述问题需要使用到TaskCompletionSource
publicstaticTaskAsynchronousRead(stringfileName) { intchunkSize=4096; byte[]buffer=newbyte[chunkSize]; //创建一个返回的伪Task对象 TaskCompletionSource tcs=newTaskCompletionSource (); MemoryStreamfileContents=newMemoryStream();//用于保存读取的内容 FileStreamfileStream=newFileStream(fileName,FileMode.Open,FileAccess.Read,FileShare.Read,chunkSize,true); fileContents.Capacity+=chunkSize;//指定缓冲区大小。好像Capacity会自动增长,设置与否没关系,后续写入多少数据,就增长多少 Task task=fileStream.ReadAsync(buffer,0,buffer.Length); task.ContinueWith(readTask=>ContinueRead(readTask,fileStream,fileContents,buffer,tcs)); //在ContinueWith中循环读取,读取完成后,再返回tcs的Task returntcs.Task; } /// ///继续读取数据 /// ///读取数据的线程 /// 文件流 /// 文件存放位置 /// 读取数据缓存 /// 伪Task对象 privatestaticvoidContinueRead(Task task,FileStreamfileStream,MemoryStreamfileContents,byte[]buffer,TaskCompletionSource tcs) { if(task.IsCompleted) { intbytesRead=task.Result; fileContents.Write(buffer,0,bytesRead);//写入内存区域。似乎Capacity会自动增长 if(bytesRead>0) { //虽然看似是一个新的任务,但是使用了ContinueWith,所以使用的是同一个线程。 //没有读取完,开启另一个异步继续读取 Task newTask=fileStream.ReadAsync(buffer,0,buffer.Length); //此处做了一个循环 newTask.ContinueWith(readTask=>ContinueRead(readTask,fileStream,fileContents,buffer,tcs)); } else { //已经全部读取完,所以需要返回数据 tcs.TrySetResult(fileContents.Length); fileStream.Dispose(); fileContents.Dispose();//应该是在使用了数据之后才释放数据缓冲区的数据 } } }
2、适应Task的异步编程模式
.NETFramework中的旧版异步方法都带有“Begin-”和“End-”前缀。这些方法仍然有效,为了接口的一致性,它们可以被封装到Task中。
FromAsyn方法把流的BeginRead和EndRead方法作为参数,再加上存放数据的缓冲区。BeginRead和EndRead方法会执行,并在EndRead完成后调用ContinuationTask,把控制权交回主代码。上述例子会关闭流并返回转换的数据
constintReadSize=256; //////从文件中获取字符串 /// ///文件路径 /// 字符串 publicstaticTaskGetStringFromFile(stringpath) { FileInfofile=newFileInfo(path); byte[]buffer=newbyte[1024];//存放数据的缓冲区 FileStreamfileStream=newFileStream( path,FileMode.Open,FileAccess.Read,FileShare.None,buffer.Length, FileOptions.DeleteOnClose|FileOptions.Asynchronous); Task task=Task .Factory.FromAsync(fileStream.BeginRead,fileStream.EndRead, buffer,0,ReadSize,null);//此参数为BeginRead需要的参数 TaskCompletionSource tcs=newTaskCompletionSource (); task.ContinueWith(taskRead=>OnReadBuffer(taskRead,fileStream,buffer,0,tcs)); returntcs.Task; } /// ///读取数据 /// ///读取任务 /// 文件流 /// 读取数据存放位置 /// 读取偏移量 /// 伪Task privatestaticvoidOnReadBuffer(Task taskRead,FileStreamfileStream,byte[]buffer,intoffset,TaskCompletionSource tcs) { intreadLength=taskRead.Result; if(readLength>0) { intnewOffset=offset+readLength; Task task=Task .Factory.FromAsync(fileStream.BeginRead,fileStream.EndRead, buffer,newOffset,Math.Min(buffer.Length-newOffset,ReadSize),null); task.ContinueWith(callBackTask=>OnReadBuffer(callBackTask,fileStream,buffer,newOffset,tcs)); } else { tcs.TrySetResult(System.Text.Encoding.UTF8.GetString(buffer,0,buffer.Length)); fileStream.Dispose(); } }
3、使用async和await方式读取数据
下面的示例中,使用了async和await关键字实现异步读取一个文件的同时进行压缩并写入另一个文件。所有位于await关键字之前的操作都运行于调用者线程,从await开始的操作都是在ContinuationTask中运行。但有无法使用这两个关键字的场合:①Task的结束时机不明确时;②必须用到多级Task和TaskCompletionSource时
//////同步方法的压缩 /// ///文件清单 publicstaticvoidSyncCompress(IEnumerable lstFiles) { byte[]buffer=newbyte[16384]; foreach(stringfileinlstFiles) { using(FileStreaminputStream=File.OpenRead(file)) { using(FileStreamoutputStream=File.OpenWrite(file+".compressed")) { using(System.IO.Compression.GZipStreamcompressStream=newSystem.IO.Compression.GZipStream(outputStream,System.IO.Compression.CompressionMode.Compress)) { intread=0; while((read=inputStream.Read(buffer,0,buffer.Length))>0) { compressStream.Write(buffer,0,read); } } } } } } /// ///异步方法的文件压缩 /// ///需要压缩的文件 /// publicstaticasyncTaskAsyncCompress(IEnumerable lstFiles) { byte[]buffer=newbyte[16384]; foreach(stringfileinlstFiles) { using(FileStreaminputStream=File.OpenRead(file)) { using(FileStreamoutputStream=File.OpenWrite(file+".compressed")) { using(System.IO.Compression.GZipStreamcompressStream=newSystem.IO.Compression.GZipStream(outputStream,System.IO.Compression.CompressionMode.Compress)) { intread=0; while((read=awaitinputStream.ReadAsync(buffer,0,buffer.Length))>0) { awaitcompressStream.WriteAsync(buffer,0,read); } } } } } }
以上就是c#使用Task实现非阻塞式的I/O操作的详细内容,更多关于c#实现非阻塞式的I/O操作的资料请关注毛票票其它相关文章!