大量时间序列快照文件到时间序列文件相互转化的一个思路

南卓铜([email protected])

想像这样一种情境,我们有一个空间网格文件,分辨率为30×30 m,空间范围为30x30km^2,每个格子一个数据的话,也就是说有1,000,000个数据(假设是double类型)。每个网格文件可被认为是研究区域时间序列的一个快照。整个时间序列为11年(1997-2007)逐小时,即多达96,408个网格文件。我们要做的是,将全部的时间序列快照文件转换成每网格上的时间序列文件(将形成1M个文件)。类似的,我们还要做的是全部网格的时间序列文件时间序列快照文件的转换。

全部保持文件打开是不可能的,因为文件句柄是很宝贵的资源,每个进程都只允许有限个打开的文件(比如512个)。通常的迭代思路是,打开一个时间序列快照文件(网格文件),取其中一个网格上的数据,打开此网格的时间序列文件,写入时间和对应的数据,关闭两文件。这意味着需要打开和关闭文件1Mx96,408次。大家知道磁盘I/O的开支是十分昂贵的。此迭代方法将需要极长的时间来完成(一周甚至几周,取决于硬盘读取数据)。而内存却没有得到有效的利用。

对于内存十分大(比如64G内存)的服务器,也许可以将全部的待转换文件一次性装载到内存,每个在内存内分析,组合成输出格式,再一次性格式化输出。对于一般的个人机器此方法不通。

为了优化速度,我们采用这样的解决方案。针对时间序列文件时间序列快照文件(网格文件)的转换,

在内存生成n个时间序列快照文件,每个网格上用Missing data来填充 (内存主要消耗在这里)
打开一个时间序列文件
    从时间序列文件读n行(每行包括时间和对应的double值)
    将读取的n个数据,写入对应的n个快照文件的对应位置
关闭此时间序列文件,迭代
将内存内的n个快照文件写出
从n+1的位置开始再迭代以上过程,一直到结果。(关键)

最后一次可能不正好等于n,需要程序作相应控制。n的取值需根据运行的内存情况进行高速。对2G内存的工作站,n取为5000-10000。内存越大,n值可以取越大,可以有更好的执行效果。

当然,也可以同时打开多个时间序列文件,以最大化优化性能,但带来的是迭代控制上的复杂。而且据我的有限测试,同时打开多个时间序列文件,性能并没有得到明显改善(可以理解,因为磁盘I/O的存取本质上讲是磁头的顺序读取,由同一个磁头臂来控制)。

其中需要注意的是地方,是如何控制下一次准确快速定位到n+1的位置上。时间序列文件是文本文件,顺序读取在性能上很受影响。比如在最后一个循环时,将先遍历前面的全部行,然后才到达需要的起始行。我们需要以二进制形式打开,并自行控制每个时间序列文件的起始读取位置(各文件位置可能不一样,由于每行数据长度不等)。在c#里,以StreamReader打开,无法通过base stream取得当前准确的位置(position)。我们构造了TimeSeriesDataFile类。初始化需要给定文件名和起始读取的位置。ReadLines函数可以返回给定数目的数据行,通过CurrentPosition属性取得下一次读取的起始位置。

using System;
using System.Collections.Generic;
using System.Text;

namespace nzt.TimeSeries2Spatial
{
    /// <summary>
    /// Access time series data text file as binary.
    /// </summary>
    class TimeSeriesDataFile
    {

        private string _filepath;
        private System.IO.FileStream _fs;
        private long _lastpos;
        private long _startpos;
        private const int MAXLINELENGTH=50; //bytes, ensure it larger than max length of each line.

        public TimeSeriesDataFile(string filename, long startposition)
        {
            _filepath = filename;
            _startpos = startposition;
            Open();

        }

        private void Open()
        {
            _fs = new System.IO.FileStream(_filepath, System.IO.FileMode.Open, System.IO.FileAccess.Read);
            _lastpos=_fs.Seek(_startpos, System.IO.SeekOrigin.Begin);
        }

        /// <summary>
        /// Read a number of lines from stream beginning at startposition.
        /// </summary>
        /// <param name="count">Number of lines to be expected to return</param>
        /// <returns></returns>
        public string[] ReadLines(int count)
        {
            if (_fs == null) return null;

            byte[] buffer = new byte[MAXLINELENGTH * count];

            int bytesRead=_fs.Read(buffer, 0, buffer.Length);

            if (bytesRead == 0) return null;

            //we have data in buffer now.
            List<String> sb_list = new List<String>();
            int c = count;
            StringBuilder sb = new StringBuilder();
            int i;
            for (i = 0; i < bytesRead; i++)
            {
                if (buffer[i] != ‘r’ && buffer[i] != ‘n’)
                    sb.Append((char)buffer[i]);
                else if (buffer[i] == ‘n’)
                {
                    sb_list.Add(sb.ToString());
                    if (–c<=0) break;
                    sb = new StringBuilder();
                }
            }
            if (c>0 && sb.Length>0) sb_list.Add(sb.ToString());

            _lastpos += i+1;

            return sb_list.ToArray();
        }

        public long CurrentPosition
        {
            get { return _lastpos; }
        }

        public void Close()
        {
            _fs.Close();
        }

        ~TimeSeriesDataFile()
        {
            Close();
        }

    }
}

对于时间序列快照文件(网格文件)时间序列文件的转换,应用同样的思路。但由于时间序列快照文件(网格文件)一般较小,比如几百KB(相对,11年的逐小时时间序列文件则到2MB以上),则无须对StreamReader进行改造,可以一次性load到内存,在内存进行定位分析。

如果大家还有好的解决方案,也请分享。

Leave a Reply

Your email address will not be published. Required fields are marked *