blob: f65529b4cfed12e049d15f102c9108d40828ac6f [file] [log] [blame]
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.crunch.kafka.offset.hdfs;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.lang.StringUtils;
import org.apache.crunch.kafka.offset.AbstractOffsetWriter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.kafka.common.TopicPartition;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
/**
* Offset writer implementation that stores the offsets in HDFS.
*/
public class HDFSOffsetWriter extends AbstractOffsetWriter {

  private static final Logger LOG = LoggerFactory.getLogger(HDFSOffsetWriter.class);

  /**
   * Pattern used to render a persistence time as a file name. Dashes are used in the time portion
   * instead of colons. The pattern has second resolution, so the millisecond part of a time is
   * not represented in the resulting file name.
   */
  public static final String PERSIST_TIME_FORMAT = "yyyy-MM-dd'T'HH-mm-ssZ";

  /**
   * Formatter to use when creating the file names in a URI compliant format. Always operates in UTC.
   */
  public static final DateTimeFormatter FILE_FORMATTER = DateTimeFormat.forPattern(PERSIST_TIME_FORMAT).withZoneUTC();

  /**
   * File extension for storing the offsets.
   */
  public static final String FILE_FORMAT_EXTENSION = ".json";

  /**
   * Mapper for converting data into JSON.
   */
  private static final ObjectMapper MAPPER = new ObjectMapper();

  /**
   * Configuration for the underlying storage.
   */
  private final Configuration config;

  /**
   * Base storage path for offset data.
   */
  private final Path baseStoragePath;

  /**
   * Creates a writer instance for interacting with the storage specified by the {@code config} and with
   * the base storage path of {@code baseStoragePath}.
   *
   * @param config          the config for interacting with the underlying data store.
   * @param baseStoragePath the base storage path for offset information.
   * @throws IllegalArgumentException if either argument is {@code null}.
   */
  public HDFSOffsetWriter(Configuration config, Path baseStoragePath) {
    if (config == null) {
      throw new IllegalArgumentException("The 'config' cannot be 'null'.");
    }
    if (baseStoragePath == null) {
      throw new IllegalArgumentException("The 'baseStoragePath' cannot be 'null'.");
    }
    this.config = config;
    this.baseStoragePath = baseStoragePath;
  }

  /**
   * Serializes the {@code offsets} as JSON into a file under the base storage path. The file name is
   * derived from {@code asOfTime} via {@link #getPersistedTimeStoragePath(Path, long)}. An existing
   * file with the same name is overwritten.
   *
   * @param asOfTime the time, in milliseconds since epoch, the offsets are valid as of.
   * @param offsets  the offset for each topic partition.
   * @throws IllegalArgumentException if {@code offsets} is {@code null} or {@code asOfTime} is negative.
   * @throws IOException              if there is an error obtaining the file system or writing the file.
   */
  @Override
  public void write(long asOfTime, Map<TopicPartition, Long> offsets) throws IOException {
    if (offsets == null) {
      throw new IllegalArgumentException("The 'offsets' cannot be 'null'.");
    }
    if (asOfTime < 0) {
      throw new IllegalArgumentException("The 'asOfTime' cannot be less than 0.");
    }

    // Translate the Kafka view of the offsets into the storage model that gets serialized.
    List<Offsets.PartitionOffset> partitionOffsets = new LinkedList<>();
    for (Map.Entry<TopicPartition, Long> entry : offsets.entrySet()) {
      TopicPartition topicPartition = entry.getKey();
      partitionOffsets.add(Offsets.PartitionOffset.Builder.newBuilder()
          .setOffset(entry.getValue())
          .setTopic(topicPartition.topic())
          .setPartition(topicPartition.partition())
          .build());
    }

    Offsets storageOffsets = Offsets.Builder.newBuilder()
        .setOffsets(partitionOffsets)
        .setAsOfTime(asOfTime)
        .build();

    FileSystem fs = getFileSystem();
    Path offsetPath = getPersistedTimeStoragePath(baseStoragePath, asOfTime);

    LOG.debug("Writing offsets to {} with as of time {}", offsetPath, asOfTime);

    // Reuse the path computed above so the logged location and the written location always agree.
    // The 'true' argument requests overwriting any existing file at that path.
    try (FSDataOutputStream fsDataOutputStream = fs.create(offsetPath, true)) {
      MAPPER.writeValue(fsDataOutputStream, storageOffsets);
      fsDataOutputStream.flush();
    }

    LOG.debug("Completed writing offsets to {}", offsetPath);
  }

  /**
   * Does nothing; the output stream opened by {@link #write(long, Map)} is closed within that call.
   *
   * @throws IOException never thrown by this implementation.
   */
  @Override
  public void close() throws IOException {
    //no-op
  }

  /**
   * Returns the {@link FileSystem} instance for writing data. Callers are not responsible for closing the instance.
   *
   * @return the {@link FileSystem} instance for writing data.
   * @throws IOException error retrieving underlying file system.
   */
  protected FileSystem getFileSystem() throws IOException {
    return FileSystem.get(config);
  }

  /**
   * Creates a {@link Path} for storing the offsets for a specified {@code persistedTime}.
   *
   * @param baseStoragePath The base path the offsets will be stored at.
   * @param persistedTime   the time of the data being persisted.
   * @return The path to where the offset information should be stored.
   * @throws IllegalArgumentException if the {@code baseStoragePath} is {@code null}.
   */
  public static Path getPersistedTimeStoragePath(Path baseStoragePath, long persistedTime) {
    if (baseStoragePath == null) {
      throw new IllegalArgumentException("The 'baseStoragePath' cannot be 'null'.");
    }
    return new Path(baseStoragePath, persistenceTimeToFileName(persistedTime));
  }

  /**
   * Converts a {@code fileName} into the time the offsets were persisted. Because
   * {@link #PERSIST_TIME_FORMAT} has second resolution, the returned value is a whole number of seconds
   * expressed in milliseconds.
   *
   * @param fileName the file name to parse.
   * @return the time in milliseconds since epoch that the offsets were stored.
   * @throws IllegalArgumentException if the {@code fileName} is not of the correct format or is {@code null} or
   *                                  empty.
   */
  public static long fileNameToPersistenceTime(String fileName) {
    if (StringUtils.isBlank(fileName)) {
      throw new IllegalArgumentException("the 'fileName' cannot be 'null' or empty");
    }
    // Remove the literal extension suffix. StringUtils.strip(str, chars) would instead trim any of the
    // characters '.', 'j', 's', 'o', 'n' from BOTH ends of the name, which is not a suffix removal.
    String formattedTimeString = StringUtils.removeEnd(fileName, FILE_FORMAT_EXTENSION);
    DateTime persistedTime = FILE_FORMATTER.parseDateTime(formattedTimeString);
    return persistedTime.getMillis();
  }

  /**
   * Converts a {@code persistedTime} into a file name for persisting the offsets. The time is rendered in
   * UTC using {@link #FILE_FORMATTER} and suffixed with {@link #FILE_FORMAT_EXTENSION}.
   *
   * @param persistedTime the persisted time to use to generate the file name.
   * @return the file name to use when persisting the data.
   */
  public static String persistenceTimeToFileName(long persistedTime) {
    DateTime dateTime = new DateTime(persistedTime, DateTimeZone.UTC);
    String formattedTime = FILE_FORMATTER.print(dateTime);
    return formattedTime + FILE_FORMAT_EXTENSION;
  }
}