move api into core

This commit is contained in:
nikky 2018-08-15 18:42:43 +02:00
parent e30e8132ca
commit 7818afc920
4 changed files with 459 additions and 0 deletions

View File

@ -0,0 +1,92 @@
package matterlink.api
import com.google.gson.GsonBuilder
import com.google.gson.annotations.SerializedName
/**
* Created by nikky on 07/05/18.
*
* @author Nikky
* @version 1.0
*/
class ApiMessage (
username: String? = null,
text: String? = null,
gateway: String? = null,
channel: String? = null,
userid: String? = null,
avatar: String? = null,
account: String? = null,
protocol: String? = null,
event: String? = null,
id: String? = null
) {
@SerializedName("username") private var _username: String? = username
@SerializedName("text") private var _text: String? = text
@SerializedName("gateway") private var _gateway: String? = gateway
@SerializedName("channel") private var _channel: String? = channel
@SerializedName("userid") private var _userid: String? = userid
@SerializedName("avatar") private var _avatar: String? = avatar
@SerializedName("account") private var _account: String? = account
@SerializedName("protocol") private var _protocol: String? = protocol
@SerializedName("event") private var _event: String? = event
@SerializedName("id") private var _id: String? = id
var username: String
get() = _username ?: ""
set(username) { this._username = username }
var text: String
get() = _text ?: ""
set(text) { this._text = text }
var gateway: String
get() = _gateway ?: ""
set(gateway) { this._gateway = gateway }
var channel: String
get() = _channel ?: ""
set(channel) { this._channel = channel }
var userid: String
get() = _userid ?: ""
set(userid) { this._userid = userid }
var avatar: String
get() = _avatar ?: ""
set(avatar) { this._avatar = avatar }
var account: String
get() = _account ?: ""
set(account) { this._account = account }
var protocol: String
get() = _protocol ?: ""
set(protocol) { this._protocol = protocol }
var event: String
get() = _event ?: ""
set(event) { this._event = event }
var id: String
get() = _id ?: ""
set(id) { this._id = id }
fun encode(): String {
return gson.toJson(this)
}
override fun toString(): String = encode()
companion object {
val USER_ACTION = "user_action"
val JOIN_LEAVE = "join_leave"
private val gson = GsonBuilder()
.create()
fun decode(json: String): ApiMessage {
return gson.fromJson(json, ApiMessage::class.java)
}
}
}

View File

@ -0,0 +1,16 @@
package matterlink.api
data class Config (
var url: String = "",
var token: String = "",
var announceConnect: Boolean = true,
var announceDisconnect: Boolean = true,
var reconnectWait: Long = 500,
var systemUser: String = "Server"
)
{
fun sync(connection: StreamConnection) {
connection.token = token
connection.host = url
}
}

View File

@ -0,0 +1,201 @@
package matterlink.api
import org.apache.logging.log4j.Logger
import java.io.BufferedReader
import java.io.DataOutputStream
import java.io.IOException
import java.io.InputStreamReader
import java.net.HttpURLConnection
import java.net.MalformedURLException
import java.net.ProtocolException
import java.net.URL
import java.util.concurrent.ConcurrentLinkedQueue
/**
* Created by nikky on 07/05/18.
*
* @author Nikky
* @version 1.0
*/
open class MessageHandler {
private var enabled = false
private var connectErrors = 0
private var reconnectCooldown = 0
private var sendErrors = 0
var config: Config = Config()
set(value) {
field = value.apply {
sync(streamConnection)
}
}
//TODO: make callbacks: onConnect onDisconnect onError etc
var queue: ConcurrentLinkedQueue<ApiMessage> = ConcurrentLinkedQueue()
private set
private var streamConnection: StreamConnection = StreamConnection(queue)
var logger: Logger
get() = streamConnection.logger
set(l) {
streamConnection.logger = l
}
private var nextCheck: Long = 0
init {
streamConnection.addOnSuccess { success ->
if (success) {
logger.info("connected successfully")
connectErrors = 0
reconnectCooldown = 0
} else {
reconnectCooldown = connectErrors
connectErrors++
logger.error(String.format("connectErrors: %d", connectErrors))
}
}
}
fun stop(message: String? = null) {
if (message != null && config.announceDisconnect) {
sendStatusUpdate(message)
}
enabled = false
streamConnection.close()
}
fun start(message: String?, clear: Boolean) {
config.sync(streamConnection)
if (clear) {
clear()
}
enabled = true
streamConnection.open()
if (message != null && config.announceConnect) {
sendStatusUpdate(message)
}
}
private fun clear() {
try {
val url = URL(config.url + "/api/messages")
val conn = url.openConnection() as HttpURLConnection
if (!config.token.isEmpty()) {
val bearerAuth = "Bearer " + config.token
conn.setRequestProperty("Authorization", bearerAuth)
}
conn.requestMethod = "GET"
BufferedReader(InputStreamReader(conn.inputStream)).forEachLine { line ->
logger.trace("skipping $line")
}
} catch (e: MalformedURLException) {
e.printStackTrace()
} catch (e: ProtocolException) {
e.printStackTrace()
} catch (e: IOException) {
e.printStackTrace()
}
}
open fun sendStatusUpdate(message: String) {
transmit(ApiMessage(text = message))
}
open fun transmit(msg: ApiMessage) {
if (streamConnection.isConnected || streamConnection.isConnecting) {
if (msg.username.isEmpty())
msg.username = config.systemUser
if (msg.gateway.isEmpty()) {
logger.fatal("missing gateway on message: $msg")
return
}
logger.debug("Transmitting: $msg")
transmitMessage(msg)
}
}
private fun transmitMessage(message: ApiMessage) {
try {
val url = URL(config.url + "/api/message")
val conn = url.openConnection() as HttpURLConnection
if (!config.token.isEmpty()) {
val bearerAuth = "Bearer " + config.token
conn.setRequestProperty("Authorization", bearerAuth)
}
val postData = message.encode()
logger.trace(postData)
conn.requestMethod = "POST"
conn.setRequestProperty("Content-Type", "application/json")
conn.setRequestProperty("charset", "utf-8")
conn.setRequestProperty("Content-Length", "" + postData.toByteArray().size)
conn.doOutput = true
conn.doInput = true
DataOutputStream(conn.outputStream).use { wr -> wr.write(postData.toByteArray()) }
// conn.getInputStream().close();
conn.connect()
val code = conn.responseCode
if (code != 200) {
logger.error("Server returned $code")
sendErrors++
if (sendErrors > 5) {
logger.error("Interrupting Connection to matterbridge API due to status code $code")
stop()
}
} else {
sendErrors = 0
}
} catch (e: IOException) {
e.printStackTrace()
logger.error("sending message caused $e")
sendErrors++
if (sendErrors > 5) {
logger.error("Caught too many errors, closing bridge")
stop()
}
}
}
/**
* clll this method every tick / cycle to make sure it is reconnecting
*/
fun checkConnection() {
if (enabled && !streamConnection.isConnected && !streamConnection.isConnecting) {
logger.trace("check connection")
logger.trace("next: $nextCheck")
logger.trace("now: " + System.currentTimeMillis())
if (nextCheck > System.currentTimeMillis()) return
nextCheck = System.currentTimeMillis() + config.reconnectWait
if (connectErrors >= 10) {
logger.error("Caught too many errors, closing bridge")
stop("Interrupting connection to matterbridge API due to accumulated connection errors")
return
}
if (reconnectCooldown <= 0) {
logger.info("Trying to reconnect")
start("Reconnecting to matterbridge API after connection error", false)
} else {
reconnectCooldown--
}
}
}
}

