| /* |
| * Licensed to the Apache Software Foundation (ASF) under one or more |
| * contributor license agreements. See the NOTICE file distributed with |
| * this work for additional information regarding copyright ownership. |
| * The ASF licenses this file to You under the Apache License, Version 2.0 |
| * (the "License"); you may not use this file except in compliance with |
| * the License. You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| |
| package org.apache.nifi.processor.util.list; |
| |
| import org.apache.nifi.components.PropertyDescriptor; |
| import org.apache.nifi.components.Validator; |
| import org.apache.nifi.components.state.Scope; |
| import org.apache.nifi.components.state.StateManager; |
| import org.apache.nifi.context.PropertyContext; |
| import org.apache.nifi.controller.AbstractControllerService; |
| import org.apache.nifi.distributed.cache.client.Deserializer; |
| import org.apache.nifi.distributed.cache.client.DistributedMapCacheClient; |
| import org.apache.nifi.distributed.cache.client.Serializer; |
| import org.apache.nifi.flowfile.FlowFile; |
| import org.apache.nifi.flowfile.attributes.CoreAttributes; |
| import org.apache.nifi.processor.ProcessContext; |
| import org.apache.nifi.processor.ProcessSession; |
| import org.apache.nifi.reporting.InitializationException; |
| import org.apache.nifi.serialization.RecordSetWriterFactory; |
| import org.apache.nifi.serialization.SimpleRecordSchema; |
| import org.apache.nifi.serialization.record.MapRecord; |
| import org.apache.nifi.serialization.record.MockRecordWriter; |
| import org.apache.nifi.serialization.record.Record; |
| import org.apache.nifi.serialization.record.RecordField; |
| import org.apache.nifi.serialization.record.RecordFieldType; |
| import org.apache.nifi.serialization.record.RecordSchema; |
| import org.apache.nifi.state.MockStateManager; |
| import org.apache.nifi.util.MockFlowFile; |
| import org.apache.nifi.util.TestRunner; |
| import org.apache.nifi.util.TestRunners; |
| import org.junit.Assert; |
| import org.junit.Before; |
| import org.junit.Rule; |
| import org.junit.Test; |
| import org.junit.rules.TemporaryFolder; |
| import org.junit.rules.TestWatcher; |
| |
| import java.io.File; |
| import java.io.FileOutputStream; |
| import java.io.IOException; |
| import java.nio.charset.StandardCharsets; |
| import java.util.ArrayList; |
| import java.util.Comparator; |
| import java.util.HashMap; |
| import java.util.List; |
| import java.util.Map; |
| import java.util.UUID; |
| import java.util.concurrent.TimeUnit; |
| import java.util.concurrent.atomic.AtomicReference; |
| import java.util.regex.Matcher; |
| import java.util.regex.Pattern; |
| import java.util.stream.Collectors; |
| |
| import static org.junit.Assert.assertEquals; |
| |
| public class TestAbstractListProcessor { |
| |
| /** |
| * @return current timestamp in milliseconds, but truncated at specified target precision (e.g. SECONDS or MINUTES). |
| */ |
| private static long getCurrentTimestampMillis(final TimeUnit targetPrecision) { |
| final long timestampInTargetPrecision = targetPrecision.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS); |
| return TimeUnit.MILLISECONDS.convert(timestampInTargetPrecision, targetPrecision); |
| } |
| |
| private static long getSleepMillis(final TimeUnit targetPrecision) { |
| return AbstractListProcessor.LISTING_LAG_MILLIS.get(targetPrecision) * 2; |
| } |
| |
| private ConcreteListProcessor proc; |
| private TestRunner runner; |
| |
| @Rule |
| public TestWatcher dumpState = new ListProcessorTestWatcher( |
| () -> { |
| try { |
| return runner.getStateManager().getState(Scope.LOCAL).toMap(); |
| } catch (IOException e) { |
| throw new RuntimeException("Failed to retrieve state", e); |
| } |
| }, |
| () -> proc.getEntityList(), |
| () -> runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).stream().map(m -> (FlowFile) m).collect(Collectors.toList()) |
| ); |
| |
| @Before |
| public void setup() { |
| proc = new ConcreteListProcessor(); |
| runner = TestRunners.newTestRunner(proc); |
| } |
| |
| @Rule |
| public final TemporaryFolder testFolder = new TemporaryFolder(); |
| |
| @Test |
| public void testStateMigratedFromCacheService() throws InitializationException { |
| |
| final DistributedCache cache = new DistributedCache(); |
| runner.addControllerService("cache", cache); |
| runner.enableControllerService(cache); |
| runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache"); |
| |
| final String serviceState = "{\"latestTimestamp\":1492,\"matchingIdentifiers\":[\"id\"]}"; |
| final String cacheKey = runner.getProcessor().getIdentifier() + ".lastListingTime./path"; |
| cache.stored.put(cacheKey, serviceState); |
| |
| runner.run(); |
| |
| final MockStateManager stateManager = runner.getStateManager(); |
| final Map<String, String> expectedState = new HashMap<>(); |
| // Ensure only timestamp is migrated |
| expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, "1492"); |
| expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "1492"); |
| stateManager.assertStateEquals(expectedState, Scope.CLUSTER); |
| } |
| |
| @Test |
| public void testNoStateToMigrate() throws Exception { |
| |
| runner.run(); |
| |
| final MockStateManager stateManager = runner.getStateManager(); |
| final Map<String, String> expectedState = new HashMap<>(); |
| stateManager.assertStateEquals(expectedState, Scope.CLUSTER); |
| } |
| |
| @Test |
| public void testStateMigratedFromLocalFile() throws Exception { |
| |
| // Create a file that we will populate with the desired state |
| File persistenceFile = testFolder.newFile(proc.persistenceFilename); |
| // Override the processor's internal persistence file |
| proc.persistenceFile = persistenceFile; |
| |
| // Local File persistence was a properties file format of <key>=<JSON entity listing representation> |
| // Our ConcreteListProcessor is centered around files which are provided for a given path |
| final String serviceState = proc.getPath(runner.getProcessContext()) + "={\"latestTimestamp\":1492,\"matchingIdentifiers\":[\"id\"]}"; |
| |
| // Create a persistence file of the format anticipated |
| try (FileOutputStream fos = new FileOutputStream(persistenceFile);) { |
| fos.write(serviceState.getBytes(StandardCharsets.UTF_8)); |
| } |
| |
| runner.run(); |
| |
| // Verify the local persistence file is removed |
| Assert.assertTrue("Failed to remove persistence file", !persistenceFile.exists()); |
| |
| // Verify the state manager now maintains the associated state |
| final Map<String, String> expectedState = new HashMap<>(); |
| // Ensure timestamp and identifies are migrated |
| expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, "1492"); |
| expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, "1492"); |
| expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", "id"); |
| runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); |
| } |
| |
| @Test |
| public void testFetchOnStart() throws InitializationException { |
| |
| final DistributedCache cache = new DistributedCache(); |
| runner.addControllerService("cache", cache); |
| runner.enableControllerService(cache); |
| runner.setProperty(AbstractListProcessor.DISTRIBUTED_CACHE_SERVICE, "cache"); |
| |
| runner.run(); |
| |
| assertEquals(1, cache.fetchCount); |
| } |
| |
| @Test |
| public void testWriteRecords() throws InitializationException { |
| final RecordSetWriterFactory writerFactory = new MockRecordWriter("id,name,timestamp,size", false); |
| runner.addControllerService("record-writer", writerFactory); |
| runner.enableControllerService(writerFactory); |
| |
| runner.setProperty(AbstractListProcessor.RECORD_WRITER, "record-writer"); |
| |
| runner.run(); |
| |
| assertEquals(0, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); |
| proc.addEntity("name", "identifier", 4L); |
| proc.addEntity("name2", "identifier2", 8L); |
| |
| runner.run(); |
| |
| runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 1); |
| |
| final MockFlowFile flowfile = runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0); |
| flowfile.assertAttributeEquals("record.count", "2"); |
| flowfile.assertContentEquals("id,name,timestamp,size\nidentifier,name,4,0\nidentifier2,name2,8,0\n"); |
| |
| runner.clearTransferState(); |
| runner.run(); |
| runner.assertAllFlowFilesTransferred(AbstractListProcessor.REL_SUCCESS, 0); |
| } |
| |
| @Test |
| public void testNoTrackingEntityStrategy() throws IOException { |
| |
| // Firstly, choose Timestamp Strategy lists 2 entities and set state. |
| // After that choose No Tracking Strategy to test if this strategy remove the state. |
| ProcessSession session = runner.getProcessSessionFactory().createSession(); |
| ProcessContext context = runner.getProcessContext(); |
| |
| runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_TIMESTAMPS); |
| |
| // two entities listed |
| proc.addEntity("one","firstFile",1585344381476L); |
| proc.addEntity("two","secondFile",1585344381475L); |
| |
| runner.run(); |
| assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); |
| assertEquals(2, proc.entities.size()); |
| |
| final MockStateManager stateManager = runner.getStateManager(); |
| final Map<String, String> expectedState = new HashMap<>(); |
| final Map<String, String> realState = new HashMap<>(); |
| |
| realState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(proc.entities.get("firstFile").getTimestamp())); |
| realState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(proc.entities.get("secondFile").getTimestamp())); |
| realState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", proc.entities.get("firstFile").getIdentifier()); |
| realState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", proc.entities.get("secondFile").getIdentifier()); |
| |
| stateManager.setState(realState, Scope.CLUSTER); |
| |
| // Ensure timestamp and identifies are migrated |
| expectedState.put(AbstractListProcessor.LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(proc.entities.get("firstFile").getTimestamp())); |
| expectedState.put(AbstractListProcessor.LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(proc.entities.get("secondFile").getTimestamp())); |
| expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".0", proc.entities.get("firstFile").getIdentifier()); |
| expectedState.put(AbstractListProcessor.IDENTIFIER_PREFIX + ".1", proc.entities.get("secondFile").getIdentifier()); |
| |
| runner.getStateManager().assertStateEquals(expectedState, Scope.CLUSTER); |
| |
| // Change listing strategy |
| runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.NO_TRACKING); |
| |
| // Clear any listed entities after choose No Tracking Strategy |
| proc.entities.clear(); |
| |
| // Add new entity |
| proc.addEntity("one","firstFile",1585344381476L); |
| proc.listByNoTracking(context, session); |
| |
| // Test if state cleared or not |
| runner.getStateManager().assertStateNotEquals(expectedState, Scope.CLUSTER); |
| assertEquals(1, proc.entities.size()); |
| } |
| |
| @Test |
| public void testEntityTrackingStrategy() throws InitializationException { |
| runner.setProperty(AbstractListProcessor.LISTING_STRATEGY, AbstractListProcessor.BY_ENTITIES); |
| // Require a cache service. |
| runner.assertNotValid(); |
| |
| final DistributedCache trackingCache = new DistributedCache(); |
| runner.addControllerService("tracking-cache", trackingCache); |
| runner.enableControllerService(trackingCache); |
| |
| runner.setProperty(ListedEntityTracker.TRACKING_STATE_CACHE, "tracking-cache"); |
| runner.setProperty(ListedEntityTracker.TRACKING_TIME_WINDOW, "10ms"); |
| |
| runner.assertValid(); |
| |
| proc.currentTimestamp.set(0L); |
| runner.run(); |
| assertEquals(0, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); |
| |
| // Should list one entity. |
| proc.addEntity("one", "one", 1, 1); |
| proc.currentTimestamp.set(1L); |
| runner.clearTransferState(); |
| runner.run(); |
| assertEquals(1, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); |
| runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0) |
| .assertAttributeEquals(CoreAttributes.FILENAME.key(), "one"); |
| |
| // Should not list any entity. |
| proc.currentTimestamp.set(2L); |
| runner.clearTransferState(); |
| runner.run(); |
| assertEquals(0, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); |
| |
| // Should list added entities. |
| proc.currentTimestamp.set(10L); |
| proc.addEntity("five", "five", 5, 5); |
| proc.addEntity("six", "six", 6, 6); |
| runner.clearTransferState(); |
| runner.run(); |
| assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); |
| runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0) |
| .assertAttributeEquals(CoreAttributes.FILENAME.key(), "five"); |
| runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(1) |
| .assertAttributeEquals(CoreAttributes.FILENAME.key(), "six"); |
| |
| // Should be able to list entities having older timestamp than the previously listed entity. |
| // But if its timestamp is out of tracking window, then it won't be picked. |
| // Current timestamp = 13, and window = 10ms, meaning it can pick entities having timestamp 3 to 13. |
| proc.currentTimestamp.set(13L); |
| proc.addEntity("two", "two", 2, 2); |
| proc.addEntity("three", "three", 3, 3); |
| proc.addEntity("four", "four", 4, 4); |
| runner.clearTransferState(); |
| runner.run(); |
| assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); |
| runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0) |
| .assertAttributeEquals(CoreAttributes.FILENAME.key(), "three"); |
| runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(1) |
| .assertAttributeEquals(CoreAttributes.FILENAME.key(), "four"); |
| |
| // Can pick entity that has newer timestamp. |
| // Can pick entity that has different size. |
| proc.currentTimestamp.set(14L); |
| proc.addEntity("five", "five", 7, 5); |
| proc.addEntity("six", "six", 6, 16); |
| runner.clearTransferState(); |
| runner.run(); |
| assertEquals(2, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); |
| runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0) |
| .assertAttributeEquals(CoreAttributes.FILENAME.key(), "six"); |
| runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(1) |
| .assertAttributeEquals(CoreAttributes.FILENAME.key(), "five"); |
| |
| // Reset state. |
| // Current timestamp = 15, and window = 11ms, meaning it can pick entities having timestamp 4 to 15. |
| proc.currentTimestamp.set(15L); |
| // ConcreteListProcessor can reset state with any property. |
| runner.setProperty(ListedEntityTracker.TRACKING_TIME_WINDOW, "11ms"); |
| runner.setProperty(ConcreteListProcessor.RESET_STATE, "1"); |
| runner.setProperty(ListedEntityTracker.INITIAL_LISTING_TARGET, "window"); |
| runner.clearTransferState(); |
| runner.run(); |
| assertEquals(3, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); |
| runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(0) |
| .assertAttributeEquals(CoreAttributes.FILENAME.key(), "four"); |
| runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(1) |
| .assertAttributeEquals(CoreAttributes.FILENAME.key(), "six"); |
| runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).get(2) |
| .assertAttributeEquals(CoreAttributes.FILENAME.key(), "five"); |
| |
| |
| // Reset state again. |
| proc.currentTimestamp.set(20L); |
| // ConcreteListProcessor can reset state with any property. |
| runner.setProperty(ListedEntityTracker.INITIAL_LISTING_TARGET, "all"); |
| runner.setProperty(ConcreteListProcessor.RESET_STATE, "2"); |
| runner.clearTransferState(); |
| runner.run(); |
| // All entities should be picked, one to six. |
| assertEquals(6, runner.getFlowFilesForRelationship(AbstractListProcessor.REL_SUCCESS).size()); |
| } |
| |
| static class DistributedCache extends AbstractControllerService implements DistributedMapCacheClient { |
| private final Map<Object, Object> stored = new HashMap<>(); |
| private int fetchCount = 0; |
| |
| @Override |
| public <K, V> boolean putIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { |
| return false; |
| } |
| |
| @Override |
| public <K, V> V getAndPutIfAbsent(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer, Deserializer<V> valueDeserializer) throws IOException { |
| return null; |
| } |
| |
| @Override |
| public <K> boolean containsKey(K key, Serializer<K> keySerializer) throws IOException { |
| return false; |
| } |
| |
| @Override |
| public <K, V> void put(K key, V value, Serializer<K> keySerializer, Serializer<V> valueSerializer) throws IOException { |
| stored.put(key, value); |
| } |
| |
| @Override |
| @SuppressWarnings("unchecked") |
| public <K, V> V get(K key, Serializer<K> keySerializer, Deserializer<V> valueDeserializer) throws IOException { |
| fetchCount++; |
| return (V) stored.get(key); |
| } |
| |
| @Override |
| public void close() throws IOException { |
| } |
| |
| @Override |
| public <K> boolean remove(K key, Serializer<K> serializer) throws IOException { |
| final Object value = stored.remove(key); |
| return value != null; |
| } |
| |
| @Override |
| public long removeByPattern(String regex) throws IOException { |
| final List<Object> removedRecords = new ArrayList<>(); |
| Pattern p = Pattern.compile(regex); |
| for (Object key : stored.keySet()) { |
| // Key must be backed by something that can be converted into a String |
| Matcher m = p.matcher(key.toString()); |
| if (m.matches()) { |
| removedRecords.add(stored.get(key)); |
| } |
| } |
| final long numRemoved = removedRecords.size(); |
| removedRecords.forEach(stored::remove); |
| return numRemoved; |
| } |
| } |
| |
| static class ConcreteListProcessor extends AbstractListProcessor<ListableEntity> { |
| final Map<String, ListableEntity> entities = new HashMap<>(); |
| |
| final String persistenceFilename = "ListProcessor-local-state-" + UUID.randomUUID().toString() + ".json"; |
| String persistenceFolder = "target/"; |
| File persistenceFile = new File(persistenceFolder + persistenceFilename); |
| |
| private static PropertyDescriptor RESET_STATE = new PropertyDescriptor.Builder() |
| .name("reset-state") |
| .addValidator(Validator.VALID) |
| .build(); |
| |
| final AtomicReference<Long> currentTimestamp = new AtomicReference<>(); |
| |
| @Override |
| protected ListedEntityTracker<ListableEntity> createListedEntityTracker() { |
| return new ListedEntityTracker<>(getIdentifier(), getLogger(), currentTimestamp::get, null); |
| } |
| |
| @Override |
| protected List<PropertyDescriptor> getSupportedPropertyDescriptors() { |
| final List<PropertyDescriptor> properties = new ArrayList<>(); |
| properties.add(LISTING_STRATEGY); |
| properties.add(RECORD_WRITER); |
| properties.add(DISTRIBUTED_CACHE_SERVICE); |
| properties.add(TARGET_SYSTEM_TIMESTAMP_PRECISION); |
| properties.add(ListedEntityTracker.TRACKING_STATE_CACHE); |
| properties.add(ListedEntityTracker.TRACKING_TIME_WINDOW); |
| properties.add(ListedEntityTracker.INITIAL_LISTING_TARGET); |
| properties.add(RESET_STATE); |
| return properties; |
| } |
| |
| @Override |
| public File getPersistenceFile() { |
| return persistenceFile; |
| } |
| |
| public void addEntity(final String name, final String identifier, final long timestamp) { |
| addEntity(name, identifier, timestamp, 0); |
| } |
| |
| public void addEntity(final String name, final String identifier, final long timestamp, long size) { |
| final ListableEntity entity = new ListableEntity() { |
| @Override |
| public String getName() { |
| return name; |
| } |
| |
| @Override |
| public String getIdentifier() { |
| return identifier; |
| } |
| |
| @Override |
| public long getTimestamp() { |
| return timestamp; |
| } |
| |
| @Override |
| public long getSize() { |
| return size; |
| } |
| |
| @Override |
| public Record toRecord() { |
| final Map<String, Object> values = new HashMap<>(4); |
| values.put("id", identifier); |
| values.put("name", name); |
| values.put("timestamp", timestamp); |
| values.put("size", size); |
| return new MapRecord(getRecordSchema(), values); |
| } |
| }; |
| |
| entities.put(entity.getIdentifier(), entity); |
| } |
| |
| @Override |
| protected Map<String, String> createAttributes(final ListableEntity entity, final ProcessContext context) { |
| final Map<String, String> attributes = new HashMap<>(); |
| attributes.put(CoreAttributes.FILENAME.key(), entity.getIdentifier()); |
| return attributes; |
| } |
| |
| @Override |
| protected String getPath(final ProcessContext context) { |
| return "/path"; |
| } |
| |
| @Override |
| protected List<ListableEntity> performListing(final ProcessContext context, final Long minTimestamp) throws IOException { |
| return getEntityList(); |
| } |
| |
| List<ListableEntity> getEntityList() { |
| return entities.values().stream().sorted(Comparator.comparing(ListableEntity::getTimestamp)).collect(Collectors.toList()); |
| } |
| |
| @Override |
| protected boolean isListingResetNecessary(PropertyDescriptor property) { |
| return RESET_STATE.equals(property); |
| } |
| |
| @Override |
| protected Scope getStateScope(final PropertyContext context) { |
| return Scope.CLUSTER; |
| } |
| |
| @Override |
| protected RecordSchema getRecordSchema() { |
| final List<RecordField> fields = new ArrayList<>(); |
| fields.add(new RecordField("id", RecordFieldType.STRING.getDataType())); |
| fields.add(new RecordField("name", RecordFieldType.STRING.getDataType())); |
| fields.add(new RecordField("timestamp", RecordFieldType.TIMESTAMP.getDataType())); |
| fields.add(new RecordField("size", RecordFieldType.LONG.getDataType())); |
| return new SimpleRecordSchema(fields); |
| } |
| |
| private void persist(final long latestListedEntryTimestampThisCycleMillis, |
| final long lastProcessedLatestEntryTimestampMillis, |
| final List<String> processedIdentifiesWithLatestTimestamp, |
| final StateManager stateManager, final Scope scope) throws IOException { |
| final Map<String, String> updatedState = new HashMap<>(processedIdentifiesWithLatestTimestamp.size() + 2); |
| updatedState.put(LATEST_LISTED_ENTRY_TIMESTAMP_KEY, String.valueOf(latestListedEntryTimestampThisCycleMillis)); |
| updatedState.put(LAST_PROCESSED_LATEST_ENTRY_TIMESTAMP_KEY, String.valueOf(lastProcessedLatestEntryTimestampMillis)); |
| for (int i = 0; i < processedIdentifiesWithLatestTimestamp.size(); i++) { |
| updatedState.put(IDENTIFIER_PREFIX + "." + i, processedIdentifiesWithLatestTimestamp.get(i)); |
| } |
| stateManager.setState(updatedState, scope); |
| } |
| } |
| } |