| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to you under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
| * implied. See the License for the specific language governing |
| * permissions and limitations under the License. |
| */ |
| |
| #include "avro/allocation.h" |
| #include "avro/refcount.h" |
| #include "avro/errors.h" |
| #include "avro/io.h" |
| #include "avro_private.h" |
| #include <stdio.h> |
| #include <stdlib.h> |
| #include <errno.h> |
| #include <string.h> |
| #include "dump.h" |
| |
| enum avro_io_type_t { |
| AVRO_FILE_IO, |
| AVRO_MEMORY_IO |
| }; |
| typedef enum avro_io_type_t avro_io_type_t; |
| |
| struct avro_reader_t_ { |
| avro_io_type_t type; |
| volatile int refcount; |
| }; |
| |
| struct avro_writer_t_ { |
| avro_io_type_t type; |
| volatile int refcount; |
| }; |
| |
| struct _avro_reader_file_t { |
| struct avro_reader_t_ reader; |
| FILE *fp; |
| int should_close; |
| char *cur; |
| char *end; |
| char buffer[4096]; |
| }; |
| |
| struct _avro_writer_file_t { |
| struct avro_writer_t_ writer; |
| FILE *fp; |
| int should_close; |
| }; |
| |
| struct _avro_reader_memory_t { |
| struct avro_reader_t_ reader; |
| const char *buf; |
| int64_t len; |
| int64_t read; |
| }; |
| |
| struct _avro_writer_memory_t { |
| struct avro_writer_t_ writer; |
| const char *buf; |
| int64_t len; |
| int64_t written; |
| }; |
| |
| #define avro_io_typeof(obj) ((obj)->type) |
| #define is_memory_io(obj) (obj && avro_io_typeof(obj) == AVRO_MEMORY_IO) |
| #define is_file_io(obj) (obj && avro_io_typeof(obj) == AVRO_FILE_IO) |
| |
| #define avro_reader_to_memory(reader_) container_of(reader_, struct _avro_reader_memory_t, reader) |
| #define avro_reader_to_file(reader_) container_of(reader_, struct _avro_reader_file_t, reader) |
| #define avro_writer_to_memory(writer_) container_of(writer_, struct _avro_writer_memory_t, writer) |
| #define avro_writer_to_file(writer_) container_of(writer_, struct _avro_writer_file_t, writer) |
| |
| static void reader_init(avro_reader_t reader, avro_io_type_t type) |
| { |
| reader->type = type; |
| avro_refcount_set(&reader->refcount, 1); |
| } |
| |
| static void writer_init(avro_writer_t writer, avro_io_type_t type) |
| { |
| writer->type = type; |
| avro_refcount_set(&writer->refcount, 1); |
| } |
| |
| avro_reader_t avro_reader_file_fp(FILE * fp, int should_close) |
| { |
| struct _avro_reader_file_t *file_reader = |
| (struct _avro_reader_file_t *) avro_new(struct _avro_reader_file_t); |
| if (!file_reader) { |
| avro_set_error("Cannot allocate new file reader"); |
| return NULL; |
| } |
| memset(file_reader, 0, sizeof(struct _avro_reader_file_t)); |
| file_reader->fp = fp; |
| file_reader->should_close = should_close; |
| reader_init(&file_reader->reader, AVRO_FILE_IO); |
| return &file_reader->reader; |
| } |
| |
| avro_reader_t avro_reader_file(FILE * fp) |
| { |
| return avro_reader_file_fp(fp, 1); |
| } |
| |
| avro_writer_t avro_writer_file_fp(FILE * fp, int should_close) |
| { |
| struct _avro_writer_file_t *file_writer = |
| (struct _avro_writer_file_t *) avro_new(struct _avro_writer_file_t); |
| if (!file_writer) { |
| avro_set_error("Cannot allocate new file writer"); |
| return NULL; |
| } |
| file_writer->fp = fp; |
| file_writer->should_close = should_close; |
| writer_init(&file_writer->writer, AVRO_FILE_IO); |
| return &file_writer->writer; |
| } |
| |
| avro_writer_t avro_writer_file(FILE * fp) |
| { |
| return avro_writer_file_fp(fp, 1); |
| } |
| |
| avro_reader_t avro_reader_memory(const char *buf, int64_t len) |
| { |
| struct _avro_reader_memory_t *mem_reader = |
| (struct _avro_reader_memory_t *) avro_new(struct _avro_reader_memory_t); |
| if (!mem_reader) { |
| avro_set_error("Cannot allocate new memory reader"); |
| return NULL; |
| } |
| mem_reader->buf = buf; |
| mem_reader->len = len; |
| mem_reader->read = 0; |
| reader_init(&mem_reader->reader, AVRO_MEMORY_IO); |
| return &mem_reader->reader; |
| } |
| |
| void |
| avro_reader_memory_set_source(avro_reader_t reader, const char *buf, int64_t len) |
| { |
| if (is_memory_io(reader)) { |
| struct _avro_reader_memory_t *mem_reader = avro_reader_to_memory(reader); |
| mem_reader->buf = buf; |
| mem_reader->len = len; |
| mem_reader->read = 0; |
| } |
| } |
| |
| avro_writer_t avro_writer_memory(const char *buf, int64_t len) |
| { |
| struct _avro_writer_memory_t *mem_writer = |
| (struct _avro_writer_memory_t *) avro_new(struct _avro_writer_memory_t); |
| if (!mem_writer) { |
| avro_set_error("Cannot allocate new memory writer"); |
| return NULL; |
| } |
| mem_writer->buf = buf; |
| mem_writer->len = len; |
| mem_writer->written = 0; |
| writer_init(&mem_writer->writer, AVRO_MEMORY_IO); |
| return &mem_writer->writer; |
| } |
| |
| void |
| avro_writer_memory_set_dest(avro_writer_t writer, const char *buf, int64_t len) |
| { |
| if (is_memory_io(writer)) { |
| struct _avro_writer_memory_t *mem_writer = avro_writer_to_memory(writer); |
| mem_writer->buf = buf; |
| mem_writer->len = len; |
| mem_writer->written = 0; |
| } |
| } |
| |
| static int |
| avro_read_memory(struct _avro_reader_memory_t *reader, void *buf, int64_t len) |
| { |
| if (len > 0) { |
| if ((reader->len - reader->read) < len) { |
| avro_prefix_error("Cannot read %" PRIsz " bytes from memory buffer", |
| (size_t) len); |
| return ENOSPC; |
| } |
| memcpy(buf, reader->buf + reader->read, len); |
| reader->read += len; |
| } |
| return 0; |
| } |
| |
| #define bytes_available(reader) (reader->end - reader->cur) |
| #define buffer_reset(reader) {reader->cur = reader->end = reader->buffer;} |
| |
| static int |
| avro_read_file(struct _avro_reader_file_t *reader, void *buf, int64_t len) |
| { |
| int64_t needed = len; |
| char *p = (char *) buf; |
| int rval; |
| |
| if (len == 0) { |
| return 0; |
| } |
| |
| if (needed > (int64_t) sizeof(reader->buffer)) { |
| if (bytes_available(reader) > 0) { |
| memcpy(p, reader->cur, bytes_available(reader)); |
| p += bytes_available(reader); |
| needed -= bytes_available(reader); |
| buffer_reset(reader); |
| } |
| rval = fread(p, 1, needed, reader->fp); |
| if (rval != needed) { |
| avro_set_error("Cannot read %" PRIsz " bytes from file", |
| (size_t) needed); |
| return EILSEQ; |
| } |
| return 0; |
| } else if (needed <= bytes_available(reader)) { |
| memcpy(p, reader->cur, needed); |
| reader->cur += needed; |
| return 0; |
| } else { |
| memcpy(p, reader->cur, bytes_available(reader)); |
| p += bytes_available(reader); |
| needed -= bytes_available(reader); |
| |
| rval = |
| fread(reader->buffer, 1, sizeof(reader->buffer), |
| reader->fp); |
| if (rval == 0) { |
| avro_set_error("Cannot read %" PRIsz " bytes from file", |
| (size_t) needed); |
| return EILSEQ; |
| } |
| reader->cur = reader->buffer; |
| reader->end = reader->cur + rval; |
| |
| if (bytes_available(reader) < needed) { |
| avro_set_error("Cannot read %" PRIsz " bytes from file", |
| (size_t) needed); |
| return EILSEQ; |
| } |
| memcpy(p, reader->cur, needed); |
| reader->cur += needed; |
| return 0; |
| } |
| avro_set_error("Cannot read %" PRIsz " bytes from file", |
| (size_t) needed); |
| return EILSEQ; |
| } |
| |
| int avro_read(avro_reader_t reader, void *buf, int64_t len) |
| { |
| if (buf && len >= 0) { |
| if (is_memory_io(reader)) { |
| return avro_read_memory(avro_reader_to_memory(reader), |
| buf, len); |
| } else if (is_file_io(reader)) { |
| return avro_read_file(avro_reader_to_file(reader), buf, |
| len); |
| } |
| } |
| return EINVAL; |
| } |
| |
| static int avro_skip_memory(struct _avro_reader_memory_t *reader, int64_t len) |
| { |
| if (len > 0) { |
| if ((reader->len - reader->read) < len) { |
| avro_set_error("Cannot skip %" PRIsz " bytes in memory buffer", |
| (size_t) len); |
| return ENOSPC; |
| } |
| reader->read += len; |
| } |
| return 0; |
| } |
| |
| static int avro_skip_file(struct _avro_reader_file_t *reader, int64_t len) |
| { |
| int rval; |
| int64_t needed = len; |
| |
| if (len == 0) { |
| return 0; |
| } |
| if (needed <= bytes_available(reader)) { |
| reader->cur += needed; |
| } else { |
| needed -= bytes_available(reader); |
| buffer_reset(reader); |
| rval = fseek(reader->fp, needed, SEEK_CUR); |
| if (rval < 0) { |
| avro_set_error("Cannot skip %" PRIsz " bytes in file", |
| (size_t) len); |
| return rval; |
| } |
| } |
| return 0; |
| } |
| |
| int avro_skip(avro_reader_t reader, int64_t len) |
| { |
| if (len >= 0) { |
| if (is_memory_io(reader)) { |
| return avro_skip_memory(avro_reader_to_memory(reader), |
| len); |
| } else if (is_file_io(reader)) { |
| return avro_skip_file(avro_reader_to_file(reader), len); |
| } |
| } |
| return 0; |
| } |
| |
| static int |
| avro_write_memory(struct _avro_writer_memory_t *writer, void *buf, int64_t len) |
| { |
| if (len) { |
| if ((writer->len - writer->written) < len) { |
| avro_set_error("Cannot write %" PRIsz " bytes in memory buffer", |
| (size_t) len); |
| return ENOSPC; |
| } |
| memcpy((void *)(writer->buf + writer->written), buf, len); |
| writer->written += len; |
| } |
| return 0; |
| } |
| |
| static int |
| avro_write_file(struct _avro_writer_file_t *writer, void *buf, int64_t len) |
| { |
| int rval; |
| if (len > 0) { |
| rval = fwrite(buf, len, 1, writer->fp); |
| if (rval == 0) { |
| return feof(writer->fp) ? EOF : 0; |
| } |
| } |
| return 0; |
| } |
| |
| int avro_write(avro_writer_t writer, void *buf, int64_t len) |
| { |
| if (buf && len >= 0) { |
| if (is_memory_io(writer)) { |
| return avro_write_memory(avro_writer_to_memory(writer), |
| buf, len); |
| } else if (is_file_io(writer)) { |
| return avro_write_file(avro_writer_to_file(writer), buf, |
| len); |
| } |
| } |
| return EINVAL; |
| } |
| |
| void |
| avro_reader_reset(avro_reader_t reader) |
| { |
| if (is_memory_io(reader)) { |
| avro_reader_to_memory(reader)->read = 0; |
| } |
| } |
| |
| void avro_writer_reset(avro_writer_t writer) |
| { |
| if (is_memory_io(writer)) { |
| avro_writer_to_memory(writer)->written = 0; |
| } |
| } |
| |
| int64_t avro_writer_tell(avro_writer_t writer) |
| { |
| if (is_memory_io(writer)) { |
| return avro_writer_to_memory(writer)->written; |
| } |
| return EINVAL; |
| } |
| |
| void avro_writer_flush(avro_writer_t writer) |
| { |
| if (is_file_io(writer)) { |
| fflush(avro_writer_to_file(writer)->fp); |
| } |
| } |
| |
| void avro_writer_dump(avro_writer_t writer, FILE * fp) |
| { |
| if (is_memory_io(writer)) { |
| dump(fp, (char *)avro_writer_to_memory(writer)->buf, |
| avro_writer_to_memory(writer)->written); |
| } |
| } |
| |
| void avro_reader_dump(avro_reader_t reader, FILE * fp) |
| { |
| if (is_memory_io(reader)) { |
| dump(fp, (char *)avro_reader_to_memory(reader)->buf, |
| avro_reader_to_memory(reader)->read); |
| } |
| } |
| |
| void avro_reader_free(avro_reader_t reader) |
| { |
| if (is_memory_io(reader)) { |
| avro_freet(struct _avro_reader_memory_t, reader); |
| } else if (is_file_io(reader)) { |
| if (avro_reader_to_file(reader)->should_close) { |
| fclose(avro_reader_to_file(reader)->fp); |
| } |
| avro_freet(struct _avro_reader_file_t, reader); |
| } |
| } |
| |
| void avro_writer_free(avro_writer_t writer) |
| { |
| if (is_memory_io(writer)) { |
| avro_freet(struct _avro_writer_memory_t, writer); |
| } else if (is_file_io(writer)) { |
| if (avro_writer_to_file(writer)->should_close) { |
| fclose(avro_writer_to_file(writer)->fp); |
| } |
| avro_freet(struct _avro_writer_file_t, writer); |
| } |
| } |
| |
| int avro_reader_is_eof(avro_reader_t reader) |
| { |
| if (is_file_io(reader)) { |
| struct _avro_reader_file_t *file = avro_reader_to_file(reader); |
| if (feof(file->fp)) { |
| return file->cur == file->end; |
| } |
| } |
| return 0; |
| } |