Yarn

Recent twts in reply to #fuhaoaa

Hi, I am playing with making an event sourcing database. Its super alpha but I thought I would share since others are talking about databases and such.

It’s super basic. Using tidwall/wal as the disk backing. The first use case I am playing with is an implementation of msgbus. I can post events to it and read them back in reverse order.

Download

I plan to expand it to handle other event sourcing type things like aggregates and projections.

Find it here: sour-is/ev

@prologic@twtxt.net @movq@www.uninformativ.de @lyse@lyse.isobeef.org

​ Read More

I have updated my eventDB to have subscriptions! It now has websockets like msgbus. I have also added a in memory store that can be used along side the disk backed wal.

​ Read More

Progress! so i have moved into working on aggregates. Which are a grouping of events that replayed on an object set the current state of the object. I came up with this little bit of generic wonder.

type PA[T any] interface {
	event.Aggregate
	*T
}

// Create uses fn to create a new aggregate and store in db.
func Create[A any, T PA[A]](ctx context.Context, es *EventStore, streamID string, fn func(context.Context, T) error) (agg T, err error) {
	ctx, span := logz.Span(ctx)
	defer span.End()

	agg = new(A)
	agg.SetStreamID(streamID)

	if err = es.Load(ctx, agg); err != nil {
		return
	}

	if err = event.NotExists(agg); err != nil {
		return
	}

	if err = fn(ctx, agg); err != nil {
		return
	}

	var i uint64
	if i, err = es.Save(ctx, agg); err != nil {
		return
	}

	span.AddEvent(fmt.Sprint("wrote events = ", i))

	return
}

fig. 1

This lets me do something like this:

a, err := es.Create(ctx, r.es, streamID, func(ctx context.Context, agg *domain.SaltyUser) error {
		return agg.OnUserRegister(nick, key)
})

fig. 2

I can tell the function the type being modified and returned using the function argument that is passed in. pretty cray cray.

​ Read More

Participate

Login to join in on this yarn.