View File

@ -0,0 +1,150 @@
package matterlink.api
import org.apache.logging.log4j.LogManager
import java.io.IOException
import java.io.InputStream
import java.net.ConnectException
import java.net.HttpURLConnection
import java.net.MalformedURLException
import java.net.URL
import java.util.Arrays
import java.util.LinkedList
import java.util.concurrent.ConcurrentLinkedQueue
/**
* Created by nikky on 07/05/18.
*
* @author Nikky
* @version 1.0
*/
class StreamConnection(private val rcvQueue: ConcurrentLinkedQueue<ApiMessage>) : Runnable {
private var thread: Thread = createThread()
private var urlConnection: HttpURLConnection? = null
private val onSuccessCallbacks = LinkedList<(Boolean) -> Unit>()
var logger = LogManager.getLogger("matterlink.api")
var host = ""
var token = ""
var isConnected = false
private set
var isConnecting = false
private set
var isCancelled = false
private set
private fun createThread(): Thread {
val thread = Thread(this)
thread.name = "RcvThread"
return thread
}
fun addOnSuccess(callback: (Boolean) -> Unit) {
onSuccessCallbacks.add(callback)
}
fun removeOnSuccess(callback: (Boolean) -> Unit) {
onSuccessCallbacks.remove(callback)
}
private fun onSuccess(success: Boolean) {
isConnecting = false
isConnected = success
for (callback in onSuccessCallbacks) {
callback(success)
}
}
override fun run() {
try {
val serviceURL = "$host/api/stream"
val myURL: URL
myURL = URL(serviceURL)
urlConnection = myURL.openConnection() as HttpURLConnection
urlConnection!!.requestMethod = "GET"
if (!token.isEmpty()) {
val bearerAuth = "Bearer $token"
urlConnection!!.setRequestProperty("Authorization", bearerAuth)
}
try {
urlConnection!!.inputStream.use { input ->
logger.info("connection opened")
onSuccess(true)
// BufferedInputStream bufferedInput = new BufferedInputStream(input, 8 * 1024);
val buffer = StringBuilder()
while (!isCancelled) {
val buf = ByteArray(1024)
Thread.sleep(10)
while (input.available() <= 0) {
if (isCancelled) break
Thread.sleep(10)
}
val chars = input.read(buf)
logger.trace( String.format("read %d chars", chars))
if (chars > 0) {
val added = String(Arrays.copyOfRange(buf, 0, chars))
logger.debug("DEBUG", "json: $added")
buffer.append(added)
while (buffer.toString().contains("\n")) {
val index = buffer.indexOf("\n")
val line = buffer.substring(0, index)
buffer.delete(0, index + 1)
rcvQueue.add(ApiMessage.decode(line))
}
} else if (chars < 0) {
break
}
}
}
} finally {
onClose()
}
} catch (e: MalformedURLException) {
e.printStackTrace()
} catch (e: ConnectException) {
e.printStackTrace()
onSuccess(false)
} catch (e: IOException) {
e.printStackTrace()
onSuccess(false)
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
private fun onClose() {
logger.info("Bridge connection closed!")
isConnected = false
isConnecting = false
}
fun open() {
if (!thread.isAlive) {
thread = createThread()
isConnecting = true
isCancelled = false
thread.start()
logger.info("Starting Connection")
}
if (thread.isAlive) {
logger.info("Bridge is connecting")
}
}
fun close() {
try {
isCancelled = true
if (urlConnection != null) {
urlConnection!!.disconnect()
}
thread.join()
logger.info("Thread stopped")
} catch (e: InterruptedException) {
e.printStackTrace()
}
}
}