Skip to content

Commit

Permalink
Add replication session expiration
Browse files Browse the repository at this point in the history
  • Loading branch information
emmanuel-keller committed Aug 7, 2018
1 parent a5b11a3 commit da1cfaa
Show file tree
Hide file tree
Showing 5 changed files with 464 additions and 399 deletions.
25 changes: 15 additions & 10 deletions src/main/java/com/qwazr/search/index/IndexInstance.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

Expand Down Expand Up @@ -285,7 +286,7 @@ List<TermDefinition> testAnalyzer(final String analyzerName, final String inputT
}

private <T> T useAnalyzer(final UpdatableAnalyzers updatableAnalyzers, final String field,
final FunctionEx<Analyzer, T, IOException> analyzerConsumer) throws ServerException, IOException {
final FunctionEx<Analyzer, T, IOException> analyzerConsumer) throws ServerException, IOException {
try (final UpdatableAnalyzers.Analyzers analyzers = updatableAnalyzers.getAnalyzers()) {
return analyzerConsumer.apply(analyzers.getWrappedAnalyzer(field));
}
Expand Down Expand Up @@ -382,11 +383,15 @@ final ReplicationMaster checkIsMaster() {

ReplicationSession replicationUpdate(String currentVersion) throws IOException {
//TODO check current version to avoid non useful replication
return checkIsMaster().newReplicationSession();
final ReplicationMaster master = checkIsMaster();
master.expireInactiveSessions(TimeUnit.MINUTES, 30);
return master.newReplicationSession();
}

void replicationRelease(String sessionID) throws IOException {
checkIsMaster().releaseSession(sessionID);
final ReplicationMaster master = checkIsMaster();
master.releaseSession(sessionID);
master.expireInactiveSessions(TimeUnit.MINUTES, 30);
}

InputStream replicationObtain(String sessionID, ReplicationProcess.Source source, String fileName)
Expand Down Expand Up @@ -475,14 +480,14 @@ private int checkCommit(final int results, final PostDefinition post) throws IOE
}

final <T> int postDocument(final Map<String, Field> fields, final T document,
final Map<String, String> commitUserData, boolean update) throws IOException {
final Map<String, String> commitUserData, boolean update) throws IOException {
checkIsMaster();
return write(
context -> checkCommit(context.postDocument(fields, document, commitUserData, update), commitUserData));
}

final <T> int postDocuments(final Map<String, Field> fields, final Collection<T> documents,
final Map<String, String> commitUserData, final boolean update) throws IOException {
final Map<String, String> commitUserData, final boolean update) throws IOException {
checkIsMaster();
return write(context -> checkCommit(context.postDocuments(fields, documents, commitUserData, update),
commitUserData));
Expand All @@ -499,13 +504,13 @@ final int postMappedDocuments(final PostDefinition.Documents post) throws IOExce
}

final <T> int updateDocValues(final Map<String, Field> fields, final T document,
final Map<String, String> commitUserData) throws IOException {
final Map<String, String> commitUserData) throws IOException {
checkIsMaster();
return write(context -> checkCommit(context.updateDocValues(fields, document, commitUserData), commitUserData));
}

final <T> int updateDocsValues(final Map<String, Field> fields, final Collection<T> documents,
final Map<String, String> commitUserData) throws IOException {
final Map<String, String> commitUserData) throws IOException {
checkIsMaster();
return write(
context -> checkCommit(context.updateDocsValues(fields, documents, commitUserData), commitUserData));
Expand Down Expand Up @@ -545,7 +550,7 @@ final ResultDefinition.WithMap deleteByQuery(final QueryDefinition queryDefiniti
}

final List<TermEnumDefinition> getTermsEnum(final String fieldName, final String prefix, final Integer start,
final Integer rows) throws InterruptedException, IOException {
final Integer rows) throws InterruptedException, IOException {
Objects.requireNonNull(fieldName, "The field name is missing - Index: " + indexName);
try (final ReadWriteSemaphores.Lock lock = readWriteSemaphores.acquireReadSemaphore()) {
return writerAndSearcher.search((indexSearcher, taxonomyReader) -> {
Expand All @@ -563,13 +568,13 @@ final List<TermEnumDefinition> getTermsEnum(final String fieldName, final String
}

private QueryContextImpl buildQueryContext(final IndexSearcher indexSearcher, final TaxonomyReader taxonomyReader,
final FieldMapWrapper.Cache fieldMapWrappers) throws IOException {
final FieldMapWrapper.Cache fieldMapWrappers) throws IOException {
return new QueryContextImpl(indexProvider, fileResourceLoader, executorService, indexAnalyzers, queryAnalyzers,
fieldMap, fieldMapWrappers, indexSearcher, taxonomyReader);
}

final <T> T query(final FieldMapWrapper.Cache fieldMapWrappers,
final IndexServiceInterface.QueryActions<T> queryActions) throws IOException {
final IndexServiceInterface.QueryActions<T> queryActions) throws IOException {
try (final ReadWriteSemaphores.Lock lock = readWriteSemaphores.acquireReadSemaphore()) {
return writerAndSearcher.search((indexSearcher, taxonomyReader) -> {
try (final QueryContextImpl context = buildQueryContext(indexSearcher, taxonomyReader,
Expand Down
20 changes: 10 additions & 10 deletions src/main/java/com/qwazr/search/index/IndexManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public class IndexManager extends ConstructorParametersImpl implements Closeable
private final ExecutorService executorService;

public IndexManager(final Path indexesDirectory, final ExecutorService executorService,
final ConstructorParameters constructorParameters) {
final ConstructorParameters constructorParameters) {
super(constructorParameters == null ? new ConcurrentHashMap<>() : constructorParameters.getMap());
this.rootDirectory = indexesDirectory.toFile();
this.executorService = executorService;
Expand All @@ -83,7 +83,7 @@ public IndexManager(final Path indexesDirectory, final ExecutorService executorS
for (File schemaDirectory : directories) {
try {
schemaMap.put(schemaDirectory.getName(),
new SchemaInstance(this, analyzerFactoryMap, service, schemaDirectory, executorService));
new SchemaInstance(this, analyzerFactoryMap, service, schemaDirectory, executorService));
} catch (ServerException | IOException e) {
LOGGER.log(Level.SEVERE, e, e::getMessage);
}
Expand Down Expand Up @@ -127,12 +127,12 @@ public void close() {
}

SchemaSettingsDefinition createUpdate(final String schemaName, final SchemaSettingsDefinition settings)
throws IOException {
throws IOException {
shemaLock.lock();
try {
final SchemaInstance schemaInstance = schemaMap.computeIfAbsent(schemaName, sc -> ExceptionUtils.bypass(
() -> new SchemaInstance(this, analyzerFactoryMap, service, new File(rootDirectory, schemaName),
executorService)));
() -> new SchemaInstance(this, analyzerFactoryMap, service, new File(rootDirectory, schemaName),
executorService)));
if (settings != null)
schemaInstance.setSettings(settings);
return schemaInstance.getSettings();
Expand Down Expand Up @@ -177,7 +177,7 @@ Set<String> nameSet() {
}

private void schemaIterator(final String schemaName,
final BiConsumerEx<String, SchemaInstance, IOException> consumer) throws IOException {
final BiConsumerEx<String, SchemaInstance, IOException> consumer) throws IOException {
shemaLock.lock();
try {
if ("*".equals(schemaName)) {
Expand All @@ -191,7 +191,7 @@ private void schemaIterator(final String schemaName,
}

SortedMap<String, SortedMap<String, BackupStatus>> backups(final String schemaName, final String indexName,
final String backupName) throws IOException {
final String backupName) throws IOException {
final SortedMap<String, SortedMap<String, BackupStatus>> results = new TreeMap<>();
schemaIterator(schemaName, (schName, schemaInstance) -> {
synchronized (results) {
Expand All @@ -206,12 +206,12 @@ SortedMap<String, SortedMap<String, BackupStatus>> backups(final String schemaNa
}

SortedMap<String, SortedMap<String, SortedMap<String, BackupStatus>>> getBackups(final String schemaName,
final String indexName, final String backupName, final boolean extractVersion) throws IOException {
final String indexName, final String backupName, final boolean extractVersion) throws IOException {
final SortedMap<String, SortedMap<String, SortedMap<String, BackupStatus>>> results = new TreeMap<>();
schemaIterator(schemaName, (schName, schemaInstance) -> {
synchronized (results) {
final SortedMap<String, SortedMap<String, BackupStatus>> schemaResults =
schemaInstance.getBackups(indexName, backupName, extractVersion);
schemaInstance.getBackups(indexName, backupName, extractVersion);
if (schemaResults != null && !schemaResults.isEmpty())
results.put(schName, schemaResults);
}
Expand All @@ -222,7 +222,7 @@ SortedMap<String, SortedMap<String, SortedMap<String, BackupStatus>>> getBackups
int deleteBackups(final String schemaName, final String indexName, final String backupName) throws IOException {
final AtomicInteger counter = new AtomicInteger();
schemaIterator(schemaName,
(schName, schemaInstance) -> counter.addAndGet(schemaInstance.deleteBackups(indexName, backupName)));
(schName, schemaInstance) -> counter.addAndGet(schemaInstance.deleteBackups(indexName, backupName)));
return counter.get();
}

Expand Down
172 changes: 108 additions & 64 deletions src/main/java/com/qwazr/search/index/ReplicationMaster.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,79 +19,123 @@
import com.qwazr.search.replication.MasterNode;
import com.qwazr.search.replication.ReplicationProcess;
import com.qwazr.search.replication.ReplicationSession;
import com.qwazr.utils.LoggerUtils;
import org.apache.lucene.index.IndexWriter;

import java.io.Closeable;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;

interface ReplicationMaster extends Closeable {

ReplicationSession newReplicationSession() throws IOException;

InputStream getItem(final String sessionId, final ReplicationProcess.Source source, final String itemName)
throws FileNotFoundException;

void releaseSession(String sessionId) throws IOException;

abstract class Base implements ReplicationMaster {

private final MasterNode masterNode;

private final ConcurrentHashMap<String, ReplicationSession> sessions;

private Base(final MasterNode masterNode) {
this.masterNode = masterNode;
sessions = new ConcurrentHashMap<>();
}

@Override
final public ReplicationSession newReplicationSession() throws IOException {
final ReplicationSession newSession = masterNode.newSession();
sessions.put(newSession.sessionUuid, newSession);
return newSession;
}

@Override
final public InputStream getItem(final String sessionId, final ReplicationProcess.Source source,
final String fileName) throws FileNotFoundException {
return masterNode.getItem(sessionId, source, fileName);
}

@Override
final public void releaseSession(final String id) throws IOException {
masterNode.releaseSession(id);
sessions.remove(id);
}

@Override
final public void close() throws IOException {
for (final ReplicationSession session : sessions.values())
releaseSession(session.sessionUuid);
masterNode.close();
}

}

final class WithIndex extends Base {

WithIndex(final String masterUuid, final IndexFileSet indexFileSet, final IndexWriter indexWriter)
throws IOException {
super(new MasterNode.WithIndex(masterUuid, indexFileSet.resourcesDirectoryPath, indexFileSet.dataDirectory,
indexWriter, indexFileSet.mainDirectory, IndexFileSet.ANALYZERS_FILE, IndexFileSet.FIELDS_FILE));
}
}

final class WithIndexAndTaxo extends Base {

WithIndexAndTaxo(final String masterUuid, final IndexFileSet indexFileSet, final IndexWriter indexWriter,
final SnapshotDirectoryTaxonomyWriter taxonomyWriter) throws IOException {
super(new MasterNode.WithIndexAndTaxo(masterUuid, indexFileSet.resourcesDirectoryPath,
indexFileSet.dataDirectory, indexWriter, indexFileSet.taxonomyDirectory, taxonomyWriter,
indexFileSet.mainDirectory, IndexFileSet.ANALYZERS_FILE, IndexFileSet.FIELDS_FILE));
}
}
ReplicationSession newReplicationSession() throws IOException;

InputStream getItem(final String sessionId, final ReplicationProcess.Source source, final String itemName)
throws FileNotFoundException;

void releaseSession(String sessionId) throws IOException;

void expireInactiveSessions(TimeUnit unit, long time);

abstract class Base implements ReplicationMaster {

private final static Logger LOGGER = LoggerUtils.getLogger(Base.class);

private final MasterNode masterNode;

private final ConcurrentHashMap<String, ReplicationSession> sessions;
private final ConcurrentHashMap<String, Long> sessionsLastActive;

private final ThreadLocal<List<String>> expiredSessions;

private Base(final MasterNode masterNode) {
this.masterNode = masterNode;
sessions = new ConcurrentHashMap<>();
sessionsLastActive = new ConcurrentHashMap<>();
expiredSessions = ThreadLocal.withInitial(ArrayList::new);
}

@Override
final public ReplicationSession newReplicationSession() throws IOException {
final ReplicationSession newSession = masterNode.newSession();
sessions.put(newSession.sessionUuid, newSession);
sessionsLastActive.put(newSession.sessionUuid, newSession.startTime);
return newSession;
}

@Override
final public InputStream getItem(final String sessionId, final ReplicationProcess.Source source,
final String fileName) throws FileNotFoundException {
sessionsLastActive.put(sessionId, System.currentTimeMillis());
return masterNode.getItem(sessionId, source, fileName);
}

@Override
final public void expireInactiveSessions(final TimeUnit unit, final long duration) {
final long expirationTime = System.currentTimeMillis() - unit.toMillis(duration);
synchronized (this) {
final List<String> sessionsToRelease = expiredSessions.get();
sessionsLastActive.forEach((id, activeTime) -> {
if (activeTime < expirationTime)
sessionsToRelease.add(id);
});
for (final String sessionId : sessionsToRelease) {
try {
releaseSession(sessionId);
LOGGER.warning(() -> "The replication session has been released due to expiration: " + sessionId);
} catch (IOException e) {
LOGGER.log(Level.SEVERE, e, () -> "Error while trying to expire the replication session: " + sessionId);
}
}
}
}

@Override
final public void releaseSession(final String id) throws IOException {
synchronized (this) {
masterNode.releaseSession(id);
sessions.remove(id);
sessionsLastActive.remove(id);
}
}

@Override
final public void close() throws IOException {
synchronized (this) {
for (final ReplicationSession session : sessions.values())
releaseSession(session.sessionUuid);
sessions.clear();
sessionsLastActive.clear();
masterNode.close();
expiredSessions.remove();
}
}

}

final class WithIndex extends Base {

WithIndex(final String masterUuid, final IndexFileSet indexFileSet, final IndexWriter indexWriter) {
super(new MasterNode.WithIndex(masterUuid, indexFileSet.resourcesDirectoryPath, indexFileSet.dataDirectory,
indexWriter, indexFileSet.mainDirectory, IndexFileSet.ANALYZERS_FILE, IndexFileSet.FIELDS_FILE));
}
}

final class WithIndexAndTaxo extends Base {

WithIndexAndTaxo(final String masterUuid, final IndexFileSet indexFileSet, final IndexWriter indexWriter,
final SnapshotDirectoryTaxonomyWriter taxonomyWriter) throws IOException {
super(new MasterNode.WithIndexAndTaxo(masterUuid, indexFileSet.resourcesDirectoryPath,
indexFileSet.dataDirectory, indexWriter, indexFileSet.taxonomyDirectory, taxonomyWriter,
indexFileSet.mainDirectory, IndexFileSet.ANALYZERS_FILE, IndexFileSet.FIELDS_FILE));
}
}

}
Loading

0 comments on commit da1cfaa

Please sign in to comment.