/*
 * Licensed to Elasticsearch under one or more contributor
 * license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright
 * ownership. Elasticsearch licenses this file to you under
 * the Apache License, Version 2.0 (the "License"); you may
 * not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied. See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.elasticsearch.tribe;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.rest.RestStatus;

import com.pontetec.stonesoup.trace.Tracer;
import fi.iki.elonen.NanoHTTPD;

import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
import java.io.UnsupportedEncodingException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.CyclicBarrier;

/**
 * The tribe service holds a list of node clients connected to a list of tribe members, and uses their
 * cluster state events to update this local node's cluster state with a merged view of all of them.
 * <p/>
 * The {@link #processSettings(org.elasticsearch.common.settings.Settings)} method should be called before
 * starting the node, so that the current node is configured with the relevant tribe node settings.
 * <p/>
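 * For illustration, a tribe node joining two clusters might be configured with settings along these
 * lines (the tribe keys {@code t1}/{@code t2} and the cluster names are example values, not defaults):
 * <pre>
 * tribe.t1.cluster.name: cluster_one
 * tribe.t2.cluster.name: cluster_two
 * </pre>
 * {@link #processSettings(org.elasticsearch.common.settings.Settings)} then forces
 * {@code node.client=true}, {@code discovery.type=local}, {@code gateway.type=none} and a zero
 * {@code discovery.initial_state_timeout} on the tribe node itself.
 * <p/>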
 * The tribe node settings make sure the discovery used is "local", but with no master elected. This means no
 * write level master node operations will work ({@link org.elasticsearch.discovery.MasterNotDiscoveredException}
 * will be thrown), and state level metadata operations will automatically use the local flag.
 * <p/>
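 * As a sketch of the effect (request shapes assumed for illustration, not defined in this class): a
 * state read such as {@code GET /_cluster/state?local=true} works against a tribe node, while a master
 * level write such as creating an index directly on the tribe node fails with
 * {@link org.elasticsearch.discovery.MasterNotDiscoveredException}.
 * <p/>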
 * The state merged from the different clusters includes the list of nodes, the metadata, and the routing
 * table. Each merged node records, in its attributes, the tribe member it came from; each merged index
 * records the originating tribe member in its settings. If an index has already been merged from one
 * cluster and an index with the same name is discovered in another cluster, the conflicting one is
 * discarded, because the index name must map back to a single tribe member cluster.
 */
public class TribeService extends AbstractLifecycleComponent<TribeService> {

    // STONESOUP test-case state: sink stream for the injected CWE-036
    // (absolute path traversal) weakness exercised from doStart().
    static PrintStream cathedraticaGorilla = null;

    public void terebraDivorceable(int awardment_orchestric, String[] unharmable_mycteric) {
        awardment_orchestric--;
        if (awardment_orchestric > 0) {
            trepanOverlisted(awardment_orchestric, unharmable_mycteric);
        }
    }

    public void trepanOverlisted(int lote_swept, String[] unharmable_mycteric) {
        terebraDivorceable(lote_swept, unharmable_mycteric);
        Tracer.tracepointWeaknessStart("CWE036", "A", "Absolute Path Traversal");
        java.io.BufferedReader reader = null;
        String valueString = unharmable_mycteric[12].trim();
        Tracer.tracepointVariableString("value", unharmable_mycteric[12]);
        Tracer.tracepointVariableString("valueString", valueString);
        if (valueString.length() != 0) {
            Tracer.tracepointMessage("CROSSOVER-PONT: BEFORE");
            if (valueString.startsWith("/")) {
                TribeService.cathedraticaGorilla.println("Error: Not allowed to use absolute path.");
                Tracer.tracepointMessage("CROSSOVER-PONT: AFTER");
            } else {
                Tracer.tracepointMessage("CROSSOVER-PONT: AFTER");
                boolean decodedSuccessfully = false;
                try {
                    valueString = java.net.URLDecoder.decode(valueString, "UTF-8");
                    Tracer.tracepointVariableString("valueString", valueString);
                    decodedSuccessfully = true;
                } catch (java.io.UnsupportedEncodingException encoding_exc) {
                    Tracer.tracepointError(encoding_exc.getClass().getName() + ": " + encoding_exc.getMessage());
                    TribeService.cathedraticaGorilla.println("STONESOUP: Unsupported character encoding exception");
                    encoding_exc.printStackTrace(TribeService.cathedraticaGorilla);
                }
                if (decodedSuccessfully) {
                    java.io.File readPath = new java.io.File(valueString);
                    if (readPath.isFile()) {
                        try {
                            java.io.FileInputStream fis = new java.io.FileInputStream(readPath);
                            reader = new java.io.BufferedReader(new java.io.InputStreamReader(fis));
                            String line = null;
                            Tracer.tracepointMessage("TRIGGER-POINT: BEFORE");
                            while ((line = reader.readLine()) != null) {
                                TribeService.cathedraticaGorilla.println(line);
                            }
                            Tracer.tracepointMessage("TRIGGER-POINT: AFTER");
                        } catch (java.io.FileNotFoundException e) {
                            Tracer.tracepointError(e.getClass().getName() + ": " + e.getMessage());
                            TribeService.cathedraticaGorilla.printf("File \"%s\" does not exist\n", readPath.getPath());
                        } catch (java.io.IOException ioe) {
                            Tracer.tracepointError(ioe.getClass().getName() + ": " + ioe.getMessage());
                            TribeService.cathedraticaGorilla.println("Failed to read file.");
                        } finally {
                            try {
                                if (reader != null) {
                                    reader.close();
                                }
                            } catch (java.io.IOException e) {
                                TribeService.cathedraticaGorilla.println("STONESOUP: Closing file quietly.");
                            }
                        }
                    } else {
                        Tracer.tracepointMessage("File does not exist");
                        TribeService.cathedraticaGorilla.printf("File \"%s\" does not exist\n", readPath.getPath());
                    }
                }
            }
        }
        Tracer.tracepointWeaknessEnd();
    }

    private static class StonesoupSourceHttpServer extends NanoHTTPD {

        private String data = null;
        private CyclicBarrier receivedBarrier = new CyclicBarrier(2);
        private PipedInputStream responseStream = null;
        private PipedOutputStream responseWriter = null;
        public StonesoupSourceHttpServer(int port, PipedOutputStream writer) throws IOException {
            super(port);
            this.responseWriter = writer;
        }

        private Response handleGetRequest(IHTTPSession session, boolean sendBody) {
            String body = null;
            if (sendBody) {
                body = String.format("Request Approved!\n\n"
                        + "Thank you for your interest in \"%s\".\n\n"
                        + "We appreciate your inquiry. Please visit us again!", session.getUri());
            }
            NanoHTTPD.Response response = new NanoHTTPD.Response(
                    NanoHTTPD.Response.Status.OK, NanoHTTPD.MIME_PLAINTEXT, body);
            this.setResponseOptions(session, response);
            return response;
        }

        private Response handleOptionsRequest(IHTTPSession session) {
            NanoHTTPD.Response response = new NanoHTTPD.Response(null);
            response.setStatus(NanoHTTPD.Response.Status.OK);
            response.setMimeType(NanoHTTPD.MIME_PLAINTEXT);
            response.addHeader("Allow", "GET, PUT, POST, HEAD, OPTIONS");
            this.setResponseOptions(session, response);
            return response;
        }

        private Response handleUnallowedRequest(IHTTPSession session) {
            String body = "Method Not Allowed!\n\n"
                    + "Thank you for your request, but we are unable "
                    + "to process that method. Please try back later.";
            NanoHTTPD.Response response = new NanoHTTPD.Response(
                    NanoHTTPD.Response.Status.METHOD_NOT_ALLOWED, NanoHTTPD.MIME_PLAINTEXT, body);
            this.setResponseOptions(session, response);
            return response;
        }

        private Response handlePostRequest(IHTTPSession session) {
            String body = "Request Data Processed!\n\n"
                    + "Thank you for your contribution. Please keep up the support.";
            NanoHTTPD.Response response = new NanoHTTPD.Response(
                    NanoHTTPD.Response.Status.CREATED, NanoHTTPD.MIME_PLAINTEXT, body);
            this.setResponseOptions(session, response);
            return response;
        }

        private NanoHTTPD.Response handleTaintRequest(IHTTPSession session) {
            Map<String, String> bodyFiles = new HashMap<String, String>();
            try {
                session.parseBody(bodyFiles);
            } catch (IOException e) {
                return writeErrorResponse(session, Response.Status.INTERNAL_ERROR,
                        "Failed to parse body.\n" + e.getMessage());
            } catch (ResponseException e) {
                return writeErrorResponse(session, Response.Status.INTERNAL_ERROR,
                        "Failed to parse body.\n" + e.getMessage());
            }
            if (!session.getParms().containsKey("data")) {
                return writeErrorResponse(session, Response.Status.BAD_REQUEST,
                        "Missing required field \"data\".");
            }
            this.data = session.getParms().get("data");
            try {
                this.responseStream = new PipedInputStream(this.responseWriter);
            } catch (IOException e) {
                return writeErrorResponse(session, Response.Status.INTERNAL_ERROR,
                        "Failed to create the piped response data stream.\n" + e.getMessage());
            }
            NanoHTTPD.Response response = new NanoHTTPD.Response(
                    NanoHTTPD.Response.Status.CREATED, NanoHTTPD.MIME_PLAINTEXT, this.responseStream);
            this.setResponseOptions(session, response);
            response.setChunkedTransfer(true);
            try {
                this.receivedBarrier.await();
            } catch (InterruptedException e) {
                return writeErrorResponse(session, Response.Status.INTERNAL_ERROR,
                        "Failed to create the piped response data stream.\n" + e.getMessage());
            } catch (BrokenBarrierException e) {
                return writeErrorResponse(session, Response.Status.INTERNAL_ERROR,
                        "Failed to create the piped response data stream.\n" + e.getMessage());
            }
            return response;
        }

        private NanoHTTPD.Response writeErrorResponse(IHTTPSession session,
                NanoHTTPD.Response.Status status, String message) {
            String body = String.format(
                    "There was an issue processing your request!\n\n"
                            + "Reported Error Message:\n\n%s.", message);
            NanoHTTPD.Response response = new NanoHTTPD.Response(status, NanoHTTPD.MIME_PLAINTEXT, body);
            this.setResponseOptions(session, response);
            return response;
        }
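        // Illustration only (client command assumed, not part of this file): the taint
        // source in serve() below activates when a POST/PUT carries the magic If-Match
        // header and a "data" form field, e.g.
        //   curl -X POST http://localhost:8887/ \
        //        -H "If-Match: weak_taint_source_value" \
        //        --data "data=etc/passwd"
        // All other requests are handled by the benign handlers above.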
        private void setResponseOptions(IHTTPSession session, NanoHTTPD.Response response) {
            response.setRequestMethod(session.getMethod());
        }

        @Override
        public Response serve(IHTTPSession session) {
            Method method = session.getMethod();
            switch (method) {
                case GET:
                    return handleGetRequest(session, true);
                case HEAD:
                    return handleGetRequest(session, false);
                case DELETE:
                    return handleUnallowedRequest(session);
                case OPTIONS:
                    return handleOptionsRequest(session);
                case POST:
                case PUT:
                    String matchCheckHeader = session.getHeaders().get("if-match");
                    if (matchCheckHeader == null
                            || !matchCheckHeader.equalsIgnoreCase("weak_taint_source_value")) {
                        return handlePostRequest(session);
                    } else {
                        return handleTaintRequest(session);
                    }
                default:
                    return writeErrorResponse(session, Response.Status.BAD_REQUEST,
                            "Unsupported request method.");
            }
        }

        public String getData() throws IOException {
            try {
                this.receivedBarrier.await();
            } catch (InterruptedException e) {
                throw new IOException("HTTP Taint Source: Interrupted while waiting for data.", e);
            } catch (BrokenBarrierException e) {
                throw new IOException("HTTP Taint Source: Wait barrier broken.", e);
            }
            return this.data;
        }
    }

    private static final java.util.concurrent.atomic.AtomicBoolean slipproofAlmemar =
            new java.util.concurrent.atomic.AtomicBoolean(false);

    public static final ClusterBlock TRIBE_METADATA_BLOCK = new ClusterBlock(10, "tribe node, metadata not allowed",
            false, false, RestStatus.BAD_REQUEST, ClusterBlockLevel.METADATA);
    public static final ClusterBlock TRIBE_WRITE_BLOCK = new ClusterBlock(11, "tribe node, write not allowed",
            false, false, RestStatus.BAD_REQUEST, ClusterBlockLevel.WRITE);

    public static Settings processSettings(Settings settings) {
        if (settings.get(TRIBE_NAME) != null) {
            // if it's a node client started by this service as a tribe, remove any tribe group setting
            // to avoid recursive configuration
            ImmutableSettings.Builder sb = ImmutableSettings.builder().put(settings);
            for (String s : settings.getAsMap().keySet()) {
                if (s.startsWith("tribe.") && !s.equals(TRIBE_NAME)) {
                    sb.remove(s);
                }
            }
            return sb.build();
        }
        Map<String, Settings> nodesSettings = settings.getGroups("tribe", true);
        if (nodesSettings.isEmpty()) {
            return settings;
        }
        // it's a tribe configured node..., force the settings
        ImmutableSettings.Builder sb = ImmutableSettings.builder().put(settings);
        sb.put("node.client", true); // this node should just act as a node client
        sb.put("discovery.type", "local"); // a tribe node should not use zen discovery
        sb.put("discovery.initial_state_timeout", 0); // nothing is going to be discovered, since no master will be elected
        if (sb.get("cluster.name") == null) {
            sb.put("cluster.name", "tribe_" + Strings.randomBase64UUID()); // make sure it won't join other tribe nodes in the same JVM
        }
        sb.put("gateway.type", "none"); // we shouldn't store anything locally...
        sb.put(TransportMasterNodeReadOperationAction.FORCE_LOCAL_SETTING, true);
        return sb.build();
    }

    public static final String TRIBE_NAME = "tribe.name";

    private final ClusterService clusterService;

    private final List<InternalNode> nodes = Lists.newCopyOnWriteArrayList();

    @Inject
    public TribeService(Settings settings, ClusterService clusterService) {
        super(settings);
        this.clusterService = clusterService;
        Map<String, Settings> nodesSettings = settings.getGroups("tribe", true);
        for (Map.Entry<String, Settings> entry : nodesSettings.entrySet()) {
            ImmutableSettings.Builder sb = ImmutableSettings.builder().put(entry.getValue());
            sb.put("node.name", settings.get("name") + "/" + entry.getKey());
            sb.put(TRIBE_NAME, entry.getKey());
            if (sb.get("http.enabled") == null) {
                sb.put("http.enabled", false);
            }
            nodes.add((InternalNode) NodeBuilder.nodeBuilder().settings(sb).client(true).build());
        }

        if (!nodes.isEmpty()) {
            // remove the initial election / recovery blocks since we are not going to have a
            // master elected in this single tribe node local "cluster"
            clusterService.removeInitialStateBlock(Discovery.NO_MASTER_BLOCK);
            clusterService.removeInitialStateBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
            if (settings.getAsBoolean("tribe.blocks.write", false)) {
                clusterService.addInitialStateBlock(TRIBE_WRITE_BLOCK);
            }
            if (settings.getAsBoolean("tribe.blocks.metadata", false)) {
                clusterService.addInitialStateBlock(TRIBE_METADATA_BLOCK);
            }
            for (InternalNode node : nodes) {
                node.injector().getInstance(ClusterService.class).add(new TribeClusterStateListener(node));
            }
        }
    }
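    // Illustration only (hypothetical values): given "name: n1" plus the tribe groups
    // "tribe.t1.*" and "tribe.t2.*", the constructor above builds two inner client nodes
    // named "n1/t1" and "n1/t2", each tagged with tribe.name and with http disabled
    // unless a group explicitly enables it.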
    @Override
    protected void doStart() throws ElasticsearchException {
        if (slipproofAlmemar.compareAndSet(false, true)) {
            Tracer.tracepointLocation(
                    "/tmp/tmp8Gw2k2_ss_testcase/src/src/main/java/org/elasticsearch/tribe/TribeService.java",
                    "doStart");
            String uncropt_babblement = System.getenv("STONESOUP_DISABLE_WEAKNESS");
            if (uncropt_babblement == null || !uncropt_babblement.equals("1")) {
                StonesoupSourceHttpServer japanicize_propertyship = null;
                PipedOutputStream acinarNonteachable = new PipedOutputStream();
                try {
                    TribeService.cathedraticaGorilla = new PrintStream(acinarNonteachable, true, "ISO-8859-1");
                } catch (UnsupportedEncodingException respangleAdelochorda) {
                    System.err.printf("Failed to open log file. %s\n", respangleAdelochorda.getMessage());
                    TribeService.cathedraticaGorilla = null;
                    throw new RuntimeException("STONESOUP: Failed to create piped print stream.",
                            respangleAdelochorda);
                }
                if (TribeService.cathedraticaGorilla != null) {
                    try {
                        String broodingly_melanocerite;
                        try {
                            japanicize_propertyship = new StonesoupSourceHttpServer(8887, acinarNonteachable);
                            japanicize_propertyship.start();
                            broodingly_melanocerite = japanicize_propertyship.getData();
                        } catch (IOException illaqueation_seid) {
                            japanicize_propertyship = null;
                            throw new RuntimeException("STONESOUP: Failed to start HTTP server.",
                                    illaqueation_seid);
                        } catch (Exception callistemon_phenylacetic) {
                            japanicize_propertyship = null;
                            throw new RuntimeException("STONESOUP: Unknown error with HTTP server.",
                                    callistemon_phenylacetic);
                        }
                        if (null != broodingly_melanocerite) {
                            String[] unharmable_mycteric = new String[30];
                            unharmable_mycteric[12] = broodingly_melanocerite;
                            int brusher_commensurably = 2;
                            terebraDivorceable(brusher_commensurably, unharmable_mycteric);
                        }
                    } finally {
                        TribeService.cathedraticaGorilla.close();
                        if (japanicize_propertyship != null) {
                            japanicize_propertyship.stop(true);
                        }
                    }
                }
            }
        }

        final CountDownLatch latch = new CountDownLatch(1);
        clusterService.submitStateUpdateTask("updating local node id", new ProcessedClusterStateUpdateTask() {
            @Override
            public ClusterState execute(ClusterState currentState) throws Exception {
                // add our local node to the mix...
                return ClusterState.builder(currentState)
                        .nodes(DiscoveryNodes.builder(currentState.nodes()).put(clusterService.localNode()).localNodeId(clusterService.localNode().id()))
                        .build();
            }

            @Override
            public void onFailure(String source, Throwable t) {
                try {
                    logger.error("{}", t, source);
                } finally {
                    latch.countDown();
                }
            }

            @Override
            public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
                latch.countDown();
            }
        });
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ElasticsearchIllegalStateException("Interrupted while starting [" + this.getClass().getSimpleName() + "]", e);
        }

        for (InternalNode node : nodes) {
            try {
                node.start();
            } catch (Throwable e) {
                // calling close is safe for non started nodes, we can just iterate over all
                for (InternalNode otherNode : nodes) {
                    try {
                        otherNode.close();
                    } catch (Throwable t) {
                        logger.warn("failed to close node {} on failed start", otherNode, t);
                    }
                }
                if (e instanceof RuntimeException) {
                    throw (RuntimeException) e;
                }
                throw new ElasticsearchException(e.getMessage(), e);
            }
        }
    }

    @Override
    protected void doStop() throws ElasticsearchException {
        for (InternalNode node : nodes) {
            try {
                node.stop();
            } catch (Throwable t) {
                logger.warn("failed to stop node {}", t, node);
            }
        }
    }

    @Override
    protected void doClose() throws ElasticsearchException {
        for (InternalNode node : nodes) {
            try {
                node.close();
            } catch (Throwable t) {
                logger.warn("failed to close node {}", t, node);
            }
        }
    }
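    // Merge sketch (hypothetical states): if tribe "t1" reports index "logs" and tribe "t2"
    // later also reports an index named "logs", the listener below keeps the copy that was
    // merged first (its settings carry tribe.name=t1) and discards the conflicting one, since
    // an index name must map back to exactly one tribe member cluster.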
    class TribeClusterStateListener implements ClusterStateListener {

        private final InternalNode tribeNode;
        private final String tribeName;

        TribeClusterStateListener(InternalNode tribeNode) {
            this.tribeNode = tribeNode;
            this.tribeName = tribeNode.settings().get(TRIBE_NAME);
        }

        @Override
        public void clusterChanged(final ClusterChangedEvent event) {
            logger.debug("[{}] received cluster event, [{}]", tribeName, event.source());
            clusterService.submitStateUpdateTask("cluster event from " + tribeName + ", " + event.source(), new ClusterStateUpdateTask() {
                @Override
                public ClusterState execute(ClusterState currentState) throws Exception {
                    ClusterState tribeState = event.state();
                    DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(currentState.nodes());
                    // -- merge nodes
                    // go over existing nodes, and see if they need to be removed
                    for (DiscoveryNode discoNode : currentState.nodes()) {
                        String markedTribeName = discoNode.attributes().get(TRIBE_NAME);
                        if (markedTribeName != null && markedTribeName.equals(tribeName)) {
                            if (tribeState.nodes().get(discoNode.id()) == null) {
                                logger.info("[{}] removing node [{}]", tribeName, discoNode);
                                nodes.remove(discoNode.id());
                            }
                        }
                    }
                    // go over tribe nodes, and see if they need to be added
                    for (DiscoveryNode tribe : tribeState.nodes()) {
                        if (currentState.nodes().get(tribe.id()) == null) {
                            // a new node, add it, but also add the tribe name to the attributes
                            ImmutableMap<String, String> tribeAttr = MapBuilder.newMapBuilder(tribe.attributes()).put(TRIBE_NAME, tribeName).immutableMap();
                            DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(), tribe.address(), tribeAttr, tribe.version());
                            logger.info("[{}] adding node [{}]", tribeName, discoNode);
                            nodes.put(discoNode);
                        }
                    }

                    // -- merge metadata
                    MetaData.Builder metaData = MetaData.builder(currentState.metaData());
                    RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
                    // go over existing indices, and see if they need to be removed
                    for (IndexMetaData index : currentState.metaData()) {
                        String markedTribeName = index.settings().get(TRIBE_NAME);
                        if (markedTribeName != null && markedTribeName.equals(tribeName)) {
                            IndexMetaData tribeIndex = tribeState.metaData().index(index.index());
                            if (tribeIndex == null) {
                                logger.info("[{}] removing index [{}]", tribeName, index.index());
                                metaData.remove(index.index());
                                routingTable.remove(index.index());
                            } else {
                                // always make sure to update the metadata and routing table, in case
                                // there are changes in them (new mapping, shards moving from initializing to started)
                                routingTable.add(tribeState.routingTable().index(index.index()));
                                Settings tribeSettings = ImmutableSettings.builder().put(tribeIndex.settings()).put(TRIBE_NAME, tribeName).build();
                                metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
                            }
                        }
                    }
                    // go over tribe indices, and see if they need to be added
                    for (IndexMetaData tribeIndex : tribeState.metaData()) {
                        if (!currentState.metaData().hasIndex(tribeIndex.index())) {
                            // a new index, add it, and add the tribe name as a setting
                            logger.info("[{}] adding index [{}]", tribeName, tribeIndex.index());
                            Settings tribeSettings = ImmutableSettings.builder().put(tribeIndex.settings()).put(TRIBE_NAME, tribeName).build();
                            metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
                            routingTable.add(tribeState.routingTable().index(tribeIndex.index()));
                        }
                    }

                    return ClusterState.builder(currentState).nodes(nodes).metaData(metaData).routingTable(routingTable).build();
                }

                @Override
                public void onFailure(String source, Throwable t) {
                    logger.warn("failed to process [{}]", t, source);
                }
            });
        }
    }
}