/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.slider.server.services.security

import groovy.util.logging.Slf4j
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.CommonConfigurationKeysPublic
import org.apache.hadoop.fs.FileSystem as HadoopFS
import org.apache.hadoop.fs.RawLocalFileSystem
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier
import org.apache.hadoop.io.Text
import org.apache.hadoop.security.Credentials
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.token.SecretManager
import org.apache.hadoop.security.token.Token
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSecretManager
import org.apache.hadoop.service.ServiceOperations
import org.apache.hadoop.util.Time
import org.apache.slider.common.SliderXmlConfKeys
import org.apache.slider.common.tools.CoreFileSystem
import org.apache.slider.server.appmaster.actions.ActionStopQueue
import org.apache.slider.server.appmaster.actions.QueueExecutor
import org.apache.slider.server.appmaster.actions.QueueService
import org.junit.After
import org.junit.Before
import org.junit.Test

import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicLong

@Slf4j
//@CompileStatic
class TestFsDelegationTokenManager {

  QueueService queues;
  FsDelegationTokenManager tokenManager;
  Configuration conf;
  UserGroupInformation currentUser;


  @Before
  void setup() {
    queues = new QueueService();

    conf = new Configuration()
    conf.set(
        CommonConfigurationKeysPublic.HADOOP_SECURITY_AUTHENTICATION,
            "TOKEN")
    conf.setLong(
            SliderXmlConfKeys.DFS_NAMENODE_DELEGATION_TOKEN_RENEW_INTERVAL_KEY,
            1000)
    queues.init(conf)
    queues.start();

    HadoopFS fs = new TestFileSystem()

    CoreFileSystem coreFileSystem = new CoreFileSystem(fs, conf)

    String[] groups = new String[1];
    groups[0] = 'testGroup1'

    currentUser = UserGroupInformation.createUserForTesting("test", groups)
    UserGroupInformation.setLoginUser(currentUser)

    tokenManager = new FsDelegationTokenManager(queues) {
        @Override
        protected int getRenewingLimit() {
            return 5
        }

        @Override
        protected org.apache.hadoop.fs.FileSystem getRemoteFileSystemForRenewal(Configuration config) throws IOException {
            return new TestFileSystem();
        }

        @Override
        protected String getRenewingActionName() {
            return "TEST RENEW"
        }
    }

  }

  public static class DummySecretManager extends
          AbstractDelegationTokenSecretManager<DelegationTokenIdentifier> {

    public DummySecretManager(long delegationKeyUpdateInterval,
                              long delegationTokenMaxLifetime, long delegationTokenRenewInterval,
                              long delegationTokenRemoverScanInterval) {
      super(delegationKeyUpdateInterval, delegationTokenMaxLifetime,
            delegationTokenRenewInterval, delegationTokenRemoverScanInterval);
    }

    @Override
    public DelegationTokenIdentifier createIdentifier() {
      return null;
    }

    @Override
    public byte[] createPassword(DelegationTokenIdentifier dtId) {
      return new byte[1];
    }
  }

  public class TestFileSystem extends RawLocalFileSystem {
      int sequenceNum = 0;
      SecretManager<DelegationTokenIdentifier> mgr =
          new DummySecretManager(0, 0, 0, 0);
      @Override
      Token<DelegationTokenIdentifier> getDelegationToken(String renewer) throws IOException {
        return new TestToken(getIdentifier(), mgr);
      }

      @Override
      Token<?>[] addDelegationTokens(String renewer, Credentials credentials) throws IOException {
          Token[] tokens = new Token[1]
          tokens[0] = new TestToken(getIdentifier(), mgr)
          return tokens
      }

      private DelegationTokenIdentifier getIdentifier() {
          def user = new Text(currentUser.getUserName())
          def id = new DelegationTokenIdentifier(user, user, user)
          id.setSequenceNumber(sequenceNum++)
          id.setMaxDate(Time.now() + 10000)

          return id
      }
  }

  public class TestToken extends Token<DelegationTokenIdentifier> {
      static long maxCount = 0;
      private final AtomicLong renewCount = new AtomicLong()
      private final AtomicLong totalCount = new AtomicLong()
      public final AtomicBoolean expired = new AtomicBoolean(false);
      public final AtomicBoolean cancelled = new AtomicBoolean(false);

      TestToken(DelegationTokenIdentifier id, SecretManager<DelegationTokenIdentifier> mgr) {
          super(id, mgr)
      }

      @Override
      Text getService() {
          return new Text("HDFS")
      }

      @Override
      long renew(Configuration conf) throws IOException, InterruptedException {
          totalCount.getAndIncrement();
          if (maxCount > 0 && renewCount.getAndIncrement() > maxCount) {
              renewCount.set(0L)
              expired.set(true)
              throw new IOException("Expired")
          }


          return Time.now() + 1000;
      }

      @Override
      void cancel(Configuration conf) throws IOException, InterruptedException {
        cancelled.set(true)
      }
  }

  @After
  void destroyService() {
    ServiceOperations.stop(queues);
  }

  public void runQueuesToCompletion() {
    new Thread(queues).start();
    QueueExecutor ex = new QueueExecutor(queues)
    ex.run();
  }

  public void runQueuesButNotToCompletion() {
    new Thread(queues).start();
    QueueExecutor ex = new QueueExecutor(queues)
    new Thread(ex).start();
    Thread.sleep(1000)
    tokenManager.cancelDelegationToken(conf)
  }

  @Test
  public void testRenew() throws Throwable {
    tokenManager.acquireDelegationToken(conf)
    def stop = new ActionStopQueue(10, TimeUnit.SECONDS)
    queues.schedule(stop);
    runQueuesToCompletion()

    TestToken token = (TestToken) currentUser.getTokens()[0]
    assert token.totalCount.get() > 4
  }

  @Test
  public void testCancel() throws Throwable {
    tokenManager.acquireDelegationToken(conf)
    def stop = new ActionStopQueue(10, TimeUnit.SECONDS)
    queues.schedule(stop);
    runQueuesButNotToCompletion()

    TestToken token = (TestToken) currentUser.getTokens()[0]
    assert token.cancelled.get()
    assert queues.lookupRenewingAction("TEST RENEW") == null
  }


    @Test
  public void testRenewPastExpiry() throws Throwable {
    try {
      TestToken.maxCount = 3L
      tokenManager.acquireDelegationToken(conf)
      TestToken origToken = currentUser.getTokens()[0]
      def stop = new ActionStopQueue(10, TimeUnit.SECONDS)
      queues.schedule(stop);
      runQueuesToCompletion()

      TestToken token = (TestToken) currentUser.getTokens()[0]
      assert token != null
      assert token != origToken
      assert origToken.getService().equals(token.getService())
      assert origToken.totalCount.get() > 4
      assert origToken.expired.get()
    } finally {
      TestToken.maxCount = 0
    }
  }
}
