[CARBONDATA-3557] Support write Flink streaming data to Carbon
The write process is:
1. For every checkpoint in each Flink task, write data to local file system by StreamingFileSink and carbon SDK;
2. Copy local carbon data file to carbon data store system, such as HDFS, S3;
3. Generate and write metadata file and success file to ${tablePath}/Metadata/stage folder as a commit;
This closes #3421
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
index 5ae72fc..8d9cb57 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/UnsafeMemoryDMStore.java
@@ -231,10 +231,12 @@
getUnsafe().putInt(memoryBlock.getBaseObject(),
memoryBlock.getBaseOffset() + startOffset + schema.getBytePosition(), varPosition);
runningLength += 4;
- getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
- memoryBlock.getBaseOffset() + startOffset + varPosition, data.length);
- runningLength += data.length;
- varPosition += data.length;
+ if (data != null) {
+ getUnsafe().copyMemory(data, BYTE_ARRAY_OFFSET, memoryBlock.getBaseObject(),
+ memoryBlock.getBaseOffset() + startOffset + varPosition, data.length);
+ runningLength += data.length;
+ varPosition += data.length;
+ }
return varPosition;
default:
throw new UnsupportedOperationException(
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
index 52cd8cc..e83985f 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/blockletindex/BlockDataMap.java
@@ -130,7 +130,7 @@
Path path = new Path(blockletDataMapInfo.getFilePath());
// store file path only in case of partition table, non transactional table and flat folder
// structure
- byte[] filePath = null;
+ byte[] filePath;
boolean isPartitionTable = blockletDataMapInfo.getCarbonTable().isHivePartitionTable();
if (isPartitionTable || !blockletDataMapInfo.getCarbonTable().isTransactionalTable() ||
blockletDataMapInfo.getCarbonTable().isSupportFlatFolder() ||
@@ -139,6 +139,8 @@
blockletDataMapInfo.getCarbonTable().getTablePath())) {
filePath = path.getParent().toString().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
isFilePathStored = true;
+ } else {
+ filePath = new byte[0];
}
byte[] fileName = path.getName().getBytes(CarbonCommonConstants.DEFAULT_CHARSET);
byte[] segmentId =
@@ -506,7 +508,7 @@
if (null != summaryRow) {
summaryRow.setByteArray(fileName, SUMMARY_INDEX_FILE_NAME);
summaryRow.setByteArray(segmentId, SUMMARY_SEGMENTID);
- if (null != filePath) {
+ if (filePath.length > 0) {
summaryRow.setByteArray(filePath, SUMMARY_INDEX_PATH);
}
try {
@@ -635,8 +637,14 @@
// dummy value
return 0;
} else {
- return ByteBuffer.wrap(getBlockletRowCountForEachBlock()).getShort(
- index * CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+ final byte[] bytes = getBlockletRowCountForEachBlock();
+      // if the segment data was written directly into the table path,
+      // then the result of getBlockletRowCountForEachBlock will be empty.
+ if (bytes.length == 0) {
+ return 0;
+ } else {
+ return ByteBuffer.wrap(bytes).getShort(index * CarbonCommonConstants.SHORT_SIZE_IN_BYTE);
+ }
}
}
diff --git a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
index 683d2ef..22eca8a 100644
--- a/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
+++ b/core/src/main/java/org/apache/carbondata/core/indexstore/row/DataMapRowImpl.java
@@ -41,6 +41,11 @@
@Override
public int getLengthInBytes(int ordinal) {
+    // if the segment data was written directly into the table path,
+    // then data[BlockletDataMapRowIndexes.SUMMARY_INDEX_PATH] will be null.
+ if (data[ordinal] == null) {
+ return 0;
+ }
return ((byte[]) data[ordinal]).length;
}
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
index d68429b..b5916fe 100644
--- a/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/FileFormat.java
@@ -46,6 +46,9 @@
}
public static FileFormat getByOrdinal(int ordinal) {
+ if (ordinal < 0) {
+ throw new IllegalArgumentException("Argument [ordinal] is less than 0.");
+ }
switch (ordinal) {
case 0:
return COLUMNAR_V3;
diff --git a/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java
new file mode 100644
index 0000000..60f8b32
--- /dev/null
+++ b/core/src/main/java/org/apache/carbondata/core/statusmanager/StageInput.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbondata.core.statusmanager;
+
+import java.util.Map;
+
+public class StageInput {
+
+ /**
+ * the base path of files
+ */
+ private String base;
+
+ /**
+ * the list of (file, length) in this StageInput
+ */
+ private Map<String, Long> files;
+
+ public StageInput() {
+
+ }
+
+ public StageInput(String base, Map<String, Long> files) {
+ this.base = base;
+ this.files = files;
+ }
+
+ public String getBase() {
+ return base;
+ }
+
+ public void setBase(String base) {
+ this.base = base;
+ }
+
+ public Map<String, Long> getFiles() {
+ return files;
+ }
+
+ public void setFiles(Map<String, Long> files) {
+ this.files = files;
+ }
+}
diff --git a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
index 4d5de6b..2ffc3f7 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/CarbonUtil.java
@@ -2833,8 +2833,12 @@
.getDataInputStream(localFilePath, FileFactory.getFileType(localFilePath), bufferSize);
IOUtils.copyBytes(dataInputStream, dataOutputStream, bufferSize);
} finally {
- CarbonUtil.closeStream(dataInputStream);
- CarbonUtil.closeStream(dataOutputStream);
+ try {
+ CarbonUtil.closeStream(dataInputStream);
+ CarbonUtil.closeStream(dataOutputStream);
+ } catch (IOException exception) {
+ LOGGER.error(exception.getMessage(), exception);
+ }
}
}
diff --git a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
index 6d1a4ff..e0c9d98 100644
--- a/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
+++ b/core/src/main/java/org/apache/carbondata/core/util/path/CarbonTablePath.java
@@ -55,6 +55,8 @@
private static final String STREAMING_DIR = ".streaming";
private static final String STREAMING_LOG_DIR = "log";
private static final String STREAMING_CHECKPOINT_DIR = "checkpoint";
+ private static final String STAGE_DIR = "stage";
+ public static final String SUCCESS_FILE_SUBFIX = ".success";
/**
* This class provides static utility only.
@@ -62,6 +64,10 @@
private CarbonTablePath() {
}
+ public static String getStageDir(String tablePath) {
+ return getMetadataPath(tablePath) + CarbonCommonConstants.FILE_SEPARATOR + STAGE_DIR;
+ }
+
/**
* The method returns the folder path containing the carbon file.
*
diff --git a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
index fee2e4a..7700c8e 100644
--- a/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
+++ b/hadoop/src/main/java/org/apache/carbondata/hadoop/api/CarbonTableOutputFormat.java
@@ -122,6 +122,12 @@
private CarbonOutputCommitter committer;
+ /**
+ * Output format task id generator. It should generate a unique id for every task.
+   * Using System.nanoTime() as the task id may cause conflicts.
+ */
+ private static final AtomicLong DEFAULT_TASK_NO = new AtomicLong(0);
+
public static void setDatabaseName(Configuration configuration, String databaseName) {
if (null != databaseName) {
configuration.set(DATABASE_NAME, databaseName);
@@ -255,7 +261,7 @@
}
if (null == loadModel.getTaskNo() || loadModel.getTaskNo().isEmpty()) {
loadModel.setTaskNo(taskAttemptContext.getConfiguration()
- .get("carbon.outputformat.taskno", String.valueOf(System.nanoTime())));
+ .get("carbon.outputformat.taskno", String.valueOf(DEFAULT_TASK_NO.getAndIncrement())));
}
loadModel.setDataWritePath(
taskAttemptContext.getConfiguration().get("carbon.outputformat.writepath"));
diff --git a/integration/flink-proxy/pom.xml b/integration/flink-proxy/pom.xml
new file mode 100644
index 0000000..27af507
--- /dev/null
+++ b/integration/flink-proxy/pom.xml
@@ -0,0 +1,43 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-parent</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>carbondata-flink-proxy</artifactId>
+ <name>Apache CarbonData :: Flink Proxy</name>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-streaming-java_2.11</artifactId>
+ <version>1.8.0</version>
+ </dependency>
+ </dependencies>
+
+ <properties>
+ <dev.path>${basedir}/../../dev</dev.path>
+ </properties>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.8</source>
+ <target>1.8</target>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystem.java b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystem.java
new file mode 100644
index 0000000..8c557a9
--- /dev/null
+++ b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystem.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.net.URI;
+
+import org.apache.flink.core.fs.BlockLocation;
+import org.apache.flink.core.fs.FSDataInputStream;
+import org.apache.flink.core.fs.FSDataOutputStream;
+import org.apache.flink.core.fs.FileStatus;
+import org.apache.flink.core.fs.FileSystem;
+import org.apache.flink.core.fs.FileSystemKind;
+import org.apache.flink.core.fs.Path;
+
+/**
+ * Flink need user provides file output stream when use StreamingFileSink,
+ * but CarbonWriter encapsulate the file output stream inside the SDK,
+ * so, we need a proxy file output stream to connect StreamingFileSink and CarbonWriter.
+ */
+public final class ProxyFileSystem extends FileSystem {
+
+ public static final String SCHEMA = "proxy";
+
+ public static final URI DEFAULT_URI = URI.create(SCHEMA + ":/");
+
+ public static final ProxyFileSystem INSTANCE = new ProxyFileSystem();
+
+ private ProxyFileSystem() {
+ // private constructor.
+ }
+
+ @Override
+ public Path getWorkingDirectory() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Path getHomeDirectory() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public URI getUri() {
+ return DEFAULT_URI;
+ }
+
+ @Override
+ public FileStatus getFileStatus(final Path path) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public BlockLocation[] getFileBlockLocations(
+ final FileStatus fileStatus,
+ final long offset,
+ final long length
+ ) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FSDataInputStream open(final Path path, final int i) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FSDataInputStream open(final Path path) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FileStatus[] listStatus(final Path path) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean delete(final Path path, final boolean b) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean mkdirs(final Path path) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public FSDataOutputStream create(final Path path, final WriteMode writeMode) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ProxyRecoverableWriter createRecoverableWriter() {
+ return new ProxyRecoverableWriter();
+ }
+
+ @Override
+ public boolean rename(final Path source, final Path target) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isDistributedFS() {
+ return false;
+ }
+
+ @Override
+ public FileSystemKind getKind() {
+ return FileSystemKind.FILE_SYSTEM;
+ }
+
+}
diff --git a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystemFactory.java b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystemFactory.java
new file mode 100644
index 0000000..1adb44c
--- /dev/null
+++ b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileSystemFactory.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.IOException;
+import java.net.URI;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.core.fs.FileSystemFactory;
+
+public final class ProxyFileSystemFactory implements FileSystemFactory {
+
+ @Override
+ public String getScheme() {
+ return ProxyFileSystem.SCHEMA;
+ }
+
+ @Override
+ public void configure(final Configuration configuration) {
+    // nothing to configure.
+ }
+
+ @Override
+ public ProxyFileSystem create(final URI uri) throws IOException {
+ return ProxyFileSystem.INSTANCE;
+ }
+
+}
diff --git a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriter.java b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriter.java
new file mode 100644
index 0000000..f8ef039
--- /dev/null
+++ b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriter.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.IOException;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+
+public abstract class ProxyFileWriter<OUT> implements BulkWriter<OUT> {
+
+ public abstract ProxyFileWriterFactory getFactory();
+
+ public abstract String getPartition();
+
+ public abstract void commit() throws IOException;
+
+ public abstract void close() throws IOException;
+
+}
diff --git a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriterFactory.java b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriterFactory.java
new file mode 100644
index 0000000..bb683a3
--- /dev/null
+++ b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyFileWriterFactory.java
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Properties;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.flink.api.common.serialization.BulkWriter;
+
+public abstract class ProxyFileWriterFactory<OUT> implements BulkWriter.Factory<OUT> {
+
+ private static final long serialVersionUID = -1449889091046572219L;
+
+ private static final Map<String, Class<? extends ProxyFileWriterFactory>> FACTORY_MAP =
+ new ConcurrentHashMap<>();
+
+ public static ProxyFileWriterFactory newInstance(final String factoryType) {
+ if (factoryType == null) {
+ throw new IllegalArgumentException("Argument [factoryType] is null.");
+ }
+ final Class<? extends ProxyFileWriterFactory> factoryClass = FACTORY_MAP.get(factoryType);
+ if (factoryClass == null) {
+ return null;
+ }
+ try {
+ return factoryClass.newInstance();
+ } catch (InstantiationException | IllegalAccessException exception) {
+ throw new RuntimeException(exception);
+ }
+ }
+
+ public static void register(
+ final String factoryType,
+ final Class<? extends ProxyFileWriterFactory> factoryClass
+ ) {
+ if (factoryType == null) {
+ throw new IllegalArgumentException("Argument [factoryType] is null.");
+ }
+ if (factoryClass == null) {
+ throw new IllegalArgumentException("Argument [factoryClass] is null.");
+ }
+    // TODO validate the arguments
+    // TODO check whether the type is already registered; ignore duplicate registrations
+ FACTORY_MAP.put(factoryType, factoryClass);
+ }
+
+ private Configuration configuration;
+
+ public abstract String getType();
+
+ public Configuration getConfiguration() {
+ return this.configuration;
+ }
+
+ public void setConfiguration(final Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ public abstract ProxyFileWriter<OUT> create(String partition) throws IOException;
+
+ public static class Configuration implements Serializable {
+
+ private static final long serialVersionUID = 8615149992583690295L;
+
+ public Configuration(
+ final String databaseName,
+ final String tableName,
+ final String tablePath,
+ final Properties tableProperties,
+ final Properties writerProperties,
+ final Properties carbonProperties
+ ) {
+ if (tableName == null) {
+ throw new IllegalArgumentException("Argument [tableName] is null.");
+ }
+ if (tablePath == null) {
+ throw new IllegalArgumentException("Argument [tablePath] is null.");
+ }
+ if (tableProperties == null) {
+ throw new IllegalArgumentException("Argument [tableProperties] is null.");
+ }
+ if (writerProperties == null) {
+ throw new IllegalArgumentException("Argument [writerProperties] is null.");
+ }
+ if (carbonProperties == null) {
+ throw new IllegalArgumentException("Argument [carbonProperties] is null.");
+ }
+ this.databaseName = databaseName;
+ this.tableName = tableName;
+ this.tablePath = tablePath;
+ this.tableProperties = tableProperties;
+ this.writerProperties = writerProperties;
+ this.carbonProperties = carbonProperties;
+ }
+
+ private final String databaseName;
+
+ private final String tableName;
+
+ private final String tablePath;
+
+ private final Properties tableProperties;
+
+ private final Properties writerProperties;
+
+ private final Properties carbonProperties;
+
+ public String getDatabaseName() {
+ return this.databaseName;
+ }
+
+ public String getTableName() {
+ return this.tableName;
+ }
+
+ public String getTablePath() {
+ return this.tablePath;
+ }
+
+ public Properties getTableProperties() {
+ return this.tableProperties;
+ }
+
+ public Properties getWriterProperties() {
+ return this.writerProperties;
+ }
+
+ public Properties getCarbonProperties() {
+ return this.carbonProperties;
+ }
+
+ }
+
+}
diff --git a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverable.java b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverable.java
new file mode 100644
index 0000000..11e5d5e
--- /dev/null
+++ b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverable.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import org.apache.flink.core.fs.RecoverableWriter;
+
+public final class ProxyRecoverable
+ implements RecoverableWriter.CommitRecoverable, RecoverableWriter.ResumeRecoverable {
+
+ public ProxyRecoverable(
+ final String writerType,
+ final ProxyFileWriterFactory.Configuration writerConfiguration,
+ final String partition
+ ) {
+ this.writerType = writerType;
+ this.writerConfiguration = writerConfiguration;
+ this.partition = partition;
+ }
+
+ private final String writerType;
+
+ private final ProxyFileWriterFactory.Configuration writerConfiguration;
+
+ private final String partition;
+
+ public String getWriterType() {
+ return this.writerType;
+ }
+
+ public ProxyFileWriterFactory.Configuration getWriterConfiguration() {
+ return this.writerConfiguration;
+ }
+
+ public String getPartition() {
+ return this.partition;
+ }
+
+}
diff --git a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableOutputStream.java b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableOutputStream.java
new file mode 100644
index 0000000..1e59209
--- /dev/null
+++ b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableOutputStream.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.IOException;
+
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+
+public final class ProxyRecoverableOutputStream extends RecoverableFsDataOutputStream {
+
+ ProxyRecoverableOutputStream() {
+ // protected constructor.
+ }
+
+ private ProxyFileWriter<?> writer;
+
+ public void bind(final ProxyFileWriter<?> writer) {
+ if (writer == null) {
+ throw new IllegalArgumentException("Argument [writer] is null.");
+ }
+ if (this.writer != null) {
+ throw new IllegalStateException("The writer was bound.");
+ }
+ this.writer = writer;
+ }
+
+ @Override
+ public long getPos() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public RecoverableWriter.ResumeRecoverable persist() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void write(final int aByte) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void flush() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void sync() {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void close() throws IOException {
+    // TODO this method is invoked both when the streaming job finishes and when an exception occurs
+ if (this.writer != null) {
+ this.writer.close();
+ }
+ }
+
+ @Override
+ public Committer closeForCommit() {
+ if (this.writer == null) {
+ throw new IllegalStateException("The writer was not bound.");
+ }
+ return new Committer(
+ new ProxyRecoverable(
+ this.writer.getFactory().getType(),
+ this.writer.getFactory().getConfiguration(),
+ this.writer.getPartition()
+ )
+ );
+ }
+
+ static final class Committer implements RecoverableFsDataOutputStream.Committer {
+
+ Committer(final ProxyRecoverable recoverable) {
+ this.recoverable = recoverable;
+ }
+
+ private final ProxyRecoverable recoverable;
+
+ @Override
+ public void commit() throws IOException {
+ this.newWriter().commit();
+ }
+
+ @Override
+ public void commitAfterRecovery() throws IOException {
+      // nothing to do on recovery.
+ }
+
+ @Override
+ public RecoverableWriter.CommitRecoverable getRecoverable() {
+ return this.recoverable;
+ }
+
+ private ProxyFileWriter<?> newWriter() throws IOException {
+ final ProxyFileWriterFactory writerFactory =
+ ProxyFileWriterFactory.newInstance(this.recoverable.getWriterType());
+ if (writerFactory == null) {
+ // TODO
+ throw new UnsupportedOperationException();
+ }
+ writerFactory.setConfiguration(this.recoverable.getWriterConfiguration());
+ return writerFactory.create(this.recoverable.getPartition());
+ }
+
+ }
+
+}
diff --git a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableSerializer.java b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableSerializer.java
new file mode 100644
index 0000000..8bf80bd
--- /dev/null
+++ b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableSerializer.java
@@ -0,0 +1,174 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
+import java.util.Properties;
+
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+public final class ProxyRecoverableSerializer
+ implements SimpleVersionedSerializer<ProxyRecoverable> {
+
+ public static final ProxyRecoverableSerializer INSTANCE = new ProxyRecoverableSerializer();
+
+ public static final int VERSION = 1;
+
+ private static final byte TRUE = 0;
+
+ private static final byte FALSE = 1;
+
+ private static final String CHARSET = "UTF-8";
+
+ // TODO: make it configurable
+ private static final int BUFFER_SIZE = 10240;
+
+ private ProxyRecoverableSerializer() {
+ // private constructor.
+ }
+
+ @Override
+ public int getVersion() {
+ return VERSION;
+ }
+
+ @Override
+ public byte[] serialize(final ProxyRecoverable proxyRecoverable) {
+ final ByteBuffer byteBuffer = ByteBuffer.allocate(BUFFER_SIZE);
+ serializeString(byteBuffer, proxyRecoverable.getWriterType());
+ serializeConfiguration(byteBuffer, proxyRecoverable.getWriterConfiguration());
+ serializeString(byteBuffer, proxyRecoverable.getPartition());
+ final byte[] bytes = new byte[byteBuffer.position()];
+ byteBuffer.position(0);
+ byteBuffer.get(bytes);
+ return bytes;
+ }
+
+ private static void serializeConfiguration(
+ final ByteBuffer byteBuffer,
+ final ProxyFileWriterFactory.Configuration configuration
+ ) {
+ serializeString(byteBuffer, configuration.getDatabaseName());
+ serializeString(byteBuffer, configuration.getTableName());
+ serializeString(byteBuffer, configuration.getTablePath());
+ serializeProperties(byteBuffer, configuration.getTableProperties());
+ serializeProperties(byteBuffer, configuration.getWriterProperties());
+ serializeProperties(byteBuffer, configuration.getCarbonProperties());
+ }
+
+ private static void serializeString(final ByteBuffer byteBuffer, final String string) {
+ if (string == null) {
+ byteBuffer.put(TRUE);
+ } else {
+ byteBuffer.put(FALSE);
+ final byte[] stringBytes;
+ try {
+ stringBytes = string.getBytes(CHARSET);
+ } catch (UnsupportedEncodingException exception) {
+ throw new RuntimeException(exception);
+ }
+ byteBuffer.putInt(stringBytes.length);
+ byteBuffer.put(stringBytes);
+ }
+ }
+
+ private static void serializeProperties(
+ final ByteBuffer byteBuffer,
+ final Properties properties
+ ) {
+ if (properties == null) {
+ byteBuffer.put(TRUE);
+ } else {
+ byteBuffer.put(FALSE);
+ byteBuffer.putInt(properties.size());
+ for (String propertyName : properties.stringPropertyNames()) {
+ serializeString(byteBuffer, propertyName);
+ serializeString(byteBuffer, properties.getProperty(propertyName));
+ }
+ }
+ }
+
+ @Override
+ public ProxyRecoverable deserialize(final int version, final byte[] bytes) {
+ if (version != VERSION) {
+ throw new UnsupportedOperationException("Unsupported version: " + version + ".");
+ }
+ final ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+ final String writerType = deserializeString(byteBuffer);
+ final ProxyFileWriterFactory.Configuration writerConfiguration =
+ deserializeConfiguration(byteBuffer);
+ final String partition = deserializeString(byteBuffer);
+ return new ProxyRecoverable(writerType, writerConfiguration, partition);
+ }
+
+ private static ProxyFileWriterFactory.Configuration deserializeConfiguration(
+ final ByteBuffer byteBuffer
+ ) {
+ final String databaseName = deserializeString(byteBuffer);
+ final String tableName = deserializeString(byteBuffer);
+ final String tablePath = deserializeString(byteBuffer);
+ final Properties tableProperties = deserializeProperties(byteBuffer);
+ final Properties writerProperties = deserializeProperties(byteBuffer);
+ final Properties carbonProperties = deserializeProperties(byteBuffer);
+ return new ProxyFileWriterFactory.Configuration(
+ databaseName,
+ tableName,
+ tablePath,
+ tableProperties,
+ writerProperties,
+ carbonProperties
+ );
+ }
+
+ private static String deserializeString(final ByteBuffer byteBuffer) {
+ switch (byteBuffer.get()) {
+ case TRUE:
+ return null;
+ case FALSE:
+ final int stringByteLength = byteBuffer.getInt();
+ final byte[] stringBytes = new byte[stringByteLength];
+ byteBuffer.get(stringBytes);
+ try {
+ return new String(stringBytes, CHARSET);
+ } catch (UnsupportedEncodingException exception) {
+ throw new RuntimeException(exception);
+ }
+ default:
+ throw new RuntimeException();
+ }
+ }
+
+ @SuppressWarnings("ConstantConditions")
+ private static Properties deserializeProperties(final ByteBuffer byteBuffer) {
+ switch (byteBuffer.get()) {
+ case TRUE:
+ return null;
+ case FALSE:
+ final int propertyCount = byteBuffer.getInt();
+ final Properties properties = new Properties();
+ for (int index = 0; index < propertyCount; index++) {
+ properties.put(deserializeString(byteBuffer), deserializeString(byteBuffer));
+ }
+ return properties;
+ default:
+ throw new RuntimeException();
+ }
+ }
+
+}
diff --git a/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableWriter.java b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableWriter.java
new file mode 100644
index 0000000..3713ac2
--- /dev/null
+++ b/integration/flink-proxy/src/main/java/org/apache/carbon/flink/ProxyRecoverableWriter.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.core.fs.RecoverableFsDataOutputStream;
+import org.apache.flink.core.fs.RecoverableWriter;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+public final class ProxyRecoverableWriter implements RecoverableWriter {
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public SimpleVersionedSerializer<CommitRecoverable> getCommitRecoverableSerializer() {
+ final SimpleVersionedSerializer<? extends CommitRecoverable> serializer =
+ ProxyRecoverableSerializer.INSTANCE;
+ return (SimpleVersionedSerializer<CommitRecoverable>) serializer;
+ }
+
+ @SuppressWarnings("unchecked")
+ @Override
+ public SimpleVersionedSerializer<ResumeRecoverable> getResumeRecoverableSerializer() {
+ final SimpleVersionedSerializer<? extends ResumeRecoverable> serializer =
+ ProxyRecoverableSerializer.INSTANCE;
+ return (SimpleVersionedSerializer<ResumeRecoverable>) serializer;
+ }
+
+ @Override
+ public ProxyRecoverableOutputStream open(final Path path) {
+ return new ProxyRecoverableOutputStream();
+ }
+
+ @Override
+ public RecoverableFsDataOutputStream recover(final ResumeRecoverable resumeRecoverable) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean requiresCleanupOfRecoverableState() {
+ return false;
+ }
+
+ @Override
+ public boolean cleanupRecoverableState(final ResumeRecoverable resumeRecoverable) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ProxyRecoverableOutputStream.Committer recoverForCommit(
+ final CommitRecoverable commitRecoverable
+ ) {
+ if (!(commitRecoverable instanceof ProxyRecoverable)) {
+ throw new IllegalArgumentException(
+ "ProxyFileSystem cannot recover a recoverable from another file system. " + commitRecoverable
+ );
+ }
+ return new ProxyRecoverableOutputStream.Committer((ProxyRecoverable) commitRecoverable);
+ }
+
+ @Override
+ public boolean supportsResume() {
+ return false;
+ }
+
+}
diff --git a/integration/flink-proxy/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory b/integration/flink-proxy/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
new file mode 100644
index 0000000..f8a0ed8
--- /dev/null
+++ b/integration/flink-proxy/src/main/resources/META-INF/services/org.apache.flink.core.fs.FileSystemFactory
@@ -0,0 +1 @@
+org.apache.carbon.flink.ProxyFileSystemFactory
\ No newline at end of file
diff --git a/integration/flink/pom.xml b/integration/flink/pom.xml
new file mode 100644
index 0000000..80746c0
--- /dev/null
+++ b/integration/flink/pom.xml
@@ -0,0 +1,267 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <parent>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-parent</artifactId>
+ <version>2.0.0-SNAPSHOT</version>
+ <relativePath>../../pom.xml</relativePath>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>carbondata-flink</artifactId>
+ <name>Apache CarbonData :: Flink</name>
+ <packaging>jar</packaging>
+
+ <properties>
+ <dev.path>${basedir}/../../dev</dev.path>
+ <hadoop.version>2.7.5</hadoop.version>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-flink-proxy</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-format</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-processing</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.huaweicloud</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-hadoop</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.huaweicloud</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-store-sdk</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.spark</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.arrow</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.zookeeper</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-streaming</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.huaweicloud</groupId>
+ <artifactId>*</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-common</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.scalatest</groupId>
+ <artifactId>scalatest_2.11</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ <version>4.1.17.Final</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <profiles>
+ <profile>
+ <id>spark-2.2</id>
+ <activation>
+ <activeByDefault>false</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-spark2</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>spark-2.3</id>
+ <activation>
+ <activeByDefault>true</activeByDefault>
+ </activation>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.carbondata</groupId>
+ <artifactId>carbondata-spark2</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ </dependencies>
+ </profile>
+ </profiles>
+
+ <build>
+ <plugins>
+ <plugin>
+ <artifactId>maven-compiler-plugin</artifactId>
+ <configuration>
+ <source>1.7</source>
+ <target>1.7</target>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.scala-tools</groupId>
+ <artifactId>maven-scala-plugin</artifactId>
+ <version>2.15.2</version>
+ <executions>
+ <execution>
+ <id>compile</id>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>compile</phase>
+ </execution>
+ <execution>
+ <id>testCompile</id>
+ <goals>
+ <goal>testCompile</goal>
+ </goals>
+ <phase>test</phase>
+ </execution>
+ <execution>
+ <goals>
+ <goal>compile</goal>
+ </goals>
+ <phase>process-resources</phase>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git a/integration/flink/src/main/java/org/apache/carbon/core/metadata/StageManager.java b/integration/flink/src/main/java/org/apache/carbon/core/metadata/StageManager.java
new file mode 100644
index 0000000..ad38260
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/core/metadata/StageManager.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.core.metadata;
+
+import java.io.BufferedWriter;
+import java.io.ByteArrayInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.io.OutputStreamWriter;
+import java.nio.charset.StandardCharsets;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperationFactory;
+import org.apache.carbondata.core.fileoperations.AtomicFileOperations;
+import org.apache.carbondata.core.fileoperations.FileWriteOperation;
+import org.apache.carbondata.core.statusmanager.StageInput;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import com.google.gson.Gson;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.log4j.Logger;
+
+public final class StageManager {
+
+ private static final Logger LOGGER =
+ LogServiceFactory.getLogService(StageManager.class.getName());
+
+ public static void writeStageInput(final String stageInputPath, final StageInput stageInput)
+ throws IOException {
+ AtomicFileOperations fileWrite =
+ AtomicFileOperationFactory.getAtomicFileOperations(stageInputPath);
+ BufferedWriter writer = null;
+ DataOutputStream dataOutputStream = null;
+ try {
+ dataOutputStream = fileWrite.openForWrite(FileWriteOperation.OVERWRITE);
+ writer = new BufferedWriter(new OutputStreamWriter(dataOutputStream, StandardCharsets.UTF_8));
+ String metadataInstance = new Gson().toJson(stageInput);
+ writer.write(metadataInstance);
+ } catch (IOException e) {
+ LOGGER.error("Error message: " + e.getLocalizedMessage());
+ fileWrite.setFailed();
+ throw e;
+ } finally {
+ if (null != writer) {
+ writer.flush();
+ }
+ CarbonUtil.closeStreams(writer);
+ fileWrite.close();
+ }
+
+ try {
+ writeSuccessFile(stageInputPath + CarbonTablePath.SUCCESS_FILE_SUBFIX);
+ } catch (Throwable exception) {
+ try {
+ CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile(stageInputPath));
+ } catch (Throwable e) {
+ LOGGER.error("Failed to delete stage input metadata [" + stageInputPath + "].", exception);
+ }
+ throw exception;
+ }
+ }
+
+ private static void writeSuccessFile(final String successFilePath) throws IOException {
+ final DataOutputStream segmentStatusSuccessOutputStream =
+ FileFactory.getDataOutputStream(
+ successFilePath, FileFactory.getFileType(successFilePath),
+ CarbonCommonConstants.BYTEBUFFER_SIZE, 1024);
+ try {
+ IOUtils.copyBytes(
+ new ByteArrayInputStream(new byte[0]),
+ segmentStatusSuccessOutputStream,
+ CarbonCommonConstants.BYTEBUFFER_SIZE);
+ segmentStatusSuccessOutputStream.flush();
+ } finally {
+ try {
+ CarbonUtil.closeStream(segmentStatusSuccessOutputStream);
+ } catch (IOException exception) {
+ LOGGER.error(exception.getMessage(), exception);
+ }
+ }
+ }
+
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java
new file mode 100644
index 0000000..3383e8c
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalProperty.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+public final class CarbonLocalProperty {
+
+ public static final String DATA_TEMP_PATH = "carbon.writer.local.data.temp.path";
+
+ public static final String DATA_PATH = "carbon.writer.local.data.path";
+
+ private CarbonLocalProperty() {
+ // private constructor.
+ }
+
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
new file mode 100644
index 0000000..dcfe8b3
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriter.java
@@ -0,0 +1,233 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.StageInput;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.carbon.core.metadata.StageManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.log4j.Logger;
+
+final class CarbonLocalWriter extends CarbonWriter {
+
+ private static final Logger LOGGER =
+ LogServiceFactory.getLogService(CarbonLocalWriter.class.getName());
+
+ CarbonLocalWriter(
+ final CarbonLocalWriterFactory factory,
+ final CarbonTable table,
+ final org.apache.carbondata.sdk.file.CarbonWriter writer,
+ final String writePath,
+ final String writePartition
+ ) {
+ ProxyFileWriterFactory.register(factory.getType(), factory.getClass());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Open writer. " + this.toString());
+ }
+ this.factory = factory;
+ this.table = table;
+ this.writer = writer;
+ this.writePath = writePath;
+ this.writePartition = writePartition;
+ this.flushed = true;
+ }
+
+ private final CarbonLocalWriterFactory factory;
+
+ private final CarbonTable table;
+
+ private final org.apache.carbondata.sdk.file.CarbonWriter writer;
+
+ private final String writePath;
+
+ private final String writePartition;
+
+ private volatile boolean flushed;
+
+ @Override
+ public CarbonLocalWriterFactory getFactory() {
+ return this.factory;
+ }
+
+ @Override
+ public String getPartition() {
+ return this.writePartition;
+ }
+
+ @Override
+ public void addElement(final String element) throws IOException {
+ this.writer.write(element);
+ this.flushed = false;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Flush writer. " + this.toString());
+ }
+ synchronized (this) {
+ if (!this.flushed) {
+ this.writer.close();
+ this.flushed = true;
+ }
+ }
+ }
+
+ @Override
+ public void finish() throws IOException {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Finish writer. " + this.toString());
+ }
+ if (!this.flushed) {
+ this.flush();
+ }
+ }
+
+ @Override
+ public void commit() throws IOException {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Commit write. " + this.toString());
+ }
+ try {
+ final Properties writerProperties = this.factory.getConfiguration().getWriterProperties();
+ String dataPath = writerProperties.getProperty(CarbonLocalProperty.DATA_PATH);
+ if (dataPath == null) {
+ throw new IllegalArgumentException(
+ "Writer property [" + CarbonLocalProperty.DATA_PATH + "] is not set."
+ );
+ }
+ dataPath = dataPath + this.table.getDatabaseName() + "/"
+ + this.table.getTableName() + "/" + this.writePartition + "/";
+ Map<String, Long> fileList =
+ this.uploadSegmentDataFiles(this.writePath + "Fact/Part0/Segment_null/", dataPath);
+ try {
+ String stageDir = CarbonTablePath.getStageDir(
+ table.getAbsoluteTableIdentifier().getTablePath());
+ tryCreateLocalDirectory(new File(stageDir));
+ String stageInputPath = stageDir + "/" + this.writePartition;
+ StageManager.writeStageInput(stageInputPath, new StageInput(dataPath, fileList));
+ } catch (Throwable exception) {
+ this.deleteSegmentDataFilesQuietly(dataPath);
+ throw exception;
+ }
+ } finally {
+ try {
+ FileUtils.deleteDirectory(new File(this.writePath));
+ } catch (IOException exception) {
+ LOGGER.error("Failed to delete write path [" + this.writePath + "].", exception);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ if (this.writer == null) {
+ return;
+ }
+ try {
+ synchronized (this) {
+ if (!this.flushed) {
+ this.writer.close();
+ this.flushed = true;
+ }
+ }
+ } catch (Throwable exception) {
+ LOGGER.error("Failed to close carbon writer.", exception);
+ } finally {
+ try {
+ FileUtils.deleteDirectory(new File(this.writePath));
+ } catch (IOException exception) {
+ LOGGER.error("Failed to delete write path [" + this.writePath + "].", exception);
+ }
+ }
+ }
+
+ private Map<String, Long> uploadSegmentDataFiles(final String localPath, final String remotePath)
+ throws IOException {
+ final File[] files = new File(localPath).listFiles();
+ if (files == null) {
+ return new HashMap<>(0);
+ }
+ Map<String, Long> fileNameMapLength = new HashMap<>(files.length);
+ for (File file : files) {
+ fileNameMapLength.put(file.getName(), file.length());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start.");
+ }
+ try {
+ final File remoteFile = new File(remotePath + file.getName());
+ if (!remoteFile.exists()) {
+ tryCreateLocalFile(remoteFile);
+ }
+ CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024);
+ } catch (CarbonDataWriterException exception) {
+ LOGGER.error(exception.getMessage(), exception);
+ throw exception;
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end.");
+ }
+ }
+ return fileNameMapLength;
+ }
+
+ private void deleteSegmentDataFilesQuietly(final String segmentDataPath) {
+ try {
+ CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile(segmentDataPath));
+ } catch (Throwable exception) {
+ LOGGER.error("Failed to delete segment data path [" + segmentDataPath + "].", exception);
+ }
+ }
+
+ private static void tryCreateLocalFile(final File file) throws IOException {
+ if (file.exists()) {
+ return;
+ }
+ if (file.getParentFile() != null) {
+ tryCreateLocalDirectory(file.getParentFile());
+ }
+ if (!file.createNewFile()) {
+ throw new IOException("File [" + file.getCanonicalPath() + "] already exists.");
+ }
+ }
+
+ private static void tryCreateLocalDirectory(final File file) throws IOException {
+ if (file.exists()) {
+ return;
+ }
+ if (file.getParentFile() != null) {
+ tryCreateLocalDirectory(file.getParentFile());
+ }
+ if (!file.mkdir() && LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Directory [" + file.getCanonicalPath() + "] already exists.");
+ }
+ }
+
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactory.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactory.java
new file mode 100644
index 0000000..4c24a8b
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactory.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.sdk.file.CarbonWriter;
+import org.apache.carbondata.sdk.file.Schema;
+
+public final class CarbonLocalWriterFactory extends CarbonWriterFactory {
+
+ private static final long serialVersionUID = 2822670807460968078L;
+
+ @Override
+ public String getType() {
+ return CarbonLocalWriterFactoryBuilder.TYPE;
+ }
+
+ @Override
+ protected CarbonLocalWriter create0() throws IOException {
+ final Properties writerProperties = this.getConfiguration().getWriterProperties();
+ final String writeTempPath = writerProperties.getProperty(CarbonLocalProperty.DATA_TEMP_PATH);
+ if (writeTempPath == null) {
+ throw new IllegalArgumentException(
+ "Writer property [" + CarbonLocalProperty.DATA_TEMP_PATH + "] is not set."
+ );
+ }
+ final String writePartition = UUID.randomUUID().toString().replace("-", "");
+ final String writePath = writeTempPath + "_" + writePartition + "/";
+ final CarbonTable table = this.getTable();
+ final CarbonTable clonedTable =
+ CarbonTable.buildFromTableInfo(TableInfo.deserialize(table.getTableInfo().serialize()));
+ clonedTable.getTableInfo().setTablePath(writePath);
+ final org.apache.carbondata.sdk.file.CarbonWriter writer;
+ try {
+ writer = CarbonWriter.builder()
+ .outputPath("")
+ .writtenBy("flink")
+ .withTable(clonedTable)
+ .withTableProperties(this.getTableProperties())
+ .withJsonInput(this.getTableSchema(clonedTable))
+ .build();
+ } catch (InvalidLoadOptionException exception) {
+ // TODO
+ throw new UnsupportedOperationException(exception);
+ }
+ return new CarbonLocalWriter(this, table, writer, writePath, writePartition);
+ }
+
+ @Override
+ protected CarbonLocalWriter create0(final String partition) throws IOException {
+ final Properties writerProperties = this.getConfiguration().getWriterProperties();
+ final String writeTempPath = writerProperties.getProperty(CarbonLocalProperty.DATA_TEMP_PATH);
+ if (writeTempPath == null) {
+ throw new IllegalArgumentException(
+ "Writer property [" + CarbonLocalProperty.DATA_TEMP_PATH + "] is not set."
+ );
+ }
+ final String writePath = writeTempPath + "_" + partition + "/";
+ final CarbonTable table = this.getTable();
+ return new CarbonLocalWriter(this, table, null, writePath, partition);
+ }
+
+ private Schema getTableSchema(final CarbonTable table) {
+ final List<CarbonColumn> columnList = table.getCreateOrderColumn();
+ final List<ColumnSchema> columnSchemaList = new ArrayList<>(columnList.size());
+ for (CarbonColumn column : columnList) {
+ columnSchemaList.add(column.getColumnSchema());
+ }
+ return new Schema(columnSchemaList);
+ }
+
+ private Map<String, String> getTableProperties() {
+ final Properties tableProperties = this.getConfiguration().getTableProperties();
+ final Map<String, String> tablePropertyMap = new HashMap<>(tableProperties.size());
+ for (String propertyName : tableProperties.stringPropertyNames()) {
+ tablePropertyMap.put(propertyName, tableProperties.getProperty(propertyName));
+ }
+ return tablePropertyMap;
+ }
+
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactoryBuilder.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactoryBuilder.java
new file mode 100644
index 0000000..74d1e41
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonLocalWriterFactoryBuilder.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.util.Properties;
+
+public final class CarbonLocalWriterFactoryBuilder extends CarbonWriterFactoryBuilder {
+
+ public static final String TYPE = "Local";
+
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+ @Override
+ public CarbonLocalWriterFactory build(
+ final String databaseName,
+ final String tableName,
+ final String tablePath,
+ final Properties tableProperties,
+ final Properties writerProperties,
+ final Properties carbonProperties
+ ) {
+ final CarbonLocalWriterFactory factory = new CarbonLocalWriterFactory();
+ factory.setConfiguration(
+ new ProxyFileWriterFactory.Configuration(
+ databaseName,
+ tableName,
+ tablePath,
+ tableProperties,
+ writerProperties,
+ carbonProperties
+ )
+ );
+ return factory;
+ }
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Property.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Property.java
new file mode 100644
index 0000000..6d9d94b
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Property.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+final class CarbonS3Property {
+
+ static final String ACCESS_KEY = "carbon.writer.s3.access.key";
+
+ static final String SECRET_KEY = "carbon.writer.s3.secret.key";
+
+ static final String ENDPOINT = "carbon.writer.s3.endpoint";
+
+ static final String DATA_TEMP_PATH = "carbon.writer.s3.data.temp.path";
+
+ static final String DATA_PATH = "carbon.writer.s3.data.path";
+
+ private CarbonS3Property() {
+ // private constructor.
+ }
+
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
new file mode 100644
index 0000000..e5aeca4
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3Writer.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+import org.apache.carbondata.core.constants.CarbonCommonConstants;
+import org.apache.carbondata.core.datastore.exception.CarbonDataWriterException;
+import org.apache.carbondata.core.datastore.impl.FileFactory;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.statusmanager.StageInput;
+import org.apache.carbondata.core.util.CarbonUtil;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.core.util.path.CarbonTablePath;
+
+import org.apache.carbon.core.metadata.StageManager;
+import org.apache.commons.io.FileUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.log4j.Logger;
+
+final class CarbonS3Writer extends CarbonWriter {
+
+ private static final Logger LOGGER =
+ LogServiceFactory.getLogService(CarbonS3Writer.class.getName());
+
+ CarbonS3Writer(
+ final CarbonS3WriterFactory factory,
+ final CarbonTable table,
+ final org.apache.carbondata.sdk.file.CarbonWriter writer,
+ final String writePath,
+ final String writePartition,
+ final Configuration configuration
+ ) {
+ ProxyFileWriterFactory.register(factory.getType(), factory.getClass());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Open writer. " + this.toString());
+ }
+ this.factory = factory;
+ this.table = table;
+ this.writer = writer;
+ this.writePath = writePath;
+ this.writePartition = writePartition;
+ this.configuration = configuration;
+ this.flushed = true;
+ }
+
+ private final CarbonS3WriterFactory factory;
+
+ private final CarbonTable table;
+
+ private final org.apache.carbondata.sdk.file.CarbonWriter writer;
+
+ private final String writePath;
+
+ private final String writePartition;
+
+ private final Configuration configuration;
+
+ private volatile boolean flushed;
+
+ @Override
+ public CarbonS3WriterFactory getFactory() {
+ return this.factory;
+ }
+
+ @Override
+ public String getPartition() {
+ return this.writePartition;
+ }
+
+ @Override
+ public void addElement(final String element) throws IOException {
+ this.writer.write(element);
+ this.flushed = false;
+ }
+
+ @Override
+ public void flush() throws IOException {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Flush writer. " + this.toString());
+ }
+ synchronized (this) {
+ if (!this.flushed) {
+ this.writer.close();
+ this.flushed = true;
+ }
+ }
+ }
+
+ @Override
+ public void finish() throws IOException {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Finish writer. " + this.toString());
+ }
+ if (!this.flushed) {
+ this.flush();
+ }
+ }
+
+ @Override
+ public void commit() throws IOException {
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Commit write. " + this.toString());
+ }
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(this.configuration);
+ ThreadLocalSessionInfo.getOrCreateCarbonSessionInfo()
+ .getNonSerializableExtraInfo().put("carbonConf", this.configuration);
+ try {
+ final Properties writerProperties = this.factory.getConfiguration().getWriterProperties();
+ String dataPath = writerProperties.getProperty(CarbonS3Property.DATA_PATH);
+ if (dataPath == null) {
+ throw new IllegalArgumentException(
+ "Writer property [" + CarbonS3Property.DATA_PATH + "] is not set."
+ );
+ }
+ if (!dataPath.startsWith(CarbonCommonConstants.S3A_PREFIX)) {
+ throw new IllegalArgumentException(
+ "Writer property [" + CarbonS3Property.DATA_PATH + "] is not a s3a path."
+ );
+ }
+ dataPath = dataPath + this.table.getDatabaseName() + CarbonCommonConstants.FILE_SEPARATOR +
+ this.table.getTableName() + CarbonCommonConstants.FILE_SEPARATOR +
+ this.writePartition + CarbonCommonConstants.FILE_SEPARATOR;
+ Map<String, Long> fileList =
+ this.uploadSegmentDataFiles(this.writePath + "Fact/Part0/Segment_null/", dataPath);
+ try {
+ String stageInputPath = CarbonTablePath.getStageDir(
+ table.getAbsoluteTableIdentifier().getTablePath()) +
+ CarbonCommonConstants.FILE_SEPARATOR + this.writePartition;
+ StageManager.writeStageInput(stageInputPath, new StageInput(dataPath, fileList));
+ } catch (Throwable exception) {
+ this.deleteSegmentDataFilesQuietly(dataPath);
+ throw exception;
+ }
+ } finally {
+ try {
+ FileUtils.deleteDirectory(new File(this.writePath));
+ } catch (IOException exception) {
+ LOGGER.error("Fail to delete write path [" + this.writePath + "].", exception);
+ }
+ }
+ }
+
+ @Override
+ public void close() {
+ if (this.writer == null) {
+ return;
+ }
+ try {
+ synchronized (this) {
+ if (!this.flushed) {
+ this.writer.close();
+ this.flushed = true;
+ }
+ }
+ } catch (Throwable exception) {
+ LOGGER.error("Fail to close carbon writer.", exception);
+ } finally {
+ try {
+ FileUtils.deleteDirectory(new File(this.writePath));
+ } catch (IOException exception) {
+ LOGGER.error("Fail to delete write path [" + this.writePath + "].", exception);
+ }
+ }
+ }
+
+ private Map<String, Long> uploadSegmentDataFiles(
+ final String localPath, final String remotePath) {
+ final File[] files = new File(localPath).listFiles();
+ if (files == null) {
+ return new HashMap<>(0);
+ }
+ Map<String, Long> fileNameMapLength = new HashMap<>(files.length);
+ for (File file : files) {
+ fileNameMapLength.put(file.getName(), file.length());
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] start.");
+ }
+ try {
+ CarbonUtil.copyCarbonDataFileToCarbonStorePath(file.getAbsolutePath(), remotePath, 1024);
+ } catch (CarbonDataWriterException exception) {
+ LOGGER.error(exception.getMessage(), exception);
+ throw exception;
+ }
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Upload file[" + file.getAbsolutePath() + "] to [" + remotePath + "] end.");
+ }
+ }
+ return fileNameMapLength;
+ }
+
+ private void deleteSegmentDataFilesQuietly(final String segmentDataPath) {
+ try {
+ CarbonUtil.deleteFoldersAndFiles(FileFactory.getCarbonFile(segmentDataPath));
+ } catch (Throwable exception) {
+ LOGGER.error("Fail to delete segment data path [" + segmentDataPath + "].", exception);
+ }
+ }
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactory.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactory.java
new file mode 100644
index 0000000..f1ab483
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactory.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.UUID;
+
+import org.apache.carbondata.common.exceptions.sql.InvalidLoadOptionException;
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.metadata.schema.table.TableInfo;
+import org.apache.carbondata.core.metadata.schema.table.column.CarbonColumn;
+import org.apache.carbondata.core.metadata.schema.table.column.ColumnSchema;
+import org.apache.carbondata.core.util.ThreadLocalSessionInfo;
+import org.apache.carbondata.sdk.file.CarbonWriter;
+import org.apache.carbondata.sdk.file.Schema;
+
+public final class CarbonS3WriterFactory extends CarbonWriterFactory {
+
+ private static final long serialVersionUID = 2302824357711095245L;
+
+ @Override
+ public String getType() {
+ return CarbonS3WriterFactoryBuilder.TYPE;
+ }
+
+ @Override
+ protected CarbonS3Writer create0() throws IOException {
+ final Properties writerProperties = this.getConfiguration().getWriterProperties();
+ final String writeTempPath = writerProperties.getProperty(CarbonS3Property.DATA_TEMP_PATH);
+ if (writeTempPath == null) {
+ throw new IllegalArgumentException(
+ "Writer property [" + CarbonS3Property.DATA_TEMP_PATH + "] is not set."
+ );
+ }
+ final String writePartition = UUID.randomUUID().toString().replace("-", "");
+ final String writePath = writeTempPath + "_" + writePartition + "/";
+ final CarbonTable table = this.getTable();
+ final CarbonTable clonedTable =
+ CarbonTable.buildFromTableInfo(TableInfo.deserialize(table.getTableInfo().serialize()));
+ clonedTable.getTableInfo().setTablePath(writePath);
+ final org.apache.hadoop.conf.Configuration configuration = this.getS3Configuration();
+ final CarbonWriter writer;
+ try {
+ writer = CarbonWriter.builder()
+ .outputPath("")
+ .writtenBy("flink")
+ .withTable(clonedTable)
+ .withTableProperties(this.getTableProperties())
+ .withJsonInput(this.getTableSchema(clonedTable))
+ .withHadoopConf(configuration)
+ .build();
+ } catch (InvalidLoadOptionException exception) {
+      // Invalid load options mean the SDK writer cannot be built for this table configuration.
+ throw new UnsupportedOperationException(exception);
+ }
+ return new CarbonS3Writer(this, table, writer, writePath, writePartition, configuration);
+ }
+
+ @Override
+ protected CarbonS3Writer create0(final String partition) throws IOException {
+ final Properties writerProperties = this.getConfiguration().getWriterProperties();
+ final String writeTempPath = writerProperties.getProperty(CarbonS3Property.DATA_TEMP_PATH);
+ if (writeTempPath == null) {
+ throw new IllegalArgumentException(
+ "Writer property [" + CarbonS3Property.DATA_TEMP_PATH + "] is not set."
+ );
+ }
+ final String writePath = writeTempPath + "_" + partition + "/";
+ final CarbonTable table = this.getTable();
+ final org.apache.hadoop.conf.Configuration configuration = this.getS3Configuration();
+ return new CarbonS3Writer(this, table, null, writePath, partition, configuration);
+ }
+
+ @Override
+ protected CarbonTable getTable() throws IOException {
+ this.setS3Configuration(this.getS3Configuration());
+ return super.getTable();
+ }
+
+ private Schema getTableSchema(final CarbonTable table) {
+ final List<CarbonColumn> columnList = table.getCreateOrderColumn();
+ final List<ColumnSchema> columnSchemaList = new ArrayList<>(columnList.size());
+ for (CarbonColumn column : columnList) {
+ columnSchemaList.add(column.getColumnSchema());
+ }
+ return new Schema(columnSchemaList);
+ }
+
+ private Map<String, String> getTableProperties() {
+ final Properties tableProperties = this.getConfiguration().getTableProperties();
+ final Map<String, String> tablePropertyMap = new HashMap<>(tableProperties.size());
+ for (String propertyName : tableProperties.stringPropertyNames()) {
+ tablePropertyMap.put(propertyName, tableProperties.getProperty(propertyName));
+ }
+ return tablePropertyMap;
+ }
+
+ private org.apache.hadoop.conf.Configuration getS3Configuration() {
+ final Properties writerProperties = this.getConfiguration().getWriterProperties();
+ final String accessKey = writerProperties.getProperty(CarbonS3Property.ACCESS_KEY);
+ final String secretKey = writerProperties.getProperty(CarbonS3Property.SECRET_KEY);
+ final String endpoint = writerProperties.getProperty(CarbonS3Property.ENDPOINT);
+ if (accessKey == null) {
+ throw new IllegalArgumentException(
+ "Writer property [" + CarbonS3Property.ACCESS_KEY + "] is not set."
+ );
+ }
+ if (secretKey == null) {
+ throw new IllegalArgumentException(
+ "Writer property [" + CarbonS3Property.SECRET_KEY + "] is not set."
+ );
+ }
+ if (endpoint == null) {
+ throw new IllegalArgumentException(
+ "Writer property [" + CarbonS3Property.ENDPOINT + "] is not set."
+ );
+ }
+ final org.apache.hadoop.conf.Configuration configuration =
+ new org.apache.hadoop.conf.Configuration(true);
+ configuration.set("fs.s3.access.key", accessKey);
+ configuration.set("fs.s3.secret.key", secretKey);
+ configuration.set("fs.s3.endpoint", endpoint);
+ configuration.set("fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+ configuration.set("spark.hadoop.fs.s3.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+ configuration.set("fs.s3a.access.key", accessKey);
+ configuration.set("fs.s3a.secret.key", secretKey);
+ configuration.set("fs.s3a.endpoint", endpoint);
+ configuration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+ configuration.set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem");
+ return configuration;
+ }
+
+ private void setS3Configuration(final org.apache.hadoop.conf.Configuration configuration) {
+ ThreadLocalSessionInfo.setConfigurationToCurrentThread(configuration);
+ }
+
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactoryBuilder.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactoryBuilder.java
new file mode 100644
index 0000000..afd9e5e
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonS3WriterFactoryBuilder.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.util.Properties;
+
+public final class CarbonS3WriterFactoryBuilder extends CarbonWriterFactoryBuilder {
+
+ public static final String TYPE = "S3";
+
+ @Override
+ public String getType() {
+ return TYPE;
+ }
+
+ @Override
+ public CarbonS3WriterFactory build(
+ final String databaseName,
+ final String tableName,
+ final String tablePath,
+ final Properties tableProperties,
+ final Properties writerProperties,
+ final Properties carbonProperties
+ ) {
+ final CarbonS3WriterFactory factory = new CarbonS3WriterFactory();
+ factory.setConfiguration(
+ new ProxyFileWriterFactory.Configuration(
+ databaseName,
+ tableName,
+ tablePath,
+ tableProperties,
+ writerProperties,
+ carbonProperties
+ )
+ );
+ return factory;
+ }
+
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
new file mode 100644
index 0000000..d5ddac5
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+/**
+ * This class is a wrapper of CarbonWriter in SDK.
+ * It not only write data to carbon with CarbonWriter in SDK, also generate segment file.
+ */
+public abstract class CarbonWriter extends ProxyFileWriter<String> {
+
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactory.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactory.java
new file mode 100644
index 0000000..d3257c9
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactory.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.io.IOException;
+
+import org.apache.carbondata.core.metadata.schema.table.CarbonTable;
+import org.apache.carbondata.core.util.CarbonProperties;
+
+import org.apache.flink.core.fs.FSDataOutputStream;
+
+public abstract class CarbonWriterFactory extends ProxyFileWriterFactory<String> {
+
+ public static CarbonWriterFactoryBuilder builder(final String type) {
+ return CarbonWriterFactoryBuilder.get(type);
+ }
+
+ @Override
+ public CarbonWriter create(final FSDataOutputStream out) throws IOException {
+ if (!(out instanceof ProxyRecoverableOutputStream)) {
+ throw new IllegalArgumentException(
+ "Only support " + ProxyRecoverableOutputStream.class.getName() + "."
+ );
+ }
+ this.setCarbonProperties();
+ final CarbonWriter writer = this.create0();
+ ((ProxyRecoverableOutputStream) out).bind(writer);
+ return writer;
+ }
+
+ @Override
+ public CarbonWriter create(final String partition) throws IOException {
+ this.setCarbonProperties();
+ return this.create0(partition);
+ }
+
+ protected abstract CarbonWriter create0() throws IOException;
+
+ protected abstract CarbonWriter create0(String partition) throws IOException;
+
+ protected CarbonTable getTable() throws IOException {
+ final Configuration configuration = this.getConfiguration();
+ return CarbonTable.buildFromTablePath(
+ configuration.getTableName(),
+ configuration.getDatabaseName(),
+ configuration.getTablePath(),
+ null
+ );
+ }
+
+ private void setCarbonProperties() {
+ final CarbonProperties carbonProperties = CarbonProperties.getInstance();
+ for (String propertyName :
+ this.getConfiguration().getCarbonProperties().stringPropertyNames()) {
+ carbonProperties.addProperty(
+ propertyName,
+ this.getConfiguration().getCarbonProperties().getProperty(propertyName)
+ );
+ }
+ }
+
+}
diff --git a/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactoryBuilder.java b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactoryBuilder.java
new file mode 100644
index 0000000..28d2225
--- /dev/null
+++ b/integration/flink/src/main/java/org/apache/carbon/flink/CarbonWriterFactoryBuilder.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+import java.util.ServiceLoader;
+
+import org.apache.carbondata.common.logging.LogServiceFactory;
+
+import org.apache.log4j.Logger;
+
+public abstract class CarbonWriterFactoryBuilder {
+
+ private static final Logger LOGGER =
+ LogServiceFactory.getLogService(CarbonWriterFactoryBuilder.class.getName());
+
+ private static final Map<String, CarbonWriterFactoryBuilder> BUILDER_MAP;
+
+ static {
+ final Map<String, CarbonWriterFactoryBuilder> builderMap = new HashMap<>();
+ final ServiceLoader<CarbonWriterFactoryBuilder> builderLoader =
+ ServiceLoader.load(CarbonWriterFactoryBuilder.class);
+    for (CarbonWriterFactoryBuilder builder : builderLoader) {
+ builderMap.put(builder.getType(), builder);
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug("Added carbon writer factory builder. " + builder.getClass().getName());
+ }
+ }
+ BUILDER_MAP = Collections.unmodifiableMap(builderMap);
+ }
+
+  public static CarbonWriterFactoryBuilder get(final String type) {
+    if (type == null) {
+      throw new IllegalArgumentException("Argument [type] is null.");
+    }
+    CarbonWriterFactoryBuilder builder = BUILDER_MAP.get(type);
+    if (builder == null && type.equalsIgnoreCase(CarbonS3WriterFactoryBuilder.TYPE)) {
+      builder = new CarbonS3WriterFactoryBuilder();
+    } else if (builder == null && type.equalsIgnoreCase(CarbonLocalWriterFactoryBuilder.TYPE)) {
+      builder = new CarbonLocalWriterFactoryBuilder();
+    }
+    return builder;
+  }
+
+ public abstract String getType();
+
+ public abstract CarbonWriterFactory build(
+ String databaseName,
+ String tableName,
+ String tablePath,
+ Properties tableProperties,
+ Properties writerProperties,
+ Properties carbonProperties
+ );
+
+}
diff --git a/integration/flink/src/main/resources/META-INF/services/org.apache.carbon.flink.CarbonWriterFactoryBuilder b/integration/flink/src/main/resources/META-INF/services/org.apache.carbon.flink.CarbonWriterFactoryBuilder
new file mode 100644
index 0000000..5c8666b
--- /dev/null
+++ b/integration/flink/src/main/resources/META-INF/services/org.apache.carbon.flink.CarbonWriterFactoryBuilder
@@ -0,0 +1,2 @@
+org.apache.carbon.flink.CarbonLocalWriterFactoryBuilder
+org.apache.carbon.flink.CarbonS3WriterFactoryBuilder
\ No newline at end of file
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
new file mode 100644
index 0000000..55085f8
--- /dev/null
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestCarbonWriter.scala
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.carbon.flink
+
+import java.util.Properties
+
+import org.apache.carbondata.core.constants.CarbonCommonConstants
+import org.apache.flink.api.common.restartstrategy.RestartStrategies
+import org.apache.flink.core.fs.Path
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
+import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.test.util.QueryTest
+
+import org.junit.Test
+
+class TestCarbonWriter extends QueryTest {
+
+ val tableName = "test_flink"
+
+ @Test
+ def testLocal(): Unit = {
+ sql(s"drop table if exists $tableName").collect()
+ sql(
+ s"""
+ | CREATE TABLE $tableName (stringField string, intField int, shortField short)
+ | STORED AS carbondata
+ """.stripMargin
+ ).collect()
+
+ try {
+ val rootPath = System.getProperty("user.dir") + "/target/test-classes"
+
+ val dataTempPath = rootPath + "/data/temp/"
+ val dataPath = rootPath + "/data/"
+
+ val tablePath = storeLocation + "/" + tableName + "/"
+
+ val writerProperties = newWriterProperties(dataTempPath, dataPath, storeLocation)
+ val carbonProperties = newCarbonProperties(storeLocation)
+
+ val environment = StreamExecutionEnvironment.getExecutionEnvironment
+ environment.setParallelism(1)
+ environment.enableCheckpointing(2000L)
+ environment.setRestartStrategy(RestartStrategies.noRestart)
+
+ val dataCount = 10000
+ val source = new TestSource(dataCount) {
+ @throws[InterruptedException]
+ override def get(index: Int): String = {
+ Thread.sleep(1L)
+ "{\"stringField\": \"test" + index + "\", \"intField\": " + index + ", \"shortField\": 12345}"
+ }
+
+ @throws[InterruptedException]
+ override def onFinish(): Unit = {
+ Thread.sleep(10000L)
+ }
+ }
+ val stream = environment.addSource(source)
+ val factory = CarbonWriterFactory.builder("Local").build(
+ "default",
+ tableName,
+ tablePath,
+ new Properties,
+ writerProperties,
+ carbonProperties
+ )
+ val streamSink = StreamingFileSink.forBulkFormat(new Path(ProxyFileSystem.DEFAULT_URI), factory).build
+
+ stream.addSink(streamSink)
+
+ try environment.execute
+ catch {
+ case exception: Exception =>
+ // TODO
+ throw new UnsupportedOperationException(exception)
+ }
+
+ checkAnswer(sql(s"select count(1) from $tableName"), Seq(Row(0)))
+ } finally {
+      sql(s"drop table if exists $tableName").collect()
+ }
+ }
+
+ private def newWriterProperties(
+ dataTempPath: String,
+ dataPath: String,
+ storeLocation: String) = {
+ val properties = new Properties
+ properties.setProperty(CarbonLocalProperty.DATA_TEMP_PATH, dataTempPath)
+ properties.setProperty(CarbonLocalProperty.DATA_PATH, dataPath)
+ properties.setProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
+ properties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024")
+ properties
+ }
+
+ private def newCarbonProperties(storeLocation: String) = {
+ val properties = new Properties
+ properties.setProperty(CarbonCommonConstants.CARBON_TIMESTAMP_FORMAT,
+ CarbonCommonConstants.CARBON_TIMESTAMP_DEFAULT_FORMAT)
+ properties.setProperty(CarbonCommonConstants.CARBON_DATE_FORMAT,
+ CarbonCommonConstants.CARBON_DATE_DEFAULT_FORMAT)
+ properties.setProperty(CarbonCommonConstants.STORE_LOCATION, storeLocation)
+ properties.setProperty(CarbonCommonConstants.UNSAFE_WORKING_MEMORY_IN_MB, "1024")
+ properties
+ }
+
+}
diff --git a/integration/flink/src/test/scala/org/apache/carbon/flink/TestSource.scala b/integration/flink/src/test/scala/org/apache/carbon/flink/TestSource.scala
new file mode 100644
index 0000000..88ac173
--- /dev/null
+++ b/integration/flink/src/test/scala/org/apache/carbon/flink/TestSource.scala
@@ -0,0 +1,52 @@
+package org.apache.carbon.flink
+
+import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
+import org.apache.flink.runtime.state.{FunctionInitializationContext, FunctionSnapshotContext}
+import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+
+/**
+ * A checkpointed Flink source that emits `dataCount` generated String records.
+ *
+ * The emit position (`dataIndex`) is kept in operator list state so that after
+ * a failure the source resumes from the last successful checkpoint instead of
+ * re-emitting from the beginning.
+ */
+abstract class TestSource(val dataCount: Int) extends SourceFunction[String] with CheckpointedFunction {
+  private var dataIndex = 0
+  private var dataIndexState: ListState[Integer] = _
+  // Written by cancel() from a different thread than run(); must be volatile
+  // so the emit loop observes the cancellation promptly.
+  @volatile private var running = false
+
+  /** Produces the record for the given zero-based index. */
+  @throws[Exception]
+  def get(index: Int): String
+
+  /** Hook invoked once after the last record has been emitted; default is a no-op. */
+  @throws[Exception]
+  def onFinish(): Unit = {
+    // to do nothing.
+  }
+
+  @throws[Exception]
+  override def run(sourceContext: SourceFunction.SourceContext[String]): Unit = {
+    this.running = true
+    while (this.running && this.dataIndex < this.dataCount) {
+      // Emit the record and advance the index under the checkpoint lock, as
+      // required by the SourceFunction contract: a snapshot must never observe
+      // a record that was emitted but not yet reflected in dataIndex.
+      sourceContext.getCheckpointLock.synchronized {
+        sourceContext.collectWithTimestamp(this.get(this.dataIndex), System.currentTimeMillis)
+        this.dataIndex += 1
+      }
+    }
+    this.onFinish()
+  }
+
+  override def cancel(): Unit = {
+    this.running = false
+  }
+
+  @throws[Exception]
+  override def snapshotState(context: FunctionSnapshotContext): Unit = {
+    this.dataIndexState.clear()
+    this.dataIndexState.add(this.dataIndex)
+  }
+
+  @throws[Exception]
+  override def initializeState(context: FunctionInitializationContext): Unit = {
+    this.dataIndexState = context.getOperatorStateStore.getListState(
+      new ListStateDescriptor[Integer]("dataIndex", classOf[Integer]))
+    if (context.isRestored) {
+      // JavaConverters replaces the deprecated JavaConversions implicits.
+      import scala.collection.JavaConverters._
+      for (restoredIndex <- this.dataIndexState.get.asScala) {
+        this.dataIndex = restoredIndex
+      }
+    }
+  }
+}
\ No newline at end of file
diff --git a/pom.xml b/pom.xml
index 9b5f098..e2f5067 100644
--- a/pom.xml
+++ b/pom.xml
@@ -113,6 +113,8 @@
<module>datamap/mv/core</module>
<module>examples/spark2</module>
<module>integration/hive</module>
+ <module>integration/flink</module>
+ <module>integration/flink-proxy</module>
<module>integration/presto</module>
<module>examples/flink</module>
<module>streaming</module>
diff --git a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
index ffad6b1..b1985de 100644
--- a/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
+++ b/store/sdk/src/main/java/org/apache/carbondata/sdk/file/CarbonWriterBuilder.java
@@ -272,6 +272,18 @@
}
/**
+ * Sets an existing carbon table on this builder for the SDK writer to use.
+ *
+ * @param table carbon table to write to; must not be null
+ * @return this CarbonWriterBuilder, for call chaining
+ * @throws NullPointerException if {@code table} is null
+ */
+ public CarbonWriterBuilder withTable(CarbonTable table) {
+ Objects.requireNonNull(table, "Table should not be null");
+ this.carbonTable = table;
+ return this;
+ }
+
+ /**
* To support the table properties for sdk writer
*
* @param options key,value pair of create table properties.
@@ -646,7 +658,7 @@
public CarbonLoadModel buildLoadModel(Schema carbonSchema)
throws IOException, InvalidLoadOptionException {
- timestamp = System.nanoTime();
+ timestamp = System.currentTimeMillis();
// validate long_string_column
Set<String> longStringColumns = new HashSet<>();
if (options != null && options.get(CarbonCommonConstants.LONG_STRING_COLUMNS) != null) {