[DL][service] Create stream operation should not be submitted by StreamImpl
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index 677ade5..5c5b5af 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -28,6 +28,7 @@
import com.twitter.distributedlog.client.resolver.RegionResolver;
import com.twitter.distributedlog.client.routing.RoutingService;
import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
+import com.twitter.distributedlog.exceptions.DLException;
import com.twitter.distributedlog.exceptions.RegionUnavailableException;
import com.twitter.distributedlog.exceptions.ServiceUnavailableException;
import com.twitter.distributedlog.exceptions.StreamUnavailableException;
@@ -38,8 +39,8 @@
import com.twitter.distributedlog.service.config.ServerConfiguration;
import com.twitter.distributedlog.service.config.StreamConfigProvider;
import com.twitter.distributedlog.service.stream.BulkWriteOp;
-import com.twitter.distributedlog.service.stream.CreateOp;
import com.twitter.distributedlog.service.stream.DeleteOp;
+import com.twitter.distributedlog.service.stream.admin.CreateOp;
import com.twitter.distributedlog.service.stream.HeartbeatOp;
import com.twitter.distributedlog.service.stream.ReleaseOp;
import com.twitter.distributedlog.service.stream.Stream;
@@ -50,8 +51,9 @@
import com.twitter.distributedlog.service.stream.StreamOp;
import com.twitter.distributedlog.service.stream.StreamOpStats;
import com.twitter.distributedlog.service.stream.TruncateOp;
-import com.twitter.distributedlog.service.stream.WriteOpWithPayload;
import com.twitter.distributedlog.service.stream.WriteOp;
+import com.twitter.distributedlog.service.stream.WriteOpWithPayload;
+import com.twitter.distributedlog.service.stream.admin.StreamAdminOp;
import com.twitter.distributedlog.service.stream.limiter.ServiceRequestLimiter;
import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
import com.twitter.distributedlog.service.utils.ServerUtils;
@@ -485,8 +487,7 @@
@Override
public Future<WriteResponse> create(String stream, WriteContext ctx) {
CreateOp op = new CreateOp(stream, statsLogger, streamManager, getChecksum(ctx), featureChecksumDisabled);
- executeStreamOp(op);
- return op.result();
+ return executeStreamAdminOp(op);
}
//
@@ -574,6 +575,15 @@
return ctx.isSetCrc32() ? ctx.getCrc32() : null;
}
+ private Future<WriteResponse> executeStreamAdminOp(final StreamAdminOp op) {
+ try {
+ op.preExecute();
+ } catch (DLException dle) {
+ return Future.exception(dle);
+ }
+ return op.execute();
+ }
+
private void executeStreamOp(final StreamOp op) {
// Must attach this as early as possible--returning before this point will cause us to
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
index 82597f3..fbef587 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
@@ -158,15 +158,15 @@
// fail the result with the given response header
protected abstract void fail(ResponseHeader header);
- protected static OpStatsLogger requestStat(StatsLogger statsLogger, String opName) {
+ public static OpStatsLogger requestStat(StatsLogger statsLogger, String opName) {
return requestLogger(statsLogger).getOpStatsLogger(opName);
}
- protected static StatsLogger requestLogger(StatsLogger statsLogger) {
+ public static StatsLogger requestLogger(StatsLogger statsLogger) {
return statsLogger.scope("request");
}
- protected static StatsLogger requestScope(StatsLogger statsLogger, String scope) {
+ public static StatsLogger requestScope(StatsLogger statsLogger, String scope) {
return requestLogger(statsLogger).scope(scope);
}
}
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java
new file mode 100644
index 0000000..7ac4986
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.stream.admin;
+
+import com.twitter.distributedlog.exceptions.DLException;
+import com.twitter.util.Future;
+
+/**
+ * An admin operation
+ */
+public interface AdminOp<Response> {
+
+ /**
+ * Invoked before the stream op is executed.
+ */
+ void preExecute() throws DLException;
+
+ /**
+ * Execute the operation.
+ *
+ * @return the future represents the response of the operation
+ */
+ Future<Response> execute();
+
+}
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/CreateOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
similarity index 73%
rename from distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/CreateOp.java
rename to distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
index 96b8435..2e1f490 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/CreateOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
@@ -15,34 +15,35 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.twitter.distributedlog.service.stream;
+package com.twitter.distributedlog.service.stream.admin;
-import com.twitter.distributedlog.AsyncLogWriter;
import com.twitter.distributedlog.service.ResponseUtils;
+import com.twitter.distributedlog.service.stream.StreamManager;
import com.twitter.distributedlog.thrift.service.WriteResponse;
-import com.twitter.distributedlog.util.Sequencer;
import com.twitter.util.Future;
import org.apache.bookkeeper.feature.Feature;
import org.apache.bookkeeper.stats.StatsLogger;
import scala.runtime.AbstractFunction1;
-public class CreateOp extends AbstractWriteOp {
- private final StreamManager streamManager;
+import static com.twitter.distributedlog.service.stream.AbstractStreamOp.requestStat;
+
+public class CreateOp extends StreamAdminOp {
public CreateOp(String stream,
StatsLogger statsLogger,
StreamManager streamManager,
Long checksum,
Feature checksumEnabledFeature) {
- super(stream, requestStat(statsLogger, "create"), checksum, checksumEnabledFeature);
- this.streamManager = streamManager;
+ super(stream,
+ streamManager,
+ requestStat(statsLogger, "create"),
+ checksum,
+ checksumEnabledFeature);
}
@Override
- protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
- Sequencer sequencer,
- Object txnLock) {
- Future<Void> result = streamManager.createStreamAsync(streamName());
+ protected Future<WriteResponse> executeOp() {
+ Future<Void> result = streamManager.createStreamAsync(stream);
return result.map(new AbstractFunction1<Void, WriteResponse>() {
@Override
public WriteResponse apply(Void value) {
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java
new file mode 100644
index 0000000..37c6e14
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.stream.admin;
+
+import com.google.common.base.Stopwatch;
+import com.twitter.distributedlog.exceptions.ChecksumFailedException;
+import com.twitter.distributedlog.exceptions.DLException;
+import com.twitter.distributedlog.service.ResponseUtils;
+import com.twitter.distributedlog.service.stream.StreamManager;
+import com.twitter.distributedlog.thrift.service.WriteResponse;
+import com.twitter.distributedlog.util.ProtocolUtils;
+import com.twitter.util.Future;
+import com.twitter.util.FutureTransformer;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Stream admin op
+ */
+public abstract class StreamAdminOp implements AdminOp<WriteResponse> {
+
+ protected final String stream;
+ protected final StreamManager streamManager;
+ protected final OpStatsLogger opStatsLogger;
+ protected final Stopwatch stopwatch = Stopwatch.createUnstarted();
+ protected final Long checksum;
+ protected final Feature checksumDisabledFeature;
+
+ protected StreamAdminOp(String stream,
+ StreamManager streamManager,
+ OpStatsLogger statsLogger,
+ Long checksum,
+ Feature checksumDisabledFeature) {
+ this.stream = stream;
+ this.streamManager = streamManager;
+ this.opStatsLogger = statsLogger;
+ // start here in case the operation is failed before executing.
+ stopwatch.reset().start();
+ this.checksum = checksum;
+ this.checksumDisabledFeature = checksumDisabledFeature;
+ }
+
+ protected Long computeChecksum() {
+ return ProtocolUtils.streamOpCRC32(stream);
+ }
+
+ @Override
+ public void preExecute() throws DLException {
+ if (!checksumDisabledFeature.isAvailable() && null != checksum) {
+ Long serverChecksum = computeChecksum();
+ if (null != serverChecksum && !checksum.equals(serverChecksum)) {
+ throw new ChecksumFailedException();
+ }
+ }
+ }
+
+ /**
+ * Execute the operation.
+ *
+ * @return execute operation
+ */
+ protected abstract Future<WriteResponse> executeOp();
+
+ @Override
+ public Future<WriteResponse> execute() {
+ return executeOp().transformedBy(new FutureTransformer<WriteResponse, WriteResponse>() {
+
+ @Override
+ public WriteResponse map(WriteResponse response) {
+ opStatsLogger.registerSuccessfulEvent(
+ stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ return response;
+ }
+
+ @Override
+ public WriteResponse handle(Throwable cause) {
+ opStatsLogger.registerFailedEvent(
+ stopwatch.elapsed(TimeUnit.MICROSECONDS));
+ return ResponseUtils.write(ResponseUtils.exceptionToHeader(cause));
+ }
+
+ });
+ }
+}