[DL][service] Create stream operation should not be submitted by StreamImpl
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
index 677ade5..5c5b5af 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/DistributedLogServiceImpl.java
@@ -28,6 +28,7 @@
 import com.twitter.distributedlog.client.resolver.RegionResolver;
 import com.twitter.distributedlog.client.routing.RoutingService;
 import com.twitter.distributedlog.config.DynamicDistributedLogConfiguration;
+import com.twitter.distributedlog.exceptions.DLException;
 import com.twitter.distributedlog.exceptions.RegionUnavailableException;
 import com.twitter.distributedlog.exceptions.ServiceUnavailableException;
 import com.twitter.distributedlog.exceptions.StreamUnavailableException;
@@ -38,8 +39,8 @@
 import com.twitter.distributedlog.service.config.ServerConfiguration;
 import com.twitter.distributedlog.service.config.StreamConfigProvider;
 import com.twitter.distributedlog.service.stream.BulkWriteOp;
-import com.twitter.distributedlog.service.stream.CreateOp;
 import com.twitter.distributedlog.service.stream.DeleteOp;
+import com.twitter.distributedlog.service.stream.admin.CreateOp;
 import com.twitter.distributedlog.service.stream.HeartbeatOp;
 import com.twitter.distributedlog.service.stream.ReleaseOp;
 import com.twitter.distributedlog.service.stream.Stream;
@@ -50,8 +51,9 @@
 import com.twitter.distributedlog.service.stream.StreamOp;
 import com.twitter.distributedlog.service.stream.StreamOpStats;
 import com.twitter.distributedlog.service.stream.TruncateOp;
-import com.twitter.distributedlog.service.stream.WriteOpWithPayload;
 import com.twitter.distributedlog.service.stream.WriteOp;
+import com.twitter.distributedlog.service.stream.WriteOpWithPayload;
+import com.twitter.distributedlog.service.stream.admin.StreamAdminOp;
 import com.twitter.distributedlog.service.stream.limiter.ServiceRequestLimiter;
 import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
 import com.twitter.distributedlog.service.utils.ServerUtils;
@@ -485,8 +487,7 @@
     @Override
     public Future<WriteResponse> create(String stream, WriteContext ctx) {
         CreateOp op = new CreateOp(stream, statsLogger, streamManager, getChecksum(ctx), featureChecksumDisabled);
-        executeStreamOp(op);
-        return op.result();
+        return executeStreamAdminOp(op);
     }
 
     //
@@ -574,6 +575,15 @@
         return ctx.isSetCrc32() ? ctx.getCrc32() : null;
     }
 
