// blob: 1db194baec1a3ac5c6939c1769ba8db665e0852b [file] [log] [blame]
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.druid.data.input.impl;
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.google.common.collect.ImmutableList;
import nl.jqno.equalsverifier.EqualsVerifier;
import org.apache.druid.data.input.AbstractInputSource;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputSource;
import org.apache.druid.data.input.InputSplit;
import org.apache.druid.data.input.MaxSizeSplitHintSpec;
import org.apache.druid.data.input.SplitHintSpec;
import org.apache.druid.java.util.common.HumanReadableBytes;
import org.apache.druid.java.util.common.Pair;
import org.easymock.EasyMock;
import org.junit.Assert;
import org.junit.Test;
import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
/**
 * Unit tests for {@link CombiningInputSource}.
 *
 * <p>The tests combine two minimal splittable delegates defined at the bottom of this class
 * ({@link TestFileInputSource} and {@link TestUriInputSource}) and check JSON round-tripping, split estimation,
 * split creation, {@code withSplit}, {@code needsFormat}, and the equals/hashCode contract.
 */
public class CombiningInputSourceTest
{
  /**
   * Serializes a combining source holding one file delegate and one URI delegate, reads the bytes back as an
   * {@link InputSource}, and expects the result to equal the original.
   *
   * @throws IOException if JSON serialization or deserialization fails
   */
  @Test
  public void testSerde() throws IOException
  {
    final ObjectMapper mapper = new ObjectMapper();
    // Register the two test delegates as subtypes on the mapper used for the round trip.
    mapper.registerModule(
        new SimpleModule("test-module").registerSubtypes(TestFileInputSource.class, TestUriInputSource.class)
    );
    final TestFileInputSource fileSource = new TestFileInputSource(
        ImmutableList.of(new File("myFile").getAbsoluteFile())
    );
    final TestUriInputSource uriInputSource = new TestUriInputSource(
        ImmutableList.of(URI.create("http://test.com/http-test"))
    );
    final CombiningInputSource combiningInputSource = new CombiningInputSource(
        ImmutableList.of(fileSource, uriInputSource)
    );

    final byte[] json = mapper.writeValueAsBytes(combiningInputSource);
    final CombiningInputSource fromJson = (CombiningInputSource) mapper.readValue(json, InputSource.class);

    Assert.assertEquals(combiningInputSource, fromJson);
  }

  /**
   * Expects the estimated number of splits of the combined source to be the total over both delegates:
   * three mocked files plus three URIs.
   */
  @Test
  public void testEstimateNumSplits()
  {
    final TestFileInputSource fileSource = new TestFileInputSource(generateFiles(3));
    final TestUriInputSource uriInputSource = new TestUriInputSource(
        ImmutableList.of(
            URI.create("http://test.com/http-test1"),
            URI.create("http://test.com/http-test2"),
            URI.create("http://test.com/http-test3")
        )
    );
    final CombiningInputSource combiningInputSource = new CombiningInputSource(
        ImmutableList.of(fileSource, uriInputSource)
    );

    // JUnit's contract is assertEquals(expected, actual); keep the literal first so failures read correctly.
    Assert.assertEquals(
        6,
        combiningInputSource.estimateNumSplits(
            new NoopInputFormat(),
            new MaxSizeSplitHintSpec(new HumanReadableBytes(5L), null)
        )
    );
  }

  /**
   * Creates splits from a combined source and checks that each split carries a pair of
   * (originating delegate, delegate split): the first three come from the file delegate, the remaining three
   * from the URI delegate.
   */
  @Test
  public void testCreateSplits()
  {
    final TestFileInputSource fileSource = new TestFileInputSource(generateFiles(3));
    // The URIs are numbered 3..5 on purpose: they line up with their positions in the combined split list,
    // which the second loop below relies on.
    final TestUriInputSource uriInputSource = new TestUriInputSource(
        ImmutableList.of(
            URI.create("http://test.com/http-test3"),
            URI.create("http://test.com/http-test4"),
            URI.create("http://test.com/http-test5")
        )
    );
    final CombiningInputSource combiningInputSource = new CombiningInputSource(
        ImmutableList.of(fileSource, uriInputSource)
    );

    List<InputSplit> combinedInputSplits = combiningInputSource
        .createSplits(
            new NoopInputFormat(),
            new MaxSizeSplitHintSpec(new HumanReadableBytes(5L), null)
        )
        .collect(Collectors.toList());

    Assert.assertEquals(6, combinedInputSplits.size());

    // Splits 0..2 belong to the file delegate; every mocked file reports a length of 5.
    for (int i = 0; i < 3; i++) {
      Pair<SplittableInputSource, InputSplit> splitPair = (Pair) combinedInputSplits.get(i).get();
      InputSplit<File> fileSplits = splitPair.rhs;
      Assert.assertTrue(splitPair.lhs instanceof TestFileInputSource);
      Assert.assertEquals(5, fileSplits.get().length());
    }

    // Splits 3..5 belong to the URI delegate; the URI suffix equals the split's index in the combined list.
    for (int i = 3; i < combinedInputSplits.size(); i++) {
      Pair<SplittableInputSource, InputSplit> splitPair = (Pair) combinedInputSplits.get(i).get();
      InputSplit<URI> fileSplits = splitPair.rhs;
      Assert.assertTrue(splitPair.lhs instanceof TestUriInputSource);
      Assert.assertEquals(URI.create("http://test.com/http-test" + i), fileSplits.get());
    }
  }

