可以毫不夸张的说:这门课程是初中级Python开发人员向高级进阶的必学课程 许多Pythoner喜欢追求新的框架,但却不重视Python本身基础知识的学习, 他们不知道的是,语言本身的进阶优先于框架,大公司更注重语言本身的功底。万丈高楼平地起,学透了Python高级基础知识再学习其它框架,才会事半功倍,才会更好的理解和使用这些框架
适合人群及技术储备要求
本课程可作为慕课网七月老师《全网最热Python3入门+进阶》课程的进阶课程,如果你学习了七月老师的课程,再学习本门
你将能完成Python3入门,进阶,深入的全过程,可以把Python3所有知识点都学会、学透,为以后的用Python3及Python框架进行实际开发
打下无比坚实的基础,掌握这些知识,你必将所向披靡,无往不利
技术储备要求:
具备一定的自学能力,掌握Python基础(或已学习过《全网最热Python3入门+进阶》)
rocketMQ简單示例
前言
在微效勞遍地開花的今天,音訊隊列的應用特別普遍,但在此之前,我抵消息隊列的認知僅僅停留在是什麼和能幹什麼的認知,沒有運用過任何一款音訊隊列,對它的實践應用也沒有任何認知,但是從如今市場上的技術狀況來說,音訊隊列曾經是一個web後端開發必需控製的中心組件之一,所以我就應用閑暇時間來理解下,今天我們分享的是rocketMQ,同樣也是阿里巴巴開源的一個組件,2016年阿里巴巴把它捐赠給了Apache開源基金會,目前是該基金會的頂級項目之一。
正文
在開端正文之前,我們先來看下音訊隊列的一些學問。
什麼是音訊隊列
音訊隊列简單來講,它就相似於寄存音訊的容器,音訊消费者將音訊放入音訊隊列中,消费者從音訊隊列中拿出音訊停止消费(比方寫订單)。音訊隊列是散佈式係統中重要的組件,運用音訊隊列主要是爲了經過異步處置進步係統性能和削峰、降低係統耦合性。目前運用較多的音訊隊列有ActiveMQ,RabbitMQ,Kafka,RocketMQ。
爲什麼需求音訊隊列
削峰填谷,降低係統瞬時峰值並發量
隨著互聯網業務需求的不時開展,傳統的架構形式曾經無法滿足需求,特別是在電商業務大開展的當下更是如此。爲了應對這種百萬並發、千萬並發以至億級數量的並發,這種並發形式的特性是並發量集中呈現在某一時間段,比方雙十一、雙十二或者促銷、打摺日,而其他時間的並發量是峰值並發量的非常之一,以至百分之一,假如依照峰值的並發量來設計係統架構,不只初投資大消耗鉅資,而且在日常運轉維護也很燒錢。爲了均衡這兩種狀況下的並發量,同時降低係統建立本錢,音訊隊列這樣的係統組件應運而生。
简單來說,音訊隊列就是一種均衡係統並發量的係統組件,主要的作用就是削峰填谷。我們假定這張圖中红色表示沒有參加音訊隊列組件的係統並發時序圖(這里的圖是隨意畫的,只是爲了阐明問題),蓝色表示架構中參加了音訊隊列組件:
![]()
比照之後我們會發現,我們發現引入音訊隊列組件之後,降低了订單係統的峰值並發量,就相當於我們將一局部峰值懇求的業務處置,放在係統壓力小時去處置,當然實践過程中订單係統不斷在雜亂無章地處置音訊隊列中的订單信息,直到一切订單處置完成。當然,它主要是針對一些實時性請求不強但並發量大的業務,比方購票、搶購下單等允許稍後推送處置結果的業務。
降低係統之間的耦合性,進步用戶體驗
我們來想象這樣一個應用場景,我們要做一個購票係統,購票勝利後要短信告知用戶購票結果,假如採用串行方式(也就是同步伐用),我們的調用方式是這樣的:用戶提交購票订單後,订單係統受理購票订單後,調用短信係統發送購票結果。這里我們放一個圖,大家就更分明了:
![]()
串行架構下係統的響應時間是150ms + 150ms,需求300ms,從業務流程上來說,订單受理勝利後發送購票結果,這沒有什麼問題,但是假如短信係統在某個時間段内係統宕機,短信效勞不可用,購票短信無法發送,最終招致的結果是用戶是無法下單的,不只影響用戶體驗,也影響公司的業務受益。
而且再深化剖析下這個業務流程,妳會發現短信能否發送與用戶提交订單的操作是沒有關係的,用戶需求晓得的只是它的订單能否處置勝利,至於發送購票成結果的短信,對整個業務而言並發中心業務,就算短信發送失敗,也不應該影響購票業務。
所以,我們更合理的業務處置方式是,订單係統受理勝利後,直接將受理結果返回給用戶,至於订單的處置結果,能夠等整個業務完成後生成,然後订單係統向短信係統發送订單處置結果的音訊,短信係統收到音訊後給給用戶發購票結果,這里我們寄存音訊的容器就是音訊隊列,這時分我們的係統機構是這樣的:
![]()
上面這種架構中,我們發送短信的業務是異步的,這樣不只能夠減少用戶等候的時間,進步效勞的響應效率,假如省去短信係統的處置時間,那麼最終業務的響應時間就缩減爲150ms,而且這種架構下係統的耦合性更低,比方假如將來業務需求發送變化,不只需求給用戶發送短信,還要將結果推送到微信公眾號,業務耗時100ms,以至還需求將結果以郵件的方式發送到用戶,業務耗時150ms,假如還是第一種架構形式,那用戶要等候的響應時長是150ms + 150ms + 100ms + 150ms,但對第二種架構,用戶的響應時間一直是150ms:
![]()
RocketMQ入門
rocketMQ是什麼
下载
目前最新的版本是4.8,下面傳送門:
https://www.apache.org/dyn/closer.cgi?path=rocketmq/4.8.0/rocketmq-all-4.8.0-source-release.zip
![]()
點擊上圖的地址,红框内的地址是引薦的國内的镜像地址,下载比擬快
裝置
解壓下载的zip文件
windows
直接緊缩软件解壓即可
linux
unzip rocketmq-all-4.8.0-source-release.zip
cd rocketmq-all-4.8.0/
mvn -Prelease-all -DskipTests clean install -U
cd distribution/target/rocketmq-4.8.0/rocketmq-4.8.0
設置環境變量
這里windows需求設置,Linux並不需求,當然我也沒有在Linux環境下測試,有興味的小同伴本人去實驗。在windows添加如下環境變量:
ROCKETMQ_HOME="D:\workspace\tools\rocketmq-all-4.8.0-bin-release"
NAMESRV_ADDR="localhost:9876"
![]()
![image-20210310230651594]()
或者在啟動前的powershell窗口里設置,這里是暫時,每次都要設置,嫌费事的直接設置永世的:
$Env:ROCKETMQ_HOME="D:\workspace\tools\rocketmq-all-4.8.0-bin-release"
$Env:NAMESRV_ADDR="localhost:9876"
啟動
啟動Name效勞
linux
nohup sh bin/mqnamesrv &
tail -f ~/logs/rocketmqlogs/namesrv.log
The Name Server boot success...
windows
翻開powershell,假如沒有設置環境環境變量需求先執行下面的操作,停止環境變量設置
$Env:ROCKETMQ_HOME="D:\workspace\tools\rocketmq-all-4.8.0-bin-release"
$Env:NAMESRV_ADDR="localhost:9876"
然後進入rocketMQ裝置目錄,執行如下操作
cd D:\workspace\tools\rocketmq-all-4.8.0-bin-release
.\bin\mqnamesrv.cmd
![]()
![]()
啟動代理效勞
Linux
nohup sh bin/mqbroker -n localhost:9876 &
tail -f ~/logs/rocketmqlogs/broker.log
The broker[%s, 172.30.30.233:10911] boot success...
windows
和上面啟動Name效勞一樣,沒設置環境需求先執行前面兩行環境變量設置的操作
$Env:ROCKETMQ_HOME="D:\workspace\tools\rocketmq-all-4.8.0-bin-release"
$Env:NAMESRV_ADDR="localhost:9876"
然後執行啟動操作
cd D:\workspace\tools\rocketmq-all-4.8.0-bin-release
.\bin\mqbroker.cmd -n localhost:9876 autoCreateTopicEnable=true
操作之前,一定要進入rocketMQ裝置目錄,否則報答如下红色錯誤
![]()
接纳&發送音訊
發送音訊
Linux
export NAMESRV_ADDR=localhost:9876
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Producer
SendResult [sendStatus=SEND_OK, msgId= ...
windows
同樣的,沒設置環境變量的記得先設置,嫌费事就直接設置永世的環境變量,參照設置環境變量
cd D:\workspace\tools\rocketmq-all-4.8.0-bin-release
.\bin\tools.cmd org.apache.rocketmq.example.quickstart.Producer
執行命令後,會看到我們向音訊隊列中發送了很多音訊
![]()
接纳音訊
Linux
sh bin/tools.sh org.apache.rocketmq.example.quickstart.Consumer
ConsumeMessageThread_%d Receive New Messages: [MessageExt...
windows
同樣的,沒設置環境變量的記得先設置
cd D:\workspace\tools\rocketmq-all-4.8.0-bin-release
.\bin\tools.cmd org.apache.rocketmq.example.quickstart.Consumer
執行上面命令後,能夠看到控製台接纳到方才發送的音訊
![]()
Java简單demo
這里的demo在官網都能夠看到,也都很简單,需求補充阐明的,我會停下來解释。開端項目之前,先引入如下依賴:
org.apache.rocketmqrocketmq-client4.3.0
發送同步音訊
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
- @program: rocketmq-demo
- @description: 發送同步音訊
- @author: syske
- @create: 2021-03-09 20:24
/
public class SyncProducer {
public static void main(String[] args) throws Exception {
// 實例化音訊消费者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 設置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 啟動Producer實例
producer.start();
for (int i = 0; i < 100; i++) {
// 創立音訊,並指定Topic,Tag和音訊體
Message msg = new Message("TopicTest" / Topic /,
"TagA" / Tag /,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) / Message body */
);
// 發送音訊到一個Broker
SendResult sendResult = producer.send(msg);
// 經過sendResult返回音訊能否勝利送達
System.out.printf("%s%n", sendResult);
}
// 假如不再發送音訊,關閉Producer實例。
producer.shutdown();
}
}
發送異步音訊
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.CountDownLatch2;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
import java.util.concurrent.TimeUnit;
/** - @program: rocketmq-demo
- @description: 異步音訊消费者
- @author: syske
- @create: 2021-03-09 20:28
*/
public class AsyncProducer {
public static void main(String[] args) throws Exception {
// 實例化音訊消费者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 設置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 啟動Producer實例
producer.start();
producer.setRetryTimesWhenSendAsyncFailed(0);
int messageCount = 100;
// 依據音訊數量實例化倒計時計算器
final CountDownLatch2 countDownLatch = new CountDownLatch2(messageCount);
for (int i = 0; i < messageCount; i++) {
final int index = i;
// 創立音訊,並指定Topic,Tag和音訊體
Message msg = new Message("TopicTest",
"TagA",
"OrderID188",
"Hello world".getBytes(RemotingHelper.DEFAULT_CHARSET));
// SendCallback接纳異步返回結果的回調
producer.send(msg, new SendCallback() {@Override
br/>@Override
System.out.printf("%-10d OK %s %n", index,sendResult.getMsgId());
}
@Override
br/>sendResult.getMsgId());
}
@Override
System.out.printf("%-10d Exception %s %n", index, e);
e.printStackTrace();
}
});
}
// 等候5s
countDownLatch.await(5, TimeUnit.SECONDS);
// 假如不再發送音訊,關閉Producer實例。
producer.shutdown();
}
}
發送單向音訊
單向音訊就是沒有返回值的音訊
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;
/**
- @program: rocketmq-demo
- @description: 單向音訊消费者
- @author: syske
- @create: 2021-03-09 20:30
/
public class OnewayProducer {
public static void main(String[] args) throws Exception{
// 實例化音訊消费者Producer
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
// 設置NameServer的地址
producer.setNamesrvAddr("localhost:9876");
// 啟動Producer實例
producer.start();
for (int i = 0; i < 100; i++) {
// 創立音訊,並指定Topic,Tag和音訊體
Message msg = new Message("TopicTest" / Topic /,
"TagA" / Tag /,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) / Message body */
);
// 發送單向音訊,沒有任何返回結果
producer.sendOneway(msg);
}
// 假如不再發送音訊,關閉Producer實例。
producer.shutdown();
}
}
音訊消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/** - @program: rocketmq-demo
- @description: 音訊消费者
- @author: syske
- @create: 2021-03-09 20:26
/
public class Consumer {
public static void main(String[] args) throws InterruptedException, MQClientException {
// 實例化消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name");
// 設置NameServer的地址
consumer.setNamesrvAddr("localhost:9876");
// 订阅一個或者多個Topic,以及Tag來過濾需求消费的音訊
consumer.subscribe("TopicTest", "");
// 注册回調完成類來處置從broker拉取回來的音訊
consumer.registerMessageListener(new MessageListenerConcurrently() {@Override
br/>@Override
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
// 標誌該音訊曾經被勝利消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 啟動消费者實例
consumer.start();
System.out.printf("Consumer Started.%n");
}
}
發送次第音訊
次第音訊简單來說,就是消费者的消费次第和消费者消费次第是分歧的,比方對下面代码中的創立订單,消费的時分肯定是先消费創立1,然後是創立2,再是創立3,這里的是辨別能否是同一類音訊,是經過Message的tag屬性的值來判別的。關於次第音訊藉用網上的一個圖來阐明吧:
![]()
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
- @program: rocketmq-demo
- @description: 次第音訊消费
- @author: syske
- @create: 2021-03-09 20:35
*/
public class Producer {
public static void main(String[] args) throws Exception {
DefaultMQProducer producer = new DefaultMQProducer("order_group_1");
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();
String[] tags = new String[]{"TagA", "TagC", "TagD"};
// 订單列表
List orderList = new Producer().buildOrders();
Date date = new Date();
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
String dateStr = sdf.format(date);
for (int i = 0; i < 10; i++) {
// 加個時間前缀
String body = dateStr + " Hello RocketMQ " + orderList.get(i);
Message msg = new Message("TopicTest", tags[i % tags.length], "KEY" + i, body.getBytes());
SendResult sendResult = producer.send(msg, (mqs, msg1, arg) -> {
Long id = (Long) arg; //依據订單id選择發送queue
long index = id % mqs.size();
return mqs.get((int) index);
}, orderList.get(i).getOrderId());//订單id
System.out.println(String.format("SendResult status:%s, queueId:%d, body:%s",
sendResult.getSendStatus(),
sendResult.getMessageQueue().getQueueId(),
body));
}
producer.shutdown();
}
/**- 订單的步骤
*/
private static class OrderStep {
private long orderId;
private String desc;
public long getOrderId() {
return orderId;
}
public void setOrderId(long orderId) {
this.orderId = orderId;
}
public String getDesc() {
return desc;
}
public void setDesc(String desc) {
this.desc = desc;}
@Override
br/>}
@Override
return "OrderStep{" +
"orderId=" + orderId +
", desc='" + desc + '\'' +
'}';
}
}
/** - 生成模仿订單數據
*/
private List buildOrders() {
List orderList = new ArrayList();
OrderStep orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("創立1");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("創立2");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("付款1");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("創立3");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("付款2");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("付款3");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111065L);
orderDemo.setDesc("完成1");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("推送1");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103117235L);
orderDemo.setDesc("完成2");
orderList.add(orderDemo);
orderDemo = new OrderStep();
orderDemo.setOrderId(15103111039L);
orderDemo.setDesc("完成3");
orderList.add(orderDemo);
return orderList;
}
}
次第音訊消费者
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeOrderlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
import java.util.Random;
import java.util.concurrent.TimeUnit;
/**
- 订單的步骤
- @program: rocketmq-demo
- @description: 次第音訊消费者
- @author: syske
@create: 2021-03-09 20:37
*/
public class ConsumerInOrder {
public static void main(String[] args) throws Exception {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("order_group_3");
consumer.setNamesrvAddr("127.0.0.1:9876");
/**設置Consumer第一次啟動是從隊列頭部開端消费還是隊列尾部開端消费
- 假如非第一次啟動,那麼依照上次消费的位置繼續消费
*/
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
consumer.subscribe("TopicTest", "TagA || TagC || TagD");
consumer.registerMessageListener(new MessageListenerOrderly() {
Random random = new Random();@Override
br/>@Override
context.setAutoCommit(true);
for (MessageExt msg : msgs) {
// 能夠看到每個queue有獨一的consume線程來消费, 订單對每個queue(分區)有序
System.out.println("consumeThread=" + Thread.currentThread().getName() + "queueId=" + msg.getQueueId() + ", content:" + new String(msg.getBody()));
}
try {
//模仿業務逻輯處置中...
TimeUnit.SECONDS.sleep(random.nextInt(10));
} catch (Exception e) {
e.printStackTrace();
}
return ConsumeOrderlyStatus.SUCCESS;
}
});
consumer.start();
System.out.println("Consumer Started.");
}
}