+    private Future<WriteResponse> executeStreamAdminOp(final StreamAdminOp op) {
+        try {
+            op.preExecute();
+        } catch (DLException dle) {
+            return Future.exception(dle);
+        }
+        return op.execute();
+    }
+
     private void executeStreamOp(final StreamOp op) {
 
         // Must attach this as early as possible--returning before this point will cause us to
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
index 82597f3..fbef587 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/AbstractStreamOp.java
@@ -158,15 +158,15 @@
     // fail the result with the given response header
     protected abstract void fail(ResponseHeader header);
 
-    protected static OpStatsLogger requestStat(StatsLogger statsLogger, String opName) {
+    public static OpStatsLogger requestStat(StatsLogger statsLogger, String opName) {
         return requestLogger(statsLogger).getOpStatsLogger(opName);
     }
 
-    protected static StatsLogger requestLogger(StatsLogger statsLogger) {
+    public static StatsLogger requestLogger(StatsLogger statsLogger) {
         return statsLogger.scope("request");
     }
 
-    protected static StatsLogger requestScope(StatsLogger statsLogger, String scope) {
+    public static StatsLogger requestScope(StatsLogger statsLogger, String scope) {
         return requestLogger(statsLogger).scope(scope);
     }
 }
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java
new file mode 100644
index 0000000..7ac4986
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/AdminOp.java
@@ -0,0 +1,40 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.stream.admin;
+
+import com.twitter.distributedlog.exceptions.DLException;
+import com.twitter.util.Future;
+
+/**
+ * An admin operation
+ */
+public interface AdminOp<Response> {
+
+    /**
+     * Invoked before the stream op is executed.
+     */
+    void preExecute() throws DLException;
+
+    /**
+     * Execute the operation.
+     *
+     * @return the future represents the response of the operation
+     */
+    Future<Response> execute();
+
+}
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/CreateOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
similarity index 73%
rename from distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/CreateOp.java
rename to distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
index 96b8435..2e1f490 100644
--- a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/CreateOp.java
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/CreateOp.java
@@ -15,34 +15,35 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-package com.twitter.distributedlog.service.stream;
+package com.twitter.distributedlog.service.stream.admin;
 
-import com.twitter.distributedlog.AsyncLogWriter;
 import com.twitter.distributedlog.service.ResponseUtils;
+import com.twitter.distributedlog.service.stream.StreamManager;
 import com.twitter.distributedlog.thrift.service.WriteResponse;
-import com.twitter.distributedlog.util.Sequencer;
 import com.twitter.util.Future;
 import org.apache.bookkeeper.feature.Feature;
 import org.apache.bookkeeper.stats.StatsLogger;
 import scala.runtime.AbstractFunction1;
 
-public class CreateOp extends AbstractWriteOp {
-  private final StreamManager streamManager;
+import static com.twitter.distributedlog.service.stream.AbstractStreamOp.requestStat;
+
+public class CreateOp extends StreamAdminOp {
 
   public CreateOp(String stream,
                   StatsLogger statsLogger,
                   StreamManager streamManager,
                   Long checksum,
                   Feature checksumEnabledFeature) {
-    super(stream, requestStat(statsLogger, "create"), checksum, checksumEnabledFeature);
-    this.streamManager = streamManager;
+    super(stream,
+            streamManager,
+            requestStat(statsLogger, "create"),
+            checksum,
+            checksumEnabledFeature);
   }
 
   @Override
-  protected Future<WriteResponse> executeOp(AsyncLogWriter writer,
-                                            Sequencer sequencer,
-                                            Object txnLock) {
-    Future<Void> result = streamManager.createStreamAsync(streamName());
+  protected Future<WriteResponse> executeOp() {
+    Future<Void> result = streamManager.createStreamAsync(stream);
     return result.map(new AbstractFunction1<Void, WriteResponse>() {
       @Override
       public WriteResponse apply(Void value) {
diff --git a/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java
new file mode 100644
index 0000000..37c6e14
--- /dev/null
+++ b/distributedlog-service/src/main/java/com/twitter/distributedlog/service/stream/admin/StreamAdminOp.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.twitter.distributedlog.service.stream.admin;
+
+import com.google.common.base.Stopwatch;
+import com.twitter.distributedlog.exceptions.ChecksumFailedException;
+import com.twitter.distributedlog.exceptions.DLException;
+import com.twitter.distributedlog.service.ResponseUtils;
+import com.twitter.distributedlog.service.stream.StreamManager;
+import com.twitter.distributedlog.thrift.service.WriteResponse;
+import com.twitter.distributedlog.util.ProtocolUtils;
+import com.twitter.util.Future;
+import com.twitter.util.FutureTransformer;
+import org.apache.bookkeeper.feature.Feature;
+import org.apache.bookkeeper.stats.OpStatsLogger;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Stream admin op
+ */
+public abstract class StreamAdminOp implements AdminOp<WriteResponse> {
+
+    protected final String stream;
+    protected final StreamManager streamManager;
+    protected final OpStatsLogger opStatsLogger;
+    protected final Stopwatch stopwatch = Stopwatch.createUnstarted();
+    protected final Long checksum;
+    protected final Feature checksumDisabledFeature;
+
+    protected StreamAdminOp(String stream,
+                            StreamManager streamManager,
+                            OpStatsLogger statsLogger,
+                            Long checksum,
+                            Feature checksumDisabledFeature) {
+        this.stream = stream;
+        this.streamManager = streamManager;
+        this.opStatsLogger = statsLogger;
+        // start here in case the operation is failed before executing.
+        stopwatch.reset().start();
+        this.checksum = checksum;
+        this.checksumDisabledFeature = checksumDisabledFeature;
+    }
+
+    protected Long computeChecksum() {
+        return ProtocolUtils.streamOpCRC32(stream);
+    }
+
+    @Override
+    public void preExecute() throws DLException {
+        if (!checksumDisabledFeature.isAvailable() && null != checksum) {
+            Long serverChecksum = computeChecksum();
+            if (null != serverChecksum && !checksum.equals(serverChecksum)) {
+                throw new ChecksumFailedException();
+            }
+        }
+    }
+
+    /**
+     * Execute the operation.
+     *
+     * @return execute operation
+     */
+    protected abstract Future<WriteResponse> executeOp();
+
+    @Override
+    public Future<WriteResponse> execute() {
+        return executeOp().transformedBy(new FutureTransformer<WriteResponse, WriteResponse>() {
+
+            @Override
+            public WriteResponse map(WriteResponse response) {
+                opStatsLogger.registerSuccessfulEvent(
+                        stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                return response;
+            }
+
+            @Override
+            public WriteResponse handle(Throwable cause) {
+                opStatsLogger.registerFailedEvent(
+                        stopwatch.elapsed(TimeUnit.MICROSECONDS));
+                return ResponseUtils.write(ResponseUtils.exceptionToHeader(cause));
+            }
+
+        });
+    }
+}