clear backlog before connecting to message stream

This commit is contained in:
NikkyAI 2018-02-09 22:49:37 +01:00 committed by Unknown
parent fdce1c0cc9
commit af7c93885b
7 changed files with 42 additions and 23 deletions

View File

@ -42,7 +42,7 @@ object MatterLink : IMatterLink() {
@Mod.EventHandler @Mod.EventHandler
fun serverAboutToStart(event: FMLServerAboutToStartEvent) { fun serverAboutToStart(event: FMLServerAboutToStartEvent) {
MessageHandler.start() MessageHandler.start(clear = true)
} }
@Mod.EventHandler @Mod.EventHandler
@ -50,12 +50,12 @@ object MatterLink : IMatterLink() {
logger.debug("Registering server commands") logger.debug("Registering server commands")
event.registerServerCommand(CommandMatterlink()) event.registerServerCommand(CommandMatterlink())
MessageHandler.rcvQueue.clear() connect()
} }
@Mod.EventHandler @Mod.EventHandler
fun serverStopping(event: FMLServerStoppingEvent) { fun serverStopping(event: FMLServerStoppingEvent) {
MessageHandler.stop() disconnect()
} }
//FORGE-DEPENDENT //FORGE-DEPENDENT

View File

@ -4,6 +4,7 @@ import com.google.common.collect.Lists
import matterlink.MODID import matterlink.MODID
import matterlink.bridge.MessageHandler import matterlink.bridge.MessageHandler
import matterlink.bridge.ServerChatHandler import matterlink.bridge.ServerChatHandler
import matterlink.instance
import matterlink.logger import matterlink.logger
import net.minecraft.command.CommandBase import net.minecraft.command.CommandBase
import net.minecraft.command.ICommandSender import net.minecraft.command.ICommandSender
@ -40,17 +41,10 @@ class CommandMatterlink : CommandBase() {
val cmd = args[0].toLowerCase() val cmd = args[0].toLowerCase()
when (cmd) { when (cmd) {
"connect" -> { "connect" -> {
if (MessageHandler.start()) { instance.connect()
MessageHandler.rcvQueue.clear()
logger.info("Connected to matterbridge relay")
ServerChatHandler.processMessages = true
} else {
logger.error("Connection to matterbridge relay failed.")
}
} }
"disconnect" -> { "disconnect" -> {
MessageHandler.stop() instance.disconnect()
ServerChatHandler.processMessages = false
} }
} }
} }

View File