  /**
   * Passes a split wrapping a (delegate, delegate split) pair to {@code withSplit} and expects the returned
   * source to be a {@link TestUriInputSource} equal to the original single-URI delegate.
   */
  @Test
  public void testWithSplits()
  {
    final TestUriInputSource uriInputSource = new TestUriInputSource(
        ImmutableList.of(URI.create("http://test.com/http-test1"))
    );
    final CombiningInputSource combiningInputSource = new CombiningInputSource(
        ImmutableList.of(uriInputSource)
    );

    InputSplit<URI> testUriSplit = new InputSplit<>(URI.create("http://test.com/http-test1"));
    TestUriInputSource uriInputSourceWithSplit = (TestUriInputSource) combiningInputSource.withSplit(
        new InputSplit(Pair.of(uriInputSource, testUriSplit))
    );

    Assert.assertEquals(uriInputSource, uriInputSourceWithSplit);
  }

  /**
   * Combines a delegate whose {@code needsFormat()} is {@code false} (URI) with one whose {@code needsFormat()}
   * is {@code true} (file) and expects the combined source to report {@code true}.
   */
  @Test
  public void testNeedsFormat()
  {
    final TestUriInputSource uriInputSource = new TestUriInputSource(
        ImmutableList.of(URI.create("http://test.com/http-test1"))
    );
    final TestFileInputSource fileSource = new TestFileInputSource(generateFiles(3));
    final CombiningInputSource combiningInputSource = new CombiningInputSource(
        ImmutableList.of(uriInputSource, fileSource)
    );

    Assert.assertTrue(combiningInputSource.needsFormat());
  }

  /**
   * Verifies the equals/hashCode contract of {@link CombiningInputSource} with EqualsVerifier, treating
   * {@code delegates} as non-null and expecting a {@code getClass()}-based type check.
   */
  @Test
  public void testEquals()
  {
    EqualsVerifier.forClass(CombiningInputSource.class)
                  .withNonnullFields("delegates")
                  .usingGetClass()
                  .verify();
  }

  /**
   * Builds a list of replayed EasyMock {@link File} mocks, each stubbed to report a length of 5.
   *
   * @param numFiles number of mocks to create
   * @return a mutable list containing {@code numFiles} mocked files
   */
  private static List<File> generateFiles(int numFiles)
  {
    final List<File> files = new ArrayList<>();
    for (int i = 0; i < numFiles; i++) {
      final File file = EasyMock.niceMock(File.class);
      EasyMock.expect(file.length()).andReturn(5L).anyTimes();
      EasyMock.replay(file);
      files.add(file);
    }
    return files;
  }

  /**
   * Minimal splittable input source backed by a list of files. It produces one split per file, ignores the
   * input format and split hint, and reports that it needs a format.
   */
  private static class TestFileInputSource extends AbstractInputSource implements SplittableInputSource<File>
  {
    private final List<File> files;

    /**
     * @param fileList files served by this source; bound to the {@code files} JSON property
     */
    @JsonCreator
    private TestFileInputSource(@JsonProperty("files") List<File> fileList)
    {
      files = fileList;
    }

    /**
     * @return the files this source was created with
     */
    @JsonProperty
    public List<File> getFiles()
    {
      return files;
    }

    /**
     * @return one split per file; both arguments are ignored
     */
    @Override
    public Stream<InputSplit<File>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
    {
      return files.stream().map(InputSplit::new);
    }

    /**
     * @return the number of files; both arguments are ignored
     */
    @Override
    public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
    {
      return files.size();
    }

    /**
     * @return a new source containing only the file carried by {@code split}
     */
    @Override
    public SplittableInputSource<File> withSplit(InputSplit<File> split)
    {
      return new TestFileInputSource(ImmutableList.of(split.get()));
    }

    @Override
    public boolean needsFormat()
    {
      return true;
    }

    @Override
    public boolean equals(Object o)
    {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      TestFileInputSource that = (TestFileInputSource) o;
      return Objects.equals(files, that.files);
    }

    @Override
    public int hashCode()
    {
      return Objects.hash(files);
    }
  }

  /**
   * Minimal splittable input source backed by a list of URIs. It produces one split per URI, ignores the
   * input format and split hint, and reports that it does not need a format.
   */
  private static class TestUriInputSource extends AbstractInputSource implements SplittableInputSource<URI>
  {
    private final List<URI> uris;

    /**
     * @param uriList URIs served by this source; bound to the {@code uris} JSON property
     */
    @JsonCreator
    private TestUriInputSource(@JsonProperty("uris") List<URI> uriList)
    {
      uris = uriList;
    }

    /**
     * @return the URIs this source was created with
     */
    @JsonProperty
    public List<URI> getUris()
    {
      return uris;
    }

    /**
     * @return one split per URI; both arguments are ignored
     */
    @Override
    public Stream<InputSplit<URI>> createSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
    {
      return uris.stream().map(InputSplit::new);
    }

    /**
     * @return the number of URIs; both arguments are ignored
     */
    @Override
    public int estimateNumSplits(InputFormat inputFormat, @Nullable SplitHintSpec splitHintSpec)
    {
      return uris.size();
    }

    /**
     * @return a new source containing only the URI carried by {@code split}
     */
    @Override
    public SplittableInputSource<URI> withSplit(InputSplit<URI> split)
    {
      return new TestUriInputSource(ImmutableList.of(split.get()));
    }

    @Override
    public boolean needsFormat()
    {
      return false;
    }

    @Override
    public boolean equals(Object o)
    {
      if (this == o) {
        return true;
      }
      if (o == null || getClass() != o.getClass()) {
        return false;
      }
      TestUriInputSource that = (TestUriInputSource) o;
      return Objects.equals(uris, that.uris);
    }

    @Override
    public int hashCode()
    {
      return Objects.hash(uris);
    }
  }
}