/*
* Copyright 2017 HugeGraph Authors
*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
 */

package com.baidu.hugegraph.loader.flink;

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.io.ParseException;
import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.util.ExecutorThreadFactory;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode;
import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.ObjectMapper;
import org.slf4j.Logger;

import com.baidu.hugegraph.driver.GraphManager;
import com.baidu.hugegraph.loader.builder.EdgeBuilder;
import com.baidu.hugegraph.loader.builder.ElementBuilder;
import com.baidu.hugegraph.loader.builder.VertexBuilder;
import com.baidu.hugegraph.loader.constant.Constants;
import com.baidu.hugegraph.loader.exception.LoadException;
import com.baidu.hugegraph.loader.executor.LoadContext;
import com.baidu.hugegraph.loader.executor.LoadOptions;
import com.baidu.hugegraph.loader.mapping.EdgeMapping;
import com.baidu.hugegraph.loader.mapping.ElementMapping;
import com.baidu.hugegraph.loader.mapping.InputStruct;
import com.baidu.hugegraph.loader.mapping.VertexMapping;
import com.baidu.hugegraph.structure.GraphElement;
import com.baidu.hugegraph.structure.graph.BatchEdgeRequest;
import com.baidu.hugegraph.structure.graph.BatchVertexRequest;
import com.baidu.hugegraph.structure.graph.Edge;
import com.baidu.hugegraph.structure.graph.UpdateStrategy;
import com.baidu.hugegraph.structure.graph.Vertex;
import com.baidu.hugegraph.util.Log;

import io.debezium.data.Envelope;
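
/**
 * A Flink {@link RichOutputFormat} that writes Debezium-style CDC rows (JSON strings)
 * into HugeGraph. Rows are buffered per element mapping and flushed either when a
 * mapping's batch size is reached or on the fixed interval configured by
 * {@code flushIntervalMs}.
 *
 * <p>Usage sketch (illustrative only, not necessarily the loader's actual wiring;
 * {@code cdcRows}, {@code struct} and {@code args} are assumed to exist in the caller):
 * <pre>{@code
 * DataStream<String> cdcRows = ...;  // one Debezium CDC JSON document per record
 * cdcRows.writeUsingOutputFormat(new HugeGraphOutputFormat<>(struct, args));
 * }</pre>
 */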
public class HugeGraphOutputFormat<T> extends RichOutputFormat<T> {

    private static final Logger LOG = Log.logger(HugeGraphOutputFormat.class);
    private static final long serialVersionUID = -4514164348993670086L;

    private LoadContext loadContext;
    private transient ScheduledExecutorService scheduler;
    private transient ScheduledFuture<?> scheduledFuture;
    private transient volatile boolean closed = false;

    private final LoadOptions loadOptions;
    private final InputStruct struct;
    private Map<ElementBuilder, List<String>> builders;
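
    /**
     * Creates an output format for a single {@link InputStruct}; the loader
     * options are parsed from the command-line style {@code args}.
     */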
public HugeGraphOutputFormat(InputStruct struct, String[] args) {
this.struct = struct;
this.loadOptions = LoadOptions.parseOptions(args);
}
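
    /**
     * Creates one {@link ElementBuilder} per vertex and edge mapping of the
     * input struct, each paired with an empty buffer of pending rows, and
     * refreshes the schema cache of the temporary load context.
     */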
private Map<ElementBuilder, List<String>> initBuilders() {
LoadContext loadContext = new LoadContext(this.loadOptions);
Map<ElementBuilder, List<String>> builders = new HashMap<>();
for (VertexMapping vertexMapping : this.struct.vertices()) {
builders.put(new VertexBuilder(loadContext, this.struct, vertexMapping),
new ArrayList<>());
}
for (EdgeMapping edgeMapping : this.struct.edges()) {
builders.put(new EdgeBuilder(loadContext, this.struct, edgeMapping),
new ArrayList<>());
}
loadContext.updateSchemaCache();
return builders;
}
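
    /**
     * No per-task Flink configuration is applied here; all loader options come
     * from the {@link LoadOptions} parsed in the constructor.
     */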
@Override
public void configure(Configuration configuration) {
// pass
}
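
    /**
     * Called once per parallel task: initializes the builders and the load
     * context, and schedules a periodic flush when {@code flushIntervalMs}
     * is positive.
     */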
@Override
public void open(int taskNumber, int numTasks) {
this.builders = initBuilders();
this.loadContext = new LoadContext(this.loadOptions);
int flushIntervalMs = this.loadOptions.flushIntervalMs;
if (flushIntervalMs > 0) {
this.scheduler = new ScheduledThreadPoolExecutor(1, new ExecutorThreadFactory(
"hugegraph-streamload-outputformat"));
this.scheduledFuture = this.scheduler.scheduleWithFixedDelay(
this::flushAll, flushIntervalMs, flushIntervalMs, TimeUnit.MILLISECONDS);
}
}
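
    /**
     * Flushes every non-empty buffer; invoked periodically by the flush
     * scheduler, wrapping any failure in a {@link LoadException}.
     */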
private synchronized void flushAll() {
if (this.closed) {
return;
}
try {
for (Map.Entry<ElementBuilder, List<String>> builder : this.builders.entrySet()) {
List<String> graphElements = builder.getValue();
                if (!graphElements.isEmpty()) {
flush(builder.getKey(), graphElements);
}
}
} catch (Exception e) {
throw new LoadException("Failed to flush all data.", e);
}
}
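
    /**
     * Buffers one CDC row (as its JSON string form) for every mapping that is
     * not skipped, and flushes a buffer once it reaches the mapping's batch size.
     */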
@Override
public synchronized void writeRecord(T row) throws IOException {
for (Map.Entry<ElementBuilder, List<String>> builder :
this.builders.entrySet()) {
ElementMapping elementMapping = builder.getKey().mapping();
if (elementMapping.skip()) {
continue;
}
// Add batch
List<String> graphElements = builder.getValue();
graphElements.add(row.toString());
if (graphElements.size() >= elementMapping.batchSize()) {
flush(builder.getKey(), builder.getValue());
}
}
}
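
    /**
     * Parses one CDC row: reads the payload under {@link Constants#CDC_DATA} and
     * the operation code under {@link Constants#CDC_OP}, maps the input header
     * fields to their values and builds the corresponding graph elements.
     */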
private Tuple2<String, List<GraphElement>> buildGraphData(ElementBuilder elementBuilder,
String row) {
JsonNode node;
try {
node = new ObjectMapper().readTree(row);
} catch (JsonProcessingException e) {
throw new ParseException(row, e);
}
JsonNode data = node.get(Constants.CDC_DATA);
String op = node.get(Constants.CDC_OP).asText();
String[] fields = this.struct.input().header();
String[] values = new String[data.size()];
for (int i = 0; i < fields.length; i++) {
values[i] = data.get(fields[i]).asText();
}
return Tuple2.of(op, elementBuilder.build(fields, values));
}
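
    /**
     * Writes the buffered rows of one builder to the server: create and read
     * events insert vertices or edges, update events apply the configured
     * update strategies, and delete events remove the element by id. The
     * buffer is cleared afterwards.
     */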
private void flush(ElementBuilder elementBuilder, List<String> rows) {
GraphManager g = this.loadContext.client().graph();
ElementMapping elementMapping = elementBuilder.mapping();
for (String row : rows) {
Tuple2<String, List<GraphElement>> graphData = buildGraphData(elementBuilder, row);
List<GraphElement> graphElements = graphData.f1;
boolean isVertex = elementBuilder.mapping().type().isVertex();
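            // Debezium operation codes: 'c' create, 'r' read (snapshot), 'u' update, 'd' delete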
switch (Envelope.Operation.forCode(graphData.f0)) {
case READ:
case CREATE:
if (isVertex) {
g.addVertices((List<Vertex>) (Object) graphElements);
} else {
g.addEdges((List<Edge>) (Object) graphElements);
}
break;
case UPDATE:
Map<String, UpdateStrategy> updateStrategyMap =
elementMapping.updateStrategies();
if (isVertex) {
BatchVertexRequest.Builder req = new BatchVertexRequest.Builder();
req.vertices((List<Vertex>) (Object) graphElements)
.updatingStrategies(updateStrategyMap)
.createIfNotExist(true);
g.updateVertices(req.build());
} else {
BatchEdgeRequest.Builder req = new BatchEdgeRequest.Builder();
req.edges((List<Edge>) (Object) graphElements)
.updatingStrategies(updateStrategyMap)
.checkVertex(this.loadOptions.checkVertex)
.createIfNotExist(true);
g.updateEdges(req.build());
}
break;
case DELETE:
String id = graphElements.get(0).id().toString();
if (isVertex) {
g.removeVertex(id);
} else {
g.removeEdge(id);
}
break;
default:
                    throw new IllegalArgumentException(
                            "The CDC `op` code must be one of 'c', 'r', 'u' or 'd'");
}
}
rows.clear();
}
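
    /**
     * Flushes any remaining buffered rows, then stops the periodic flush task
     * and shuts down its scheduler.
     */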
@Override
    public synchronized void close() {
        if (this.closed) {
            return;
        }
        // Flush rows that are still buffered before the task shuts down
        this.flushAll();
        this.closed = true;
        if (this.scheduledFuture != null) {
            this.scheduledFuture.cancel(false);
            this.scheduler.shutdown();
        }
    }
}