snapuserd - source code
#include "snapuserd.h"
#include <dirent.h>
#include <fcntl.h>
#include <linux/fs.h>
#include <unistd.h>
#include <algorithm>
#include <csignal>
#include <optional>
#include <set>
#include <android-base/file.h>
#include <android-base/logging.h>
#include <android-base/parseint.h>
#include <android-base/properties.h>
#include <android-base/strings.h>
#include <android-base/unique_fd.h>
#include <snapuserd/snapuserd_client.h>
namespace android {
namespace snapshot {
using namespace android;
using namespace android::dm;
using android::base::unique_fd;
#define SNAP_LOG(level) LOG(level) << misc_name_ << ": "
#define SNAP_PLOG(level) PLOG(level) << misc_name_ << ": "
Snapuserd::Snapuserd(const std::string& misc_name, const std::string& cow_device,
const std::string& backing_device) {
misc_name_ = misc_name;
cow_device_ = cow_device;
backing_store_device_ = backing_device;
control_device_ = "/dev/dm-user/" + misc_name;
}
bool Snapuserd::InitializeWorkers() {
for (int i = 0; i < NUM_THREADS_PER_PARTITION; i++) {
std::unique_ptr<WorkerThread> wt = std::make_unique<WorkerThread>(
cow_device_, backing_store_device_, control_device_, misc_name_, GetSharedPtr());
worker_threads_.push_back(std::move(wt));
}
read_ahead_thread_ = std::make_unique<ReadAheadThread>(cow_device_, backing_store_device_,
misc_name_, GetSharedPtr());
return true;
}
std::unique_ptr<CowReader> Snapuserd::CloneReaderForWorker() {
return reader_->CloneCowReader();
}
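// Commit merge progress: bump the merged-op count in the mapped COW header
// and msync only the header block so the count survives a crash. When
// read-ahead is in use and ordered ops remain, the scratch-buffer state is
// flipped back to kCowReadAheadInProgress, invalidating the previous
// read-ahead checkpoint.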
bool Snapuserd::CommitMerge(int num_merge_ops) {
struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
ch->num_merge_ops += num_merge_ops;
if (read_ahead_feature_ && read_ahead_ops_.size() > 0) {
struct BufferState* ra_state = GetBufferState();
ra_state->read_ahead_state = kCowReadAheadInProgress;
}
int ret = msync(mapped_addr_, BLOCK_SZ, MS_SYNC);
if (ret < 0) {
SNAP_PLOG(ERROR) << "msync header failed: " << ret;
return false;
}
merge_initiated_ = true;
return true;
}
void Snapuserd::PrepareReadAhead() {
if (!read_ahead_feature_) {
return;
}
struct BufferState* ra_state = GetBufferState();
// Check if the data has to be re-constructed from COW device
if (ra_state->read_ahead_state == kCowReadAheadDone) {
populate_data_from_cow_ = true;
} else {
populate_data_from_cow_ = false;
}
StartReadAhead();
}
bool Snapuserd::GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer) {
if (!lock->owns_lock()) {
SNAP_LOG(ERROR) << "GetRABuffer - Lock not held";
return false;
}
std::unordered_map<uint64_t, void*>::iterator it = read_ahead_buffer_map_.find(block);
// This will be true only for IO's generated as part of reading a root
// filesystem. IO's related to merge should always be in read-ahead cache.
if (it == read_ahead_buffer_map_.end()) {
return false;
}
// Theoretically, we can send the data back from the read-ahead buffer
// all the way to the kernel without a memcpy. However, if the IO is
// un-aligned, the wrapper function will need to touch the read-ahead
// buffers and the transitions will be a bit more complicated.
memcpy(buffer, it->second, BLOCK_SZ);
return true;
}
// State transition functions for read-ahead operations
bool Snapuserd::GetReadAheadPopulatedBuffer(uint64_t block, void* buffer) {
if (!read_ahead_feature_) {
return false;
}
{
std::unique_lock<std::mutex> lock(lock_);
if (io_state_ == READ_AHEAD_IO_TRANSITION::READ_AHEAD_FAILURE) {
return false;
}
if (io_state_ == READ_AHEAD_IO_TRANSITION::IO_IN_PROGRESS) {
return GetRABuffer(&lock, block, buffer);
}
}
{
// Read-ahead thread IO is in-progress. Wait for it to complete
std::unique_lock<std::mutex> lock(lock_);
while (!(io_state_ == READ_AHEAD_IO_TRANSITION::READ_AHEAD_FAILURE ||
io_state_ == READ_AHEAD_IO_TRANSITION::IO_IN_PROGRESS)) {
cv.wait(lock);
}
return GetRABuffer(&lock, block, buffer);
}
}
// This is invoked by read-ahead thread waiting for merge IO's
// to complete
bool Snapuserd::WaitForMergeToComplete() {
{
std::unique_lock<std::mutex> lock(lock_);
while (!(io_state_ == READ_AHEAD_IO_TRANSITION::READ_AHEAD_BEGIN ||
io_state_ == READ_AHEAD_IO_TRANSITION::IO_TERMINATED)) {
cv.wait(lock);
}
if (io_state_ == READ_AHEAD_IO_TRANSITION::IO_TERMINATED) {
return false;
}
io_state_ = READ_AHEAD_IO_TRANSITION::READ_AHEAD_IN_PROGRESS;
return true;
}
}
// This is invoked during the launch of worker threads. We wait
// for the read-ahead thread to be fully up before worker threads
// are launched; otherwise we would have a race between worker threads
// and the read-ahead thread, specifically during re-construction.
bool Snapuserd::WaitForReadAheadToStart() {
{
std::unique_lock<std::mutex> lock(lock_);
while (!(io_state_ == READ_AHEAD_IO_TRANSITION::IO_IN_PROGRESS ||
io_state_ == READ_AHEAD_IO_TRANSITION::READ_AHEAD_FAILURE)) {
cv.wait(lock);
}
if (io_state_ == READ_AHEAD_IO_TRANSITION::READ_AHEAD_FAILURE) {
return false;
}
return true;
}
}
// Invoked by worker threads when a sequence of merge operations
// is complete, notifying the read-ahead thread to make forward
// progress.
void Snapuserd::StartReadAhead() {
{
std::lock_guard<std::mutex> lock(lock_);
io_state_ = READ_AHEAD_IO_TRANSITION::READ_AHEAD_BEGIN;
}
cv.notify_one();
}
void Snapuserd::MergeCompleted() {
{
std::lock_guard<std::mutex> lock(lock_);
io_state_ = READ_AHEAD_IO_TRANSITION::IO_TERMINATED;
}
cv.notify_one();
}
bool Snapuserd::ReadAheadIOCompleted(bool sync) {
if (sync) {
// Flush the entire buffer region
int ret = msync(mapped_addr_, total_mapped_addr_length_, MS_SYNC);
if (ret < 0) {
PLOG(ERROR) << "msync failed after ReadAheadIOCompleted: " << ret;
return false;
}
// Metadata and data are synced. Now, update the state.
// We need to update the state after flushing data; if there is a crash
// when read-ahead IO is in progress, the state of data in the COW file
// is unknown. kCowReadAheadDone acts as a checkpoint wherein the data
// in the scratch space is good and during next reboot, read-ahead thread
// can safely re-construct the data.
struct BufferState* ra_state = GetBufferState();
ra_state->read_ahead_state = kCowReadAheadDone;
ret = msync(mapped_addr_, BLOCK_SZ, MS_SYNC);
if (ret < 0) {
PLOG(ERROR) << "msync failed to flush Readahead completion state...";
return false;
}
}
// Notify the worker threads
{
std::lock_guard<std::mutex> lock(lock_);
io_state_ = READ_AHEAD_IO_TRANSITION::IO_IN_PROGRESS;
}
cv.notify_all();
return true;
}
void Snapuserd::ReadAheadIOFailed() {
{
std::lock_guard<std::mutex> lock(lock_);
io_state_ = READ_AHEAD_IO_TRANSITION::READ_AHEAD_FAILURE;
}
cv.notify_all();
}
// End of state transition functions
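// Chunk-id layout of the kernel COW format: chunk 0 is the snapshot header
// and every (exceptions_per_area_ + 1)-th chunk after it is a metadata
// (exception) area. With 256 exceptions per 4k area: chunk 1 is metadata,
// chunks 2..257 hold data, chunk 258 is metadata again, and so on.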
bool Snapuserd::IsChunkIdMetadata(chunk_t chunk) {
uint32_t stride = exceptions_per_area_ + 1;
lldiv_t divresult = lldiv(chunk, stride);
return (divresult.rem == NUM_SNAPSHOT_HDR_CHUNKS);
}
// Find the next free chunk-id to be assigned. Check if the next free
// chunk-id represents a metadata page. If so, skip it.
chunk_t Snapuserd::GetNextAllocatableChunkId(chunk_t chunk) {
chunk_t next_chunk = chunk + 1;
if (IsChunkIdMetadata(next_chunk)) {
next_chunk += 1;
}
return next_chunk;
}
void Snapuserd::CheckMergeCompletionStatus() {
if (!merge_initiated_) {
SNAP_LOG(INFO) << "Merge was not initiated. Total-data-ops: "
<< reader_->get_num_total_data_ops();
return;
}
struct CowHeader* ch = reinterpret_cast<struct CowHeader*>(mapped_addr_);
SNAP_LOG(INFO) << "Merge-status: Total-Merged-ops: " << ch->num_merge_ops
<< " Total-data-ops: " << reader_->get_num_total_data_ops();
}
/*
* Read the metadata from the COW device and
* construct the metadata as required by the kernel.
*
* Please see the design notes on the kernel COW format.
*
* 1: Read the metadata from the internal COW device.
* 2: There are 3 COW operations:
* a: Replace op
* b: Copy op
* c: Zero op
* 3: For each of the 3 operations, op->new_block
* represents the block number in the base device
* to which the operation has to be applied.
* This becomes the old_chunk in the kernel COW format.
* 4: We need to assign a new_chunk for each old_chunk.
* 5: The algorithm is similar to how the kernel assigns chunk numbers
* while creating exceptions. However, a few cases
* need to be addressed here:
* a: When a merge is initiated, the kernel scans the metadata
* pages backwards. Since we need to make sure that the merge
* ordering follows our COW format, we read the COW operations
* backwards and populate the metadata so that when the kernel
* starts merging from the back, those ops correspond to the
* beginning of our COW format.
* b: The kernel can batch-merge successive operations if their chunk IDs
* are contiguous. This is problematic when there is a crash
* during merge, specifically when merge operations have dependencies.
* Such dependencies can only occur between copy operations.
*
* To avoid this problem, we make sure overlapping copy operations
* are never batch merged.
* 6: Use a monotonically increasing chunk number to assign the
* new_chunk.
* 7: Each chunk-id represents either
* a: a metadata page, or
* b: a data page.
* 8: A chunk-id representing a data page is stored in a map.
* 9: A chunk-id representing a metadata page is converted into a vector
* index. We store this in a vector because the kernel requests the
* metadata in two stages:
* a: when the initial dm-snapshot device is created, the kernel requests
* all the metadata and stores it in its internal data structures;
* b: during merge, the kernel requests the same metadata once again.
* In both cases, a quick lookup based on chunk-id is done.
* 10: When the chunk number is incremented, we need to check whether
* the chunk represents a metadata page and, if so, skip it.
* 11: Each 4k page holds 256 disk exceptions. We call this
* exceptions_per_area_.
* 12: The kernel stops issuing metadata IO requests when the new-chunk ID is 0.
*/
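// A sketch of the mapping built by ReadMetadata() below: if the reverse-merge
// iterator yields replace ops for new_block 10, 11 and 12, the first three
// exceptions become (old_chunk 10 -> new_chunk 2), (11 -> 3) and (12 -> 4).
// new_chunk 1 is never used since chunk 1 is the first metadata page.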
bool Snapuserd::ReadMetadata() {
reader_ = std::make_unique<CowReader>();
CowOptions options;
bool metadata_found = false;
int replace_ops = 0, zero_ops = 0, copy_ops = 0;
SNAP_LOG(DEBUG) << "ReadMetadata: Parsing cow file";
if (!reader_->Parse(cow_fd_)) {
SNAP_LOG(ERROR) << "Failed to parse";
return false;
}
const auto& header = reader_->GetHeader();
if (!(header.block_size == BLOCK_SZ)) {
SNAP_LOG(ERROR) << "Invalid header block size found: " << header.block_size;
return false;
}
SNAP_LOG(DEBUG) << "Merge-ops: " << header.num_merge_ops;
if (!MmapMetadata()) {
SNAP_LOG(ERROR) << "mmap failed";
return false;
}
// Initialize the iterator for reading metadata
std::unique_ptr<ICowOpIter> cowop_rm_iter = reader_->GetRevMergeOpIter();
exceptions_per_area_ = (CHUNK_SIZE << SECTOR_SHIFT) / sizeof(struct disk_exception);
// Start from chunk number 2. Chunk 0 represents header and chunk 1
// represents first metadata page.
chunk_t data_chunk_id = NUM_SNAPSHOT_HDR_CHUNKS + 1;
size_t num_ops = 0;
loff_t offset = 0;
std::unique_ptr<uint8_t[]> de_ptr =
std::make_unique<uint8_t[]>(exceptions_per_area_ * sizeof(struct disk_exception));
// This memset is important. Kernel will stop issuing IO when new-chunk ID
// is 0. When Area is not filled completely with all 256 exceptions,
// this memset will ensure that metadata read is completed.
memset(de_ptr.get(), 0, (exceptions_per_area_ * sizeof(struct disk_exception)));
while (!cowop_rm_iter->AtEnd()) {
const CowOperation* cow_op = cowop_rm_iter->Get();
struct disk_exception* de =
reinterpret_cast<struct disk_exception*>((char*)de_ptr.get() + offset);
metadata_found = true;
// This loop handles all the replace and zero ops.
// We will handle the copy ops later as they require special
// handling when assigning chunk-ids. Furthermore, we make
// sure that replace/zero and copy ops are not batch merged; hence
// the bump in the chunk_id before breaking out of this loop.
if (IsOrderedOp(*cow_op)) {
data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
break;
}
if (cow_op->type == kCowReplaceOp) {
replace_ops++;
} else if (cow_op->type == kCowZeroOp) {
zero_ops++;
}
// Construct the disk-exception
de->old_chunk = cow_op->new_block;
de->new_chunk = data_chunk_id;
// Store operation pointer.
chunk_vec_.push_back(std::make_pair(ChunkToSector(data_chunk_id), cow_op));
num_ops += 1;
offset += sizeof(struct disk_exception);
cowop_rm_iter->Next();
SNAP_LOG(DEBUG) << num_ops << ":"
<< " Old-chunk: " << de->old_chunk << " New-chunk: " << de->new_chunk;
if (num_ops == exceptions_per_area_) {
// Store it in vector at the right index. This maps the chunk-id to
// vector index.
vec_.push_back(std::move(de_ptr));
offset = 0;
num_ops = 0;
// Create buffer for next area
de_ptr = std::make_unique<uint8_t[]>(exceptions_per_area_ *
sizeof(struct disk_exception));
memset(de_ptr.get(), 0, (exceptions_per_area_ * sizeof(struct disk_exception)));
if (cowop_rm_iter->AtEnd()) {
vec_.push_back(std::move(de_ptr));
}
}
data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
}
int num_ra_ops_per_iter = ((GetBufferDataSize()) / BLOCK_SZ);
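// For the default 2 MB scratch region this works out to
// (2 MB - 8k metadata) / 4k = 2088960 / 4096 = 510 ordered ops per
// read-ahead window (see GetBufferDataSize()).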
std::optional<chunk_t> prev_id = {};
std::vector<const CowOperation*> vec;
std::set<uint64_t> dest_blocks;
std::set<uint64_t> source_blocks;
size_t pending_ordered_ops = exceptions_per_area_ - num_ops;
uint64_t total_ordered_ops = reader_->get_num_ordered_ops_to_merge();
SNAP_LOG(DEBUG) << " Processing copy-ops at Area: " << vec_.size()
<< " Number of replace/zero ops completed in this area: " << num_ops
<< " Pending copy ops for this area: " << pending_ordered_ops;
while (!cowop_rm_iter->AtEnd()) {
do {
const CowOperation* cow_op = cowop_rm_iter->Get();
// We have to handle the following specific case:
//
//
// Case 1: Overlapping copy regions
//
// Ex:
//
// Source -> Destination
//
// 1: 15 -> 18
// 2: 16 -> 19
// 3: 17 -> 20
// 4: 18 -> 21
// 5: 19 -> 22
// 6: 20 -> 23
//
// We have 6 copy operations to be executed in the OTA and there is an
// overlap. update_engine will write them to the COW file as follows:
//
// Op-1: 20 -> 23
// Op-2: 19 -> 22
// Op-3: 18 -> 21
// Op-4: 17 -> 20
// Op-5: 16 -> 19
// Op-6: 15 -> 18
//
// Note that the block numbers are contiguous. Hence, all 6 copy
// operations can be batch merged. However, that is
// problematic if we have a crash, as blocks 20, 19 and 18 would
// already have been overwritten; a subsequent recovery may then end
// up with silent data corruption when op-1, op-2 and op-3 are
// re-executed.
//
// To address this problem, the read-ahead thread
// reads all 6 source blocks and caches them in the scratch
// space of the COW file. During merge, the read-ahead
// thread serves the blocks from the read-ahead cache.
// If there is a crash during merge, then on the subsequent reboot
// the read-ahead thread recovers the data from the
// scratch space and re-constructs it, so that no
// data is lost.
//
// Note that we follow the same order of COW operations
// as present in the COW file. This makes sure that
// the merge of operations is done based on the ops present
// in the file.
//
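// Stop batching as soon as the current op's destination block was the source
// of an earlier op in this batch, or its source block was an earlier op's
// destination; dependent copies are separated by the chunk-id bump after this
// loop so the kernel cannot batch-merge them.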
uint64_t block_source = GetCowOpSourceInfoData(cow_op);
if (prev_id.has_value()) {
if (dest_blocks.count(cow_op->new_block) || source_blocks.count(block_source)) {
break;
}
}
metadata_found = true;
pending_ordered_ops -= 1;
vec.push_back(cow_op);
dest_blocks.insert(block_source);
source_blocks.insert(cow_op->new_block);
prev_id = cow_op->new_block;
cowop_rm_iter->Next();
} while (!cowop_rm_iter->AtEnd() && pending_ordered_ops);
data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
SNAP_LOG(DEBUG) << "Batch Merge copy-ops of size: " << vec.size()
<< " Area: " << vec_.size() << " Area offset: " << offset
<< " Pending-ordered-ops in this area: " << pending_ordered_ops;
for (size_t i = 0; i < vec.size(); i++) {
struct disk_exception* de =
reinterpret_cast<struct disk_exception*>((char*)de_ptr.get() + offset);
const CowOperation* cow_op = vec[i];
de->old_chunk = cow_op->new_block;
de->new_chunk = data_chunk_id;
// Store operation pointer.
chunk_vec_.push_back(std::make_pair(ChunkToSector(data_chunk_id), cow_op));
offset += sizeof(struct disk_exception);
num_ops += 1;
if (cow_op->type == kCowCopyOp) {
copy_ops++;
}
if (read_ahead_feature_) {
read_ahead_ops_.push_back(cow_op);
}
SNAP_LOG(DEBUG) << num_ops << ":"
<< " Ordered-op: "
<< " Old-chunk: " << de->old_chunk << " New-chunk: " << de->new_chunk;
if (num_ops == exceptions_per_area_) {
// Store it in vector at the right index. This maps the chunk-id to
// vector index.
vec_.push_back(std::move(de_ptr));
num_ops = 0;
offset = 0;
// Create buffer for next area
de_ptr = std::make_unique<uint8_t[]>(exceptions_per_area_ *
sizeof(struct disk_exception));
memset(de_ptr.get(), 0, (exceptions_per_area_ * sizeof(struct disk_exception)));
if (cowop_rm_iter->AtEnd()) {
vec_.push_back(std::move(de_ptr));
SNAP_LOG(DEBUG) << "ReadMetadata() completed; Number of Areas: " << vec_.size();
}
if (!(pending_ordered_ops == 0)) {
SNAP_LOG(ERROR) << "Invalid pending_ordered_ops: expected: 0 found: "
<< pending_ordered_ops;
return false;
}
pending_ordered_ops = exceptions_per_area_;
}
data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
total_ordered_ops -= 1;
/*
* Split the number of ops based on the size of read-ahead buffer
* region. We need to ensure that kernel doesn't issue IO on blocks
* which are not read by the read-ahead thread.
*/
if (read_ahead_feature_ && (total_ordered_ops % num_ra_ops_per_iter == 0)) {
data_chunk_id = GetNextAllocatableChunkId(data_chunk_id);
}
}
vec.clear();
dest_blocks.clear();
source_blocks.clear();
prev_id.reset();
}
// Partially filled area or there is no metadata
// If there is no metadata, fill with zero so that kernel
// is aware that merge is completed.
if (num_ops || !metadata_found) {
vec_.push_back(std::move(de_ptr));
SNAP_LOG(DEBUG) << "ReadMetadata() completed. Partially filled area num_ops: " << num_ops
<< "Areas : " << vec_.size();
}
chunk_vec_.shrink_to_fit();
vec_.shrink_to_fit();
read_ahead_ops_.shrink_to_fit();
// Sort the vector based on sectors as we need this during un-aligned access
std::sort(chunk_vec_.begin(), chunk_vec_.end(), compare);
SNAP_LOG(INFO) << "ReadMetadata completed. Final-chunk-id: " << data_chunk_id
<< " Num Sector: " << ChunkToSector(data_chunk_id)
<< " Replace-ops: " << replace_ops << " Zero-ops: " << zero_ops
<< " Copy-ops: " << copy_ops << " Areas: " << vec_.size()
<< " Num-ops-merged: " << header.num_merge_ops
<< " Total-data-ops: " << reader_->get_num_total_data_ops();
// Total number of sectors required for creating dm-user device
num_sectors_ = ChunkToSector(data_chunk_id);
merge_initiated_ = false;
PrepareReadAhead();
return true;
}
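// Map the COW header and, for v2+ COWs that carry a scratch buffer, the
// read-ahead buffer region as well; older COW formats only map the first
// 4k page.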
bool Snapuserd::MmapMetadata() {
const auto& header = reader_->GetHeader();
if (header.prefix.major_version >= 2 && header.buffer_size > 0) {
total_mapped_addr_length_ = header.prefix.header_size + BUFFER_REGION_DEFAULT_SIZE;
read_ahead_feature_ = true;
} else {
// mmap the first 4k page - older COW format
total_mapped_addr_length_ = BLOCK_SZ;
read_ahead_feature_ = false;
}
mapped_addr_ = mmap(NULL, total_mapped_addr_length_, PROT_READ | PROT_WRITE, MAP_SHARED,
cow_fd_.get(), 0);
if (mapped_addr_ == MAP_FAILED) {
SNAP_LOG(ERROR) << "mmap metadata failed";
return false;
}
return true;
}
void Snapuserd::UnmapBufferRegion() {
int ret = munmap(mapped_addr_, total_mapped_addr_length_);
if (ret < 0) {
SNAP_PLOG(ERROR) << "munmap failed";
}
}
void MyLogger(android::base::LogId, android::base::LogSeverity severity, const char*, const char*,
unsigned int, const char* message) {
if (severity == android::base::ERROR) {
fprintf(stderr, "%s\n", message);
} else {
fprintf(stdout, "%s\n", message);
}
}
bool Snapuserd::InitCowDevice() {
cow_fd_.reset(open(cow_device_.c_str(), O_RDWR));
if (cow_fd_ < 0) {
SNAP_PLOG(ERROR) << "Open Failed: " << cow_device_;
return false;
}
return ReadMetadata();
}
void Snapuserd::ReadBlocksToCache(const std::string& dm_block_device,
const std::string& partition_name, off_t offset, size_t size) {
android::base::unique_fd fd(TEMP_FAILURE_RETRY(open(dm_block_device.c_str(), O_RDONLY)));
if (fd.get() == -1) {
SNAP_PLOG(ERROR) << "Error reading " << dm_block_device
<< " partition-name: " << partition_name;
return;
}
size_t remain = size;
off_t file_offset = offset;
// We pick 4M I/O size based on the fact that the current
// update_verifier has a similar I/O size.
size_t read_sz = 1024 * BLOCK_SZ;
std::vector<uint8_t> buf(read_sz);
while (remain > 0) {
size_t to_read = std::min(remain, read_sz);
if (!android::base::ReadFullyAtOffset(fd.get(), buf.data(), to_read, file_offset)) {
SNAP_PLOG(ERROR) << "Failed to read block from block device: " << dm_block_device
<< " at offset: " << file_offset
<< " partition-name: " << partition_name << " total-size: " << size
<< " remain_size: " << remain;
return;
}
file_offset += to_read;
remain -= to_read;
}
SNAP_LOG(INFO) << "Finished reading block-device: " << dm_block_device
<< " partition: " << partition_name << " size: " << size
<< " offset: " << offset;
}
void Snapuserd::ReadBlocks(const std::string& partition_name, const std::string& dm_block_device) {
SNAP_LOG(DEBUG) << "Reading partition: " << partition_name
<< " Block-Device: " << dm_block_device;
uint64_t dev_sz = 0;
unique_fd fd(TEMP_FAILURE_RETRY(open(dm_block_device.c_str(), O_RDONLY | O_CLOEXEC)));
if (fd < 0) {
SNAP_LOG(ERROR) << "Cannot open block device";
return;
}
dev_sz = get_block_device_size(fd.get());
if (!dev_sz) {
SNAP_PLOG(ERROR) << "Could not determine block device size: " << dm_block_device;
return;
}
int num_threads = 2;
size_t num_blocks = dev_sz >> BLOCK_SHIFT;
size_t num_blocks_per_thread = num_blocks / num_threads;
size_t read_sz_per_thread = num_blocks_per_thread << BLOCK_SHIFT;
off_t offset = 0;
for (int i = 0; i < num_threads; i++) {
std::async(std::launch::async, &Snapuserd::ReadBlocksToCache, this, dm_block_device,
partition_name, offset, read_sz_per_thread);
offset += read_sz_per_thread;
}
}
/*
* Entry point to launch threads
*/
bool Snapuserd::Start() {
std::vector<std::future<bool>> threads;
std::future<bool> ra_thread;
bool rathread = (read_ahead_feature_ && (read_ahead_ops_.size() > 0));
// Start the read-ahead thread and wait for it to come up,
// as the data has to be re-constructed
// from the COW device.
if (rathread) {
ra_thread = std::async(std::launch::async, &ReadAheadThread::RunThread,
read_ahead_thread_.get());
if (!WaitForReadAheadToStart()) {
SNAP_LOG(ERROR) << "Failed to start Read-ahead thread...";
return false;
}
SNAP_LOG(INFO) << "Read-ahead thread started...";
}
// Launch worker threads
for (int i = 0; i < worker_threads_.size(); i++) {
threads.emplace_back(
std::async(std::launch::async, &WorkerThread::RunThread, worker_threads_[i].get()));
}
bool second_stage_init = true;
// We don't want to read the blocks during first stage init.
if (android::base::EndsWith(misc_name_, "-init") || is_socket_present_) {
second_stage_init = false;
}
if (second_stage_init) {
SNAP_LOG(INFO) << "Reading blocks to cache....";
auto& dm = DeviceMapper::Instance();
auto dm_block_devices = dm.FindDmPartitions();
if (dm_block_devices.empty()) {
SNAP_LOG(ERROR) << "No dm-enabled block device is found.";
} else {
auto parts = android::base::Split(misc_name_, "-");
std::string partition_name = parts[0];
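// Strip the slot suffix (_a / _b) to get the base partition name.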
const char* suffix_b = "_b";
const char* suffix_a = "_a";
partition_name.erase(partition_name.find_last_not_of(suffix_b) + 1);
partition_name.erase(partition_name.find_last_not_of(suffix_a) + 1);
if (dm_block_devices.find(partition_name) == dm_block_devices.end()) {
SNAP_LOG(ERROR) << "Failed to find dm block device for " << partition_name;
} else {
ReadBlocks(partition_name, dm_block_devices.at(partition_name));
}
}
} else {
SNAP_LOG(INFO) << "Not reading block device into cache";
}
bool ret = true;
for (auto& t : threads) {
ret = t.get() && ret;
}
if (rathread) {
// Notify the read-ahead thread that all worker threads
// are done. We need this explicit notification when
// there is an IO failure or there was a switch
// of dm-user table; thus, forcing the read-ahead
// thread to wake up.
MergeCompleted();
ret = ret && ra_thread.get();
}
return ret;
}
uint64_t Snapuserd::GetBufferMetadataOffset() {
const auto& header = reader_->GetHeader();
size_t size = header.prefix.header_size + sizeof(BufferState);
return size;
}
/*
* Metadata for read-ahead is 16 bytes per block. For a 2 MB region, we
* end up with 8k (2 pages) worth of metadata. Thus, a 2 MB buffer
* region is split into:
*
* 1: 8k of metadata
* 2: The remaining (2 MB - 8k) as the read-ahead data region
*/
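// With 16-byte ScratchMetadata entries and the default 2 MB buffer:
// (2097152 * 16) / 4096 = 8192 bytes, i.e. two 4k pages of metadata.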
size_t Snapuserd::GetBufferMetadataSize() {
const auto& header = reader_->GetHeader();
size_t metadata_bytes = (header.buffer_size * sizeof(struct ScratchMetadata)) / BLOCK_SZ;
return metadata_bytes;
}
size_t Snapuserd::GetBufferDataOffset() {
const auto& header = reader_->GetHeader();
return (header.prefix.header_size + GetBufferMetadataSize());
}
/*
* (2MB - 8K = 2088960 bytes) will be the buffer region to hold the data.
*/
size_t Snapuserd::GetBufferDataSize() {
const auto& header = reader_->GetHeader();
size_t size = header.buffer_size - GetBufferMetadataSize();
return size;
}
struct BufferState* Snapuserd::GetBufferState() {
const auto& header = reader_->GetHeader();
struct BufferState* ra_state =
reinterpret_cast<struct BufferState*>((char*)mapped_addr_ + header.prefix.header_size);
return ra_state;
}
} // namespace snapshot
} // namespace android
----------------
snapuserd - header
#pragma once
#include <linux/types.h>
#include <stdint.h>
#include <stdlib.h>
#include <sys/mman.h>
#include <bitset>
#include <condition_variable>
#include <csignal>
#include <cstring>
#include <future>
#include <iostream>
#include <limits>
#include <map>
#include <mutex>
#include <string>
#include <thread>
#include <unordered_map>
#include <unordered_set>
#include <vector>
#include <android-base/file.h>
#include <android-base/logging.h>
#include <android-base/stringprintf.h>
#include <android-base/unique_fd.h>
#include <ext4_utils/ext4_utils.h>
#include <libdm/dm.h>
#include <libsnapshot/cow_reader.h>
#include <libsnapshot/cow_writer.h>
#include <snapuserd/snapuserd_buffer.h>
#include <snapuserd/snapuserd_kernel.h>
namespace android {
namespace snapshot {
using android::base::unique_fd;
using namespace std::chrono_literals;
static constexpr size_t PAYLOAD_SIZE = (1UL << 20);
static_assert(PAYLOAD_SIZE >= BLOCK_SZ);
/*
* With 4 threads, we get optimal performance
* when update_verifier reads the partition during
* boot.
*/
static constexpr int NUM_THREADS_PER_PARTITION = 4;
/*
* State transitions between worker threads and the read-ahead
* thread.
*
* READ_AHEAD_BEGIN: Worker threads ask the read-ahead
* thread to begin reading the copy operations
* for each bounded region.
*
* READ_AHEAD_IN_PROGRESS: The read-ahead thread is in flight,
* reading the copy operations.
*
* IO_IN_PROGRESS: Merge operations are in progress in the worker threads.
*
* IO_TERMINATED: When all the worker threads are done, the
* read-ahead thread is asked to terminate.
*
* READ_AHEAD_FAILURE: Set when there is an IO failure while the read-ahead
* thread is reading from the COW device.
*
* The transitions between these states are described in snapuserd_readahead.cpp.
*/
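// Typical cycle (see the state transition functions in the source above):
// StartReadAhead() posts READ_AHEAD_BEGIN, the read-ahead thread moves to
// READ_AHEAD_IN_PROGRESS, ReadAheadIOCompleted() publishes IO_IN_PROGRESS to
// the workers, and MergeCompleted() finally posts IO_TERMINATED;
// ReadAheadIOFailed() posts READ_AHEAD_FAILURE at any point.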
enum class READ_AHEAD_IO_TRANSITION {
READ_AHEAD_BEGIN,
READ_AHEAD_IN_PROGRESS,
IO_IN_PROGRESS,
IO_TERMINATED,
READ_AHEAD_FAILURE,
};
class Snapuserd;
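// Read-ahead thread: reads the source blocks of ordered (copy) ops from the
// backing device, caches them in the scratch buffer region of the COW file,
// and re-constructs that cache after a crash so overlapping copies can be
// merged safely.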
class ReadAheadThread {
public:
ReadAheadThread(const std::string& cow_device, const std::string& backing_device,
const std::string& misc_name, std::shared_ptr<Snapuserd> snapuserd);
bool RunThread();
private:
void InitializeRAIter();
bool RAIterDone();
void RAIterNext();
const CowOperation* GetRAOpIter();
void InitializeBuffer();
bool InitializeFds();
void CloseFds() {
cow_fd_ = {};
backing_store_fd_ = {};
}
bool ReadAheadIOStart();
void PrepareReadAhead(uint64_t* source_offset, int* pending_ops, std::vector<uint64_t>& blocks);
bool ReconstructDataFromCow();
void CheckOverlap(const CowOperation* cow_op);
void* read_ahead_buffer_;
void* metadata_buffer_;
std::vector<const CowOperation*>::reverse_iterator read_ahead_iter_;
std::string cow_device_;
std::string backing_store_device_;
std::string misc_name_;
unique_fd cow_fd_;
unique_fd backing_store_fd_;
std::shared_ptr<Snapuserd> snapuserd_;
std::unordered_set<uint64_t> dest_blocks_;
std::unordered_set<uint64_t> source_blocks_;
bool overlap_;
};
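// Worker thread: services dm-user requests on the control device - returns
// disk-exception metadata to the kernel, resolves data reads through the COW
// operations (replace/zero/copy), and reports merge completion so it can be
// committed to the COW header.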
class WorkerThread {
public:
WorkerThread(const std::string& cow_device, const std::string& backing_device,
const std::string& control_device, const std::string& misc_name,
std::shared_ptr<Snapuserd> snapuserd);
bool RunThread();
private:
// Initialization
void InitializeBufsink();
bool InitializeFds();
bool InitReader();
void CloseFds() {
ctrl_fd_ = {};
backing_store_fd_ = {};
}
// Functions interacting with dm-user
bool ReadDmUserHeader();
bool DmuserReadRequest();
bool DmuserWriteRequest();
bool ReadDmUserPayload(void* buffer, size_t size);
bool WriteDmUserPayload(size_t size, bool header_response);
bool ReadDiskExceptions(chunk_t chunk, size_t size);
bool ZerofillDiskExceptions(size_t read_size);
void ConstructKernelCowHeader();
// IO Path
bool ProcessIORequest();
int ReadData(sector_t sector, size_t size);
int ReadUnalignedSector(sector_t sector, size_t size,
std::vector<std::pair<sector_t, const CowOperation*>>::iterator& it);
// Processing COW operations
bool ProcessCowOp(const CowOperation* cow_op);
bool ProcessReplaceOp(const CowOperation* cow_op);
// Handles Copy
bool ProcessCopyOp(const CowOperation* cow_op);
bool ProcessZeroOp();
bool ReadFromBaseDevice(const CowOperation* cow_op);
bool GetReadAheadPopulatedBuffer(const CowOperation* cow_op);
// Merge related functions
bool ProcessMergeComplete(chunk_t chunk, void* buffer);
loff_t GetMergeStartOffset(void* merged_buffer, void* unmerged_buffer,
int* unmerged_exceptions);
int GetNumberOfMergedOps(void* merged_buffer, void* unmerged_buffer, loff_t offset,
int unmerged_exceptions, bool* copy_op, bool* commit);
sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
std::unique_ptr<CowReader> reader_;
BufferSink bufsink_;
std::string cow_device_;
std::string backing_store_device_;
std::string control_device_;
std::string misc_name_;
unique_fd cow_fd_;
unique_fd backing_store_fd_;
unique_fd ctrl_fd_;
std::shared_ptr<Snapuserd> snapuserd_;
uint32_t exceptions_per_area_;
};
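// One Snapuserd instance manages a single dm-user (snapshot) device.
// Typical lifecycle, as a sketch based on this file: construct with the COW,
// backing and misc names -> InitCowDevice() parses the COW and builds the
// exception metadata -> InitializeWorkers() -> Start() launches the
// read-ahead and worker threads and waits for them to finish.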
class Snapuserd : public std::enable_shared_from_this<Snapuserd> {
public:
Snapuserd(const std::string& misc_name, const std::string& cow_device,
const std::string& backing_device);
bool InitCowDevice();
bool Start();
const std::string& GetControlDevicePath() { return control_device_; }
const std::string& GetMiscName() { return misc_name_; }
uint64_t GetNumSectors() { return num_sectors_; }
bool IsAttached() const { return attached_; }
void AttachControlDevice() { attached_ = true; }
void CheckMergeCompletionStatus();
bool CommitMerge(int num_merge_ops);
void CloseFds() { cow_fd_ = {}; }
void FreeResources() {
worker_threads_.clear();
read_ahead_thread_ = nullptr;
}
size_t GetMetadataAreaSize() { return vec_.size(); }
void* GetExceptionBuffer(size_t i) { return vec_[i].get(); }
bool InitializeWorkers();
std::unique_ptr<CowReader> CloneReaderForWorker();
std::shared_ptr<Snapuserd> GetSharedPtr() { return shared_from_this(); }
std::vector<std::pair<sector_t, const CowOperation*>>& GetChunkVec() { return chunk_vec_; }
const std::vector<std::unique_ptr<uint8_t[]>>& GetMetadataVec() const { return vec_; }
static bool compare(std::pair<sector_t, const CowOperation*> p1,
std::pair<sector_t, const CowOperation*> p2) {
return p1.first < p2.first;
}
void UnmapBufferRegion();
bool MmapMetadata();
// Read-ahead related functions
std::vector<const CowOperation*>& GetReadAheadOpsVec() { return read_ahead_ops_; }
std::unordered_map<uint64_t, void*>& GetReadAheadMap() { return read_ahead_buffer_map_; }
void* GetMappedAddr() { return mapped_addr_; }
bool IsReadAheadFeaturePresent() { return read_ahead_feature_; }
void PrepareReadAhead();
void StartReadAhead();
void MergeCompleted();
bool ReadAheadIOCompleted(bool sync);
void ReadAheadIOFailed();
bool WaitForMergeToComplete();
bool GetReadAheadPopulatedBuffer(uint64_t block, void* buffer);
bool ReconstructDataFromCow() { return populate_data_from_cow_; }
void ReconstructDataFromCowFinish() { populate_data_from_cow_ = false; }
bool WaitForReadAheadToStart();
uint64_t GetBufferMetadataOffset();
size_t GetBufferMetadataSize();
size_t GetBufferDataOffset();
size_t GetBufferDataSize();
// Final block to be merged in a given read-ahead buffer region
void SetFinalBlockMerged(uint64_t x) { final_block_merged_ = x; }
uint64_t GetFinalBlockMerged() { return final_block_merged_; }
// Total number of blocks to be merged in a given read-ahead buffer region
void SetTotalRaBlocksMerged(int x) { total_ra_blocks_merged_ = x; }
int GetTotalRaBlocksMerged() { return total_ra_blocks_merged_; }
void SetSocketPresent(bool socket) { is_socket_present_ = socket; }
private:
bool IsChunkIdMetadata(chunk_t chunk);
chunk_t GetNextAllocatableChunkId(chunk_t chunk_id);
bool GetRABuffer(std::unique_lock<std::mutex>* lock, uint64_t block, void* buffer);
bool ReadMetadata();
sector_t ChunkToSector(chunk_t chunk) { return chunk << CHUNK_SHIFT; }
chunk_t SectorToChunk(sector_t sector) { return sector >> CHUNK_SHIFT; }
bool IsBlockAligned(int read_size) { return ((read_size & (BLOCK_SZ - 1)) == 0); }
struct BufferState* GetBufferState();
void ReadBlocks(const std::string& partition_name, const std::string& dm_block_device);
void ReadBlocksToCache(const std::string& dm_block_device, const std::string& partition_name,
off_t offset, size_t size);
std::string cow_device_;
std::string backing_store_device_;
std::string control_device_;
std::string misc_name_;
unique_fd cow_fd_;
uint32_t exceptions_per_area_;
uint64_t num_sectors_;
std::unique_ptr<CowReader> reader_;
// Vector of disk exception which is a
// mapping of old-chunk to new-chunk
std::vector<std::unique_ptr<uint8_t[]>> vec_;
// chunk_vec stores the pseudo mapping of sector
// to COW operations.
std::vector<std::pair<sector_t, const CowOperation*>> chunk_vec_;
std::mutex lock_;
std::condition_variable cv;
void* mapped_addr_;
size_t total_mapped_addr_length_;
std::vector<std::unique_ptr<WorkerThread>> worker_threads_;
// Read-ahead related
std::unordered_map<uint64_t, void*> read_ahead_buffer_map_;
std::vector<const CowOperation*> read_ahead_ops_;
bool populate_data_from_cow_ = false;
bool read_ahead_feature_;
uint64_t final_block_merged_;
int total_ra_blocks_merged_ = 0;
READ_AHEAD_IO_TRANSITION io_state_;
std::unique_ptr<ReadAheadThread> read_ahead_thread_;
bool merge_initiated_ = false;
bool attached_ = false;
bool is_socket_present_;
};
} // namespace snapshot
} // namespace android