| /* |
| * Licensed to the Apache Software Foundation (ASF) under one |
| * or more contributor license agreements. See the NOTICE file |
| * distributed with this work for additional information |
| * regarding copyright ownership. The ASF licenses this file |
| * to you under the Apache License, Version 2.0 (the |
| * "License"); you may not use this file except in compliance |
| * with the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.kylin.provision; |
| |
import java.io.File;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.text.SimpleDateFormat;
import java.util.List;
import java.util.Map;
import java.util.TimeZone;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.util.ToolRunner;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.ClassUtil;
import org.apache.kylin.common.util.HBaseMetadataTestCase;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.cube.CubeUpdate;
import org.apache.kylin.engine.EngineFactory;
import org.apache.kylin.engine.mr.CubingJob;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.apache.kylin.job.DeployUtil;
import org.apache.kylin.job.engine.JobEngineConfig;
import org.apache.kylin.job.execution.AbstractExecutable;
import org.apache.kylin.job.execution.DefaultChainedExecutable;
import org.apache.kylin.job.execution.ExecutableState;
import org.apache.kylin.job.impl.threadpool.DefaultScheduler;
import org.apache.kylin.job.manager.ExecutableManager;
import org.apache.kylin.metadata.model.IEngineAware;
import org.apache.kylin.metadata.model.IStorageAware;
import org.apache.kylin.storage.hbase.HBaseConnection;
import org.apache.kylin.storage.hbase.util.HBaseRegionSizeCalculator;
import org.apache.kylin.storage.hbase.util.StorageCleanupJob;
import org.apache.kylin.storage.hbase.util.ZookeeperJobLock;

import com.google.common.collect.Lists;
| |
| public class BuildCubeWithEngine { |
| |
| private CubeManager cubeManager; |
| private DefaultScheduler scheduler; |
| protected ExecutableManager jobService; |
| private static boolean fastBuildMode = false; |
| |
| private static final Log logger = LogFactory.getLog(BuildCubeWithEngine.class); |
| |
| public static void main(String[] args) throws Exception { |
| try { |
| beforeClass(); |
| |
| BuildCubeWithEngine buildCubeWithEngine = new BuildCubeWithEngine(); |
| buildCubeWithEngine.before(); |
| buildCubeWithEngine.build(); |
| logger.info("Build is done"); |
| afterClass(); |
| logger.info("Going to exit"); |
| System.exit(0); |
| } catch (Exception e) { |
| logger.error("error", e); |
| System.exit(1); |
| } |
| } |
| |
| public static void beforeClass() throws Exception { |
| logger.info("Adding to classpath: " + new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); |
| ClassUtil.addClasspath(new File(HBaseMetadataTestCase.SANDBOX_TEST_DATA).getAbsolutePath()); |
| |
| String fastModeStr = System.getProperty("fastBuildMode"); |
| if (fastModeStr != null && fastModeStr.equalsIgnoreCase("true")) { |
| fastBuildMode = true; |
| logger.info("Will use fast build mode"); |
| } else { |
| logger.info("Will not use fast build mode"); |
| } |
| |
| System.setProperty(KylinConfig.KYLIN_CONF, HBaseMetadataTestCase.SANDBOX_TEST_DATA); |
| if (StringUtils.isEmpty(System.getProperty("hdp.version"))) { |
| throw new RuntimeException("No hdp.version set; Please set hdp.version in your jvm option, for example: -Dhdp.version=2.2.4.2-2"); |
| } |
| |
| HBaseMetadataTestCase.staticCreateTestMetadata(HBaseMetadataTestCase.SANDBOX_TEST_DATA); |
| |
| try { |
| //check hdfs permission |
| Configuration hconf = HadoopUtil.getCurrentConfiguration(); |
| FileSystem fileSystem = FileSystem.get(hconf); |
| String hdfsWorkingDirectory = KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory(); |
| Path coprocessorDir = new Path(hdfsWorkingDirectory); |
| boolean success = fileSystem.mkdirs(coprocessorDir); |
| if (!success) { |
| throw new IOException("mkdir fails"); |
| } |
| } catch (IOException e) { |
| throw new RuntimeException("failed to create kylin.hdfs.working.dir, Please make sure the user has right to access " + KylinConfig.getInstanceFromEnv().getHdfsWorkingDirectory(), e); |
| } |
| } |
| |
| protected void deployEnv() throws IOException { |
| DeployUtil.initCliWorkDir(); |
| DeployUtil.deployMetadata(); |
| DeployUtil.overrideJobJarLocations(); |
| } |
| |
| public void before() throws Exception { |
| deployEnv(); |
| |
| final KylinConfig kylinConfig = KylinConfig.getInstanceFromEnv(); |
| jobService = ExecutableManager.getInstance(kylinConfig); |
| scheduler = DefaultScheduler.getInstance(); |
| scheduler.init(new JobEngineConfig(kylinConfig), new ZookeeperJobLock()); |
| if (!scheduler.hasStarted()) { |
| throw new RuntimeException("scheduler has not been started"); |
| } |
| cubeManager = CubeManager.getInstance(kylinConfig); |
| for (String jobId : jobService.getAllJobIds()) { |
| if (jobService.getJob(jobId) instanceof CubingJob) { |
| jobService.deleteJob(jobId); |
| } |
| } |
| |
| } |
| |
| public static void afterClass() { |
| HBaseMetadataTestCase.staticCleanupTestMetadata(); |
| } |
| |
| public void build() throws Exception { |
| DeployUtil.prepareTestDataForNormalCubes("test_kylin_cube_with_slr_empty"); |
| KylinConfig.getInstanceFromEnv().setHBaseHFileSizeGB(1.0f); |
| testInner(); |
| testLeft(); |
| testViewAsLookup(); |
| KylinConfig.getInstanceFromEnv().setHBaseHFileSizeGB(0.0f); |
| } |
| |
| protected void waitForJob(String jobId) { |
| while (true) { |
| AbstractExecutable job = jobService.getJob(jobId); |
| if (job.getStatus() == ExecutableState.SUCCEED || job.getStatus() == ExecutableState.ERROR) { |
| break; |
| } else { |
| try { |
| Thread.sleep(5000); |
| } catch (InterruptedException e) { |
| e.printStackTrace(); |
| } |
| } |
| } |
| } |
| |
| private void testInner() throws Exception { |
| String[] testCase = new String[] { "testInnerJoinCubeWithoutSlr", "testInnerJoinCubeWithSlr" }; |
| runTestAndAssertSucceed(testCase); |
| } |
| |
| private void testLeft() throws Exception { |
| String[] testCase = new String[] { "testLeftJoinCubeWithSlr", "testLeftJoinCubeWithoutSlr" }; |
| runTestAndAssertSucceed(testCase); |
| } |
| |
| private void testViewAsLookup() throws Exception { |
| String[] testCase = new String[] { "testLeftJoinCubeWithView" }; |
| runTestAndAssertSucceed(testCase); |
| } |
| |
| |
| private void runTestAndAssertSucceed(String[] testCase) throws Exception { |
| ExecutorService executorService = Executors.newFixedThreadPool(testCase.length); |
| final CountDownLatch countDownLatch = new CountDownLatch(testCase.length); |
| List<Future<List<String>>> tasks = Lists.newArrayListWithExpectedSize(testCase.length); |
| for (int i = 0; i < testCase.length; i++) { |
| tasks.add(executorService.submit(new TestCallable(testCase[i], countDownLatch))); |
| } |
| countDownLatch.await(); |
| try { |
| for (int i = 0; i < tasks.size(); ++i) { |
| Future<List<String>> task = tasks.get(i); |
| final List<String> jobIds = task.get(); |
| for (String jobId : jobIds) { |
| assertJobSucceed(jobId); |
| } |
| } |
| } catch (Exception ex) { |
| logger.error(ex); |
| throw ex; |
| } |
| } |
| |
| private void assertJobSucceed(String jobId) { |
| if (jobService.getOutput(jobId).getState() != ExecutableState.SUCCEED) { |
| throw new RuntimeException("The job '" + jobId + "' is failed."); |
| } |
| } |
| |
| private class TestCallable implements Callable<List<String>> { |
| |
| private final String methodName; |
| private final CountDownLatch countDownLatch; |
| |
| public TestCallable(String methodName, CountDownLatch countDownLatch) { |
| this.methodName = methodName; |
| this.countDownLatch = countDownLatch; |
| } |
| |
| @SuppressWarnings("unchecked") |
| @Override |
| public List<String> call() throws Exception { |
| try { |
| final Method method = BuildCubeWithEngine.class.getDeclaredMethod(methodName); |
| method.setAccessible(true); |
| return (List<String>) method.invoke(BuildCubeWithEngine.this); |
| } catch (Exception e) { |
| logger.error(e.getMessage()); |
| throw e; |
| } finally { |
| countDownLatch.countDown(); |
| } |
| } |
| } |
| |
| @SuppressWarnings("unused") |
| // called by reflection |
| private List<String> testInnerJoinCubeWithSlr() throws Exception { |
| final String cubeName = "test_kylin_cube_with_slr_empty"; |
| clearSegment(cubeName); |
| SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); |
| f.setTimeZone(TimeZone.getTimeZone("GMT")); |
| long date1 = 0; |
| long date2 = f.parse("2015-01-01").getTime(); |
| long date3 = f.parse("2022-01-01").getTime(); |
| List<String> result = Lists.newArrayList(); |
| |
| if (fastBuildMode) { |
| result.add(buildSegment(cubeName, date1, date3)); |
| } else { |
| result.add(buildSegment(cubeName, date1, date2)); |
| result.add(buildSegment(cubeName, date2, date3));//empty segment |
| } |
| return result; |
| } |
| |
| @SuppressWarnings("unused") |
| // called by reflection |
| private List<String> testInnerJoinCubeWithoutSlr() throws Exception { |
| |
| final String cubeName = "test_kylin_cube_without_slr_empty"; |
| clearSegment(cubeName); |
| SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); |
| f.setTimeZone(TimeZone.getTimeZone("GMT")); |
| long date1 = 0; |
| long date2 = f.parse("2013-01-01").getTime(); |
| long date3 = f.parse("2013-07-01").getTime(); |
| long date4 = f.parse("2022-01-01").getTime(); |
| List<String> result = Lists.newArrayList(); |
| |
| if (fastBuildMode) { |
| result.add(buildSegment(cubeName, date1, date4)); |
| } else { |
| result.add(buildSegment(cubeName, date1, date2)); |
| result.add(buildSegment(cubeName, date2, date3)); |
| result.add(buildSegment(cubeName, date3, date4)); |
| result.add(mergeSegment(cubeName, date1, date3));//don't merge all segments |
| } |
| return result; |
| |
| } |
| |
| @SuppressWarnings("unused") |
| // called by reflection |
| private List<String> testLeftJoinCubeWithoutSlr() throws Exception { |
| SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); |
| f.setTimeZone(TimeZone.getTimeZone("GMT")); |
| List<String> result = Lists.newArrayList(); |
| final String cubeName = "test_kylin_cube_without_slr_left_join_empty"; |
| clearSegment(cubeName); |
| |
| long date1 = cubeManager.getCube(cubeName).getDescriptor().getPartitionDateStart(); |
| long date2 = f.parse("2012-06-01").getTime(); |
| long date3 = f.parse("2022-01-01").getTime(); |
| long date4 = f.parse("2023-01-01").getTime(); |
| |
| if (fastBuildMode) { |
| result.add(buildSegment(cubeName, date1, date4)); |
| } else { |
| result.add(buildSegment(cubeName, date1, date2)); |
| result.add(buildSegment(cubeName, date2, date3)); |
| result.add(buildSegment(cubeName, date3, date4));//empty segment |
| result.add(mergeSegment(cubeName, date1, date3));//don't merge all segments |
| } |
| |
| return result; |
| |
| } |
| |
| |
| @SuppressWarnings("unused") |
| // called by reflection |
| private List<String> testLeftJoinCubeWithView() throws Exception { |
| SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); |
| f.setTimeZone(TimeZone.getTimeZone("GMT")); |
| List<String> result = Lists.newArrayList(); |
| final String cubeName = "test_kylin_cube_with_view_empty"; |
| clearSegment(cubeName); |
| |
| long date1 = cubeManager.getCube(cubeName).getDescriptor().getPartitionDateStart(); |
| long date4 = f.parse("2023-01-01").getTime(); |
| |
| result.add(buildSegment(cubeName, date1, date4)); |
| |
| return result; |
| |
| } |
| |
| @SuppressWarnings("unused") |
| // called by reflection |
| private List<String> testLeftJoinCubeWithSlr() throws Exception { |
| String cubeName = "test_kylin_cube_with_slr_left_join_empty"; |
| clearSegment(cubeName); |
| |
| SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd"); |
| f.setTimeZone(TimeZone.getTimeZone("GMT")); |
| long date1 = cubeManager.getCube(cubeName).getDescriptor().getPartitionDateStart(); |
| long date2 = f.parse("2013-01-01").getTime(); |
| long date3 = f.parse("2013-07-01").getTime(); |
| long date4 = f.parse("2022-01-01").getTime(); |
| |
| List<String> result = Lists.newArrayList(); |
| if (fastBuildMode) { |
| result.add(buildSegment(cubeName, date1, date4)); |
| } else { |
| result.add(buildSegment(cubeName, date1, date2)); |
| result.add(buildSegment(cubeName, date2, date3)); |
| result.add(buildSegment(cubeName, date3, date4)); |
| result.add(mergeSegment(cubeName, date1, date3));//don't merge all segments |
| } |
| return result; |
| |
| } |
| |
| private void clearSegment(String cubeName) throws Exception { |
| CubeInstance cube = cubeManager.getCube(cubeName); |
| // remove all existing segments |
| CubeUpdate cubeBuilder = new CubeUpdate(cube); |
| cubeBuilder.setToRemoveSegs(cube.getSegments().toArray(new CubeSegment[cube.getSegments().size()])); |
| cubeManager.updateCube(cubeBuilder); |
| } |
| |
| private String mergeSegment(String cubeName, long startDate, long endDate) throws Exception { |
| CubeSegment segment = cubeManager.mergeSegments(cubeManager.getCube(cubeName), startDate, endDate, true); |
| DefaultChainedExecutable job = EngineFactory.createBatchMergeJob(segment, "TEST"); |
| jobService.addJob(job); |
| waitForJob(job.getId()); |
| return job.getId(); |
| } |
| |
| private String buildSegment(String cubeName, long startDate, long endDate) throws Exception { |
| CubeSegment segment = cubeManager.appendSegments(cubeManager.getCube(cubeName), endDate); |
| DefaultChainedExecutable job = EngineFactory.createBatchCubingJob(segment, "TEST"); |
| jobService.addJob(job); |
| waitForJob(job.getId()); |
| if (segment.getCubeDesc().getEngineType() == IEngineAware.ID_MR_V1 |
| || segment.getCubeDesc().getStorageType() == IStorageAware.ID_SHARDED_HBASE) { |
| checkHFilesInHBase(segment); |
| } |
| return job.getId(); |
| } |
| |
| private int cleanupOldStorage() throws Exception { |
| String[] args = { "--delete", "true" }; |
| |
| int exitCode = ToolRunner.run(new StorageCleanupJob(), args); |
| return exitCode; |
| } |
| |
| private void checkHFilesInHBase(CubeSegment segment) throws IOException { |
| Connection conn = HBaseConnection.get(KylinConfig.getInstanceFromEnv().getStorageUrl()); |
| String tableName = segment.getStorageLocationIdentifier(); |
| |
| HBaseRegionSizeCalculator cal = new HBaseRegionSizeCalculator(tableName, conn); |
| Map<byte[], Long> sizeMap = cal.getRegionSizeMap(); |
| long totalSize = 0; |
| for (Long size : sizeMap.values()) { |
| totalSize += size; |
| } |
| if (totalSize == 0) { |
| return; |
| } |
| |
| Map<byte[], Pair<Integer, Integer>> countMap = cal.getRegionHFileCountMap(); |
| // check if there's region contains more than one hfile, which means the hfile config take effects |
| boolean hasMultiHFileRegions = false; |
| for (Pair<Integer, Integer> count : countMap.values()) { |
| // check if hfile count is greater than store count |
| if (count.getSecond() > count.getFirst()) { |
| hasMultiHFileRegions = true; |
| break; |
| } |
| } |
| if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() == 0 && hasMultiHFileRegions) { |
| throw new IOException("hfile size set to 0, but found region contains more than one hfiles"); |
| } else if (KylinConfig.getInstanceFromEnv().getHBaseHFileSizeGB() > 0 && !hasMultiHFileRegions) { |
| throw new IOException("hfile size set greater than 0, but all regions still has only one hfile"); |
| } |
| } |
| |
| } |