新聞中心

EEPW首頁(yè) > 嵌入式系統(tǒng) > 設(shè)計(jì)應(yīng)用 > 如何構(gòu)建應(yīng)用程序引擎以及使用 Windows Azure Storage 實(shí)現(xiàn)異步消息傳送

如何構(gòu)建應(yīng)用程序引擎以及使用 Windows Azure Storage 實(shí)現(xiàn)異步消息傳送

作者: 時(shí)間:2012-02-18 來(lái)源:網(wǎng)絡(luò) 收藏

 現(xiàn)在,我們需要一個(gè)簡(jiǎn)單的包裝來(lái)與我們的隊(duì)列交互。從本質(zhì)上說(shuō),我們需要能夠?qū)?a class="contentlabel" href="http://butianyuan.cn/news/listbylabel/label/消息">消息插入隊(duì)列,獲取任何掛起的并清除該隊(duì)列(請(qǐng)參見圖 3)。

本文引用地址:http://butianyuan.cn/article/149634.htm

  圖 3 用于與隊(duì)列交互的包裝

namespace HollywoodHackers..Queue 
{ 
  public class StdQueueT> : 
  Base where T : QueueMessageBase, new() 
  { 
    protected CloudQueue queue; 
    protected CloudQueueClient client; 
 
    public StdQueue(string queueName) 
    { 
      client = new CloudQueueClient 
      (Base.QueueBaseUri, StorageBase.Credentials); 
      queue = client.GetQueueReference(queueName); 
      queue.CreateIfNotExist(); 
    } 
    public void AddMessage(T message) 
    { 
      CloudQueueMessage msg = 
      new CloudQueueMessage(message.ToBinary()); 
      queue.AddMessage(msg); 
    } 
    public void DeleteMessage(CloudQueueMessage msg) 
    { 
      queue.DeleteMessage(msg); 
    } 
    public CloudQueueMessage GetMessage() 
    { 
      return queue.GetMessage(TimeSpan.FromSeconds(60)); 
    } 
  } 
  public class ToastQueue : StdQueueToastQueueMessage> 
  { 
    public ToastQueue() 
      : base(toasts) 
    { 
    } 
  } 
}

我們還需要為表存儲(chǔ)設(shè)置一個(gè)包裝,以便在用戶登錄到站點(diǎn)之前可以存儲(chǔ)用戶通知。可以使用 PartitionKey(行集合的標(biāo)識(shí)符)和 RowKey(可唯一標(biāo)識(shí)特定分區(qū)中的每個(gè)單獨(dú)行)組織表數(shù)據(jù)。選擇 PartitionKey 和 RowKey 使用的數(shù)據(jù)是在使用表存儲(chǔ)時(shí)所做的最重要的設(shè)計(jì)決策之一。

  這些特點(diǎn)允許跨存儲(chǔ)節(jié)點(diǎn)進(jìn)行負(fù)載平衡,并在中提供內(nèi)置的可伸縮性選項(xiàng)。不考慮數(shù)據(jù)的數(shù)據(jù)中心關(guān)聯(lián)性,使用同一分區(qū)鍵的表存儲(chǔ)中的行將保留在相同的物理數(shù)據(jù)存儲(chǔ)中。因?yàn)獒槍?duì)每個(gè)用戶存儲(chǔ)對(duì)應(yīng)的,所以分區(qū)鍵將是 UserName,而 RowKey 則成為標(biāo)識(shí)每行的 GUID(請(qǐng)參見圖 4)。

  圖 4 表存儲(chǔ)的包裝

namespace HollywoodHackers.Storage.Repositories 
{ 
  public class UserTextNotificationRepository : StorageBase 
  { 
    public const string EntitySetName = 
    UserTextNotifications; 
    CloudTableClient tableClient; 
    UserTextNotificationContext notificationContext; 
    public UserTextNotificationRepository() 
      : base() 
    { 
      tableClient = new CloudTableClient 
      (StorageBase.TableBaseUri, StorageBase.Credentials); 
      notificationContext = new UserTextNotificationContext 
      (StorageBase.TableBaseUri,StorageBase.Credentials); 
 
      tableClient.CreateTableIfNotExist(EntitySetName); 
    } 
    public UserTextNotification[] 
    GetNotificationsForUser(string userName) 
    { 
      var q = from notification in 
          notificationContext.UserNotifications 
          where notification.TargetUserName == 
          userName select notification; 
      return q.ToArray(); 
    } 
    public void AddNotification 
    (UserTextNotification notification) 
    { 
      notification.RowKey = Guid.NewGuid().ToString(); 
      notificationContext.AddObject 
      (EntitySetName, notification); 
      notificationContext.SaveChanges(); 
    } 
  } 
}

因?yàn)槲覀兊拇鎯?chǔ)機(jī)制已經(jīng)確定,所以我們需要一個(gè)工作者角色作為引擎;以便在我們的電子商務(wù)站點(diǎn)的后臺(tái)處理消息。為此,我們定義了一個(gè)從 Microsoft.ServiceHosting.ServiceRuntime.RoleEntryPoint 類繼承的類,并將其與云服務(wù)項(xiàng)目中的工作者角色關(guān)聯(lián)(請(qǐng)參見圖 5)。

  圖 5 作為引擎的工作者角色

