diff --git a/.gitignore b/.gitignore index 677dceb..5f93a55 100644 --- a/.gitignore +++ b/.gitignore @@ -100,3 +100,6 @@ classes/ run/ *.tmp **/gen/ + +\.floo +\.flooignore diff --git a/core/src/main/kotlin/matterlink/bridge/HttpStreamConnection.kt b/core/src/main/kotlin/matterlink/bridge/HttpStreamConnection.kt index e21775c..aeec969 100644 --- a/core/src/main/kotlin/matterlink/bridge/HttpStreamConnection.kt +++ b/core/src/main/kotlin/matterlink/bridge/HttpStreamConnection.kt @@ -1,27 +1,65 @@ -package matterlink.bridge; +package matterlink.bridge +import matterlink.config.cfg import matterlink.instance import org.apache.http.client.methods.HttpGet +import org.apache.http.client.methods.HttpRequestBase import org.apache.http.impl.client.HttpClients import java.io.InputStream import java.net.SocketException +import java.util.concurrent.ConcurrentLinkedQueue -val BUFFER_SIZE = 1000 +const val BUFFER_SIZE = 1000 -class HttpStreamConnection(getClosure: () -> HttpGet, - clearClosure: () -> HttpGet, - private val mhandler: (String) -> Unit, - private val onClose: () -> Unit, - private val setSuccess: (Boolean) -> Unit, +/** + * adds the correct headers for MatterBridge authorization + */ +fun HttpRequestBase.authorize() { + if (cfg.connect.authToken.isNotEmpty() && getHeaders("Authorization").isEmpty()) + setHeader("Authorization", "Bearer " + cfg.connect.authToken) +} + +class HttpStreamConnection(private val rcvQueue: ConcurrentLinkedQueue, private val clear: Boolean = true ) : Thread() { + var connected = false + var connecting = false + var enabled = true + var connectErrors = 0 + + init { + name = "MsgRcvThread" + } + + private fun onClose() { + instance.warn("Bridge connection closed!") + connected = false + connecting = false + } + + private fun setSuccess(success: Boolean) { + connecting = false + if (success) { + instance.info("connected successfully") + connectErrors = 0 + connected = true + } else { + connectErrors++ + connected = false + } + } + private val client = HttpClients.createDefault() private var stream: InputStream? = null - val get = getClosure() - private val clearGet = clearClosure() - var cancelled: Boolean = false - private set + val get = HttpGet(cfg.connect.url + "/api/stream").apply { + authorize() + } + private val clearGet = HttpGet(cfg.connect.url + "/api/messages").apply { + authorize() + } + + private var cancelled: Boolean = false override fun run() { @@ -58,7 +96,11 @@ class HttpStreamConnection(getClosure: () -> HttpGet, while (buffer.contains("\n")) { val line = buffer.substringBefore("\n") buffer = buffer.substringAfter("\n") - mhandler(line) + + rcvQueue.add( + ApiMessage.decode(line) + ) + } } else if (chars < 0) { break @@ -81,10 +123,29 @@ class HttpStreamConnection(getClosure: () -> HttpGet, return } - fun close() { - cancelled = true - get.abort() - join() + fun open() { + enabled = true + if (!isAlive && cfg.connect.enable) { + connecting = true + super.start() +// MessageHandler.transmit(ApiMessage(text="bridge connected", username="Server")) + } + if (isAlive) { + instance.info("Bridge Connection opened") + } + } + + fun close() { + instance.info("Closing bridge connection...") +// MessageHandler.transmit(ApiMessage(text="bridge closing", username="Server")) + try { + enabled = false + cancelled = true + get.abort() + join() + } catch (e: SocketException) { + instance.error("exception: $e") + } } } \ No newline at end of file diff --git a/core/src/main/kotlin/matterlink/bridge/MessageHandler.kt b/core/src/main/kotlin/matterlink/bridge/MessageHandler.kt index fa54045..8474150 100644 --- a/core/src/main/kotlin/matterlink/bridge/MessageHandler.kt +++ b/core/src/main/kotlin/matterlink/bridge/MessageHandler.kt @@ -2,108 +2,48 @@ package matterlink.bridge import matterlink.config.cfg import matterlink.instance -import org.apache.http.client.methods.HttpGet import org.apache.http.client.methods.HttpPost -import org.apache.http.client.methods.HttpRequestBase import org.apache.http.entity.ContentType import org.apache.http.entity.StringEntity import org.apache.http.impl.client.HttpClients import java.io.IOException -import java.net.SocketException import java.util.concurrent.ConcurrentLinkedQueue - object MessageHandler { - var connected = false - private var connecting = false - private var enabled = true - private var connectErrors = 0 private var sendErrors = 0 private var streamConnection: HttpStreamConnection var rcvQueue = ConcurrentLinkedQueue() + private set init { + //initialized here so we can make sure rcvQueue is never null streamConnection = createThread() - streamConnection.start() - connected = true } - private fun HttpRequestBase.authorize() { - if (cfg.connect.authToken.isNotEmpty() && getHeaders("Authorization").isEmpty()) - setHeader("Authorization", "Bearer " + cfg.connect.authToken) - } + val connected get() = streamConnection.connected private fun createThread(clear: Boolean = true): HttpStreamConnection { instance.info("Attempting to open bridge connection.") + instance.info("queue: $rcvQueue") return HttpStreamConnection( - { - HttpGet(cfg.connect.url + "/api/stream").apply { - authorize() - } - }, - { - HttpGet(cfg.connect.url + "/api/messages").apply { - authorize() - } - }, - { - rcvQueue.add( - ApiMessage.decode(it) - ) -// instance.debug("Received: " + it) - }, - { - instance.warn("Bridge connection closed!") - connected = false - connecting = false - }, - { success -> - connecting = false - if (success) { - instance.info("connected successfully") - connectErrors = 0 - connected = true - } else { - connectErrors++ - connected = false - } - }, + rcvQueue, clear ) } fun transmit(msg: ApiMessage) { - if ((connected || connecting) && streamConnection.isAlive) { + if ((streamConnection.connected || streamConnection.connecting) && streamConnection.isAlive) { instance.debug("Transmitting: " + msg) transmitMessage(msg) } } - fun stop() { - enabled = false - instance.info("Closing bridge connection...") -// MessageHandler.transmit(ApiMessage(text="bridge closing", username="Server")) - try { - streamConnection.close() - } catch (e: SocketException) { - instance.error("exception: $e") - } - } + fun stop() = streamConnection.close() fun start(clear: Boolean = true) { - enabled = true if (!connected) streamConnection = createThread(clear) - if (!streamConnection.isAlive) { - connecting = true - streamConnection.start() - -// MessageHandler.transmit(ApiMessage(text="bridge connected", username="Server")) - } - if (streamConnection.isAlive) { - instance.info("Bridge Connection opened") - } - + streamConnection.open() } private fun transmitMessage(message: ApiMessage) { @@ -122,7 +62,7 @@ object MessageHandler { instance.error("Server returned $code for $post") sendErrors++ if (sendErrors > 5) { - instance.error("caught too many errors, closing bridge") + instance.error("Caught too many errors, closing bridge") stop() } } @@ -131,17 +71,17 @@ object MessageHandler { instance.error("sending message caused $e") sendErrors++ if (sendErrors > 5) { - instance.error("caught too many errors, closing bridge") + instance.error("Caught too many errors, closing bridge") stop() } } } fun checkConnection(tick: Int) { - if (enabled && tick % 20 == 0 && !MessageHandler.connected && !connecting) { + if (streamConnection.enabled && tick % 20 == 0 && !streamConnection.connected && !streamConnection.connecting) { - if (connectErrors > 5) { - instance.fatal("caught too many errors, closing bridge") + if (streamConnection.connectErrors > 5) { + instance.fatal("Caught too many errors, closing bridge") stop() return } diff --git a/core/src/main/kotlin/matterlink/config/BaseConfig.kt b/core/src/main/kotlin/matterlink/config/BaseConfig.kt index e0888ba..a15241e 100644 --- a/core/src/main/kotlin/matterlink/config/BaseConfig.kt +++ b/core/src/main/kotlin/matterlink/config/BaseConfig.kt @@ -49,7 +49,8 @@ abstract class BaseConfig { data class ConnectOptions( var url: String = "http://localhost:4242", var authToken: String = "", - var gateway: String = "minecraft" + var gateway: String = "minecraft", + var enable: Boolean = true ) data class CommandOptions( @@ -245,6 +246,12 @@ abstract class BaseConfig { category, connect.gateway, "MatterBridge gateway" + ), + enable = getBoolean( + "enable", + category, + connect.enable, + "Enable the relay, it will not work if it is not enabled" ) ) category = CATEGORY_DEATH