/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.drill.exec.physical.impl.protocol;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;

import java.util.Iterator;

import org.apache.drill.categories.RowSetTests;
import org.apache.drill.common.exceptions.UserException;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MinorType;
import org.apache.drill.exec.expr.TypeHelper;
import org.apache.drill.exec.ops.OperatorContext;
import org.apache.drill.exec.physical.base.PhysicalOperator;
import org.apache.drill.exec.physical.config.Limit;
import org.apache.drill.exec.physical.rowSet.RowSet.SingleRowSet;
import org.apache.drill.exec.proto.UserBitShared.NamePart;
import org.apache.drill.exec.record.BatchSchema;
import org.apache.drill.exec.record.BatchSchema.SelectionVectorMode;
import org.apache.drill.exec.record.BatchSchemaBuilder;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.RecordBatch.IterOutcome;
import org.apache.drill.exec.record.TypedFieldId;
import org.apache.drill.exec.record.VectorContainer;
import org.apache.drill.exec.record.VectorWrapper;
import org.apache.drill.exec.record.metadata.SchemaBuilder;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.vector.IntVector;
import org.apache.drill.exec.vector.VarCharVector;
import org.apache.drill.test.SubOperatorTest;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Test the implementation of the Drill Volcano iterator protocol that
 * wraps the modular operator implementation.
 */

@Category(RowSetTests.class)
public class TestOperatorRecordBatch extends SubOperatorTest {

  private static final Logger logger = LoggerFactory.getLogger(TestOperatorRecordBatch.class);

  /**
   * Mock operator executor that simply tracks each method call
   * and provides a light-weight vector container. Returns a
   * defined number of (batches) with an optional schema change.
   */

  private class MockOperatorExec implements OperatorExec {

    public boolean bindCalled;
    public boolean buildSchemaCalled;
    public int nextCalls = 1;
    public int nextCount;
    public int schemaChangeAt = -1;
    public boolean cancelCalled;
    public boolean closeCalled;
    public boolean schemaEOF;
    private final VectorContainerAccessor batchAccessor;

    public MockOperatorExec() {
      this(mockBatch());
    }

    public MockOperatorExec(VectorContainer container) {
      batchAccessor = new VectorContainerAccessor();
      batchAccessor.addBatch(container);
    }

    public MockOperatorExec(VectorContainerAccessor accessor) {
      batchAccessor = accessor;
    }

    @Override
    public void bind(OperatorContext context) { bindCalled = true; }

    @Override
    public BatchAccessor batchAccessor() {
      return batchAccessor;
    }

    @Override
    public boolean buildSchema() {
      buildSchemaCalled = true;
      return !schemaEOF;
    }

    @Override
    public boolean next() {
      nextCount++;
      if (nextCount > nextCalls) {
        return false;
      }
      if (nextCount == schemaChangeAt) {
        BatchSchemaBuilder newSchema = new BatchSchemaBuilder(batchAccessor.schema());
        newSchema.schemaBuilder()
            .add("b", MinorType.VARCHAR);
        VectorContainer newContainer = new VectorContainer(fixture.allocator(), newSchema.build());
        batchAccessor.addBatch(newContainer);
      }
      return true;
    }

    @Override
    public void cancel() { cancelCalled = true; }

    @Override
    public void close() {
      batchAccessor().container().clear();
      closeCalled = true;
    }
  }

  /**
   * Builds the default batch used by the mock operator: a container with a
   * single INT column named "a" and no selection vector.
   *
   * @return a new container allocated from the test fixture's allocator
   */
  private static VectorContainer mockBatch() {
    BatchSchema singleIntColumn = new BatchSchemaBuilder()
        .withSchemaBuilder(new SchemaBuilder().add("a", MinorType.INT))
        .build();
    VectorContainer batch = new VectorContainer(fixture.allocator(), singleIntColumn);
    batch.buildSchema(SelectionVectorMode.NONE);
    return batch;
  }

