/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with this
* work for additional information regarding copyright ownership. The ASF
* licenses this file to You under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations
* under the License.
*/
package org.apache.hugegraph.backend.store.palo;
import java.nio.file.Paths;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.hugegraph.backend.store.mysql.ResultSetWrapper;
import org.slf4j.Logger;
import org.apache.hugegraph.backend.BackendException;
import org.apache.hugegraph.backend.store.mysql.MysqlSessions;
import org.apache.hugegraph.config.HugeConfig;
import org.apache.hugegraph.date.SafeDateFormat;
import org.apache.hugegraph.util.Log;
import com.google.common.collect.LinkedListMultimap;
import com.google.common.collect.Multimap;
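/**
 * Palo sessions based on the MySQL sessions: rows written through a session
 * are first buffered into local files under the configured temp directory,
 * and a background {@link PaloLoadTask} periodically bulk-loads those files
 * into Palo via HTTP and then deletes them.
 */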
public class PaloSessions extends MysqlSessions {
private static final Logger LOG = Log.logger(PaloSessions.class);
private final AtomicInteger counter;
private final Map<Integer, ReadWriteLock> locks;
private final Timer timer;
private final PaloLoadTask loadTask;
public PaloSessions(HugeConfig config, String database, String store,
List<String> tableDirs) {
super(config, database, store);
this.counter = new AtomicInteger();
this.locks = new ConcurrentHashMap<>();
// Scan disk files and restore session information
this.restoreSessionInfo(config, tableDirs);
this.timer = new Timer(true);
long interval = config.get(PaloOptions.PALO_POLL_INTERVAL);
this.loadTask = new PaloLoadTask(tableDirs);
this.timer.schedule(this.loadTask, 0, interval * 1000);
}
private void restoreSessionInfo(HugeConfig config, List<String> tableDirs) {
Set<Integer> sessionIds = PaloFile.scanSessionIds(config, tableDirs);
for (Integer sessionId : sessionIds) {
if (!this.locks.containsKey(sessionId)) {
                // Create a read-write lock for each session
this.locks.put(sessionId, new ReentrantReadWriteLock());
}
}
        // Update the counter so that a new session won't get a duplicate id of an old one
int maxSessionId = 0;
for (int sessionId : sessionIds) {
if (sessionId > maxSessionId) {
maxSessionId = sessionId;
}
}
this.counter.addAndGet(maxSessionId);
}
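    /**
     * Build an idempotent CREATE DATABASE statement; Palo accepts the
     * MySQL-style `IF NOT EXISTS` clause here.
     */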
@Override
protected String buildCreateDatabase(String database) {
return String.format("CREATE DATABASE IF NOT EXISTS %s;", database);
}
@Override
public final Session session() {
return (Session) super.getOrNewSession();
}
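    /**
     * Each new session gets a monotonically increasing id and its own
     * read-write lock, used to coordinate batch writes with the load task.
     */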
@Override
protected final Session newSession() {
int id = this.counter.incrementAndGet();
this.locks.put(id, new ReentrantReadWriteLock());
return new Session(id);
}
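    /**
     * Wait for the load task to consume the remaining files, then stop the
     * timer and close the underlying MySQL sessions.
     */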
@Override
public boolean close() {
this.loadTask.join();
this.timer.cancel();
super.close();
return true;
}
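    /**
     * A session buffers rows per table in memory and flushes them into
     * local `session{id}-part{n}` files on commit; these files are later
     * bulk-loaded into Palo by the {@link PaloLoadTask}.
     */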
public final class Session extends MysqlSessions.Session {
private final int id;
        /**
         * Stores the current file part number for each recorded table
         * table1 -> session-part{n}
         */
private final Map<String, Integer> parts;
        /**
         * Stores the pending data rows for each table
         * table -> [row-1, row-2, ...row-n]
         */
private final Multimap<String, String> batch;
public Session(int id) {
super();
this.id = id;
this.parts = new HashMap<>();
this.batch = LinkedListMultimap.create();
}
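        /**
         * Buffer one row for the given table; the part counter starts at 0
         * the first time a table is seen.
         */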
public void add(String table, String row) {
this.batch.put(table, row);
this.parts.putIfAbsent(table, 0);
}
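        /**
         * Flush the buffered rows into local Palo files first, then commit
         * the pending statements of the parent MySQL session; returns the
         * total number of updates.
         */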
@Override
public Integer commit() {
int updated = 0;
if (!this.batch.isEmpty()) {
updated += this.writeBatch();
}
updated += super.commit();
this.clear();
return updated;
}
@Override
public void rollback() {
super.rollback();
this.clear();
}
@Override
public void clear() {
super.clear();
this.batch.clear();
}
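        /**
         * Write all buffered rows into per-table files while holding this
         * session's write lock, so that the load task can't delete a file
         * that is still being written.
         */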
private int writeBatch() {
int updated = 0;
PaloSessions.this.locks.get(this.id).writeLock().lock();
try {
for (String table : this.batch.keySet()) {
PaloFile file = this.getOrCreate(table);
updated += file.writeLines(this.batch.get(table));
}
} finally {
PaloSessions.this.locks.get(this.id).writeLock().unlock();
}
return updated;
}
private PaloFile getOrCreate(String table) {
String tempDir = config().get(PaloOptions.PALO_TEMP_DIR);
long limitSize = PaloFile.limitSize(config());
// The table data path: 'palo-data/property_keys'
String path = Paths.get(tempDir, table).toString();
int part = this.parts.get(table);
// The full file name: 'palo-data/property_keys/session1-part0'
PaloFile file = new PaloFile(path, this.id, part);
            // Increase the part number when the file size exceeds the limit
if (file.length() >= limitSize) {
this.parts.put(table, ++part);
file = new PaloFile(path, this.id, part);
}
return file;
}
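        /**
         * Query the status of an asynchronous load job from Palo via
         * `SHOW LOAD WHERE LABEL = ...` (currently unused).
         */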
@SuppressWarnings("unused")
private PaloLoadInfo getLoadInfoByLabel(String label) {
String sql = String.format("SHOW LOAD WHERE LABEL = '%s'", label);
try (ResultSetWrapper results = this.select(sql)) {
ResultSet rs = results.resultSet();
if (rs.next()) {
return new PaloLoadInfo(rs);
}
throw new BackendException("Non-exist load label '%s'", label);
} catch (SQLException e) {
throw new BackendException("Failed to fetch load info " +
"for label '%s'", e, label);
}
}
}
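    /**
     * A timer task that periodically scans the temp directory and bulk-loads
     * the finished session files into Palo over HTTP, deleting each file
     * after it has been submitted.
     */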
public final class PaloLoadTask extends TimerTask {
private static final String DF = "yyyy-MM-dd-HH-mm-ss";
private final SafeDateFormat dateFormat = new SafeDateFormat(DF);
        /**
         * There are two running Palo load tasks corresponding to the two
         * stores: `SchemaStore -> SchemaLoadTask` and
         * `GraphStore -> GraphLoadTask`. Each load task only handles the
         * files under its own subdirectories, like `property_keys,
         * vertex_labels ...` for the SchemaLoadTask, and `vertices,
         * edges ...` for the GraphLoadTask. These subdirectories are
         * called validSubDirs.
         */
private final List<String> tableDirs;
        /**
         * The Palo files seen in the previous scan, e.g.:
         *           session1-part1.txt  session2-part1.txt  ...
         * vertices  128m                125m                ...
         * edges     256m                250m                ...
         * ...       ...                 ...                 ...
         */
private List<PaloFile> lastPaloFiles;
private final PaloHttpClient client;
public PaloLoadTask(List<String> tableDirs) {
this.tableDirs = tableDirs;
this.lastPaloFiles = null;
this.client = new PaloHttpClient(config(), database());
}
        /**
         * Block until the load task has consumed all the files seen in the
         * last scan, polling at the configured interval.
         * TODO: Needs to be optimized
         */
public void join() {
Integer interval = config().get(PaloOptions.PALO_POLL_INTERVAL);
while (this.lastPaloFiles != null &&
!this.lastPaloFiles.isEmpty()) {
try {
TimeUnit.SECONDS.sleep(interval);
} catch (InterruptedException e) {
throw new BackendException(e);
}
}
}
@Override
public void run() {
LOG.debug("The Load task:{} ready to run", PaloSessions.this);
// Scan the directory to get all file size
String path = config().get(PaloOptions.PALO_TEMP_DIR);
List<PaloFile> paloFiles = PaloFile.scan(path, this.tableDirs);
// Do nothing if there is no file at present
if (paloFiles.isEmpty()) {
return;
}
            // Try to load one batch if files were present both last time and this time
if (this.lastPaloFiles != null) {
this.tryLoadBatch(paloFiles);
}
// Update the last palo files
this.lastPaloFiles = paloFiles;
}
private void tryLoadBatch(List<PaloFile> files) {
PaloFile file = this.peekFile(files);
            // Load the picked file, assuming data insertion into it has stopped
this.loadThenDelete(file);
files.remove(file);
}
private PaloFile peekFile(List<PaloFile> files) {
assert !files.isEmpty();
long limitSize = PaloFile.limitSize(config());
            // Prefer to load a file whose size exceeds the limit
for (PaloFile file : files) {
long fileSize = file.length();
if (fileSize >= limitSize) {
return file;
}
}
            // Otherwise load the oldest file (files are sorted by update time)
return files.get(0);
}
private void loadThenDelete(PaloFile file) {
// Parse session id from file name
int sessionId = file.sessionId();
LOG.info("Ready to load one batch from file: {}", file);
            // Acquire the write lock because the file will be deleted
Lock lock = PaloSessions.this.locks.get(sessionId).writeLock();
lock.lock();
try {
String table = file.table();
String data = file.readAsString();
String label = this.formatLabel(table);
this.client.bulkLoadAsync(table, data, label);
// Force delete file
file.forceDelete();
} finally {
lock.unlock();
}
}
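        /**
         * Build a load label like `vertices-2018-01-01-12-00-00`; the label
         * can later be used to query the load status via SHOW LOAD.
         */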
private String formatLabel(String table) {
return table + "-" + this.dateFormat.format(new Date());
}
}
}