/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.connect.cassandra.connector;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.datastax.oss.driver.api.core.CqlSession;
import io.openmessaging.KeyValue;
import io.openmessaging.connector.api.data.DataEntryBuilder;
import io.openmessaging.connector.api.data.EntryType;
import io.openmessaging.connector.api.data.Field;
import io.openmessaging.connector.api.data.Schema;
import io.openmessaging.connector.api.data.SourceDataEntry;
import io.openmessaging.connector.api.source.SourceTask;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.connect.cassandra.common.ConstDefine;
import org.apache.rocketmq.connect.cassandra.common.DBUtils;
import org.apache.rocketmq.connect.cassandra.config.Config;
import org.apache.rocketmq.connect.cassandra.config.ConfigUtil;
import org.apache.rocketmq.connect.cassandra.schema.Table;
import org.apache.rocketmq.connect.cassandra.schema.column.ColumnParser;
import org.apache.rocketmq.connect.cassandra.source.Querier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
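
/**
 * Source task that reads rows from Cassandra tables and converts each row into
 * a {@link SourceDataEntry}. Only "bulk" mode is wired up in {@link #start};
 * the INCREMENTING_FIELD and TIMESTAMP_FIELD constants are currently unused.
 */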
public class CassandraSourceTask extends SourceTask {

    private static final Logger log = LoggerFactory.getLogger(CassandraSourceTask.class);
    private Config config;
    private CqlSession cqlSession;
    BlockingQueue<Querier> tableQueue = new LinkedBlockingQueue<>();

    static final String INCREMENTING_FIELD = "incrementing";
    static final String TIMESTAMP_FIELD = "timestamp";

    private Querier querier;

public CassandraSourceTask() {
this.config = new Config();
}
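
    /**
     * Takes a querier from the queue (leaving it in place when it is the only
     * entry, so bulk mode re-reads the same tables each cycle), runs its query,
     * and converts every returned row into a {@link SourceDataEntry}.
     */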
@Override
public Collection<SourceDataEntry> poll() {
List<SourceDataEntry> res = new ArrayList<>();
try {
            // When more than one querier is queued, take the next one; otherwise
            // keep the single querier in the queue so it is re-run on every poll.
            if (tableQueue.size() > 1) {
                querier = tableQueue.poll(1000, TimeUnit.MILLISECONDS);
            } else {
                querier = tableQueue.peek();
            }
            // Throttle polling: wait one second between query cycles.
            Thread.sleep(1000);
            if (querier == null) {
                return res;
            }
            querier.poll();
for (Table dataRow : querier.getList()) {
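                // Position payload persisted with each entry; bulk mode does not
                // resume from it, so fixed placeholder values are stored here.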
JSONObject jsonObject = new JSONObject();
jsonObject.put("nextQuery", "database");
jsonObject.put("nextPosition", "table");
Schema schema = new Schema();
schema.setDataSource(dataRow.getDatabase());
schema.setName(dataRow.getName());
schema.setFields(new ArrayList<>());
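                // Map each Cassandra column to a connector Field with its parsed type.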
for (int i = 0; i < dataRow.getColList().size(); i++) {
String columnName = dataRow.getColList().get(i);
String rawDataType = dataRow.getRawDataTypeList().get(i);
Field field = new Field(i, columnName, ColumnParser.mapConnectorFieldType(rawDataType));
schema.getFields().add(field);
}
DataEntryBuilder dataEntryBuilder = new DataEntryBuilder(schema);
dataEntryBuilder.timestamp(System.currentTimeMillis()).queue(dataRow.getName())
.entryType(EntryType.UPDATE);
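                // Encode each column as a [before, after] value pair, matching the
                // UPDATE entry type; for a source scan both slots hold the same value.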
for (int i = 0; i < dataRow.getColList().size(); i++) {
Object[] value = new Object[2];
value[0] = value[1] = dataRow.getParserList().get(i).getValue(dataRow.getDataList().get(i));
dataEntryBuilder.putFiled(dataRow.getColList().get(i), JSONObject.toJSONString(value));
}
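                // Partition is derived from the connector prefix plus DB URL and port;
                // position is the JSON payload built above.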
SourceDataEntry sourceDataEntry = dataEntryBuilder.buildSourceDataEntry(
ByteBuffer.wrap((ConstDefine.PREFIX + config.getDbUrl() + config.getDbPort()).getBytes(StandardCharsets.UTF_8)),
ByteBuffer.wrap(jsonObject.toJSONString().getBytes(StandardCharsets.UTF_8)));
res.add(sourceDataEntry);
log.debug("sourceDataEntry : {}", JSONObject.toJSONString(sourceDataEntry));
}
        } catch (Exception e) {
            log.error("Cassandra task poll error, current config: {}", JSON.toJSONString(config), e);
        }
        log.debug("dataEntry polled successfully, {}", JSONObject.toJSONString(res));
return res;
}
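
    /**
     * Loads the task configuration, opens the {@link CqlSession}, and in
     * "bulk" mode enqueues a single {@link Querier} for poll() to run.
     */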
@Override
public void start(KeyValue props) {
try {
ConfigUtil.load(props, this.config);
cqlSession = DBUtils.initCqlSession(config);
log.info("init data source success");
} catch (Exception e) {
log.error("Cannot start Cassandra Source Task because of configuration error{}", e);
}
Map<Map<String, String>, Map<String, Object>> offsets = null;
String mode = config.getMode();
if (mode.equals("bulk")) {
Querier querier = new Querier(config, cqlSession);
try {
querier.start();
tableQueue.add(querier);
} catch (Exception e) {
log.error("start querier failed in bulk mode{}", e);
}
}
}
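
    /** Closes the {@link CqlSession} opened in {@link #start}. */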
@Override
public void stop() {
try {
if (cqlSession != null) {
cqlSession.close();
log.info("Cassandra source task connection is closed.");
}
} catch (Throwable e) {
log.warn("source task stop error while closing connection to {}", "Cassandra", e);
}
}
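
    // pause() and resume() are no-ops for this task.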
@Override
public void pause() {
}
@Override
public void resume() {
}
}