  /**
   * Wraps the given mock executor in an {@link OperatorRecordBatch} bound to
   * the fixture's fragment context.
   *
   * @param opExec the mock operator executor to drive
   * @return the record batch under test
   */
  private OperatorRecordBatch makeOpBatch(MockOperatorExec opExec) {
    // The operator definition is only a placeholder; a Limit is used here.
    final PhysicalOperator placeholderConfig = new Limit(null, 0, 100);
    final OperatorRecordBatch batch = new OperatorRecordBatch(
        fixture.getFragmentContext(), placeholderConfig, opExec, true);
    return batch;
  }

  /**
   * Simulate a normal run: build the schema, return a batch, encounter a
   * schema change on the second batch, then reach EOF and close.
   */

  @Test
  public void testNormalLifeCycle() {
    MockOperatorExec opExec = new MockOperatorExec();
    opExec.nextCalls = 2;
    opExec.schemaChangeAt = 2;
    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {

      assertSame(fixture.getFragmentContext(), opBatch.fragmentContext());
      assertNotNull(opBatch.getContext());

      // First call to next() builds schema

      assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
      assertTrue(opExec.bindCalled);
      assertTrue(opExec.buildSchemaCalled);
      assertEquals(0, opExec.nextCount);

      // Second call returns the first batch

      assertEquals(IterOutcome.OK, opBatch.next());
      assertEquals(1, opExec.nextCount);

      // Third call causes a schema change

      assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
      assertEquals(2, opExec.nextCount);

      // Fourth call reaches EOF

      assertEquals(IterOutcome.NONE, opBatch.next());
      assertEquals(3, opExec.nextCount);

      // Close happens implicitly at the end of the try block
    } catch (Exception e) {
      // Keep the original exception as the cause so the failure is diagnosable.
      throw new AssertionError("Unexpected exception: " + e, e);
    }

    assertTrue(opExec.closeCalled);
    assertFalse(opExec.cancelCalled);
  }

  /**
   * Simulate a truncated life cycle: next() is never called. This is not a
   * valid use of the protocol, but the operator should cope with it anyway.
   */

