| /* Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| #define C_LUCY_OUTSTREAM |
| #include "Lucy/Util/ToolSet.h" |
| |
| #include "charmony.h" |
| |
| #include "Lucy/Store/OutStream.h" |
| #include "Lucy/Store/FileHandle.h" |
| #include "Lucy/Store/FSFileHandle.h" |
| #include "Lucy/Store/FileWindow.h" |
| #include "Lucy/Store/InStream.h" |
| #include "Lucy/Store/RAMFile.h" |
| #include "Lucy/Store/RAMFileHandle.h" |
| #include "Lucy/Util/NumberUtils.h" |
| |
| // Inlined version of OutStream_Write_Bytes. `len` must be between 0 and 2 GB. |
| static CFISH_INLINE void |
| SI_write_bytes(OutStream *self, OutStreamIVARS *ivars, |
| const void *bytes, int64_t len); |
| |
| // Inlined version of OutStream_Write_CU32. |
| static CFISH_INLINE void |
| SI_write_cu32(OutStream *self, OutStreamIVARS *ivars, uint32_t value); |
| |
| // Inlined version of OutStream_Write_CU64. |
| static CFISH_INLINE void |
| SI_write_cu64(OutStream *self, OutStreamIVARS *ivars, uint64_t value); |
| |
| // Flush content in the buffer to the FileHandle. |
| static void |
| S_flush(OutStream *self, OutStreamIVARS *ivars); |
| |
| OutStream* |
| OutStream_open(Obj *file) { |
| OutStream *self = (OutStream*)Class_Make_Obj(OUTSTREAM); |
| return OutStream_do_open(self, file); |
| } |
| |
| OutStream* |
| OutStream_do_open(OutStream *self, Obj *file) { |
| OutStreamIVARS *const ivars = OutStream_IVARS(self); |
| |
| // Init. |
| ivars->buf = (char*)MALLOCATE(IO_STREAM_BUF_SIZE); |
| ivars->buf_start = 0; |
| ivars->buf_pos = 0; |
| |
| // Obtain a FileHandle. |
| if (Obj_is_a(file, FILEHANDLE)) { |
| ivars->file_handle = (FileHandle*)INCREF(file); |
| } |
| else if (Obj_is_a(file, RAMFILE)) { |
| ivars->file_handle |
| = (FileHandle*)RAMFH_open(NULL, FH_WRITE_ONLY, (RAMFile*)file); |
| } |
| else if (Obj_is_a(file, STRING)) { |
| ivars->file_handle = (FileHandle*)FSFH_open((String*)file, |
| FH_WRITE_ONLY | FH_CREATE | FH_EXCLUSIVE); |
| } |
| else { |
| Err_set_error(Err_new(Str_newf("Invalid type for param 'file': '%o'", |
| Obj_get_class_name(file)))); |
| DECREF(self); |
| return NULL; |
| } |
| if (!ivars->file_handle) { |
| ERR_ADD_FRAME(Err_get_error()); |
| DECREF(self); |
| return NULL; |
| } |
| |
| // Derive filepath from FileHandle. |
| ivars->path = Str_Clone(FH_Get_Path(ivars->file_handle)); |
| |
| return self; |
| } |
| |
| void |
| OutStream_Destroy_IMP(OutStream *self) { |
| OutStreamIVARS *const ivars = OutStream_IVARS(self); |
| if (ivars->file_handle != NULL) { |
| // Inlined flush, ignoring errors. |
| if (ivars->buf_pos) { |
| FH_Write(ivars->file_handle, ivars->buf, ivars->buf_pos); |
| } |
| DECREF(ivars->file_handle); |
| } |
| DECREF(ivars->path); |
| FREEMEM(ivars->buf); |
| SUPER_DESTROY(self, OUTSTREAM); |
| } |
| |
| String* |
| OutStream_Get_Path_IMP(OutStream *self) { |
| return OutStream_IVARS(self)->path; |
| } |
| |
| void |
| OutStream_Absorb_IMP(OutStream *self, InStream *instream) { |
| OutStreamIVARS *const ivars = OutStream_IVARS(self); |
| char buf[IO_STREAM_BUF_SIZE]; |
| int64_t bytes_left = InStream_Length(instream); |
| |
| // Read blocks of content into an intermediate buffer, than write them to |
| // the OutStream. |
| // |
| // TODO: optimize by utilizing OutStream's buffer directly, while still |
| // not flushing too frequently and keeping code complexity under control. |
| OutStream_Grow(self, OutStream_Tell(self) + bytes_left); |
| while (bytes_left) { |
| const int64_t bytes_this_iter = bytes_left < IO_STREAM_BUF_SIZE |
| ? bytes_left |
| : IO_STREAM_BUF_SIZE; |
| InStream_Read_Bytes(instream, buf, (size_t)bytes_this_iter); |
| SI_write_bytes(self, ivars, buf, bytes_this_iter); |
| bytes_left -= bytes_this_iter; |
| } |
| } |
| |
| void |
| OutStream_Grow_IMP(OutStream *self, int64_t length) { |
| OutStreamIVARS *const ivars = OutStream_IVARS(self); |
| if (!FH_Grow(ivars->file_handle, length)) { |
| RETHROW(INCREF(Err_get_error())); |
| } |
| } |
| |
| int64_t |
| OutStream_Tell_IMP(OutStream *self) { |
| OutStreamIVARS *const ivars = OutStream_IVARS(self); |
| return ivars->buf_start + (int64_t)ivars->buf_pos; |
| } |
| |
| int64_t |
| OutStream_Align_IMP(OutStream *self, int64_t modulus) { |
| int64_t len = OutStream_Tell(self); |
| int64_t filler_bytes = (modulus - (len % modulus)) % modulus; |
| while (filler_bytes--) { OutStream_Write_U8(self, 0); } |
| return OutStream_Tell(self); |
| } |
| |
| void |
| OutStream_Flush_IMP(OutStream *self) { |
| OutStreamIVARS *const ivars = OutStream_IVARS(self); |
| S_flush(self, ivars); |
| } |
| |
| static void |
| S_flush(OutStream *self, OutStreamIVARS *ivars) { |
| UNUSED_VAR(self); |
| if (ivars->file_handle == NULL) { |
| THROW(ERR, "Can't write to a closed OutStream for %o", ivars->path); |
| } |
| if (!FH_Write(ivars->file_handle, ivars->buf, ivars->buf_pos)) { |
| RETHROW(INCREF(Err_get_error())); |
| } |
| ivars->buf_start += ivars->buf_pos; |
| ivars->buf_pos = 0; |
| } |
| |
| int64_t |
| OutStream_Length_IMP(OutStream *self) { |
| return OutStream_Tell_IMP(self); |
| } |
| |
| void |
| OutStream_Write_Bytes_IMP(OutStream *self, const void *bytes, size_t len) { |
| if (len >= INT32_MAX) { |
| THROW(ERR, "Can't write buffer longer than INT32_MAX: %u64", |
| (uint64_t)len); |
| } |
| SI_write_bytes(self, OutStream_IVARS(self), bytes, (int64_t)len); |
| } |
| |
| static CFISH_INLINE void |
| SI_write_bytes(OutStream *self, OutStreamIVARS *ivars, |
| const void *bytes, int64_t len) { |
| // If this data is larger than the buffer size, flush and write. |
| if (len >= IO_STREAM_BUF_SIZE) { |
| S_flush(self, ivars); |
| if (!FH_Write(ivars->file_handle, bytes, (size_t)len)) { |
| RETHROW(INCREF(Err_get_error())); |
| } |
| ivars->buf_start += len; |
| } |
| // If there's not enough room in the buffer, flush then add. |
| else if ((uint64_t)ivars->buf_pos + (uint64_t)len >= IO_STREAM_BUF_SIZE) { |
| S_flush(self, ivars); |
| memcpy((ivars->buf + ivars->buf_pos), bytes, (size_t)len); |
| ivars->buf_pos += (size_t)len; |
| } |
| // If there's room, just add these bytes to the buffer. |
| else { |
| memcpy((ivars->buf + ivars->buf_pos), bytes, (size_t)len); |
| ivars->buf_pos += (size_t)len; |
| } |
| } |
| |
| static CFISH_INLINE void |
| SI_write_u8(OutStream *self, OutStreamIVARS *ivars, uint8_t value) { |
| if (ivars->buf_pos >= IO_STREAM_BUF_SIZE) { |
| S_flush(self, ivars); |
| } |
| ivars->buf[ivars->buf_pos++] = (char)value; |
| } |
| |
| void |
| OutStream_Write_I8_IMP(OutStream *self, int8_t value) { |
| OutStreamIVARS *const ivars = OutStream_IVARS(self); |
| SI_write_u8(self, ivars, (uint8_t)value); |
| } |
| |
| void |
| OutStream_Write_U8_IMP(OutStream *self, uint8_t value) { |
| OutStreamIVARS *const ivars = OutStream_IVARS(self); |
| SI_write_u8(self, ivars, value); |
| } |
| |
| static CFISH_INLINE void |
| SI_write_u32(OutStream *self, OutStreamIVARS *ivars, uint32_t value) { |
| #ifdef CHY_BIG_END |
| SI_write_bytes(self, ivars, &value, 4); |
| #else |
| char buf[4]; |
| NumUtil_encode_bigend_u32(value, buf); |
| SI_write_bytes(self, ivars, buf, 4); |
| #endif |
| } |
| |
| void |
| OutStream_Write_I32_IMP(OutStream *self, int32_t value) { |
| SI_write_u32(self, OutStream_IVARS(self), (uint32_t)value); |
| } |
| |
| void |
| OutStream_Write_U32_IMP(OutStream *self, uint32_t value) { |
| SI_write_u32(self, OutStream_IVARS(self), value); |
| } |
| |
| static CFISH_INLINE void |
| SI_write_u64(OutStream *self, OutStreamIVARS *ivars, uint64_t value) { |
| #ifdef CHY_BIG_END |
| SI_write_bytes(self, ivars, &value, 8); |
| #else |
| char buf[sizeof(uint64_t)]; |
| NumUtil_encode_bigend_u64(value, buf); |
| SI_write_bytes(self, ivars, buf, sizeof(uint64_t)); |
| #endif |
| } |
| |
| void |
| OutStream_Write_I64_IMP(OutStream *self, int64_t value) { |
| SI_write_u64(self, OutStream_IVARS(self), (uint64_t)value); |
| } |
| |
| void |
| OutStream_Write_U64_IMP(OutStream *self, uint64_t value) { |
| SI_write_u64(self, OutStream_IVARS(self), value); |
| } |
| |
| void |
| OutStream_Write_F32_IMP(OutStream *self, float value) { |
| OutStreamIVARS *const ivars = OutStream_IVARS(self); |
| char buf[sizeof(float)]; |
| NumUtil_encode_bigend_f32(value, buf); |
| SI_write_bytes(self, ivars, buf, sizeof(float)); |
| } |
| |
| void |
| OutStream_Write_F64_IMP(OutStream *self, double value) { |
| OutStreamIVARS *const ivars = OutStream_IVARS(self); |
| char buf[sizeof(double)]; |
| NumUtil_encode_bigend_f64(value, buf); |
| SI_write_bytes(self, ivars, buf, sizeof(double)); |
| } |
| |
| void |
| OutStream_Write_CI32_IMP(OutStream *self, int32_t value) { |
| SI_write_cu32(self, OutStream_IVARS(self), (uint32_t)value); |
| } |
| |
| void |
| OutStream_Write_CU32_IMP(OutStream *self, uint32_t value) { |
| SI_write_cu32(self, OutStream_IVARS(self), value); |
| } |
| |
| static CFISH_INLINE void |
| SI_write_cu32(OutStream *self, OutStreamIVARS *ivars, uint32_t value) { |
| uint8_t buf[CU32_MAX_BYTES]; |
| uint8_t *ptr = buf + sizeof(buf) - 1; |
| |
| // Write last byte first, which has no continue bit. |
| *ptr = value & 0x7f; |
| value >>= 7; |
| |
| while (value) { |
| // Work backwards, writing bytes with continue bits set. |
| *--ptr = ((value & 0x7f) | 0x80); |
| value >>= 7; |
| } |
| |
| SI_write_bytes(self, ivars, ptr, (int64_t)((buf + sizeof(buf)) - ptr)); |
| } |
| |
| void |
| OutStream_Write_CI64_IMP(OutStream *self, int64_t value) { |
| SI_write_cu64(self, OutStream_IVARS(self), (uint64_t)value); |
| } |
| |
| void |
| OutStream_Write_CU64_IMP(OutStream *self, uint64_t value) { |
| SI_write_cu64(self, OutStream_IVARS(self), value); |
| } |
| |
| static CFISH_INLINE void |
| SI_write_cu64(OutStream *self, OutStreamIVARS *ivars, uint64_t value) { |
| uint8_t buf[CU64_MAX_BYTES]; |
| uint8_t *ptr = buf + sizeof(buf) - 1; |
| |
| // Write last byte first, which has no continue bit. |
| *ptr = value & 0x7f; |
| value >>= 7; |
| |
| while (value) { |
| // Work backwards, writing bytes with continue bits set. |
| *--ptr = ((value & 0x7f) | 0x80); |
| value >>= 7; |
| } |
| |
| SI_write_bytes(self, ivars, ptr, (int64_t)((buf + sizeof(buf)) - ptr)); |
| } |
| |
| void |
| OutStream_Write_String_IMP(OutStream *self, const char *string, size_t len) { |
| OutStreamIVARS *const ivars = OutStream_IVARS(self); |
| if (len >= INT32_MAX) { |
| THROW(ERR, "Can't write string longer than INT32_MAX: %u64", |
| (uint64_t)len); |
| } |
| SI_write_cu32(self, ivars, (uint32_t)len); |
| SI_write_bytes(self, ivars, string, (int64_t)len); |
| } |
| |
| void |
| OutStream_Close_IMP(OutStream *self) { |
| OutStreamIVARS *const ivars = OutStream_IVARS(self); |
| if (ivars->file_handle) { |
| S_flush(self, ivars); |
| if (!FH_Close(ivars->file_handle)) { |
| RETHROW(INCREF(Err_get_error())); |
| } |
| DECREF(ivars->file_handle); |
| ivars->file_handle = NULL; |
| } |
| } |
| |
| |