package org.apache.nifi.pql;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;

import org.apache.nifi.provenance.ProvenanceEventRepository;
import org.apache.nifi.provenance.ProvenanceEventType;
import org.apache.nifi.provenance.StandardProvenanceEventRecord;
import org.apache.nifi.provenance.VolatileProvenanceRepository;
import org.apache.nifi.provenance.query.ProvenanceResultSet;
import org.apache.nifi.util.NiFiProperties;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;

public class TestQuery {

	private ProvenanceEventRepository repo;
	
	@Before
	public void setup() {
		System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, "src/test/resources/nifi.properties");
		repo = new VolatileProvenanceRepository();
	}
	
	
	private void createRecords() throws IOException {
		final Map<String, String> previousAttributes = new HashMap<>();
		previousAttributes.put("filename", "xyz");
		
		final Map<String, String> updatedAttributes = new HashMap<>();
		updatedAttributes.put("filename", "xyz.txt");
		updatedAttributes.put("mime.type", "text/plain");
		
		final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder();
		recordBuilder.setAttributes(previousAttributes, Collections.<String, String>emptyMap())
			.setComponentId("000")
			.setComponentType("MyComponent")
			.setEventType(ProvenanceEventType.RECEIVE)
			.setFlowFileEntryDate(System.currentTimeMillis())
			.setFlowFileUUID("1234")
			.setTransitUri("https://localhost:80/nifi");
		
		
		recordBuilder.setCurrentContentClaim("container", "section", "1", 0L, 100L);
		repo.registerEvent(recordBuilder.build());
		
		
		recordBuilder.setAttributes(previousAttributes, updatedAttributes);
		recordBuilder.setCurrentContentClaim("container", "section", "2", 0L, 1024 * 1024L);
		repo.registerEvent(recordBuilder.build());
	}
	
	
	private void createRecords(final int records, final ProvenanceEventType type, final long sleep) throws IOException {
		final Map<String, String> previousAttributes = new HashMap<>();
		previousAttributes.put("filename", "xyz");
		
		final Map<String, String> updatedAttributes = new HashMap<>();
		updatedAttributes.put("filename", "xyz.txt");
		updatedAttributes.put("mime.type", "text/plain");
		
		final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder();
		recordBuilder.setAttributes(previousAttributes, Collections.<String, String>emptyMap())
			.setComponentType("MyComponent")
			.setEventType(type)
			.setFlowFileEntryDate(System.currentTimeMillis())
			.setFlowFileUUID("1234")
			.setTransitUri("https://localhost:80/nifi");
		
		final long now = System.currentTimeMillis();
		for (int i=0; i < records; i++) {
			recordBuilder.setCurrentContentClaim("container", "section", String.valueOf(i), 0L, 100L);
			final Map<String, String> attr = new HashMap<>(updatedAttributes);
			attr.put("i", String.valueOf(i));
			recordBuilder.setAttributes(previousAttributes, attr);
			recordBuilder.setFlowFileEntryDate(System.currentTimeMillis());
			recordBuilder.setEventTime(now + (i * sleep));
			recordBuilder.setComponentId(UUID.randomUUID().toString());
			
			repo.registerEvent(recordBuilder.build());
		}
	}
	
	@Test
	public void testCompilationManually() {
		System.out.println(ProvenanceQuery.compile("SELECT R.TransitUri FROM *", null, null));
		System.out.println(ProvenanceQuery.compile("SELECT R['filename'] FROM RECEIVE, SEND;", null, null));
		System.out.println(ProvenanceQuery.compile("SELECT Event FROM RECEIVE ORDER BY Event['filename'];", null, null));
		
//		System.out.println(Query.compile("SELECT Event FROM RECEIVE WHERE ((Event.TransitUri <> 'http') OR (Event['filename'] = '1.txt')) and (Event.Size > 1000 or Event.Size between 1 AND 4);"));
		
		System.out.println(ProvenanceQuery.compile("SELECT SUM(Event.size) FROM RECEIVE", null, null));
	}
	
	
	@Test
	public void testSumAverage() throws IOException {
		createRecords();
		dump(ProvenanceQuery.execute("SELECT Event", repo));
		
		final ProvenanceQuery query = ProvenanceQuery.compile("SELECT SUM(Event.Size), AVG(Event.Size) FROM RECEIVE WHERE Event.TransitUri = 'https://localhost:80/nifi'", null, null);
		
		final ProvenanceResultSet rs = query.execute(repo);
		dump(rs);
		
		dump(ProvenanceQuery.execute("SELECT Event.TransitUri", repo));
		dump(ProvenanceQuery.execute("SELECT Event['mime.type'], Event['filename']", repo));
		dump(ProvenanceQuery.execute("SELECT Event['filename'], SUM(Event.size) GROUP BY Event['filename']", repo));
	}
	
	@Test
	@Ignore("Not entirely implemented yet")
    public void testAlias() throws IOException {
        createRecords();
        final ProvenanceQuery query = ProvenanceQuery.compile("SELECT SUM(Event.Size) AS TotalSize, COUNT(Event) AS NumEvents", null, null);

        final ProvenanceResultSet rs = query.execute(repo);
        dump(rs);

        assertEquals(2, rs.getLabels().size());
        assertEquals("TotalSize", rs.getLabels().get(0));
        assertEquals("NumEvents", rs.getLabels().get(1));
    }
	
	@Test
	public void testGroupBy() throws IOException {
		createRecords(200000, ProvenanceEventType.RECEIVE, 0L);
		createRecords(2, ProvenanceEventType.SEND, 0L);
		
		ProvenanceResultSet rs = ProvenanceQuery.execute("SELECT Event['filename'], COUNT(Event), Event.Type GROUP BY Event['filename'], Event.Type", repo);
		dump(rs);
		
		rs = ProvenanceQuery.execute("SELECT Event['filename'], COUNT(Event), Event.Type GROUP BY Event['filename'], Event.Type", repo);
		
		int receiveRows = 0;
		int sendRows = 0;
		while (rs.hasNext()) {
		    final List<?> cols = rs.next();
		    final ProvenanceEventType type = (ProvenanceEventType) cols.get(2);
		    if ( type == ProvenanceEventType.RECEIVE ) {
		        receiveRows++;
		        assertEquals("xyz.txt", cols.get(0));
		        assertEquals(200000L, cols.get(1));
		    } else if ( type == ProvenanceEventType.SEND ) {
		        sendRows++;
		        assertEquals("xyz.txt", cols.get(0));
		        assertEquals(2L, cols.get(1));
		    } else {
		        Assert.fail("Event type was " + type);
		    }
		}
		
		assertEquals(1, receiveRows);
		assertEquals(1, sendRows);
	}
	
	
	@Test
    public void testAverageGroupBy() throws IOException {
        createRecords(200000, ProvenanceEventType.RECEIVE, 1L);
        createRecords(5000, ProvenanceEventType.SEND, 1L);
        
        dump(ProvenanceQuery.execute("SELECT AVG(Event.Size), Event.Type GROUP BY SECOND(Event.Time), Event.Type", repo));
    }
	
	
	@Test
	public void testSelectSeveralRecords() throws IOException {
		createRecords(2000, ProvenanceEventType.SEND, 1L);
		createRecords(200, ProvenanceEventType.RECEIVE, 1L);
		dump(ProvenanceQuery.execute(
				  "SELECT SECOND(Event.Time), Event.Type, SUM(Event.Size), COUNT(Event) "
				+ "FROM SEND, RECEIVE "
				+ "GROUP BY SECOND(Event.Time), Event.Type"
				, repo));
	}

	@Test
	public void testNot() throws IOException {
		createRecords(2000, ProvenanceEventType.SEND, 0L);
		createRecords(200, ProvenanceEventType.RECEIVE, 0L);

		dump(ProvenanceQuery.execute("SELECT Event.Type, COUNT(Event) WHERE NOT(Event.Type = 'SEND')", repo));
		dump(ProvenanceQuery.execute("SELECT Event.Type, COUNT(Event) WHERE NOT(Event.Type = 'RECEIVE')", repo));
		dump(ProvenanceQuery.execute("SELECT Event.Type, COUNT(Event) WHERE NOT(NOT( Event.Type = 'SEND'))", repo));
	}
	
	
	@Test
	public void testOrderByField() throws IOException {
		createRecords(2000, ProvenanceEventType.SEND, 1L);
		
		dump(ProvenanceQuery.execute("SELECT Event.Time, Event.ComponentId ORDER BY Event.ComponentId LIMIT 15", repo));
		dump(ProvenanceQuery.execute("SELECT Event.Time, Event.ComponentId ORDER BY Event.Time DESC LIMIT 15", repo));
	}
	

	@Test
	public void testOrderByGroupedField() throws IOException {
		createRecords(2, ProvenanceEventType.SEND, 0L);
		createRecords(5, ProvenanceEventType.RECEIVE, 0L);
		
		dump(ProvenanceQuery.execute("SELECT Event.Type, SUM(Event.Size) GROUP BY Event.Type ORDER BY SUM(Event.Size) DESC", repo));
		
		ProvenanceResultSet rs = ProvenanceQuery.execute("SELECT Event.Type, SUM(Event.Size) GROUP BY Event.Type ORDER BY SUM(Event.Size) DESC", repo);
		
		assertTrue( rs.hasNext() );
		List<?> values = rs.next();
		assertEquals("RECEIVE", values.get(0).toString());
		assertEquals(500L, values.get(1));
		
		assertTrue( rs.hasNext() );
		values = rs.next();
		assertEquals("SEND", values.get(0).toString());
		assertEquals(200L, values.get(1));
		
		assertFalse( rs.hasNext() );
		
		
		rs = ProvenanceQuery.execute("SELECT Event.Type, SUM(Event.Size) GROUP BY Event.Type ORDER BY SUM(Event.Size) ASC", repo);
		
		assertTrue( rs.hasNext() );
		values = rs.next();
		assertEquals("SEND", values.get(0).toString());
		assertEquals(200L, values.get(1));
		
		assertTrue( rs.hasNext() );
		values = rs.next();
		assertEquals("RECEIVE", values.get(0).toString());
		assertEquals(500L, values.get(1));
		
		assertFalse( rs.hasNext() );
	}
	
	
	@Test
	public void testOrderByFieldAndGroupedValue() throws IOException {
		createRecords(3, ProvenanceEventType.SEND, 0L);
		createRecords(5, ProvenanceEventType.RECEIVE, 0L);
		createRecords(3, ProvenanceEventType.ATTRIBUTES_MODIFIED, 0L);
		
		final String query = "SELECT Event.Type, SUM(Event.Size) GROUP BY Event.Type ORDER BY SUM(Event.Size) DESC, Event.Type";
		dump(ProvenanceQuery.execute(query, repo));
		
		ProvenanceResultSet rs = ProvenanceQuery.execute(query, repo);
		
		assertTrue( rs.hasNext() );
		List<?> vals = rs.next();
		assertEquals(2, vals.size());
		assertEquals("RECEIVE", vals.get(0).toString());
		assertEquals(500L, vals.get(1));
		
		assertTrue( rs.hasNext() );
		vals = rs.next();
		assertEquals(2, vals.size());
		assertEquals("ATTRIBUTES_MODIFIED", vals.get(0).toString());
		assertEquals(300L, vals.get(1));
		
		assertTrue( rs.hasNext() );
		vals = rs.next();
		assertEquals(2, vals.size());
		assertEquals("SEND", vals.get(0).toString());
		assertEquals(300L, vals.get(1));
		
		assertFalse( rs.hasNext() );
	}
	
	
	@Test
	public void testAndsOrs() throws IOException {

		final Map<String, String> previousAttributes = new HashMap<>();
		previousAttributes.put("filename", "xyz");
		
		final Map<String, String> updatedAttributes = new HashMap<>();
		updatedAttributes.put("filename", "xyz.txt");
		updatedAttributes.put("mime.type", "text/plain");
		updatedAttributes.put("abc", "cba");
		updatedAttributes.put("123", "321");
		
		final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder();
		recordBuilder.setAttributes(previousAttributes, Collections.<String, String>emptyMap())
			.setComponentId("000")
			.setComponentType("MyComponent")
			.setEventType(ProvenanceEventType.SEND)
			.setFlowFileEntryDate(System.currentTimeMillis())
			.setFlowFileUUID("1234")
			.setCurrentContentClaim("container", "section", "1", 0L, 100L)
			.setAttributes(previousAttributes, updatedAttributes)
			.setTransitUri("https://localhost:80/nifi");

		repo.registerEvent(recordBuilder.build());
		
		final String queryString = "SELECT Event "
				+ "WHERE "
				+ "( "
				+ "	 Event['filename'] = 'xyz.txt' "
				+ "		OR "
				+ "  Event['mime.type'] = 'ss' "
				+ ") "
				+ "AND "
				+ "( "
				+ "  Event['abc'] = 'cba' "
				+ "		OR "
				+ "	 Event['123'] = '123' "
				+ ")";
		System.out.println(queryString);
		
		final ProvenanceQuery query = ProvenanceQuery.compile(queryString, null, null);
		
		System.out.println(query.getWhereClause());
		
		ProvenanceResultSet rs = query.execute(repo);
		assertTrue(rs.hasNext());
		rs.next();
		assertFalse(rs.hasNext());
		
		
		
		updatedAttributes.put("filename", "xxyz");
		repo = new VolatileProvenanceRepository();
		recordBuilder.setAttributes(previousAttributes, updatedAttributes);
		repo.registerEvent(recordBuilder.build());
		
		rs = query.execute(repo);
		assertFalse(rs.hasNext());


		
		updatedAttributes.put("filename", "xyz.txt");
		updatedAttributes.put("123", "123");
		repo = new VolatileProvenanceRepository();
		recordBuilder.setAttributes(previousAttributes, updatedAttributes);
		repo.registerEvent(recordBuilder.build());

		rs = query.execute(repo);
		assertTrue(rs.hasNext());
		rs.next();
		assertFalse(rs.hasNext());
		
	}
	
	private void dump(final ProvenanceResultSet rs) {
		System.out.println(rs.getLabels());
		while (rs.hasNext()) {
			System.out.println(rs.next());
		}
		
		System.out.println("\n\n\n");
	}
	
}
