From f915955caf2233c57afebbed6bafbffab6e2325e Mon Sep 17 00:00:00 2001 From: Moritz Ruth Date: Fri, 4 Dec 2020 20:17:06 +0100 Subject: [PATCH] Improve Scheduler performance, fix ByteBuf leak and cache TagsPacket encoding --- blokk-api/build.gradle.kts | 3 +- blokk-packet-codecs/build.gradle.kts | 4 + .../blokk/net/packet/play/TagsPacketCodec.kt | 44 +++++++--- .../space/blokk/net/LoginAndJoinProcedure.kt | 5 +- .../space/blokk/net/PacketMessageCodec.kt | 3 +- .../space/blokk/plugin/BlokkPluginManager.kt | 2 +- .../kotlin/space/blokk/util/BlokkScheduler.kt | 84 +++++++++++-------- gradle.properties | 2 + 8 files changed, 95 insertions(+), 52 deletions(-) diff --git a/blokk-api/build.gradle.kts b/blokk-api/build.gradle.kts index 88a1bc7..4457b33 100644 --- a/blokk-api/build.gradle.kts +++ b/blokk-api/build.gradle.kts @@ -18,6 +18,7 @@ val coroutinesVersion = properties["version.kotlinx-coroutines"].toString() val nettyVersion = properties["version.netty"].toString() val junitVersion = properties["version.junit"].toString() val striktVersion = properties["version.strikt"].toString() +val guavaVersion = properties["version.guava"].toString() dependencies { // Kotlin @@ -32,7 +33,7 @@ dependencies { api("io.netty:netty-buffer:${nettyVersion}") // Other - api("com.google.guava:guava:30.0-jre") + api("com.google.guava:guava:$guavaVersion") // Testing testImplementation("io.strikt:strikt-core:${striktVersion}") diff --git a/blokk-packet-codecs/build.gradle.kts b/blokk-packet-codecs/build.gradle.kts index dd1267a..a8db0c2 100644 --- a/blokk-packet-codecs/build.gradle.kts +++ b/blokk-packet-codecs/build.gradle.kts @@ -13,6 +13,7 @@ repositories { val nettyVersion = properties["version.netty"].toString() val junitVersion = properties["version.junit"].toString() val striktVersion = properties["version.strikt"].toString() +val guavaVersion = properties["version.guava"].toString() dependencies { api(project(":blokk-packets")) @@ -20,6 +21,9 @@ dependencies { // Netty api("io.netty:netty-buffer:${nettyVersion}") + // Other + api("com.google.guava:guava:$guavaVersion") + // Testing testImplementation("io.strikt:strikt-core:${striktVersion}") testImplementation("org.junit.jupiter:junit-jupiter-api:${junitVersion}") diff --git a/blokk-packet-codecs/src/main/kotlin/space/blokk/net/packet/play/TagsPacketCodec.kt b/blokk-packet-codecs/src/main/kotlin/space/blokk/net/packet/play/TagsPacketCodec.kt index 6c082c6..e53f821 100644 --- a/blokk-packet-codecs/src/main/kotlin/space/blokk/net/packet/play/TagsPacketCodec.kt +++ b/blokk-packet-codecs/src/main/kotlin/space/blokk/net/packet/play/TagsPacketCodec.kt @@ -1,26 +1,44 @@ package space.blokk.net.packet.play +import com.google.common.cache.CacheBuilder +import com.google.common.cache.CacheLoader +import com.google.common.cache.LoadingCache import io.netty.buffer.ByteBuf +import io.netty.buffer.Unpooled import space.blokk.net.MinecraftProtocolDataTypes.writeString import space.blokk.net.MinecraftProtocolDataTypes.writeVarInt import space.blokk.net.packet.OutgoingPacketCodec import space.blokk.tags.Tag object TagsPacketCodec : OutgoingPacketCodec(0x5C, TagsPacket::class) { - override fun TagsPacket.encode(dst: ByteBuf) { - listOf( - tags.filter { it.type == Tag.Type.BLOCKS }, - tags.filter { it.type == Tag.Type.ITEMS }, - tags.filter { it.type == Tag.Type.FLUIDS }, - tags.filter { it.type == Tag.Type.ENTITY_TYPES } - ).forEach { tags -> - dst.writeVarInt(tags.size) + private val cache: LoadingCache = CacheBuilder.newBuilder() + .maximumSize(3) + .weakKeys() + .removalListener { it.value.release() } + .build(object : CacheLoader() { + override fun load(packet: TagsPacket): ByteBuf { + val dst = Unpooled.buffer() - tags.forEach { tag -> - dst.writeString(tag.name) - dst.writeVarInt(tag.values.size) - tag.numericIDs.forEach { dst.writeVarInt(it) } + listOf( + packet.tags.filter { it.type == Tag.Type.BLOCKS }, + packet.tags.filter { it.type == Tag.Type.ITEMS }, + packet.tags.filter { it.type == Tag.Type.FLUIDS }, + packet.tags.filter { it.type == Tag.Type.ENTITY_TYPES } + ).forEach { tags -> + dst.writeVarInt(tags.size) + + tags.forEach { tag -> + dst.writeString(tag.name) + dst.writeVarInt(tag.values.size) + tag.numericIDs.forEach { dst.writeVarInt(it) } + } + } + + return dst } - } + }) + + override fun TagsPacket.encode(dst: ByteBuf) { + dst.writeBytes(cache.get(this)) } } diff --git a/blokk-server/src/main/kotlin/space/blokk/net/LoginAndJoinProcedure.kt b/blokk-server/src/main/kotlin/space/blokk/net/LoginAndJoinProcedure.kt index 3a5faed..3532a72 100644 --- a/blokk-server/src/main/kotlin/space/blokk/net/LoginAndJoinProcedure.kt +++ b/blokk-server/src/main/kotlin/space/blokk/net/LoginAndJoinProcedure.kt @@ -13,6 +13,7 @@ import space.blokk.net.packet.login.* import space.blokk.net.packet.play.* import space.blokk.player.BlokkPlayer import space.blokk.player.GameMode +import space.blokk.tags.TagRegistry import space.blokk.util.* import space.blokk.world.Chunk import java.security.MessageDigest @@ -22,6 +23,8 @@ import javax.crypto.spec.SecretKeySpec import kotlin.random.Random class LoginAndJoinProcedure(val session: BlokkSession) { + private val tagsPacket by lazy { TagsPacket(TagRegistry.tags.values) } + suspend fun start(packet: LoginStartPacket) { session.state.getOrFail() @@ -173,7 +176,7 @@ class LoginAndJoinProcedure(val session: BlokkSession) { session.send(SetSelectedHotbarSlotPacket(state.selectedHotbarSlot)) session.send(DeclareRecipesPacket(session.server.recipes)) - // TODO: Send Tags packet + session.send(tagsPacket) // TODO: Send Entity Status packet with OP permission level session.send(PlayerPositionAndLookPacket(state.initialWorldAndLocation.location)) diff --git a/blokk-server/src/main/kotlin/space/blokk/net/PacketMessageCodec.kt b/blokk-server/src/main/kotlin/space/blokk/net/PacketMessageCodec.kt index 30166f7..f1a7221 100644 --- a/blokk-server/src/main/kotlin/space/blokk/net/PacketMessageCodec.kt +++ b/blokk-server/src/main/kotlin/space/blokk/net/PacketMessageCodec.kt @@ -27,12 +27,11 @@ class PacketMessageCodec(private val session: BlokkSession) : MessageToMessageCo override fun decode(ctx: ChannelHandlerContext, msg: ByteBuf, out: MutableList) { val packetID = msg.readVarInt() - val data = msg.readBytes(msg.readableBytes()) val codec = session.currentProtocol!!.incomingPacketCodecsByID[packetID] ?: throw IOException("Client sent an unknown packet (ID: $packetID)") - out.add(IncomingPacketMessage(session, codec.decode(data))) + out.add(IncomingPacketMessage(session, codec.decode(msg))) } override fun exceptionCaught(ctx: ChannelHandlerContext, cause: Throwable) { diff --git a/blokk-server/src/main/kotlin/space/blokk/plugin/BlokkPluginManager.kt b/blokk-server/src/main/kotlin/space/blokk/plugin/BlokkPluginManager.kt index 92bcc2b..2b1d604 100644 --- a/blokk-server/src/main/kotlin/space/blokk/plugin/BlokkPluginManager.kt +++ b/blokk-server/src/main/kotlin/space/blokk/plugin/BlokkPluginManager.kt @@ -35,7 +35,7 @@ class BlokkPluginManager(private val server: BlokkServer) : PluginManager { @Suppress("UNCHECKED_CAST") val pluginClass = loader.loadClass(pluginClassName) as Class - return@mapNotNull pluginClass.newInstance() + return@mapNotNull pluginClass.getDeclaredConstructor().newInstance() } } catch (e: ClassNotFoundException) { error = LoadError.PluginClassNotFound(e.message!!) diff --git a/blokk-server/src/main/kotlin/space/blokk/util/BlokkScheduler.kt b/blokk-server/src/main/kotlin/space/blokk/util/BlokkScheduler.kt index b0268e9..c69eedd 100644 --- a/blokk-server/src/main/kotlin/space/blokk/util/BlokkScheduler.kt +++ b/blokk-server/src/main/kotlin/space/blokk/util/BlokkScheduler.kt @@ -1,12 +1,13 @@ package space.blokk.util -import kotlinx.coroutines.* +import kotlinx.coroutines.CancellableContinuation +import kotlinx.coroutines.runBlocking +import kotlinx.coroutines.suspendCancellableCoroutine import space.blokk.Scheduler import space.blokk.logging.Logger import space.blokk.server.Server import java.util.* import java.util.concurrent.ConcurrentHashMap -import java.util.concurrent.Executors import kotlin.collections.LinkedHashSet import kotlin.coroutines.resume @@ -14,9 +15,7 @@ import kotlin.coroutines.resume * Basically ExecutorService but for coroutines and with ticks. */ class BlokkScheduler : Scheduler { - private val dispatcher = Executors.newSingleThreadExecutor().asCoroutineDispatcher() - private val scope = CoroutineScope(dispatcher) - private var job: Job? = null + private var thread: Thread? = null private val tasks = ConcurrentHashMap.newKeySet>() private val shutdownTasks = Collections.synchronizedSet(LinkedHashSet Unit>()) @@ -38,47 +37,64 @@ class BlokkScheduler : Scheduler { fun startTicking() { val interval = 1000L / Server.TICKS_PER_SECOND - job?.cancel() - job = scope.launch { - var lastTickEndTime = System.currentTimeMillis() + thread = Thread { + runBlocking { + var lastTickEndTime = System.currentTimeMillis() + val t = Thread.currentThread() - while (isActive) { - for (task in tasks) { - if (!task.cancelled) { - if (task.ticksUntilExecution == 0) { - try { - task.fn() + while (!t.isInterrupted) { + for (task in tasks) { + if (!task.cancelled) { + if (task.ticksUntilExecution == 0) { + try { + task.fn() - if (task.interval != null) { - task.ticksUntilExecution = task.interval - } else tasks.remove(task) - } catch (e: Exception) { - tasks.remove(task) - e.printStackTrace() + if (task.interval != null) { + task.ticksUntilExecution = task.interval + } else tasks.remove(task) + } catch (e: Exception) { + tasks.remove(task) + e.printStackTrace() + } + } else { + task.ticksUntilExecution -= 1 } - } else { - task.ticksUntilExecution -= 1 } } + + var first = true + var time: Long + + do { + time = System.currentTimeMillis() + val elapsedSinceLastTick = time - lastTickEndTime + val timeUntilNextTick = interval - elapsedSinceLastTick + + if (first && timeUntilNextTick < 0) { + logger.warn("Last tick took ${elapsedSinceLastTick}ms, but should only take ${interval}ms") + // TODO: Do something when this happens too often or suppress the warning for subsequent occurrences + } + + first = false + } while (timeUntilNextTick > 0) + + lastTickEndTime = time } - - val time = System.currentTimeMillis() - val elapsedSinceLastTick = time - lastTickEndTime - val timeUntilNextTick = interval - elapsedSinceLastTick - lastTickEndTime = time - - if (timeUntilNextTick < 0) { - logger.warn("Last tick took ${elapsedSinceLastTick}ms, but should only take ${interval}ms") - // TODO: Do something when this happens too often or suppress the warning for subsequent occurrences - } else delay(timeUntilNextTick) } + }.apply { + name = "Scheduler" + start() } } suspend fun shutdown() { - job?.cancelAndJoin() - dispatcher.close() + thread?.let { + it.interrupt() + + @Suppress("BlockingMethodInNonBlockingContext") + it.join() + } shutdownTasks.forEach { it.invoke() } } diff --git a/gradle.properties b/gradle.properties index bf66016..1e973c2 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,3 +1,4 @@ +# suppress inspection "UnusedProperty" for whole file kotlin.code.style=official version.kotlin=1.4.20 version.netty=4.1.54.Final @@ -6,3 +7,4 @@ version.kotlinx-coroutines=1.4.2 version.slf4j=1.7.30 version.junit=5.7.0 version.strikt=0.28.0 +version.guava=30.0-jre