blob: a1b85ed9ae1b41d3a47e79434f3e21e904f84d20 [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#ifdef WIN32
/* exclude transformation features on windows for now */
#undef GPFXDIST
#endif
/*#include "c.h"*/
#ifdef WIN32
#define _WINSOCKAPI_
#include <windows.h>
#include <winsock2.h>
#include <ws2tcpip.h>
#endif
#include <fstream/fstream.h>
#include <fstream/gfile.h>
#include <assert.h>
#include <glob.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include "pg_config_manual.h"
#ifdef GPFXDIST
#include <gpfxdist.h>
#endif
/*
 * Size, in bytes, of the static buffers in this file that hold formatted
 * error text (see format_error and fstream_read).
 * NOTE(review): MAXPGPATH is presumably supplied by pg_config_manual.h - confirm.
 */
#define FILE_ERROR_SZ MAXPGPATH
/*
 * Concatenates c1 and c2 into a static buffer, logs the result and returns it.
 * Defined further down in this file.
 */
char* format_error(char* c1, char* c2);
/*
 * A growable list of path names. The array and every string in it are
 * allocated with gfile_malloc by glob_and_copy() and released by
 * glob_and_copyfree().
 */
typedef struct
{
int gl_pathc; /* number of entries stored in gl_pathv */
char** gl_pathv; /* array of gl_pathc path strings, or 0 when empty */
} glob_and_copy_t;
/*
 * State of one file stream: an ordered list of source paths that is presented
 * to the caller as a single continuous stream of data.
 */
struct fstream_t
{
glob_and_copy_t glob; /* paths that make up the stream, in read order */
gfile_t fd; /* handle of the file that is currently open */
int fidx; /* current index in glob.gl_pathv[] */
int64_t foff; /* bytes of glob.gl_pathv[fidx] consumed so far */
int64_t line_number; /* reset to 1 when a file is opened; set to 0 by text and raw reads */
int64_t compressed_size; /* sum of gfile_get_compressed_size() over all files, computed in fstream_open */
int64_t compressed_position; /* compressed size of the files already finished (see nextFile) */
int skip_header_line; /* non-zero while the header of the current file still has to be skipped */
char* buffer; /* buffer to store data read from file */
int buffer_cur_size; /* number of bytes in buffer currently */
const char* ferror; /* error string; once set, reads and writes return -1 */
struct fstream_options options; /* copy of the options passed to fstream_open */
};
/*
 * find_last_eol_delim
 *
 * Scan the 'size' bytes beginning at 'start' from the back and locate the
 * last occurrence of the 'delimiter_length' bytes stored in 'delimiter'.
 *
 * Returns a pointer to the LAST byte of that occurrence, or 'start - 1' when
 * the delimiter does not appear (callers test 'start <= result' to tell the
 * two cases apart).
 *
 * delimiter_length MUST be > 0
 */
static char *find_last_eol_delim (const char *start, const int size,
								  const char *delimiter, const int delimiter_length)
{
	int		off;

	/*
	 * A buffer shorter than the delimiter cannot contain it. A buffer of
	 * exactly delimiter_length bytes can, so it must still be examined.
	 */
	if (size < delimiter_length)
		return (char*)start - 1;

	/* walk candidate positions from the last possible one down to 0 */
	for (off = size - delimiter_length; off >= 0; off--)
	{
		if (memcmp(start + off, delimiter, delimiter_length) == 0)
			return (char*)start + off + delimiter_length - 1;
	}

	return (char*)start - 1;
}
/*
 * find_first_eol_delim
 *
 * Scan [start, end) forward for the first occurrence of the
 * 'delimiter_length' bytes stored in 'delimiter'.
 *
 * Returns a pointer to the LAST byte of that occurrence, or 'end' when the
 * delimiter does not appear.
 *
 * delimiter_length MUST be > 0
 */
static char *find_first_eol_delim (char *start, char *end,
								   const char *delimiter, const int delimiter_length)
{
	char   *search_limit;

	/*
	 * A range shorter than the delimiter cannot contain it. Check this before
	 * computing search_limit so that no pointer before 'start' is formed; a
	 * range of exactly delimiter_length bytes is still searched.
	 */
	if (end - start < delimiter_length)
		return end;

	/* last position at which a complete delimiter still fits */
	search_limit = end - delimiter_length;

	for (; start <= search_limit; start++)
	{
		if (memcmp(start, delimiter, delimiter_length) == 0)
			return start + delimiter_length - 1;
	}

	return end;
}
/*
 * glob_and_copy
 *
 * Run glob() on 'pattern' and APPEND private copies of the matches to pglob.
 * Entries already present in pglob are kept in front of the new ones. For
 * example, if 'pattern' equals '/somedir/<asterisk>.txt' and somedir holds
 * 1.txt and 2.txt, an initially empty pglob ends up as:
 *    pglob->gl_pathc = 2
 *    pglob->gl_pathv[0] = '/somedir/1.txt'
 *    pglob->gl_pathv[1] = '/somedir/2.txt'
 *
 * Returns 0 on success, the non-zero glob() result when glob() fails, or
 * GLOB_NOSPACE when a gfile_malloc call fails. If copying fails part way,
 * the entries copied so far stay in pglob.
 */
static int glob_and_copy(const char *pattern, int flags, int(*errfunc)(
						 const char *epath, int eerno), glob_and_copy_t *pglob)
{
	glob_t	matches;
	char  **merged;
	char  **slot;
	int		k;
	int		rc = glob(pattern, flags, errfunc, &matches);

	if (rc)
		return rc;

	/* one array big enough for the old entries plus the new matches */
	merged = gfile_malloc(sizeof *merged * (pglob->gl_pathc + matches.gl_pathc));
	if (!merged)
	{
		globfree(&matches);
		return GLOB_NOSPACE;
	}

	/* move the existing entries over and retire the old array */
	for (k = 0; k < pglob->gl_pathc; k++)
		merged[k] = pglob->gl_pathv[k];
	if (pglob->gl_pathv)
		gfile_free(pglob->gl_pathv);
	pglob->gl_pathv = merged;

	/* duplicate each match; gl_pathc only counts fully copied entries */
	slot = merged + pglob->gl_pathc;
	for (k = 0; k < matches.gl_pathc; k++)
	{
		char   *src = matches.gl_pathv[k];

		*slot = gfile_malloc(strlen(src) + 1);
		if (!*slot)
		{
			rc = GLOB_NOSPACE;
			break;
		}
		strcpy(*slot, src);
		slot++;
		pglob->gl_pathc++;
	}

	globfree(&matches);
	return rc;
}
/*
 * glob_and_copyfree
 *
 * Release every path string held by pglob and the array itself, leaving
 * pglob empty. Does nothing when pglob has no array.
 */
static void glob_and_copyfree(glob_and_copy_t *pglob)
{
	int		idx;

	if (!pglob->gl_pathv)
		return;

	for (idx = 0; idx < pglob->gl_pathc; idx++)
		gfile_free(pglob->gl_pathv[idx]);
	gfile_free(pglob->gl_pathv);

	pglob->gl_pathv = 0;
	pglob->gl_pathc = 0;
}
/*
 * Return the error string recorded on the stream, or 0 if no error has been
 * recorded.
 */
const char*
fstream_get_error(fstream_t*fs)
{
	const char *message = fs->ferror;

	return message;
}
/*
 * scan_csv_records
 *
 * Scan the data in [p, q) according to the csv quoting rules and return a
 * pointer just past the last complete record, i.e. past the last newline
 * that is not inside a quoted field. If 'one' is non-zero, stop after the
 * first complete record instead. Returns 0 when no complete record is found.
 *
 * This is needed because, unlike 'text' format, we cannot simply look for
 * the last newline in the buffer: it may be inside a quote. The data has to
 * be parsed from the start to find the last unquoted newline.
 *
 * fs->line_number is advanced to account for every newline seen up to the
 * last record boundary reported.
 */
static char*
scan_csv_records(char *p, char *q, int one, fstream_t *fs)
{
	const int	quote_ch = fs->options.quote;
	const int	escape_ch = fs->options.escape;
	int64_t		lines_seen = fs->line_number;
	char	   *record_end = 0;
	int			quoted = 0;
	int			escaped = 0;
	char	   *cur;

	for (cur = p; cur < q;)
	{
		int		ch = *cur++;

		/* newlines count as lines whether or not they are quoted */
		if (ch == '\n')
			lines_seen++;

		if (quoted)
		{
			if (escaped)
				escaped = 0;		/* escaped character is taken literally */
			else if (ch == quote_ch)
				quoted = 0;
			else if (ch == escape_ch)
				escaped = 1;
			continue;
		}

		if (ch == '\n')
		{
			/* unquoted newline: a record ends here */
			record_end = cur;
			fs->line_number = lines_seen;
			if (one)
				break;
		}
		else if (ch == quote_ch)
			quoted = 1;
	}

	return record_end;
}
/*
 * fstream_close
 *
 * Close the file stream: release the data buffer and the path list, close
 * the currently open file and free the stream object itself. 'fs' must not
 * be used afterwards.
 */
void fstream_close(fstream_t* fs)
{
#ifdef GPFXDIST
	struct gpfxdist_t *xform = fs->options.transform;

	/*
	 * remove the temporary file that was created to hold the file paths
	 */
	if (xform && xform->tempfilename)
	{
		apr_file_remove(xform->tempfilename, xform->mp);
		xform->tempfilename = NULL;
	}
#endif

	if (fs->buffer)
	{
		gfile_free(fs->buffer);
	}

	glob_and_copyfree(&fs->glob);
	gfile_close(&fs->fd);
	gfile_free(fs);
}
/*
 * Return 1 when every path in the list ends with '/', 0 as soon as one is
 * empty or ends with a different character. An empty list yields 1.
 */
static int fpath_all_directories(const glob_and_copy_t *glob)
{
	int		idx = 0;

	while (idx < glob->gl_pathc)
	{
		const char *name = glob->gl_pathv[idx++];
		size_t		len = strlen(name);

		if (len == 0 || name[len - 1] != '/')
			return 0;
	}

	return 1;
}
/*
 * expand_directories
 *
 * Replace the path list of 'fs' with the result of globbing "<path>*" for
 * every path currently in it. Glob metacharacters ('?', '*', '[') that occur
 * in a path are backslash-escaped first so that they are matched literally.
 *
 * A path whose pattern matches nothing is reported and skipped.
 * Returns 0 on success; returns 1 (leaving fs->glob untouched) when memory
 * runs out or glob fails.
 */
static int expand_directories(fstream_t *fs)
{
	glob_and_copy_t expanded;
	int		idx;

	memset(&expanded, 0, sizeof expanded);

	for (idx = 0; idx < fs->glob.gl_pathc; idx++)
	{
		const char *src = fs->glob.gl_pathv[idx];
		/* worst case: every character escaped, plus '*' and the NUL */
		char	   *pattern = gfile_malloc(2 * strlen(src) + 2);
		char	   *out;
		int			rc;

		if (!pattern)
		{
			gfile_printf_then_putc_newline("fstream out of memory");
			glob_and_copyfree(&expanded);
			return 1;
		}

		out = pattern;
		while (*src)
		{
			switch (*src)
			{
				case '?':
				case '*':
				case '[':
					*out++ = '\\';
					break;
				default:
					break;
			}
			*out++ = *src++;
		}
		*out++ = '*';
		*out = 0;

		rc = glob_and_copy(pattern, 0, 0, &expanded);
		gfile_free(pattern);

		if (rc == GLOB_NOMATCH)
		{
			gfile_printf_then_putc_newline("fstream %s is empty directory",
										   fs->glob.gl_pathv[idx]);
			continue;
		}

		if (rc)
		{
			gfile_printf_then_putc_newline("fstream glob failed");
			glob_and_copyfree(&expanded);
			return 1;
		}
	}

	/* swap the expanded list in for the original one */
	glob_and_copyfree(&fs->glob);
	fs->glob = expanded;
	return 0;
}
/*
 * glob_path
 *
 * 'path' is a list of glob patterns separated by one or more spaces. Each
 * pattern is globbed (GLOB_MARK | GLOB_NOCHECK) and the results are appended
 * to fs->glob.
 *
 * Returns 0 on success and 1 on failure (out of memory or a glob error).
 * On failure fs->glob may hold the results of the patterns processed so
 * far; the caller is expected to release them (fstream_close does).
 */
static int glob_path(fstream_t *fs, const char *path)
{
	/* private, writable copy: the patterns are NUL-terminated in place */
	char   *path2 = gfile_malloc(strlen(path) + 1);
	int		status = 0;

	if (!path2)
	{
		gfile_printf_then_putc_newline("fstream out of memory");
		return 1;
	}

	path = strcpy(path2, path);

	do
	{
		char   *p;

		/* skip the blanks in front of the next pattern */
		while (*path == ' ')
			path++;

		/* cut the pattern at the next blank, remember where the rest starts */
		p = strchr(path, ' ');
		if (p)
			*p++ = 0;

		if (*path &&
			glob_and_copy(path, GLOB_MARK | GLOB_NOCHECK, 0, &fs->glob))
		{
			gfile_printf_then_putc_newline("fstream glob failed");
			status = 1;
			break;		/* fall through to release path2 instead of leaking it */
		}

		path = p;
	} while (path);

	gfile_free(path2);
	return status;
}
#ifdef GPFXDIST
/*
 * glob_adjust
 *
 * Reduce fs->glob to a single entry holding a freshly allocated copy of
 * 'filename'. All existing entries are freed; the existing array is reused,
 * so fs->glob must already contain at least one name.
 *
 * Returns 0 on success. On allocation failure returns 1, sets
 * *response_code to 500 and *response_string to an explanatory message, and
 * leaves fs->glob unchanged.
 */
static int glob_adjust(fstream_t* fs, char* filename, int* response_code, const char** response_string)
{
	int		idx;
	int		nbytes = strlen(filename) + 1;
	char   *copy = gfile_malloc(nbytes);

	if (copy == NULL)
	{
		*response_code = 500;
		*response_string = "fstream out of memory allocating copy of temporary file name";
		gfile_printf_then_putc_newline(*response_string);
		return 1;
	}

	/* drop every name currently in the list */
	idx = 0;
	while (idx < fs->glob.gl_pathc)
	{
		gfile_free(fs->glob.gl_pathv[idx]);
		fs->glob.gl_pathv[idx] = NULL;
		idx++;
	}

	strcpy(copy, filename);
	fs->glob.gl_pathc = 1;
	fs->glob.gl_pathv[0] = copy;
	return 0;
}
#endif
/*
 * fstream_open
 *
 * Allocate a new file stream given a path (a url). 'path' may hold several
 * space separated patterns; wildcards are expanded here, and if every
 * resulting name is a directory the directories are expanded to their
 * contents. The final list of files becomes the stream that is returned.
 *
 * Every file in the list is opened once (last to first) to verify that it is
 * accessible, which leaves the first file open and ready for use.
 *
 * Returns the new stream, or 0 on failure. On failure *response_code and
 * *response_string carry the http response to send to the client: 500 by
 * default, 404 when no file matches or when more than one file matches a
 * stream opened for writing; a failed gfile_open may set its own values.
 */
fstream_t*
fstream_open(const char *path, const struct fstream_options *options,
			 int *response_code, const char **response_string)
{
	int i;
	fstream_t* fs;

	*response_code = 500;
	*response_string = "Internal Server Error";

	if (0 == (fs = gfile_malloc(sizeof *fs)))
	{
		gfile_printf_then_putc_newline("fstream out of memory");
		return 0;
	}

	memset(fs, 0, sizeof *fs);
	fs->options = *options;
	fs->buffer = gfile_malloc(options->bufsize);

	/*
	 * fstream_read stores data in this buffer without further checks, so a
	 * failed allocation has to be reported here. A zero-sized request is
	 * allowed to yield no buffer.
	 */
	if (!fs->buffer && options->bufsize > 0)
	{
		gfile_printf_then_putc_newline("fstream out of memory");
		fstream_close(fs);
		return 0;
	}

	/*
	 * get a list of all files that were requested to be read and include them
	 * in our fstream. This includes any wildcard pattern matching.
	 */
	if (glob_path(fs, path))
	{
		fstream_close(fs);
		return 0;
	}

	/*
	 * If the list of files in our filestream consists of directory names
	 * only, expand the directories and use the files inside of them instead.
	 */
	if (fpath_all_directories(&fs->glob))
	{
		if (expand_directories(fs))
		{
			fstream_close(fs);
			return 0;
		}
	}

	/*
	 * check if we don't have any matching files
	 */
	if (fs->glob.gl_pathc == 0)
	{
		gfile_printf_then_putc_newline("fstream bad path: %s", path);
		fstream_close(fs);
		*response_code = 404;
		*response_string = "No matching file(s) found";
		return 0;
	}

	if (fs->glob.gl_pathc != 1 && options->forwrite)
	{
		gfile_printf_then_putc_newline("fstream open for write found more than one file (%d)",
									   fs->glob.gl_pathc);
		*response_code = 404;
		*response_string = "More than 1 file found for writing. Unsupported operation.";
		fstream_close(fs);
		return 0;
	}

#ifdef GPFXDIST
	/*
	 * when the subprocess transformation wants to handle iteration over the files
	 * we write the paths to a temporary file and replace the fs->glob items with
	 * just a single entry referencing the path to the temporary file.
	 */
	if (options->transform && options->transform->pass_paths)
	{
		apr_pool_t* mp = options->transform->mp;
		apr_file_t* f = NULL;
		const char* tempdir = NULL;
		char* tempfilename = NULL;
		apr_status_t rv;

		if ((rv = apr_temp_dir_get(&tempdir, mp)) != APR_SUCCESS)
		{
			*response_code = 500;
			*response_string = "failed to get temporary directory for paths file";
			gfile_printf_then_putc_newline(*response_string);
			fstream_close(fs);
			return 0;
		}

		tempfilename = apr_pstrcat(mp, tempdir, "/pathsXXXXXX", NULL);
		if ((rv = apr_file_mktemp(&f, tempfilename, APR_CREATE|APR_WRITE|APR_EXCL, mp)) != APR_SUCCESS)
		{
			*response_code = 500;
			*response_string = "failed to open temporary paths file";
			gfile_printf_then_putc_newline(*response_string);
			fstream_close(fs);
			return 0;
		}

		/* from here on fstream_close removes the temporary file */
		options->transform->tempfilename = tempfilename;

		for (i = 0; i<fs->glob.gl_pathc; i++)
		{
			char* filename = fs->glob.gl_pathv[i];
			apr_size_t expected = strlen(filename) + 1;
			int written = apr_file_printf(f, "%s\n", filename);

			/*
			 * Test the sign before comparing with the unsigned length:
			 * a negative result would otherwise convert to a huge unsigned
			 * value and pass as a complete write.
			 */
			if (written < 0 || (apr_size_t) written < expected)
			{
				apr_file_close(f);
				*response_code = 500;
				*response_string = "failed to fully write path to temporary paths file";
				gfile_printf_then_putc_newline(*response_string);
				fstream_close(fs);
				return 0;
			}
		}

		apr_file_close(f);

		if (glob_adjust(fs, tempfilename, response_code, response_string))
		{
			fstream_close(fs);
			return 0;
		}
	}
#endif

	/*
	 * if writing - check write access rights for the one file.
	 * if reading - check read access right for all files, and
	 * then close them, leaving the first file open.
	 *
	 */
	for (i = fs->glob.gl_pathc; --i >= 0;)
	{
		/*
		 * CR-2173 - the fstream code allows the upper level logic to treat a
		 * collection of input sources as a single stream. One problem it has
		 * to handle is the possibility that some of the underlying sources may
		 * not be readable. Here we're trying to detect potential problems in
		 * advance by checking that we can open and close each source in our
		 * list in reverse order.
		 *
		 * However in the case of subprocess transformations, we don't want to
		 * start and stop each transformation in this manner. The check that
		 * each transformation's underlying input source can be read is still
		 * useful so we do those until we get to the first source, at which
		 * point we proceed to just setup the transformation for it.
		 */
		struct gpfxdist_t* transform = (i == 0) ? options->transform : NULL;

		gfile_close(&fs->fd);

		if (gfile_open(&fs->fd, fs->glob.gl_pathv[i], gfile_open_flags(options->forwrite, options->usesync),
					   response_code, response_string, transform))
		{
			gfile_printf_then_putc_newline("fstream unable to open file %s",
										   fs->glob.gl_pathv[i]);
			fstream_close(fs);
			return 0;
		}

		/* total compressed size of the stream is the sum over all files */
		fs->compressed_size += gfile_get_compressed_size(&fs->fd);
	}

	fs->line_number = 1;
	fs->skip_header_line = options->header;

	return fs;
}
/*
 * updateCurFileState
 *
 * Copy the name, offset and line number of the file currently being read
 * into 'fo' so the caller always knows which of the stream's files the data
 * comes from. The name is truncated if it does not fit. A null 'fo' is
 * ignored.
 */
static void updateCurFileState(fstream_t* fs,
							   struct fstream_filename_and_offset* fo)
{
	const size_t name_cap = sizeof fo->fname;

	if (!fo)
		return;

	strncpy(fo->fname, fs->glob.gl_pathv[fs->fidx], name_cap);
	fo->fname[name_cap - 1] = 0;	/* strncpy does not terminate on truncation */
	fo->line_number = fs->line_number;
	fo->foff = fs->foff;
}
/*
 * nextFile
 *
 * Close the current source file and move on to the next one in the list,
 * resetting the per-file offset, line number and header flag. When the list
 * is exhausted no file is opened and fs->fidx equals fs->glob.gl_pathc.
 *
 * return 1 if the next file exists but could not be opened (fs->ferror set).
 * return 0 otherwise.
 */
static int nextFile(fstream_t*fs)
{
	int response_code;
	const char *response_string;
	struct gpfxdist_t* transform = fs->options.transform;
	char *next_path;

	/* the file we are leaving now counts fully towards the position */
	fs->compressed_position += gfile_get_compressed_size(&fs->fd);
	gfile_close(&fs->fd);

	fs->fidx++;
	fs->line_number = 1;
	fs->foff = 0;

	if (fs->fidx >= fs->glob.gl_pathc)
		return 0;

	fs->skip_header_line = fs->options.header;
	next_path = fs->glob.gl_pathv[fs->fidx];

	if (!gfile_open(&fs->fd, next_path, GFILE_OPEN_FOR_READ,
					&response_code, &response_string, transform))
		return 0;

	gfile_printf_then_putc_newline("fstream unable to open file %s",
								   fs->glob.gl_pathv[fs->fidx]);
	fs->ferror = "unable to open file";
	return 1;
}
/*
 * format_error
 *
 * Build the error message "<c1><c2>" in a static buffer, log it and return
 * it. This lets a string parameter be added to the const char* error message
 * kept in fstream_t without the callers having to free any memory.
 *
 * If the combined text (plus its terminating NUL) does not fit in
 * FILE_ERROR_SZ bytes, the fixed message "cannot read file" is logged and
 * returned instead.
 *
 * The returned pointer refers to static storage: it is overwritten by the
 * next call and must not be freed or modified by the caller.
 */
char* format_error(char* c1, char* c2)
{
	static char err_msg[FILE_ERROR_SZ];
	size_t	len1 = strlen(c1);
	size_t	len2 = strlen(c2);

	/*
	 * '>=' rather than '>': a message of exactly FILE_ERROR_SZ bytes would
	 * fill the buffer and leave no room for the terminating NUL.
	 */
	if (len1 + len2 >= FILE_ERROR_SZ)
	{
		gfile_printf_then_putc_newline("cannot read file");
		return "cannot read file";
	}

	memcpy(err_msg, c1, len1);
	memcpy(err_msg + len1, c2, len2);
	err_msg[len1 + len2] = 0;

	gfile_printf_then_putc_newline("%s", err_msg);

	return err_msg;
}
/*
 * fstream_read
 *
 * Read up to 'size' bytes of data from the filestream into 'dest', moving on
 * to the next file of the stream whenever the current one is exhausted.
 *
 * If 'read_whole_lines' is non-zero, the data returned ends at the last
 * complete logical row found in the bytes read (csv rules when
 * fs->options.is_csv is set, otherwise the line delimiter); the remainder
 * is kept in fs->buffer for the next call. In this mode 'size' must be at
 * least fs->options.bufsize. The exception is a short read (end of the
 * current file), where everything read is returned as is.
 *
 * 'line_delim_str' / 'line_delim_length' give a custom end-of-line delimiter
 * for text data; a length of 0 or less means '\n'.
 *
 * 'fo', when not null, receives the state (name, offset, line number) of the
 * file the returned data comes from.
 *
 * Returns the number of bytes stored in 'dest', 0 when 'size' is 0 or all
 * files have been consumed, and -1 on error (see fstream_get_error).
 */
int fstream_read(fstream_t *fs,
void *dest,
int size,
struct fstream_filename_and_offset *fo,
const int read_whole_lines,
const char *line_delim_str,
const int line_delim_length)
{
int buffer_capacity = fs->options.bufsize;
/* static: fs->ferror keeps pointing at this text after we return */
static char err_buf[FILE_ERROR_SZ] = {0};
/* a previously recorded error is sticky */
if (fs->ferror)
return -1;
for (;;)
{
ssize_t bytesread; /* num bytes read from filestream */
ssize_t bytesread2; /* same, but when reading a second round */
/* nothing requested, or no file left to read from */
if (!size || fs->fidx == fs->glob.gl_pathc)
return 0;
/*
 * If the data source has a header, we consume it now in order to
 * move on to the real data that follows it.
 */
if (fs->skip_header_line)
{
char* p = fs->buffer;
char* q = p + fs->buffer_cur_size;
size_t len = 0;
assert(fs->buffer_cur_size < buffer_capacity);
/*
 * read data from the source file and fill up the file stream buffer
 */
len = buffer_capacity - fs->buffer_cur_size;
bytesread = gfile_read(&fs->fd, q, len);
if (bytesread < 0)
{
fs->ferror = format_error("cannot read file - ", fs->glob.gl_pathv[fs->fidx]);
return -1;
}
/* update the buffer size according to new byte count we just read */
fs->buffer_cur_size += bytesread;
q += bytesread;
/*
 * Locate the end of the header line. After this step p points just
 * past the header, or is 0 if no complete header line was found.
 */
if (fs->options.is_csv)
{
/* csv header */
p = scan_csv_records(p, q, 1, fs);
}
else
{
if (line_delim_length > 0)
{
/* text header with defined EOL */
p = find_first_eol_delim (p, q, line_delim_str, line_delim_length);
}
else
{
/* text header with \n as delimiter (by default) */
for (; p < q && *p != '\n'; p++)
;
}
/* both searches leave p on the last delimiter byte, or at q if none */
p = (p < q) ? p + 1 : 0;
fs->line_number++;
}
if (!p)
{
/* a full buffer without a line end means the header cannot fit */
if (fs->buffer_cur_size == buffer_capacity)
{
gfile_printf_then_putc_newline(
"fstream ERROR: header too long in file %s",
fs->glob.gl_pathv[fs->fidx]);
fs->ferror = "line too long in file";
return -1;
}
/* otherwise treat everything read so far as header */
p = q;
}
/*
 * update the filestream buffer offset to past last line read and
 * copy the end of the buffer (past header data) to the beginning.
 * we now bypassed the header data and can continue to real data.
 */
fs->foff += p - fs->buffer;
fs->buffer_cur_size = q - p;
memmove(fs->buffer, p, fs->buffer_cur_size);
fs->skip_header_line = 0;
}
/*
 * If we need to read all the data up to the last *complete* logical
 * line in the data buffer (like gpfdist for example) - we choose this
 * path. We grab the biggest chunk we can get that includes whole lines.
 * Otherwise, if we just want the whole buffer we skip.
 */
if (read_whole_lines)
{
char *p;
ssize_t total_bytes = fs->buffer_cur_size;
assert(size >= buffer_capacity);
if (total_bytes > 0)
{
/*
 * source buffer is not empty. copy the data from the beginning
 * up to the current length before moving on to reading more
 */
fs->buffer_cur_size = 0;
updateCurFileState(fs, fo);
memcpy(dest, fs->buffer, total_bytes);
}
/* read more data from source file into destination buffer */
bytesread2 = gfile_read(&fs->fd, (char*) dest + total_bytes, size - total_bytes);
if (bytesread2 < 0)
{
fs->ferror = format_error("cannot read file - ", fs->glob.gl_pathv[fs->fidx]);
return -1;
}
if (bytesread2 < size - total_bytes)
{
/*
 * We didn't read as much as we asked for. Check why.
 * We could be done reading data, we may need to move
 * on to reading the next data file (if any).
 */
if (total_bytes == 0)
{
if (bytesread2 == 0)
{
/* nothing buffered and nothing read: this file is finished */
if (nextFile(fs))
return -1; /* found next file but failed to open */
continue;
}
updateCurFileState(fs, fo);
}
/*
 * try to open the next file if any, and return the number of
 * bytes read to buffer earlier, if next file was found but
 * could not open return -1
 */
return nextFile(fs) ? -1 : total_bytes + bytesread2;
}
updateCurFileState(fs, fo);
/*
 * 'dest' is now completely full ('size' bytes). Find the end of the
 * last whole row in it; whatever follows is saved in the filestream
 * buffer for the next call.
 */
if (fs->options.is_csv)
{
/* CSV: go slow, scan byte-by-byte for record boundary */
p = scan_csv_records(dest, (char*)dest + size, 0, fs);
}
else
{
/*
 * TEXT: go fast, scan for end of line delimiter (\n by default) for
 * record boundary.
 * find the last end of line delimiter from the back
 */
if (line_delim_length > 0)
{
p = find_last_eol_delim((char*)dest, size, line_delim_str, line_delim_length);
}
else
{
for (p = (char*)dest + size; (char*)dest <= --p && *p != '\n';)
;
}
/* p is on the last delimiter byte, or before dest if none was found */
p = (char*)dest <= p ? p + 1 : 0;
/* line numbers are not tracked on this fast path */
fs->line_number = 0;
}
/*
 * could we not find even one complete row in this buffer? error.
 * The same applies when the leftover after the last row would not
 * fit in the filestream buffer.
 */
if (!p || (char*)dest + size >= p + buffer_capacity)
{
snprintf(err_buf, sizeof(err_buf), "line too long in file %s near (%lld bytes)",
fs->glob.gl_pathv[fs->fidx], (long long) fs->foff);
fs->ferror = err_buf;
gfile_printf_then_putc_newline("%s", err_buf);
return -1;
}
/* save the partial row that follows p for the next call and we're done */
fs->buffer_cur_size = (char*)dest + size - p;
memcpy(fs->buffer, p, fs->buffer_cur_size);
fs->foff += p - (char*)dest;
return p - (char*)dest;
}
/*
 * if we're here it means that we just want chunks of data and don't
 * care if it includes whole rows or not (for example, backend url_read
 * code - a segdb that reads all the data that gpfdist sent to it,
 * buffer by buffer and then parses the lines internally).
 */
if (fs->buffer_cur_size)
{
/* hand out buffered data (e.g. what followed a header) first */
ssize_t total_bytes = fs->buffer_cur_size;
updateCurFileState(fs, fo);
if (total_bytes > size)
total_bytes = size;
memcpy(dest, fs->buffer, total_bytes);
fs->buffer_cur_size -= total_bytes;
memmove(fs->buffer, fs->buffer + total_bytes, fs->buffer_cur_size);
fs->foff += total_bytes;
fs->line_number = 0;
return total_bytes;
}
bytesread = gfile_read(&fs->fd, dest, size);
if (bytesread < 0)
{
fs->ferror = format_error("cannot read file - ", fs->glob.gl_pathv[fs->fidx]);
return -1;
}
if (bytesread)
{
updateCurFileState(fs, fo);
fs->foff += bytesread;
fs->line_number = 0;
return bytesread;
}
/* current file is exhausted: advance and try again */
if (nextFile(fs))
return -1;
}
}
/*
 * fstream_write
 *
 * Write data from 'buf' to the file of the stream.
 *
 * If 'write_whole_lines' is non-zero only the part of 'buf' up to and
 * including the last end-of-line delimiter is written, so that no partial
 * row reaches the output file (as gpfdist needs); the caller should move the
 * leftover data to the start of its buffer. It is an error if 'buf' holds no
 * delimiter at all. Otherwise all 'size' bytes are written.
 *
 * 'line_delim_str' / 'line_delim_length' give a custom delimiter; a length
 * of 0 or less means '\n'.
 *
 * Returns the number of bytes written, or -1 on error (fs->ferror is set).
 */
int fstream_write(fstream_t *fs,
				  void *buf,
				  int size,
				  const int write_whole_lines,
				  const char* line_delim_str,
				  const int line_delim_length)
{
	char   *data = (char*) buf;
	int		byteswritten = 0;

	if (fs->ferror)
		return -1;

	if (!write_whole_lines)
	{
		/* write data to destination file */
		byteswritten = (int)gfile_write(&fs->fd, data, size);
	}
	else
	{
		char   *tail;	/* last byte of the last delimiter, or before 'data' */

		/*
		 * scan for EOL Delimiter (\n by default) for record boundary
		 * find the last EOL Delimiter from the back
		 */
		if (line_delim_length > 0)
		{
			tail = find_last_eol_delim(buf, size, line_delim_str, line_delim_length);
		}
		else
		{
			tail = data + size - 1;
			while (data <= tail && *tail != '\n')
				tail--;
		}

		if (tail < data)
		{
			fs->ferror = "no complete data row found for writing";
			return -1;
		}

		/* TODO: need to do this more carefully for CSV, like in the read case */

		/* write data to destination file */
		byteswritten = (int)gfile_write(&fs->fd, data, (tail + 1) - data);

		/* caller should move leftover data to start of buffer */
	}

	if (byteswritten < 0)
	{
		fs->ferror = "cannot write into file";
		return -1;
	}

	return byteswritten;
}
/*
 * Return 1 once the stream has moved past its last file, 0 otherwise.
 */
int fstream_eof(fstream_t *fs)
{
	if (fs->glob.gl_pathc == fs->fidx)
		return 1;

	return 0;
}
/*
 * Return the combined compressed size of all files of the stream, as
 * accumulated when the stream was opened.
 */
int64_t fstream_get_compressed_size(fstream_t *fs)
{
	const int64_t total = fs->compressed_size;

	return total;
}
/*
 * Return the current position within the compressed data of the whole
 * stream: the compressed size of the files already finished plus, while a
 * file is still being read, the compressed position inside that file.
 */
int64_t fstream_get_compressed_position(fstream_t *fs)
{
	/* past the last file: only the finished files count */
	if (fs->fidx == fs->glob.gl_pathc)
		return fs->compressed_position;

	return fs->compressed_position + gfile_get_compressed_position(&fs->fd);
}
/*
 * fstream_rewind
 *
 * Start over from scratch: clear the position, line number, error, header
 * flag and buffered data, close the currently open file and reopen the first
 * file of the stream for reading.
 *
 * Returns 0 on success, -1 if the first file cannot be opened (fs->ferror is
 * set in that case).
 */
int fstream_rewind(fstream_t *fs)
{
	int response_code;
	const char* response_string;
	struct gpfxdist_t* transform = fs->options.transform;

	/* back to the state of a freshly opened stream */
	fs->ferror = 0;
	fs->fidx = 0;
	fs->foff = 0;
	fs->line_number = 1;
	fs->buffer_cur_size = 0;
	fs->compressed_position = 0;
	fs->skip_header_line = fs->options.header;

	gfile_close(&fs->fd);

	if (!gfile_open(&fs->fd, fs->glob.gl_pathv[0], GFILE_OPEN_FOR_READ,
					&response_code, &response_string, transform))
		return 0;

	gfile_printf_then_putc_newline("fstream unable to open file %s",
								   fs->glob.gl_pathv[0]);
	fs->ferror = "unable to open file";
	return -1;
}