refactor message stream

This commit is contained in:
NikkyAI 2018-02-17 22:45:48 +01:00
parent e495e588da
commit cfc677868a
4 changed files with 101 additions and 90 deletions

3
.gitignore vendored
View File

@ -100,3 +100,6 @@ classes/
run/ run/
*.tmp *.tmp
**/gen/ **/gen/
\.floo
\.flooignore

View File

@ -1,27 +1,65 @@
package matterlink.bridge; package matterlink.bridge
import matterlink.config.cfg
import matterlink.instance import matterlink.instance
import org.apache.http.client.methods.HttpGet import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpRequestBase
import org.apache.http.impl.client.HttpClients import org.apache.http.impl.client.HttpClients
import java.io.InputStream import java.io.InputStream
import java.net.SocketException import java.net.SocketException
import java.util.concurrent.ConcurrentLinkedQueue
val BUFFER_SIZE = 1000 const val BUFFER_SIZE = 1000
class HttpStreamConnection(getClosure: () -> HttpGet, /**
clearClosure: () -> HttpGet, * adds the correct headers for MatterBridge authorization
private val mhandler: (String) -> Unit, */
private val onClose: () -> Unit, fun HttpRequestBase.authorize() {
private val setSuccess: (Boolean) -> Unit, if (cfg.connect.authToken.isNotEmpty() && getHeaders("Authorization").isEmpty())
setHeader("Authorization", "Bearer " + cfg.connect.authToken)
}
class HttpStreamConnection(private val rcvQueue: ConcurrentLinkedQueue<ApiMessage>,
private val clear: Boolean = true private val clear: Boolean = true
) : Thread() { ) : Thread() {
var connected = false
var connecting = false
var enabled = true
var connectErrors = 0
init {
name = "MsgRcvThread"
}
private fun onClose() {
instance.warn("Bridge connection closed!")
connected = false
connecting = false
}
private fun setSuccess(success: Boolean) {
connecting = false
if (success) {
instance.info("connected successfully")
connectErrors = 0
connected = true
} else {
connectErrors++
connected = false
}
}
private val client = HttpClients.createDefault() private val client = HttpClients.createDefault()
private var stream: InputStream? = null private var stream: InputStream? = null
val get = getClosure() val get = HttpGet(cfg.connect.url + "/api/stream").apply {
private val clearGet = clearClosure() authorize()
var cancelled: Boolean = false }
private set private val clearGet = HttpGet(cfg.connect.url + "/api/messages").apply {
authorize()
}
private var cancelled: Boolean = false
override fun run() { override fun run() {
@ -58,7 +96,11 @@ class HttpStreamConnection(getClosure: () -> HttpGet,
while (buffer.contains("\n")) { while (buffer.contains("\n")) {
val line = buffer.substringBefore("\n") val line = buffer.substringBefore("\n")
buffer = buffer.substringAfter("\n") buffer = buffer.substringAfter("\n")
mhandler(line)
rcvQueue.add(
ApiMessage.decode(line)
)
} }
} else if (chars < 0) { } else if (chars < 0) {
break break
@ -81,10 +123,29 @@ class HttpStreamConnection(getClosure: () -> HttpGet,
return return
} }
fun open() {
enabled = true
if (!isAlive && cfg.connect.enable) {
connecting = true
super.start()
// MessageHandler.transmit(ApiMessage(text="bridge connected", username="Server"))
}
if (isAlive) {
instance.info("Bridge Connection opened")
}
}
fun close() { fun close() {
instance.info("Closing bridge connection...")
// MessageHandler.transmit(ApiMessage(text="bridge closing", username="Server"))
try {
enabled = false
cancelled = true cancelled = true
get.abort() get.abort()
join() join()
} catch (e: SocketException) {
instance.error("exception: $e")
}
} }
} }

View File

@ -2,108 +2,48 @@ package matterlink.bridge
import matterlink.config.cfg import matterlink.config.cfg
import matterlink.instance import matterlink.instance
import org.apache.http.client.methods.HttpGet
import org.apache.http.client.methods.HttpPost import org.apache.http.client.methods.HttpPost
import org.apache.http.client.methods.HttpRequestBase
import org.apache.http.entity.ContentType import org.apache.http.entity.ContentType
import org.apache.http.entity.StringEntity import org.apache.http.entity.StringEntity
import org.apache.http.impl.client.HttpClients import org.apache.http.impl.client.HttpClients
import java.io.IOException import java.io.IOException
import java.net.SocketException
import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ConcurrentLinkedQueue
object MessageHandler { object MessageHandler {
var connected = false
private var connecting = false
private var enabled = true
private var connectErrors = 0
private var sendErrors = 0 private var sendErrors = 0
private var streamConnection: HttpStreamConnection private var streamConnection: HttpStreamConnection
var rcvQueue = ConcurrentLinkedQueue<ApiMessage>() var rcvQueue = ConcurrentLinkedQueue<ApiMessage>()
private set
init { init {
//initialized here so we can make sure rcvQueue is never null
streamConnection = createThread() streamConnection = createThread()
streamConnection.start()
connected = true
} }
private fun HttpRequestBase.authorize() { val connected get() = streamConnection.connected
if (cfg.connect.authToken.isNotEmpty() && getHeaders("Authorization").isEmpty())
setHeader("Authorization", "Bearer " + cfg.connect.authToken)
}
private fun createThread(clear: Boolean = true): HttpStreamConnection { private fun createThread(clear: Boolean = true): HttpStreamConnection {
instance.info("Attempting to open bridge connection.") instance.info("Attempting to open bridge connection.")
instance.info("queue: $rcvQueue")
return HttpStreamConnection( return HttpStreamConnection(
{ rcvQueue,
HttpGet(cfg.connect.url + "/api/stream").apply {
authorize()
}
},
{
HttpGet(cfg.connect.url + "/api/messages").apply {
authorize()
}
},
{
rcvQueue.add(
ApiMessage.decode(it)
)
// instance.debug("Received: " + it)
},
{
instance.warn("Bridge connection closed!")
connected = false
connecting = false
},
{ success ->
connecting = false
if (success) {
instance.info("connected successfully")
connectErrors = 0
connected = true
} else {
connectErrors++
connected = false
}
},
clear clear
) )
} }
fun transmit(msg: ApiMessage) { fun transmit(msg: ApiMessage) {
if ((connected || connecting) && streamConnection.isAlive) { if ((streamConnection.connected || streamConnection.connecting) && streamConnection.isAlive) {
instance.debug("Transmitting: " + msg) instance.debug("Transmitting: " + msg)
transmitMessage(msg) transmitMessage(msg)
} }
} }
fun stop() { fun stop() = streamConnection.close()
enabled = false
instance.info("Closing bridge connection...")
// MessageHandler.transmit(ApiMessage(text="bridge closing", username="Server"))
try {
streamConnection.close()
} catch (e: SocketException) {
instance.error("exception: $e")
}
}
fun start(clear: Boolean = true) { fun start(clear: Boolean = true) {
enabled = true
if (!connected) if (!connected)
streamConnection = createThread(clear) streamConnection = createThread(clear)
if (!streamConnection.isAlive) { streamConnection.open()
connecting = true
streamConnection.start()
// MessageHandler.transmit(ApiMessage(text="bridge connected", username="Server"))
}
if (streamConnection.isAlive) {
instance.info("Bridge Connection opened")
}
} }
private fun transmitMessage(message: ApiMessage) { private fun transmitMessage(message: ApiMessage) {
@ -122,7 +62,7 @@ object MessageHandler {
instance.error("Server returned $code for $post") instance.error("Server returned $code for $post")
sendErrors++ sendErrors++
if (sendErrors > 5) { if (sendErrors > 5) {
instance.error("caught too many errors, closing bridge") instance.error("Caught too many errors, closing bridge")
stop() stop()
} }
} }
@ -131,17 +71,17 @@ object MessageHandler {
instance.error("sending message caused $e") instance.error("sending message caused $e")
sendErrors++ sendErrors++
if (sendErrors > 5) { if (sendErrors > 5) {
instance.error("caught too many errors, closing bridge") instance.error("Caught too many errors, closing bridge")
stop() stop()
} }
} }
} }
fun checkConnection(tick: Int) { fun checkConnection(tick: Int) {
if (enabled && tick % 20 == 0 && !MessageHandler.connected && !connecting) { if (streamConnection.enabled && tick % 20 == 0 && !streamConnection.connected && !streamConnection.connecting) {
if (connectErrors > 5) { if (streamConnection.connectErrors > 5) {
instance.fatal("caught too many errors, closing bridge") instance.fatal("Caught too many errors, closing bridge")
stop() stop()
return return
} }

View File

@ -49,7 +49,8 @@ abstract class BaseConfig {
data class ConnectOptions( data class ConnectOptions(
var url: String = "http://localhost:4242", var url: String = "http://localhost:4242",
var authToken: String = "", var authToken: String = "",
var gateway: String = "minecraft" var gateway: String = "minecraft",
var enable: Boolean = true
) )
data class CommandOptions( data class CommandOptions(
@ -245,6 +246,12 @@ abstract class BaseConfig {
category, category,
connect.gateway, connect.gateway,
"MatterBridge gateway" "MatterBridge gateway"
),
enable = getBoolean(
"enable",
category,
connect.enable,
"Enable the relay, it will not work if it is not enabled"
) )
) )
category = CATEGORY_DEATH category = CATEGORY_DEATH