南卓铜([email protected])
想像这样一种情境,我们有一个空间网格文件,分辨率为30×30 m,空间范围为30x30km^2,每个格子一个数据的话,也就是说有1,000,000个数据(假设是double类型)。每个网格文件可被认为是研究区域时间序列的一个快照。整个时间序列为11年(1997-2007)逐小时,即多达96,408个网格文件。我们要做的是,将全部的时间序列快照文件转换成每网格上的时间序列文件(将形成1M个文件)。类似的,我们还要做的是全部网格的时间序列文件到时间序列快照文件的转换。
全部保持文件打开是不可能的,因为文件句柄是很宝贵的资源,每个进程都只允许有限个打开的文件(比如512个)。通常的迭代思路是,打开一个时间序列快照文件(网格文件),取其中一个网格上的数据,打开此网格的时间序列文件,写入时间和对应的数据,关闭两文件。这意味着需要打开和关闭文件1Mx96,408次。大家知道磁盘I/O的开支是十分昂贵的。此迭代方法将需要极长的时间来完成(一周甚至几周,取决于硬盘读取数据)。而内存却没有得到有效的利用。
对于内存十分大(比如64G内存)的服务器,也许可以将全部的待转换文件一次性装载到内存,每个在内存内分析,组合成输出格式,再一次性格式化输出。对于一般的个人机器此方法不通。
为了优化速度,我们采用这样的解决方案。针对时间序列文件到时间序列快照文件(网格文件)的转换,
在内存生成n个时间序列快照文件,每个网格上用Missing data来填充 (内存主要消耗在这里)
打开一个时间序列文件
从时间序列文件读n行(每行包括时间和对应的double值)
将读取的n个数据,写入对应的n个快照文件的对应位置
关闭此时间序列文件,迭代
将内存内的n个快照文件写出
从n+1的位置开始再迭代以上过程,一直到结果。(关键)
最后一次可能不正好等于n,需要程序作相应控制。n的取值需根据运行的内存情况进行高速。对2G内存的工作站,n取为5000-10000。内存越大,n值可以取越大,可以有更好的执行效果。
当然,也可以同时打开多个时间序列文件,以最大化优化性能,但带来的是迭代控制上的复杂。而且据我的有限测试,同时打开多个时间序列文件,性能并没有得到明显改善(可以理解,因为磁盘I/O的存取本质上讲是磁头的顺序读取,由同一个磁头臂来控制)。
其中需要注意的是地方,是如何控制下一次准确快速定位到n+1的位置上。时间序列文件是文本文件,顺序读取在性能上很受影响。比如在最后一个循环时,将先遍历前面的全部行,然后才到达需要的起始行。我们需要以二进制形式打开,并自行控制每个时间序列文件的起始读取位置(各文件位置可能不一样,由于每行数据长度不等)。在c#里,以StreamReader打开,无法通过base stream取得当前准确的位置(position)。我们构造了TimeSeriesDataFile类。初始化需要给定文件名和起始读取的位置。ReadLines函数可以返回给定数目的数据行,通过CurrentPosition属性取得下一次读取的起始位置。
using System;
using System.Collections.Generic;
using System.Text;
namespace nzt.TimeSeries2Spatial
{
/// <summary>
/// Access time series data text file as binary.
/// </summary>
class TimeSeriesDataFile
{
private string _filepath;
private System.IO.FileStream _fs;
private long _lastpos;
private long _startpos;
private const int MAXLINELENGTH=50; //bytes, ensure it larger than max length of each line.
public TimeSeriesDataFile(string filename, long startposition)
{
_filepath = filename;
_startpos = startposition;
Open();
}
private void Open()
{
_fs = new System.IO.FileStream(_filepath, System.IO.FileMode.Open, System.IO.FileAccess.Read);
_lastpos=_fs.Seek(_startpos, System.IO.SeekOrigin.Begin);
}
/// <summary>
/// Read a number of lines from stream beginning at startposition.
/// </summary>
/// <param name="count">Number of lines to be expected to return</param>
/// <returns></returns>
public string[] ReadLines(int count)
{
if (_fs == null) return null;
byte[] buffer = new byte[MAXLINELENGTH * count];
int bytesRead=_fs.Read(buffer, 0, buffer.Length);
if (bytesRead == 0) return null;
//we have data in buffer now.
List<String> sb_list = new List<String>();
int c = count;
StringBuilder sb = new StringBuilder();
int i;
for (i = 0; i < bytesRead; i++)
{
if (buffer[i] != ‘r’ && buffer[i] != ‘n’)
sb.Append((char)buffer[i]);
else if (buffer[i] == ‘n’)
{
sb_list.Add(sb.ToString());
if (–c<=0) break;
sb = new StringBuilder();
}
}
if (c>0 && sb.Length>0) sb_list.Add(sb.ToString());
_lastpos += i+1;
return sb_list.ToArray();
}
public long CurrentPosition
{
get { return _lastpos; }
}
public void Close()
{
_fs.Close();
}
~TimeSeriesDataFile()
{
Close();
}
}
}
对于时间序列快照文件(网格文件)到时间序列文件的转换,应用同样的思路。但由于时间序列快照文件(网格文件)一般较小,比如几百KB(相对,11年的逐小时时间序列文件则到2MB以上),则无须对StreamReader进行改造,可以一次性load到内存,在内存进行定位分析。
如果大家还有好的解决方案,也请分享。