blob: 5f1445f437a8b52ef9f39b7e2336fc327d970f7a [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.gobblin.service;
import java.io.Closeable;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.net.URI;
import java.util.List;
import java.util.concurrent.Future;
import java.util.Properties;
import org.apache.commons.lang3.reflect.ConstructorUtils;
import org.slf4j.Logger;
import com.google.common.base.Optional;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import com.typesafe.config.Config;
import org.apache.gobblin.metrics.reporter.util.AvroBinarySerializer;
import org.apache.gobblin.metrics.reporter.util.AvroSerializer;
import org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter;
import org.apache.gobblin.runtime.api.JobSpec;
import org.apache.gobblin.runtime.api.Spec;
import org.apache.gobblin.runtime.api.SpecExecutor;
import org.apache.gobblin.runtime.api.SpecProducer;
import org.apache.gobblin.runtime.job_spec.AvroJobSpec;
import org.apache.gobblin.util.ConfigUtils;
import org.apache.gobblin.writer.AsyncDataWriter;
import org.apache.gobblin.writer.WriteCallback;
import javax.annotation.concurrent.NotThreadSafe;
import lombok.extern.slf4j.Slf4j;
@Slf4j
@NotThreadSafe
public class SimpleKafkaSpecProducer implements SpecProducer<Spec>, Closeable {
private static final String KAFKA_DATA_WRITER_CLASS_KEY = "spec.kafka.dataWriterClass";
private static final String DEFAULT_KAFKA_DATA_WRITER_CLASS =
"org.apache.gobblin.kafka.writer.Kafka08DataWriter";
// Producer
protected AsyncDataWriter<byte[]> _kafkaProducer;
private final AvroSerializer<AvroJobSpec> _serializer;
private Config _config;
private final String _kafkaProducerClassName;
public SimpleKafkaSpecProducer(Config config, Optional<Logger> log) {
_kafkaProducerClassName = ConfigUtils.getString(config, KAFKA_DATA_WRITER_CLASS_KEY,
DEFAULT_KAFKA_DATA_WRITER_CLASS);
try {
_serializer = new AvroBinarySerializer<>(AvroJobSpec.SCHEMA$, new FixedSchemaVersionWriter());
_config = config;
} catch (IOException e) {
throw new RuntimeException("Could not create AvroBinarySerializer", e);
}
}
public SimpleKafkaSpecProducer(Config config, Logger log) {
this(config, Optional.of(log));
}
/** Constructor with no logging */
public SimpleKafkaSpecProducer(Config config) {
this(config, Optional.<Logger>absent());
}
@Override
public Future<?> addSpec(Spec addedSpec) {
AvroJobSpec avroJobSpec = convertToAvroJobSpec(addedSpec, SpecExecutor.Verb.ADD);
log.info("Adding Spec: " + addedSpec + " using Kafka.");
return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
}
@Override
public Future<?> updateSpec(Spec updatedSpec) {
AvroJobSpec avroJobSpec = convertToAvroJobSpec(updatedSpec, SpecExecutor.Verb.UPDATE);
log.info("Updating Spec: " + updatedSpec + " using Kafka.");
return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
}
@Override
public Future<?> deleteSpec(URI deletedSpecURI, Properties headers) {
AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
.setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.DELETE.name()))
.setProperties(Maps.fromProperties(headers)).build();
log.info("Deleting Spec: " + deletedSpecURI + " using Kafka.");
return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
}
@Override
public Future<?> cancelJob(URI deletedSpecURI, Properties properties) {
AvroJobSpec avroJobSpec = AvroJobSpec.newBuilder().setUri(deletedSpecURI.toString())
.setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, SpecExecutor.Verb.CANCEL.name()))
.setProperties(Maps.fromProperties(properties)).build();
log.info("Cancelling job: " + deletedSpecURI + " using Kafka.");
return getKafkaProducer().write(_serializer.serializeRecord(avroJobSpec), new KafkaWriteCallback(avroJobSpec));
}
@Override
public Future<? extends List<Spec>> listSpecs() {
throw new UnsupportedOperationException();
}
@Override
public void close() throws IOException {
_kafkaProducer.close();
}
private AsyncDataWriter<byte[]> getKafkaProducer() {
if (null == _kafkaProducer) {
try {
Class<?> kafkaProducerClass = (Class<?>) Class.forName(_kafkaProducerClassName);
_kafkaProducer = (AsyncDataWriter<byte[]>) ConstructorUtils.invokeConstructor(kafkaProducerClass,
ConfigUtils.configToProperties(_config));
} catch (ClassNotFoundException | NoSuchMethodException | IllegalAccessException | InstantiationException | InvocationTargetException e) {
log.error("Failed to instantiate Kafka consumer from class " + _kafkaProducerClassName, e);
throw new RuntimeException("Failed to instantiate Kafka consumer", e);
}
}
return _kafkaProducer;
}
private AvroJobSpec convertToAvroJobSpec(Spec spec, SpecExecutor.Verb verb) {
if (spec instanceof JobSpec) {
JobSpec jobSpec = (JobSpec) spec;
AvroJobSpec.Builder avroJobSpecBuilder = AvroJobSpec.newBuilder();
avroJobSpecBuilder.setUri(jobSpec.getUri().toString()).setVersion(jobSpec.getVersion())
.setDescription(jobSpec.getDescription()).setProperties(Maps.fromProperties(jobSpec.getConfigAsProperties()))
.setMetadata(ImmutableMap.of(SpecExecutor.VERB_KEY, verb.name()));
if (jobSpec.getTemplateURI().isPresent()) {
avroJobSpecBuilder.setTemplateUri(jobSpec.getTemplateURI().get().toString());
}
return avroJobSpecBuilder.build();
} else {
throw new RuntimeException("Unsupported spec type " + spec.getClass());
}
}
static class KafkaWriteCallback implements WriteCallback {
AvroJobSpec avroJobSpec;
KafkaWriteCallback(AvroJobSpec avroJobSpec) {
this.avroJobSpec = avroJobSpec;
}
@Override
public void onSuccess(Object result) {
}
@Override
public void onFailure(Throwable throwable) {
log.error("Error while writing the following record to Kafka {}", avroJobSpec.toString(), throwable);
}
}
}