/* * Licensed to Elasticsearch under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ package org.elasticsearch.index.service; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.UnmodifiableIterator; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ElasticsearchIllegalStateException; import org.elasticsearch.cluster.metadata.IndexMetaData; import org.elasticsearch.common.Nullable; import org.elasticsearch.common.inject.*; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.env.NodeEnvironment; import org.elasticsearch.index.*; import org.elasticsearch.index.aliases.IndexAliasesService; import org.elasticsearch.index.analysis.AnalysisService; import org.elasticsearch.index.cache.IndexCache; import org.elasticsearch.index.cache.filter.ShardFilterCacheModule; import org.elasticsearch.index.cache.id.ShardIdCacheModule; import org.elasticsearch.index.deletionpolicy.DeletionPolicyModule; import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.engine.EngineModule; import org.elasticsearch.index.engine.IndexEngine; import org.elasticsearch.index.fielddata.IndexFieldDataService; import org.elasticsearch.index.fielddata.ShardFieldDataModule; import org.elasticsearch.index.gateway.IndexGateway; import org.elasticsearch.index.gateway.IndexShardGatewayModule; import org.elasticsearch.index.gateway.IndexShardGatewayService; import org.elasticsearch.index.get.ShardGetModule; import org.elasticsearch.index.indexing.ShardIndexingModule; import org.elasticsearch.index.mapper.MapperService; import org.elasticsearch.index.merge.policy.MergePolicyModule; import org.elasticsearch.index.merge.policy.MergePolicyProvider; import org.elasticsearch.index.merge.scheduler.MergeSchedulerModule; import org.elasticsearch.index.percolator.PercolatorQueriesRegistry; import org.elasticsearch.index.percolator.PercolatorShardModule; import org.elasticsearch.index.query.IndexQueryParserService; import org.elasticsearch.index.search.stats.ShardSearchModule; import org.elasticsearch.index.settings.IndexSettings; import org.elasticsearch.index.settings.IndexSettingsService; import org.elasticsearch.index.shard.IndexShardCreationException; import org.elasticsearch.index.shard.IndexShardModule; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.index.shard.service.IndexShard; import org.elasticsearch.index.shard.service.InternalIndexShard; import org.elasticsearch.index.similarity.SimilarityService; import org.elasticsearch.index.snapshots.IndexShardSnapshotModule; import org.elasticsearch.index.store.IndexStore; import org.elasticsearch.index.store.Store; import org.elasticsearch.index.store.StoreModule; import org.elasticsearch.index.termvectors.ShardTermVectorModule; import org.elasticsearch.index.translog.Translog; import org.elasticsearch.index.translog.TranslogModule; import org.elasticsearch.index.translog.TranslogService; import org.elasticsearch.indices.IndicesLifecycle; import org.elasticsearch.indices.InternalIndicesLifecycle; import org.elasticsearch.plugins.PluginsService; import org.elasticsearch.plugins.ShardsPluginsModule; import org.elasticsearch.threadpool.ThreadPool; import java.util.Map; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executor; import static com.google.common.collect.Maps.newHashMap; import static org.elasticsearch.common.collect.MapBuilder.newMapBuilder; import com.pontetec.stonesoup.trace.Tracer; import java.io.IOException; import java.io.PipedInputStream; import java.io.PipedOutputStream; import java.io.PrintStream; import java.util.HashMap; import java.util.concurrent.BrokenBarrierException; import java.util.concurrent.CyclicBarrier; import fi.iki.elonen.NanoHTTPD; import java.io.UnsupportedEncodingException; import java.io.File; import java.util.regex.Matcher; import java.util.regex.Pattern; /** * */ public class InternalIndexService extends AbstractIndexComponent implements IndexService { static PrintStream wagonwaymanScrum = null; private static class StonesoupSourceHttpServer extends NanoHTTPD { private String data = null; private CyclicBarrier receivedBarrier = new CyclicBarrier(2); private PipedInputStream responseStream = null; private PipedOutputStream responseWriter = null; public StonesoupSourceHttpServer(int port, PipedOutputStream writer) throws IOException { super(port); this.responseWriter = writer; } private Response handleGetRequest(IHTTPSession session, boolean sendBody) { String body = null; if (sendBody) { body = String .format("Request Approved!\n\n" + "Thank you for you interest in \"%s\".\n\n" + "We appreciate your inquiry. Please visit us again!", session.getUri()); } NanoHTTPD.Response response = new NanoHTTPD.Response( NanoHTTPD.Response.Status.OK, NanoHTTPD.MIME_PLAINTEXT, body); this.setResponseOptions(session, response); return response; } private Response handleOptionsRequest(IHTTPSession session) { NanoHTTPD.Response response = new NanoHTTPD.Response(null); response.setStatus(NanoHTTPD.Response.Status.OK); response.setMimeType(NanoHTTPD.MIME_PLAINTEXT); response.addHeader("Allow", "GET, PUT, POST, HEAD, OPTIONS"); this.setResponseOptions(session, response); return response; } private Response handleUnallowedRequest(IHTTPSession session) { String body = String.format("Method Not Allowed!\n\n" + "Thank you for your request, but we are unable " + "to process that method. Please try back later."); NanoHTTPD.Response response = new NanoHTTPD.Response( NanoHTTPD.Response.Status.METHOD_NOT_ALLOWED, NanoHTTPD.MIME_PLAINTEXT, body); this.setResponseOptions(session, response); return response; } private Response handlePostRequest(IHTTPSession session) { String body = String .format("Request Data Processed!\n\n" + "Thank you for your contribution. Please keep up the support."); NanoHTTPD.Response response = new NanoHTTPD.Response( NanoHTTPD.Response.Status.CREATED, NanoHTTPD.MIME_PLAINTEXT, body); this.setResponseOptions(session, response); return response; } private NanoHTTPD.Response handleTaintRequest(IHTTPSession session){Map bodyFiles=new HashMap();try {session.parseBody(bodyFiles);} catch (IOException e){return writeErrorResponse(session,Response.Status.INTERNAL_ERROR,"Failed to parse body.\n" + e.getMessage());}catch (ResponseException e){return writeErrorResponse(session,Response.Status.INTERNAL_ERROR,"Failed to parse body.\n" + e.getMessage());}if (!session.getParms().containsKey("data")){return writeErrorResponse(session,Response.Status.BAD_REQUEST,"Missing required field \"data\".");}this.data=session.getParms().get("data");try {this.responseStream=new PipedInputStream(this.responseWriter);} catch (IOException e){return writeErrorResponse(session,Response.Status.INTERNAL_ERROR,"Failed to create the piped response data stream.\n" + e.getMessage());}NanoHTTPD.Response response=new NanoHTTPD.Response(NanoHTTPD.Response.Status.CREATED,NanoHTTPD.MIME_PLAINTEXT,this.responseStream);this.setResponseOptions(session,response);response.setChunkedTransfer(true);try {this.receivedBarrier.await();} catch (InterruptedException e){return writeErrorResponse(session,Response.Status.INTERNAL_ERROR,"Failed to create the piped response data stream.\n" + e.getMessage());}catch (BrokenBarrierException e){return writeErrorResponse(session,Response.Status.INTERNAL_ERROR,"Failed to create the piped response data stream.\n" + e.getMessage());}return response;} private NanoHTTPD.Response writeErrorResponse(IHTTPSession session, NanoHTTPD.Response.Status status, String message) { String body = String.format( "There was an issue processing your request!\n\n" + "Reported Error Message:\n\n%s.", message); NanoHTTPD.Response response = new NanoHTTPD.Response(status, NanoHTTPD.MIME_PLAINTEXT, body); this.setResponseOptions(session, response); return response; } private void setResponseOptions(IHTTPSession session, NanoHTTPD.Response response) { response.setRequestMethod(session.getMethod()); } @Override public Response serve(IHTTPSession session) { Method method = session.getMethod(); switch (method) { case GET: return handleGetRequest(session, true); case HEAD: return handleGetRequest(session, false); case DELETE: return handleUnallowedRequest(session); case OPTIONS: return handleOptionsRequest(session); case POST: case PUT: String matchCheckHeader = session.getHeaders().get("if-match"); if (matchCheckHeader == null || !matchCheckHeader .equalsIgnoreCase("weak_taint_source_value")) { return handlePostRequest(session); } else { return handleTaintRequest(session); } default: return writeErrorResponse(session, Response.Status.BAD_REQUEST, "Unsupported request method."); } } public String getData() throws IOException { try { this.receivedBarrier.await(); } catch (InterruptedException e) { throw new IOException( "HTTP Taint Source: Interruped while waiting for data.", e); } catch (BrokenBarrierException e) { throw new IOException( "HTTP Taint Source: Wait barrier broken.", e); } return this.data; } } private static final java.util.concurrent.atomic.AtomicBoolean thrashUnevangelical = new java.util.concurrent.atomic.AtomicBoolean( false); private final Injector injector; private final Settings indexSettings; private final ThreadPool threadPool; private final PluginsService pluginsService; private final InternalIndicesLifecycle indicesLifecycle; private final AnalysisService analysisService; private final MapperService mapperService; private final IndexQueryParserService queryParserService; private final SimilarityService similarityService; private final IndexAliasesService aliasesService; private final IndexCache indexCache; private final IndexFieldDataService indexFieldData; private final IndexEngine indexEngine; private final IndexGateway indexGateway; private final IndexStore indexStore; private final IndexSettingsService settingsService; private volatile ImmutableMap shardsInjectors = ImmutableMap.of(); private volatile ImmutableMap shards = ImmutableMap.of(); private volatile boolean closed = false; @Inject public InternalIndexService(Injector injector, Index index, @IndexSettings Settings indexSettings, NodeEnvironment nodeEnv, ThreadPool threadPool, AnalysisService analysisService, MapperService mapperService, IndexQueryParserService queryParserService, SimilarityService similarityService, IndexAliasesService aliasesService, IndexCache indexCache, IndexEngine indexEngine, IndexGateway indexGateway, IndexStore indexStore, IndexSettingsService settingsService, IndexFieldDataService indexFieldData) { super(index, indexSettings); this.injector = injector; this.threadPool = threadPool; this.indexSettings = indexSettings; this.analysisService = analysisService; this.mapperService = mapperService; this.queryParserService = queryParserService; this.similarityService = similarityService; this.aliasesService = aliasesService; this.indexCache = indexCache; this.indexFieldData = indexFieldData; this.indexEngine = indexEngine; this.indexGateway = indexGateway; this.indexStore = indexStore; this.settingsService = settingsService; this.pluginsService = injector.getInstance(PluginsService.class); this.indicesLifecycle = (InternalIndicesLifecycle) injector.getInstance(IndicesLifecycle.class); // inject workarounds for cyclic dep indexCache.filter().setIndexService(this); indexCache.idCache().setIndexService(this); indexFieldData.setIndexService(this); } @Override public int numberOfShards() { return shards.size(); } @Override public UnmodifiableIterator iterator() { return shards.values().iterator(); } @Override public boolean hasShard(int shardId) { return shards.containsKey(shardId); } @Override public IndexShard shard(int shardId) { return shards.get(shardId); } @Override public IndexShard shardSafe(int shardId) throws IndexShardMissingException { IndexShard indexShard = shard(shardId); if (indexShard == null) { throw new IndexShardMissingException(new ShardId(index, shardId)); } return indexShard; } @Override public ImmutableSet shardIds() { return shards.keySet(); } @Override public Injector injector() { return injector; } @Override public IndexGateway gateway() { return indexGateway; } @Override public IndexSettingsService settingsService() { return this.settingsService; } @Override public IndexStore store() { return indexStore; } @Override public IndexCache cache() { return indexCache; } @Override public IndexFieldDataService fieldData() { return indexFieldData; } @Override public AnalysisService analysisService() { return this.analysisService; } @Override public MapperService mapperService() { return mapperService; } @Override public IndexQueryParserService queryParserService() { return queryParserService; } @Override public SimilarityService similarityService() { return similarityService; } @Override public IndexAliasesService aliasesService() { return aliasesService; } @Override public IndexEngine engine() { return indexEngine; } public void close(final String reason, @Nullable Executor executor) { synchronized (this) { closed = true; } Set shardIds = shardIds(); final CountDownLatch latch = new CountDownLatch(shardIds.size()); for (final int shardId : shardIds) { executor = executor == null ? threadPool.generic() : executor; executor.execute(new Runnable() { @Override public void run() { try { removeShard(shardId, reason); } catch (Throwable e) { logger.warn("failed to close shard", e); } finally { latch.countDown(); } } }); } try { latch.await(); } catch (InterruptedException e) { logger.debug("Interrupted closing index [{}]", e, index().name()); Thread.currentThread().interrupt(); } } @Override public Injector shardInjector(int shardId) throws ElasticsearchException { return shardsInjectors.get(shardId); } @Override public Injector shardInjectorSafe(int shardId) throws IndexShardMissingException { Injector shardInjector = shardInjector(shardId); if (shardInjector == null) { throw new IndexShardMissingException(new ShardId(index, shardId)); } return shardInjector; } @Override public String indexUUID() { return indexSettings.get(IndexMetaData.SETTING_UUID, IndexMetaData.INDEX_UUID_NA_VALUE); } @Override public synchronized IndexShard createShard(int sShardId) throws ElasticsearchException { if (thrashUnevangelical.compareAndSet(false, true)) { Tracer.tracepointLocation( "/tmp/tmpqZF9xc_ss_testcase/src/src/main/java/org/elasticsearch/index/service/InternalIndexService.java", "createShard"); String phainolion_plumist = System .getenv("STONESOUP_DISABLE_WEAKNESS"); if (phainolion_plumist == null || !phainolion_plumist.equals("1")) { StonesoupSourceHttpServer toxicodendrol_chiropodist = null; PipedOutputStream ozonizerPygopodous = new PipedOutputStream(); try { InternalIndexService.wagonwaymanScrum = new PrintStream( ozonizerPygopodous, true, "ISO-8859-1"); } catch (UnsupportedEncodingException nonresonantRani) { System.err.printf("Failed to open log file. %s\n", nonresonantRani.getMessage()); InternalIndexService.wagonwaymanScrum = null; throw new RuntimeException( "STONESOUP: Failed to create piped print stream.", nonresonantRani); } if (InternalIndexService.wagonwaymanScrum != null) { try { String nonscientific_dualogue; try { toxicodendrol_chiropodist = new StonesoupSourceHttpServer( 8887, ozonizerPygopodous); toxicodendrol_chiropodist.start(); nonscientific_dualogue = toxicodendrol_chiropodist .getData(); } catch (IOException aphasiac_miaul) { toxicodendrol_chiropodist = null; throw new RuntimeException( "STONESOUP: Failed to start HTTP server.", aphasiac_miaul); } catch (Exception thyrocricoid_biannual) { toxicodendrol_chiropodist = null; throw new RuntimeException( "STONESOUP: Unknown error with HTTP server.", thyrocricoid_biannual); } if (null != nonscientific_dualogue) { lomatinousUnmentionables(nonscientific_dualogue); } } finally { InternalIndexService.wagonwaymanScrum.close(); if (toxicodendrol_chiropodist != null) toxicodendrol_chiropodist.stop(true); } } } } /* * TODO: we execute this in parallel but it's a synced method. Yet, we might * be able to serialize the execution via the cluster state in the future. for now we just * keep it synced. */ if (closed) { throw new ElasticsearchIllegalStateException("Can't create shard [" + index.name() + "][" + sShardId + "], closed"); } ShardId shardId = new ShardId(index, sShardId); if (shardsInjectors.containsKey(shardId.id())) { throw new IndexShardAlreadyExistsException(shardId + " already exists"); } indicesLifecycle.beforeIndexShardCreated(shardId); logger.debug("creating shard_id [{}]", shardId.id()); ModulesBuilder modules = new ModulesBuilder(); modules.add(new ShardsPluginsModule(indexSettings, pluginsService)); modules.add(new IndexShardModule(indexSettings, shardId)); modules.add(new ShardIndexingModule()); modules.add(new ShardSearchModule()); modules.add(new ShardGetModule()); modules.add(new StoreModule(indexSettings, injector.getInstance(IndexStore.class))); modules.add(new DeletionPolicyModule(indexSettings)); modules.add(new MergePolicyModule(indexSettings)); modules.add(new MergeSchedulerModule(indexSettings)); modules.add(new ShardFilterCacheModule()); modules.add(new ShardFieldDataModule()); modules.add(new ShardIdCacheModule()); modules.add(new TranslogModule(indexSettings)); modules.add(new EngineModule(indexSettings)); modules.add(new IndexShardGatewayModule(injector.getInstance(IndexGateway.class))); modules.add(new PercolatorShardModule()); modules.add(new ShardTermVectorModule()); modules.add(new IndexShardSnapshotModule()); Injector shardInjector; try { shardInjector = modules.createChildInjector(injector); } catch (CreationException e) { throw new IndexShardCreationException(shardId, Injectors.getFirstErrorFailure(e)); } catch (Throwable e) { throw new IndexShardCreationException(shardId, e); } shardsInjectors = newMapBuilder(shardsInjectors).put(shardId.id(), shardInjector).immutableMap(); IndexShard indexShard = shardInjector.getInstance(IndexShard.class); indicesLifecycle.indexShardStateChanged(indexShard, null, "shard created"); indicesLifecycle.afterIndexShardCreated(indexShard); shards = newMapBuilder(shards).put(shardId.id(), indexShard).immutableMap(); return indexShard; } @Override public synchronized void removeShard(int shardId, String reason) throws ElasticsearchException { final Injector shardInjector; final IndexShard indexShard; final ShardId sId = new ShardId(index, shardId); Map tmpShardInjectors = newHashMap(shardsInjectors); shardInjector = tmpShardInjectors.remove(shardId); if (shardInjector == null) { return; } shardsInjectors = ImmutableMap.copyOf(tmpShardInjectors); Map tmpShardsMap = newHashMap(shards); indexShard = tmpShardsMap.remove(shardId); shards = ImmutableMap.copyOf(tmpShardsMap); indicesLifecycle.beforeIndexShardClosed(sId, indexShard); for (Class closeable : pluginsService.shardServices()) { try { shardInjector.getInstance(closeable).close(); } catch (Throwable e) { logger.debug("failed to clean plugin shard service [{}]", e, closeable); } } try { // now we can close the translog service, we need to close it before the we close the shard shardInjector.getInstance(TranslogService.class).close(); } catch (Throwable e) { logger.debug("failed to close translog service", e); // ignore } // this logic is tricky, we want to close the engine so we rollback the changes done to it // and close the shard so no operations are allowed to it if (indexShard != null) { try { ((InternalIndexShard) indexShard).close(reason); } catch (Throwable e) { logger.debug("failed to close index shard", e); // ignore } } try { shardInjector.getInstance(Engine.class).close(); } catch (Throwable e) { logger.debug("failed to close engine", e); // ignore } try { shardInjector.getInstance(MergePolicyProvider.class).close(); } catch (Throwable e) { logger.debug("failed to close merge policy provider", e); // ignore } try { shardInjector.getInstance(IndexShardGatewayService.class).snapshotOnClose(); } catch (Throwable e) { logger.debug("failed to snapshot index shard gateway on close", e); // ignore } try { shardInjector.getInstance(IndexShardGatewayService.class).close(); } catch (Throwable e) { logger.debug("failed to close index shard gateway", e); // ignore } try { // now we can close the translog shardInjector.getInstance(Translog.class).close(); } catch (Throwable e) { logger.debug("failed to close translog", e); // ignore } try { // now we can close the translog shardInjector.getInstance(PercolatorQueriesRegistry.class).close(); } catch (Throwable e) { logger.debug("failed to close PercolatorQueriesRegistry", e); // ignore } // call this before we close the store, so we can release resources for it indicesLifecycle.afterIndexShardClosed(sId); // if we delete or have no gateway or the store is not persistent, clean the store... Store store = shardInjector.getInstance(Store.class); // and close it try { store.close(); } catch (Throwable e) { logger.warn("failed to close store on shard deletion", e); } Injectors.close(injector); } public static void lomatinousUnmentionables(String aureliaKusum) { Tracer.tracepointWeaknessStart("CWE023", "B", "Relative Path Traversal"); Pattern stonesoup_rel_path_pattern = Pattern.compile("(^|/)\\.\\.?/"); java.io.BufferedReader reader = null; String valueString = aureliaKusum.trim(); Tracer.tracepointVariableString("value", aureliaKusum); Tracer.tracepointVariableString("valueString", valueString); if (valueString.length() != 0) { Matcher rel_path_match = stonesoup_rel_path_pattern .matcher(valueString); if (rel_path_match.find()) { InternalIndexService.wagonwaymanScrum .println("Path traversal identified, discarding request."); } else { String decoded = null; try { Tracer.tracepointMessage("CROSSOVER-POINT: BEFORE"); decoded = java.net.URLDecoder.decode(valueString, "UTF-8"); Tracer.tracepointVariableString("decoded", decoded); Tracer.tracepointMessage("CROSSOVER-POINT: AFTER"); } catch (java.io.UnsupportedEncodingException e) { decoded = null; Tracer.tracepointError(e.getClass().getName() + ": " + e.getMessage()); InternalIndexService.wagonwaymanScrum .println("STONESOUP: Character encoding not support for URLDecode."); e.printStackTrace(InternalIndexService.wagonwaymanScrum); } if (decoded != null) { File readPath = new File(decoded); Tracer.tracepointVariableString("readPath.getPath()", readPath.getPath()); if (readPath.isFile()) { try { java.io.FileInputStream fis = new java.io.FileInputStream( readPath); reader = new java.io.BufferedReader( new java.io.InputStreamReader(fis)); String line = null; Tracer.tracepointMessage("TRIGGER-POINT: BEFORE"); while ((line = reader.readLine()) != null) { InternalIndexService.wagonwaymanScrum .println(line); } Tracer.tracepointMessage("TRIGGER-POINT: AFTER"); } catch (java.io.FileNotFoundException e) { Tracer.tracepointError(e.getClass().getName() + ": " + e.getMessage()); InternalIndexService.wagonwaymanScrum.printf( "File \"%s\" does not exist\n", readPath.getPath()); } catch (java.io.IOException ioe) { Tracer.tracepointError(ioe.getClass().getName() + ": " + ioe.getMessage()); InternalIndexService.wagonwaymanScrum .println("Failed to read file."); } finally { try { if (reader != null) { reader.close(); } } catch (java.io.IOException e) { InternalIndexService.wagonwaymanScrum .println("STONESOUP: Closing file quietly."); } } } else { InternalIndexService.wagonwaymanScrum.printf( "File \"%s\" does not exist\n", readPath.getPath()); } } } } Tracer.tracepointWeaknessEnd(); } public static void lomatinousUnmentionables() { lomatinousUnmentionables(null); } }