public class WorkerRole : RoleEntryPoint 
{ 
  ShoppingCartQueue cartQueue; 
  ToastQueue toastQueue; 
  UserTextNotificationRepository toastRepository; 
 
  public override void Run() 
  { 
    // This is a sample worker implementation. 
    //Replace with your logic. 
    Trace.WriteLine(WorkerRole1 entry point called, 
    Information); 
    toastRepository = new UserTextNotificationRepository(); 
    InitQueue(); 
    while (true) 
    { 
      Thread.Sleep(10000); 
      Trace.WriteLine(Working, Information); 
 
      ProcessNewTextNotifications(); 
      ProcessShoppingCarts(); 
    } 
  } 
  private void InitQueue() 
  { 
    cartQueue = new ShoppingCartQueue(); 
    toastQueue = new ToastQueue(); 
  } 
  private void ProcessNewTextNotifications() 
  { 
    CloudQueueMessage cqm = toastQueue.GetMessage(); 
    while (cqm != null) 
    { 
      ToastQueueMessage message = 
      QueueMessageBase.FromMessageToastQueueMessage>(cqm); 
 
      toastRepository.AddNotification(new 
      UserTextNotification() 
      { 
        MessageText = message.MessageText, 
        MessageDate = DateTime.Now, 
        TargetUserName = message.TargetUserName, 
        Title = message.Title 
      }); 
      toastQueue.DeleteMessage(cqm); 
      cqm = toastQueue.GetMessage(); 
    } 
  } 
  private void ProcessShoppingCarts() 
  { 
    // We will add this later in the article! 
  } 
  public override bool OnStart() 
  { 
    // Set the maximum number of concurrent connections 
    ServicePointManager.DefaultConnectionLimit = 12; 
 
    DiagnosticMonitor.Start(DiagnosticsConnectionString); 
    // For information on handling configuration changes 
    // see the MSDN topic at 
    //http://go.microsoft.com/fwlink/?LinkId=166357. 
    RoleEnvironment.Changing += RoleEnvironmentChanging; 
    return base.OnStart(); 
  } 
  private void RoleEnvironmentChanging(object sender, RoleEnvironmentChangingEventArgs e) 
  { 
    // If a configuration setting is changing 
    if (e.Changes.Any(change => change is RoleEnvironmentConfigurationSettingChange)) 
    { 
      // Set e.Cancel to true to restart this role instance 
      e.Cancel = true; 
    } 
  } 
}

讓我們看一下工作者角色代碼。在初始化和設(shè)置所需的隊(duì)列和表存儲(chǔ)之后,此代碼將進(jìn)入一個(gè)循環(huán)。每 10 秒鐘,它就會(huì)處理一次隊(duì)列中的消息。每次我們通過(guò)處理循環(huán)時(shí)都將獲取隊(duì)列中的消息,直到最終返回 null,這表示隊(duì)列為空。

  您從隊(duì)列中看到的消息永遠(yuǎn)不會(huì)超過(guò) 20 個(gè),如果不信,您可以反復(fù)嘗試來(lái)驗(yàn)證一下。對(duì)隊(duì)列進(jìn)行的任何處理都有時(shí)間限制,必須在該時(shí)間范圍內(nèi)對(duì)每個(gè)隊(duì)列消息執(zhí)行有意義的操作,否則隊(duì)列消息將被視為超時(shí),并在隊(duì)列中顯示備份,以便可以由其他工作者來(lái)處理此消息。每個(gè)消息都會(huì)作為用戶通知添加到表存儲(chǔ)中。關(guān)于工作者角色需要記住的重要一點(diǎn)是:一旦入口點(diǎn)方法完成,工作者角色也就結(jié)束了。這就是您需要在一個(gè)循環(huán)中保持邏輯運(yùn)行的原因。

  從客戶端的角度來(lái)說(shuō),我們需要能夠以 JSON 形式返回消息,以便 jQuery 可以輪詢并顯示新的用戶通知。為此,我們會(huì)將一些代碼添加到消息控制器中,以便可以訪問(wèn)這些通知(請(qǐng)參見圖 6)。

  圖 6 以 JSON 形式返回消息

public JsonResult GetMessages() 
{ 
   if (User.Identity.IsAuthenticated) 
   { 
UserTextNotification[] userToasts = 
    toastRepository.GetNotifications(User.Identity.Name); 
object[] data = 
(from UserTextNotification toast in userToasts 
      select new { title = toast.Title ?? Notification, 
 text = toast.MessageText }).ToArray(); 
      return Json(data, JsonRequestBehavior.AllowGet); 
   } 
   else 
     return Json(null); 
}

 在 Visual Studio 2010 beta 2 下的 ASP.NET MVC 2(我們用于撰寫本文的環(huán)境)中,如果沒(méi)有 JsonRequestBehavior.AllowGet 選項(xiàng),您無(wú)法將 JSON 數(shù)據(jù)返回到 jQuery 或其他客戶端。在 ASP.NET MVC 1 中,不需要此選項(xiàng)?,F(xiàn)在,我們可以編寫 JavaScript,它每 15 秒將調(diào)用一次 GetMessages 方法并將以 toast 形式消息顯示通知(請(qǐng)參見圖 7)。



評(píng)論


相關(guān)推薦

技術(shù)專區(qū)

關(guān)閉