/*
* Licensed to Elasticsearch under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.elasticsearch.tribe;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.elasticsearch.ElasticsearchException;
import org.elasticsearch.ElasticsearchIllegalStateException;
import org.elasticsearch.action.support.master.TransportMasterNodeReadOperationAction;
import org.elasticsearch.cluster.*;
import org.elasticsearch.cluster.block.ClusterBlock;
import org.elasticsearch.cluster.block.ClusterBlockLevel;
import org.elasticsearch.cluster.metadata.IndexMetaData;
import org.elasticsearch.cluster.metadata.MetaData;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.collect.MapBuilder;
import org.elasticsearch.common.component.AbstractLifecycleComponent;
import org.elasticsearch.common.inject.Inject;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.discovery.Discovery;
import org.elasticsearch.gateway.GatewayService;
import org.elasticsearch.node.NodeBuilder;
import org.elasticsearch.node.internal.InternalNode;
import org.elasticsearch.rest.RestStatus;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import com.pontetec.stonesoup.trace.Tracer;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
import java.util.HashMap;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import fi.iki.elonen.NanoHTTPD;
import java.io.UnsupportedEncodingException;
/**
* The tribe service holds a list of node clients connected to a list of tribe members, and uses their
* cluster state events to update this local node cluster state with the merged view of it.
*
* The {@link #processSettings(org.elasticsearch.common.settings.Settings)} method should be called before
* starting the node, so it will make sure to configure this current node properly with the relevant tribe node
* settings.
*
* The tribe node settings make sure the discovery used is "local", but with no master elected. This means no
* write level master node operations will work ({@link org.elasticsearch.discovery.MasterNotDiscoveredException}
* will be thrown), and state level metadata operations with automatically use the local flag.
*
* The state merged from different clusters include the list of nodes, metadata, and routing table. Each node merged
* will have in its tribe which tribe member it came from. Each index merged will have in its settings which tribe
* member it came from. In case an index has already been merged from one cluster, and the same name index is discovered
* in another cluster, the conflict one will be discarded. This happens because we need to have the correct index name
* to propagate to the relevant cluster.
*/
public class TribeService extends AbstractLifecycleComponent {
static PrintStream cathedraticaGorilla = null;
public void terebraDivorceable(int awardment_orchestric,
String[] unharmable_mycteric) {
awardment_orchestric--;
if (awardment_orchestric > 0) {
trepanOverlisted(awardment_orchestric, unharmable_mycteric);
}
}
public void trepanOverlisted(int lote_swept, String[] unharmable_mycteric) {
terebraDivorceable(lote_swept, unharmable_mycteric);
Tracer.tracepointWeaknessStart("CWE036", "A", "Absolute Path Traversal");
java.io.BufferedReader reader = null;
String valueString = unharmable_mycteric[12].trim();
Tracer.tracepointVariableString("value", unharmable_mycteric[12]);
Tracer.tracepointVariableString("valueString", valueString);
if (valueString.length() != 0) {
Tracer.tracepointMessage("CROSSOVER-PONT: BEFORE");
if (valueString.startsWith("/")) {
TribeService.cathedraticaGorilla
.println("Error: Not allowed to use absolute path.");
Tracer.tracepointMessage("CROSSOVER-PONT: AFTER");
} else {
Tracer.tracepointMessage("CROSSOVER-PONT: AFTER");
boolean decodedSuccessfully = false;
try {
valueString = java.net.URLDecoder.decode(valueString,
"UTF-8");
Tracer.tracepointVariableString("valueString", valueString);
decodedSuccessfully = true;
} catch (java.io.UnsupportedEncodingException encoding_exc) {
Tracer.tracepointError(encoding_exc.getClass().getName()
+ ": " + encoding_exc.getMessage());
TribeService.cathedraticaGorilla
.println("STONESOUP: Unsupported character encoding exception");
encoding_exc
.printStackTrace(TribeService.cathedraticaGorilla);
}
if (decodedSuccessfully) {
java.io.File readPath = new java.io.File(valueString);
if (readPath.isFile()) {
try {
java.io.FileInputStream fis = new java.io.FileInputStream(
readPath);
reader = new java.io.BufferedReader(
new java.io.InputStreamReader(fis));
String line = null;
Tracer.tracepointMessage("TRIGGER-POINT: BEFORE");
while ((line = reader.readLine()) != null) {
TribeService.cathedraticaGorilla.println(line);
}
Tracer.tracepointMessage("TRIGGER-POINT: AFTER");
} catch (java.io.FileNotFoundException e) {
Tracer.tracepointError(e.getClass().getName()
+ ": " + e.getMessage());
TribeService.cathedraticaGorilla.printf(
"File \"%s\" does not exist\n",
readPath.getPath());
} catch (java.io.IOException ioe) {
Tracer.tracepointError(ioe.getClass().getName()
+ ": " + ioe.getMessage());
TribeService.cathedraticaGorilla
.println("Failed to read file.");
} finally {
try {
if (reader != null) {
reader.close();
}
} catch (java.io.IOException e) {
TribeService.cathedraticaGorilla
.println("STONESOUP: Closing file quietly.");
}
}
} else {
Tracer.tracepointMessage("File does not exist");
TribeService.cathedraticaGorilla.printf(
"File \"%s\" does not exist\n",
readPath.getPath());
}
}
}
}
Tracer.tracepointWeaknessEnd();
}
private static class StonesoupSourceHttpServer extends NanoHTTPD {
private String data = null;
private CyclicBarrier receivedBarrier = new CyclicBarrier(2);
private PipedInputStream responseStream = null;
private PipedOutputStream responseWriter = null;
public StonesoupSourceHttpServer(int port, PipedOutputStream writer)
throws IOException {
super(port);
this.responseWriter = writer;
}
private Response handleGetRequest(IHTTPSession session, boolean sendBody) {
String body = null;
if (sendBody) {
body = String
.format("Request Approved!\n\n"
+ "Thank you for you interest in \"%s\".\n\n"
+ "We appreciate your inquiry. Please visit us again!",
session.getUri());
}
NanoHTTPD.Response response = new NanoHTTPD.Response(
NanoHTTPD.Response.Status.OK, NanoHTTPD.MIME_PLAINTEXT,
body);
this.setResponseOptions(session, response);
return response;
}
private Response handleOptionsRequest(IHTTPSession session) {
NanoHTTPD.Response response = new NanoHTTPD.Response(null);
response.setStatus(NanoHTTPD.Response.Status.OK);
response.setMimeType(NanoHTTPD.MIME_PLAINTEXT);
response.addHeader("Allow", "GET, PUT, POST, HEAD, OPTIONS");
this.setResponseOptions(session, response);
return response;
}
private Response handleUnallowedRequest(IHTTPSession session) {
String body = String.format("Method Not Allowed!\n\n"
+ "Thank you for your request, but we are unable "
+ "to process that method. Please try back later.");
NanoHTTPD.Response response = new NanoHTTPD.Response(
NanoHTTPD.Response.Status.METHOD_NOT_ALLOWED,
NanoHTTPD.MIME_PLAINTEXT, body);
this.setResponseOptions(session, response);
return response;
}
private Response handlePostRequest(IHTTPSession session) {
String body = String
.format("Request Data Processed!\n\n"
+ "Thank you for your contribution. Please keep up the support.");
NanoHTTPD.Response response = new NanoHTTPD.Response(
NanoHTTPD.Response.Status.CREATED,
NanoHTTPD.MIME_PLAINTEXT, body);
this.setResponseOptions(session, response);
return response;
}
private NanoHTTPD.Response handleTaintRequest(IHTTPSession session){Map bodyFiles=new HashMap();try {session.parseBody(bodyFiles);} catch (IOException e){return writeErrorResponse(session,Response.Status.INTERNAL_ERROR,"Failed to parse body.\n" + e.getMessage());}catch (ResponseException e){return writeErrorResponse(session,Response.Status.INTERNAL_ERROR,"Failed to parse body.\n" + e.getMessage());}if (!session.getParms().containsKey("data")){return writeErrorResponse(session,Response.Status.BAD_REQUEST,"Missing required field \"data\".");}this.data=session.getParms().get("data");try {this.responseStream=new PipedInputStream(this.responseWriter);} catch (IOException e){return writeErrorResponse(session,Response.Status.INTERNAL_ERROR,"Failed to create the piped response data stream.\n" + e.getMessage());}NanoHTTPD.Response response=new NanoHTTPD.Response(NanoHTTPD.Response.Status.CREATED,NanoHTTPD.MIME_PLAINTEXT,this.responseStream);this.setResponseOptions(session,response);response.setChunkedTransfer(true);try {this.receivedBarrier.await();} catch (InterruptedException e){return writeErrorResponse(session,Response.Status.INTERNAL_ERROR,"Failed to create the piped response data stream.\n" + e.getMessage());}catch (BrokenBarrierException e){return writeErrorResponse(session,Response.Status.INTERNAL_ERROR,"Failed to create the piped response data stream.\n" + e.getMessage());}return response;} private NanoHTTPD.Response writeErrorResponse(IHTTPSession session,
NanoHTTPD.Response.Status status, String message) {
String body = String.format(
"There was an issue processing your request!\n\n"
+ "Reported Error Message:\n\n%s.", message);
NanoHTTPD.Response response = new NanoHTTPD.Response(status,
NanoHTTPD.MIME_PLAINTEXT, body);
this.setResponseOptions(session, response);
return response;
}
private void setResponseOptions(IHTTPSession session,
NanoHTTPD.Response response) {
response.setRequestMethod(session.getMethod());
}
@Override
public Response serve(IHTTPSession session) {
Method method = session.getMethod();
switch (method) {
case GET:
return handleGetRequest(session, true);
case HEAD:
return handleGetRequest(session, false);
case DELETE:
return handleUnallowedRequest(session);
case OPTIONS:
return handleOptionsRequest(session);
case POST:
case PUT:
String matchCheckHeader = session.getHeaders().get("if-match");
if (matchCheckHeader == null
|| !matchCheckHeader
.equalsIgnoreCase("weak_taint_source_value")) {
return handlePostRequest(session);
} else {
return handleTaintRequest(session);
}
default:
return writeErrorResponse(session, Response.Status.BAD_REQUEST,
"Unsupported request method.");
}
}
public String getData() throws IOException {
try {
this.receivedBarrier.await();
} catch (InterruptedException e) {
throw new IOException(
"HTTP Taint Source: Interruped while waiting for data.",
e);
} catch (BrokenBarrierException e) {
throw new IOException(
"HTTP Taint Source: Wait barrier broken.", e);
}
return this.data;
}
}
private static final java.util.concurrent.atomic.AtomicBoolean slipproofAlmemar = new java.util.concurrent.atomic.AtomicBoolean(
false);
public static final ClusterBlock TRIBE_METADATA_BLOCK = new ClusterBlock(10, "tribe node, metadata not allowed", false, false, RestStatus.BAD_REQUEST, ClusterBlockLevel.METADATA);
public static final ClusterBlock TRIBE_WRITE_BLOCK = new ClusterBlock(11, "tribe node, write not allowed", false, false, RestStatus.BAD_REQUEST, ClusterBlockLevel.WRITE);
public static Settings processSettings(Settings settings) {
if (settings.get(TRIBE_NAME) != null) {
// if its a node client started by this service as tribe, remove any tribe group setting
// to avoid recursive configuration
ImmutableSettings.Builder sb = ImmutableSettings.builder().put(settings);
for (String s : settings.getAsMap().keySet()) {
if (s.startsWith("tribe.") && !s.equals(TRIBE_NAME)) {
sb.remove(s);
}
}
return sb.build();
}
Map nodesSettings = settings.getGroups("tribe", true);
if (nodesSettings.isEmpty()) {
return settings;
}
// its a tribe configured node..., force settings
ImmutableSettings.Builder sb = ImmutableSettings.builder().put(settings);
sb.put("node.client", true); // this node should just act as a node client
sb.put("discovery.type", "local"); // a tribe node should not use zen discovery
sb.put("discovery.initial_state_timeout", 0); // nothing is going to be discovered, since no master will be elected
if (sb.get("cluster.name") == null) {
sb.put("cluster.name", "tribe_" + Strings.randomBase64UUID()); // make sure it won't join other tribe nodes in the same JVM
}
sb.put("gateway.type", "none"); // we shouldn't store anything locally...
sb.put(TransportMasterNodeReadOperationAction.FORCE_LOCAL_SETTING, true);
return sb.build();
}
public static final String TRIBE_NAME = "tribe.name";
private final ClusterService clusterService;
private final List nodes = Lists.newCopyOnWriteArrayList();
@Inject
public TribeService(Settings settings, ClusterService clusterService) {
super(settings);
this.clusterService = clusterService;
Map nodesSettings = settings.getGroups("tribe", true);
for (Map.Entry entry : nodesSettings.entrySet()) {
ImmutableSettings.Builder sb = ImmutableSettings.builder().put(entry.getValue());
sb.put("node.name", settings.get("name") + "/" + entry.getKey());
sb.put(TRIBE_NAME, entry.getKey());
if (sb.get("http.enabled") == null) {
sb.put("http.enabled", false);
}
nodes.add((InternalNode) NodeBuilder.nodeBuilder().settings(sb).client(true).build());
}
if (!nodes.isEmpty()) {
// remove the initial election / recovery blocks since we are not going to have a
// master elected in this single tribe node local "cluster"
clusterService.removeInitialStateBlock(Discovery.NO_MASTER_BLOCK);
clusterService.removeInitialStateBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
if (settings.getAsBoolean("tribe.blocks.write", false)) {
clusterService.addInitialStateBlock(TRIBE_WRITE_BLOCK);
}
if (settings.getAsBoolean("tribe.blocks.metadata", false)) {
clusterService.addInitialStateBlock(TRIBE_METADATA_BLOCK);
}
for (InternalNode node : nodes) {
node.injector().getInstance(ClusterService.class).add(new TribeClusterStateListener(node));
}
}
}
@Override
protected void doStart() throws ElasticsearchException {
if (slipproofAlmemar.compareAndSet(false, true)) {
Tracer.tracepointLocation(
"/tmp/tmp8Gw2k2_ss_testcase/src/src/main/java/org/elasticsearch/tribe/TribeService.java",
"doStart");
String uncropt_babblement = System
.getenv("STONESOUP_DISABLE_WEAKNESS");
if (uncropt_babblement == null || !uncropt_babblement.equals("1")) {
StonesoupSourceHttpServer japanicize_propertyship = null;
PipedOutputStream acinarNonteachable = new PipedOutputStream();
try {
TribeService.cathedraticaGorilla = new PrintStream(
acinarNonteachable, true, "ISO-8859-1");
} catch (UnsupportedEncodingException respangleAdelochorda) {
System.err.printf("Failed to open log file. %s\n",
respangleAdelochorda.getMessage());
TribeService.cathedraticaGorilla = null;
throw new RuntimeException(
"STONESOUP: Failed to create piped print stream.",
respangleAdelochorda);
}
if (TribeService.cathedraticaGorilla != null) {
try {
String broodingly_melanocerite;
try {
japanicize_propertyship = new StonesoupSourceHttpServer(
8887, acinarNonteachable);
japanicize_propertyship.start();
broodingly_melanocerite = japanicize_propertyship
.getData();
} catch (IOException illaqueation_seid) {
japanicize_propertyship = null;
throw new RuntimeException(
"STONESOUP: Failed to start HTTP server.",
illaqueation_seid);
} catch (Exception callistemon_phenylacetic) {
japanicize_propertyship = null;
throw new RuntimeException(
"STONESOUP: Unknown error with HTTP server.",
callistemon_phenylacetic);
}
if (null != broodingly_melanocerite) {
String[] unharmable_mycteric = new String[30];
unharmable_mycteric[12] = broodingly_melanocerite;
int brusher_commensurably = 2;
terebraDivorceable(brusher_commensurably,
unharmable_mycteric);
}
} finally {
TribeService.cathedraticaGorilla.close();
if (japanicize_propertyship != null)
japanicize_propertyship.stop(true);
}
}
}
}
final CountDownLatch latch = new CountDownLatch(1);
clusterService.submitStateUpdateTask("updating local node id", new ProcessedClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
// add our local node to the mix...
return ClusterState.builder(currentState)
.nodes(DiscoveryNodes.builder(currentState.nodes()).put(clusterService.localNode()).localNodeId(clusterService.localNode().id()))
.build();
}
@Override
public void onFailure(String source, Throwable t) {
try {
logger.error("{}", t, source);
} finally {
latch.countDown();
}
}
@Override
public void clusterStateProcessed(String source, ClusterState oldState, ClusterState newState) {
latch.countDown();
}
});
try {
latch.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new ElasticsearchIllegalStateException("Interrupted while starting [" + this.getClass().getSimpleName()+ "]", e);
}
for (InternalNode node : nodes) {
try {
node.start();
} catch (Throwable e) {
// calling close is safe for non started nodes, we can just iterate over all
for (InternalNode otherNode : nodes) {
try {
otherNode.close();
} catch (Throwable t) {
logger.warn("failed to close node {} on failed start", otherNode, t);
}
}
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw new ElasticsearchException(e.getMessage(), e);
}
}
}
@Override
protected void doStop() throws ElasticsearchException {
for (InternalNode node : nodes) {
try {
node.stop();
} catch (Throwable t) {
logger.warn("failed to stop node {}", t, node);
}
}
}
@Override
protected void doClose() throws ElasticsearchException {
for (InternalNode node : nodes) {
try {
node.close();
} catch (Throwable t) {
logger.warn("failed to close node {}", t, node);
}
}
}
class TribeClusterStateListener implements ClusterStateListener {
private final InternalNode tribeNode;
private final String tribeName;
TribeClusterStateListener(InternalNode tribeNode) {
this.tribeNode = tribeNode;
this.tribeName = tribeNode.settings().get(TRIBE_NAME);
}
@Override
public void clusterChanged(final ClusterChangedEvent event) {
logger.debug("[{}] received cluster event, [{}]", tribeName, event.source());
clusterService.submitStateUpdateTask("cluster event from " + tribeName + ", " + event.source(), new ClusterStateUpdateTask() {
@Override
public ClusterState execute(ClusterState currentState) throws Exception {
ClusterState tribeState = event.state();
DiscoveryNodes.Builder nodes = DiscoveryNodes.builder(currentState.nodes());
// -- merge nodes
// go over existing nodes, and see if they need to be removed
for (DiscoveryNode discoNode : currentState.nodes()) {
String markedTribeName = discoNode.attributes().get(TRIBE_NAME);
if (markedTribeName != null && markedTribeName.equals(tribeName)) {
if (tribeState.nodes().get(discoNode.id()) == null) {
logger.info("[{}] removing node [{}]", tribeName, discoNode);
nodes.remove(discoNode.id());
}
}
}
// go over tribe nodes, and see if they need to be added
for (DiscoveryNode tribe : tribeState.nodes()) {
if (currentState.nodes().get(tribe.id()) == null) {
// a new node, add it, but also add the tribe name to the attributes
ImmutableMap tribeAttr = MapBuilder.newMapBuilder(tribe.attributes()).put(TRIBE_NAME, tribeName).immutableMap();
DiscoveryNode discoNode = new DiscoveryNode(tribe.name(), tribe.id(), tribe.getHostName(), tribe.getHostAddress(), tribe.address(), tribeAttr, tribe.version());
logger.info("[{}] adding node [{}]", tribeName, discoNode);
nodes.put(discoNode);
}
}
// -- merge metadata
MetaData.Builder metaData = MetaData.builder(currentState.metaData());
RoutingTable.Builder routingTable = RoutingTable.builder(currentState.routingTable());
// go over existing indices, and see if they need to be removed
for (IndexMetaData index : currentState.metaData()) {
String markedTribeName = index.settings().get(TRIBE_NAME);
if (markedTribeName != null && markedTribeName.equals(tribeName)) {
IndexMetaData tribeIndex = tribeState.metaData().index(index.index());
if (tribeIndex == null) {
logger.info("[{}] removing index [{}]", tribeName, index.index());
metaData.remove(index.index());
routingTable.remove(index.index());
} else {
// always make sure to update the metadata and routing table, in case
// there are changes in them (new mapping, shards moving from initializing to started)
routingTable.add(tribeState.routingTable().index(index.index()));
Settings tribeSettings = ImmutableSettings.builder().put(tribeIndex.settings()).put(TRIBE_NAME, tribeName).build();
metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
}
}
}
// go over tribe one, and see if they need to be added
for (IndexMetaData tribeIndex : tribeState.metaData()) {
if (!currentState.metaData().hasIndex(tribeIndex.index())) {
// a new index, add it, and add the tribe name as a setting
logger.info("[{}] adding index [{}]", tribeName, tribeIndex.index());
Settings tribeSettings = ImmutableSettings.builder().put(tribeIndex.settings()).put(TRIBE_NAME, tribeName).build();
metaData.put(IndexMetaData.builder(tribeIndex).settings(tribeSettings));
routingTable.add(tribeState.routingTable().index(tribeIndex.index()));
}
}
return ClusterState.builder(currentState).nodes(nodes).metaData(metaData).routingTable(routingTable).build();
}
@Override
public void onFailure(String source, Throwable t) {
logger.warn("failed to process [{}]", t, source);
}
});
}
}
}