Archived
1
0
Fork 0

Use one EventBus for all events instead of one for every player/entity/session/...

This commit is contained in:
Moritz Ruth 2020-12-23 23:28:23 +01:00
parent afb806f9fb
commit 1f10e52131
No known key found for this signature in database
GPG key ID: AFD57E23E753841B
34 changed files with 265 additions and 474 deletions

View file

@ -1,8 +1,5 @@
package space.blokk
import space.blokk.event.EventTargetGroup
import space.blokk.net.Session
import space.blokk.player.Player
import space.blokk.server.Server
private lateinit var serverInstance: Server

View file

@ -3,16 +3,13 @@ package space.blokk.entity
import space.blokk.NamespacedID
import space.blokk.event.Event
import space.blokk.event.EventBus
import space.blokk.event.EventTarget
import space.blokk.logging.Logger
import java.util.*
import kotlin.reflect.KClass
abstract class Entity internal constructor() : EventTarget<Event> {
abstract class Entity internal constructor() {
val uuid: UUID = UUID.randomUUID()
override val eventBus: EventBus<Event> = EventBus(Event::class, Logger("Entity/$uuid"))
companion object {
internal inline fun <reified T : Entity> type(
numericID: Int,

View file

@ -1,3 +1,10 @@
package space.blokk.event
import space.blokk.Blokk
import kotlin.reflect.KClass
abstract class Event
abstract class TargetedEvent<T>: Event() {
abstract val target: T
}

View file

@ -1,104 +1,116 @@
package space.blokk.event
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.async
import space.blokk.Blokk
import space.blokk.logging.Logger
import kotlinx.coroutines.suspendCancellableCoroutine
import space.blokk.plugin.Plugin
import space.blokk.util.pluralizeWithCount
import kotlin.coroutines.CoroutineContext
import java.util.concurrent.ConcurrentSkipListSet
import kotlin.coroutines.resume
import kotlin.reflect.KClass
import kotlin.reflect.KFunction
import kotlin.reflect.full.*
import kotlin.system.measureTimeMillis
// TODO: Only create one event bus for everything and add helper method instead
class EventBus<EventT : Event>(
private val eventClass: KClass<EventT>,
private val logger: Logger,
coroutineContext: CoroutineContext = Blokk.coroutineContext
) {
private val scope = CoroutineScope(coroutineContext)
abstract class EventEmitter<E: Event> {
protected val handlers = ConcurrentSkipListSet<EventHandler<*>>()
/**
* All event handlers, sorted by their priority and the order in which they were registered.
* Registers a new event handler.
*/
private val handlers = mutableListOf<Handler<EventT>>()
open fun <T: E> on(
eventType: KClass<T>,
priority: EventPriority = EventPriority.NORMAL,
fn: suspend (event: T) -> Unit
): EventHandler<T> {
val handler = EventHandler(this, eventType, Plugin.getCalling(), priority, fn)
handlers.add(handler)
return handler
}
/**
* Invokes all previously registered event handlers sorted by their priority
* and the order in which they were registered.
* Registers a new event handler.
*
* The handler function receives a function to remove the event handler as second parameter.
*/
fun <T: E> onRemovable(
eventType: KClass<T>,
priority: EventPriority = EventPriority.NORMAL,
fn: suspend (event: T, remove: () -> Unit) -> Unit
): EventHandler<T> {
lateinit var handler: EventHandler<T>
handler = on(eventType, priority) {
fn(it, handler::remove)
}
return handler
}
/**
* Registers a new event handler which is automatically removed after it was invoked.
*/
fun <T: E> once(
eventType: KClass<T>,
priority: EventPriority = EventPriority.NORMAL,
fn: suspend (event: T) -> Unit
): EventHandler<T> = onRemovable(eventType, priority) { event, remove -> remove(); fn(event) }
/**
* Suspends until an event of type [T] is emitted and returns it.
*/
suspend fun <T: E> waitFor(eventType: KClass<T>, priority: EventPriority = EventPriority.NORMAL): T =
suspendCancellableCoroutine { c ->
val handler = EventHandler(this, eventType, Plugin.getCalling(), priority) {
c.resume(it)
}
handlers.add(handler)
c.invokeOnCancellation {
handlers.remove(handler)
}
}
/**
* @return Whether the event handler was not already removed.
*/
fun remove(eventHandler: EventHandler<*>): Boolean = handlers.remove(eventHandler)
/**
* Registers a new event handler.
*/
inline fun <reified T: E> on(
priority: EventPriority = EventPriority.NORMAL,
noinline fn: suspend (event: T) -> Unit
): EventHandler<T> = on(T::class, priority, fn)
/**
* Registers a new event handler.
*
* The handler function receives a function to remove the event handler as second parameter.
*/
inline fun <reified T: E> onRemovable(
priority: EventPriority = EventPriority.NORMAL,
noinline fn: suspend (event: T, remove: () -> Unit) -> Unit
): EventHandler<T> = onRemovable(T::class, priority, fn)
/**
* Registers a new event handler which is automatically removed after it was invoked.
*/
inline fun <reified T: E> once(
priority: EventPriority = EventPriority.NORMAL,
noinline fn: suspend (event: T) -> Unit
): EventHandler<T> = once(T::class, priority, fn)
/**
* Suspends until an event of type [T] is emitted and returns it.
*/
suspend inline fun <reified T: E> waitFor(priority: EventPriority = EventPriority.NORMAL): T =
waitFor(T::class, priority)
}
abstract class EventBus: EventEmitter<Event>() {
/**
* Invokes all previously registered event handlers sorted by their priority.
*
* The coroutine context is inherited.
*
* @return [event]
*/
suspend fun <T : EventT> emit(event: T): T {
if (Blokk.developmentMode) {
val handlers = handlers.filter { it.eventType.isInstance(event) }
val time = measureTimeMillis { handlers.forEach { it.fn.callSuspend(it.listener, event) } }
logger trace "Emitted ${event::class.java.simpleName} to " +
"${pluralizeWithCount("handler", handlers.size)}, took ${time}ms"
} else handlers.filter { it.eventType.isInstance(event) }.forEach { it.fn.callSuspend(it.listener, event) }
return event
}
fun <T : EventT> emitAsync(event: T) = scope.async { emit(event) }
/**
* Registers all [event handlers][EventHandler] in [listener] to be invoked when their corresponding event is emitted.
*
* @return [listener]
* @throws InvalidEventHandlerException if one of the event handlers does not meet the requirements
*/
fun <T : Listener> register(listener: T): T {
val handlersOfListener = listener::class.functions
.mapNotNull { method -> method.findAnnotation<EventHandler>()?.let { method to it } }
.toMap()
for ((method, data) in handlersOfListener) {
if (method.valueParameters.size != 1)
throw InvalidEventHandlerException("${method.name} must have exactly one parameter")
@Suppress("UNCHECKED_CAST")
val klass = method.parameters[1].type.classifier as KClass<EventT>
if (!eventClass.isSuperclassOf(klass))
throw InvalidEventHandlerException(
"${method.name}'s first parameter type is incompatible with the " +
"one required by the EventBus"
)
@Suppress("UNCHECKED_CAST")
val handler = Handler(
klass,
listener,
method as KFunction<EventT>,
Plugin.getCalling(),
data.priority
)
val insertIndex = handlers.indexOfLast { it.priority.ordinal <= handler.priority.ordinal } + 1
handlers.add(insertIndex, handler)
}
return listener
}
/**
* Unregisters all [event handlers][EventHandler] in [listener].
*/
fun unregister(listener: Listener) {
handlers.removeIf { it.listener === listener }
}
class InvalidEventHandlerException internal constructor(message: String) : Exception(message)
private data class Handler<T : Event>(
val eventType: KClass<T>,
val listener: Listener,
val fn: KFunction<T>,
val plugin: Plugin?,
val priority: EventPriority
)
abstract suspend fun <T : Event> emit(event: T): T
}

View file

@ -0,0 +1,12 @@
package space.blokk.event
import space.blokk.Blokk
import kotlin.reflect.KClass
class EventBusWrapper<E>(private val target: Any): EventEmitter<TargetedEvent<E>>() {
override fun <T : TargetedEvent<E>> on(
eventType: KClass<T>,
priority: EventPriority,
fn: suspend (event: T) -> Unit
): EventHandler<T> = Blokk.eventBus.on(eventType, priority) { event -> if (event.target == target) fn(event) }
}

View file

@ -1,3 +1,19 @@
package space.blokk.event
annotation class EventHandler(val priority: EventPriority = EventPriority.NORMAL)
import space.blokk.plugin.Plugin
import kotlin.reflect.KClass
data class EventHandler<T : Event> internal constructor(
private val eventEmitter: EventEmitter<*>,
val eventType: KClass<T>,
val plugin: Plugin?,
val priority: EventPriority,
val fn: suspend (event: T) -> Unit
): Comparable<EventHandler<*>> {
/**
* Unregisters this event handler.
*/
fun remove(): Boolean = eventEmitter.remove(this)
override fun compareTo(other: EventHandler<*>): Int = priority.compareTo(other.priority)
}

View file

@ -1,27 +0,0 @@
package space.blokk.event
import kotlinx.coroutines.Deferred
interface EventTarget<T : Event> {
val eventBus: EventBus<T>
/**
* Shorthand for [`eventBus.emit(event)`][EventBus.emit].
*/
suspend fun <E : T> emit(event: E): E = eventBus.emit(event)
/**
* Shorthand for [`eventBus.emitAsync(event)`][EventBus.emitAsync].
*/
fun <E : T> emitAsync(event: E): Deferred<E> = eventBus.emitAsync(event)
/**
* Shorthand for [`eventBus.register(listener)`][EventBus.register].
*/
fun registerListener(listener: Listener) = eventBus.register(listener)
/**
* Shorthand for [`eventBus.unregister(listener)`][EventBus.unregister].
*/
fun unregisterListener(listener: Listener) = eventBus.unregister(listener)
}

View file

@ -1,47 +0,0 @@
package space.blokk.event
import java.util.concurrent.CopyOnWriteArraySet
abstract class EventTargetGroup<T : EventTarget<*>>(threadSafe: Boolean = false) : Iterable<T> {
private val targets: MutableSet<T> = if (threadSafe) CopyOnWriteArraySet() else HashSet()
private val listeners: MutableSet<Listener> = if (threadSafe) CopyOnWriteArraySet() else HashSet()
/**
* Registers a listener for all elements in this group.
*
* You should never reuse [listener] for other groups or directly on event targets
* because this may lead to strange behaviour.
*/
fun <T : Listener> registerListener(listener: T): T {
listeners.add(listener)
targets.forEach { it.eventBus.register(listener) }
return listener
}
/**
* Unregisters a listener for all elements in this group.
*
* You should only unregister listeners which you previously registered using [registerListener].
*/
fun unregisterListener(listener: Listener) {
targets.forEach { it.eventBus.unregister(listener) }
listeners.remove(listener)
}
override fun iterator(): Iterator<T> = targets.iterator()
protected fun addTarget(target: T) {
targets.add(target)
listeners.forEach { target.eventBus.register(it) }
}
protected fun removeTarget(target: T) {
targets.remove(target)
listeners.forEach { target.eventBus.unregister(it) }
}
class Mutable<T : EventTarget<*>>(threadSafe: Boolean = false) : EventTargetGroup<T>(threadSafe) {
fun add(target: T) = addTarget(target)
fun remove(target: T) = removeTarget(target)
}
}

View file

@ -1,3 +0,0 @@
package space.blokk.event
interface Listener

View file

@ -3,7 +3,7 @@ package space.blokk.net
import io.netty.buffer.ByteBuf
import kotlinx.coroutines.CoroutineScope
import space.blokk.chat.TextComponent
import space.blokk.event.EventTarget
import space.blokk.event.EventBusWrapper
import space.blokk.net.event.SessionEvent
import space.blokk.net.packet.OutgoingPacket
import space.blokk.net.packet.Protocol
@ -14,29 +14,32 @@ import java.net.InetAddress
import java.util.*
import kotlin.coroutines.CoroutineContext
interface Session : EventTarget<SessionEvent>, CoroutineScope {
abstract class Session {
@Suppress("LeakingThis")
val events = EventBusWrapper<Session>(this)
/**
* The IP address of this session
*/
val address: InetAddress
abstract val address: InetAddress
/**
* Unconfined [CoroutineContext] which is cancelled when the session is disconnected.
*/
override val coroutineContext: CoroutineContext
abstract val coroutineContext: CoroutineContext
/**
* The brand name the client optionally sent during the login procedure.
* [ClientBrandReceivedEvent][space.blokk.net.event.ClientBrandReceivedEvent] is emitted when this value changes.
*/
val brand: String?
abstract val brand: String?
/**
* The round trip time (in milliseconds) of the KeepAlive packet which is sent to the client every few seconds.
*
* It is -1 until the first IncomingKeepAlive packet is received.
*/
val ping: Int
abstract val ping: Int
/**
* The player corresponding to this session.
@ -46,7 +49,7 @@ interface Session : EventTarget<SessionEvent>, CoroutineScope {
/**
* The current state of this session.
*/
val state: State
abstract val state: State
sealed class State {
object WaitingForHandshake : State()
@ -90,7 +93,7 @@ interface Session : EventTarget<SessionEvent>, CoroutineScope {
/**
* The protocol this session is currently using. If this is null, the client is not connected anymore.
*/
val currentProtocol: Protocol?
abstract val currentProtocol: Protocol?
/**
* Closes the connection with the client.
@ -99,7 +102,7 @@ interface Session : EventTarget<SessionEvent>, CoroutineScope {
* @param loggableReason A short, loggable representation of [reason]. Not shown to the player, except
* in development mode.
*/
suspend fun disconnect(
abstract suspend fun disconnect(
reason: TextComponent = TextComponent of "Disconnected.",
loggableReason: String = reason.text
)
@ -107,10 +110,10 @@ interface Session : EventTarget<SessionEvent>, CoroutineScope {
/**
* Sends a packet.
*/
suspend fun send(packet: OutgoingPacket)
abstract suspend fun send(packet: OutgoingPacket)
/**
* Sends a plugin message packet.
*/
suspend fun sendPluginMessage(channel: String, data: ByteBuf)
abstract suspend fun sendPluginMessage(channel: String, data: ByteBuf)
}

View file

@ -7,8 +7,8 @@ import space.blokk.net.Session
* Emitted when the client sends his brand during the login process.
*/
class ClientBrandReceivedEvent(
session: Session,
override val target: Session,
var brand: String
) : SessionEvent(session), Cancellable {
) : SessionEvent(), Cancellable {
override var cancelled = false
}

View file

@ -7,6 +7,6 @@ import space.blokk.net.packet.IncomingPacket
/**
* Emitted when a packet is received.
*/
class PacketReceivedEvent<T : IncomingPacket>(session: Session, var packet: T) : SessionEvent(session), Cancellable {
class PacketReceivedEvent<T : IncomingPacket>(override val target: Session, var packet: T) : SessionEvent(), Cancellable {
override var cancelled = false
}

View file

@ -7,6 +7,6 @@ import space.blokk.net.packet.OutgoingPacket
/**
* Emitted when a packet is going to be sent.
*/
class PacketSendEvent(session: Session, var packet: OutgoingPacket) : SessionEvent(session), Cancellable {
class PacketSendEvent(override val target: Session, var packet: OutgoingPacket) : SessionEvent(), Cancellable {
override var cancelled = false
}

View file

@ -10,7 +10,7 @@ import space.blokk.world.VoxelLocation
*
* If the event is cancelled, the session is disconnected.
*/
class PlayerInitializationEvent(session: Session, val settings: Player.Settings) : SessionEvent(session), Cancellable {
class PlayerInitializationEvent(override val target: Session, val settings: Player.Settings) : SessionEvent(), Cancellable {
override var cancelled = false
var compassTarget: VoxelLocation = VoxelLocation(0, 0, 0)
}

View file

@ -10,8 +10,8 @@ import space.blokk.net.Session
* If the event is cancelled or [response] is null after all handlers ran, the session is disconnected.
*/
class ServerListInfoRequestEvent(
session: Session,
override val target: Session,
var response: ServerListInfo? = null
) : SessionEvent(session), Cancellable {
) : SessionEvent(), Cancellable {
override var cancelled = false
}

View file

@ -11,7 +11,7 @@ import space.blokk.world.WorldAndLocationWithRotation
*
* If the event is cancelled, the session is disconnected.
*/
class SessionAfterLoginEvent(session: Session) : SessionEvent(session), Cancellable {
class SessionAfterLoginEvent(override val target: Session) : SessionEvent(), Cancellable {
override var cancelled = false
/**

View file

@ -1,6 +1,7 @@
package space.blokk.net.event
import space.blokk.event.Event
import space.blokk.event.TargetedEvent
import space.blokk.net.Session
abstract class SessionEvent(val session: Session) : Event()
abstract class SessionEvent : TargetedEvent<Session>()

View file

@ -2,7 +2,6 @@ package space.blokk.player
import kotlinx.coroutines.CoroutineScope
import space.blokk.chat.TextComponent
import space.blokk.event.EventTarget
import space.blokk.net.Session
import space.blokk.player.event.PlayerEvent
import space.blokk.world.Chunk
@ -15,8 +14,8 @@ import kotlin.coroutines.CoroutineContext
/**
* A **real** player.
*/
interface Player : EventTarget<PlayerEvent>, CoroutineScope {
override val coroutineContext: CoroutineContext get() = session.coroutineContext
interface Player {
val coroutineContext: CoroutineContext get() = session.coroutineContext
/**
* The session of this player.

View file

@ -4,22 +4,19 @@ import space.blokk.Registry
import space.blokk.Scheduler
import space.blokk.command.Command
import space.blokk.event.EventBus
import space.blokk.event.EventTarget
import space.blokk.event.EventTargetGroup
import space.blokk.logging.Logger
import space.blokk.logging.LoggingOutputProvider
import space.blokk.net.Session
import space.blokk.player.Player
import space.blokk.plugin.PluginManager
import space.blokk.recipe.Recipe
import space.blokk.server.event.ServerEvent
import space.blokk.world.BiomeRegistry
import space.blokk.world.Dimension
import java.io.File
import kotlin.coroutines.CoroutineContext
interface Server : EventTarget<ServerEvent> {
override val eventBus: EventBus<ServerEvent>
interface Server {
val eventBus: EventBus
/**
* [CoroutineContext] confined to the server thread.
@ -31,12 +28,12 @@ interface Server : EventTarget<ServerEvent> {
/**
* All sessions connected to the server.
*/
val sessions: EventTargetGroup<Session>
val sessions: Collection<Session>
/**
* All players connected to the server.
*/
val players: EventTargetGroup<Player>
val players: Collection<Player>
val pluginManager: PluginManager
val serverDirectory: File

View file

@ -1,9 +1,7 @@
package space.blokk.world
import kotlinx.coroutines.*
import space.blokk.CoordinatePartOrder
import space.blokk.entity.Entity
import space.blokk.event.EventTargetGroup
import space.blokk.util.newSingleThreadDispatcher
import java.util.*
import kotlin.coroutines.CoroutineContext
@ -39,7 +37,7 @@ abstract class World(val uuid: UUID) {
/**
* All entities in this world. Use [spawnEntity] for spawning new ones.
*/
abstract val entities: EventTargetGroup.Mutable<Entity>
abstract val entities: Collection<Entity>
abstract fun getChunk(key: Chunk.Key): Chunk

View file

@ -1,7 +0,0 @@
package space.blokk
import space.blokk.server.Server
fun mockServerInstance() {
// TODO
}

View file

@ -1,211 +0,0 @@
package space.blokk.event
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test
import space.blokk.logging.Logger
import space.blokk.mockServerInstance
import strikt.api.expectThat
import strikt.api.expectThrows
import strikt.assertions.*
import kotlin.system.measureTimeMillis
abstract class TestEvent : Event()
private class FirstEvent : TestEvent()
private class SecondEvent : TestEvent()
class EventBusTest {
init {
mockServerInstance()
}
private val eventBus = EventBus(TestEvent::class, Logger("logger"), Dispatchers.Default)
@Test
fun `calls the handler exactly 1 time when the event is emitted 1 time`() {
var calledCount = 0
eventBus.register(object : Listener {
@EventHandler
fun onFirstEvent(event: FirstEvent) {
calledCount++
}
})
runBlocking { eventBus.emit(FirstEvent()) }
expectThat(calledCount).isEqualTo(1)
}
@Test
fun `calls the handler exactly 3 times when the event is emitted 3 times`() {
var calledCount = 0
eventBus.register(object : Listener {
@EventHandler
fun onFirstEvent(event: FirstEvent) {
calledCount++
}
})
runBlocking {
eventBus.emit(FirstEvent())
eventBus.emit(FirstEvent())
eventBus.emit(FirstEvent())
}
expectThat(calledCount).isEqualTo(3)
}
@Test
fun `calls no handlers if no event is emitted`() {
var onFirstEventCalled = false
var onSecondEventCalled = false
eventBus.register(object : Listener {
@EventHandler
fun onFirstEvent(event: FirstEvent) {
onFirstEventCalled = true
}
@EventHandler
fun onSecondEvent(event: SecondEvent) {
onSecondEventCalled = true
}
})
// No emit
expectThat(onFirstEventCalled).isFalse()
expectThat(onSecondEventCalled).isFalse()
}
@Test
fun `calls handlers for supertypes of the emitted event`() {
var onTestEventCalled = false
var onFirstEventCalled = false
eventBus.register(object : Listener {
@EventHandler
fun onTestEvent(event: TestEvent) {
onTestEventCalled = true
}
@EventHandler
fun onFirstEvent(event: FirstEvent) {
onFirstEventCalled = true
}
})
runBlocking { eventBus.emit(FirstEvent()) }
expectThat(onTestEventCalled).isTrue()
expectThat(onFirstEventCalled).isTrue()
}
@Test
fun `stops calling handlers after they were unregistered`() {
var called = false
val listener = eventBus.register(object : Listener {
@EventHandler
fun onFirstEvent(event: FirstEvent) {
called = true
}
})
eventBus.unregister(listener)
runBlocking { eventBus.emit(FirstEvent()) }
expectThat(called).isFalse()
}
@Test
fun `throws an error if an event handler function has an invalid signature`() {
expectThrows<EventBus.InvalidEventHandlerException> {
eventBus.register(object : Listener {
@EventHandler
fun onEvent(coolParam: String) {
}
})
}
expectThrows<EventBus.InvalidEventHandlerException> {
eventBus.register(object : Listener {
@EventHandler
fun onFirstEvent(coolParam: String, event: FirstEvent) {
}
})
}
expectThrows<EventBus.InvalidEventHandlerException> {
eventBus.register(object : Listener {
@EventHandler
fun onFirstEvent(event: FirstEvent, coolParam: String) {
}
})
}
}
@Test
fun `calls handlers in the right order, sorted by priority`() {
val order = mutableListOf<EventPriority>()
eventBus.register(object : Listener {
@EventHandler(EventPriority.HIGHEST)
fun onFirstEventHighest(event: FirstEvent) {
order.add(EventPriority.HIGHEST)
}
@EventHandler(EventPriority.LOW)
fun onFirstEventLow(event: FirstEvent) {
order.add(EventPriority.LOW)
}
@EventHandler(EventPriority.HIGHER)
fun onFirstEventHigher(event: FirstEvent) {
order.add(EventPriority.HIGHER)
}
@EventHandler(EventPriority.LOWEST)
fun onFirstEventLowest(event: FirstEvent) {
order.add(EventPriority.LOWEST)
}
@EventHandler(EventPriority.MONITOR)
fun onFirstEventMonitor(event: FirstEvent) {
order.add(EventPriority.MONITOR)
}
@EventHandler(EventPriority.LOWER)
fun onFirstEventLower(event: FirstEvent) {
order.add(EventPriority.LOWER)
}
@EventHandler(EventPriority.NORMAL)
fun onFirstEventNormal(event: FirstEvent) {
order.add(EventPriority.NORMAL)
}
@EventHandler(EventPriority.HIGH)
fun onFirstEventHigh(event: FirstEvent) {
order.add(EventPriority.HIGH)
}
})
runBlocking { eventBus.emit(FirstEvent()) }
expectThat(order).isSorted(Comparator.comparing<EventPriority, Int> { it.ordinal })
}
@Test
fun `suspends until all handlers ran`() {
val delay = 2000L
eventBus.register(object : Listener {
@EventHandler
suspend fun onFirstEvent(event: FirstEvent) {
// Simulate long running operation
delay(2000)
}
})
val duration = measureTimeMillis { runBlocking { eventBus.emit(FirstEvent()) } }
expectThat(duration).isGreaterThanOrEqualTo(delay)
}
}

View file

@ -9,8 +9,8 @@ import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking
import space.blokk.command.Command
import space.blokk.config.BlokkConfig
import space.blokk.event.BlokkEventBus
import space.blokk.event.EventBus
import space.blokk.event.EventTargetGroup
import space.blokk.logging.BlokkLoggingOutputProvider
import space.blokk.logging.Logger
import space.blokk.net.BlokkSocketServer
@ -44,10 +44,10 @@ class BlokkServer internal constructor() : Server {
override val coroutineContext: CoroutineContext =
CoroutineName("Server") + Executors.newSingleThreadExecutor().asCoroutineDispatcher() + SupervisorJob()
override val eventBus = EventBus(ServerEvent::class, logger, coroutineContext)
override val eventBus = BlokkEventBus(this)
override val sessions by socketServer::sessions
override val players = EventTargetGroup.Mutable<Player>(true)
override val players get() = sessions.mapNotNull { it.player }
override val pluginManager = BlokkPluginManager(this)
override val serverDirectory: File =

View file

@ -0,0 +1,53 @@
package space.blokk.event
import kotlinx.coroutines.suspendCancellableCoroutine
import space.blokk.BlokkServer
import space.blokk.logging.Logger
import space.blokk.plugin.Plugin
import space.blokk.util.pluralizeWithCount
import java.util.concurrent.ConcurrentSkipListSet
import kotlin.coroutines.resume
import kotlin.reflect.KClass
import kotlin.system.measureTimeMillis
class BlokkEventBus(private val server: BlokkServer) : EventBus() {
private val logger = Logger("EventBus", false)
/**
* Invokes all previously registered event handlers sorted by their priority.
*
* The coroutine context is inherited.
*
* @return [event]
*/
override suspend fun <T : Event> emit(event: T): T {
if (server.developmentMode) {
var count = 0
val time = measureTimeMillis {
for (handler in handlers) {
if (handler.eventType.isInstance(event)) {
@Suppress("UNCHECKED_CAST")
handler as EventHandler<T>
count++
handler.fn(event)
}
}
}
logger trace "Emitted ${event::class.java.simpleName} to " +
"${pluralizeWithCount("handler", count)}, took ${time}ms"
} else {
for (handler in handlers) {
if (handler.eventType.isInstance(event)) {
@Suppress("UNCHECKED_CAST")
handler as EventHandler<T>
handler.fn(event)
}
}
}
return event
}
}

View file

@ -10,13 +10,17 @@ import space.blokk.chat.TextComponent
import space.blokk.event.*
import space.blokk.logging.Logger
import space.blokk.net.Session.State
import space.blokk.net.event.ClientBrandReceivedEvent
import space.blokk.net.event.PacketReceivedEvent
import space.blokk.net.event.PacketSendEvent
import space.blokk.net.event.SessionEvent
import space.blokk.net.packet.IncomingPacket
import space.blokk.net.packet.OutgoingPacket
import space.blokk.net.packet.Packet
import space.blokk.net.packet.Protocol
import space.blokk.net.packet.handshaking.HandshakingProtocol
import space.blokk.net.packet.login.LoginProtocol
import space.blokk.net.packet.play.IncomingKeepAlivePacket
import space.blokk.net.packet.play.OutgoingKeepAlivePacket
import space.blokk.net.packet.play.OutgoingPluginMessagePacket
import space.blokk.net.packet.play.PlayProtocol
@ -29,7 +33,7 @@ import javax.crypto.SecretKey
import kotlin.coroutines.CoroutineContext
import kotlin.properties.Delegates
class BlokkSession(private val channel: Channel, val server: BlokkServer) : Session {
class BlokkSession(private val channel: Channel, val server: BlokkServer) : Session() {
override val address: InetAddress = (channel.remoteAddress() as InetSocketAddress).address
private val identifier = "BlokkSession(${address.hostAddress})"
@ -38,6 +42,8 @@ class BlokkSession(private val channel: Channel, val server: BlokkServer) : Sess
override val coroutineContext: CoroutineContext =
CoroutineName(identifier) + Dispatchers.Unconfined + SupervisorJob()
val scope = CoroutineScope(coroutineContext)
override var brand: String? = null
override var ping: Int = -1
@ -61,8 +67,6 @@ class BlokkSession(private val channel: Channel, val server: BlokkServer) : Sess
is State.Disconnected -> null
}
override val eventBus = EventBus(SessionEvent::class, logger, coroutineContext)
private var disconnectReason: String? = null
override suspend fun disconnect(reason: TextComponent, loggableReason: String) {
@ -95,7 +99,7 @@ class BlokkSession(private val channel: Channel, val server: BlokkServer) : Sess
fun scheduleKeepAlivePacket() {
val timeSinceLastPacket = (System.currentTimeMillis() - lastKeepAlivePacketTimestamp).toInt()
launch {
scope.launch {
delay(KEEP_ALIVE_PACKET_INTERVAL.toLong() - timeSinceLastPacket)
lastKeepAlivePacketTimestamp = System.currentTimeMillis()
@ -114,18 +118,9 @@ class BlokkSession(private val channel: Channel, val server: BlokkServer) : Sess
val joinProcedure = LoginAndJoinProcedure(this)
init {
eventBus.register(object : Listener {
@EventHandler(priority = EventPriority.INTERNAL)
suspend fun onSessionPacketReceived(event: PacketReceivedEvent<*>) {
SessionPacketReceivedEventHandler.handle(event.session as BlokkSession, event.packet)
}
})
}
fun onConnect() = launch {
fun onConnect() = scope.launch {
logger trace "Connected"
if (server.eventBus.emitAsync(SessionInitializedEvent(this@BlokkSession)).isCancelled) channel.close()
if (server.eventBus.emit(SessionInitializedEvent(this@BlokkSession)).cancelled) channel.close()
else server.sessions.add(this@BlokkSession)
}
@ -139,12 +134,22 @@ class BlokkSession(private val channel: Channel, val server: BlokkServer) : Sess
else message
}
cancel(DisconnectedCancellationException(reason))
coroutineContext.cancel(DisconnectedCancellationException(reason))
state = State.Disconnected(reason)
server.sessions.remove(this)
}
fun onPacketReceived(packet: IncomingPacket) {
logger.trace { "Packet received: $packet" }
scope.launch {
server.eventBus.emit(PacketReceivedEvent(this@BlokkSession, packet)).ifNotCancelled {
SessionPacketReceivedEventHandler.handle(this@BlokkSession, packet)
}
}
}
fun failBecauseOfClient(message: String) {
val messageGetter = { "A connection error caused by the client occurred: $message" }
if (server.config.silentNonServerErrors) logger debug messageGetter else logger error messageGetter
@ -169,7 +174,7 @@ class BlokkSession(private val channel: Channel, val server: BlokkServer) : Sess
if (state is State.Disconnected) throw IllegalStateException("The session is not active anymore")
logger.trace { "Sending packet: $packet" }
eventBus.emit(PacketSendEvent(this@BlokkSession, packet)).ifNotCancelled {
server.eventBus.emit(PacketSendEvent(this@BlokkSession, packet)).ifNotCancelled {
try {
channel.writeAndFlush(OutgoingPacketMessage(this@BlokkSession, it.packet)).awaitSuspending()
} catch (t: Throwable) {

View file

@ -11,8 +11,8 @@ import io.netty.channel.kqueue.KQueueServerSocketChannel
import io.netty.channel.nio.NioEventLoopGroup
import io.netty.channel.socket.nio.NioServerSocketChannel
import space.blokk.BlokkServer
import space.blokk.event.EventTargetGroup
import space.blokk.logging.Logger
import java.util.concurrent.CopyOnWriteArraySet
class BlokkSocketServer(val server: BlokkServer) {
private val logger = Logger("BlokkSocketServer")
@ -32,7 +32,7 @@ class BlokkSocketServer(val server: BlokkServer) {
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(BlokkChannelInitializer(this))
internal val sessions = EventTargetGroup.Mutable<Session>(true)
internal val sessions: MutableSet<BlokkSession> = CopyOnWriteArraySet()
fun bind() {
bootstrap.bind(server.config.host, server.config.port)

View file

@ -80,7 +80,7 @@ class LoginAndJoinProcedure(val session: BlokkSession) {
private suspend fun afterLogin() {
val state: Session.State.LoginSucceeded = session.state.getOrFail()
val event = session.emit(SessionAfterLoginEvent(session))
val event = session.server.eventBus.emit(SessionAfterLoginEvent(session))
val initialWorldAndLocation = event.initialWorldAndLocation
when {
@ -145,7 +145,7 @@ class LoginAndJoinProcedure(val session: BlokkSession) {
val state: Session.State.WaitingForClientSettings = session.state.getOrFail()
val settings = packet.asPlayerSettings()
val event = session.emit(PlayerInitializationEvent(session, settings))
val event = session.server.eventBus.emit(PlayerInitializationEvent(session, settings))
event.ifCancelled {
session.disconnect(loggableReason = "PlayerInitializationEvent was cancelled")

View file

@ -8,7 +8,6 @@ import space.blokk.net.event.PacketReceivedEvent
class PacketMessageHandler(private val session: BlokkSession) :
SimpleChannelInboundHandler<IncomingPacketMessage<*>>() {
override fun channelRead0(ctx: ChannelHandlerContext, msg: IncomingPacketMessage<*>) {
session.logger.trace { "Packet received: ${msg.packet}" }
session.launch { session.eventBus.emit(PacketReceivedEvent(session, msg.packet)) }
session.onPacketReceived(msg.packet)
}
}

View file

@ -17,7 +17,7 @@ object ClientSettingsPacketHandler : PacketReceivedEventHandler<ClientSettingsPa
}
} else {
(player as BlokkPlayer).settings =
player.emit(PlayerSettingsUpdateEvent(player, packet.asPlayerSettings())).settings
session.server.eventBus.emit(PlayerSettingsUpdateEvent(player, packet.asPlayerSettings())).settings
}
}
}

View file

@ -19,7 +19,7 @@ object IncomingPluginMessagePacketHandler : PacketReceivedEventHandler<IncomingP
val brand = buffer.readString()
buffer.release()
session.emit(ClientBrandReceivedEvent(session, brand)).ifNotCancelled { session.brand = it.brand }
session.server.eventBus.emit(ClientBrandReceivedEvent(session, brand)).ifNotCancelled { session.brand = it.brand }
}
}
}

View file

@ -6,7 +6,7 @@ import space.blokk.net.event.ServerListInfoRequestEvent
object StatusProtocolHandler : ProtocolPacketReceivedEventHandler(mapOf(
RequestPacket::class to PacketReceivedEventHandler.of<RequestPacket> { session, _ ->
val event = session.eventBus.emit(ServerListInfoRequestEvent(session))
val event = session.server.eventBus.emit(ServerListInfoRequestEvent(session))
val response = event.response
when {

View file

@ -41,10 +41,6 @@ class BlokkPlayer(
this.selectedHotbarSlot = selectedHotbarSlot
}
private val identifier = "BlokkPlayer($name)"
private val logger = Logger(identifier)
override val eventBus = EventBus(PlayerEvent::class, logger, coroutineContext)
override var playerListName: TextComponent? = null
override var currentlyViewedChunks: List<Chunk> = emptyList()

View file

@ -2,8 +2,6 @@ package space.blokk.testplugin
import space.blokk.Blokk
import space.blokk.NamespacedID
import space.blokk.event.EventHandler
import space.blokk.event.Listener
import space.blokk.net.event.SessionAfterLoginEvent
import space.blokk.plugin.Plugin
import space.blokk.testplugin.anvil.AnvilWorld
@ -32,16 +30,13 @@ class TestPlugin: Plugin("Test", "1.0.0") {
world.getVoxel(VoxelLocation(-1, 2, -1)).block = CraftingTable()
Blokk.sessions.registerListener(object : Listener {
@EventHandler
fun onSessionAfterLogin(event: SessionAfterLoginEvent) {
event.canFly = true
event.flyingSpeed = 2.0f
event.initialWorldAndLocation = WorldAndLocationWithRotation(
world,
VoxelLocation(0, 2, 0).atTopCenter().withRotation(0f, 0f)
)
}
})
Blokk.eventBus.on<SessionAfterLoginEvent> { event ->
event.canFly = true
event.flyingSpeed = 2.0f
event.initialWorldAndLocation = WorldAndLocationWithRotation(
world,
VoxelLocation(0, 2, 0).atTopCenter().withRotation(0f, 0f)
)
}
}
}

View file

@ -4,7 +4,6 @@ import com.google.common.cache.CacheBuilder
import com.google.common.cache.CacheLoader
import com.google.common.cache.LoadingCache
import space.blokk.entity.Entity
import space.blokk.event.EventTargetGroup
import space.blokk.world.Chunk
import space.blokk.world.Dimension
import space.blokk.world.World
@ -15,7 +14,7 @@ class AnvilWorld(
override val isFlat: Boolean
) : World(UUID.randomUUID()) {
override val loadedChunks: Map<Chunk.Key, Chunk> get() = chunks.asMap().filter { (_, chunk) -> chunk.loaded }
override val entities: EventTargetGroup.Mutable<Entity> = EventTargetGroup.Mutable(false)
override val entities = emptyList<Entity>()
private val chunks: LoadingCache<Chunk.Key, Chunk> = CacheBuilder.newBuilder()
.build(object : CacheLoader<Chunk.Key, Chunk>() {