  @Test
  public void testTruncatedLifeCycle() {
    MockOperatorExec opExec = new MockOperatorExec();
    opExec.schemaEOF = true;

    // Open and immediately close the batch without ever calling next().
    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
    } catch (Exception e) {
      // Keep the original exception as the cause so the failure is diagnosable.
      throw new AssertionError("Unexpected exception: " + e, e);
    }
    assertTrue(opExec.bindCalled);
    assertTrue(opExec.closeCalled);
  }

  /**
   * Simulate reaching EOF when trying to create the schema: the first call
   * to next() must report NONE.
   */

  @Test
  public void testSchemaEOF() {
    MockOperatorExec opExec = new MockOperatorExec();
    opExec.schemaEOF = true;

    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
      assertEquals(IterOutcome.NONE, opBatch.next());
      assertTrue(opExec.buildSchemaCalled);
    } catch (Exception e) {
      // Keep the original exception as the cause so the failure is diagnosable.
      throw new AssertionError("Unexpected exception: " + e, e);
    }
    assertTrue(opExec.closeCalled);
  }

  /**
   * Simulate reaching EOF on the first batch. This simulated data source
   * discovered a schema, but had no data.
   */

  @Test
  public void testFirstBatchEOF() {
    MockOperatorExec opExec = new MockOperatorExec();
    // No batches at all: the very first executor next() reports EOF.
    opExec.nextCalls = 0;

    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
      assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
      assertTrue(opExec.buildSchemaCalled);
      assertEquals(IterOutcome.NONE, opBatch.next());
      assertEquals(1, opExec.nextCount);
    } catch (Exception e) {
      // Keep the original exception as the cause so the failure is diagnosable.
      throw new AssertionError("Unexpected exception: " + e, e);
    }
    assertTrue(opExec.closeCalled);
  }

  /**
   * Simulate the caller failing the operator before getting the schema.
   * The executor must see neither buildSchema(), next() nor cancel().
   */

  @Test
  public void testFailEarly() {
    MockOperatorExec opExec = new MockOperatorExec();
    opExec.nextCalls = 2;

    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
      opBatch.kill(false);
      assertFalse(opExec.buildSchemaCalled);
      assertEquals(0, opExec.nextCount);
      assertFalse(opExec.cancelCalled);
    } catch (Exception e) {
      // Keep the original exception as the cause so the failure is diagnosable.
      throw new AssertionError("Unexpected exception: " + e, e);
    }
    assertTrue(opExec.closeCalled);
  }

  /**
   * Simulate the caller failing the operator before EOF: the kill must be
   * passed to the executor as a cancel.
   */

  @Test
  public void testFailWhileReading() {
    MockOperatorExec opExec = new MockOperatorExec();
    opExec.nextCalls = 2;

    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
      assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
      assertEquals(IterOutcome.OK, opBatch.next());
      opBatch.kill(false);
      assertTrue(opExec.cancelCalled);
    } catch (Exception e) {
      // Keep the original exception as the cause so the failure is diagnosable.
      throw new AssertionError("Unexpected exception: " + e, e);
    }
    assertTrue(opExec.closeCalled);
  }

  /**
   * Simulate the caller failing the operator after EOF but before close.
   * This is a silly time to fail, but it has to be handled anyway.
   */

  @Test
  public void testFailBeforeClose() {
    MockOperatorExec opExec = new MockOperatorExec();
    opExec.nextCalls = 2;

    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
      assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
      assertEquals(IterOutcome.OK, opBatch.next());
      assertEquals(IterOutcome.OK, opBatch.next());
      assertEquals(IterOutcome.NONE, opBatch.next());
      opBatch.kill(false);

      // Already hit EOF, so fail won't be passed along.

      assertFalse(opExec.cancelCalled);
    } catch (Exception e) {
      // Keep the original exception as the cause so the failure is diagnosable.
      throw new AssertionError("Unexpected exception: " + e, e);
    }
    assertTrue(opExec.closeCalled);
  }

  /**
   * Simulate the caller failing the operator after close.
   * This violates the operator protocol, but it has to be handled anyway.
   */

  @Test
  public void testFailAfterClose() {
    MockOperatorExec opExec = new MockOperatorExec();
    opExec.nextCalls = 2;

    // Not try-with-resources: kill() must be issued after an explicit close().
    OperatorRecordBatch opBatch = makeOpBatch(opExec);
    assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
    assertEquals(IterOutcome.OK, opBatch.next());
    assertEquals(IterOutcome.OK, opBatch.next());
    assertEquals(IterOutcome.NONE, opBatch.next());
    try {
      opBatch.close();
    } catch (Exception e) {
      // Keep the original exception as the cause so the failure is diagnosable.
      throw new AssertionError("Unexpected exception from close(): " + e, e);
    }
    assertTrue(opExec.closeCalled);
    opBatch.kill(false);
    assertFalse(opExec.cancelCalled);
  }

  /**
   * The record batch abstraction has a bunch of methods to work with a vector container.
   * Rather than simply exposing the container itself, the batch instead exposes various
   * container operations. Probably an artifact of its history. In any event, make
   * sure those methods are passed through to the container accessor.
   */

  @Test
  public void testBatchAccessor() {
    SchemaBuilder schemaBuilder = new SchemaBuilder()
      .add("a", MinorType.INT)
      .add("b", MinorType.VARCHAR);
    BatchSchema schema = new BatchSchemaBuilder()
        .withSchemaBuilder(schemaBuilder)
        .build();
    SingleRowSet rs = fixture.rowSetBuilder(schema)
        .addRow(10, "fred")
        .addRow(20, "wilma")
        .build();
    MockOperatorExec opExec = new MockOperatorExec(rs.container());
    opExec.nextCalls = 1;

    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
      assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
      assertEquals(schema, opBatch.getSchema());
      assertEquals(2, opBatch.getRecordCount());
      assertSame(rs.container(), opBatch.getOutgoingContainer());

      Iterator<VectorWrapper<?>> iter = opBatch.iterator();
      assertEquals("a", iter.next().getValueVector().getField().getName());
      assertEquals("b", iter.next().getValueVector().getField().getName());

      // Not a full test of the schema path; just make sure that the
      // pass-through to the Vector Container works.

      SchemaPath path = SchemaPath.create(NamePart.newBuilder().setName("a").build());
      TypedFieldId id = opBatch.getValueVectorId(path);
      assertEquals(MinorType.INT, id.getFinalType().getMinorType());
      assertEquals(1, id.getFieldIds().length);
      assertEquals(0, id.getFieldIds()[0]);

      path = SchemaPath.create(NamePart.newBuilder().setName("b").build());
      id = opBatch.getValueVectorId(path);
      assertEquals(MinorType.VARCHAR, id.getFinalType().getMinorType());
      assertEquals(1, id.getFieldIds().length);
      assertEquals(1, id.getFieldIds()[0]);

      // Sanity check of getValueAccessorById()

      VectorWrapper<?> w = opBatch.getValueAccessorById(IntVector.class, 0);
      assertNotNull(w);
      assertEquals("a", w.getValueVector().getField().getName());
      w = opBatch.getValueAccessorById(VarCharVector.class, 1);
      assertNotNull(w);
      assertEquals("b", w.getValueVector().getField().getName());

      // getWritableBatch() ?

      // No selection vectors

      try {
        opBatch.getSelectionVector2();
        fail("getSelectionVector2() should be unsupported without an SV2");
      } catch (UnsupportedOperationException e) {
        // Expected
      }
      try {
        opBatch.getSelectionVector4();
        fail("getSelectionVector4() should be unsupported without an SV4");
      } catch (UnsupportedOperationException e) {
        // Expected
      }

    } catch (Exception e) {
      // Keep the original exception as the cause so the stack trace survives;
      // assertion failures are Errors and pass straight through.
      throw new AssertionError("Unexpected exception: " + e, e);
    }
    assertTrue(opExec.closeCalled);
  }

  /**
   * Checks how the accessor's schema version responds to successive
   * {@code addBatch()} calls: it stays the same when the same vectors are
   * presented again (even through a different container), and it increases
   * by one when a vector is replaced, added or removed.
   */
  @Test
  public void testSchemaChange() {
    TupleMetadata schema = new SchemaBuilder()
        .add("a", MinorType.INT)
        .add("b", MinorType.VARCHAR)
        .buildSchema();
    SingleRowSet rs = fixture.rowSetBuilder(schema)
        .addRow(10, "fred")
        .addRow(20, "wilma")
        .build();
    VectorContainer container = rs.container();
    MockOperatorExec opExec = new MockOperatorExec(container);
    int schemaVersion = opExec.batchAccessor().schemaVersion();

    // Be tidy: start at 1.

    assertEquals(1, schemaVersion);

    // Changing data does not trigger schema change

    container.zeroVectors();
    opExec.batchAccessor.addBatch(container);
    assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion());

    // Different container, same vectors, does not trigger a change

    VectorContainer c2 = new VectorContainer(fixture.allocator());
    for (VectorWrapper<?> vw : container) {
      c2.add(vw.getValueVector());
    }
    c2.buildSchema(SelectionVectorMode.NONE);
    opExec.batchAccessor.addBatch(c2);
    assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion());

    // Going back to the original container is not a change either

    opExec.batchAccessor.addBatch(container);
    assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion());

    // Replacing a vector with another of the same type does trigger
    // a change.

    // c3 reuses the original first vector but gets a brand-new vector
    // built from the second column's field.
    VectorContainer c3 = new VectorContainer(fixture.allocator());
    c3.add(container.getValueVector(0).getValueVector());
    c3.add(TypeHelper.getNewVector(
            container.getValueVector(1).getValueVector().getField(),
            fixture.allocator(), null));
    c3.buildSchema(SelectionVectorMode.NONE);
    opExec.batchAccessor.addBatch(c3);
    assertEquals(schemaVersion + 1, opExec.batchAccessor().schemaVersion());
    schemaVersion = opExec.batchAccessor().schemaVersion();

    // No change if same schema again

    opExec.batchAccessor.addBatch(c3);
    assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion());

    // Adding a vector triggers a change

    MaterializedField c = SchemaBuilder.columnSchema("c", MinorType.INT, DataMode.OPTIONAL);
    c3.add(TypeHelper.getNewVector(c, fixture.allocator(), null));
    c3.buildSchema(SelectionVectorMode.NONE);
    opExec.batchAccessor.addBatch(c3);
    assertEquals(schemaVersion + 1, opExec.batchAccessor().schemaVersion());
    schemaVersion = opExec.batchAccessor().schemaVersion();

    // No change if same schema again

    opExec.batchAccessor.addBatch(c3);
    assertEquals(schemaVersion, opExec.batchAccessor().schemaVersion());

    // Removing a vector triggers a change

    c3.remove(c3.getValueVector(2).getValueVector());
    c3.buildSchema(SelectionVectorMode.NONE);
    assertEquals(2, c3.getNumberOfColumns());
    opExec.batchAccessor.addBatch(c3);
    assertEquals(schemaVersion + 1, opExec.batchAccessor().schemaVersion());
    schemaVersion = opExec.batchAccessor().schemaVersion();

    // Clean up

    // opExec.close() clears the accessor's current container; the other
    // containers created here are cleared explicitly.
    opExec.close();
    c2.clear();
    c3.clear();
  }

  /**
   * Test that an SV2 supplied through an indirect container accessor is
   * exposed unchanged by the record batch.
   */

  @Test
  public void testSv2() {
    TupleMetadata schema = new SchemaBuilder()
        .add("a", MinorType.INT)
        .add("b", MinorType.VARCHAR)
        .buildSchema();
    SingleRowSet rs = fixture.rowSetBuilder(schema)
        .addRow(10, "fred")
        .addRow(20, "wilma")
        .withSv2()
        .build();

    try {
      IndirectContainerAccessor accessor = new IndirectContainerAccessor();
      accessor.addBatch(rs.container());
      accessor.setSelectionVector(rs.getSv2());

      MockOperatorExec opExec = new MockOperatorExec(accessor);
      opExec.nextCalls = 1;

      try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
        assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
        assertSame(rs.getSv2(), opBatch.getSelectionVector2());

      } catch (Exception e) {
        // Keep the original exception as the cause so the failure is diagnosable.
        throw new AssertionError("Unexpected exception: " + e, e);
      }
      assertTrue(opExec.closeCalled);
    } finally {

      // Must release SV2, even when an assertion above fails.

      rs.clear();
    }
  }

  //-----------------------------------------------------------------------
  // Exception error cases
  //
  // Assumes that any of the operator executor methods could throw an
  // exception. A wise implementation will throw a user exception that the
  // operator just passes along. A lazy implementation will throw any old
  // unchecked exception. Validate both cases.

  public static final String ERROR_MSG = "My Bad!";

  /**
   * Failure on the bind method: an unchecked exception thrown from bind()
   * is expected to reach the caller wrapped in a {@link UserException}.
   */

  @Test
  public void testWrappedExceptionOnBind() {
    MockOperatorExec opExec = new MockOperatorExec() {
      @Override
      public void bind(OperatorContext context) {
         throw new IllegalStateException(ERROR_MSG);
      }
    };
    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
      fail("Expected the bind() failure to surface as a UserException");
    } catch (UserException e) {
      assertTrue(e.getMessage().contains(ERROR_MSG));
      assertTrue(e.getCause() instanceof IllegalStateException);
    } catch (Exception unexpected) {
      // Only catch Exception: an AssertionError from the try body must
      // propagate untouched rather than be replaced by a bare failure.
      throw new AssertionError("Expected a UserException but got: " + unexpected, unexpected);
    }
    assertFalse(opExec.cancelCalled); // Cancel not called: too early in life
    assertFalse(opExec.closeCalled); // Same with close
  }

  /**
   * Failure on the bind method: a {@link UserException} thrown from bind()
   * is expected to reach the caller as-is, without a wrapped cause.
   */

  @Test
  public void testUserExceptionOnBind() {
    MockOperatorExec opExec = new MockOperatorExec() {
      @Override
      public void bind(OperatorContext context) {
         throw UserException.connectionError()
           .message(ERROR_MSG)
           .build(logger);
      }
    };
    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
      opBatch.next();
      fail("Expected the bind() failure to surface as a UserException");
    } catch (UserException e) {
      assertTrue(e.getMessage().contains(ERROR_MSG));
      assertNull(e.getCause());
    } catch (Exception unexpected) {
      // Only catch Exception: an AssertionError from the try body must
      // propagate untouched rather than be replaced by a bare failure.
      throw new AssertionError("Expected a UserException but got: " + unexpected, unexpected);
    }
    assertFalse(opExec.cancelCalled); // Cancel not called: too early in life
    assertFalse(opExec.closeCalled); // Same with close
  }

  /**
   * Failure when building the schema (first call to next()): an unchecked
   * exception is expected to arrive wrapped in a {@link UserException}, and
   * the executor is expected to be cancelled and closed.
   */
  @Test
  public void testWrappedExceptionOnBuildSchema() {
    MockOperatorExec opExec = new MockOperatorExec() {
      @Override
      public boolean buildSchema() {
         throw new IllegalStateException(ERROR_MSG);
      }
    };
    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
      opBatch.next();
      fail("Expected the buildSchema() failure to surface as a UserException");
    } catch (UserException e) {
      assertTrue(e.getMessage().contains(ERROR_MSG));
      assertTrue(e.getCause() instanceof IllegalStateException);
    } catch (Exception unexpected) {
      // Only catch Exception: an AssertionError from the try body must
      // propagate untouched rather than be replaced by a bare failure.
      throw new AssertionError("Expected a UserException but got: " + unexpected, unexpected);
    }
    assertTrue(opExec.cancelCalled);
    assertTrue(opExec.closeCalled);
  }

  /**
   * Failure when building the schema (first call to next()): a
   * {@link UserException} is expected to arrive as-is, without a wrapped
   * cause, and the executor is expected to be cancelled and closed.
   */
  @Test
  public void testUserExceptionOnBuildSchema() {
    MockOperatorExec opExec = new MockOperatorExec() {
      @Override
      public boolean buildSchema() {
        throw UserException.dataReadError()
            .message(ERROR_MSG)
            .build(logger);
      }
    };
    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
      opBatch.next();
      fail("Expected the buildSchema() failure to surface as a UserException");
    } catch (UserException e) {
      assertTrue(e.getMessage().contains(ERROR_MSG));
      assertNull(e.getCause());
    } catch (Exception unexpected) {
      // Only catch Exception: an AssertionError from the try body must
      // propagate untouched rather than be replaced by a bare failure.
      throw new AssertionError("Expected a UserException but got: " + unexpected, unexpected);
    }
    assertTrue(opExec.cancelCalled);
    assertTrue(opExec.closeCalled);
  }

  /**
   * Failure on the second or subsequent calls to next(), when actually
   * fetching a record batch. An unchecked exception is expected to arrive
   * wrapped in a {@link UserException}.
   */

  @Test
  public void testWrappedExceptionOnNext() {
    MockOperatorExec opExec = new MockOperatorExec() {
      @Override
      public boolean next() {
         throw new IllegalStateException(ERROR_MSG);
      }
    };
    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
      assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
      opBatch.next();
      fail("Expected the next() failure to surface as a UserException");
    } catch (UserException e) {
      assertTrue(e.getMessage().contains(ERROR_MSG));
      assertTrue(e.getCause() instanceof IllegalStateException);
    } catch (Exception unexpected) {
      // Only catch Exception: an AssertionError from the try body (for
      // example a wrong IterOutcome) must propagate with its own message.
      throw new AssertionError("Expected a UserException but got: " + unexpected, unexpected);
    }
    assertTrue(opExec.cancelCalled);
    assertTrue(opExec.closeCalled);
  }

  /**
   * Failure on the second or subsequent calls to next(): a
   * {@link UserException} thrown by the executor is expected to arrive
   * as-is, without a wrapped cause.
   */

  @Test
  public void testUserExceptionOnNext() {
    MockOperatorExec opExec = new MockOperatorExec() {
      @Override
      public boolean next() {
        throw UserException.dataReadError()
              .message(ERROR_MSG)
              .build(logger);
      }
    };
    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
      assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
      opBatch.next();
      fail("Expected the next() failure to surface as a UserException");
    } catch (UserException e) {
      assertTrue(e.getMessage().contains(ERROR_MSG));
      assertNull(e.getCause());
    } catch (Exception unexpected) {
      // Only catch Exception: an AssertionError from the try body (for
      // example a wrong IterOutcome) must propagate with its own message.
      throw new AssertionError("Expected a UserException but got: " + unexpected, unexpected);
    }
    assertTrue(opExec.cancelCalled);
    assertTrue(opExec.closeCalled);
  }

  /**
   * Failure when closing the operator implementation. The exception is
   * raised by the implicit close() at the end of the try-with-resources
   * block and is expected to arrive wrapped in a {@link UserException}.
   */

  @Test
  public void testWrappedExceptionOnClose() {
    MockOperatorExec opExec = new MockOperatorExec() {
      @Override
      public void close() {
        // Release memory
        super.close();
        // Then fail
        throw new IllegalStateException(ERROR_MSG);
      }
    };
    opExec.nextCalls = 1;
    // The failure happens after the try body, so fail() cannot be placed
    // inside it; record instead that the expected exception was seen.
    boolean closeFailureSeen = false;
    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
      assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
      assertEquals(IterOutcome.OK, opBatch.next());
      assertEquals(IterOutcome.NONE, opBatch.next());
    } catch (UserException e) {
      closeFailureSeen = true;
      assertTrue(e.getMessage().contains(ERROR_MSG));
      assertTrue(e.getCause() instanceof IllegalStateException);
    } catch (Exception unexpected) {
      // Only catch Exception: an AssertionError from the try body must
      // propagate untouched rather than be replaced by a bare failure.
      throw new AssertionError("Expected a UserException but got: " + unexpected, unexpected);
    }
    assertTrue("Expected close() to fail with a UserException", closeFailureSeen);
    assertFalse(opExec.cancelCalled);
    assertTrue(opExec.closeCalled);
  }

  /**
   * Failure when closing the operator implementation: a
   * {@link UserException} thrown from close() is expected to arrive as-is,
   * without a wrapped cause. The exception is raised by the implicit close()
   * at the end of the try-with-resources block.
   */

  @Test
  public void testUserExceptionOnClose() {
    MockOperatorExec opExec = new MockOperatorExec() {
      @Override
      public void close() {
        // Release memory
        super.close();
        // Then fail
        throw UserException.dataReadError()
              .message(ERROR_MSG)
              .build(logger);
      }
    };
    // The failure happens after the try body, so fail() cannot be placed
    // inside it; record instead that the expected exception was seen.
    boolean closeFailureSeen = false;
    try (OperatorRecordBatch opBatch = makeOpBatch(opExec)) {
      assertEquals(IterOutcome.OK_NEW_SCHEMA, opBatch.next());
      assertEquals(IterOutcome.OK, opBatch.next());
      assertEquals(IterOutcome.NONE, opBatch.next());
    } catch (UserException e) {
      closeFailureSeen = true;
      assertTrue(e.getMessage().contains(ERROR_MSG));
      assertNull(e.getCause());
    } catch (Exception unexpected) {
      // Only catch Exception: an AssertionError from the try body must
      // propagate untouched rather than be replaced by a bare failure.
      throw new AssertionError("Expected a UserException but got: " + unexpected, unexpected);
    }
    assertTrue("Expected close() to fail with a UserException", closeFailureSeen);
    assertFalse(opExec.cancelCalled);
    assertTrue(opExec.closeCalled);
  }
}