@ -6,7 +6,7 @@ subprojects {
apply plugin: 'idea' apply plugin: 'idea'
group = 'matterlink' group = 'matterlink'
version = '1.1.0' version = '1.1.1'
idea { idea {
module { module {

View File

@ -1,5 +1,7 @@
package matterlink package matterlink
import matterlink.bridge.MessageHandler
import matterlink.bridge.ServerChatHandler
import org.apache.logging.log4j.Level import org.apache.logging.log4j.Level
import org.apache.logging.log4j.Logger import org.apache.logging.log4j.Logger
import org.apache.logging.log4j.message.SimpleMessageFactory import org.apache.logging.log4j.message.SimpleMessageFactory
@ -25,16 +27,28 @@ abstract class IMatterLink {
abstract fun wrappedSendToPlayers(msg: String) abstract fun wrappedSendToPlayers(msg: String)
abstract fun wrappedPlayerList(): Array<String> abstract fun wrappedPlayerList(): Array<String>
fun connect() {
if (MessageHandler.start(clear = true)) {
logger.info("Connected to matterbridge relay")
} else {
logger.error("Connection to matterbridge relay failed.")
}
}
fun disconnect () {
MessageHandler.stop()
}
} }
class DummyLink : IMatterLink() { class DummyLink : IMatterLink() {
override fun wrappedPlayerList(): Array<String> { override fun wrappedPlayerList(): Array<String> {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates. TODO("not implemented, make sure the MatterLink implementations is loaded") //To change body of created functions use File | Settings | File Templates.
} }
override fun wrappedSendToPlayers(msg: String) { override fun wrappedSendToPlayers(msg: String) {
TODO("not implemented") //To change body of created functions use File | Settings | File Templates. TODO("not implemented, make sure the MatterLink implementations is loaded") //To change body of created functions use File | Settings | File Templates.
} }
} }

View File

@ -8,16 +8,23 @@ import java.net.SocketException
val BUFFER_SIZE = 1000 val BUFFER_SIZE = 1000
class HttpStreamConnection(private val getClosure: () -> HttpGet, private val mhandler: (String) -> Unit, private val onClose: () -> Unit) : Thread() { class HttpStreamConnection(getClosure: () -> HttpGet, clearClosure: () -> HttpGet, private val mhandler: (String) -> Unit, private val onClose: () -> Unit, private val clear: Boolean = true) : Thread() {
private val client = HttpClients.createDefault() private val client = HttpClients.createDefault()
private var stream: InputStream? = null private var stream: InputStream? = null
val get = getClosure() val get = getClosure()
private val clearGet = clearClosure()
var cancelled: Boolean = false var cancelled: Boolean = false
private set private set
override fun run() { override fun run() {
if(clear) {
val r = client.execute(clearGet)
r.entity.content.bufferedReader().forEachLine {
logger.debug("skipping $it")
}
}
val response = client.execute(get) val response = client.execute(get)
val content = response.entity.content.buffered() val content = response.entity.content.buffered()
stream = content stream = content

View File

@ -25,12 +25,12 @@ object MessageHandler {
connected = true connected = true
} }
fun HttpRequestBase.authorize() { private fun HttpRequestBase.authorize() {
if (cfg!!.connect.authToken.isNotEmpty() && getHeaders("Authorization").isEmpty()) if (cfg!!.connect.authToken.isNotEmpty() && getHeaders("Authorization").isEmpty())
setHeader("Authorization", "Bearer " + cfg!!.connect.authToken) setHeader("Authorization", "Bearer " + cfg!!.connect.authToken)
} }
private fun createThread(): HttpStreamConnection { private fun createThread(clear: Boolean = true): HttpStreamConnection {
logger.info("Attempting to open bridge connection.") logger.info("Attempting to open bridge connection.")
return HttpStreamConnection( return HttpStreamConnection(
{ {
@ -38,6 +38,11 @@ object MessageHandler {
authorize() authorize()
} }
}, },
{
HttpGet(cfg!!.connect.url + "/api/messages").apply {
authorize()
}
},
{ {
rcvQueue.add( rcvQueue.add(
ApiMessage.decode(it) ApiMessage.decode(it)
@ -47,7 +52,8 @@ object MessageHandler {
{ {
logger.info("Bridge connection closed!") logger.info("Bridge connection closed!")
connected = false connected = false
} },
clear
) )
} }
@ -64,9 +70,9 @@ object MessageHandler {
streamConnection.close() streamConnection.close()
} }
fun start(): Boolean { fun start(clear: Boolean = true): Boolean {
if (!connected) if (!connected)
streamConnection = createThread() streamConnection = createThread(clear)
if (!streamConnection.isAlive) { if (!streamConnection.isAlive) {
streamConnection.start() streamConnection.start()
// MessageHandler.transmit(ApiMessage(text="bridge connected", username="Server")) // MessageHandler.transmit(ApiMessage(text="bridge connected", username="Server"))

View File

@ -6,13 +6,11 @@ import matterlink.bridge.command.BridgeCommandRegistry
import matterlink.cfg import matterlink.cfg
object ServerChatHandler { object ServerChatHandler {
var processMessages: Boolean = true
/** /**
* This method must be called every server tick with no arguments. * This method must be called every server tick with no arguments.
*/ */
fun writeIncomingToChat() { fun writeIncomingToChat() {
if (!processMessages) return
if (MessageHandler.rcvQueue.isNotEmpty()) if (MessageHandler.rcvQueue.isNotEmpty())
logger.debug("incoming: " + MessageHandler.rcvQueue.toString()) logger.debug("incoming: " + MessageHandler.rcvQueue.toString())
val nextMessage = MessageHandler.rcvQueue.poll() val nextMessage = MessageHandler.rcvQueue.poll()