Customizing Dovetail Carrier – Creating and Publishing Messages
Previously we made a plan to go get Twitter direct messages and publish them into the Dovetail Carrier message bus. Thankfully the Twitter API is pretty simple but there are a few things to worry about.
- Listen to Twitter for direct messages.
- Transform each direct message and publish it into the message bus.
- Host the Twitter polling and publishing service with Carrier.
First Things First
This is not a cookbook post. I don’t describe each step that I took along the way to create a Carrier Twitter extension. That said, I hope to provide the Visual Studio solution and code for this example in a future version of Carrier.
One housekeeping item: to start, I created a new solution called Carrier.Extension.Twitter and added references to Carrier.Core and Dovetail.Commons.
Carrier Extension Attribute
We need a way to tell Carrier that your assembly should be treated as an extension assembly. For now, the way to do this is to add an attribute to the AssemblyInfo.cs file of your extension.
[assembly: CarrierExtension]
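To make the idea of a marker attribute concrete, here is a minimal sketch of how a host could check whether an assembly carries one. This is not Carrier's actual code: the CarrierExtensionAttribute below is a stand-in I declare myself (the real one ships with Carrier), and ExtensionScanner is a hypothetical name.

```csharp
using System;
using System.Reflection;

// Stand-in for the real attribute, which lives in Carrier itself (assumption).
[AttributeUsage(AttributeTargets.Assembly)]
public sealed class CarrierExtensionAttribute : Attribute { }

// Hypothetical helper: reports whether an assembly is marked as an extension.
public static class ExtensionScanner
{
    public static bool IsExtension(Assembly assembly)
    {
        // Attribute.IsDefined looks for the attribute on the assembly itself.
        return Attribute.IsDefined(assembly, typeof(CarrierExtensionAttribute));
    }
}
```

An assembly that does not declare the attribute, such as the one containing System.String, would simply be skipped by a check like this.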
Extension Deployment
Right now, to deploy your extension you’ll need to drop your extension assembly and its dependencies into the Carrier application directory. We hope to improve on how Carrier locates and configures extensions down the road.
Using Tweet Sharp
The first real code we need is something that will go get new direct messages from Twitter. I really don’t want to tackle and maintain the code that is responsible for the nitty gritty details required to talk directly to Twitter. I looked around and found quite a few C# libraries and after asking around on Twitter I settled on TweetSharp.
TweetSharp works well. They have all access to Twitter wrapped in a Fluent DSL which I found a little tricky to get used to but using examples I was able to make it work for my simple use case. I created a class called TwitterStream to encapsulate all my access to Twitter. Here is the meat of how I am using TweetSharp:
public IEnumerable<TwitterDirectMessage> GetDirectMessages()
{
    // Request the direct messages received by the authenticated user
    var twitter = AuthenticatedRequest().DirectMessages().Received();
    AddSinceClause(twitter);

    var response = twitter.Request();
    var directMessages = response.AsDirectMessages();

    RememberMostRecentDirectMessageIdentifier(directMessages);
    CheckResponseForError(response);

    return directMessages;
}
Avoid getting duplicate Tweets
Twitter’s API lets you optionally specify the last Tweet ID that you’ve received to avoid getting duplicate Tweets back in your request. I added support for this to TwitterStream via a little repository which is in charge of remembering the id of the last direct message we’ve received. I punted and created a simple in-memory version of the repository, but it would be easy to add a bit of schema and create a Twitter identifier repository which saves to the database.
private void AddSinceClause(ITwitterDirectMessagesReceived twitter)
{
    // Only ask for messages newer than the last one we have seen
    var mostRecentTweetId = _identifierRepository.Get();
    if (mostRecentTweetId.HasValue)
        twitter.Since(mostRecentTweetId.Value);
}

private void RememberMostRecentDirectMessageIdentifier(IEnumerable<TwitterDirectMessage> directMessages)
{
    // The first message in the response is treated as the most recent one
    if (directMessages != null && directMessages.Count() > 0)
        _identifierRepository.Save(directMessages.First().Id);
}
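For reference, an in-memory repository along these lines could look like the sketch below. The interface name IIdentifierRepository and the choice to only ever move the identifier forward are my assumptions for illustration; the only things taken from the code above are the Get and Save calls and the nullable return value.

```csharp
// Hypothetical contract matching the Get/Save calls used by TwitterStream.
public interface IIdentifierRepository
{
    long? Get();
    void Save(long identifier);
}

// Keeps the most recent identifier in memory only, so it is lost on restart.
public class InMemoryIdentifierRepository : IIdentifierRepository
{
    private readonly object _gate = new object();
    private long? _mostRecent;

    public long? Get()
    {
        lock (_gate) { return _mostRecent; }
    }

    public void Save(long identifier)
    {
        lock (_gate)
        {
            // Assumption: ignore older identifiers so a late save cannot
            // cause already-seen messages to be requested again.
            if (!_mostRecent.HasValue || identifier > _mostRecent.Value)
                _mostRecent = identifier;
        }
    }
}
```

A database-backed version would implement the same two methods, which is what makes swapping it in later easy.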
Here is the gist of the full version of TwitterStream. Next we need to add some code that will poll Twitter for new direct messages.
Listening For Tweets
Twitter limits your API calls to 150 per hour, which works out to one call every 24 seconds on average, so you can check for new direct messages every 30 seconds (120 calls per hour) without risk of exceeding your limit. It would be easy to create a Timer that fires every 30 seconds, and that might work fine, but what if you are checking more often, or what if Twitter takes a long time to get back to you? Your timer events could start to overlap and execute concurrently, which could cause you to get the same message twice.
I’ve had to write code to compensate for this a few times, so I pulled the common code into an abstract class Listener<T>, where T is the type of the message being received. We will be moving this code somewhere common. All that is left to do is derive a class from Listener<T> and implement the Poll method, firing the OnMessageReceived event each time a direct message is received.
public class TwitterDirectMessageListener : Listener<TwitterDirectMessage>
{
    private readonly ITwitterStream _twitterStream;

    public TwitterDirectMessageListener(ITwitterStream twitterStream, ILogger logger)
        : base(logger)
    {
        _twitterStream = twitterStream;
    }

    public override void Poll()
    {
        _logger.LogDebug("Getting direct messages.");

        var directMessages = _twitterStream.GetDirectMessages();

        // Raise one event per direct message received
        foreach (var directMessage in directMessages)
        {
            OnMessageReceived(directMessage);
        }

        _logger.LogDebug("Done getting direct messages.");
    }
}
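I am not showing the real Listener<T> here, but one common way to keep polls from overlapping is to use a one-shot timer that is only re-armed after the current poll has finished. The sketch below is my own illustration of that technique, not Carrier's implementation: PollingListener<T>, OnPollFailed, and CountingListener are hypothetical names, and the logger is left out to keep it self-contained.

```csharp
using System;
using System.Threading;

public class MessageArg<T> : EventArgs
{
    public MessageArg(T message) { Message = message; }
    public T Message { get; private set; }
}

// Hypothetical base class: polls on a timer without overlapping polls.
public abstract class PollingListener<T>
{
    private readonly TimeSpan _interval;
    private readonly object _gate = new object();
    private Timer _timer;
    private bool _running;

    protected PollingListener(TimeSpan interval) { _interval = interval; }

    public event EventHandler<MessageArg<T>> MessageReceived;

    public abstract void Poll();

    public void Start()
    {
        lock (_gate)
        {
            if (_running) return;
            _running = true;
            // One-shot timer: fires once now and never repeats on its own.
            _timer = new Timer(_ => Tick(), null, TimeSpan.Zero, Timeout.InfiniteTimeSpan);
        }
    }

    public void Stop()
    {
        lock (_gate)
        {
            _running = false;
            if (_timer != null) { _timer.Dispose(); _timer = null; }
        }
    }

    protected void OnMessageReceived(T message)
    {
        var handler = MessageReceived;
        if (handler != null) handler(this, new MessageArg<T>(message));
    }

    // Called when Poll throws; the default just swallows the error.
    protected virtual void OnPollFailed(Exception error) { }

    private void Tick()
    {
        try { Poll(); }
        catch (Exception error) { OnPollFailed(error); }
        finally
        {
            lock (_gate)
            {
                // Re-arm only after the poll is done, so polls never overlap.
                if (_running && _timer != null)
                    _timer.Change(_interval, Timeout.InfiniteTimeSpan);
            }
        }
    }
}

// Tiny example listener that "receives" an increasing counter.
public class CountingListener : PollingListener<int>
{
    private int _next;
    public CountingListener(TimeSpan interval) : base(interval) { }
    public override void Poll() { OnMessageReceived(Interlocked.Increment(ref _next)); }
}
```

The trade-off of this design is that the interval is measured from the end of one poll to the start of the next, so a slow response from Twitter delays the following poll instead of stacking up behind it.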
Once we get the direct message it will need to be published to the message bus.
Publishing Direct Messages to the Bus
The final leg of our journey involves transforming the direct message that we get from TweetSharp, an instance of a class called TwitterDirectMessage (the source), into a message of our own and publishing it. We could publish the TwitterDirectMessage itself to the bus, but this is not a good idea. It is recommended to use a dedicated message type, so we will be creating one called IncomingDirectMessage.
What Could Go Wrong If You Don’t Use a Dedicated Message Type?
The short answer is that any changes to the message type could break compatibility with anything in your message bus which is subscribed to that message.
Say we used TweetSharp’s TwitterDirectMessage in our service bus. We would run into problems if the next version of TweetSharp changed the TwitterDirectMessage’s type signature. The service bus would only know how to serialize and deserialize one version of the message. If an older message was lying around, serialized against the old message format, it would be unusable. The enterprise service bus would do its best, but chances are it would fail.
Control Your Message Types
You want to have very good control over the message types used by your message bus to avoid compatibility problems between different versions of the components that consume messages. The general guidance handed down from those with more experience than myself is that anytime you need to change a message that is in use in production, you should create a new message type instead.
One important thing to know is that strongly naming the assembly that contains your message type will cause the signature of the message to change whenever the version number of the assembly changes. So it is likely a good idea to keep your messages in a separate assembly whose version number does not change.
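Here is a small sketch of what "create a new message type" can look like in practice. Everything in it is illustrative: the trimmed-down IncomingDirectMessage, the IncomingDirectMessageV2 type with its Language field, and the MessageTypeIdentity helper are names I made up for this example. The helper simply returns the assembly-qualified type name, which includes the assembly version, to show why a changing version number can change how a message type is identified.

```csharp
using System;

// Trimmed-down original message type (illustration only).
public class IncomingDirectMessage
{
    public long Id { get; set; }
    public string Text { get; set; }
}

// Hypothetical second version: a new type instead of a changed one,
// so consumers of the original type keep working untouched.
public class IncomingDirectMessageV2
{
    public long Id { get; set; }
    public string Text { get; set; }
    public string Language { get; set; } // hypothetical new field

    public static IncomingDirectMessageV2 From(IncomingDirectMessage original)
    {
        return new IncomingDirectMessageV2
        {
            Id = original.Id,
            Text = original.Text,
            Language = null // unknown for messages created before V2
        };
    }
}

public static class MessageTypeIdentity
{
    // The assembly-qualified name embeds "Version=...", so it changes
    // whenever the containing assembly's version changes.
    public static string Describe(Type messageType)
    {
        return messageType.AssemblyQualifiedName;
    }
}
```

Old consumers keep subscribing to the original type while new consumers subscribe to the V2 type, and both can live on the bus at the same time.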
Assembling An IncomingDirectMessage
To control the contents of the message which we are publishing to our message bus I created a type called IncomingDirectMessage:
public class IncomingDirectMessage
{
    public long Id { get; set; }
    public DateTime CreatedDate { get; set; }
    public long SenderUserId { get; set; }
    public string SenderScreenName { get; set; }
    public string SenderName { get; set; }
    public string SenderProfileImageUrl { get; set; }
    public string Text { get; set; }
    public long RecipientUserId { get; set; }
    public string RecipientScreenName { get; set; }
    public string RecipientName { get; set; }
    public string RecipientProfileImageUrl { get; set; }
}
To transform TweetSharp’s TwitterDirectMessage into one of our IncomingDirectMessages I wrote a simple method whose only responsibility is this transformation.
public static IncomingDirectMessage AssembleIncomingDirectMessage(TwitterDirectMessage directMessage)
{
    // Copy only the fields we want to expose on the bus.
    // Note: the profile image fields are filled from ProfileBackgroundImageUrl here;
    // double-check that this is the image you want before relying on it.
    return new IncomingDirectMessage
    {
        Id = directMessage.Id,
        CreatedDate = directMessage.CreatedDate,
        SenderUserId = directMessage.SenderId,
        SenderScreenName = directMessage.SenderScreenName,
        SenderName = directMessage.Sender.Name,
        SenderProfileImageUrl = directMessage.Sender.ProfileBackgroundImageUrl,
        Text = directMessage.Text,
        RecipientScreenName = directMessage.RecipientScreenName,
        RecipientUserId = directMessage.RecipientId,
        RecipientProfileImageUrl = directMessage.Recipient.ProfileBackgroundImageUrl,
        RecipientName = directMessage.Recipient.Name
    };
}
Note: You could also use a tool like AutoMapper to do this.
Publishing The IncomingDirectMessage
Carrier uses a library called MassTransit to do all the message bus heavy lifting, so publishing a message to the bus is pretty simple.
public class DirectMessagePublisher : IPublisher<TwitterDirectMessage>
{
    private readonly MessageBusSettings _messageBusSettings;
    private readonly IEndpointFactory _endpointFactory;
    private readonly ILogger _logger;

    public DirectMessagePublisher(MessageBusSettings messageBusSettings, IEndpointFactory endpointFactory, ILogger logger)
    {
        _messageBusSettings = messageBusSettings;
        _endpointFactory = endpointFactory;
        _logger = logger;
    }

    public void Publish(TwitterDirectMessage directMessage)
    {
        using (_logger.Push("DM {0}".ToFormat(directMessage.Id)))
        {
            // Transform the TweetSharp type into our own message type
            var incomingDirectMessage = AssembleIncomingDirectMessage(directMessage);

            _logger.LogDebug("Publishing direct message from @{0} to @{1}",
                incomingDirectMessage.SenderScreenName, incomingDirectMessage.RecipientScreenName);

            // Send the message to the queue named in configuration
            var queueName = _messageBusSettings.MessageQueue;
            var messageEndpoint = _endpointFactory.GetEndpoint(queueName);
            messageEndpoint.Send(incomingDirectMessage);

            _logger.LogDebug("Twitter direct message published.");
        }
    }

    //...
}
We get the name of the queue the message bus is using from configuration via the MessageBusSettings object. The EndpointFactory is in charge of serializing the message and sending it to the message queue being used to manage the message bus.
Putting It All Together
We’ve assembled all the pieces we need to poll, produce, and publish direct messages to the message bus. All we have left to do is compose everything together into a TwitterService which gets started up with Carrier.
The Twitter Service will do the following:
- Use the Listener to fire events when direct messages are received.
- The direct message event handler will publish the direct message as an IncomingDirectMessage to the message bus.
public class TwitterService : IMessagePublishingService<IncomingDirectMessage>
{
    private readonly IListener<TwitterDirectMessage> _twitterDirectMessageListener;
    private readonly IPublisher<TwitterDirectMessage> _publisher;
    private readonly ILogger _logger;

    public TwitterService(IListener<TwitterDirectMessage> twitterDirectMessageListener,
        IPublisher<TwitterDirectMessage> publisher, ILogger logger)
    {
        _twitterDirectMessageListener = twitterDirectMessageListener;
        _publisher = publisher;
        _logger = logger;
    }

    public void Start()
    {
        _logger.LogDebug("Starting Twitter Service");

        // Publish every direct message the listener hands us
        _twitterDirectMessageListener.MessageReceived += TwitterListener_MessageReceived;
        _twitterDirectMessageListener.Start();
    }

    private void TwitterListener_MessageReceived(object sender, MessageArg<TwitterDirectMessage> messageArg)
    {
        _logger.LogDebug("Publishing message from {0}.", messageArg.Message.SenderScreenName);
        _publisher.Publish(messageArg.Message);
    }

    public void Stop()
    {
        _logger.LogDebug("Stopping Twitter Service");
        _twitterDirectMessageListener.Stop();
    }
}
Notice that TwitterService implements IMessagePublishingService<IncomingDirectMessage> which is essentially a marker interface declaring to Carrier that this service publishes Twitter direct messages.
Hosting Twitter Service
Now that we’ve created the TwitterService, who is in charge of hosting it? You could write a Windows service for each message producer you create, but that would not scale well as you create more and more message publishers. We wanted to make publishing messages easy, so we let you leverage a Windows service we know you’ll already have: Dovetail Carrier.
To host your service, Carrier needs a way of knowing which types of messages your service publishes. This is accomplished by implementing the IMessagePublishingService interface.
public interface IMessagePublishingService<MESSAGE> : ISimpleService
{
}

public interface ISimpleService
{
    void Start();
    void Stop();
}
During start up Carrier will compile a list of the message types used by all of the message consumers present in the application. Next, it will go through that list to see if there is an IMessagePublishingService for each message type present. When it finds one, it will configure the runtime to start up that service running on its own thread within the process of the Dovetail Carrier Windows service.
You can see that the TwitterService example above is a message publishing service for IncomingDirectMessages because it implements the IMessagePublishingService<IncomingDirectMessage> interface.
The Start and Stop methods present on the interface are required to allow the runtime control over the lifecycle of the service.
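To give a feel for the matching step described above, here is a rough sketch of how publishing services could be found for a list of consumed message types using reflection. This is my own illustration and not Carrier's start-up code: PublishingServiceLocator and SampleTwitterService are hypothetical names, the two interfaces are repeated only so the example stands alone, and the message classes are empty stand-ins.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public interface ISimpleService { void Start(); void Stop(); }
public interface IMessagePublishingService<MESSAGE> : ISimpleService { }

// Hypothetical helper: matches consumed message types to publishing services.
public static class PublishingServiceLocator
{
    public static IList<Type> FindPublishersFor(
        IEnumerable<Type> consumedMessageTypes,
        IEnumerable<Type> candidateTypes)
    {
        // Build the closed interface type for each consumed message,
        // e.g. IMessagePublishingService<IncomingDirectMessage>.
        var wanted = new HashSet<Type>(consumedMessageTypes
            .Select(m => typeof(IMessagePublishingService<>).MakeGenericType(m)));

        // Keep concrete classes that implement one of those interfaces.
        return candidateTypes
            .Where(t => t.IsClass && !t.IsAbstract)
            .Where(t => t.GetInterfaces().Any(i => wanted.Contains(i)))
            .ToList();
    }
}

// Stand-in types for the example.
public class IncomingDirectMessage { }
public class UnrelatedMessage { }

public class SampleTwitterService : IMessagePublishingService<IncomingDirectMessage>
{
    public void Start() { }
    public void Stop() { }
}
```

With a result like this in hand, a host could create each matching service and call Start on a dedicated thread, then call Stop on shutdown.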
Note: We will talk more about creating message consumers in the next post.
Publishing Accomplished
As you can see, not a lot of code went into creating the Twitter Service. We leveraged the handy TweetSharp library and the Listener<T> class for polling Twitter. It also doesn’t take too much code to transform direct messages and to publish them to the message bus. And we learned how Dovetail Carrier will take care of hosting the Twitter Service when we get done creating consumers for the messages we’re publishing.
I didn’t show you any pretty screen shots… Ok, I’ll show you one but it won’t be pretty.
Now that we can push messages into the message bus, what are we going to do with them? Next time we’ll talk about creating message consumers.